diff options
author | Rich Hickey <richhickey@gmail.com> | 2008-12-11 20:27:17 +0000 |
---|---|---|
committer | Rich Hickey <richhickey@gmail.com> | 2008-12-11 20:27:17 +0000 |
commit | 14d49b3eafae4a2095f5b80e47afa2b2878567ea (patch) | |
tree | f2170f3960b42cc217355a9e56ec3e4a0a96c4c8 | |
parent | 8aaeff138f515fff4291fe61330b5f69cb7e3ce9 (diff) |
release coll on nth of seq, stream work in progress
-rw-r--r-- | src/jvm/clojure/lang/APersistentVector.java | 17 | ||||
-rw-r--r-- | src/jvm/clojure/lang/ASeq.java | 24 | ||||
-rw-r--r-- | src/jvm/clojure/lang/ArrayStream.java | 194 | ||||
-rw-r--r-- | src/jvm/clojure/lang/IStream.java | 17 | ||||
-rw-r--r-- | src/jvm/clojure/lang/IteratorStream.java | 29 | ||||
-rw-r--r-- | src/jvm/clojure/lang/Range.java | 15 | ||||
-rw-r--r-- | src/jvm/clojure/lang/StreamSeq.java | 64 | ||||
-rw-r--r-- | src/jvm/clojure/lang/Streamable.java | 17 |
8 files changed, 373 insertions, 4 deletions
diff --git a/src/jvm/clojure/lang/APersistentVector.java b/src/jvm/clojure/lang/APersistentVector.java index e57aae43..9d77f713 100644 --- a/src/jvm/clojure/lang/APersistentVector.java +++ b/src/jvm/clojure/lang/APersistentVector.java @@ -13,10 +13,11 @@ package clojure.lang; import java.util.*; +import java.util.concurrent.atomic.AtomicInteger; public abstract class APersistentVector extends AFn implements IPersistentVector, Iterable, List, - RandomAccess, Comparable{ + RandomAccess, Comparable, Streamable{ int _hash = -1; public APersistentVector(IPersistentMap meta){ @@ -357,7 +358,19 @@ public int compareTo(Object o){ return 0; } -static class Seq extends ASeq implements IndexedSeq, IReduce{ +public IStream stream() throws Exception { + final AtomicInteger ai = new AtomicInteger(0); + return new IStream(){ + public Object next() throws Exception { + int i = ai.getAndIncrement(); + if(i < count()) + return nth(i); + return RT.eos(); + } + }; +} + + static class Seq extends ASeq implements IndexedSeq, IReduce{ //todo - something more efficient final IPersistentVector v; final int i; diff --git a/src/jvm/clojure/lang/ASeq.java b/src/jvm/clojure/lang/ASeq.java index a7c42639..cdb3ac28 100644 --- a/src/jvm/clojure/lang/ASeq.java +++ b/src/jvm/clojure/lang/ASeq.java @@ -12,8 +12,9 @@ package clojure.lang; import java.util.Collection;
import java.util.Iterator;
+import java.util.concurrent.atomic.AtomicReference;
-public abstract class ASeq extends Obj implements ISeq, Collection{
+public abstract class ASeq extends Obj implements ISeq, Collection, Streamable{
transient int _hash = -1;
public String toString(){
@@ -175,4 +176,25 @@ public Iterator iterator(){ return new SeqIterator(this);
}
+public IStream stream() throws Exception {
+ return new Stream(this);
+}
+
+ static class Stream implements IStream{
+ ISeq s;
+
+ public Stream(ISeq s) {
+ this.s = s;
+ }
+
+ synchronized public Object next() throws Exception {
+ if(s != null)
+ {
+ Object ret = s.first();
+ s = s.rest();
+ return ret;
+ }
+ return RT.eos();
+ }
+}
}
diff --git a/src/jvm/clojure/lang/ArrayStream.java b/src/jvm/clojure/lang/ArrayStream.java new file mode 100644 index 00000000..d4457e3c --- /dev/null +++ b/src/jvm/clojure/lang/ArrayStream.java @@ -0,0 +1,194 @@ +/** + * Copyright (c) Rich Hickey. All rights reserved. + * The use and distribution terms for this software are covered by the + * Common Public License 1.0 (http://opensource.org/licenses/cpl.php) + * which can be found in the file CPL.TXT at the root of this distribution. + * By using this software in any fashion, you are agreeing to be bound by + * the terms of this license. + * You must not remove this notice, or any other, from this software. + **/ + +/* rich Dec 7, 2008 */ + +package clojure.lang; + +import java.util.concurrent.atomic.AtomicInteger; +import java.lang.reflect.Array; + +public class ArrayStream implements IStream { + + final AtomicInteger ai = new AtomicInteger(0); + final Object[] array; + + public ArrayStream(Object[] array) { + this.array = array; + } + + public Object next() throws Exception { + int i = ai.getAndIncrement(); + if (i < array.length) + return array[i]; + return RT.eos(); + } + + static IStream createFromObject(Object array) { + Class aclass = array.getClass().getComponentType(); + if (!aclass.isPrimitive()) + return new ArrayStream((Object[]) array); + if (aclass == int.class) + return new ArrayStream_int((int[]) array); + if (aclass == long.class) + return new ArrayStream_long((long[]) array); + if (aclass == float.class) + return new ArrayStream_float((float[]) array); + if (aclass == double.class) + return new ArrayStream_double((double[]) array); + if (aclass == char.class) + return new ArrayStream_char((char[]) array); + if (aclass == byte.class) + return new ArrayStream_byte((byte[]) array); + if (aclass == short.class) + return new ArrayStream_short((short[]) array); + if (aclass == boolean.class) + return new ArrayStream_boolean((boolean[]) array); + throw new IllegalArgumentException(String.format("Unsupported array type %s", array)); + } + + static public class ArrayStream_int implements IStream { + + final AtomicInteger ai = new AtomicInteger(0); + final int[] array; + + public ArrayStream_int(int[] array) { + this.array = array; + } + + public Object next() throws Exception { + int i = ai.getAndIncrement(); + if (i < array.length) + return array[i]; + return RT.eos(); + } + } + + static public class ArrayStream_long implements IStream { + + final AtomicInteger ai = new AtomicInteger(0); + final long[] array; + + public ArrayStream_long(long[] array) { + this.array = array; + } + + public Object next() throws Exception { + int i = ai.getAndIncrement(); + if (i < array.length) + return array[i]; + return RT.eos(); + } + } + + static public class ArrayStream_float implements IStream { + + final AtomicInteger ai = new AtomicInteger(0); + final float[] array; + + public ArrayStream_float(float[] array) { + this.array = array; + } + + public Object next() throws Exception { + int i = ai.getAndIncrement(); + if (i < array.length) + return array[i]; + return RT.eos(); + } + } + + static public class ArrayStream_double implements IStream { + + final AtomicInteger ai = new AtomicInteger(0); + final double[] array; + + public ArrayStream_double(double[] array) { + this.array = array; + } + + public Object next() throws Exception { + int i = ai.getAndIncrement(); + if (i < array.length) + return array[i]; + return RT.eos(); + } + } + + static public class ArrayStream_char implements IStream { + + final AtomicInteger ai = new AtomicInteger(0); + final char[] array; + + public ArrayStream_char(char[] array) { + this.array = array; + } + + public Object next() throws Exception { + int i = ai.getAndIncrement(); + if (i < array.length) + return array[i]; + return RT.eos(); + } + } + + static public class ArrayStream_byte implements IStream { + + final AtomicInteger ai = new AtomicInteger(0); + final byte[] array; + + public ArrayStream_byte(byte[] array) { + this.array = array; + } + + public Object next() throws Exception { + int i = ai.getAndIncrement(); + if (i < array.length) + return array[i]; + return RT.eos(); + } + } + + static public class ArrayStream_short implements IStream { + + final AtomicInteger ai = new AtomicInteger(0); + final short[] array; + + public ArrayStream_short(short[] array) { + this.array = array; + } + + public Object next() throws Exception { + int i = ai.getAndIncrement(); + if (i < array.length) + return array[i]; + return RT.eos(); + } + } + + static public class ArrayStream_boolean implements IStream { + + final AtomicInteger ai = new AtomicInteger(0); + final boolean[] array; + + public ArrayStream_boolean(boolean[] array) { + this.array = array; + } + + public Object next() throws Exception { + int i = ai.getAndIncrement(); + if (i < array.length) + return array[i]; + return RT.eos(); + } + } + + +} diff --git a/src/jvm/clojure/lang/IStream.java b/src/jvm/clojure/lang/IStream.java new file mode 100644 index 00000000..45d17a6c --- /dev/null +++ b/src/jvm/clojure/lang/IStream.java @@ -0,0 +1,17 @@ +/** + * Copyright (c) Rich Hickey. All rights reserved. + * The use and distribution terms for this software are covered by the + * Common Public License 1.0 (http://opensource.org/licenses/cpl.php) + * which can be found in the file CPL.TXT at the root of this distribution. + * By using this software in any fashion, you are agreeing to be bound by + * the terms of this license. + * You must not remove this notice, or any other, from this software. + **/ + +/* rich Dec 7, 2008 */ + +package clojure.lang; + +public interface IStream { + Object next() throws Exception; +} diff --git a/src/jvm/clojure/lang/IteratorStream.java b/src/jvm/clojure/lang/IteratorStream.java new file mode 100644 index 00000000..f7d4bc35 --- /dev/null +++ b/src/jvm/clojure/lang/IteratorStream.java @@ -0,0 +1,29 @@ +/** + * Copyright (c) Rich Hickey. All rights reserved. + * The use and distribution terms for this software are covered by the + * Common Public License 1.0 (http://opensource.org/licenses/cpl.php) + * which can be found in the file CPL.TXT at the root of this distribution. + * By using this software in any fashion, you are agreeing to be bound by + * the terms of this license. + * You must not remove this notice, or any other, from this software. + **/ + +/* rich Dec 7, 2008 */ + +package clojure.lang; + +import java.util.Iterator; + +public class IteratorStream implements IStream{ + final Iterator iter; + + public IteratorStream(Iterator iter) { + this.iter = iter; + } + + synchronized public Object next() throws Exception { + if(iter.hasNext()) + return iter.next(); + return RT.eos(); + } +} diff --git a/src/jvm/clojure/lang/Range.java b/src/jvm/clojure/lang/Range.java index ccb98b19..95b38bcc 100644 --- a/src/jvm/clojure/lang/Range.java +++ b/src/jvm/clojure/lang/Range.java @@ -12,7 +12,9 @@ package clojure.lang; -public class Range extends ASeq implements IReduce{ +import java.util.concurrent.atomic.AtomicInteger; + +public class Range extends ASeq implements IReduce, Streamable{ final int end; final int n; @@ -57,4 +59,15 @@ public Object reduce(IFn f, Object start) throws Exception{ return ret; } +public IStream stream() throws Exception { + final AtomicInteger an = new AtomicInteger(n); + return new IStream(){ + public Object next() throws Exception { + int i = an.getAndIncrement(); + if (i < end) + return i; + return RT.eos(); + } + }; +} } diff --git a/src/jvm/clojure/lang/StreamSeq.java b/src/jvm/clojure/lang/StreamSeq.java new file mode 100644 index 00000000..b39f1392 --- /dev/null +++ b/src/jvm/clojure/lang/StreamSeq.java @@ -0,0 +1,64 @@ +/** + * Copyright (c) Rich Hickey. All rights reserved. + * The use and distribution terms for this software are covered by the + * Common Public License 1.0 (http://opensource.org/licenses/cpl.php) + * which can be found in the file CPL.TXT at the root of this distribution. + * By using this software in any fashion, you are agreeing to be bound by + * the terms of this license. + * You must not remove this notice, or any other, from this software. + **/ + +/* rich Dec 8, 2008 */ + +package clojure.lang; + +public class StreamSeq extends ASeq { + IStream stream; + final Object _first; + ISeq _rest; + + static public StreamSeq create(IStream stream) throws Exception { + Object x = stream.next(); + if (RT.isEOS(x)) + return null; + return new StreamSeq(x, stream); + } + + StreamSeq(IPersistentMap meta, Object _first, ISeq _rest) { + super(meta); + this._first = _first; + this._rest = _rest; + this.stream = null; + } + + StreamSeq(Object first, IStream stream) { + this._first = first; + this.stream = stream; + } + + + public Object first() { + return _first; + } + + synchronized public ISeq rest() { + if (stream != null) { + try { + _rest = create(stream); + } catch (Exception e) { + throw new RuntimeException(e); + } + stream = null; + } + return _rest; + } + + public Obj withMeta(IPersistentMap meta) { + if(meta != this.meta()) + { + rest(); + return new StreamSeq(meta, _first, _rest); + } + return this; + } +} diff --git a/src/jvm/clojure/lang/Streamable.java b/src/jvm/clojure/lang/Streamable.java new file mode 100644 index 00000000..81f17b75 --- /dev/null +++ b/src/jvm/clojure/lang/Streamable.java @@ -0,0 +1,17 @@ +/** + * Copyright (c) Rich Hickey. All rights reserved. + * The use and distribution terms for this software are covered by the + * Common Public License 1.0 (http://opensource.org/licenses/cpl.php) + * which can be found in the file CPL.TXT at the root of this distribution. + * By using this software in any fashion, you are agreeing to be bound by + * the terms of this license. + * You must not remove this notice, or any other, from this software. + **/ + +/* rich Dec 7, 2008 */ + +package clojure.lang; + +public interface Streamable { + IStream stream() throws Exception; +} |