diff options
-rw-r--r-- | src/clj/clojure/core.clj | 233 | ||||
-rw-r--r-- | src/jvm/clojure/lang/APersistentVector.java | 8 | ||||
-rw-r--r-- | src/jvm/clojure/lang/ASeq.java | 7 | ||||
-rw-r--r-- | src/jvm/clojure/lang/AStream.java | 149 | ||||
-rw-r--r-- | src/jvm/clojure/lang/ArrayStream.java | 56 | ||||
-rw-r--r-- | src/jvm/clojure/lang/Closer.java | 33 | ||||
-rw-r--r-- | src/jvm/clojure/lang/Compiler.java | 2 | ||||
-rw-r--r-- | src/jvm/clojure/lang/Cons.java | 4 | ||||
-rw-r--r-- | src/jvm/clojure/lang/IteratorStream.java | 7 | ||||
-rw-r--r-- | src/jvm/clojure/lang/LispReader.java | 2 | ||||
-rw-r--r-- | src/jvm/clojure/lang/RT.java | 36 | ||||
-rw-r--r-- | src/jvm/clojure/lang/Range.java | 6 | ||||
-rw-r--r-- | src/jvm/clojure/lang/StreamSeq.java | 64 |
13 files changed, 372 insertions, 235 deletions
diff --git a/src/clj/clojure/core.clj b/src/clj/clojure/core.clj index 5a4cef08..482a1790 100644 --- a/src/clj/clojure/core.clj +++ b/src/clj/clojure/core.clj @@ -448,6 +448,31 @@ (recur (first zs) (rest zs)))))] (cat (concat x y) zs)))) +;;;;;;;;;;;;;;;;;;;;;;;;;;;; streams ;;;;;;;;;;;;;;;;;;;;;;;;;;;; +(defn stream + "Creates a stream of the items in coll." + {:tag clojure.lang.AStream} + [coll] (clojure.lang.RT/stream coll)) + +(defn stream-iter + "Returns an iter on (stream coll). Only one iter on a stream is + supported at a time." + {:tag clojure.lang.AStream$Iter} + [coll] (.iter (stream coll))) + +(defn next! + "Takes a stream iter and an eos value, returns (and consumes) the next element in the stream, or eos." + [#^clojure.lang.AStream$Iter iter eos] (.next iter eos)) + +(defn push-back! + "Takes a stream iter and pushes x onto front of stream, returns iter." + [#^clojure.lang.AStream$Iter iter x] (.pushBack iter x)) + +(defn detach! + "Takes a stream iter and disconnects it from the underlying stream, + returning the stream. All further operations on the iter will fail." + [#^clojure.lang.AStream$Iter iter] (.detach iter)) + ;;;;;;;;;;;;;;;;at this point all the support for syntax-quote exists;;;;;;;;;;;;;;;;;;;;;; (defmacro if-not "Evaluates test. If logical false, evaluates and returns then expr, otherwise else expr, if supplied, else nil." @@ -1274,6 +1299,8 @@ (. ref (touch)) (. ref (get))) +(def #^{:private true :tag clojure.lang.Closer} *io-context* nil) + (defmacro sync "transaction-flags => TBD, pass nil for now @@ -1281,23 +1308,15 @@ exprs and any nested calls. Starts a transaction if none is already running on this thread. Any uncaught exception will abort the transaction and flow out of sync. The exprs may be run more than - once, but any effects on Refs will be atomic." + once, but any effects on Refs will be atomic. Transactions are not + allowed in io! blocks - will throw IllegalStateException." [flags-ignored-for-now & body] - `(. clojure.lang.LockingTransaction - (runInTransaction (fn [] ~@body)))) + `(if *io-context* + (throw (IllegalStateException. "Transaction in io!")) + (. clojure.lang.LockingTransaction + (runInTransaction (fn [] ~@body))))) -(defmacro io! - "If an io! block occurs in a transaction, throws an - IllegalStateException, else runs body in an implicit do. If the - first expression in body is a literal string, will use that as the - exception message." - [& body] - (let [message (when (string? (first body)) (first body)) - body (if message (rest body) body)] - `(if (clojure.lang.LockingTransaction/isRunning) - (throw (new IllegalStateException ~(or message "I/O in transaction"))) - (do ~@body)))) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; fn stuff ;;;;;;;;;;;;;;;; @@ -1380,10 +1399,44 @@ (map f (rest c1) (rest c2) (rest c3))))) ([f c1 c2 c3 & colls] (let [step (fn step [cs] - (when (every? seq cs) + (when (every? seq cs) (lazy-cons (map first cs) (step (map rest cs)))))] (map #(apply f %) (step (conj colls c3 c2 c1)))))) +(defn map-stream + "Returns a stream consisting of the result of applying f to the + set of first items of each coll, followed by applying f to the set + of second items in each coll, until any one of the colls is + exhausted. Any remaining items in other colls are ignored. Function + f should accept number-of-colls arguments." + ([f coll] + (identity (let [iter (stream-iter coll)] + (stream + #(let [x (next! iter %)] + (if (= % x) x (f x))))))) + ([f c1 c2] + (identity (let [s1 (stream-iter c1), s2 (stream-iter c2)] + (stream + #(let [x1 (next! s1 %), x2 (next! s2 %)] + (if (or (= % x1) (= % x2)) + % + (f x1 x2))))))) + ([f c1 c2 c3] + (identity (let [s1 (stream-iter c1), s2 (stream-iter c2), s3 (stream-iter c3)] + (stream + #(let [x1 (next! s1 %), x2 (next! s2 %), x3 (next! s3 %)] + (if (or (= % x1) (= % x2) (= % x3)) + % + (f x1 x2 x3))))))) + ([f c1 c2 c3 & colls] + (identity (let [iters (map stream-iter (list* c1 c2 c3 colls))] + (stream + (fn [eos] + (let [xs (seq (map #(next! % eos) iters))] + (if (some #{eos} xs) + eos + (apply f xs))))))))) + (defn mapcat "Returns the result of applying concat to the result of applying map to f and colls. Thus function f should return a collection." @@ -1391,13 +1444,25 @@ (apply concat (apply map f colls))) (defn filter - "Returns a lazy seq of the items in coll for which + "Returns a stream of the items in coll for which (pred item) returns true. pred must be free of side-effects." [pred coll] - (when (seq coll) - (if (pred (first coll)) - (lazy-cons (first coll) (filter pred (rest coll))) - (recur pred (rest coll))))) + (seq + (let [iter (stream-iter coll)] + (stream + #(let [x (next! iter %)] + (if (or (= % x) (pred x)) + x + (recur %))))))) + +;(defn filter +; "Returns a lazy seq of the items in coll for which +; (pred item) returns true. pred must be free of side-effects." +; [pred coll] +; (when (seq coll) +; (if (pred (first coll)) +; (lazy-cons (first coll) (filter pred (rest coll))) +; (recur pred (rest coll))))) (defn remove "Returns a lazy seq of the items in coll for which @@ -1633,39 +1698,6 @@ (dorun n coll) coll)) -(defn await - "Blocks the current thread (indefinitely!) until all actions - dispatched thus far, from this thread or agent, to the agent(s) have - occurred." - [& agents] - (io! "await in transaction" - (when *agent* - (throw (new Exception "Can't await in agent action"))) - (let [latch (new java.util.concurrent.CountDownLatch (count agents)) - count-down (fn [agent] (. latch (countDown)) agent)] - (doseq [agent agents] - (send agent count-down)) - (. latch (await))))) - -(defn await1 [#^clojure.lang.Agent a] - (when (pos? (.getQueueCount a)) - (await a)) - a) - -(defn await-for - "Blocks the current thread until all actions dispatched thus - far (from this thread or agent) to the agents have occurred, or the - timeout (in milliseconds) has elapsed. Returns nil if returning due - to timeout, non-nil otherwise." - [timeout-ms & agents] - (io! "await-for in transaction" - (when *agent* - (throw (new Exception "Can't await in agent action"))) - (let [latch (new java.util.concurrent.CountDownLatch (count agents)) - count-down (fn [agent] (. latch (countDown)) agent)] - (doseq [agent agents] - (send agent count-down)) - (. latch (await timeout-ms (. java.util.concurrent.TimeUnit MILLISECONDS)))))) (defmacro dotimes "bindings => name n @@ -1947,6 +1979,97 @@ :else (throw (IllegalArgumentException. "with-open only allows Symbols in bindings")))) + +(defmacro io! + "If an io! block occurs in a transaction, throws an + IllegalStateException, else runs body in an implicit do. If the + first expression in body is a literal string, will use that as the + exception message. Establishes a dynamic io context for use with io-scope." + [& body] + (let [message (when (string? (first body)) (first body)) + body (if message (rest body) body)] + `(if (clojure.lang.LockingTransaction/isRunning) + (throw (new IllegalStateException ~(or message "I/O in transaction"))) + (binding [*io-context* (clojure.lang.Closer.)] + (try + ~@body + (finally + (.close *io-context*))))))) + +(def *scope* nil) + +(defn run-scope-actions [] + (let [failed (= (first *scope*) :failed) + entries (if failed (rest *scope*) *scope*)] + (doseq [e entries] + (let [cause (first e) + action (second e)] + (when (or (= cause :exits) + (and (= cause :fails) failed) + (and (= cause :succeeds) (not failed))) + (action)))))) + +(defmacro scope + "Creates a scope for use with when-scope." + [& body] + `(binding [*scope* (list)] + (try + ~@body + (catch Throwable t# + (set! *scope* (conj *scope* :failed)) + (throw t#)) + (finally + (run-scope-actions))))) + +(defmacro when-scope + "Causes a body of expressions to be executed at the termination of + the nearest dynamically enclosing scope (created with scope). If no + scope is in effect, throws IllegalStateException. Cause must be one of: + + :exits - will run unconditionally on scope exit + :fails - will run only if scope exits due to an exception + :succeeds - will run only if scope exits normally" + + [cause & body] + `(do + (when-not *scope* + (throw (IllegalStateException. "No scope in effect"))) + (set! *scope* (conj *scope* [~cause (fn [] ~@body)])))) + +(defn await + "Blocks the current thread (indefinitely!) until all actions + dispatched thus far, from this thread or agent, to the agent(s) have + occurred." + [& agents] + (io! "await in transaction" + (when *agent* + (throw (new Exception "Can't await in agent action"))) + (let [latch (new java.util.concurrent.CountDownLatch (count agents)) + count-down (fn [agent] (. latch (countDown)) agent)] + (doseq [agent agents] + (send agent count-down)) + (. latch (await))))) + +(defn await1 [#^clojure.lang.Agent a] + (when (pos? (.getQueueCount a)) + (await a)) + a) + +(defn await-for + "Blocks the current thread until all actions dispatched thus + far (from this thread or agent) to the agents have occurred, or the + timeout (in milliseconds) has elapsed. Returns nil if returning due + to timeout, non-nil otherwise." + [timeout-ms & agents] + (io! "await-for in transaction" + (when *agent* + (throw (new Exception "Can't await in agent action"))) + (let [latch (new java.util.concurrent.CountDownLatch (count agents)) + count-down (fn [agent] (. latch (countDown)) agent)] + (doseq [agent agents] + (send agent count-down)) + (. latch (await timeout-ms (. java.util.concurrent.TimeUnit MILLISECONDS)))))) + (defmacro doto "Evaluates x then calls all of the methods and functions with the value of x supplied at the from of the given arguments. The forms @@ -2883,7 +3006,8 @@ exprs and any nested calls. Starts a transaction if none is already running on this thread. Any uncaught exception will abort the transaction and flow out of dosync. The exprs may be run more than - once, but any effects on Refs will be atomic." + once, but any effects on Refs will be atomic. Transactions are not + allowed in io! blocks - will throw IllegalStateException." [& exprs] `(sync nil ~@exprs)) @@ -3772,3 +3896,4 @@ (load "genclass") + diff --git a/src/jvm/clojure/lang/APersistentVector.java b/src/jvm/clojure/lang/APersistentVector.java index a3d7f96b..e19c1220 100644 --- a/src/jvm/clojure/lang/APersistentVector.java +++ b/src/jvm/clojure/lang/APersistentVector.java @@ -13,8 +13,6 @@ package clojure.lang; import java.util.*; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.Callable; public abstract class APersistentVector extends AFn implements IPersistentVector, Iterable, List, @@ -363,7 +361,7 @@ public AStream stream() throws Exception { return new AStream(new Src(this)); } - static class Src implements Callable{ + static class Src extends AFn{ final IPersistentVector v; int i = 0; @@ -371,10 +369,10 @@ public AStream stream() throws Exception { this.v = v; } - public Object call() throws Exception { + public Object invoke(Object eos) throws Exception { if (i < v.count()) return v.nth(i++); - return RT.eos(); + return eos; } } diff --git a/src/jvm/clojure/lang/ASeq.java b/src/jvm/clojure/lang/ASeq.java index ba041c9b..7b4c5a4e 100644 --- a/src/jvm/clojure/lang/ASeq.java +++ b/src/jvm/clojure/lang/ASeq.java @@ -12,7 +12,6 @@ package clojure.lang; import java.util.Collection;
import java.util.Iterator;
-import java.util.concurrent.Callable;
public abstract class ASeq extends Obj implements ISeq, Collection, Streamable{
transient int _hash = -1;
@@ -181,21 +180,21 @@ public AStream stream() throws Exception { return new AStream(new Src(this));
}
-static class Src implements Callable{
+static class Src extends AFn{
ISeq s;
public Src(ISeq s) {
this.s = s;
}
- public Object call() throws Exception {
+ public Object invoke(Object eos) throws Exception {
if(s != null)
{
Object ret = s.first();
s = s.rest();
return ret;
}
- return RT.eos();
+ return eos;
}
}
}
diff --git a/src/jvm/clojure/lang/AStream.java b/src/jvm/clojure/lang/AStream.java index 51060ff1..c0fabb29 100644 --- a/src/jvm/clojure/lang/AStream.java +++ b/src/jvm/clojure/lang/AStream.java @@ -12,87 +12,146 @@ package clojure.lang; -import java.util.concurrent.Callable; +final public class AStream implements Seqable, Streamable { -final public class AStream implements Seqable { + static final ISeq NO_SEQ = new Cons(null, null); - ISeq seq = null; - Callable src; + ISeq seq = NO_SEQ; + final IFn src; + Cons pushed = null; + Iter iter = null; - public AStream(Callable src){ - this.src = src; + public AStream(IFn src) { + this.src = src; } - final synchronized public ISeq seq(){ - if (src != null) + final synchronized public ISeq seq() { + if (seq == NO_SEQ) { - seq = Seq.create(src); - src = null; + iter(); + seq = Seq.create(pushed,src); } return seq; } - final synchronized public Object next() throws Exception { - if (src == null) - return RT.eos(); - return src.call(); + final synchronized public AStream stream() throws Exception { + if (seq == NO_SEQ) + return this; + return RT.stream(seq); } - static class Seq extends ASeq { - Callable src; - final Object _first; - ISeq _rest; + final synchronized public Iter iter() { + if (iter != null) + throw new IllegalStateException("Already iterating"); - static Seq create(Callable src) { - Object x; - try + return iter = new Iter(this); + } + + static public class Iter { + final AStream s; + + Iter(AStream s) { + this.s = s; + } + + final public Iter pushBack(Object x) throws Exception { + synchronized (s) { - x = src.call(); + if (s.iter != this) + throw new IllegalAccessError("Invalid iterator"); + s.pushed = new Cons(x,s.pushed); + return this; } - catch (Exception e) + } + + final public AStream detach() { + synchronized (s) { - throw new RuntimeException(e); + if (s.iter != this) + throw new IllegalAccessError("Invalid iterator"); + s.iter = null; + return s; + } + } + + final public Object next(Object eos) { + synchronized (s) + { + if (s.iter != this) + throw new IllegalAccessError("Invalid iterator"); + if (s.pushed != null) + { + Object ret = s.pushed.first(); + s.pushed = (Cons) s.pushed.rest(); + return ret; + } + try + { + return s.src.invoke(eos); + } + catch (Exception e) + { + throw new RuntimeException(e); + } } - if (RT.isEOS(x)) - return null; - else - return new Seq(x, src); + } + + } + + static class Seq extends ASeq { + static final Object EOS = new Object(); + + final Object _first; + ISeq _rest = NO_SEQ; + final ISeq pushed; + final IFn src; + + Seq(Object first, ISeq pushed, IFn src) { + _first = first; + this.pushed = pushed; + this.src = src; } Seq(IPersistentMap meta, Object _first, ISeq _rest) { super(meta); this._first = _first; this._rest = _rest; + this.pushed = null; this.src = null; } - Seq(Object first, Callable src) { - this._first = first; - this.src = src; - } - - - public Object first() { + final public Object first() { return _first; } - synchronized public ISeq rest() { - if (src != null) + final synchronized public ISeq rest() { + if (_rest == NO_SEQ) { - _rest = create(src); - src = null; + _rest = create(pushed, src); } return _rest; } - synchronized public Obj withMeta(IPersistentMap meta) { - if (meta != this.meta()) + static Seq create(ISeq pushed, IFn src) { + if(pushed != null) + return new Seq(pushed.first(),pushed.rest(),src); + try { - rest(); - return new Seq(meta, _first, _rest); + Object x = src.invoke(EOS); + if (x == EOS) + return null; + return new Seq(x, null, src); + } + catch (Exception e) + { + throw new RuntimeException(e); } - return this; } - } + public Obj withMeta(IPersistentMap meta) { + rest(); + return new Seq(meta, _first, _rest); + } + + } } diff --git a/src/jvm/clojure/lang/ArrayStream.java b/src/jvm/clojure/lang/ArrayStream.java index fcf7a519..6ebc2ecf 100644 --- a/src/jvm/clojure/lang/ArrayStream.java +++ b/src/jvm/clojure/lang/ArrayStream.java @@ -12,9 +12,7 @@ package clojure.lang; -import java.util.concurrent.Callable; - -public class ArrayStream implements Callable{ +public class ArrayStream extends AFn{ int i = 0; final Object[] array; @@ -23,10 +21,10 @@ public ArrayStream(Object[] array){ this.array = array; } -public Object call() throws Exception{ +public Object invoke(Object eos) throws Exception{ if(i < array.length) return array[i++]; - return RT.eos(); + return eos; } static AStream createFromObject(Object array){ @@ -52,7 +50,7 @@ static AStream createFromObject(Object array){ throw new IllegalArgumentException(String.format("Unsupported array type %s", array)); } -static public class ArrayStream_int implements Callable{ +static public class ArrayStream_int extends AFn{ int i = 0; final int[] array; @@ -61,14 +59,14 @@ static public class ArrayStream_int implements Callable{ this.array = array; } - public Object call() throws Exception{ + public Object invoke(Object eos) throws Exception{ if(i < array.length) return array[i++]; - return RT.eos(); + return eos; } } -static public class ArrayStream_long implements Callable{ +static public class ArrayStream_long extends AFn{ int i = 0; final long[] array; @@ -77,14 +75,14 @@ static public class ArrayStream_long implements Callable{ this.array = array; } - public Object call() throws Exception{ + public Object invoke(Object eos) throws Exception{ if(i < array.length) return array[i++]; - return RT.eos(); + return eos; } } -static public class ArrayStream_float implements Callable{ +static public class ArrayStream_float extends AFn{ int i = 0; final float[] array; @@ -93,14 +91,14 @@ static public class ArrayStream_float implements Callable{ this.array = array; } - public Object call() throws Exception{ + public Object invoke(Object eos) throws Exception{ if(i < array.length) return array[i++]; - return RT.eos(); + return eos; } } -static public class ArrayStream_double implements Callable{ +static public class ArrayStream_double extends AFn{ int i = 0; final double[] array; @@ -109,14 +107,14 @@ static public class ArrayStream_double implements Callable{ this.array = array; } - public Object call() throws Exception{ + public Object invoke(Object eos) throws Exception{ if(i < array.length) return array[i++]; - return RT.eos(); + return eos; } } -static public class ArrayStream_char implements Callable{ +static public class ArrayStream_char extends AFn{ int i = 0; final char[] array; @@ -125,14 +123,14 @@ static public class ArrayStream_char implements Callable{ this.array = array; } - public Object call() throws Exception{ + public Object invoke(Object eos) throws Exception{ if(i < array.length) return array[i++]; - return RT.eos(); + return eos; } } -static public class ArrayStream_byte implements Callable{ +static public class ArrayStream_byte extends AFn{ int i = 0; final byte[] array; @@ -141,14 +139,14 @@ static public class ArrayStream_byte implements Callable{ this.array = array; } - public Object call() throws Exception{ + public Object invoke(Object eos) throws Exception{ if(i < array.length) return array[i++]; - return RT.eos(); + return eos; } } -static public class ArrayStream_short implements Callable{ +static public class ArrayStream_short extends AFn{ int i = 0; final short[] array; @@ -157,14 +155,14 @@ static public class ArrayStream_short implements Callable{ this.array = array; } - public Object call() throws Exception{ + public Object invoke(Object eos) throws Exception{ if(i < array.length) return array[i++]; - return RT.eos(); + return eos; } } -static public class ArrayStream_boolean implements Callable{ +static public class ArrayStream_boolean extends AFn{ int i = 0; final boolean[] array; @@ -173,10 +171,10 @@ static public class ArrayStream_boolean implements Callable{ this.array = array; } - public Object call() throws Exception{ + public Object invoke(Object eos) throws Exception{ if(i < array.length) return array[i++]; - return RT.eos(); + return eos; } } diff --git a/src/jvm/clojure/lang/Closer.java b/src/jvm/clojure/lang/Closer.java new file mode 100644 index 00000000..a34664b8 --- /dev/null +++ b/src/jvm/clojure/lang/Closer.java @@ -0,0 +1,33 @@ +/** + * Copyright (c) Rich Hickey. All rights reserved. + * The use and distribution terms for this software are covered by the + * Eclipse Public License 1.0 (http://opensource.org/licenses/eclipse-1.0.php) + * which can be found in the file epl-v10.html at the root of this distribution. + * By using this software in any fashion, you are agreeing to be bound by + * the terms of this license. + * You must not remove this notice, or any other, from this software. + **/ + +/* rich Jan 10, 2009 */ + +package clojure.lang; + +import java.io.Closeable; +import java.io.IOException; + +public class Closer implements Closeable{ + ISeq closes; + + + public void close() throws IOException { + for(ISeq s = closes;s!=null;s = s.rest()) + { + ((Closeable)s.first()).close(); + } + } + + public Closer register(Closeable c) { + closes = new Cons(c, closes); + return this; + } +} diff --git a/src/jvm/clojure/lang/Compiler.java b/src/jvm/clojure/lang/Compiler.java index 76685840..580a9655 100644 --- a/src/jvm/clojure/lang/Compiler.java +++ b/src/jvm/clojure/lang/Compiler.java @@ -3721,7 +3721,7 @@ public static class LetExpr implements Expr{ ISeq body = RT.rest(RT.rest(form)); - if(context == C.EVAL) + if(context == C.EVAL || (context == C.EXPRESSION && isLoop)) return analyze(context, RT.list(RT.list(FN, PersistentVector.EMPTY, form))); IPersistentMap dynamicBindings = RT.map(LOCAL_ENV, LOCAL_ENV.get(), diff --git a/src/jvm/clojure/lang/Cons.java b/src/jvm/clojure/lang/Cons.java index c9e65eb3..8f99a521 100644 --- a/src/jvm/clojure/lang/Cons.java +++ b/src/jvm/clojure/lang/Cons.java @@ -29,11 +29,11 @@ public Cons(IPersistentMap meta, Object _first, ISeq _rest){ this._rest = _rest; } -public Object first(){ +final public Object first(){ return _first; } -public ISeq rest(){ +final public ISeq rest(){ return _rest; } diff --git a/src/jvm/clojure/lang/IteratorStream.java b/src/jvm/clojure/lang/IteratorStream.java index cfb85cec..0f3702a1 100644 --- a/src/jvm/clojure/lang/IteratorStream.java +++ b/src/jvm/clojure/lang/IteratorStream.java @@ -13,9 +13,8 @@ package clojure.lang; import java.util.Iterator; -import java.util.concurrent.Callable; -public class IteratorStream implements Callable{ +public class IteratorStream extends AFn{ final Iterator iter; static public AStream create(Iterator iter){ @@ -26,9 +25,9 @@ IteratorStream(Iterator iter){ this.iter = iter; } -public Object call() throws Exception{ +public Object invoke(Object eos) throws Exception{ if(iter.hasNext()) return iter.next(); - return RT.eos(); + return eos; } } diff --git a/src/jvm/clojure/lang/LispReader.java b/src/jvm/clojure/lang/LispReader.java index 922d1cbf..228e0922 100644 --- a/src/jvm/clojure/lang/LispReader.java +++ b/src/jvm/clojure/lang/LispReader.java @@ -712,7 +712,7 @@ public static class SyntaxQuoteReader extends AFn{ {
ret = RT.list(APPLY, HASHSET, RT.cons(CONCAT, sqExpandList(((IPersistentSet) form).seq())));
}
- else if(form instanceof ISeq)
+ else if(form instanceof ISeq || form instanceof IPersistentList)
{
ISeq seq = RT.seq(form);
ret = RT.cons(CONCAT, sqExpandList(seq));
diff --git a/src/jvm/clojure/lang/RT.java b/src/jvm/clojure/lang/RT.java index a6142ea6..5b7df45b 100644 --- a/src/jvm/clojure/lang/RT.java +++ b/src/jvm/clojure/lang/RT.java @@ -243,6 +243,12 @@ static public void addURL(Object url) throws Exception{ getRootClassLoader().addURL(u); } +static final public IFn EMPTY_GEN = new AFn(){ + synchronized public Object invoke(Object eos) throws Exception { + return eos; + } +}; + static { Keyword dockw = Keyword.intern(null, "doc"); @@ -455,21 +461,19 @@ static public ISeq seq(Object coll){ static public AStream stream(final Object coll) throws Exception{ if(coll == null) - return EMPTY_STREAM; - else if(coll instanceof AStream) - return (AStream) coll; + return new AStream(EMPTY_GEN); + else if(coll instanceof Streamable) + return ((Streamable) coll).stream(); else if(coll instanceof Fn) - return new AStream((Callable)coll); - else if(coll instanceof Streamable) - return ((Streamable)coll).stream(); + return new AStream((IFn)coll); else if(coll instanceof Iterable) return IteratorStream.create(((Iterable) coll).iterator()); else if (coll.getClass().isArray()) return ArrayStream.createFromObject(coll); else if (coll instanceof String) return ArrayStream.createFromObject(((String)coll).toCharArray()); - - throw new IllegalArgumentException("Don't know how to create IStream from: " + coll.getClass().getSimpleName()); + else + return new AStream(new ASeq.Src(RT.seq(coll))); } static ISeq seqFrom(Object coll){ @@ -1208,7 +1212,7 @@ static public void print(Object x, Writer w) throws Exception{ } if(x == null) w.write("nil"); - else if(x instanceof ISeq || x instanceof IPersistentList) + else if(x instanceof ISeq || x instanceof IPersistentList || x instanceof AStream) { w.write('('); printInnerSeq(seq(x), w); @@ -1679,21 +1683,7 @@ static public int alength(Object xs){ return Array.getLength(xs); } -final static private Object EOS = new Object(); -static public Object eos() { - return EOS; - } - -static public boolean isEOS(Object o){ - return o == EOS; - } - -static final public AStream EMPTY_STREAM = new AStream(new Callable(){ - synchronized public Object call() throws Exception { - return eos(); - } -}); synchronized public static DynamicClassLoader getRootClassLoader() { if(ROOT_CLASSLOADER == null) diff --git a/src/jvm/clojure/lang/Range.java b/src/jvm/clojure/lang/Range.java index b52eec43..68e96a1e 100644 --- a/src/jvm/clojure/lang/Range.java +++ b/src/jvm/clojure/lang/Range.java @@ -67,7 +67,7 @@ public AStream stream() throws Exception { return new AStream(new Src(n,end)); } - static class Src implements Callable{ + static class Src extends AFn{ int n; final int end; @@ -76,10 +76,10 @@ public AStream stream() throws Exception { this.end = end; } - public Object call() throws Exception { + public Object invoke(Object eos) throws Exception { if(n < end) return n++; - return RT.eos(); + return eos; } } } diff --git a/src/jvm/clojure/lang/StreamSeq.java b/src/jvm/clojure/lang/StreamSeq.java deleted file mode 100644 index c935ebfa..00000000 --- a/src/jvm/clojure/lang/StreamSeq.java +++ /dev/null @@ -1,64 +0,0 @@ -/** - * Copyright (c) Rich Hickey. All rights reserved. - * The use and distribution terms for this software are covered by the - * Eclipse Public License 1.0 (http://opensource.org/licenses/eclipse-1.0.php) - * which can be found in the file epl-v10.html at the root of this distribution. - * By using this software in any fashion, you are agreeing to be bound by - * the terms of this license. - * You must not remove this notice, or any other, from this software. - **/ - -/* rich Dec 8, 2008 */ - -package clojure.lang; - -public class StreamSeq extends ASeq { - AStream stream; - final Object _first; - ISeq _rest; - - static public StreamSeq create(AStream stream) throws Exception { - Object x = stream.next(); - if (RT.isEOS(x)) - return null; - return new StreamSeq(x, stream); - } - - StreamSeq(IPersistentMap meta, Object _first, ISeq _rest) { - super(meta); - this._first = _first; - this._rest = _rest; - this.stream = null; - } - - StreamSeq(Object first, AStream stream) { - this._first = first; - this.stream = stream; - } - - - public Object first() { - r |