diff options
Diffstat (limited to 'src/clojure/contrib/stream_utils.clj')
-rw-r--r-- | src/clojure/contrib/stream_utils.clj | 276 |
1 files changed, 0 insertions, 276 deletions
diff --git a/src/clojure/contrib/stream_utils.clj b/src/clojure/contrib/stream_utils.clj deleted file mode 100644 index 35c5c90e..00000000 --- a/src/clojure/contrib/stream_utils.clj +++ /dev/null @@ -1,276 +0,0 @@ -;; Stream utilities - -;; by Konrad Hinsen -;; last updated May 3, 2009 - -;; Copyright (c) Konrad Hinsen, 2009. All rights reserved. The use -;; and distribution terms for this software are covered by the Eclipse -;; Public License 1.0 (http://opensource.org/licenses/eclipse-1.0.php) -;; which can be found in the file epl-v10.html at the root of this -;; distribution. By using this software in any fashion, you are -;; agreeing to be bound by the terms of this license. You must not -;; remove this notice, or any other, from this software. - -(ns - #^{:author "Konrad Hinsen" - :doc "Functions for setting up computational pipelines via data streams. - - NOTE: This library is experimental. It may change significantly - with future release. - - This library defines: - - an abstract stream type, whose interface consists of the - multimethod stream-next - - a macro for implementing streams - - implementations of stream for - 1) Clojure sequences, and vectors - 2) nil, representing an empty stream - - tools for writing stream transformers, including the - monad stream-m - - various utility functions for working with streams - - Streams are building blocks in the construction of computational - pipelines. A stream is represented by its current state plus - a function that takes a stream state and obtains the next item - in the stream as well as the new stream state. The state is - implemented as a Java class or a Clojure type (as defined by the - function clojure.core/type), and the function is provided as an - implementation of the multimethod stream-next for this class or type. - - While setting up pipelines using this mechanism is somewhat more - cumbersome than using Clojure's lazy seq mechanisms, there are a - few advantages: - - The state of a stream can be stored in any Clojure data structure, - and the stream can be re-generated from it any number of times. - Any number of states can be stored this way. - - The elements of the stream are never cached, so keeping a reference - to a stream state does not incur an uncontrollable memory penalty. - - Note that the stream mechanism is thread-safe as long as the - concrete stream implementations do not use any mutable state. - - Stream transformers take any number of input streams and produce one - output stream. They are typically written using the stream-m - monad. In the definition of a stream transformer, (pick s) returns - the next value of stream argument s, whereas pick-all returns the - next value of all stream arguments in the form of a vector."} - clojure.contrib.stream-utils - (:refer-clojure :exclude (deftype)) - (:use [clojure.contrib.types :only (deftype deftype-)]) - (:use [clojure.contrib.monads :only (defmonad with-monad)]) - (:use [clojure.contrib.def :only (defvar defvar-)]) - (:require [clojure.contrib.seq-utils]) - (:require [clojure.contrib.generic.collection])) - - -; -; Stream type and interface -; -(defvar stream-type ::stream - "The root type for the stream hierarchy. For each stream type, - add a derivation from this type.") - -(defmacro defstream - "Define object of the given type as a stream whose implementation - of stream-next is defined by args and body. This macro adds - a type-specific method for stream-next and derives type - from stream-type." - [type-tag args & body] - `(do - (derive ~type-tag stream-type) - (defmethod stream-next ~type-tag ~args ~@body))) - -(defvar- stream-skip ::skip - "The skip-this-item value.") - -(defn- stream-skip? - "Returns true if x is the stream-skip." - [x] - (identical? x stream-skip)) - -(defmulti stream-next - "Returns a vector [next-value new-state] where next-value is the next - item in the data stream defined by stream-state and new-state - is the new state of the stream. At the end of the stream, - next-value and new-state are nil." - {:arglists '([stream-state])} - type) - -(defmethod stream-next nil - [s] - [nil nil]) - -(defmethod stream-next clojure.lang.ISeq - [s] - (if (seq s) - [(first s) (rest s)] - [nil nil])) - -(defmethod stream-next clojure.lang.IPersistentVector - [v] - (stream-next (seq v))) - -(defn stream-seq - "Return a lazy seq on the stream. Also accessible via - clojure.contrib.seq-utils/seq-on and - clojure.contrib.generic.collection/seq for streams." - [s] - (lazy-seq - (let [[v ns] (stream-next s)] - (if (nil? ns) - nil - (cons v (stream-seq ns)))))) - -(defmethod clojure.contrib.seq-utils/seq-on stream-type - [s] - (stream-seq s)) - -(defmethod clojure.contrib.generic.collection/seq stream-type - [s] - (stream-seq s)) - -; -; Stream transformers -; -(defmonad stream-m - "Monad describing stream computations. The monadic values can be - of any type handled by stream-next." - [m-result (fn m-result-stream [v] - (fn [s] [v s])) - m-bind (fn m-bind-stream [mv f] - (fn [s] - (let [[v ss :as r] (mv s)] - (if (or (nil? ss) (stream-skip? v)) - r - ((f v) ss))))) - m-zero (fn [s] [stream-skip s]) - ]) - -(defn pick - "Return the next value of stream argument n inside a stream - transformer. When used inside of defst, the name of the stream - argument can be used instead of its index n." - [n] - (fn [streams] - (let [[v ns] (stream-next (streams n))] - (if (nil? ns) - [nil nil] - [v (assoc streams n ns)])))) - -(defn pick-all - "Return a vector containing the next value of each stream argument - inside a stream transformer." - [streams] - (let [next (map stream-next streams) - values (map first next) - streams (vec (map second next))] - (if (some nil? streams) - [nil nil] - [values streams]))) - -(deftype ::stream-transformer st-as-stream - (fn [st streams] [st streams]) - seq) - -(defstream ::stream-transformer - [[st streams]] - (loop [s streams] - (let [[v ns] (st s)] - (cond (nil? ns) [nil nil] - (stream-skip? v) (recur ns) - :else [v (st-as-stream st ns)])))) - -(defmacro defst - "Define the stream transformer name by body. - The non-stream arguments args and the stream arguments streams - are given separately, with args being possibly empty." - [name args streams & body] - (if (= (first streams) '&) - `(defn ~name ~(vec (concat args streams)) - (let [~'st (with-monad stream-m ~@body)] - (st-as-stream ~'st ~(second streams)))) - `(defn ~name ~(vec (concat args streams)) - (let [~'st (with-monad stream-m - (let [~streams (range ~(count streams))] - ~@body))] - (st-as-stream ~'st ~streams))))) - -; -; Stream utilities -; -(defn stream-drop - "Return a stream containing all but the first n elements of stream." - [n stream] - (if (zero? n) - stream - (let [[_ s] (stream-next stream)] - (recur (dec n) s)))) - -; Map a function on a stream -(deftype- ::stream-map stream-map-state) - -(defstream ::stream-map - [[f stream]] - (let [[v ns] (stream-next stream)] - (if (nil? ns) - [nil nil] - [(f v) (stream-map-state [f ns])]))) - -(defmulti stream-map - "Return a new stream by mapping the function f on the given stream." - {:arglists '([f stream])} - (fn [f stream] (type stream))) - -(defmethod stream-map :default - [f stream] - (stream-map-state [f stream])) - -(defmethod stream-map ::stream-map - [f [g stream]] - (stream-map-state [(comp f g) stream])) - -; Filter stream elements -(deftype- ::stream-filter stream-filter-state) - -(defstream ::stream-filter - [[p stream]] - (loop [stream stream] - (let [[v ns] (stream-next stream)] - (cond (nil? ns) [nil nil] - (p v) [v (stream-filter-state [p ns])] - :else (recur ns))))) - -(defmulti stream-filter - "Return a new stream that contrains the elements of stream - that satisfy the predicate p." - {:arglists '([p stream])} - (fn [p stream] (type stream))) - -(defmethod stream-filter :default - [p stream] - (stream-filter-state [p stream])) - -(defmethod stream-filter ::stream-filter - [p [q stream]] - (stream-filter-state [(fn [v] (and (q v) (p v))) stream])) - -; Flatten a stream of sequences -(deftype- ::stream-flatten stream-flatten-state) - -(defstream ::stream-flatten - [[buffer stream]] - (loop [buffer buffer - stream stream] - (if (nil? buffer) - (let [[v new-stream] (stream-next stream)] - (cond (nil? new-stream) [nil nil] - (empty? v) (recur nil new-stream) - :else (recur v new-stream))) - [(first buffer) (stream-flatten-state [(next buffer) stream])]))) - -(defn stream-flatten - "Converts a stream of sequences into a stream of the elements of the - sequences. Flattening is not recursive, only one level of nesting - will be removed." - [s] - (stream-flatten-state [nil s])) |