aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKonrad Hinsen <konrad.hinsen@laposte.net>2009-02-23 17:40:53 +0000
committerKonrad Hinsen <konrad.hinsen@laposte.net>2009-02-23 17:40:53 +0000
commit4e6c3a8c9abce25c0f8a601e130b379382267c90 (patch)
tree6c5c9b3a17447613e3fb8ceda9986eade34ced34
parentcbf439ccf0e8e7d5533862a5c2d41cdf66f719d5 (diff)
stream-utils: almost completely new content
-rw-r--r--build.xml1
-rw-r--r--src/clojure/contrib/load_all.clj2
-rw-r--r--src/clojure/contrib/stream_utils.clj375
-rw-r--r--src/clojure/contrib/stream_utils/examples.clj79
4 files changed, 289 insertions, 168 deletions
diff --git a/build.xml b/build.xml
index 706a97d7..0740cef8 100644
--- a/build.xml
+++ b/build.xml
@@ -88,6 +88,7 @@
<arg value="clojure.contrib.sql.internal"/>
<arg value="clojure.contrib.sql"/>
<arg value="clojure.contrib.str-utils"/>
+ <arg value="clojure.contrib.stream-utils"/>
<arg value="clojure.contrib.test-clojure"/>
<arg value="clojure.contrib.test-is"/>
<arg value="clojure.contrib.trace"/>
diff --git a/src/clojure/contrib/load_all.clj b/src/clojure/contrib/load_all.clj
index bf16b9e1..28865cb1 100644
--- a/src/clojure/contrib/load_all.clj
+++ b/src/clojure/contrib/load_all.clj
@@ -67,7 +67,7 @@ shell-out
sql
stacktrace
str-utils
-;; stream-utils
+stream-utils
template
test-is
test-is.tests
diff --git a/src/clojure/contrib/stream_utils.clj b/src/clojure/contrib/stream_utils.clj
index fd4e6a04..605426db 100644
--- a/src/clojure/contrib/stream_utils.clj
+++ b/src/clojure/contrib/stream_utils.clj
@@ -1,13 +1,7 @@
;; Stream utilities
;; by Konrad Hinsen
-;; last updated January 25, 2009
-
-;; Note: this module is at least as experimental as Clojure's stream
-;; facilities. It may change significantly, change name, or disappear
-;; in the future. Don't rely on it for your applications. I put this
-;; module in clojure.contrib so that it can serve as a testground and
-;; a basis for discussions ont the group.
+;; last updated February 23, 2009
;; Copyright (c) Konrad Hinsen, 2009. All rights reserved. The use
;; and distribution terms for this software are covered by the Eclipse
@@ -18,163 +12,210 @@
;; remove this notice, or any other, from this software.
(ns clojure.contrib.stream-utils
- (:use [clojure.contrib.monads]))
-
-;; Streams
-
-; Stream of integers
-(defn int-stream
- "Return a stream of integers starting from init and incrementing by step
- until end, the first value not returned. The defaults are 0, 1, and nil,
- where nil stands for an infinite stream."
- ([] (int-stream 0 1 nil))
- ([init] (int-stream init 1 nil))
- ([init step] (int-stream init step nil))
- ([init step end]
- (let [n (into-array [init])]
- (stream
- (fn [eos]
- (let [current (aget n 0)]
- (if (or (nil? end) (< current end))
- (do
- (aset n 0 (+ current step))
- current)
- eos)))))))
-
-;; Stream transformers
-
-(defn drop-first
- "Return a stream that returns the elements of s except for the first n ones."
- [n s]
- (let [eos (Object.)
- iter (stream-iter s)
- gen (fn [eos] (next! iter eos))]
- (doseq [_ (range n)] (gen eos))
- (stream gen)))
-
-;; Monadic stream transformers
-
-; Generator monad
-;
-; This monad represents computations with generators, as defined for
-; Clojure streams: a function of one argument, an end-of-stream value,
-; that returns at each call the "next" value of the stream, until the
-; end of the stream is reached, and from then on it returns the supplied
-; eos value forever. This monad permits the simple composition of such
-; generators.
-;
-; The use of generators instead of streams as the basis of this monad
-; has two reasons:
-; 1) It is not possible to pass around a stream such that each
-; monadic computation step takes one item of it, because there can
-; be only one iterator per stream.
-; 2) One could use iterators instead of generators as the monadic
-; values, even with minimal changes, but each computational step
-; in the monad would then create a new stream and a new iter,
-; adding overhead for no apparent benefit.
-
-(defmonad generator
- "Monad describing stream-transforming computations. The monadic
- values are generator functions."
- [m-result (fn m-result-generator [v]
- (fn [eos] v))
- m-bind (fn m-bind-generator [mv f]
- (fn [eos]
- (let [v (mv eos)]
- (if (identical? v eos)
- eos
- ((f v) eos)))))
- ])
-
-; A macro for defining stream transformers using the generator monad.
-; It wraps the input streams into iters-that-look-like-generators,
-; and converts the resulting generator into a stream.
-(defmacro stream-transformer
- [stream-args & body]
- `(stream (let [~stream-args (map stream-iter ~stream-args)]
- (with-monad generator ~@body))))
-
-
-; Derive a variant of the generator monad that permits the
-; use of :when conditions. Invalid values in the output are
-; represented by ::skip, which is filtered out in the end
-; by applying remove-skip. The macro stream-transformer-cond
-; takes care of all these administrative details.
-(def generator-cond (maybe-t generator ::skip :m-plus-from-maybe))
-
-(defn remove-skip [g]
- (fn [eos]
- (loop [v (g eos)]
- (if (identical? v ::skip)
- (recur (g eos))
- v))))
-
-(defmacro stream-transformer-cond
- [stream-args & body]
- `(stream (remove-skip
- (let [~stream-args (map stream-iter ~stream-args)]
- (with-monad generator-cond ~@body)))))
-
-
-;; Some examples of monadic stream transformers
-
-(defn skip-s [s]
- "Return a stream that skips every second item of s"
- (stream-transformer [s]
- (domonad
- [x1 s
- x2 s]
- x2)))
-
-(defn partition-s [n s]
- "Return a stream in which each element is a sequence of n consecutive
- elements of the input stream s."
- (stream-transformer [s]
- (m-seq (replicate n s))))
-
-; Map a function on streams.
-; Map has to be defined by an explicit expression for every number of
-; arguments because the m-lift macro requires a constant as its first
-; argument.
-(defn map-s
- "Return a stream of the results of applying f on one item of each
- input sequence."
- ([f s]
- (stream-transformer [s]
- ((m-lift 1 f) s)))
- ([f s1 s2]
- (stream-transformer [s1 s2]
- ((m-lift 2 f) s1 s2)))
- ([f s1 s2 s3]
- (stream-transformer [s1 s2 s3]
- ((m-lift 3 f) s1 s2 s3)))
- ([f s1 s2 s3 s4]
- (stream-transformer [s1 s2 s3 s4]
- ((m-lift 4 f) s1 s2 s3 s4)))
- ([f s1 s2 s3 s4 s5]
- (stream-transformer [s1 s2 s3 s4 s5]
- ((m-lift 5 f) s1 s2 s3 s4 s5))))
-
-(defn filter-s [p s]
- "Return a stream consisting of the elements of s that satisfy predicate p."
- (stream-transformer-cond [s]
- (domonad
- [x s :when (p x)]
- x)))
-
-(comment
-
-; Some example expressions
-
-(take 10 (drop-first 5 (int-stream)))
-
-(take 10 (skip-s (int-stream)))
-
-(take 10 (partition-s 3 (int-stream)))
-
-(take 10 (map-s inc (int-stream)))
-(take 10 (map-s + (int-stream) (map-s inc (int-stream))))
-
-(take 10 (filter-s #(zero? (rem % 3)) (int-stream)))
-
-) \ No newline at end of file
+ "Functions for setting up computational pipelines via data streams.
+
+ NOTE: This library is experimental. It may change significantly
+ with future release.
+
+ This library defines:
+ - the multimethod stream-next as consumer interface to data streams
+ - implementations of stream-next for three data stream types:
+ 1) Clojure seqs, sequences, and vectors
+ 2) Stream generator functions
+ 3) nil representing an empty stream
+ - the monad stream-m for writing stream transformers
+ - macros defst-seq and defst-gen for writing monad-based stream
+ transformers with a seq or a generator function interface
+ - various utility functions for working with data streams
+
+ Stream generator functions are called with a single argument representing
+ the end-of-stream sentinel value. At each call, they return the next
+ element of their data stream and the new stream state. When the end of
+ the stream is reached, they return the passed-in end-of-stream object.
+ The new stream state is typically a closure. While stream generator
+ functions are less flexible than lazy seqs (because they cannot be used
+ with Clojure's seq-handling functions), they have a few advantages:
+ - The data stream is never cached.
+ - The state of a stream can be stored in any Clojure data structure,
+ and the stream can be re-generated from it any number of times.
+ Nothing prevents a generator function from storing the stream state
+ in a mutable data structure and just return itself as the new state,
+ but such functions are neither thread-safe nor safe to be used to
+ reproduce their stream more than once.
+
+ Stream transformers take any number of input streams and produce one
+ output stream. They are typically written using the stream-m
+ monad. Input streams can be defined by any value that stream-next
+ can handle. The output stream can have a (lazy) seq interface or
+ a generator function interface. In the definition of a stream
+ transformer, (pick s) returns the next value of stream argument s,
+ whereas pick-all returns the next value of all stream arguments
+ in the form of a vector."
+
+ (:use [clojure.contrib.monads])
+ (:use [clojure.contrib.macros :only (letfn)]))
+
+
+(let [eos (Object.)
+ skip (Object.)]
+
+ (defn stream-eos?
+ "Returns true if x is the special end-of-stream value used by
+ stream-next and the stream-m monad."
+ [x]
+ (identical? x eos))
+
+ (defn stream-skip?
+ "Returns true if x is the special skip value used by the stream-m monad."
+ [x]
+ (identical? x skip))
+
+ (defmulti stream-next
+ "Returns a vector of length two whose first element is the next
+ item in the data stream defined by stream-state and whose second
+ element is the new state of the stream. At the end of the stream,
+ the returned value is a special end-of-stream object for which
+ stream-eos? returns true."
+ {:arglists '([stream-state])}
+ class)
+
+ (defmethod stream-next nil
+ [s]
+ [eos nil])
+
+ (defmethod stream-next clojure.lang.ISeq
+ [s]
+ (if (seq s)
+ [(first s) (rest s)]
+ [eos nil]))
+
+ (defmethod stream-next clojure.lang.IPersistentVector
+ [v]
+ (stream-next (seq v)))
+
+ (defmethod stream-next clojure.lang.Fn
+ [g]
+ (g eos))
+
+ (defmonad stream-m
+ "Monad describing stream computations. The monadic values can be
+ of any type handled by stream-next."
+ [m-result (fn m-result-stream [v]
+ (fn [s] [v s]))
+ m-bind (fn m-bind-stream [mv f]
+ (fn [s]
+ (let [[v ss] (mv s)]
+ (if (or (stream-eos? v) (stream-skip? v))
+ [v ss]
+ ((f v) ss)))))
+ m-zero (fn [s] [skip s])
+ ])
+
+ (defn pick
+ "Return the next value of stream argument n inside a stream
+ transformer. When used inside of defst, the name of the stream
+ argument can be used instead of its index n."
+ [n]
+ (fn [streams]
+ (let [[v ns] (stream-next (streams n))]
+ [v (assoc streams n ns)])))
+
+ (defn pick-all
+ "Return a vector containing the next value of each stream argument
+ inside a stream transformer."
+ [streams]
+ (if (some nil? streams)
+ [eos streams]
+ (let [next (map stream-next streams)
+ values (map first next)
+ streams (vec (map second next))]
+ (if (some stream-eos? values)
+ [eos streams]
+ [values streams]))))
+
+ (defn st-as-seq
+ "Takes a stream transformer expression st (typically written using the
+ stream-m monad) and a vector of stream arguments and returns a lazy
+ seq representing the output stream of the transformer."
+ [st streams]
+ (lazy-seq
+ (loop [s streams]
+ (let [[v ns] (st s)]
+ (cond (stream-eos? v) nil
+ (stream-skip? v) (recur ns)
+ :else (cons v (st-as-seq st ns)))))))
+
+ (defn st-as-generator
+ "Takes a stream transformer expression st (typically written using the
+ stream-m monad) and a vector of stream arguments and returns a stream
+ generator function representing the output stream of the transformer."
+ [st streams]
+ (letfn [make-gen [s]
+ (fn [eos]
+ (loop [s s]
+ (let [[v ns] (st s)]
+ (cond (stream-eos? v) [eos nil]
+ (stream-skip? v) (recur ns)
+ :else [v (make-gen ns)]))))]
+ (make-gen streams)))
+)
+
+(defn- defst [wrapper name args streams body]
+ (if (= (first streams) '&)
+ `(defn ~name ~(vec (concat args streams))
+ (let [~'st (with-monad stream-m ~@body)]
+ (~wrapper ~'st ~(second streams))))
+ `(defn ~name ~(vec (concat args streams))
+ (let [~'st (with-monad stream-m
+ (let [~streams (range ~(count streams))]
+ ~@body))]
+ (~wrapper ~'st ~streams)))))
+
+(defmacro defst-seq
+ "Define the seq-returning stream transformer name by body.
+ The non-stream arguments args and the stream arguments streams
+ are given separately, with args being possibly empty."
+ [name args streams & body]
+ (defst 'st-as-seq name args streams body))
+
+(defmacro defst-gen
+ "Define the generator-returning stream transformer name by body.
+ The non-stream arguments args and the stream arguments streams
+ are given separately, with args being possibly empty."
+ [name args streams & body]
+ (defst 'st-as-generator name args streams body))
+
+(defn stream-drop
+ "Return a stream containing all but the first n elements of stream."
+ [n stream]
+ (if (zero? n)
+ stream
+ (let [[_ s] (stream-next stream)]
+ (recur (dec n) s))))
+
+(defn stream-as-seq
+ "Return a lazy seq of the stream s."
+ [s]
+ (lazy-seq
+ (let [[v ns] (stream-next s)]
+ (if (stream-eos? v)
+ nil
+ (cons v (stream-as-seq ns))))))
+
+(defn stream-flatten
+ "Converts a stream of sequences into a stream of the elements of the
+ sequences. Flattening is not recursive, only one level of sequences
+ will be removed."
+ [s]
+ (letfn [buffer-gen [buffer stream]
+ (fn [eos]
+ (loop [buffer buffer
+ stream stream]
+ (if (nil? buffer)
+ (let [[v new-stream] (stream-next stream)]
+ (cond (stream-eos? v) [eos nil]
+ (empty? v) (recur nil new-stream)
+ :else (recur v new-stream)))
+ [(first buffer) (buffer-gen (next buffer) stream)])))]
+ (buffer-gen nil s)))
+
diff --git a/src/clojure/contrib/stream_utils/examples.clj b/src/clojure/contrib/stream_utils/examples.clj
new file mode 100644
index 00000000..0d0454a7
--- /dev/null
+++ b/src/clojure/contrib/stream_utils/examples.clj
@@ -0,0 +1,79 @@
+;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
+;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
+;;
+;; Stream application examples
+;;
+;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
+;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
+
+(use 'clojure.contrib.stream-utils)
+(use 'clojure.contrib.monads)
+
+; Transform a stream of numbers into a stream of sums of
+; two consecutive numbers.
+(defst-seq sum-two [] [xs]
+ (domonad
+ [x1 (pick xs)
+ x2 (pick xs)]
+ (+ x1 x2)))
+
+(sum-two '(1 2 3 4 5 6 7 8))
+
+; The same example, but with a generator interface for the output stream
+(defst-gen sum-two-gen [] [xs]
+ (domonad
+ [x1 (pick xs)
+ x2 (pick xs)]
+ (+ x1 x2)))
+
+(def g (sum-two-gen '(1 2 3 4 5 6 7 8)))
+(let [[v1 g] (g :eos)]
+ (let [[v2 g] (g :eos)]
+ (let [[v3 g] (g :eos)]
+ (let [[v4 g] (g :eos)]
+ (let [[v5 g] (g :eos)]
+ [v1 v2 v3 v4 v5])))))
+
+; Map (for a single stream) written as a stream transformer
+(defst-seq my-map-1 [f] [xs]
+ (domonad
+ [x (pick xs)]
+ (f x)))
+
+(my-map-1 inc [1 2 3])
+
+; Map for two stream arguments
+(defst-seq my-map-2 [f] [xs ys]
+ (domonad
+ [x (pick xs)
+ y (pick ys)]
+ (f x y)))
+
+(my-map-2 + '(1 2 3 4) '(10 20 30 40))
+
+; Map for any number of stream arguments
+(defst-seq my-map [f] [& streams]
+ (domonad
+ [vs pick-all]
+ (apply f vs)))
+
+(my-map inc [1 2 3])
+(my-map + '(1 2 3 4) '(10 20 30 40))
+
+; Filter written as a stream transformer
+(defst-seq my-filter [p] [xs]
+ (domonad
+ [x (pick xs) :when (p x)]
+ x))
+
+(my-filter odd? [1 2 3])
+
+; A simple random number generator, implemented as a generator function
+(defn rng [seed]
+ (fn [eos]
+ (let [m 259200
+ value (/ (float seed) (float m))
+ next (rem (+ 54773 (* 7141 seed)) m)]
+ [value (rng next)])))
+
+(take 10 (stream-as-seq (rng 1)))