diff options
author | Konrad Hinsen <konrad.hinsen@laposte.net> | 2009-03-12 11:21:04 +0000 |
---|---|---|
committer | Konrad Hinsen <konrad.hinsen@laposte.net> | 2009-03-12 11:21:04 +0000 |
commit | 541a4bcfdbfe21f7a61a523a69a7dd4a91f7801b (patch) | |
tree | bf479dee51ddaeb99a2117224363311ac0b8ac3d /src/clojure/contrib | |
parent | a434ce5dfb6d9adfb8592402e0a3e009d8072f80 (diff) |
stream-utils: redesign
Diffstat (limited to 'src/clojure/contrib')
-rw-r--r-- | src/clojure/contrib/stream_utils.clj | 399 |
-rw-r--r-- | src/clojure/contrib/stream_utils/examples.clj | 103 |
2 files changed, 290 insertions, 212 deletions
diff --git a/src/clojure/contrib/stream_utils.clj b/src/clojure/contrib/stream_utils.clj index c2bdc101..75b8568d 100644 --- a/src/clojure/contrib/stream_utils.clj +++ b/src/clojure/contrib/stream_utils.clj @@ -1,7 +1,7 @@ ;; Stream utilities ;; by Konrad Hinsen -;; last updated March 2, 2009 +;; last updated March 11, 2009 ;; Copyright (c) Konrad Hinsen, 2009. All rights reserved. The use ;; and distribution terms for this software are covered by the Eclipse @@ -18,172 +18,177 @@ with future release. This library defines: - - the multimethod stream-next as consumer interface to data streams - - implementations of stream-next for three data stream types: - 1) Clojure seqs, sequences, and vectors - 2) Stream generator functions - 3) nil representing an empty stream - - the monad stream-m for writing stream transformers - - macros defst-seq and defst-gen for writing monad-based stream - transformers with a seq or a generator function interface - - various utility functions for working with data streams - - Stream generator functions are called with a single argument representing - the end-of-stream sentinel value. At each call, they return the next - element of their data stream and the new stream state. When the end of - the stream is reached, they return the passed-in end-of-stream object. - The new stream state is typically a closure. While stream generator - functions are less flexible than lazy seqs (because they cannot be used - with Clojure's seq-handling functions), they have a few advantages: - - The data stream is never cached. 
+ - an abstract stream type, whose interface consists of the + multimethod stream-next + - a macro for implementing streams + - implementations of stream for + 1) Clojure sequences, and vectors + 2) nil, representing an empty stream + - tools for writing stream transformers, including the monad stream-m + - various utility functions for working with streams + + Streams are building blocks in the construction of computational + pipelines. A stream is represented by its current state plus + a function that takes a stream state and obtains the next item + in the stream as well as the new stream state. The state is implemented + as a Java class or a Clojure type (as defined by the function + clojure.core/type), and the function is provided as an implementation + of the multimethod stream-next for this class or type. + + While setting up pipelines using this mechanism is somewhat more + cumbersome than using Clojure's lazy seq mechanisms, there are a + few advantages: - The state of a stream can be stored in any Clojure data structure, and the stream can be re-generated from it any number of times. - Nothing prevents a generator function from storing the stream state - in a mutable data structure and just return itself as the new state, - but such functions are neither thread-safe nor safe to be used to - reproduce their stream more than once. - + Any number of states can be stored this way. + - The elements of the stream are never cached, so keeping a reference + to a stream state does not incur an uncontrollable memory penalty. + + Note that the stream mechanism is thread-safe as long as the + concrete stream implementations do not use any mutable state. + Stream transformers take any number of input streams and produce one output stream. They are typically written using the stream-m - monad. Input streams can be defined by any value that stream-next - can handle. The output stream can have a (lazy) seq interface or - a generator function interface. 
In the definition of a stream - transformer, (pick s) returns the next value of stream argument s, - whereas pick-all returns the next value of all stream arguments - in the form of a vector." - - (:use [clojure.contrib.monads :only (defmonad with-monad)])) - - -(let [eos (Object.) - skip (Object.)] - - (defn stream-eos? - "Returns true if x is the special end-of-stream value used by - stream-next and the stream-m monad." - [x] - (identical? x eos)) - - (defn stream-skip? - "Returns true if x is the special skip value used by the stream-m monad." - [x] - (identical? x skip)) - - (defmulti stream-next - "Returns a vector of length two whose first element is the next - item in the data stream defined by stream-state and whose second - element is the new state of the stream. At the end of the stream, - the returned value is a special end-of-stream object for which - stream-eos? returns true." - {:arglists '([stream-state])} - class) - - (defmethod stream-next nil - [s] - [eos nil]) - - (defmethod stream-next clojure.lang.ISeq - [s] - (if (seq s) - [(first s) (rest s)] - [eos nil])) - - (defmethod stream-next clojure.lang.IPersistentVector - [v] - (stream-next (seq v))) - - (defmethod stream-next clojure.lang.Fn - [g] - (g eos)) - - (defmonad stream-m - "Monad describing stream computations. The monadic values can be - of any type handled by stream-next." - [m-result (fn m-result-stream [v] - (fn [s] [v s])) - m-bind (fn m-bind-stream [mv f] - (fn [s] - (let [[v ss] (mv s)] - (if (or (stream-eos? v) (stream-skip? v)) - [v ss] - ((f v) ss))))) - m-zero (fn [s] [skip s]) - ]) - - (defn pick - "Return the next value of stream argument n inside a stream - transformer. When used inside of defst, the name of the stream - argument can be used instead of its index n." 
- [n] - (fn [streams] - (let [[v ns] (stream-next (streams n))] - [v (assoc streams n ns)]))) - - (defn pick-all - "Return a vector containing the next value of each stream argument - inside a stream transformer." - [streams] + monad. In the definition of a stream transformer, (pick s) returns + the next value of stream argument s, whereas pick-all returns the + next value of all stream arguments in the form of a vector." + + (:use [clojure.contrib.types :only (deftype)]) + (:use [clojure.contrib.monads :only (defmonad with-monad)]) + (:use [clojure.contrib.def :only (defvar defvar-)]) + (:require [clojure.contrib.seq-utils])) + + +; +; Stream type and interface +; +(defvar stream-type ::stream + "The root type for the stream hierarchy. For each stream type, + add a derivation from this type.") + +(defmacro defstream + "Define object of the given type as a stream whose implementation + of stream-next is defined by args and body. This macro adds + a type-specific method for stream-next and derives type + from stream-type." + [type-tag args & body] + `(do + (derive ~type-tag stream-type) + (defmethod stream-next ~type-tag ~args ~@body))) + +(defvar- stream-skip ::skip + "The skip-this-item value.") + +(defn- stream-skip? + "Returns true if x is the stream-skip." + [x] + (identical? x stream-skip)) + +(defmulti stream-next + "Returns a vector [next-value new-state] where next-value is the next + item in the data stream defined by stream-state and new-state + is the new state of the stream. At the end of the stream, + next-value and new-state are nil." + {:arglists '([stream-state])} + type) + +(defmethod stream-next nil + [s] + [nil nil]) + +(defmethod stream-next clojure.lang.ISeq + [s] + (if (seq s) + [(first s) (rest s)] + [nil nil])) + +(defmethod stream-next clojure.lang.IPersistentVector + [v] + (stream-next (seq v))) + +(defn stream-seq + "Return a lazy seq on the stream. Also accessible via + clojure.contrib.seq-utils/seq-on for streams." 
+ [s] + (lazy-seq + (let [[v ns] (stream-next s)] + (if (nil? ns) + nil + (cons v (stream-seq ns)))))) + +(defmethod clojure.contrib.seq-utils/seq-on stream-type + [s] + (stream-seq s)) + +; +; Stream transformers +; +(defmonad stream-m + "Monad describing stream computations. The monadic values can be + of any type handled by stream-next." + [m-result (fn m-result-stream [v] + (fn [s] [v s])) + m-bind (fn m-bind-stream [mv f] + (fn [s] + (let [[v ss :as r] (mv s)] + (if (or (nil? ss) (stream-skip? v)) + r + ((f v) ss))))) + m-zero (fn [s] [stream-skip s]) + ]) + +(defn pick + "Return the next value of stream argument n inside a stream + transformer. When used inside of defst, the name of the stream + argument can be used instead of its index n." + [n] + (fn [streams] + (let [[v ns] (stream-next (streams n))] + (if (nil? ns) + [nil nil] + [v (assoc streams n ns)])))) + +(defn pick-all + "Return a vector containing the next value of each stream argument + inside a stream transformer." + [streams] + (let [next (map stream-next streams) + values (map first next) + streams (vec (map second next))] (if (some nil? streams) - [eos streams] - (let [next (map stream-next streams) - values (map first next) - streams (vec (map second next))] - (if (some stream-eos? values) - [eos streams] - [values streams])))) - - (defn st-as-seq - "Takes a stream transformer expression st (typically written using the - stream-m monad) and a vector of stream arguments and returns a lazy - seq representing the output stream of the transformer." - [st streams] - (lazy-seq - (loop [s streams] - (let [[v ns] (st s)] - (cond (stream-eos? v) nil - (stream-skip? v) (recur ns) - :else (cons v (st-as-seq st ns))))))) - - (defn st-as-generator - "Takes a stream transformer expression st (typically written using the - stream-m monad) and a vector of stream arguments and returns a stream - generator function representing the output stream of the transformer." 
- [st streams] - (letfn [(make-gen [s] - (fn [eos] - (loop [s s] - (let [[v ns] (st s)] - (cond (stream-eos? v) [eos nil] - (stream-skip? v) (recur ns) - :else [v (make-gen ns)])))))] - (make-gen streams))) -) - -(defn- defst [wrapper name args streams body] + [nil nil] + [values streams]))) + +(deftype ::stream-transformer st-as-stream + (fn [st streams] [st streams]) + seq) + +(defstream ::stream-transformer + [[st streams]] + (loop [s streams] + (let [[v ns] (st s)] + (cond (nil? ns) [nil nil] + (stream-skip? v) (recur ns) + :else [v (st-as-stream st ns)])))) + +(defmacro defst + "Define the stream transformer name by body. + The non-stream arguments args and the stream arguments streams + are given separately, with args being possibly empty." + [name args streams & body] (if (= (first streams) '&) `(defn ~name ~(vec (concat args streams)) (let [~'st (with-monad stream-m ~@body)] - (~wrapper ~'st ~(second streams)))) + (st-as-stream ~'st ~(second streams)))) `(defn ~name ~(vec (concat args streams)) (let [~'st (with-monad stream-m - (let [~streams (range ~(count streams))] + (let [~streams (range ~(count streams))] ~@body))] - (~wrapper ~'st ~streams))))) - -(defmacro defst-seq - "Define the seq-returning stream transformer name by body. - The non-stream arguments args and the stream arguments streams - are given separately, with args being possibly empty." - [name args streams & body] - (defst `st-as-seq name args streams body)) - -(defmacro defst-gen - "Define the generator-returning stream transformer name by body. - The non-stream arguments args and the stream arguments streams - are given separately, with args being possibly empty." - [name args streams & body] - (defst `st-as-generator name args streams body)) + (st-as-stream ~'st ~streams))))) +; +; Stream utilities +; (defn stream-drop "Return a stream containing all but the first n elements of stream." 
[n stream] @@ -192,29 +197,71 @@ (let [[_ s] (stream-next stream)] (recur (dec n) s)))) -(defn stream-as-seq - "Return a lazy seq of the stream s." - [s] - (lazy-seq - (let [[v ns] (stream-next s)] - (if (stream-eos? v) - nil - (cons v (stream-as-seq ns)))))) +; Map a function on a stream +(deftype ::stream-map stream-map-state) + +(defstream ::stream-map + [[f stream]] + (let [[v ns] (stream-next stream)] + (if (nil? ns) + [nil nil] + [(f v) (stream-map-state [f ns])]))) + +(defmulti stream-map + "Return a new stream by mapping the function f on the given stream." + {:arglists '([f stream])} + (fn [f stream] (type stream))) + +(defmethod stream-map :default + [f stream] + (stream-map-state [f stream])) + +(defmethod stream-map ::stream-map + [f [g stream]] + (stream-map-state [(comp f g) stream])) + +; Filter stream elements +(deftype ::stream-filter stream-filter-state) + +(defstream ::stream-filter + [[p stream]] + (loop [stream stream] + (let [[v ns] (stream-next stream)] + (cond (nil? ns) [nil nil] + (p v) [v (stream-filter-state [p ns])] + :else (recur ns))))) + +(defmulti stream-filter + "Return a new stream that contrains the elements of stream + that satisfy the predicate p." + {:arglists '([p stream])} + (fn [p stream] (type stream))) + +(defmethod stream-filter :default + [p stream] + (stream-filter-state [p stream])) + +(defmethod stream-filter ::stream-filter + [p [q stream]] + (stream-filter-state [(fn [v] (and (q v) (p v))) stream])) + +; Flatten a stream of sequences +(deftype ::stream-flatten stream-flatten-state) + +(defstream ::stream-flatten + [[buffer stream]] + (loop [buffer buffer + stream stream] + (if (nil? buffer) + (let [[v new-stream] (stream-next stream)] + (cond (nil? new-stream) [nil nil] + (empty? v) (recur nil new-stream) + :else (recur v new-stream))) + [(first buffer) (stream-flatten-state [(next buffer) stream])]))) (defn stream-flatten "Converts a stream of sequences into a stream of the elements of the - sequences. 
Flattening is not recursive, only one level of sequences + sequences. Flattening is not recursive, only one level of nesting will be removed." [s] - (letfn [(buffer-gen [buffer stream] - (fn [eos] - (loop [buffer buffer - stream stream] - (if (nil? buffer) - (let [[v new-stream] (stream-next stream)] - (cond (stream-eos? v) [eos nil] - (empty? v) (recur nil new-stream) - :else (recur v new-stream))) - [(first buffer) (buffer-gen (next buffer) stream)]))))] - (buffer-gen nil s))) - + (stream-flatten-state [nil s])) diff --git a/src/clojure/contrib/stream_utils/examples.clj b/src/clojure/contrib/stream_utils/examples.clj index cc19f2c4..10207fff 100644 --- a/src/clojure/contrib/stream_utils/examples.clj +++ b/src/clojure/contrib/stream_utils/examples.clj @@ -8,74 +8,105 @@ (ns clojure.contrib.stream-utils.examples (:use [clojure.contrib.stream-utils - :only (defst-seq defst-gen pick pick-all stream-as-seq)]) - (:use [clojure.contrib.monads :only (domonad)])) + :only (defst stream-next + pick pick-all + stream-type defstream + stream-drop stream-map stream-filter stream-flatten)]) + (:use [clojure.contrib.monads :only (domonad)]) + (:use [clojure.contrib.seq-utils :only (seq-on)]) + (:use [clojure.contrib.types :only (deftype)])) -; Transform a stream of numbers into a stream of sums of -; two consecutive numbers. 
-(defst-seq sum-two [] [xs] - (domonad - [x1 (pick xs) - x2 (pick xs)] - (+ x1 x2))) +; +; Define a stream of Fibonacci numbers +; +(deftype ::fib-stream last-two-fib) + +(defstream ::fib-stream + [fs] + (let [[n1 n2] fs] + [n1 (last-two-fib [n2 (+ n1 n2)])])) + +(def fib-stream (last-two-fib [0 1])) + +(take 10 (seq-on fib-stream)) -(sum-two '(1 2 3 4 5 6 7 8)) +; +; A simple random number generator, implemented as a stream +; +(deftype ::random-seed rng-seed vector seq) -; The same example, but with a generator interface for the output stream -(defst-gen sum-two-gen [] [xs] +(defstream ::random-seed + [seed] + (let [[seed] seed + m 259200 + value (/ (float seed) (float m)) + next (rem (+ 54773 (* 7141 seed)) m)] + [value (rng-seed next)])) + +(take 10 (seq-on (rng-seed 1))) + +; +; Various stream utilities +; +(take 10 (seq-on (stream-drop 10 (rng-seed 1)))) +(seq-on (stream-map inc (range 5))) +(seq-on (stream-filter odd? (range 10))) +(seq-on (stream-flatten (partition 3 (range 9)))) + +; +; Stream transformers +; + +; Transform a stream of numbers into a stream of sums of two +; consecutive numbers. 
+(defst sum-two [] [xs] (domonad [x1 (pick xs) x2 (pick xs)] (+ x1 x2))) -(def g (sum-two-gen '(1 2 3 4 5 6 7 8))) -(let [[v1 g] (g :eos)] - (let [[v2 g] (g :eos)] - (let [[v3 g] (g :eos)] - (let [[v4 g] (g :eos)] - (let [[v5 g] (g :eos)] +(def s (sum-two '(1 2 3 4 5 6 7 8))) + +(let [[v1 s] (stream-next s)] + (let [[v2 s] (stream-next s)] + (let [[v3 s] (stream-next s)] + (let [[v4 s] (stream-next s)] + (let [[v5 s] (stream-next s)] [v1 v2 v3 v4 v5]))))) +(seq-on s) + ; Map (for a single stream) written as a stream transformer -(defst-seq my-map-1 [f] [xs] +(defst my-map-1 [f] [xs] (domonad [x (pick xs)] (f x))) -(my-map-1 inc [1 2 3]) +(seq-on (my-map-1 inc [1 2 3])) ; Map for two stream arguments -(defst-seq my-map-2 [f] [xs ys] +(defst my-map-2 [f] [xs ys] (domonad [x (pick xs) y (pick ys)] (f x y))) -(my-map-2 + '(1 2 3 4) '(10 20 30 40)) +(seq-on (my-map-2 + '(1 2 3 4) '(10 20 30 40))) ; Map for any number of stream arguments -(defst-seq my-map [f] [& streams] +(defst my-map [f] [& streams] (domonad [vs pick-all] (apply f vs))) -(my-map inc [1 2 3]) -(my-map + '(1 2 3 4) '(10 20 30 40)) +(seq-on (my-map inc [1 2 3])) +(seq-on (my-map + '(1 2 3 4) '(10 20 30 40))) ; Filter written as a stream transformer -(defst-seq my-filter [p] [xs] +(defst my-filter [p] [xs] (domonad [x (pick xs) :when (p x)] x)) -(my-filter odd? [1 2 3]) - -; A simple random number generator, implemented as a generator function -(defn rng [seed] - (fn [eos] - (let [m 259200 - value (/ (float seed) (float m)) - next (rem (+ 54773 (* 7141 seed)) m)] - [value (rng next)]))) +(seq-on (my-filter odd? [1 2 3])) -(take 10 (stream-as-seq (rng 1))) |