summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/clj/clojure/core.clj233
-rw-r--r--src/jvm/clojure/lang/APersistentVector.java8
-rw-r--r--src/jvm/clojure/lang/ASeq.java7
-rw-r--r--src/jvm/clojure/lang/AStream.java149
-rw-r--r--src/jvm/clojure/lang/ArrayStream.java56
-rw-r--r--src/jvm/clojure/lang/Closer.java33
-rw-r--r--src/jvm/clojure/lang/Compiler.java2
-rw-r--r--src/jvm/clojure/lang/Cons.java4
-rw-r--r--src/jvm/clojure/lang/IteratorStream.java7
-rw-r--r--src/jvm/clojure/lang/LispReader.java2
-rw-r--r--src/jvm/clojure/lang/RT.java36
-rw-r--r--src/jvm/clojure/lang/Range.java6
-rw-r--r--src/jvm/clojure/lang/StreamSeq.java64
13 files changed, 372 insertions, 235 deletions
diff --git a/src/clj/clojure/core.clj b/src/clj/clojure/core.clj
index 5a4cef08..482a1790 100644
--- a/src/clj/clojure/core.clj
+++ b/src/clj/clojure/core.clj
@@ -448,6 +448,31 @@
(recur (first zs) (rest zs)))))]
(cat (concat x y) zs))))
+;;;;;;;;;;;;;;;;;;;;;;;;;;;; streams ;;;;;;;;;;;;;;;;;;;;;;;;;;;;
+(defn stream
+ "Creates a stream of the items in coll."
+ {:tag clojure.lang.AStream}
+ [coll] (clojure.lang.RT/stream coll))
+
+(defn stream-iter
+ "Returns an iter on (stream coll). Only one iter on a stream is
+ supported at a time."
+ {:tag clojure.lang.AStream$Iter}
+ [coll] (.iter (stream coll)))
+
+(defn next!
+ "Takes a stream iter and an eos value, returns (and consumes) the next element in the stream, or eos."
+ [#^clojure.lang.AStream$Iter iter eos] (.next iter eos))
+
+(defn push-back!
+ "Takes a stream iter and pushes x onto front of stream, returns iter."
+ [#^clojure.lang.AStream$Iter iter x] (.pushBack iter x))
+
+(defn detach!
+ "Takes a stream iter and disconnects it from the underlying stream,
+ returning the stream. All further operations on the iter will fail."
+ [#^clojure.lang.AStream$Iter iter] (.detach iter))
+
;;;;;;;;;;;;;;;;at this point all the support for syntax-quote exists;;;;;;;;;;;;;;;;;;;;;;
(defmacro if-not
"Evaluates test. If logical false, evaluates and returns then expr, otherwise else expr, if supplied, else nil."
@@ -1274,6 +1299,8 @@
(. ref (touch))
(. ref (get)))
+(def #^{:private true :tag clojure.lang.Closer} *io-context* nil)
+
(defmacro sync
"transaction-flags => TBD, pass nil for now
@@ -1281,23 +1308,15 @@
exprs and any nested calls. Starts a transaction if none is already
running on this thread. Any uncaught exception will abort the
transaction and flow out of sync. The exprs may be run more than
- once, but any effects on Refs will be atomic."
+ once, but any effects on Refs will be atomic. Transactions are not
+ allowed in io! blocks - will throw IllegalStateException."
[flags-ignored-for-now & body]
- `(. clojure.lang.LockingTransaction
- (runInTransaction (fn [] ~@body))))
+ `(if *io-context*
+ (throw (IllegalStateException. "Transaction in io!"))
+ (. clojure.lang.LockingTransaction
+ (runInTransaction (fn [] ~@body)))))
-(defmacro io!
- "If an io! block occurs in a transaction, throws an
- IllegalStateException, else runs body in an implicit do. If the
- first expression in body is a literal string, will use that as the
- exception message."
- [& body]
- (let [message (when (string? (first body)) (first body))
- body (if message (rest body) body)]
- `(if (clojure.lang.LockingTransaction/isRunning)
- (throw (new IllegalStateException ~(or message "I/O in transaction")))
- (do ~@body))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; fn stuff ;;;;;;;;;;;;;;;;
@@ -1380,10 +1399,44 @@
(map f (rest c1) (rest c2) (rest c3)))))
([f c1 c2 c3 & colls]
(let [step (fn step [cs]
- (when (every? seq cs)
+ (when (every? seq cs)
(lazy-cons (map first cs) (step (map rest cs)))))]
(map #(apply f %) (step (conj colls c3 c2 c1))))))
+(defn map-stream
+ "Returns a stream consisting of the result of applying f to the
+ set of first items of each coll, followed by applying f to the set
+ of second items in each coll, until any one of the colls is
+ exhausted. Any remaining items in other colls are ignored. Function
+ f should accept number-of-colls arguments."
+ ([f coll]
+ (identity (let [iter (stream-iter coll)]
+ (stream
+ #(let [x (next! iter %)]
+ (if (= % x) x (f x)))))))
+ ([f c1 c2]
+ (identity (let [s1 (stream-iter c1), s2 (stream-iter c2)]
+ (stream
+ #(let [x1 (next! s1 %), x2 (next! s2 %)]
+ (if (or (= % x1) (= % x2))
+ %
+ (f x1 x2)))))))
+ ([f c1 c2 c3]
+ (identity (let [s1 (stream-iter c1), s2 (stream-iter c2), s3 (stream-iter c3)]
+ (stream
+ #(let [x1 (next! s1 %), x2 (next! s2 %), x3 (next! s3 %)]
+ (if (or (= % x1) (= % x2) (= % x3))
+ %
+ (f x1 x2 x3)))))))
+ ([f c1 c2 c3 & colls]
+ (identity (let [iters (map stream-iter (list* c1 c2 c3 colls))]
+ (stream
+ (fn [eos]
+ (let [xs (seq (map #(next! % eos) iters))]
+ (if (some #{eos} xs)
+ eos
+ (apply f xs)))))))))
+
(defn mapcat
"Returns the result of applying concat to the result of applying map
to f and colls. Thus function f should return a collection."
@@ -1391,13 +1444,25 @@
(apply concat (apply map f colls)))
(defn filter
- "Returns a lazy seq of the items in coll for which
+ "Returns a stream of the items in coll for which
(pred item) returns true. pred must be free of side-effects."
[pred coll]
- (when (seq coll)
- (if (pred (first coll))
- (lazy-cons (first coll) (filter pred (rest coll)))
- (recur pred (rest coll)))))
+ (seq
+ (let [iter (stream-iter coll)]
+ (stream
+ #(let [x (next! iter %)]
+ (if (or (= % x) (pred x))
+ x
+ (recur %)))))))
+
+;(defn filter
+; "Returns a lazy seq of the items in coll for which
+; (pred item) returns true. pred must be free of side-effects."
+; [pred coll]
+; (when (seq coll)
+; (if (pred (first coll))
+; (lazy-cons (first coll) (filter pred (rest coll)))
+; (recur pred (rest coll)))))
(defn remove
"Returns a lazy seq of the items in coll for which
@@ -1633,39 +1698,6 @@
(dorun n coll)
coll))
-(defn await
- "Blocks the current thread (indefinitely!) until all actions
- dispatched thus far, from this thread or agent, to the agent(s) have
- occurred."
- [& agents]
- (io! "await in transaction"
- (when *agent*
- (throw (new Exception "Can't await in agent action")))
- (let [latch (new java.util.concurrent.CountDownLatch (count agents))
- count-down (fn [agent] (. latch (countDown)) agent)]
- (doseq [agent agents]
- (send agent count-down))
- (. latch (await)))))
-
-(defn await1 [#^clojure.lang.Agent a]
- (when (pos? (.getQueueCount a))
- (await a))
- a)
-
-(defn await-for
- "Blocks the current thread until all actions dispatched thus
- far (from this thread or agent) to the agents have occurred, or the
- timeout (in milliseconds) has elapsed. Returns nil if returning due
- to timeout, non-nil otherwise."
- [timeout-ms & agents]
- (io! "await-for in transaction"
- (when *agent*
- (throw (new Exception "Can't await in agent action")))
- (let [latch (new java.util.concurrent.CountDownLatch (count agents))
- count-down (fn [agent] (. latch (countDown)) agent)]
- (doseq [agent agents]
- (send agent count-down))
- (. latch (await timeout-ms (. java.util.concurrent.TimeUnit MILLISECONDS))))))
(defmacro dotimes
"bindings => name n
@@ -1947,6 +1979,97 @@
:else (throw (IllegalArgumentException.
"with-open only allows Symbols in bindings"))))
+
+(defmacro io!
+ "If an io! block occurs in a transaction, throws an
+ IllegalStateException, else runs body in an implicit do. If the
+ first expression in body is a literal string, will use that as the
+ exception message. Establishes a dynamic io context for use with io-scope."
+ [& body]
+ (let [message (when (string? (first body)) (first body))
+ body (if message (rest body) body)]
+ `(if (clojure.lang.LockingTransaction/isRunning)
+ (throw (new IllegalStateException ~(or message "I/O in transaction")))
+ (binding [*io-context* (clojure.lang.Closer.)]
+ (try
+ ~@body
+ (finally
+ (.close *io-context*)))))))
+
+(def *scope* nil)
+
+(defn run-scope-actions []
+ (let [failed (= (first *scope*) :failed)
+ entries (if failed (rest *scope*) *scope*)]
+ (doseq [e entries]
+ (let [cause (first e)
+ action (second e)]
+ (when (or (= cause :exits)
+ (and (= cause :fails) failed)
+ (and (= cause :succeeds) (not failed)))
+ (action))))))
+
+(defmacro scope
+ "Creates a scope for use with when-scope."
+ [& body]
+ `(binding [*scope* (list)]
+ (try
+ ~@body
+ (catch Throwable t#
+ (set! *scope* (conj *scope* :failed))
+ (throw t#))
+ (finally
+ (run-scope-actions)))))
+
+(defmacro when-scope
+ "Causes a body of expressions to be executed at the termination of
+ the nearest dynamically enclosing scope (created with scope). If no
+ scope is in effect, throws IllegalStateException. Cause must be one of:
+
+ :exits - will run unconditionally on scope exit
+ :fails - will run only if scope exits due to an exception
+ :succeeds - will run only if scope exits normally"
+
+ [cause & body]
+ `(do
+ (when-not *scope*
+ (throw (IllegalStateException. "No scope in effect")))
+ (set! *scope* (conj *scope* [~cause (fn [] ~@body)]))))
+
+(defn await
+ "Blocks the current thread (indefinitely!) until all actions
+ dispatched thus far, from this thread or agent, to the agent(s) have
+ occurred."
+ [& agents]
+ (io! "await in transaction"
+ (when *agent*
+ (throw (new Exception "Can't await in agent action")))
+ (let [latch (new java.util.concurrent.CountDownLatch (count agents))
+ count-down (fn [agent] (. latch (countDown)) agent)]
+ (doseq [agent agents]
+ (send agent count-down))
+ (. latch (await)))))
+
+(defn await1 [#^clojure.lang.Agent a]
+ (when (pos? (.getQueueCount a))
+ (await a))
+ a)
+
+(defn await-for
+ "Blocks the current thread until all actions dispatched thus
+ far (from this thread or agent) to the agents have occurred, or the
+ timeout (in milliseconds) has elapsed. Returns nil if returning due
+ to timeout, non-nil otherwise."
+ [timeout-ms & agents]
+ (io! "await-for in transaction"
+ (when *agent*
+ (throw (new Exception "Can't await in agent action")))
+ (let [latch (new java.util.concurrent.CountDownLatch (count agents))
+ count-down (fn [agent] (. latch (countDown)) agent)]
+ (doseq [agent agents]
+ (send agent count-down))
+ (. latch (await timeout-ms (. java.util.concurrent.TimeUnit MILLISECONDS))))))
+
(defmacro doto
"Evaluates x then calls all of the methods and functions with the
value of x supplied at the from of the given arguments. The forms
@@ -2883,7 +3006,8 @@
exprs and any nested calls. Starts a transaction if none is already
running on this thread. Any uncaught exception will abort the
transaction and flow out of dosync. The exprs may be run more than
- once, but any effects on Refs will be atomic."
+ once, but any effects on Refs will be atomic. Transactions are not
+ allowed in io! blocks - will throw IllegalStateException."
[& exprs]
`(sync nil ~@exprs))
@@ -3772,3 +3896,4 @@
(load "genclass")
+
diff --git a/src/jvm/clojure/lang/APersistentVector.java b/src/jvm/clojure/lang/APersistentVector.java
index a3d7f96b..e19c1220 100644
--- a/src/jvm/clojure/lang/APersistentVector.java
+++ b/src/jvm/clojure/lang/APersistentVector.java
@@ -13,8 +13,6 @@
package clojure.lang;
import java.util.*;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.Callable;
public abstract class APersistentVector extends AFn implements IPersistentVector, Iterable,
List,
@@ -363,7 +361,7 @@ public AStream stream() throws Exception {
return new AStream(new Src(this));
}
- static class Src implements Callable{
+ static class Src extends AFn{
final IPersistentVector v;
int i = 0;
@@ -371,10 +369,10 @@ public AStream stream() throws Exception {
this.v = v;
}
- public Object call() throws Exception {
+ public Object invoke(Object eos) throws Exception {
if (i < v.count())
return v.nth(i++);
- return RT.eos();
+ return eos;
}
}
diff --git a/src/jvm/clojure/lang/ASeq.java b/src/jvm/clojure/lang/ASeq.java
index ba041c9b..7b4c5a4e 100644
--- a/src/jvm/clojure/lang/ASeq.java
+++ b/src/jvm/clojure/lang/ASeq.java
@@ -12,7 +12,6 @@ package clojure.lang;
import java.util.Collection;
import java.util.Iterator;
-import java.util.concurrent.Callable;
public abstract class ASeq extends Obj implements ISeq, Collection, Streamable{
transient int _hash = -1;
@@ -181,21 +180,21 @@ public AStream stream() throws Exception {
return new AStream(new Src(this));
}
-static class Src implements Callable{
+static class Src extends AFn{
ISeq s;
public Src(ISeq s) {
this.s = s;
}
- public Object call() throws Exception {
+ public Object invoke(Object eos) throws Exception {
if(s != null)
{
Object ret = s.first();
s = s.rest();
return ret;
}
- return RT.eos();
+ return eos;
}
}
}
diff --git a/src/jvm/clojure/lang/AStream.java b/src/jvm/clojure/lang/AStream.java
index 51060ff1..c0fabb29 100644
--- a/src/jvm/clojure/lang/AStream.java
+++ b/src/jvm/clojure/lang/AStream.java
@@ -12,87 +12,146 @@
package clojure.lang;
-import java.util.concurrent.Callable;
+final public class AStream implements Seqable, Streamable {
-final public class AStream implements Seqable {
+ static final ISeq NO_SEQ = new Cons(null, null);
- ISeq seq = null;
- Callable src;
+ ISeq seq = NO_SEQ;
+ final IFn src;
+ Cons pushed = null;
+ Iter iter = null;
- public AStream(Callable src){
- this.src = src;
+ public AStream(IFn src) {
+ this.src = src;
}
- final synchronized public ISeq seq(){
- if (src != null)
+ final synchronized public ISeq seq() {
+ if (seq == NO_SEQ)
{
- seq = Seq.create(src);
- src = null;
+ iter();
+ seq = Seq.create(pushed,src);
}
return seq;
}
- final synchronized public Object next() throws Exception {
- if (src == null)
- return RT.eos();
- return src.call();
+ final synchronized public AStream stream() throws Exception {
+ if (seq == NO_SEQ)
+ return this;
+ return RT.stream(seq);
}
- static class Seq extends ASeq {
- Callable src;
- final Object _first;
- ISeq _rest;
+ final synchronized public Iter iter() {
+ if (iter != null)
+ throw new IllegalStateException("Already iterating");
- static Seq create(Callable src) {
- Object x;
- try
+ return iter = new Iter(this);
+ }
+
+ static public class Iter {
+ final AStream s;
+
+ Iter(AStream s) {
+ this.s = s;
+ }
+
+ final public Iter pushBack(Object x) throws Exception {
+ synchronized (s)
{
- x = src.call();
+ if (s.iter != this)
+ throw new IllegalAccessError("Invalid iterator");
+ s.pushed = new Cons(x,s.pushed);
+ return this;
}
- catch (Exception e)
+ }
+
+ final public AStream detach() {
+ synchronized (s)
{
- throw new RuntimeException(e);
+ if (s.iter != this)
+ throw new IllegalAccessError("Invalid iterator");
+ s.iter = null;
+ return s;
+ }
+ }
+
+ final public Object next(Object eos) {
+ synchronized (s)
+ {
+ if (s.iter != this)
+ throw new IllegalAccessError("Invalid iterator");
+ if (s.pushed != null)
+ {
+ Object ret = s.pushed.first();
+ s.pushed = (Cons) s.pushed.rest();
+ return ret;
+ }
+ try
+ {
+ return s.src.invoke(eos);
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
}
- if (RT.isEOS(x))
- return null;
- else
- return new Seq(x, src);
+ }
+
+ }
+
+ static class Seq extends ASeq {
+ static final Object EOS = new Object();
+
+ final Object _first;
+ ISeq _rest = NO_SEQ;
+ final ISeq pushed;
+ final IFn src;
+
+ Seq(Object first, ISeq pushed, IFn src) {
+ _first = first;
+ this.pushed = pushed;
+ this.src = src;
}
Seq(IPersistentMap meta, Object _first, ISeq _rest) {
super(meta);
this._first = _first;
this._rest = _rest;
+ this.pushed = null;
this.src = null;
}
- Seq(Object first, Callable src) {
- this._first = first;
- this.src = src;
- }
-
-
- public Object first() {
+ final public Object first() {
return _first;
}
- synchronized public ISeq rest() {
- if (src != null)
+ final synchronized public ISeq rest() {
+ if (_rest == NO_SEQ)
{
- _rest = create(src);
- src = null;
+ _rest = create(pushed, src);
}
return _rest;
}
- synchronized public Obj withMeta(IPersistentMap meta) {
- if (meta != this.meta())
+ static Seq create(ISeq pushed, IFn src) {
+ if(pushed != null)
+ return new Seq(pushed.first(),pushed.rest(),src);
+ try
{
- rest();
- return new Seq(meta, _first, _rest);
+ Object x = src.invoke(EOS);
+ if (x == EOS)
+ return null;
+ return new Seq(x, null, src);
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
}
- return this;
}
- }
+ public Obj withMeta(IPersistentMap meta) {
+ rest();
+ return new Seq(meta, _first, _rest);
+ }
+
+ }
}
diff --git a/src/jvm/clojure/lang/ArrayStream.java b/src/jvm/clojure/lang/ArrayStream.java
index fcf7a519..6ebc2ecf 100644
--- a/src/jvm/clojure/lang/ArrayStream.java
+++ b/src/jvm/clojure/lang/ArrayStream.java
@@ -12,9 +12,7 @@
package clojure.lang;
-import java.util.concurrent.Callable;
-
-public class ArrayStream implements Callable{
+public class ArrayStream extends AFn{
int i = 0;
final Object[] array;
@@ -23,10 +21,10 @@ public ArrayStream(Object[] array){
this.array = array;
}
-public Object call() throws Exception{
+public Object invoke(Object eos) throws Exception{
if(i < array.length)
return array[i++];
- return RT.eos();
+ return eos;
}
static AStream createFromObject(Object array){
@@ -52,7 +50,7 @@ static AStream createFromObject(Object array){
throw new IllegalArgumentException(String.format("Unsupported array type %s", array));
}
-static public class ArrayStream_int implements Callable{
+static public class ArrayStream_int extends AFn{
int i = 0;
final int[] array;
@@ -61,14 +59,14 @@ static public class ArrayStream_int implements Callable{
this.array = array;
}
- public Object call() throws Exception{
+ public Object invoke(Object eos) throws Exception{
if(i < array.length)
return array[i++];
- return RT.eos();
+ return eos;
}
}
-static public class ArrayStream_long implements Callable{
+static public class ArrayStream_long extends AFn{
int i = 0;
final long[] array;
@@ -77,14 +75,14 @@ static public class ArrayStream_long implements Callable{
this.array = array;
}
- public Object call() throws Exception{
+ public Object invoke(Object eos) throws Exception{
if(i < array.length)
return array[i++];
- return RT.eos();
+ return eos;
}
}
-static public class ArrayStream_float implements Callable{
+static public class ArrayStream_float extends AFn{
int i = 0;
final float[] array;
@@ -93,14 +91,14 @@ static public class ArrayStream_float implements Callable{
this.array = array;
}
- public Object call() throws Exception{
+ public Object invoke(Object eos) throws Exception{
if(i < array.length)
return array[i++];
- return RT.eos();
+ return eos;
}
}
-static public class ArrayStream_double implements Callable{
+static public class ArrayStream_double extends AFn{
int i = 0;
final double[] array;
@@ -109,14 +107,14 @@ static public class ArrayStream_double implements Callable{
this.array = array;
}
- public Object call() throws Exception{
+ public Object invoke(Object eos) throws Exception{
if(i < array.length)
return array[i++];
- return RT.eos();
+ return eos;
}
}
-static public class ArrayStream_char implements Callable{
+static public class ArrayStream_char extends AFn{
int i = 0;
final char[] array;
@@ -125,14 +123,14 @@ static public class ArrayStream_char implements Callable{
this.array = array;
}
- public Object call() throws Exception{
+ public Object invoke(Object eos) throws Exception{
if(i < array.length)
return array[i++];
- return RT.eos();
+ return eos;
}
}
-static public class ArrayStream_byte implements Callable{
+static public class ArrayStream_byte extends AFn{
int i = 0;
final byte[] array;
@@ -141,14 +139,14 @@ static public class ArrayStream_byte implements Callable{
this.array = array;
}
- public Object call() throws Exception{
+ public Object invoke(Object eos) throws Exception{
if(i < array.length)
return array[i++];
- return RT.eos();
+ return eos;
}
}
-static public class ArrayStream_short implements Callable{
+static public class ArrayStream_short extends AFn{
int i = 0;
final short[] array;
@@ -157,14 +155,14 @@ static public class ArrayStream_short implements Callable{
this.array = array;
}
- public Object call() throws Exception{
+ public Object invoke(Object eos) throws Exception{
if(i < array.length)
return array[i++];
- return RT.eos();
+ return eos;
}
}
-static public class ArrayStream_boolean implements Callable{
+static public class ArrayStream_boolean extends AFn{
int i = 0;
final boolean[] array;
@@ -173,10 +171,10 @@ static public class ArrayStream_boolean implements Callable{
this.array = array;
}
- public Object call() throws Exception{
+ public Object invoke(Object eos) throws Exception{
if(i < array.length)
return array[i++];
- return RT.eos();
+ return eos;
}
}
diff --git a/src/jvm/clojure/lang/Closer.java b/src/jvm/clojure/lang/Closer.java
new file mode 100644
index 00000000..a34664b8
--- /dev/null
+++ b/src/jvm/clojure/lang/Closer.java
@@ -0,0 +1,33 @@
+/**
+ * Copyright (c) Rich Hickey. All rights reserved.
+ * The use and distribution terms for this software are covered by the
+ * Eclipse Public License 1.0 (http://opensource.org/licenses/eclipse-1.0.php)
+ * which can be found in the file epl-v10.html at the root of this distribution.
+ * By using this software in any fashion, you are agreeing to be bound by
+ * the terms of this license.
+ * You must not remove this notice, or any other, from this software.
+ **/
+
+/* rich Jan 10, 2009 */
+
+package clojure.lang;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+public class Closer implements Closeable{
+ ISeq closes;
+
+
+ public void close() throws IOException {
+ for(ISeq s = closes;s!=null;s = s.rest())
+ {
+ ((Closeable)s.first()).close();
+ }
+ }
+
+ public Closer register(Closeable c) {
+ closes = new Cons(c, closes);
+ return this;
+ }
+}
diff --git a/src/jvm/clojure/lang/Compiler.java b/src/jvm/clojure/lang/Compiler.java
index 76685840..580a9655 100644
--- a/src/jvm/clojure/lang/Compiler.java
+++ b/src/jvm/clojure/lang/Compiler.java
@@ -3721,7 +3721,7 @@ public static class LetExpr implements Expr{
ISeq body = RT.rest(RT.rest(form));
- if(context == C.EVAL)
+ if(context == C.EVAL || (context == C.EXPRESSION && isLoop))
return analyze(context, RT.list(RT.list(FN, PersistentVector.EMPTY, form)));
IPersistentMap dynamicBindings = RT.map(LOCAL_ENV, LOCAL_ENV.get(),
diff --git a/src/jvm/clojure/lang/Cons.java b/src/jvm/clojure/lang/Cons.java
index c9e65eb3..8f99a521 100644
--- a/src/jvm/clojure/lang/Cons.java
+++ b/src/jvm/clojure/lang/Cons.java
@@ -29,11 +29,11 @@ public Cons(IPersistentMap meta, Object _first, ISeq _rest){
this._rest = _rest;
}
-public Object first(){
+final public Object first(){
return _first;
}
-public ISeq rest(){
+final public ISeq rest(){
return _rest;
}
diff --git a/src/jvm/clojure/lang/IteratorStream.java b/src/jvm/clojure/lang/IteratorStream.java
index cfb85cec..0f3702a1 100644
--- a/src/jvm/clojure/lang/IteratorStream.java
+++ b/src/jvm/clojure/lang/IteratorStream.java
@@ -13,9 +13,8 @@
package clojure.lang;
import java.util.Iterator;
-import java.util.concurrent.Callable;
-public class IteratorStream implements Callable{
+public class IteratorStream extends AFn{
final Iterator iter;
static public AStream create(Iterator iter){
@@ -26,9 +25,9 @@ IteratorStream(Iterator iter){
this.iter = iter;
}
-public Object call() throws Exception{
+public Object invoke(Object eos) throws Exception{
if(iter.hasNext())
return iter.next();
- return RT.eos();
+ return eos;
}
}
diff --git a/src/jvm/clojure/lang/LispReader.java b/src/jvm/clojure/lang/LispReader.java
index 922d1cbf..228e0922 100644
--- a/src/jvm/clojure/lang/LispReader.java
+++ b/src/jvm/clojure/lang/LispReader.java
@@ -712,7 +712,7 @@ public static class SyntaxQuoteReader extends AFn{
{
ret = RT.list(APPLY, HASHSET, RT.cons(CONCAT, sqExpandList(((IPersistentSet) form).seq())));
}
- else if(form instanceof ISeq)
+ else if(form instanceof ISeq || form instanceof IPersistentList)
{
ISeq seq = RT.seq(form);
ret = RT.cons(CONCAT, sqExpandList(seq));
diff --git a/src/jvm/clojure/lang/RT.java b/src/jvm/clojure/lang/RT.java
index a6142ea6..5b7df45b 100644
--- a/src/jvm/clojure/lang/RT.java
+++ b/src/jvm/clojure/lang/RT.java
@@ -243,6 +243,12 @@ static public void addURL(Object url) throws Exception{
getRootClassLoader().addURL(u);
}
+static final public IFn EMPTY_GEN = new AFn(){
+ synchronized public Object invoke(Object eos) throws Exception {
+ return eos;
+ }
+};
+
static
{
Keyword dockw = Keyword.intern(null, "doc");
@@ -455,21 +461,19 @@ static public ISeq seq(Object coll){
static public AStream stream(final Object coll) throws Exception{
if(coll == null)
- return EMPTY_STREAM;
- else if(coll instanceof AStream)
- return (AStream) coll;
+ return new AStream(EMPTY_GEN);
+ else if(coll instanceof Streamable)
+ return ((Streamable) coll).stream();
else if(coll instanceof Fn)
- return new AStream((Callable)coll);
- else if(coll instanceof Streamable)
- return ((Streamable)coll).stream();
+ return new AStream((IFn)coll);
else if(coll instanceof Iterable)
return IteratorStream.create(((Iterable) coll).iterator());
else if (coll.getClass().isArray())
return ArrayStream.createFromObject(coll);
else if (coll instanceof String)
return ArrayStream.createFromObject(((String)coll).toCharArray());
-
- throw new IllegalArgumentException("Don't know how to create IStream from: " + coll.getClass().getSimpleName());
+ else
+ return new AStream(new ASeq.Src(RT.seq(coll)));
}
static ISeq seqFrom(Object coll){
@@ -1208,7 +1212,7 @@ static public void print(Object x, Writer w) throws Exception{
}
if(x == null)
w.write("nil");
- else if(x instanceof ISeq || x instanceof IPersistentList)
+ else if(x instanceof ISeq || x instanceof IPersistentList || x instanceof AStream)
{
w.write('(');
printInnerSeq(seq(x), w);
@@ -1679,21 +1683,7 @@ static public int alength(Object xs){
return Array.getLength(xs);
}
-final static private Object EOS = new Object();
-static public Object eos() {
- return EOS;
- }
-
-static public boolean isEOS(Object o){
- return o == EOS;
- }
-
-static final public AStream EMPTY_STREAM = new AStream(new Callable(){
- synchronized public Object call() throws Exception {
- return eos();
- }
-});
synchronized public static DynamicClassLoader getRootClassLoader() {
if(ROOT_CLASSLOADER == null)
diff --git a/src/jvm/clojure/lang/Range.java b/src/jvm/clojure/lang/Range.java
index b52eec43..68e96a1e 100644
--- a/src/jvm/clojure/lang/Range.java
+++ b/src/jvm/clojure/lang/Range.java
@@ -67,7 +67,7 @@ public AStream stream() throws Exception {
return new AStream(new Src(n,end));
}
- static class Src implements Callable{
+ static class Src extends AFn{
int n;
final int end;
@@ -76,10 +76,10 @@ public AStream stream() throws Exception {
this.end = end;
}
- public Object call() throws Exception {
+ public Object invoke(Object eos) throws Exception {
if(n < end)
return n++;
- return RT.eos();
+ return eos;
}
}
}
diff --git a/src/jvm/clojure/lang/StreamSeq.java b/src/jvm/clojure/lang/StreamSeq.java
deleted file mode 100644
index c935ebfa..00000000
--- a/src/jvm/clojure/lang/StreamSeq.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
- * Copyright (c) Rich Hickey. All rights reserved.
- * The use and distribution terms for this software are covered by the
- * Eclipse Public License 1.0 (http://opensource.org/licenses/eclipse-1.0.php)
- * which can be found in the file epl-v10.html at the root of this distribution.
- * By using this software in any fashion, you are agreeing to be bound by
- * the terms of this license.
- * You must not remove this notice, or any other, from this software.
- **/
-
-/* rich Dec 8, 2008 */
-
-package clojure.lang;
-
-public class StreamSeq extends ASeq {
- AStream stream;
- final Object _first;
- ISeq _rest;
-
- static public StreamSeq create(AStream stream) throws Exception {
- Object x = stream.next();
- if (RT.isEOS(x))
- return null;
- return new StreamSeq(x, stream);
- }
-
- StreamSeq(IPersistentMap meta, Object _first, ISeq _rest) {
- super(meta);
- this._first = _first;
- this._rest = _rest;
- this.stream = null;
- }
-
- StreamSeq(Object first, AStream stream) {
- this._first = first;
- this.stream = stream;
- }
-
-
- public Object first() {
- r