summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorRich Hickey <richhickey@gmail.com>2007-11-18 02:16:49 +0000
committerRich Hickey <richhickey@gmail.com>2007-11-18 02:16:49 +0000
commit079b143bf2a7bf563b407d4e1dd688988bd2cf04 (patch)
treeb494a3cda03c8b67575dd770f4b075a0849248a9 /src
parent540e1195c330298ecb3a29287643a54b59278522 (diff)
interim checkin, actors
Diffstat (limited to 'src')
-rw-r--r--src/jvm/clojure/lang/Actor.java119
-rw-r--r--src/jvm/clojure/lang/LockingTransaction.java17
2 files changed, 136 insertions, 0 deletions
diff --git a/src/jvm/clojure/lang/Actor.java b/src/jvm/clojure/lang/Actor.java
new file mode 100644
index 00000000..7c532f51
--- /dev/null
+++ b/src/jvm/clojure/lang/Actor.java
@@ -0,0 +1,119 @@
+/**
+ * Copyright (c) Rich Hickey. All rights reserved.
+ * The use and distribution terms for this software are covered by the
+ * Common Public License 1.0 (http://opensource.org/licenses/cpl.php)
+ * which can be found in the file CPL.TXT at the root of this distribution.
+ * By using this software in any fashion, you are agreeing to be bound by
+ * the terms of this license.
+ * You must not remove this notice, or any other, from this software.
+ **/
+
+/* rich Nov 17, 2007 */
+
+package clojure.lang;
+
+import java.util.Queue;
+import java.util.LinkedList;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+
+public class Actor extends RestFn{
+volatile Object state;
+final Queue q = new LinkedList();
+boolean busy = false;
+
+//todo - make tuneable
+final public static Queue errors = new LinkedBlockingQueue();
+final static Executor executor = Executors.newCachedThreadPool();
+final static ThreadLocal<PersistentVector> nested = new ThreadLocal<PersistentVector>();
+
+static class Action implements Runnable{
+ final Actor actor;
+ final IFn fn;
+ final ISeq args;
+
+
+ public Action(Actor actor, IFn fn, ISeq args){
+ this.actor = actor;
+ this.args = args;
+ this.fn = fn;
+ }
+
+ public void run(){
+ nested.set(PersistentVector.EMPTY);
+ boolean hadError = false;
+ try
+ {
+ actor.state = fn.applyTo(RT.cons(actor, args));
+ }
+ catch(Exception e)
+ {
+ //todo report/callback
+ errors.add(e);
+ hadError = true;
+ }
+
+ if(!hadError)
+ {
+ for(ISeq s = nested.get().seq(); s != null; s = s.rest())
+ {
+ Action a = (Action) s.first();
+ a.actor.enqueue(a);
+ }
+ }
+
+ synchronized(actor)
+ {
+ if(!actor.q.isEmpty())
+ {
+ executor.execute((Runnable) actor.q.remove());
+ }
+ else
+ {
+ actor.busy = false;
+ }
+ }
+
+ nested.set(null);
+ }
+}
+
+public Actor(Object state){
+ super(1);
+ this.state = state;
+}
+
+public Object getState(){
+ return state;
+}
+
+public Object doInvoke(Object fn, Object args){
+ Action action = new Action(this, (IFn) fn, (ISeq) args);
+ LockingTransaction trans = LockingTransaction.getRunning();
+ if(trans != null)
+ trans.enqueue(action);
+ else if(nested.get() != null)
+ {
+ nested.set(nested.get().cons(action));
+ }
+ else
+ enqueue(action);
+
+ return this;
+}
+
+void enqueue(Action action){
+ synchronized(this)
+ {
+ if(busy)
+ q.add(action);
+ else
+ {
+ busy = true;
+ executor.execute(action);
+ }
+ }
+}
+
+}
diff --git a/src/jvm/clojure/lang/LockingTransaction.java b/src/jvm/clojure/lang/LockingTransaction.java
index 84372f35..21baac6a 100644
--- a/src/jvm/clojure/lang/LockingTransaction.java
+++ b/src/jvm/clojure/lang/LockingTransaction.java
@@ -32,6 +32,7 @@ static final int COMMITTED = 4;
final static ThreadLocal<LockingTransaction> transaction = new ThreadLocal<LockingTransaction>();
+
static class RetryException extends Error{
}
@@ -77,6 +78,7 @@ void stop(int status){
vals.clear();
sets.clear();
commutes.clear();
+ actions.clear();
}
}
@@ -86,6 +88,7 @@ long readPoint;
long startPoint;
long startTime;
final RetryException retryex = new RetryException();
+final ArrayList<Actor.Action> actions = new ArrayList<Actor.Action>();
final HashMap<Ref, Object> vals = new HashMap<Ref, Object>();
final HashSet<Ref> sets = new HashSet<Ref>();
final TreeMap<Ref, ArrayList<IFn>> commutes = new TreeMap<Ref, ArrayList<IFn>>();
@@ -168,6 +171,13 @@ static LockingTransaction getEx(){
return t;
}
+static LockingTransaction getRunning(){
+ LockingTransaction t = transaction.get();
+ if(t == null || t.info == null)
+ return null;
+ return t;
+}
+
static public Object runInTransaction(IFn fn) throws Exception{
LockingTransaction t = transaction.get();
if(t == null)
@@ -251,6 +261,10 @@ Object run(IFn fn) throws Exception{
ref.tvals.msecs = msecs;
}
}
+ for(Actor.Action action : actions)
+ {
+ action.actor.enqueue(action);
+ }
done = true;
info.status.set(COMMITTED);
}
@@ -274,6 +288,9 @@ Object run(IFn fn) throws Exception{
return ret;
}
+public void enqueue(Actor.Action action){
+ actions.add(action);
+}
Object doGet(Ref ref){
if(!info.running())