aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author Konrad Hinsen <konrad.hinsen@laposte.net> 2009-03-12 11:21:04 +0000
committer Konrad Hinsen <konrad.hinsen@laposte.net> 2009-03-12 11:21:04 +0000
commit 541a4bcfdbfe21f7a61a523a69a7dd4a91f7801b (patch)
tree bf479dee51ddaeb99a2117224363311ac0b8ac3d
parent a434ce5dfb6d9adfb8592402e0a3e009d8072f80 (diff)
stream-utils: redesign
-rw-r--r-- src/clojure/contrib/stream_utils.clj 399
-rw-r--r-- src/clojure/contrib/stream_utils/examples.clj 103
2 files changed, 290 insertions, 212 deletions
diff --git a/src/clojure/contrib/stream_utils.clj b/src/clojure/contrib/stream_utils.clj
index c2bdc101..75b8568d 100644
--- a/src/clojure/contrib/stream_utils.clj
+++ b/src/clojure/contrib/stream_utils.clj
@@ -1,7 +1,7 @@
;; Stream utilities
;; by Konrad Hinsen
-;; last updated March 2, 2009
+;; last updated March 11, 2009
;; Copyright (c) Konrad Hinsen, 2009. All rights reserved. The use
;; and distribution terms for this software are covered by the Eclipse
@@ -18,172 +18,177 @@
with future release.
This library defines:
- - the multimethod stream-next as consumer interface to data streams
- - implementations of stream-next for three data stream types:
- 1) Clojure seqs, sequences, and vectors
- 2) Stream generator functions
- 3) nil representing an empty stream
- - the monad stream-m for writing stream transformers
- - macros defst-seq and defst-gen for writing monad-based stream
- transformers with a seq or a generator function interface
- - various utility functions for working with data streams
-
- Stream generator functions are called with a single argument representing
- the end-of-stream sentinel value. At each call, they return the next
- element of their data stream and the new stream state. When the end of
- the stream is reached, they return the passed-in end-of-stream object.
- The new stream state is typically a closure. While stream generator
- functions are less flexible than lazy seqs (because they cannot be used
- with Clojure's seq-handling functions), they have a few advantages:
- - The data stream is never cached.
+ - an abstract stream type, whose interface consists of the
+ multimethod stream-next
+ - a macro for implementing streams
+ - implementations of stream for
+ 1) Clojure sequences and vectors
+ 2) nil, representing an empty stream
+ - tools for writing stream transformers, including the monad stream-m
+ - various utility functions for working with streams
+
+ Streams are building blocks in the construction of computational
+ pipelines. A stream is represented by its current state plus
+ a function that takes a stream state and obtains the next item
+ in the stream as well as the new stream state. The state is implemented
+ as a Java class or a Clojure type (as defined by the function
+ clojure.core/type), and the function is provided as an implementation
+ of the multimethod stream-next for this class or type.
+
+ While setting up pipelines using this mechanism is somewhat more
+ cumbersome than using Clojure's lazy seq mechanisms, there are a
+ few advantages:
- The state of a stream can be stored in any Clojure data structure,
and the stream can be re-generated from it any number of times.
- Nothing prevents a generator function from storing the stream state
- in a mutable data structure and just return itself as the new state,
- but such functions are neither thread-safe nor safe to be used to
- reproduce their stream more than once.
-
+ Any number of states can be stored this way.
+ - The elements of the stream are never cached, so keeping a reference
+ to a stream state does not incur an uncontrollable memory penalty.
+
+ Note that the stream mechanism is thread-safe as long as the
+ concrete stream implementations do not use any mutable state.
+
Stream transformers take any number of input streams and produce one
output stream. They are typically written using the stream-m
- monad. Input streams can be defined by any value that stream-next
- can handle. The output stream can have a (lazy) seq interface or
- a generator function interface. In the definition of a stream
- transformer, (pick s) returns the next value of stream argument s,
- whereas pick-all returns the next value of all stream arguments
- in the form of a vector."
-
- (:use [clojure.contrib.monads :only (defmonad with-monad)]))
-
-
-(let [eos (Object.)
- skip (Object.)]
-
- (defn stream-eos?
- "Returns true if x is the special end-of-stream value used by
- stream-next and the stream-m monad."
- [x]
- (identical? x eos))
-
- (defn stream-skip?
- "Returns true if x is the special skip value used by the stream-m monad."
- [x]
- (identical? x skip))
-
- (defmulti stream-next
- "Returns a vector of length two whose first element is the next
- item in the data stream defined by stream-state and whose second
- element is the new state of the stream. At the end of the stream,
- the returned value is a special end-of-stream object for which
- stream-eos? returns true."
- {:arglists '([stream-state])}
- class)
-
- (defmethod stream-next nil
- [s]
- [eos nil])
-
- (defmethod stream-next clojure.lang.ISeq
- [s]
- (if (seq s)
- [(first s) (rest s)]
- [eos nil]))
-
- (defmethod stream-next clojure.lang.IPersistentVector
- [v]
- (stream-next (seq v)))
-
- (defmethod stream-next clojure.lang.Fn
- [g]
- (g eos))
-
- (defmonad stream-m
- "Monad describing stream computations. The monadic values can be
- of any type handled by stream-next."
- [m-result (fn m-result-stream [v]
- (fn [s] [v s]))
- m-bind (fn m-bind-stream [mv f]
- (fn [s]
- (let [[v ss] (mv s)]
- (if (or (stream-eos? v) (stream-skip? v))
- [v ss]
- ((f v) ss)))))
- m-zero (fn [s] [skip s])
- ])
-
- (defn pick
- "Return the next value of stream argument n inside a stream
- transformer. When used inside of defst, the name of the stream
- argument can be used instead of its index n."
- [n]
- (fn [streams]
- (let [[v ns] (stream-next (streams n))]
- [v (assoc streams n ns)])))
-
- (defn pick-all
- "Return a vector containing the next value of each stream argument
- inside a stream transformer."
- [streams]
+ monad. In the definition of a stream transformer, (pick s) returns
+ the next value of stream argument s, whereas pick-all returns the
+ next value of all stream arguments in the form of a vector."
+
+ (:use [clojure.contrib.types :only (deftype)])
+ (:use [clojure.contrib.monads :only (defmonad with-monad)])
+ (:use [clojure.contrib.def :only (defvar defvar-)])
+ (:require [clojure.contrib.seq-utils]))
+
+
+;
+; Stream type and interface
+;
+(defvar stream-type ::stream
+ "The root type for the stream hierarchy. For each stream type,
+ add a derivation from this type.")
+
+(defmacro defstream
+ "Define objects of the given type as streams whose implementation
+ of stream-next is defined by args and body. This macro adds
+ a type-specific method for stream-next and derives type-tag
+ from stream-type."
+ [type-tag args & body]
+ `(do
+ (derive ~type-tag stream-type)
+ (defmethod stream-next ~type-tag ~args ~@body)))
+
+(defvar- stream-skip ::skip
+ "The skip-this-item value.")
+
+(defn- stream-skip?
+ "Returns true if x is the stream-skip."
+ [x]
+ (identical? x stream-skip))
+
+(defmulti stream-next
+ "Returns a vector [next-value new-state] where next-value is the next
+ item in the data stream defined by stream-state and new-state
+ is the new state of the stream. At the end of the stream,
+ next-value and new-state are nil."
+ {:arglists '([stream-state])}
+ type)
+
+(defmethod stream-next nil
+ [s]
+ [nil nil])
+
+(defmethod stream-next clojure.lang.ISeq
+ [s]
+ (if (seq s)
+ [(first s) (rest s)]
+ [nil nil]))
+
+(defmethod stream-next clojure.lang.IPersistentVector
+ [v]
+ (stream-next (seq v)))
+
+(defn stream-seq
+ "Return a lazy seq on the stream. Also accessible via
+ clojure.contrib.seq-utils/seq-on for streams."
+ [s]
+ (lazy-seq
+ (let [[v ns] (stream-next s)]
+ (if (nil? ns)
+ nil
+ (cons v (stream-seq ns))))))
+
+(defmethod clojure.contrib.seq-utils/seq-on stream-type
+ [s]
+ (stream-seq s))
+
+;
+; Stream transformers
+;
+(defmonad stream-m
+ "Monad describing stream computations. The monadic values can be
+ of any type handled by stream-next."
+ [m-result (fn m-result-stream [v]
+ (fn [s] [v s]))
+ m-bind (fn m-bind-stream [mv f]
+ (fn [s]
+ (let [[v ss :as r] (mv s)]
+ (if (or (nil? ss) (stream-skip? v))
+ r
+ ((f v) ss)))))
+ m-zero (fn [s] [stream-skip s])
+ ])
+
+(defn pick
+ "Return the next value of stream argument n inside a stream
+ transformer. When used inside of defst, the name of the stream
+ argument can be used instead of its index n."
+ [n]
+ (fn [streams]
+ (let [[v ns] (stream-next (streams n))]
+ (if (nil? ns)
+ [nil nil]
+ [v (assoc streams n ns)]))))
+
+(defn pick-all
+ "Return a vector containing the next value of each stream argument
+ inside a stream transformer."
+ [streams]
+ (let [next (map stream-next streams)
+ values (map first next)
+ streams (vec (map second next))]
(if (some nil? streams)
- [eos streams]
- (let [next (map stream-next streams)
- values (map first next)
- streams (vec (map second next))]
- (if (some stream-eos? values)
- [eos streams]
- [values streams]))))
-
- (defn st-as-seq
- "Takes a stream transformer expression st (typically written using the
- stream-m monad) and a vector of stream arguments and returns a lazy
- seq representing the output stream of the transformer."
- [st streams]
- (lazy-seq
- (loop [s streams]
- (let [[v ns] (st s)]
- (cond (stream-eos? v) nil
- (stream-skip? v) (recur ns)
- :else (cons v (st-as-seq st ns)))))))
-
- (defn st-as-generator
- "Takes a stream transformer expression st (typically written using the
- stream-m monad) and a vector of stream arguments and returns a stream
- generator function representing the output stream of the transformer."
- [st streams]
- (letfn [(make-gen [s]
- (fn [eos]
- (loop [s s]
- (let [[v ns] (st s)]
- (cond (stream-eos? v) [eos nil]
- (stream-skip? v) (recur ns)
- :else [v (make-gen ns)])))))]
- (make-gen streams)))
-)
-
-(defn- defst [wrapper name args streams body]
+ [nil nil]
+ [values streams])))
+
+(deftype ::stream-transformer st-as-stream
+ (fn [st streams] [st streams])
+ seq)
+
+(defstream ::stream-transformer
+ [[st streams]]
+ (loop [s streams]
+ (let [[v ns] (st s)]
+ (cond (nil? ns) [nil nil]
+ (stream-skip? v) (recur ns)
+ :else [v (st-as-stream st ns)]))))
+
+(defmacro defst
+ "Define the stream transformer name by body.
+ The non-stream arguments args and the stream arguments streams
+ are given separately, with args being possibly empty."
+ [name args streams & body]
(if (= (first streams) '&)
`(defn ~name ~(vec (concat args streams))
(let [~'st (with-monad stream-m ~@body)]
- (~wrapper ~'st ~(second streams))))
+ (st-as-stream ~'st ~(second streams))))
`(defn ~name ~(vec (concat args streams))
(let [~'st (with-monad stream-m
- (let [~streams (range ~(count streams))]
+ (let [~streams (range ~(count streams))]
~@body))]
- (~wrapper ~'st ~streams)))))
-
-(defmacro defst-seq
- "Define the seq-returning stream transformer name by body.
- The non-stream arguments args and the stream arguments streams
- are given separately, with args being possibly empty."
- [name args streams & body]
- (defst `st-as-seq name args streams body))
-
-(defmacro defst-gen
- "Define the generator-returning stream transformer name by body.
- The non-stream arguments args and the stream arguments streams
- are given separately, with args being possibly empty."
- [name args streams & body]
- (defst `st-as-generator name args streams body))
+ (st-as-stream ~'st ~streams)))))
+;
+; Stream utilities
+;
(defn stream-drop
"Return a stream containing all but the first n elements of stream."
[n stream]
@@ -192,29 +197,71 @@
(let [[_ s] (stream-next stream)]
(recur (dec n) s))))
-(defn stream-as-seq
- "Return a lazy seq of the stream s."
- [s]
- (lazy-seq
- (let [[v ns] (stream-next s)]
- (if (stream-eos? v)
- nil
- (cons v (stream-as-seq ns))))))
+; Map a function on a stream
+(deftype ::stream-map stream-map-state)
+
+(defstream ::stream-map
+ [[f stream]]
+ (let [[v ns] (stream-next stream)]
+ (if (nil? ns)
+ [nil nil]
+ [(f v) (stream-map-state [f ns])])))
+
+(defmulti stream-map
+ "Return a new stream by mapping the function f on the given stream."
+ {:arglists '([f stream])}
+ (fn [f stream] (type stream)))
+
+(defmethod stream-map :default
+ [f stream]
+ (stream-map-state [f stream]))
+
+(defmethod stream-map ::stream-map
+ [f [g stream]]
+ (stream-map-state [(comp f g) stream]))
+
+; Filter stream elements
+(deftype ::stream-filter stream-filter-state)
+
+(defstream ::stream-filter
+ [[p stream]]
+ (loop [stream stream]
+ (let [[v ns] (stream-next stream)]
+ (cond (nil? ns) [nil nil]
+ (p v) [v (stream-filter-state [p ns])]
+ :else (recur ns)))))
+
+(defmulti stream-filter
+ "Return a new stream that contains the elements of stream
+ that satisfy the predicate p."
+ {:arglists '([p stream])}
+ (fn [p stream] (type stream)))
+
+(defmethod stream-filter :default
+ [p stream]
+ (stream-filter-state [p stream]))
+
+(defmethod stream-filter ::stream-filter
+ [p [q stream]]
+ (stream-filter-state [(fn [v] (and (q v) (p v))) stream]))
+
+; Flatten a stream of sequences
+(deftype ::stream-flatten stream-flatten-state)
+
+(defstream ::stream-flatten
+ [[buffer stream]]
+ (loop [buffer buffer
+ stream stream]
+ (if (nil? buffer)
+ (let [[v new-stream] (stream-next stream)]
+ (cond (nil? new-stream) [nil nil]
+ (empty? v) (recur nil new-stream)
+ :else (recur v new-stream)))
+ [(first buffer) (stream-flatten-state [(next buffer) stream])])))
(defn stream-flatten
"Converts a stream of sequences into a stream of the elements of the
- sequences. Flattening is not recursive, only one level of sequences
+ sequences. Flattening is not recursive; only one level of nesting
will be removed."
[s]
- (letfn [(buffer-gen [buffer stream]
- (fn [eos]
- (loop [buffer buffer
- stream stream]
- (if (nil? buffer)
- (let [[v new-stream] (stream-next stream)]
- (cond (stream-eos? v) [eos nil]
- (empty? v) (recur nil new-stream)
- :else (recur v new-stream)))
- [(first buffer) (buffer-gen (next buffer) stream)]))))]
- (buffer-gen nil s)))
-
+ (stream-flatten-state [nil s]))
diff --git a/src/clojure/contrib/stream_utils/examples.clj b/src/clojure/contrib/stream_utils/examples.clj
index cc19f2c4..10207fff 100644
--- a/src/clojure/contrib/stream_utils/examples.clj
+++ b/src/clojure/contrib/stream_utils/examples.clj
@@ -8,74 +8,105 @@
(ns clojure.contrib.stream-utils.examples
(:use [clojure.contrib.stream-utils
- :only (defst-seq defst-gen pick pick-all stream-as-seq)])
- (:use [clojure.contrib.monads :only (domonad)]))
+ :only (defst stream-next
+ pick pick-all
+ stream-type defstream
+ stream-drop stream-map stream-filter stream-flatten)])
+ (:use [clojure.contrib.monads :only (domonad)])
+ (:use [clojure.contrib.seq-utils :only (seq-on)])
+ (:use [clojure.contrib.types :only (deftype)]))
-; Transform a stream of numbers into a stream of sums of
-; two consecutive numbers.
-(defst-seq sum-two [] [xs]
- (domonad
- [x1 (pick xs)
- x2 (pick xs)]
- (+ x1 x2)))
+;
+; Define a stream of Fibonacci numbers
+;
+(deftype ::fib-stream last-two-fib)
+
+(defstream ::fib-stream
+ [fs]
+ (let [[n1 n2] fs]
+ [n1 (last-two-fib [n2 (+ n1 n2)])]))
+
+(def fib-stream (last-two-fib [0 1]))
+
+(take 10 (seq-on fib-stream))
-(sum-two '(1 2 3 4 5 6 7 8))
+;
+; A simple random number generator, implemented as a stream
+;
+(deftype ::random-seed rng-seed vector seq)
-; The same example, but with a generator interface for the output stream
-(defst-gen sum-two-gen [] [xs]
+(defstream ::random-seed
+ [seed]
+ (let [[seed] seed
+ m 259200
+ value (/ (float seed) (float m))
+ next (rem (+ 54773 (* 7141 seed)) m)]
+ [value (rng-seed next)]))
+
+(take 10 (seq-on (rng-seed 1)))
+
+;
+; Various stream utilities
+;
+(take 10 (seq-on (stream-drop 10 (rng-seed 1))))
+(seq-on (stream-map inc (range 5)))
+(seq-on (stream-filter odd? (range 10)))
+(seq-on (stream-flatten (partition 3 (range 9))))
+
+;
+; Stream transformers
+;
+
+; Transform a stream of numbers into a stream of sums of two
+; consecutive numbers.
+(defst sum-two [] [xs]
(domonad
[x1 (pick xs)
x2 (pick xs)]
(+ x1 x2)))
-(def g (sum-two-gen '(1 2 3 4 5 6 7 8)))
-(let [[v1 g] (g :eos)]
- (let [[v2 g] (g :eos)]
- (let [[v3 g] (g :eos)]
- (let [[v4 g] (g :eos)]
- (let [[v5 g] (g :eos)]
+(def s (sum-two '(1 2 3 4 5 6 7 8)))
+
+(let [[v1 s] (stream-next s)]
+ (let [[v2 s] (stream-next s)]
+ (let [[v3 s] (stream-next s)]
+ (let [[v4 s] (stream-next s)]
+ (let [[v5 s] (stream-next s)]
[v1 v2 v3 v4 v5])))))
+(seq-on s)
+
; Map (for a single stream) written as a stream transformer
-(defst-seq my-map-1 [f] [xs]
+(defst my-map-1 [f] [xs]
(domonad
[x (pick xs)]
(f x)))
-(my-map-1 inc [1 2 3])
+(seq-on (my-map-1 inc [1 2 3]))
; Map for two stream arguments
-(defst-seq my-map-2 [f] [xs ys]
+(defst my-map-2 [f] [xs ys]
(domonad
[x (pick xs)
y (pick ys)]
(f x y)))
-(my-map-2 + '(1 2 3 4) '(10 20 30 40))
+(seq-on (my-map-2 + '(1 2 3 4) '(10 20 30 40)))
; Map for any number of stream arguments
-(defst-seq my-map [f] [& streams]
+(defst my-map [f] [& streams]
(domonad
[vs pick-all]
(apply f vs)))
-(my-map inc [1 2 3])
-(my-map + '(1 2 3 4) '(10 20 30 40))
+(seq-on (my-map inc [1 2 3]))
+(seq-on (my-map + '(1 2 3 4) '(10 20 30 40)))
; Filter written as a stream transformer
-(defst-seq my-filter [p] [xs]
+(defst my-filter [p] [xs]
(domonad
[x (pick xs) :when (p x)]
x))
-(my-filter odd? [1 2 3])
-
-; A simple random number generator, implemented as a generator function
-(defn rng [seed]
- (fn [eos]
- (let [m 259200
- value (/ (float seed) (float m))
- next (rem (+ 54773 (* 7141 seed)) m)]
- [value (rng next)])))
+(seq-on (my-filter odd? [1 2 3]))
-(take 10 (stream-as-seq (rng 1)))