diff options
author | Rich Hickey <richhickey@gmail.com> | 2008-03-13 19:24:06 +0000 |
---|---|---|
committer | Rich Hickey <richhickey@gmail.com> | 2008-03-13 19:24:06 +0000 |
commit | 1527b871c6edb798c7b8877d95864fbba3aca5fc (patch) | |
tree | fd1371fbc0fda04d38827a64e78ff75385a13815 /src | |
parent | 2e300c1a0a82c39d5e6b5766f1e8df422ebf31b1 (diff) |
renamed ! to send, added send-off, *agent* is currently running agent
Diffstat (limited to 'src')
-rw-r--r-- | src/boot.clj | 17 | ||||
-rw-r--r-- | src/jvm/clojure/lang/Agent.java | 136 | ||||
-rw-r--r-- | src/jvm/clojure/lang/RT.java | 2 |
3 files changed, 40 insertions, 115 deletions
diff --git a/src/boot.clj b/src/boot.clj index 2870a450..ae23b3e4 100644 --- a/src/boot.clj +++ b/src/boot.clj @@ -834,18 +834,25 @@ "Creates and returns an agent with an initial value of state." [state] (new clojure.lang.Agent state)) -(defn agent-of - {:private true} - [state] (:agent ^state)) +(defn ! [& args] (throw (new Exception "! is now send. See also send-off"))) -(defn ! +(defn send "Dispatch an action to an agent. Returns the agent immediately. Subsequently, in a thread in a thread pool, the state of the will be set to the value of: (apply action-fn state-of-agent args)" [#^clojure.lang.Agent a f & args] - (. a (dispatch f args))) + (. a (dispatch f args false))) + +(defn send-off + "Dispatch a potentially blocking action to an agent. Returns the + agent immediately. Subsequently, in a separate thread, the state of + the will be set to the value of: + + (apply action-fn state-of-agent args)" + [#^clojure.lang.Agent a f & args] + (. a (dispatch f args true))) (defn agent-errors "Returns a sequence of the exceptions thrown during asynchronous diff --git a/src/jvm/clojure/lang/Agent.java b/src/jvm/clojure/lang/Agent.java index af494e36..0fd4f628 100644 --- a/src/jvm/clojure/lang/Agent.java +++ b/src/jvm/clojure/lang/Agent.java @@ -17,121 +17,45 @@ import java.util.concurrent.atomic.AtomicReference; public class Agent implements IRef{ volatile Object state; -//final Queue q = new LinkedList(); AtomicReference<IPersistentStack> q = new AtomicReference(PersistentQueue.EMPTY); -//boolean busy = false; volatile ISeq errors = null; -//todo - make tuneable -//final public static ThreadPoolExecutor executor = -// new ThreadPoolExecutor( -// 2 * Runtime.getRuntime().availableProcessors(), -// 2 * Runtime.getRuntime().availableProcessors(), -// 0L, TimeUnit.MILLISECONDS, -// new LinkedBlockingQueue<Runnable>()); -final public static ThreadPoolExecutor executor = - new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), - Integer.MAX_VALUE, - 200L, TimeUnit.MILLISECONDS, - new SynchronousQueue()); - -// new LinkedBlockingQueue<Runnable>()); -//Executors.newFixedThreadPool(2 * Runtime.getRuntime().availableProcessors()); -//final static Executor executor = Executors.newCachedThreadPool(); -final static ThreadLocal<PersistentVector> nested = new ThreadLocal<PersistentVector>(); - -static class ThreadPool{ - static class Worker implements Runnable{ - Runnable task; - public Worker(Runnable task){ - this.task = task; - } - - public void newTask(Runnable task){ - synchronized(this) - { - this.task = task; - notify(); - } - } - - public void run(){ - for(; ;) - { - task.run(); - synchronized(this) - { - IPersistentStack workers = ThreadPool.workers.get(); - if(workers.count() < ThreadPool.workerReserve) - { - boolean pushed = false; - while(!pushed) - { - IPersistentStack prior = ThreadPool.workers.get(); - IPersistentStack next = (IPersistentStack) prior.cons(this); - pushed = ThreadPool.workers.compareAndSet(prior, next); - } - task = null; - try - { - while(task == null) - wait(); - } - catch(InterruptedException e) - { - break; - } - } - else //have enough reserve workers, die - break; - } - } - } - } +final public static Executor pooledExecutor = + Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); - static int workerReserve = 2 * Runtime.getRuntime().availableProcessors(); - static AtomicReference<IPersistentStack> workers = new AtomicReference(PersistentList.EMPTY); +final static Executor soloExecutor = Executors.newCachedThreadPool(); - static void execute(Runnable r){ - IPersistentStack ws = null; - while(ws == null) - { - ws = workers.get(); - if(ws.count() > 0) - { - IPersistentStack popped = ws.pop(); - if(!workers.compareAndSet(ws, popped)) - ws = null; - } - } - if(ws.count() > 0) - { - Worker worker = (Worker) ws.peek(); - worker.newTask(r); - } - else - (new Thread(new Worker(r))).start(); +final static ThreadLocal<PersistentVector> nested = new ThreadLocal<PersistentVector>(); - } -} static class Action implements Runnable{ final Agent agent; final IFn fn; final ISeq args; + final boolean solo; - public Action(Agent agent, IFn fn, ISeq args){ + public Action(Agent agent, IFn fn, ISeq args, boolean solo){ this.agent = agent; this.args = args; this.fn = fn; + this.solo = solo; + } + + void execute(){ + if(solo) + soloExecutor.execute(this); + else + pooledExecutor.execute(this); } static void doRun(Action action){ - while(action != null) + try { + Var.pushThreadBindings(RT.map(RT.AGENT, action.agent)); nested.set(PersistentVector.EMPTY); + boolean hadError = false; try { @@ -162,11 +86,14 @@ static class Action implements Runnable{ popped = action.agent.q.compareAndSet(prior, next); } -// if(next.count() > 0) -// executor.execute((Runnable) next.peek()); -// action = null; - action = (Action) next.peek(); + if(next.count() > 0) + ((Action) next.peek()).execute(); + + } + finally + { nested.set(null); + Var.popThreadBindings(); } } @@ -180,14 +107,6 @@ public Agent(Object state){ } void setState(Object newState){ - if(newState instanceof IObj) - { - IObj o = (IObj) newState; - if(RT.get(o.meta(), RT.AGENT_KEY) != this) - { - newState = o.withMeta((IPersistentMap) RT.assoc(o.meta(), RT.AGENT_KEY, this)); - } - } state = newState; } @@ -207,12 +126,12 @@ public void clearErrors(){ errors = null; } -public Object dispatch(IFn fn, ISeq args) throws Exception{ +public Object dispatch(IFn fn, ISeq args, boolean solo) throws Exception{ if(errors != null) { throw new Exception("Agent has errors", (Exception) RT.first(errors)); } - Action action = new Action(this, fn, args); + Action action = new Action(this, fn, args, solo); LockingTransaction trans = LockingTransaction.getRunning(); if(trans != null) trans.enqueue(action); @@ -236,8 +155,7 @@ void enqueue(Action action){ } if(prior.count() == 0) -// executor.execute(action); - ThreadPool.execute(action); + action.execute(); } } diff --git a/src/jvm/clojure/lang/RT.java b/src/jvm/clojure/lang/RT.java index 7fecdd49..905f0d84 100644 --- a/src/jvm/clojure/lang/RT.java +++ b/src/jvm/clojure/lang/RT.java @@ -107,7 +107,7 @@ final static public Var IN = Var.intern(CLOJURE_NS, Symbol.create("*in*"), new LineNumberingPushbackReader(new InputStreamReader(System.in))); final static Keyword TAG_KEY = Keyword.intern(null, "tag"); -final static Keyword AGENT_KEY = Keyword.intern("clojure", "agent"); +final static public Var AGENT = Var.intern(CLOJURE_NS, Symbol.create("*agent*"), null); static Keyword LINE_KEY = Keyword.intern(null, "line"); static Keyword FILE_KEY = Keyword.intern(null, "file"); //final static public Var CURRENT_MODULE = Var.intern(Symbol.create("clojure", "current-module"), |