diff options
Diffstat (limited to 'src/clojure/contrib/stream_utils.clj')
-rw-r--r-- | src/clojure/contrib/stream_utils.clj | 375 |
1 files changed, 208 insertions, 167 deletions
diff --git a/src/clojure/contrib/stream_utils.clj b/src/clojure/contrib/stream_utils.clj index fd4e6a04..605426db 100644 --- a/src/clojure/contrib/stream_utils.clj +++ b/src/clojure/contrib/stream_utils.clj @@ -1,13 +1,7 @@ ;; Stream utilities ;; by Konrad Hinsen -;; last updated January 25, 2009 - -;; Note: this module is at least as experimental as Clojure's stream -;; facilities. It may change significantly, change name, or disappear -;; in the future. Don't rely on it for your applications. I put this -;; module in clojure.contrib so that it can serve as a testground and -;; a basis for discussions ont the group. +;; last updated February 23, 2009 ;; Copyright (c) Konrad Hinsen, 2009. All rights reserved. The use ;; and distribution terms for this software are covered by the Eclipse @@ -18,163 +12,210 @@ ;; remove this notice, or any other, from this software. (ns clojure.contrib.stream-utils - (:use [clojure.contrib.monads])) - -;; Streams - -; Stream of integers -(defn int-stream - "Return a stream of integers starting from init and incrementing by step - until end, the first value not returned. The defaults are 0, 1, and nil, - where nil stands for an infinite stream." - ([] (int-stream 0 1 nil)) - ([init] (int-stream init 1 nil)) - ([init step] (int-stream init step nil)) - ([init step end] - (let [n (into-array [init])] - (stream - (fn [eos] - (let [current (aget n 0)] - (if (or (nil? end) (< current end)) - (do - (aset n 0 (+ current step)) - current) - eos))))))) - -;; Stream transformers - -(defn drop-first - "Return a stream that returns the elements of s except for the first n ones." - [n s] - (let [eos (Object.) - iter (stream-iter s) - gen (fn [eos] (next! iter eos))] - (doseq [_ (range n)] (gen eos)) - (stream gen))) - -;; Monadic stream transformers - -; Generator monad -; -; This monad represents computations with generators, as defined for -; Clojure streams: a function of one argument, an end-of-stream value, -; that returns at each call the "next" value of the stream, until the -; end of the stream is reached, and from then on it returns the supplied -; eos value forever. This monad permits the simple composition of such -; generators. -; -; The use of generators instead of streams as the basis of this monad -; has two reasons: -; 1) It is not possible to pass around a stream such that each -; monadic computation step takes one item of it, because there can -; be only one iterator per stream. -; 2) One could use iterators instead of generators as the monadic -; values, even with minimal changes, but each computational step -; in the monad would then create a new stream and a new iter, -; adding overhead for no apparent benefit. - -(defmonad generator - "Monad describing stream-transforming computations. The monadic - values are generator functions." - [m-result (fn m-result-generator [v] - (fn [eos] v)) - m-bind (fn m-bind-generator [mv f] - (fn [eos] - (let [v (mv eos)] - (if (identical? v eos) - eos - ((f v) eos))))) - ]) - -; A macro for defining stream transformers using the generator monad. -; It wraps the input streams into iters-that-look-like-generators, -; and converts the resulting generator into a stream. -(defmacro stream-transformer - [stream-args & body] - `(stream (let [~stream-args (map stream-iter ~stream-args)] - (with-monad generator ~@body)))) - - -; Derive a variant of the generator monad that permits the -; use of :when conditions. Invalid values in the output are -; represented by ::skip, which is filtered out in the end -; by applying remove-skip. The macro stream-transformer-cond -; takes care of all these administrative details. -(def generator-cond (maybe-t generator ::skip :m-plus-from-maybe)) - -(defn remove-skip [g] - (fn [eos] - (loop [v (g eos)] - (if (identical? v ::skip) - (recur (g eos)) - v)))) - -(defmacro stream-transformer-cond - [stream-args & body] - `(stream (remove-skip - (let [~stream-args (map stream-iter ~stream-args)] - (with-monad generator-cond ~@body))))) - - -;; Some examples of monadic stream transformers - -(defn skip-s [s] - "Return a stream that skips every second item of s" - (stream-transformer [s] - (domonad - [x1 s - x2 s] - x2))) - -(defn partition-s [n s] - "Return a stream in which each element is a sequence of n consecutive - elements of the input stream s." - (stream-transformer [s] - (m-seq (replicate n s)))) - -; Map a function on streams. -; Map has to be defined by an explicit expression for every number of -; arguments because the m-lift macro requires a constant as its first -; argument. -(defn map-s - "Return a stream of the results of applying f on one item of each - input sequence." - ([f s] - (stream-transformer [s] - ((m-lift 1 f) s))) - ([f s1 s2] - (stream-transformer [s1 s2] - ((m-lift 2 f) s1 s2))) - ([f s1 s2 s3] - (stream-transformer [s1 s2 s3] - ((m-lift 3 f) s1 s2 s3))) - ([f s1 s2 s3 s4] - (stream-transformer [s1 s2 s3 s4] - ((m-lift 4 f) s1 s2 s3 s4))) - ([f s1 s2 s3 s4 s5] - (stream-transformer [s1 s2 s3 s4 s5] - ((m-lift 5 f) s1 s2 s3 s4 s5)))) - -(defn filter-s [p s] - "Return a stream consisting of the elements of s that satisfy predicate p." - (stream-transformer-cond [s] - (domonad - [x s :when (p x)] - x))) - -(comment - -; Some example expressions - -(take 10 (drop-first 5 (int-stream))) - -(take 10 (skip-s (int-stream))) - -(take 10 (partition-s 3 (int-stream))) - -(take 10 (map-s inc (int-stream))) -(take 10 (map-s + (int-stream) (map-s inc (int-stream)))) - -(take 10 (filter-s #(zero? (rem % 3)) (int-stream))) - -)
\ No newline at end of file + "Functions for setting up computational pipelines via data streams. + + NOTE: This library is experimental. It may change significantly + with future release. + + This library defines: + - the multimethod stream-next as consumer interface to data streams + - implementations of stream-next for three data stream types: + 1) Clojure seqs, sequences, and vectors + 2) Stream generator functions + 3) nil representing an empty stream + - the monad stream-m for writing stream transformers + - macros defst-seq and defst-gen for writing monad-based stream + transformers with a seq or a generator function interface + - various utility functions for working with data streams + + Stream generator functions are called with a single argument representing + the end-of-stream sentinel value. At each call, they return the next + element of their data stream and the new stream state. When the end of + the stream is reached, they return the passed-in end-of-stream object. + The new stream state is typically a closure. While stream generator + functions are less flexible than lazy seqs (because they cannot be used + with Clojure's seq-handling functions), they have a few advantages: + - The data stream is never cached. + - The state of a stream can be stored in any Clojure data structure, + and the stream can be re-generated from it any number of times. + Nothing prevents a generator function from storing the stream state + in a mutable data structure and just return itself as the new state, + but such functions are neither thread-safe nor safe to be used to + reproduce their stream more than once. + + Stream transformers take any number of input streams and produce one + output stream. They are typically written using the stream-m + monad. Input streams can be defined by any value that stream-next + can handle. The output stream can have a (lazy) seq interface or + a generator function interface. In the definition of a stream + transformer, (pick s) returns the next value of stream argument s, + whereas pick-all returns the next value of all stream arguments + in the form of a vector." + + (:use [clojure.contrib.monads]) + (:use [clojure.contrib.macros :only (letfn)])) + + +(let [eos (Object.) + skip (Object.)] + + (defn stream-eos? + "Returns true if x is the special end-of-stream value used by + stream-next and the stream-m monad." + [x] + (identical? x eos)) + + (defn stream-skip? + "Returns true if x is the special skip value used by the stream-m monad." + [x] + (identical? x skip)) + + (defmulti stream-next + "Returns a vector of length two whose first element is the next + item in the data stream defined by stream-state and whose second + element is the new state of the stream. At the end of the stream, + the returned value is a special end-of-stream object for which + stream-eos? returns true." + {:arglists '([stream-state])} + class) + + (defmethod stream-next nil + [s] + [eos nil]) + + (defmethod stream-next clojure.lang.ISeq + [s] + (if (seq s) + [(first s) (rest s)] + [eos nil])) + + (defmethod stream-next clojure.lang.IPersistentVector + [v] + (stream-next (seq v))) + + (defmethod stream-next clojure.lang.Fn + [g] + (g eos)) + + (defmonad stream-m + "Monad describing stream computations. The monadic values can be + of any type handled by stream-next." + [m-result (fn m-result-stream [v] + (fn [s] [v s])) + m-bind (fn m-bind-stream [mv f] + (fn [s] + (let [[v ss] (mv s)] + (if (or (stream-eos? v) (stream-skip? v)) + [v ss] + ((f v) ss))))) + m-zero (fn [s] [skip s]) + ]) + + (defn pick + "Return the next value of stream argument n inside a stream + transformer. When used inside of defst, the name of the stream + argument can be used instead of its index n." + [n] + (fn [streams] + (let [[v ns] (stream-next (streams n))] + [v (assoc streams n ns)]))) + + (defn pick-all + "Return a vector containing the next value of each stream argument + inside a stream transformer." + [streams] + (if (some nil? streams) + [eos streams] + (let [next (map stream-next streams) + values (map first next) + streams (vec (map second next))] + (if (some stream-eos? values) + [eos streams] + [values streams])))) + + (defn st-as-seq + "Takes a stream transformer expression st (typically written using the + stream-m monad) and a vector of stream arguments and returns a lazy + seq representing the output stream of the transformer." + [st streams] + (lazy-seq + (loop [s streams] + (let [[v ns] (st s)] + (cond (stream-eos? v) nil + (stream-skip? v) (recur ns) + :else (cons v (st-as-seq st ns))))))) + + (defn st-as-generator + "Takes a stream transformer expression st (typically written using the + stream-m monad) and a vector of stream arguments and returns a stream + generator function representing the output stream of the transformer." + [st streams] + (letfn [make-gen [s] + (fn [eos] + (loop [s s] + (let [[v ns] (st s)] + (cond (stream-eos? v) [eos nil] + (stream-skip? v) (recur ns) + :else [v (make-gen ns)]))))] + (make-gen streams))) +) + +(defn- defst [wrapper name args streams body] + (if (= (first streams) '&) + `(defn ~name ~(vec (concat args streams)) + (let [~'st (with-monad stream-m ~@body)] + (~wrapper ~'st ~(second streams)))) + `(defn ~name ~(vec (concat args streams)) + (let [~'st (with-monad stream-m + (let [~streams (range ~(count streams))] + ~@body))] + (~wrapper ~'st ~streams))))) + +(defmacro defst-seq + "Define the seq-returning stream transformer name by body. + The non-stream arguments args and the stream arguments streams + are given separately, with args being possibly empty." + [name args streams & body] + (defst 'st-as-seq name args streams body)) + +(defmacro defst-gen + "Define the generator-returning stream transformer name by body. + The non-stream arguments args and the stream arguments streams + are given separately, with args being possibly empty." + [name args streams & body] + (defst 'st-as-generator name args streams body)) + +(defn stream-drop + "Return a stream containing all but the first n elements of stream." + [n stream] + (if (zero? n) + stream + (let [[_ s] (stream-next stream)] + (recur (dec n) s)))) + +(defn stream-as-seq + "Return a lazy seq of the stream s." + [s] + (lazy-seq + (let [[v ns] (stream-next s)] + (if (stream-eos? v) + nil + (cons v (stream-as-seq ns)))))) + +(defn stream-flatten + "Converts a stream of sequences into a stream of the elements of the + sequences. Flattening is not recursive, only one level of sequences + will be removed." + [s] + (letfn [buffer-gen [buffer stream] + (fn [eos] + (loop [buffer buffer + stream stream] + (if (nil? buffer) + (let [[v new-stream] (stream-next stream)] + (cond (stream-eos? v) [eos nil] + (empty? v) (recur nil new-stream) + :else (recur v new-stream))) + [(first buffer) (buffer-gen (next buffer) stream)])))] + (buffer-gen nil s))) + |