diff options
author | Rich Hickey <richhickey@gmail.com> | 2007-12-11 21:03:27 +0000 |
---|---|---|
committer | Rich Hickey <richhickey@gmail.com> | 2007-12-11 21:03:27 +0000 |
commit | c2d824b9bf74523156a47ffae27eb2a7c76b3074 (patch) | |
tree | 0697e4b260261468377bc6211efd14574716d3fb | |
parent | bb0c28d8a30c12ba0aedab6f902ef869b957fba8 (diff) |
lock-free agents
-rw-r--r-- | src/jvm/clojure/lang/Agent.java | 98 |
1 files changed, 49 insertions, 49 deletions
diff --git a/src/jvm/clojure/lang/Agent.java b/src/jvm/clojure/lang/Agent.java index a02b8fc4..68e3afa6 100644 --- a/src/jvm/clojure/lang/Agent.java +++ b/src/jvm/clojure/lang/Agent.java @@ -15,20 +15,23 @@ package clojure.lang; import java.util.Queue; import java.util.LinkedList; import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicReference; public class Agent implements IRef{ volatile Object state; //final Queue q = new LinkedList(); -IPersistentStack q = PersistentQueue.EMPTY; -boolean busy = false; +AtomicReference<IPersistentStack> q = new AtomicReference(PersistentQueue.EMPTY); +//boolean busy = false; volatile ISeq errors = null; //todo - make tuneable final public static ThreadPoolExecutor executor = - new ThreadPoolExecutor(2 * Runtime.getRuntime().availableProcessors(), - 2 * Runtime.getRuntime().availableProcessors(), - 0L, TimeUnit.MILLISECONDS, - new LinkedBlockingQueue<Runnable>()); + new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), +//2 * Runtime.getRuntime().availableProcessors(), +Integer.MAX_VALUE,//2 * Runtime.getRuntime().availableProcessors(), +200L, TimeUnit.MILLISECONDS, +new SynchronousQueue()); +// new LinkedBlockingQueue<Runnable>()); //Executors.newFixedThreadPool(2 * Runtime.getRuntime().availableProcessors()); //final static Executor executor = Executors.newCachedThreadPool(); final static ThreadLocal<PersistentVector> nested = new ThreadLocal<PersistentVector>(); @@ -45,48 +48,49 @@ static class Action implements Runnable{ this.fn = fn; } - public void run(){ - nested.set(PersistentVector.EMPTY); - boolean hadError = false; - try - { - agent.setState(fn.applyTo(RT.cons(agent.state, args))); - } - catch(Exception e) + static void doRun(Action action){ + while(action != null) { - //todo report/callback - agent.errors = RT.cons(e, agent.errors); - hadError = true; - } - - if(!hadError) - { - for(ISeq s = nested.get().seq(); s != null; s = s.rest()) + nested.set(PersistentVector.EMPTY); + boolean hadError = false; + try { - Action a = (Action) s.first(); - a.agent.enqueue(a); + action.agent.setState(action.fn.applyTo(RT.cons(action.agent.state, action.args))); + } + catch(Exception e) + { + //todo report/callback + action.agent.errors = RT.cons(e, action.agent.errors); + hadError = true; } - } - Runnable exec = null; - synchronized(agent) - { - if(agent.q.count() > 0) -// if(!agent.q.isEmpty()) + if(!hadError) { -// exec = (Runnable) agent.q.remove(); - exec = (Runnable) agent.q.peek(); - agent.q = agent.q.pop(); + for(ISeq s = nested.get().seq(); s != null; s = s.rest()) + { + Action a = (Action) s.first(); + a.agent.enqueue(a); + } } - else + + boolean popped = false; + IPersistentStack next = null; + while(!popped) { - agent.busy = false; + IPersistentStack prior = action.agent.q.get(); + next = prior.pop(); + popped = action.agent.q.compareAndSet(prior, next); } + +// if(next.count() > 0) +// executor.execute((Runnable) next.peek()); + action = (Action) next.peek(); + nested.set(null); } - if(exec != null) - executor.execute(exec); + } - nested.set(null); + public void run(){ + doRun(this); } } @@ -142,19 +146,15 @@ public Object dispatch(IFn fn, ISeq args) throws Exception{ } void enqueue(Action action){ - boolean exec = false; - synchronized(this) + boolean queued = false; + IPersistentStack prior = null; + while(!queued) { - if(busy) - // q.add(action); - q = (IPersistentStack) q.cons(action); - else - { - busy = true; - exec = true; - } + prior = q.get(); + queued = q.compareAndSet(prior, (IPersistentStack) prior.cons(action)); } - if(exec) + + if(prior.count() == 0) executor.execute(action); } |