summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRich Hickey <richhickey@gmail.com>2008-12-11 20:27:17 +0000
committerRich Hickey <richhickey@gmail.com>2008-12-11 20:27:17 +0000
commit14d49b3eafae4a2095f5b80e47afa2b2878567ea (patch)
treef2170f3960b42cc217355a9e56ec3e4a0a96c4c8
parent8aaeff138f515fff4291fe61330b5f69cb7e3ce9 (diff)
release coll on nth of seq, stream work in progress
-rw-r--r--src/jvm/clojure/lang/APersistentVector.java17
-rw-r--r--src/jvm/clojure/lang/ASeq.java24
-rw-r--r--src/jvm/clojure/lang/ArrayStream.java194
-rw-r--r--src/jvm/clojure/lang/IStream.java17
-rw-r--r--src/jvm/clojure/lang/IteratorStream.java29
-rw-r--r--src/jvm/clojure/lang/Range.java15
-rw-r--r--src/jvm/clojure/lang/StreamSeq.java64
-rw-r--r--src/jvm/clojure/lang/Streamable.java17
8 files changed, 373 insertions, 4 deletions
diff --git a/src/jvm/clojure/lang/APersistentVector.java b/src/jvm/clojure/lang/APersistentVector.java
index e57aae43..9d77f713 100644
--- a/src/jvm/clojure/lang/APersistentVector.java
+++ b/src/jvm/clojure/lang/APersistentVector.java
@@ -13,10 +13,11 @@
package clojure.lang;
import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
public abstract class APersistentVector extends AFn implements IPersistentVector, Iterable,
List,
- RandomAccess, Comparable{
+ RandomAccess, Comparable, Streamable{
int _hash = -1;
public APersistentVector(IPersistentMap meta){
@@ -357,7 +358,19 @@ public int compareTo(Object o){
return 0;
}
-static class Seq extends ASeq implements IndexedSeq, IReduce{
+public IStream stream() throws Exception {
+ final AtomicInteger ai = new AtomicInteger(0);
+ return new IStream(){
+ public Object next() throws Exception {
+ int i = ai.getAndIncrement();
+ if(i < count())
+ return nth(i);
+ return RT.eos();
+ }
+ };
+}
+
+ static class Seq extends ASeq implements IndexedSeq, IReduce{
//todo - something more efficient
final IPersistentVector v;
final int i;
diff --git a/src/jvm/clojure/lang/ASeq.java b/src/jvm/clojure/lang/ASeq.java
index a7c42639..cdb3ac28 100644
--- a/src/jvm/clojure/lang/ASeq.java
+++ b/src/jvm/clojure/lang/ASeq.java
@@ -12,8 +12,9 @@ package clojure.lang;
import java.util.Collection;
import java.util.Iterator;
+import java.util.concurrent.atomic.AtomicReference;
-public abstract class ASeq extends Obj implements ISeq, Collection{
+public abstract class ASeq extends Obj implements ISeq, Collection, Streamable{
transient int _hash = -1;
public String toString(){
@@ -175,4 +176,25 @@ public Iterator iterator(){
return new SeqIterator(this);
}
+public IStream stream() throws Exception {
+ return new Stream(this);
+}
+
+ static class Stream implements IStream{
+ ISeq s;
+
+ public Stream(ISeq s) {
+ this.s = s;
+ }
+
+ synchronized public Object next() throws Exception {
+ if(s != null)
+ {
+ Object ret = s.first();
+ s = s.rest();
+ return ret;
+ }
+ return RT.eos();
+ }
+}
}
diff --git a/src/jvm/clojure/lang/ArrayStream.java b/src/jvm/clojure/lang/ArrayStream.java
new file mode 100644
index 00000000..d4457e3c
--- /dev/null
+++ b/src/jvm/clojure/lang/ArrayStream.java
@@ -0,0 +1,194 @@
+/**
+ * Copyright (c) Rich Hickey. All rights reserved.
+ * The use and distribution terms for this software are covered by the
+ * Common Public License 1.0 (http://opensource.org/licenses/cpl.php)
+ * which can be found in the file CPL.TXT at the root of this distribution.
+ * By using this software in any fashion, you are agreeing to be bound by
+ * the terms of this license.
+ * You must not remove this notice, or any other, from this software.
+ **/
+
+/* rich Dec 7, 2008 */
+
+package clojure.lang;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.lang.reflect.Array;
+
+public class ArrayStream implements IStream {
+
+ final AtomicInteger ai = new AtomicInteger(0);
+ final Object[] array;
+
+ public ArrayStream(Object[] array) {
+ this.array = array;
+ }
+
+ public Object next() throws Exception {
+ int i = ai.getAndIncrement();
+ if (i < array.length)
+ return array[i];
+ return RT.eos();
+ }
+
+ static IStream createFromObject(Object array) {
+ Class aclass = array.getClass().getComponentType();
+ if (!aclass.isPrimitive())
+ return new ArrayStream((Object[]) array);
+ if (aclass == int.class)
+ return new ArrayStream_int((int[]) array);
+ if (aclass == long.class)
+ return new ArrayStream_long((long[]) array);
+ if (aclass == float.class)
+ return new ArrayStream_float((float[]) array);
+ if (aclass == double.class)
+ return new ArrayStream_double((double[]) array);
+ if (aclass == char.class)
+ return new ArrayStream_char((char[]) array);
+ if (aclass == byte.class)
+ return new ArrayStream_byte((byte[]) array);
+ if (aclass == short.class)
+ return new ArrayStream_short((short[]) array);
+ if (aclass == boolean.class)
+ return new ArrayStream_boolean((boolean[]) array);
+ throw new IllegalArgumentException(String.format("Unsupported array type %s", array));
+ }
+
+ static public class ArrayStream_int implements IStream {
+
+ final AtomicInteger ai = new AtomicInteger(0);
+ final int[] array;
+
+ public ArrayStream_int(int[] array) {
+ this.array = array;
+ }
+
+ public Object next() throws Exception {
+ int i = ai.getAndIncrement();
+ if (i < array.length)
+ return array[i];
+ return RT.eos();
+ }
+ }
+
+ static public class ArrayStream_long implements IStream {
+
+ final AtomicInteger ai = new AtomicInteger(0);
+ final long[] array;
+
+ public ArrayStream_long(long[] array) {
+ this.array = array;
+ }
+
+ public Object next() throws Exception {
+ int i = ai.getAndIncrement();
+ if (i < array.length)
+ return array[i];
+ return RT.eos();
+ }
+ }
+
+ static public class ArrayStream_float implements IStream {
+
+ final AtomicInteger ai = new AtomicInteger(0);
+ final float[] array;
+
+ public ArrayStream_float(float[] array) {
+ this.array = array;
+ }
+
+ public Object next() throws Exception {
+ int i = ai.getAndIncrement();
+ if (i < array.length)
+ return array[i];
+ return RT.eos();
+ }
+ }
+
+ static public class ArrayStream_double implements IStream {
+
+ final AtomicInteger ai = new AtomicInteger(0);
+ final double[] array;
+
+ public ArrayStream_double(double[] array) {
+ this.array = array;
+ }
+
+ public Object next() throws Exception {
+ int i = ai.getAndIncrement();
+ if (i < array.length)
+ return array[i];
+ return RT.eos();
+ }
+ }
+
+ static public class ArrayStream_char implements IStream {
+
+ final AtomicInteger ai = new AtomicInteger(0);
+ final char[] array;
+
+ public ArrayStream_char(char[] array) {
+ this.array = array;
+ }
+
+ public Object next() throws Exception {
+ int i = ai.getAndIncrement();
+ if (i < array.length)
+ return array[i];
+ return RT.eos();
+ }
+ }
+
+ static public class ArrayStream_byte implements IStream {
+
+ final AtomicInteger ai = new AtomicInteger(0);
+ final byte[] array;
+
+ public ArrayStream_byte(byte[] array) {
+ this.array = array;
+ }
+
+ public Object next() throws Exception {
+ int i = ai.getAndIncrement();
+ if (i < array.length)
+ return array[i];
+ return RT.eos();
+ }
+ }
+
+ static public class ArrayStream_short implements IStream {
+
+ final AtomicInteger ai = new AtomicInteger(0);
+ final short[] array;
+
+ public ArrayStream_short(short[] array) {
+ this.array = array;
+ }
+
+ public Object next() throws Exception {
+ int i = ai.getAndIncrement();
+ if (i < array.length)
+ return array[i];
+ return RT.eos();
+ }
+ }
+
+ static public class ArrayStream_boolean implements IStream {
+
+ final AtomicInteger ai = new AtomicInteger(0);
+ final boolean[] array;
+
+ public ArrayStream_boolean(boolean[] array) {
+ this.array = array;
+ }
+
+ public Object next() throws Exception {
+ int i = ai.getAndIncrement();
+ if (i < array.length)
+ return array[i];
+ return RT.eos();
+ }
+ }
+
+
+}
diff --git a/src/jvm/clojure/lang/IStream.java b/src/jvm/clojure/lang/IStream.java
new file mode 100644
index 00000000..45d17a6c
--- /dev/null
+++ b/src/jvm/clojure/lang/IStream.java
@@ -0,0 +1,17 @@
+/**
+ * Copyright (c) Rich Hickey. All rights reserved.
+ * The use and distribution terms for this software are covered by the
+ * Common Public License 1.0 (http://opensource.org/licenses/cpl.php)
+ * which can be found in the file CPL.TXT at the root of this distribution.
+ * By using this software in any fashion, you are agreeing to be bound by
+ * the terms of this license.
+ * You must not remove this notice, or any other, from this software.
+ **/
+
+/* rich Dec 7, 2008 */
+
+package clojure.lang;
+
+public interface IStream {
+ Object next() throws Exception;
+}
diff --git a/src/jvm/clojure/lang/IteratorStream.java b/src/jvm/clojure/lang/IteratorStream.java
new file mode 100644
index 00000000..f7d4bc35
--- /dev/null
+++ b/src/jvm/clojure/lang/IteratorStream.java
@@ -0,0 +1,29 @@
+/**
+ * Copyright (c) Rich Hickey. All rights reserved.
+ * The use and distribution terms for this software are covered by the
+ * Common Public License 1.0 (http://opensource.org/licenses/cpl.php)
+ * which can be found in the file CPL.TXT at the root of this distribution.
+ * By using this software in any fashion, you are agreeing to be bound by
+ * the terms of this license.
+ * You must not remove this notice, or any other, from this software.
+ **/
+
+/* rich Dec 7, 2008 */
+
+package clojure.lang;
+
+import java.util.Iterator;
+
+public class IteratorStream implements IStream{
+ final Iterator iter;
+
+ public IteratorStream(Iterator iter) {
+ this.iter = iter;
+ }
+
+ synchronized public Object next() throws Exception {
+ if(iter.hasNext())
+ return iter.next();
+ return RT.eos();
+ }
+}
diff --git a/src/jvm/clojure/lang/Range.java b/src/jvm/clojure/lang/Range.java
index ccb98b19..95b38bcc 100644
--- a/src/jvm/clojure/lang/Range.java
+++ b/src/jvm/clojure/lang/Range.java
@@ -12,7 +12,9 @@
package clojure.lang;
-public class Range extends ASeq implements IReduce{
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class Range extends ASeq implements IReduce, Streamable{
final int end;
final int n;
@@ -57,4 +59,15 @@ public Object reduce(IFn f, Object start) throws Exception{
return ret;
}
+public IStream stream() throws Exception {
+ final AtomicInteger an = new AtomicInteger(n);
+ return new IStream(){
+ public Object next() throws Exception {
+ int i = an.getAndIncrement();
+ if (i < end)
+ return i;
+ return RT.eos();
+ }
+ };
+}
}
diff --git a/src/jvm/clojure/lang/StreamSeq.java b/src/jvm/clojure/lang/StreamSeq.java
new file mode 100644
index 00000000..b39f1392
--- /dev/null
+++ b/src/jvm/clojure/lang/StreamSeq.java
@@ -0,0 +1,64 @@
+/**
+ * Copyright (c) Rich Hickey. All rights reserved.
+ * The use and distribution terms for this software are covered by the
+ * Common Public License 1.0 (http://opensource.org/licenses/cpl.php)
+ * which can be found in the file CPL.TXT at the root of this distribution.
+ * By using this software in any fashion, you are agreeing to be bound by
+ * the terms of this license.
+ * You must not remove this notice, or any other, from this software.
+ **/
+
+/* rich Dec 8, 2008 */
+
+package clojure.lang;
+
+public class StreamSeq extends ASeq {
+ IStream stream;
+ final Object _first;
+ ISeq _rest;
+
+ static public StreamSeq create(IStream stream) throws Exception {
+ Object x = stream.next();
+ if (RT.isEOS(x))
+ return null;
+ return new StreamSeq(x, stream);
+ }
+
+ StreamSeq(IPersistentMap meta, Object _first, ISeq _rest) {
+ super(meta);
+ this._first = _first;
+ this._rest = _rest;
+ this.stream = null;
+ }
+
+ StreamSeq(Object first, IStream stream) {
+ this._first = first;
+ this.stream = stream;
+ }
+
+
+ public Object first() {
+ return _first;
+ }
+
+ synchronized public ISeq rest() {
+ if (stream != null) {
+ try {
+ _rest = create(stream);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ stream = null;
+ }
+ return _rest;
+ }
+
+ public Obj withMeta(IPersistentMap meta) {
+ if(meta != this.meta())
+ {
+ rest();
+ return new StreamSeq(meta, _first, _rest);
+ }
+ return this;
+ }
+}
diff --git a/src/jvm/clojure/lang/Streamable.java b/src/jvm/clojure/lang/Streamable.java
new file mode 100644
index 00000000..81f17b75
--- /dev/null
+++ b/src/jvm/clojure/lang/Streamable.java
@@ -0,0 +1,17 @@
+/**
+ * Copyright (c) Rich Hickey. All rights reserved.
+ * The use and distribution terms for this software are covered by the
+ * Common Public License 1.0 (http://opensource.org/licenses/cpl.php)
+ * which can be found in the file CPL.TXT at the root of this distribution.
+ * By using this software in any fashion, you are agreeing to be bound by
+ * the terms of this license.
+ * You must not remove this notice, or any other, from this software.
+ **/
+
+/* rich Dec 7, 2008 */
+
+package clojure.lang;
+
+public interface Streamable {
+ IStream stream() throws Exception;
+}