summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRich Hickey <richhickey@gmail.com>2009-01-09 20:14:12 +0000
committerRich Hickey <richhickey@gmail.com>2009-01-09 20:14:12 +0000
commit0f38a4e448fff18f57cd4b967b1b7144a76484e1 (patch)
tree1551363d10b276538aacfab3316012f8edab9372
parent2f7ea599893a36ea0691dd5ea29d981920357896 (diff)
intial import of streams stuff, Java side
-rw-r--r--src/jvm/clojure/lang/APersistentVector.java26
-rw-r--r--src/jvm/clojure/lang/ASeq.java12
-rw-r--r--src/jvm/clojure/lang/AStream.java98
-rw-r--r--src/jvm/clojure/lang/ArrayStream.java344
-rw-r--r--src/jvm/clojure/lang/Compiler.java2
-rw-r--r--src/jvm/clojure/lang/IPersistentCollection.java4
-rw-r--r--src/jvm/clojure/lang/IteratorStream.java25
-rw-r--r--src/jvm/clojure/lang/RT.java33
-rw-r--r--src/jvm/clojure/lang/Range.java28
-rw-r--r--src/jvm/clojure/lang/Seqable.java (renamed from src/jvm/clojure/lang/IStream.java)6
-rw-r--r--src/jvm/clojure/lang/StreamSeq.java6
-rw-r--r--src/jvm/clojure/lang/Streamable.java2
12 files changed, 343 insertions, 243 deletions
diff --git a/src/jvm/clojure/lang/APersistentVector.java b/src/jvm/clojure/lang/APersistentVector.java
index 0ecf9263..a3d7f96b 100644
--- a/src/jvm/clojure/lang/APersistentVector.java
+++ b/src/jvm/clojure/lang/APersistentVector.java
@@ -14,6 +14,7 @@ package clojure.lang;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.Callable;
public abstract class APersistentVector extends AFn implements IPersistentVector, Iterable,
List,
@@ -358,17 +359,24 @@ public int compareTo(Object o){
return 0;
}
-public IStream stream() throws Exception {
- final AtomicInteger ai = new AtomicInteger(0);
- return new IStream(){
- public Object next() throws Exception {
- int i = ai.getAndIncrement();
- if(i < count())
- return nth(i);
+public AStream stream() throws Exception {
+ return new AStream(new Src(this));
+}
+
+ static class Src implements Callable{
+ final IPersistentVector v;
+ int i = 0;
+
+ Src(IPersistentVector v) {
+ this.v = v;
+ }
+
+ public Object call() throws Exception {
+ if (i < v.count())
+ return v.nth(i++);
return RT.eos();
}
- };
-}
+ }
static class Seq extends ASeq implements IndexedSeq, IReduce{
//todo - something more efficient
diff --git a/src/jvm/clojure/lang/ASeq.java b/src/jvm/clojure/lang/ASeq.java
index 9e298301..ba041c9b 100644
--- a/src/jvm/clojure/lang/ASeq.java
+++ b/src/jvm/clojure/lang/ASeq.java
@@ -12,7 +12,7 @@ package clojure.lang;
import java.util.Collection;
import java.util.Iterator;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.Callable;
public abstract class ASeq extends Obj implements ISeq, Collection, Streamable{
transient int _hash = -1;
@@ -177,18 +177,18 @@ public Iterator iterator(){
return new SeqIterator(this);
}
-public IStream stream() throws Exception {
- return new Stream(this);
+public AStream stream() throws Exception {
+ return new AStream(new Src(this));
}
- static class Stream implements IStream{
+static class Src implements Callable{
ISeq s;
- public Stream(ISeq s) {
+ public Src(ISeq s) {
this.s = s;
}
- synchronized public Object next() throws Exception {
+ public Object call() throws Exception {
if(s != null)
{
Object ret = s.first();
diff --git a/src/jvm/clojure/lang/AStream.java b/src/jvm/clojure/lang/AStream.java
new file mode 100644
index 00000000..51060ff1
--- /dev/null
+++ b/src/jvm/clojure/lang/AStream.java
@@ -0,0 +1,98 @@
+/**
+ * Copyright (c) Rich Hickey. All rights reserved.
+ * The use and distribution terms for this software are covered by the
+ * Eclipse Public License 1.0 (http://opensource.org/licenses/eclipse-1.0.php)
+ * which can be found in the file epl-v10.html at the root of this distribution.
+ * By using this software in any fashion, you are agreeing to be bound by
+ * the terms of this license.
+ * You must not remove this notice, or any other, from this software.
+ **/
+
+/* rich Dec 14, 2008 */
+
+package clojure.lang;
+
+import java.util.concurrent.Callable;
+
+final public class AStream implements Seqable {
+
+ ISeq seq = null;
+ Callable src;
+
+ public AStream(Callable src){
+ this.src = src;
+ }
+
+ final synchronized public ISeq seq(){
+ if (src != null)
+ {
+ seq = Seq.create(src);
+ src = null;
+ }
+ return seq;
+ }
+
+ final synchronized public Object next() throws Exception {
+ if (src == null)
+ return RT.eos();
+ return src.call();
+ }
+
+ static class Seq extends ASeq {
+ Callable src;
+ final Object _first;
+ ISeq _rest;
+
+ static Seq create(Callable src) {
+ Object x;
+ try
+ {
+ x = src.call();
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+ if (RT.isEOS(x))
+ return null;
+ else
+ return new Seq(x, src);
+ }
+
+ Seq(IPersistentMap meta, Object _first, ISeq _rest) {
+ super(meta);
+ this._first = _first;
+ this._rest = _rest;
+ this.src = null;
+ }
+
+ Seq(Object first, Callable src) {
+ this._first = first;
+ this.src = src;
+ }
+
+
+ public Object first() {
+ return _first;
+ }
+
+ synchronized public ISeq rest() {
+ if (src != null)
+ {
+ _rest = create(src);
+ src = null;
+ }
+ return _rest;
+ }
+
+ synchronized public Obj withMeta(IPersistentMap meta) {
+ if (meta != this.meta())
+ {
+ rest();
+ return new Seq(meta, _first, _rest);
+ }
+ return this;
+ }
+ }
+
+}
diff --git a/src/jvm/clojure/lang/ArrayStream.java b/src/jvm/clojure/lang/ArrayStream.java
index 8fadbd94..fcf7a519 100644
--- a/src/jvm/clojure/lang/ArrayStream.java
+++ b/src/jvm/clojure/lang/ArrayStream.java
@@ -12,183 +12,173 @@
package clojure.lang;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.lang.reflect.Array;
-
-public class ArrayStream implements IStream {
-
- final AtomicInteger ai = new AtomicInteger(0);
- final Object[] array;
-
- public ArrayStream(Object[] array) {
- this.array = array;
- }
-
- public Object next() throws Exception {
- int i = ai.getAndIncrement();
- if (i < array.length)
- return array[i];
- return RT.eos();
- }
-
- static IStream createFromObject(Object array) {
- Class aclass = array.getClass().getComponentType();
- if (!aclass.isPrimitive())
- return new ArrayStream((Object[]) array);
- if (aclass == int.class)
- return new ArrayStream_int((int[]) array);
- if (aclass == long.class)
- return new ArrayStream_long((long[]) array);
- if (aclass == float.class)
- return new ArrayStream_float((float[]) array);
- if (aclass == double.class)
- return new ArrayStream_double((double[]) array);
- if (aclass == char.class)
- return new ArrayStream_char((char[]) array);
- if (aclass == byte.class)
- return new ArrayStream_byte((byte[]) array);
- if (aclass == short.class)
- return new ArrayStream_short((short[]) array);
- if (aclass == boolean.class)
- return new ArrayStream_boolean((boolean[]) array);
- throw new IllegalArgumentException(String.format("Unsupported array type %s", array));
- }
-
- static public class ArrayStream_int implements IStream {
-
- final AtomicInteger ai = new AtomicInteger(0);
- final int[] array;
-
- public ArrayStream_int(int[] array) {
- this.array = array;
- }
-
- public Object next() throws Exception {
- int i = ai.getAndIncrement();
- if (i < array.length)
- return array[i];
- return RT.eos();
- }
- }
-
- static public class ArrayStream_long implements IStream {
-
- final AtomicInteger ai = new AtomicInteger(0);
- final long[] array;
-
- public ArrayStream_long(long[] array) {
- this.array = array;
- }
-
- public Object next() throws Exception {
- int i = ai.getAndIncrement();
- if (i < array.length)
- return array[i];
- return RT.eos();
- }
- }
-
- static public class ArrayStream_float implements IStream {
-
- final AtomicInteger ai = new AtomicInteger(0);
- final float[] array;
-
- public ArrayStream_float(float[] array) {
- this.array = array;
- }
-
- public Object next() throws Exception {
- int i = ai.getAndIncrement();
- if (i < array.length)
- return array[i];
- return RT.eos();
- }
- }
-
- static public class ArrayStream_double implements IStream {
-
- final AtomicInteger ai = new AtomicInteger(0);
- final double[] array;
-
- public ArrayStream_double(double[] array) {
- this.array = array;
- }
-
- public Object next() throws Exception {
- int i = ai.getAndIncrement();
- if (i < array.length)
- return array[i];
- return RT.eos();
- }
- }
-
- static public class ArrayStream_char implements IStream {
-
- final AtomicInteger ai = new AtomicInteger(0);
- final char[] array;
-
- public ArrayStream_char(char[] array) {
- this.array = array;
- }
-
- public Object next() throws Exception {
- int i = ai.getAndIncrement();
- if (i < array.length)
- return array[i];
- return RT.eos();
- }
- }
-
- static public class ArrayStream_byte implements IStream {
-
- final AtomicInteger ai = new AtomicInteger(0);
- final byte[] array;
-
- public ArrayStream_byte(byte[] array) {
- this.array = array;
- }
-
- public Object next() throws Exception {
- int i = ai.getAndIncrement();
- if (i < array.length)
- return array[i];
- return RT.eos();
- }
- }
-
- static public class ArrayStream_short implements IStream {
-
- final AtomicInteger ai = new AtomicInteger(0);
- final short[] array;
-
- public ArrayStream_short(short[] array) {
- this.array = array;
- }
-
- public Object next() throws Exception {
- int i = ai.getAndIncrement();
- if (i < array.length)
- return array[i];
- return RT.eos();
- }
- }
-
- static public class ArrayStream_boolean implements IStream {
-
- final AtomicInteger ai = new AtomicInteger(0);
- final boolean[] array;
-
- public ArrayStream_boolean(boolean[] array) {
- this.array = array;
- }
-
- public Object next() throws Exception {
- int i = ai.getAndIncrement();
- if (i < array.length)
- return array[i];
- return RT.eos();
- }
- }
+import java.util.concurrent.Callable;
+
+public class ArrayStream implements Callable{
+
+int i = 0;
+final Object[] array;
+
+public ArrayStream(Object[] array){
+ this.array = array;
+}
+
+public Object call() throws Exception{
+ if(i < array.length)
+ return array[i++];
+ return RT.eos();
+}
+
+static AStream createFromObject(Object array){
+ Class aclass = array.getClass().getComponentType();
+ if(!aclass.isPrimitive())
+ return new AStream(new ArrayStream((Object[]) array));
+ if(aclass == int.class)
+ return new AStream(new ArrayStream_int((int[]) array));
+ if(aclass == long.class)
+ return new AStream(new ArrayStream_long((long[]) array));
+ if(aclass == float.class)
+ return new AStream(new ArrayStream_float((float[]) array));
+ if(aclass == double.class)
+ return new AStream(new ArrayStream_double((double[]) array));
+ if(aclass == char.class)
+ return new AStream(new ArrayStream_char((char[]) array));
+ if(aclass == byte.class)
+ return new AStream(new ArrayStream_byte((byte[]) array));
+ if(aclass == short.class)
+ return new AStream(new ArrayStream_short((short[]) array));
+ if(aclass == boolean.class)
+ return new AStream(new ArrayStream_boolean((boolean[]) array));
+ throw new IllegalArgumentException(String.format("Unsupported array type %s", array));
+}
+
+static public class ArrayStream_int implements Callable{
+
+ int i = 0;
+ final int[] array;
+
+ public ArrayStream_int(int[] array){
+ this.array = array;
+ }
+
+ public Object call() throws Exception{
+ if(i < array.length)
+ return array[i++];
+ return RT.eos();
+ }
+}
+
+static public class ArrayStream_long implements Callable{
+
+ int i = 0;
+ final long[] array;
+
+ public ArrayStream_long(long[] array){
+ this.array = array;
+ }
+
+ public Object call() throws Exception{
+ if(i < array.length)
+ return array[i++];
+ return RT.eos();
+ }
+}
+
+static public class ArrayStream_float implements Callable{
+
+ int i = 0;
+ final float[] array;
+
+ public ArrayStream_float(float[] array){
+ this.array = array;
+ }
+
+ public Object call() throws Exception{
+ if(i < array.length)
+ return array[i++];
+ return RT.eos();
+ }
+}
+
+static public class ArrayStream_double implements Callable{
+
+ int i = 0;
+ final double[] array;
+
+ public ArrayStream_double(double[] array){
+ this.array = array;
+ }
+
+ public Object call() throws Exception{
+ if(i < array.length)
+ return array[i++];
+ return RT.eos();
+ }
+}
+
+static public class ArrayStream_char implements Callable{
+
+ int i = 0;
+ final char[] array;
+
+ public ArrayStream_char(char[] array){
+ this.array = array;
+ }
+
+ public Object call() throws Exception{
+ if(i < array.length)
+ return array[i++];
+ return RT.eos();
+ }
+}
+
+static public class ArrayStream_byte implements Callable{
+
+ int i = 0;
+ final byte[] array;
+
+ public ArrayStream_byte(byte[] array){
+ this.array = array;
+ }
+
+ public Object call() throws Exception{
+ if(i < array.length)
+ return array[i++];
+ return RT.eos();
+ }
+}
+
+static public class ArrayStream_short implements Callable{
+
+ int i = 0;
+ final short[] array;
+
+ public ArrayStream_short(short[] array){
+ this.array = array;
+ }
+
+ public Object call() throws Exception{
+ if(i < array.length)
+ return array[i++];
+ return RT.eos();
+ }
+}
+
+static public class ArrayStream_boolean implements Callable{
+
+ int i = 0;
+ final boolean[] array;
+
+ public ArrayStream_boolean(boolean[] array){
+ this.array = array;
+ }
+
+ public Object call() throws Exception{
+ if(i < array.length)
+ return array[i++];
+ return RT.eos();
+ }
+}
}
diff --git a/src/jvm/clojure/lang/Compiler.java b/src/jvm/clojure/lang/Compiler.java
index c1802016..76685840 100644
--- a/src/jvm/clojure/lang/Compiler.java
+++ b/src/jvm/clojure/lang/Compiler.java
@@ -1,7 +1,7 @@
/**
* Copyright (c) Rich Hickey. All rights reserved.
* The use and distribution terms for this software are covered by the
- * Eclipse Public License 1.0 (http://opensource.org/licenses/cpl1.0.php)
+ * Eclipse Public License 1.0 (http://opensource.org/licenses/eclipse-1.0.php)
* which can be found in the file epl-v10.html at the root of this distribution.
* By using this software in any fashion, you are agreeing to be bound by
* the terms of this license.
diff --git a/src/jvm/clojure/lang/IPersistentCollection.java b/src/jvm/clojure/lang/IPersistentCollection.java
index 21e436f2..d6442244 100644
--- a/src/jvm/clojure/lang/IPersistentCollection.java
+++ b/src/jvm/clojure/lang/IPersistentCollection.java
@@ -11,12 +11,10 @@ package clojure.lang;
*/
-public interface IPersistentCollection {
+public interface IPersistentCollection extends Seqable{
int count();
-ISeq seq();
-
IPersistentCollection cons(Object o);
IPersistentCollection empty();
diff --git a/src/jvm/clojure/lang/IteratorStream.java b/src/jvm/clojure/lang/IteratorStream.java
index edb4dd79..cfb85cec 100644
--- a/src/jvm/clojure/lang/IteratorStream.java
+++ b/src/jvm/clojure/lang/IteratorStream.java
@@ -13,17 +13,22 @@
package clojure.lang;
import java.util.Iterator;
+import java.util.concurrent.Callable;
-public class IteratorStream implements IStream{
- final Iterator iter;
+public class IteratorStream implements Callable{
+final Iterator iter;
- public IteratorStream(Iterator iter) {
- this.iter = iter;
- }
+static public AStream create(Iterator iter){
+ return new AStream(new IteratorStream(iter));
+}
+
+IteratorStream(Iterator iter){
+ this.iter = iter;
+}
- synchronized public Object next() throws Exception {
- if(iter.hasNext())
- return iter.next();
- return RT.eos();
- }
+public Object call() throws Exception{
+ if(iter.hasNext())
+ return iter.next();
+ return RT.eos();
+}
}
diff --git a/src/jvm/clojure/lang/RT.java b/src/jvm/clojure/lang/RT.java
index 4954028c..a6142ea6 100644
--- a/src/jvm/clojure/lang/RT.java
+++ b/src/jvm/clojure/lang/RT.java
@@ -447,29 +447,23 @@ static public ISeq seq(Object coll){
return null;
else if(coll instanceof ISeq)
return (ISeq) coll;
- else if(coll instanceof IPersistentCollection)
- return ((IPersistentCollection) coll).seq();
+ else if(coll instanceof Seqable)
+ return ((Seqable) coll).seq();
else
return seqFrom(coll);
}
-static public IStream stream(final Object coll) throws Exception{
+static public AStream stream(final Object coll) throws Exception{
if(coll == null)
return EMPTY_STREAM;
- else if(coll instanceof IStream)
- return (IStream) coll;
+ else if(coll instanceof AStream)
+ return (AStream) coll;
+ else if(coll instanceof Fn)
+ return new AStream((Callable)coll);
else if(coll instanceof Streamable)
return ((Streamable)coll).stream();
- else if(coll instanceof Fn)
- {
- return new IStream(){
- public Object next() throws Exception {
- return ((IFn)coll).invoke();
- }
- };
- }
else if(coll instanceof Iterable)
- return new IteratorStream(((Iterable) coll).iterator());
+ return IteratorStream.create(((Iterable) coll).iterator());
else if (coll.getClass().isArray())
return ArrayStream.createFromObject(coll);
else if (coll instanceof String)
@@ -1686,8 +1680,8 @@ static public int alength(Object xs){
}
final static private Object EOS = new Object();
-
-final static public Object eos() {
+
+static public Object eos() {
return EOS;
}
@@ -1695,12 +1689,11 @@ static public boolean isEOS(Object o){
return o == EOS;
}
-static final public IStream EMPTY_STREAM = new IStream(){
-
- public Object next() throws Exception {
+static final public AStream EMPTY_STREAM = new AStream(new Callable(){
+ synchronized public Object call() throws Exception {
return eos();
}
-};
+});
synchronized public static DynamicClassLoader getRootClassLoader() {
if(ROOT_CLASSLOADER == null)
diff --git a/src/jvm/clojure/lang/Range.java b/src/jvm/clojure/lang/Range.java
index 226a77ba..b52eec43 100644
--- a/src/jvm/clojure/lang/Range.java
+++ b/src/jvm/clojure/lang/Range.java
@@ -12,7 +12,7 @@
package clojure.lang;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.Callable;
public class Range extends ASeq implements IReduce, Streamable{
final int end;
@@ -63,15 +63,23 @@ public int count() {
return end - n;
}
- public IStream stream() throws Exception {
- final AtomicInteger an = new AtomicInteger(n);
- return new IStream(){
- public Object next() throws Exception {
- int i = an.getAndIncrement();
- if (i < end)
- return i;
+public AStream stream() throws Exception {
+ return new AStream(new Src(n,end));
+}
+
+ static class Src implements Callable{
+ int n;
+ final int end;
+
+ public Src(int n, int end) {
+ this.n = n;
+ this.end = end;
+ }
+
+ public Object call() throws Exception {
+ if(n < end)
+ return n++;
return RT.eos();
}
- };
-}
+ }
}
diff --git a/src/jvm/clojure/lang/IStream.java b/src/jvm/clojure/lang/Seqable.java
index ccd8a9d8..8732487f 100644
--- a/src/jvm/clojure/lang/IStream.java
+++ b/src/jvm/clojure/lang/Seqable.java
@@ -8,10 +8,10 @@
* You must not remove this notice, or any other, from this software.
**/
-/* rich Dec 7, 2008 */
+/* rich Dec 14, 2008 */
package clojure.lang;
-public interface IStream {
- Object next() throws Exception;
+public interface Seqable {
+ ISeq seq();
}
diff --git a/src/jvm/clojure/lang/StreamSeq.java b/src/jvm/clojure/lang/StreamSeq.java
index 44cfe28c..c935ebfa 100644
--- a/src/jvm/clojure/lang/StreamSeq.java
+++ b/src/jvm/clojure/lang/StreamSeq.java
@@ -13,11 +13,11 @@
package clojure.lang;
public class StreamSeq extends ASeq {
- IStream stream;
+ AStream stream;
final Object _first;
ISeq _rest;
- static public StreamSeq create(IStream stream) throws Exception {
+ static public StreamSeq create(AStream stream) throws Exception {
Object x = stream.next();
if (RT.isEOS(x))
return null;
@@ -31,7 +31,7 @@ public class StreamSeq extends ASeq {
this.stream = null;
}
- StreamSeq(Object first, IStream stream) {
+ StreamSeq(Object first, AStream stream) {
this._first = first;
this.stream = stream;
}
diff --git a/src/jvm/clojure/lang/Streamable.java b/src/jvm/clojure/lang/Streamable.java
index 67ed4987..68312700 100644
--- a/src/jvm/clojure/lang/Streamable.java
+++ b/src/jvm/clojure/lang/Streamable.java
@@ -13,5 +13,5 @@
package clojure.lang;
public interface Streamable {
- IStream stream() throws Exception;
+ AStream stream() throws Exception;
}