diff options
author | Rich Hickey <richhickey@gmail.com> | 2007-11-18 02:16:49 +0000 |
---|---|---|
committer | Rich Hickey <richhickey@gmail.com> | 2007-11-18 02:16:49 +0000 |
commit | 079b143bf2a7bf563b407d4e1dd688988bd2cf04 (patch) | |
tree | b494a3cda03c8b67575dd770f4b075a0849248a9 /src | |
parent | 540e1195c330298ecb3a29287643a54b59278522 (diff) |
interim checkin, actors
Diffstat (limited to 'src')
-rw-r--r-- | src/jvm/clojure/lang/Actor.java | 119 | ||||
-rw-r--r-- | src/jvm/clojure/lang/LockingTransaction.java | 17 |
2 files changed, 136 insertions, 0 deletions
diff --git a/src/jvm/clojure/lang/Actor.java b/src/jvm/clojure/lang/Actor.java new file mode 100644 index 00000000..7c532f51 --- /dev/null +++ b/src/jvm/clojure/lang/Actor.java @@ -0,0 +1,119 @@ +/** + * Copyright (c) Rich Hickey. All rights reserved. + * The use and distribution terms for this software are covered by the + * Common Public License 1.0 (http://opensource.org/licenses/cpl.php) + * which can be found in the file CPL.TXT at the root of this distribution. + * By using this software in any fashion, you are agreeing to be bound by + * the terms of this license. + * You must not remove this notice, or any other, from this software. + **/ + +/* rich Nov 17, 2007 */ + +package clojure.lang; + +import java.util.Queue; +import java.util.LinkedList; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; + +public class Actor extends RestFn{ +volatile Object state; +final Queue q = new LinkedList(); +boolean busy = false; + +//todo - make tuneable +final public static Queue errors = new LinkedBlockingQueue(); +final static Executor executor = Executors.newCachedThreadPool(); +final static ThreadLocal<PersistentVector> nested = new ThreadLocal<PersistentVector>(); + +static class Action implements Runnable{ + final Actor actor; + final IFn fn; + final ISeq args; + + + public Action(Actor actor, IFn fn, ISeq args){ + this.actor = actor; + this.args = args; + this.fn = fn; + } + + public void run(){ + nested.set(PersistentVector.EMPTY); + boolean hadError = false; + try + { + actor.state = fn.applyTo(RT.cons(actor, args)); + } + catch(Exception e) + { + //todo report/callback + errors.add(e); + hadError = true; + } + + if(!hadError) + { + for(ISeq s = nested.get().seq(); s != null; s = s.rest()) + { + Action a = (Action) s.first(); + a.actor.enqueue(a); + } + } + + synchronized(actor) + { + if(!actor.q.isEmpty()) + { + executor.execute((Runnable) actor.q.remove()); + } + else + { + actor.busy = false; + } + } + + nested.set(null); + } +} + +public Actor(Object state){ + super(1); + this.state = state; +} + +public Object getState(){ + return state; +} + +public Object doInvoke(Object fn, Object args){ + Action action = new Action(this, (IFn) fn, (ISeq) args); + LockingTransaction trans = LockingTransaction.getRunning(); + if(trans != null) + trans.enqueue(action); + else if(nested.get() != null) + { + nested.set(nested.get().cons(action)); + } + else + enqueue(action); + + return this; +} + +void enqueue(Action action){ + synchronized(this) + { + if(busy) + q.add(action); + else + { + busy = true; + executor.execute(action); + } + } +} + +} diff --git a/src/jvm/clojure/lang/LockingTransaction.java b/src/jvm/clojure/lang/LockingTransaction.java index 84372f35..21baac6a 100644 --- a/src/jvm/clojure/lang/LockingTransaction.java +++ b/src/jvm/clojure/lang/LockingTransaction.java @@ -32,6 +32,7 @@ static final int COMMITTED = 4; final static ThreadLocal<LockingTransaction> transaction = new ThreadLocal<LockingTransaction>(); + static class RetryException extends Error{ } @@ -77,6 +78,7 @@ void stop(int status){ vals.clear(); sets.clear(); commutes.clear(); + actions.clear(); } } @@ -86,6 +88,7 @@ long readPoint; long startPoint; long startTime; final RetryException retryex = new RetryException(); +final ArrayList<Actor.Action> actions = new ArrayList<Actor.Action>(); final HashMap<Ref, Object> vals = new HashMap<Ref, Object>(); final HashSet<Ref> sets = new HashSet<Ref>(); final TreeMap<Ref, ArrayList<IFn>> commutes = new TreeMap<Ref, ArrayList<IFn>>(); @@ -168,6 +171,13 @@ static LockingTransaction getEx(){ return t; } +static LockingTransaction getRunning(){ + LockingTransaction t = transaction.get(); + if(t == null || t.info == null) + return null; + return t; +} + static public Object runInTransaction(IFn fn) throws Exception{ LockingTransaction t = transaction.get(); if(t == null) @@ -251,6 +261,10 @@ Object run(IFn fn) throws Exception{ ref.tvals.msecs = msecs; } } + for(Actor.Action action : actions) + { + action.actor.enqueue(action); + } done = true; info.status.set(COMMITTED); } @@ -274,6 +288,9 @@ Object run(IFn fn) throws Exception{ return ret; } +public void enqueue(Actor.Action action){ + actions.add(action); +} Object doGet(Ref ref){ if(!info.running()) |