summaryrefslogtreecommitdiff
path: root/src/jvm/clojure/lang/Agent.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/jvm/clojure/lang/Agent.java')
-rw-r--r--src/jvm/clojure/lang/Agent.java146
1 files changed, 146 insertions, 0 deletions
diff --git a/src/jvm/clojure/lang/Agent.java b/src/jvm/clojure/lang/Agent.java
new file mode 100644
index 00000000..f886de62
--- /dev/null
+++ b/src/jvm/clojure/lang/Agent.java
@@ -0,0 +1,146 @@
+/**
+ * Copyright (c) Rich Hickey. All rights reserved.
+ * The use and distribution terms for this software are covered by the
+ * Common Public License 1.0 (http://opensource.org/licenses/cpl.php)
+ * which can be found in the file CPL.TXT at the root of this distribution.
+ * By using this software in any fashion, you are agreeing to be bound by
+ * the terms of this license.
+ * You must not remove this notice, or any other, from this software.
+ **/
+
+/* rich Nov 17, 2007 */
+
+package clojure.lang;
+
+import java.util.Queue;
+import java.util.LinkedList;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+
+public class Agent implements Ref{
+volatile Object state;
+final Queue q = new LinkedList();
+boolean busy = false;
+
+volatile ISeq errors = null;
+//todo - make tuneable
+final static Executor executor = Executors.newFixedThreadPool(2 * Runtime.getRuntime().availableProcessors());
+//final static Executor executor = Executors.newCachedThreadPool();
+final static ThreadLocal<PersistentVector> nested = new ThreadLocal<PersistentVector>();
+
+static class Action implements Runnable{
+ final Agent agent;
+ final IFn fn;
+ final ISeq args;
+
+
+ public Action(Agent agent, IFn fn, ISeq args){
+ this.agent = agent;
+ this.args = args;
+ this.fn = fn;
+ }
+
+ public void run(){
+ nested.set(PersistentVector.EMPTY);
+ boolean hadError = false;
+ try
+ {
+ agent.setState(fn.applyTo(RT.cons(agent.state, args)));
+ }
+ catch(Exception e)
+ {
+ //todo report/callback
+ agent.errors = RT.cons(e, agent.errors);
+ hadError = true;
+ }
+
+ if(!hadError)
+ {
+ for(ISeq s = nested.get().seq(); s != null; s = s.rest())
+ {
+ Action a = (Action) s.first();
+ a.agent.enqueue(a);
+ }
+ }
+
+ synchronized(agent)
+ {
+ if(!agent.q.isEmpty())
+ {
+ executor.execute((Runnable) agent.q.remove());
+ }
+ else
+ {
+ agent.busy = false;
+ }
+ }
+
+ nested.set(null);
+ }
+}
+
+public Agent(Object state){
+ setState(state);
+}
+
+void setState(Object newState){
+ if(newState instanceof IObj)
+ {
+ IObj o = (IObj) newState;
+ if(RT.get(o.meta(), RT.AGENT_KEY) != this)
+ {
+ newState = o.withMeta((IPersistentMap) RT.assoc(o.meta(), RT.AGENT_KEY, this));
+ }
+ }
+ state = newState;
+}
+
+public Object get() throws Exception{
+ if(errors != null)
+ {
+ throw new Exception("Agent has errors", (Exception) RT.first(errors));
+ }
+ return state;
+}
+
+public ISeq getErrors(){
+ return errors;
+}
+
+public void clearErrors(){
+ errors = null;
+}
+
+public Object dispatch(IFn fn, ISeq args) throws Exception{
+ if(errors != null)
+ {
+ throw new Exception("Agent has errors", (Exception) RT.first(errors));
+ }
+ Action action = new Action(this, fn, args);
+ LockingTransaction trans = LockingTransaction.getRunning();
+ if(trans != null)
+ trans.enqueue(action);
+ else if(nested.get() != null)
+ {
+ nested.set(nested.get().cons(action));
+ }
+ else
+ enqueue(action);
+
+ return this;
+}
+
+void enqueue(Action action){
+ synchronized(this)
+ {
+ if(busy)
+ q.add(action);
+ else
+ {
+ busy = true;
+ executor.execute(action);
+ }
+ }
+}
+
+}