diff options
author | Stuart Sierra <mail@stuartsierra.com> | 2010-08-07 16:41:53 -0400 |
---|---|---|
committer | Stuart Sierra <mail@stuartsierra.com> | 2010-08-07 16:41:53 -0400 |
commit | a6a92b9b3d2bfd9a56e1e5e9cfba706d1aeeaae5 (patch) | |
tree | f1f3da9887dc2dc557df3282b0bcbd4d701ec593 /modules/stream-utils | |
parent | e7930c85290f77815cdb00a60604feedfa2d0194 (diff) |
Split all namespaces into sub-modules.
* Examples and tests have not been copied over.
* Clojure test/compile phases are commented out in parent POM.
* May require installing parent POM before full build.
Diffstat (limited to 'modules/stream-utils')
-rw-r--r-- | modules/stream-utils/pom.xml | 41 | ||||
-rw-r--r-- | modules/stream-utils/src/main/clojure/clojure/contrib/stream_utils.clj | 276 |
2 files changed, 317 insertions, 0 deletions
diff --git a/modules/stream-utils/pom.xml b/modules/stream-utils/pom.xml new file mode 100644 index 00000000..f303b1cf --- /dev/null +++ b/modules/stream-utils/pom.xml @@ -0,0 +1,41 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http//www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 + http://maven.apache.org/maven-v4_0_0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.clojure.contrib</groupId> + <artifactId>parent</artifactId> + <version>1.3.0-SNAPSHOT</version> + <relativePath>../parent</relativePath> + </parent> + <artifactId>stream-utils</artifactId> + <dependencies> + <dependency> + <groupId>org.clojure.contrib</groupId> + <artifactId>def</artifactId> + <version>1.3.0-SNAPSHOT</version> + </dependency> + <dependency> + <groupId>org.clojure.contrib</groupId> + <artifactId>monads</artifactId> + <version>1.3.0-SNAPSHOT</version> + </dependency> + <dependency> + <groupId>org.clojure.contrib</groupId> + <artifactId>types</artifactId> + <version>1.3.0-SNAPSHOT</version> + </dependency> + <dependency> + <groupId>org.clojure.contrib</groupId> + <artifactId>seq</artifactId> + <version>1.3.0-SNAPSHOT</version> + </dependency> + <dependency> + <groupId>org.clojure.contrib</groupId> + <artifactId>generic</artifactId> + <version>1.3.0-SNAPSHOT</version> + </dependency> + </dependencies> +</project>
\ No newline at end of file diff --git a/modules/stream-utils/src/main/clojure/clojure/contrib/stream_utils.clj b/modules/stream-utils/src/main/clojure/clojure/contrib/stream_utils.clj new file mode 100644 index 00000000..5e1738dd --- /dev/null +++ b/modules/stream-utils/src/main/clojure/clojure/contrib/stream_utils.clj @@ -0,0 +1,276 @@ +;; Stream utilities + +;; by Konrad Hinsen +;; last updated May 3, 2009 + +;; Copyright (c) Konrad Hinsen, 2009. All rights reserved. The use +;; and distribution terms for this software are covered by the Eclipse +;; Public License 1.0 (http://opensource.org/licenses/eclipse-1.0.php) +;; which can be found in the file epl-v10.html at the root of this +;; distribution. By using this software in any fashion, you are +;; agreeing to be bound by the terms of this license. You must not +;; remove this notice, or any other, from this software. + +(ns + ^{:author "Konrad Hinsen" + :doc "Functions for setting up computational pipelines via data streams. + + NOTE: This library is experimental. It may change significantly + with future release. + + This library defines: + - an abstract stream type, whose interface consists of the + multimethod stream-next + - a macro for implementing streams + - implementations of stream for + 1) Clojure sequences, and vectors + 2) nil, representing an empty stream + - tools for writing stream transformers, including the + monad stream-m + - various utility functions for working with streams + + Streams are building blocks in the construction of computational + pipelines. A stream is represented by its current state plus + a function that takes a stream state and obtains the next item + in the stream as well as the new stream state. The state is + implemented as a Java class or a Clojure type (as defined by the + function clojure.core/type), and the function is provided as an + implementation of the multimethod stream-next for this class or type. + + While setting up pipelines using this mechanism is somewhat more + cumbersome than using Clojure's lazy seq mechanisms, there are a + few advantages: + - The state of a stream can be stored in any Clojure data structure, + and the stream can be re-generated from it any number of times. + Any number of states can be stored this way. + - The elements of the stream are never cached, so keeping a reference + to a stream state does not incur an uncontrollable memory penalty. + + Note that the stream mechanism is thread-safe as long as the + concrete stream implementations do not use any mutable state. + + Stream transformers take any number of input streams and produce one + output stream. They are typically written using the stream-m + monad. In the definition of a stream transformer, (pick s) returns + the next value of stream argument s, whereas pick-all returns the + next value of all stream arguments in the form of a vector."} + clojure.contrib.stream-utils + (:refer-clojure :exclude (deftype)) + (:use [clojure.contrib.types :only (deftype deftype-)]) + (:use [clojure.contrib.monads :only (defmonad with-monad)]) + (:use [clojure.contrib.def :only (defvar defvar-)]) + (:require [clojure.contrib.seq]) + (:require [clojure.contrib.generic.collection])) + + +; +; Stream type and interface +; +(defvar stream-type ::stream + "The root type for the stream hierarchy. For each stream type, + add a derivation from this type.") + +(defmacro defstream + "Define object of the given type as a stream whose implementation + of stream-next is defined by args and body. This macro adds + a type-specific method for stream-next and derives type + from stream-type." + [type-tag args & body] + `(do + (derive ~type-tag stream-type) + (defmethod stream-next ~type-tag ~args ~@body))) + +(defvar- stream-skip ::skip + "The skip-this-item value.") + +(defn- stream-skip? + "Returns true if x is the stream-skip." + [x] + (identical? x stream-skip)) + +(defmulti stream-next + "Returns a vector [next-value new-state] where next-value is the next + item in the data stream defined by stream-state and new-state + is the new state of the stream. At the end of the stream, + next-value and new-state are nil." + {:arglists '([stream-state])} + type) + +(defmethod stream-next nil + [s] + [nil nil]) + +(defmethod stream-next clojure.lang.ISeq + [s] + (if (seq s) + [(first s) (rest s)] + [nil nil])) + +(defmethod stream-next clojure.lang.IPersistentVector + [v] + (stream-next (seq v))) + +(defn stream-seq + "Return a lazy seq on the stream. Also accessible via + clojure.contrib.seq/seq-on and + clojure.contrib.generic.collection/seq for streams." + [s] + (lazy-seq + (let [[v ns] (stream-next s)] + (if (nil? ns) + nil + (cons v (stream-seq ns)))))) + +(defmethod clojure.contrib.seq/seq-on stream-type + [s] + (stream-seq s)) + +(defmethod clojure.contrib.generic.collection/seq stream-type + [s] + (stream-seq s)) + +; +; Stream transformers +; +(defmonad stream-m + "Monad describing stream computations. The monadic values can be + of any type handled by stream-next." + [m-result (fn m-result-stream [v] + (fn [s] [v s])) + m-bind (fn m-bind-stream [mv f] + (fn [s] + (let [[v ss :as r] (mv s)] + (if (or (nil? ss) (stream-skip? v)) + r + ((f v) ss))))) + m-zero (fn [s] [stream-skip s]) + ]) + +(defn pick + "Return the next value of stream argument n inside a stream + transformer. When used inside of defst, the name of the stream + argument can be used instead of its index n." + [n] + (fn [streams] + (let [[v ns] (stream-next (streams n))] + (if (nil? ns) + [nil nil] + [v (assoc streams n ns)])))) + +(defn pick-all + "Return a vector containing the next value of each stream argument + inside a stream transformer." + [streams] + (let [next (map stream-next streams) + values (map first next) + streams (vec (map second next))] + (if (some nil? streams) + [nil nil] + [values streams]))) + +(deftype ::stream-transformer st-as-stream + (fn [st streams] [st streams]) + seq) + +(defstream ::stream-transformer + [[st streams]] + (loop [s streams] + (let [[v ns] (st s)] + (cond (nil? ns) [nil nil] + (stream-skip? v) (recur ns) + :else [v (st-as-stream st ns)])))) + +(defmacro defst + "Define the stream transformer name by body. + The non-stream arguments args and the stream arguments streams + are given separately, with args being possibly empty." + [name args streams & body] + (if (= (first streams) '&) + `(defn ~name ~(vec (concat args streams)) + (let [~'st (with-monad stream-m ~@body)] + (st-as-stream ~'st ~(second streams)))) + `(defn ~name ~(vec (concat args streams)) + (let [~'st (with-monad stream-m + (let [~streams (range ~(count streams))] + ~@body))] + (st-as-stream ~'st ~streams))))) + +; +; Stream utilities +; +(defn stream-drop + "Return a stream containing all but the first n elements of stream." + [n stream] + (if (zero? n) + stream + (let [[_ s] (stream-next stream)] + (recur (dec n) s)))) + +; Map a function on a stream +(deftype- ::stream-map stream-map-state) + +(defstream ::stream-map + [[f stream]] + (let [[v ns] (stream-next stream)] + (if (nil? ns) + [nil nil] + [(f v) (stream-map-state [f ns])]))) + +(defmulti stream-map + "Return a new stream by mapping the function f on the given stream." + {:arglists '([f stream])} + (fn [f stream] (type stream))) + +(defmethod stream-map :default + [f stream] + (stream-map-state [f stream])) + +(defmethod stream-map ::stream-map + [f [g stream]] + (stream-map-state [(comp f g) stream])) + +; Filter stream elements +(deftype- ::stream-filter stream-filter-state) + +(defstream ::stream-filter + [[p stream]] + (loop [stream stream] + (let [[v ns] (stream-next stream)] + (cond (nil? ns) [nil nil] + (p v) [v (stream-filter-state [p ns])] + :else (recur ns))))) + +(defmulti stream-filter + "Return a new stream that contrains the elements of stream + that satisfy the predicate p." + {:arglists '([p stream])} + (fn [p stream] (type stream))) + +(defmethod stream-filter :default + [p stream] + (stream-filter-state [p stream])) + +(defmethod stream-filter ::stream-filter + [p [q stream]] + (stream-filter-state [(fn [v] (and (q v) (p v))) stream])) + +; Flatten a stream of sequences +(deftype- ::stream-flatten stream-flatten-state) + +(defstream ::stream-flatten + [[buffer stream]] + (loop [buffer buffer + stream stream] + (if (nil? buffer) + (let [[v new-stream] (stream-next stream)] + (cond (nil? new-stream) [nil nil] + (empty? v) (recur nil new-stream) + :else (recur v new-stream))) + [(first buffer) (stream-flatten-state [(next buffer) stream])]))) + +(defn stream-flatten + "Converts a stream of sequences into a stream of the elements of the + sequences. Flattening is not recursive, only one level of nesting + will be removed." + [s] + (stream-flatten-state [nil s])) |