diff options
author | Rich Hickey <richhickey@gmail.com> | 2009-01-09 20:14:12 +0000 |
---|---|---|
committer | Rich Hickey <richhickey@gmail.com> | 2009-01-09 20:14:12 +0000 |
commit | 0f38a4e448fff18f57cd4b967b1b7144a76484e1 (patch) | |
tree | 1551363d10b276538aacfab3316012f8edab9372 | |
parent | 2f7ea599893a36ea0691dd5ea29d981920357896 (diff) |
intial import of streams stuff, Java side
-rw-r--r-- | src/jvm/clojure/lang/APersistentVector.java | 26 | ||||
-rw-r--r-- | src/jvm/clojure/lang/ASeq.java | 12 | ||||
-rw-r--r-- | src/jvm/clojure/lang/AStream.java | 98 | ||||
-rw-r--r-- | src/jvm/clojure/lang/ArrayStream.java | 344 | ||||
-rw-r--r-- | src/jvm/clojure/lang/Compiler.java | 2 | ||||
-rw-r--r-- | src/jvm/clojure/lang/IPersistentCollection.java | 4 | ||||
-rw-r--r-- | src/jvm/clojure/lang/IteratorStream.java | 25 | ||||
-rw-r--r-- | src/jvm/clojure/lang/RT.java | 33 | ||||
-rw-r--r-- | src/jvm/clojure/lang/Range.java | 28 | ||||
-rw-r--r-- | src/jvm/clojure/lang/Seqable.java (renamed from src/jvm/clojure/lang/IStream.java) | 6 | ||||
-rw-r--r-- | src/jvm/clojure/lang/StreamSeq.java | 6 | ||||
-rw-r--r-- | src/jvm/clojure/lang/Streamable.java | 2 |
12 files changed, 343 insertions, 243 deletions
diff --git a/src/jvm/clojure/lang/APersistentVector.java b/src/jvm/clojure/lang/APersistentVector.java index 0ecf9263..a3d7f96b 100644 --- a/src/jvm/clojure/lang/APersistentVector.java +++ b/src/jvm/clojure/lang/APersistentVector.java @@ -14,6 +14,7 @@ package clojure.lang; import java.util.*; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.Callable; public abstract class APersistentVector extends AFn implements IPersistentVector, Iterable, List, @@ -358,17 +359,24 @@ public int compareTo(Object o){ return 0; } -public IStream stream() throws Exception { - final AtomicInteger ai = new AtomicInteger(0); - return new IStream(){ - public Object next() throws Exception { - int i = ai.getAndIncrement(); - if(i < count()) - return nth(i); +public AStream stream() throws Exception { + return new AStream(new Src(this)); +} + + static class Src implements Callable{ + final IPersistentVector v; + int i = 0; + + Src(IPersistentVector v) { + this.v = v; + } + + public Object call() throws Exception { + if (i < v.count()) + return v.nth(i++); return RT.eos(); } - }; -} + } static class Seq extends ASeq implements IndexedSeq, IReduce{ //todo - something more efficient diff --git a/src/jvm/clojure/lang/ASeq.java b/src/jvm/clojure/lang/ASeq.java index 9e298301..ba041c9b 100644 --- a/src/jvm/clojure/lang/ASeq.java +++ b/src/jvm/clojure/lang/ASeq.java @@ -12,7 +12,7 @@ package clojure.lang; import java.util.Collection;
import java.util.Iterator;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.Callable;
public abstract class ASeq extends Obj implements ISeq, Collection, Streamable{
transient int _hash = -1;
@@ -177,18 +177,18 @@ public Iterator iterator(){ return new SeqIterator(this);
}
-public IStream stream() throws Exception {
- return new Stream(this);
+public AStream stream() throws Exception {
+ return new AStream(new Src(this));
}
- static class Stream implements IStream{
+static class Src implements Callable{
ISeq s;
- public Stream(ISeq s) {
+ public Src(ISeq s) {
this.s = s;
}
- synchronized public Object next() throws Exception {
+ public Object call() throws Exception {
if(s != null)
{
Object ret = s.first();
diff --git a/src/jvm/clojure/lang/AStream.java b/src/jvm/clojure/lang/AStream.java new file mode 100644 index 00000000..51060ff1 --- /dev/null +++ b/src/jvm/clojure/lang/AStream.java @@ -0,0 +1,98 @@ +/** + * Copyright (c) Rich Hickey. All rights reserved. + * The use and distribution terms for this software are covered by the + * Eclipse Public License 1.0 (http://opensource.org/licenses/eclipse-1.0.php) + * which can be found in the file epl-v10.html at the root of this distribution. + * By using this software in any fashion, you are agreeing to be bound by + * the terms of this license. + * You must not remove this notice, or any other, from this software. + **/ + +/* rich Dec 14, 2008 */ + +package clojure.lang; + +import java.util.concurrent.Callable; + +final public class AStream implements Seqable { + + ISeq seq = null; + Callable src; + + public AStream(Callable src){ + this.src = src; + } + + final synchronized public ISeq seq(){ + if (src != null) + { + seq = Seq.create(src); + src = null; + } + return seq; + } + + final synchronized public Object next() throws Exception { + if (src == null) + return RT.eos(); + return src.call(); + } + + static class Seq extends ASeq { + Callable src; + final Object _first; + ISeq _rest; + + static Seq create(Callable src) { + Object x; + try + { + x = src.call(); + } + catch (Exception e) + { + throw new RuntimeException(e); + } + if (RT.isEOS(x)) + return null; + else + return new Seq(x, src); + } + + Seq(IPersistentMap meta, Object _first, ISeq _rest) { + super(meta); + this._first = _first; + this._rest = _rest; + this.src = null; + } + + Seq(Object first, Callable src) { + this._first = first; + this.src = src; + } + + + public Object first() { + return _first; + } + + synchronized public ISeq rest() { + if (src != null) + { + _rest = create(src); + src = null; + } + return _rest; + } + + synchronized public Obj withMeta(IPersistentMap meta) { + if (meta != this.meta()) + { + rest(); + return new Seq(meta, _first, _rest); + } + return this; + } + } + +} diff --git a/src/jvm/clojure/lang/ArrayStream.java b/src/jvm/clojure/lang/ArrayStream.java index 8fadbd94..fcf7a519 100644 --- a/src/jvm/clojure/lang/ArrayStream.java +++ b/src/jvm/clojure/lang/ArrayStream.java @@ -12,183 +12,173 @@ package clojure.lang; -import java.util.concurrent.atomic.AtomicInteger; -import java.lang.reflect.Array; - -public class ArrayStream implements IStream { - - final AtomicInteger ai = new AtomicInteger(0); - final Object[] array; - - public ArrayStream(Object[] array) { - this.array = array; - } - - public Object next() throws Exception { - int i = ai.getAndIncrement(); - if (i < array.length) - return array[i]; - return RT.eos(); - } - - static IStream createFromObject(Object array) { - Class aclass = array.getClass().getComponentType(); - if (!aclass.isPrimitive()) - return new ArrayStream((Object[]) array); - if (aclass == int.class) - return new ArrayStream_int((int[]) array); - if (aclass == long.class) - return new ArrayStream_long((long[]) array); - if (aclass == float.class) - return new ArrayStream_float((float[]) array); - if (aclass == double.class) - return new ArrayStream_double((double[]) array); - if (aclass == char.class) - return new ArrayStream_char((char[]) array); - if (aclass == byte.class) - return new ArrayStream_byte((byte[]) array); - if (aclass == short.class) - return new ArrayStream_short((short[]) array); - if (aclass == boolean.class) - return new ArrayStream_boolean((boolean[]) array); - throw new IllegalArgumentException(String.format("Unsupported array type %s", array)); - } - - static public class ArrayStream_int implements IStream { - - final AtomicInteger ai = new AtomicInteger(0); - final int[] array; - - public ArrayStream_int(int[] array) { - this.array = array; - } - - public Object next() throws Exception { - int i = ai.getAndIncrement(); - if (i < array.length) - return array[i]; - return RT.eos(); - } - } - - static public class ArrayStream_long implements IStream { - - final AtomicInteger ai = new AtomicInteger(0); - final long[] array; - - public ArrayStream_long(long[] array) { - this.array = array; - } - - public Object next() throws Exception { - int i = ai.getAndIncrement(); - if (i < array.length) - return array[i]; - return RT.eos(); - } - } - - static public class ArrayStream_float implements IStream { - - final AtomicInteger ai = new AtomicInteger(0); - final float[] array; - - public ArrayStream_float(float[] array) { - this.array = array; - } - - public Object next() throws Exception { - int i = ai.getAndIncrement(); - if (i < array.length) - return array[i]; - return RT.eos(); - } - } - - static public class ArrayStream_double implements IStream { - - final AtomicInteger ai = new AtomicInteger(0); - final double[] array; - - public ArrayStream_double(double[] array) { - this.array = array; - } - - public Object next() throws Exception { - int i = ai.getAndIncrement(); - if (i < array.length) - return array[i]; - return RT.eos(); - } - } - - static public class ArrayStream_char implements IStream { - - final AtomicInteger ai = new AtomicInteger(0); - final char[] array; - - public ArrayStream_char(char[] array) { - this.array = array; - } - - public Object next() throws Exception { - int i = ai.getAndIncrement(); - if (i < array.length) - return array[i]; - return RT.eos(); - } - } - - static public class ArrayStream_byte implements IStream { - - final AtomicInteger ai = new AtomicInteger(0); - final byte[] array; - - public ArrayStream_byte(byte[] array) { - this.array = array; - } - - public Object next() throws Exception { - int i = ai.getAndIncrement(); - if (i < array.length) - return array[i]; - return RT.eos(); - } - } - - static public class ArrayStream_short implements IStream { - - final AtomicInteger ai = new AtomicInteger(0); - final short[] array; - - public ArrayStream_short(short[] array) { - this.array = array; - } - - public Object next() throws Exception { - int i = ai.getAndIncrement(); - if (i < array.length) - return array[i]; - return RT.eos(); - } - } - - static public class ArrayStream_boolean implements IStream { - - final AtomicInteger ai = new AtomicInteger(0); - final boolean[] array; - - public ArrayStream_boolean(boolean[] array) { - this.array = array; - } - - public Object next() throws Exception { - int i = ai.getAndIncrement(); - if (i < array.length) - return array[i]; - return RT.eos(); - } - } +import java.util.concurrent.Callable; + +public class ArrayStream implements Callable{ + +int i = 0; +final Object[] array; + +public ArrayStream(Object[] array){ + this.array = array; +} + +public Object call() throws Exception{ + if(i < array.length) + return array[i++]; + return RT.eos(); +} + +static AStream createFromObject(Object array){ + Class aclass = array.getClass().getComponentType(); + if(!aclass.isPrimitive()) + return new AStream(new ArrayStream((Object[]) array)); + if(aclass == int.class) + return new AStream(new ArrayStream_int((int[]) array)); + if(aclass == long.class) + return new AStream(new ArrayStream_long((long[]) array)); + if(aclass == float.class) + return new AStream(new ArrayStream_float((float[]) array)); + if(aclass == double.class) + return new AStream(new ArrayStream_double((double[]) array)); + if(aclass == char.class) + return new AStream(new ArrayStream_char((char[]) array)); + if(aclass == byte.class) + return new AStream(new ArrayStream_byte((byte[]) array)); + if(aclass == short.class) + return new AStream(new ArrayStream_short((short[]) array)); + if(aclass == boolean.class) + return new AStream(new ArrayStream_boolean((boolean[]) array)); + throw new IllegalArgumentException(String.format("Unsupported array type %s", array)); +} + +static public class ArrayStream_int implements Callable{ + + int i = 0; + final int[] array; + + public ArrayStream_int(int[] array){ + this.array = array; + } + + public Object call() throws Exception{ + if(i < array.length) + return array[i++]; + return RT.eos(); + } +} + +static public class ArrayStream_long implements Callable{ + + int i = 0; + final long[] array; + + public ArrayStream_long(long[] array){ + this.array = array; + } + + public Object call() throws Exception{ + if(i < array.length) + return array[i++]; + return RT.eos(); + } +} + +static public class ArrayStream_float implements Callable{ + + int i = 0; + final float[] array; + + public ArrayStream_float(float[] array){ + this.array = array; + } + + public Object call() throws Exception{ + if(i < array.length) + return array[i++]; + return RT.eos(); + } +} + +static public class ArrayStream_double implements Callable{ + + int i = 0; + final double[] array; + + public ArrayStream_double(double[] array){ + this.array = array; + } + + public Object call() throws Exception{ + if(i < array.length) + return array[i++]; + return RT.eos(); + } +} + +static public class ArrayStream_char implements Callable{ + + int i = 0; + final char[] array; + + public ArrayStream_char(char[] array){ + this.array = array; + } + + public Object call() throws Exception{ + if(i < array.length) + return array[i++]; + return RT.eos(); + } +} + +static public class ArrayStream_byte implements Callable{ + + int i = 0; + final byte[] array; + + public ArrayStream_byte(byte[] array){ + this.array = array; + } + + public Object call() throws Exception{ + if(i < array.length) + return array[i++]; + return RT.eos(); + } +} + +static public class ArrayStream_short implements Callable{ + + int i = 0; + final short[] array; + + public ArrayStream_short(short[] array){ + this.array = array; + } + + public Object call() throws Exception{ + if(i < array.length) + return array[i++]; + return RT.eos(); + } +} + +static public class ArrayStream_boolean implements Callable{ + + int i = 0; + final boolean[] array; + + public ArrayStream_boolean(boolean[] array){ + this.array = array; + } + + public Object call() throws Exception{ + if(i < array.length) + return array[i++]; + return RT.eos(); + } +} } diff --git a/src/jvm/clojure/lang/Compiler.java b/src/jvm/clojure/lang/Compiler.java index c1802016..76685840 100644 --- a/src/jvm/clojure/lang/Compiler.java +++ b/src/jvm/clojure/lang/Compiler.java @@ -1,7 +1,7 @@ /** * Copyright (c) Rich Hickey. All rights reserved. * The use and distribution terms for this software are covered by the - * Eclipse Public License 1.0 (http://opensource.org/licenses/cpl1.0.php) + * Eclipse Public License 1.0 (http://opensource.org/licenses/eclipse-1.0.php) * which can be found in the file epl-v10.html at the root of this distribution. * By using this software in any fashion, you are agreeing to be bound by * the terms of this license. diff --git a/src/jvm/clojure/lang/IPersistentCollection.java b/src/jvm/clojure/lang/IPersistentCollection.java index 21e436f2..d6442244 100644 --- a/src/jvm/clojure/lang/IPersistentCollection.java +++ b/src/jvm/clojure/lang/IPersistentCollection.java @@ -11,12 +11,10 @@ package clojure.lang; */
-public interface IPersistentCollection {
+public interface IPersistentCollection extends Seqable{
int count();
-ISeq seq();
-
IPersistentCollection cons(Object o);
IPersistentCollection empty();
diff --git a/src/jvm/clojure/lang/IteratorStream.java b/src/jvm/clojure/lang/IteratorStream.java index edb4dd79..cfb85cec 100644 --- a/src/jvm/clojure/lang/IteratorStream.java +++ b/src/jvm/clojure/lang/IteratorStream.java @@ -13,17 +13,22 @@ package clojure.lang; import java.util.Iterator; +import java.util.concurrent.Callable; -public class IteratorStream implements IStream{ - final Iterator iter; +public class IteratorStream implements Callable{ +final Iterator iter; - public IteratorStream(Iterator iter) { - this.iter = iter; - } +static public AStream create(Iterator iter){ + return new AStream(new IteratorStream(iter)); +} + +IteratorStream(Iterator iter){ + this.iter = iter; +} - synchronized public Object next() throws Exception { - if(iter.hasNext()) - return iter.next(); - return RT.eos(); - } +public Object call() throws Exception{ + if(iter.hasNext()) + return iter.next(); + return RT.eos(); +} } diff --git a/src/jvm/clojure/lang/RT.java b/src/jvm/clojure/lang/RT.java index 4954028c..a6142ea6 100644 --- a/src/jvm/clojure/lang/RT.java +++ b/src/jvm/clojure/lang/RT.java @@ -447,29 +447,23 @@ static public ISeq seq(Object coll){ return null; else if(coll instanceof ISeq) return (ISeq) coll; - else if(coll instanceof IPersistentCollection) - return ((IPersistentCollection) coll).seq(); + else if(coll instanceof Seqable) + return ((Seqable) coll).seq(); else return seqFrom(coll); } -static public IStream stream(final Object coll) throws Exception{ +static public AStream stream(final Object coll) throws Exception{ if(coll == null) return EMPTY_STREAM; - else if(coll instanceof IStream) - return (IStream) coll; + else if(coll instanceof AStream) + return (AStream) coll; + else if(coll instanceof Fn) + return new AStream((Callable)coll); else if(coll instanceof Streamable) return ((Streamable)coll).stream(); - else if(coll instanceof Fn) - { - return new IStream(){ - public Object next() throws Exception { - return ((IFn)coll).invoke(); - } - }; - } else if(coll instanceof Iterable) - return new IteratorStream(((Iterable) coll).iterator()); + return IteratorStream.create(((Iterable) coll).iterator()); else if (coll.getClass().isArray()) return ArrayStream.createFromObject(coll); else if (coll instanceof String) @@ -1686,8 +1680,8 @@ static public int alength(Object xs){ } final static private Object EOS = new Object(); - -final static public Object eos() { + +static public Object eos() { return EOS; } @@ -1695,12 +1689,11 @@ static public boolean isEOS(Object o){ return o == EOS; } -static final public IStream EMPTY_STREAM = new IStream(){ - - public Object next() throws Exception { +static final public AStream EMPTY_STREAM = new AStream(new Callable(){ + synchronized public Object call() throws Exception { return eos(); } -}; +}); synchronized public static DynamicClassLoader getRootClassLoader() { if(ROOT_CLASSLOADER == null) diff --git a/src/jvm/clojure/lang/Range.java b/src/jvm/clojure/lang/Range.java index 226a77ba..b52eec43 100644 --- a/src/jvm/clojure/lang/Range.java +++ b/src/jvm/clojure/lang/Range.java @@ -12,7 +12,7 @@ package clojure.lang; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.Callable; public class Range extends ASeq implements IReduce, Streamable{ final int end; @@ -63,15 +63,23 @@ public int count() { return end - n; } - public IStream stream() throws Exception { - final AtomicInteger an = new AtomicInteger(n); - return new IStream(){ - public Object next() throws Exception { - int i = an.getAndIncrement(); - if (i < end) - return i; +public AStream stream() throws Exception { + return new AStream(new Src(n,end)); +} + + static class Src implements Callable{ + int n; + final int end; + + public Src(int n, int end) { + this.n = n; + this.end = end; + } + + public Object call() throws Exception { + if(n < end) + return n++; return RT.eos(); } - }; -} + } } diff --git a/src/jvm/clojure/lang/IStream.java b/src/jvm/clojure/lang/Seqable.java index ccd8a9d8..8732487f 100644 --- a/src/jvm/clojure/lang/IStream.java +++ b/src/jvm/clojure/lang/Seqable.java @@ -8,10 +8,10 @@ * You must not remove this notice, or any other, from this software. **/ -/* rich Dec 7, 2008 */ +/* rich Dec 14, 2008 */ package clojure.lang; -public interface IStream { - Object next() throws Exception; +public interface Seqable { + ISeq seq(); } diff --git a/src/jvm/clojure/lang/StreamSeq.java b/src/jvm/clojure/lang/StreamSeq.java index 44cfe28c..c935ebfa 100644 --- a/src/jvm/clojure/lang/StreamSeq.java +++ b/src/jvm/clojure/lang/StreamSeq.java @@ -13,11 +13,11 @@ package clojure.lang; public class StreamSeq extends ASeq { - IStream stream; + AStream stream; final Object _first; ISeq _rest; - static public StreamSeq create(IStream stream) throws Exception { + static public StreamSeq create(AStream stream) throws Exception { Object x = stream.next(); if (RT.isEOS(x)) return null; @@ -31,7 +31,7 @@ public class StreamSeq extends ASeq { this.stream = null; } - StreamSeq(Object first, IStream stream) { + StreamSeq(Object first, AStream stream) { this._first = first; this.stream = stream; } diff --git a/src/jvm/clojure/lang/Streamable.java b/src/jvm/clojure/lang/Streamable.java index 67ed4987..68312700 100644 --- a/src/jvm/clojure/lang/Streamable.java +++ b/src/jvm/clojure/lang/Streamable.java @@ -13,5 +13,5 @@ package clojure.lang; public interface Streamable { - IStream stream() throws Exception; + AStream stream() throws Exception; } |