summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRich Hickey <richhickey@gmail.com>2007-12-11 21:03:27 +0000
committerRich Hickey <richhickey@gmail.com>2007-12-11 21:03:27 +0000
commitc2d824b9bf74523156a47ffae27eb2a7c76b3074 (patch)
tree0697e4b260261468377bc6211efd14574716d3fb
parentbb0c28d8a30c12ba0aedab6f902ef869b957fba8 (diff)
lock-free agents
-rw-r--r--src/jvm/clojure/lang/Agent.java98
1 files changed, 49 insertions, 49 deletions
diff --git a/src/jvm/clojure/lang/Agent.java b/src/jvm/clojure/lang/Agent.java
index a02b8fc4..68e3afa6 100644
--- a/src/jvm/clojure/lang/Agent.java
+++ b/src/jvm/clojure/lang/Agent.java
@@ -15,20 +15,23 @@ package clojure.lang;
import java.util.Queue;
import java.util.LinkedList;
import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicReference;
public class Agent implements IRef{
volatile Object state;
//final Queue q = new LinkedList();
-IPersistentStack q = PersistentQueue.EMPTY;
-boolean busy = false;
+AtomicReference<IPersistentStack> q = new AtomicReference(PersistentQueue.EMPTY);
+//boolean busy = false;
volatile ISeq errors = null;
//todo - make tuneable
final public static ThreadPoolExecutor executor =
- new ThreadPoolExecutor(2 * Runtime.getRuntime().availableProcessors(),
- 2 * Runtime.getRuntime().availableProcessors(),
- 0L, TimeUnit.MILLISECONDS,
- new LinkedBlockingQueue<Runnable>());
+ new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),
+//2 * Runtime.getRuntime().availableProcessors(),
+Integer.MAX_VALUE,//2 * Runtime.getRuntime().availableProcessors(),
+200L, TimeUnit.MILLISECONDS,
+new SynchronousQueue());
+// new LinkedBlockingQueue<Runnable>());
//Executors.newFixedThreadPool(2 * Runtime.getRuntime().availableProcessors());
//final static Executor executor = Executors.newCachedThreadPool();
final static ThreadLocal<PersistentVector> nested = new ThreadLocal<PersistentVector>();
@@ -45,48 +48,49 @@ static class Action implements Runnable{
this.fn = fn;
}
- public void run(){
- nested.set(PersistentVector.EMPTY);
- boolean hadError = false;
- try
- {
- agent.setState(fn.applyTo(RT.cons(agent.state, args)));
- }
- catch(Exception e)
+ static void doRun(Action action){
+ while(action != null)
{
- //todo report/callback
- agent.errors = RT.cons(e, agent.errors);
- hadError = true;
- }
-
- if(!hadError)
- {
- for(ISeq s = nested.get().seq(); s != null; s = s.rest())
+ nested.set(PersistentVector.EMPTY);
+ boolean hadError = false;
+ try
{
- Action a = (Action) s.first();
- a.agent.enqueue(a);
+ action.agent.setState(action.fn.applyTo(RT.cons(action.agent.state, action.args)));
+ }
+ catch(Exception e)
+ {
+ //todo report/callback
+ action.agent.errors = RT.cons(e, action.agent.errors);
+ hadError = true;
}
- }
- Runnable exec = null;
- synchronized(agent)
- {
- if(agent.q.count() > 0)
-// if(!agent.q.isEmpty())
+ if(!hadError)
{
-// exec = (Runnable) agent.q.remove();
- exec = (Runnable) agent.q.peek();
- agent.q = agent.q.pop();
+ for(ISeq s = nested.get().seq(); s != null; s = s.rest())
+ {
+ Action a = (Action) s.first();
+ a.agent.enqueue(a);
+ }
}
- else
+
+ boolean popped = false;
+ IPersistentStack next = null;
+ while(!popped)
{
- agent.busy = false;
+ IPersistentStack prior = action.agent.q.get();
+ next = prior.pop();
+ popped = action.agent.q.compareAndSet(prior, next);
}
+
+// if(next.count() > 0)
+// executor.execute((Runnable) next.peek());
+ action = (Action) next.peek();
+ nested.set(null);
}
- if(exec != null)
- executor.execute(exec);
+ }
- nested.set(null);
+ public void run(){
+ doRun(this);
}
}
@@ -142,19 +146,15 @@ public Object dispatch(IFn fn, ISeq args) throws Exception{
}
void enqueue(Action action){
- boolean exec = false;
- synchronized(this)
+ boolean queued = false;
+ IPersistentStack prior = null;
+ while(!queued)
{
- if(busy)
- // q.add(action);
- q = (IPersistentStack) q.cons(action);
- else
- {
- busy = true;
- exec = true;
- }
+ prior = q.get();
+ queued = q.compareAndSet(prior, (IPersistentStack) prior.cons(action));
}
- if(exec)
+
+ if(prior.count() == 0)
executor.execute(action);
}