diff options
author | Rich Hickey <richhickey@gmail.com> | 2008-09-23 13:37:42 +0000 |
---|---|---|
committer | Rich Hickey <richhickey@gmail.com> | 2008-09-23 13:37:42 +0000 |
commit | 1c2a26ad5d07f6cc4175f86d9aa95656cf76c6f1 (patch) | |
tree | c155c75b1d3ace0ba60c196e5adaee9350e9153d /src | |
parent | d25ed01f4c028d04e8134daf584c6d5fcc7b858d (diff) |
first cut at agent watches - add-watch/remove-watch
Diffstat (limited to 'src')
-rw-r--r-- | src/clj/clojure/boot.clj | 20 | ||||
-rw-r--r-- | src/jvm/clojure/lang/Agent.java | 49 |
2 files changed, 62 insertions, 7 deletions
diff --git a/src/clj/clojure/boot.clj b/src/clj/clojure/boot.clj index 366386d7..3d8667c1 100644 --- a/src/clj/clojure/boot.clj +++ b/src/clj/clojure/boot.clj @@ -1038,6 +1038,25 @@ [#^clojure.lang.Agent a f & args] (. a (dispatch f args true))) +(defn add-watch + "Experimental. + Adds a watcher to an agent. Whenever the agent runs an action, any + registered watchers will have their callback function called. The + callback fn will be passed 3 args, the watcher, the agent and a boolean + which will be true if the agent's state was (potentially) changed by + the action. The callback fn is run synchronously with the action, + and thus derefs of the agent in the callback will see the value set + during that action. Because it is run on the action thread, the + callback should not block, but can send messages." + [#^clojure.lang.Agent a watcher callback] + (.addWatch a watcher callback)) + +(defn remove-watch + "Experimental. + Removes a watcher (set by add-watch) from an agent" + [#^clojure.lang.Agent a watcher] + (.removeWatch a watcher)) + (defn agent-errors "Returns a sequence of the exceptions thrown during asynchronous actions of the agent." @@ -3484,3 +3503,4 @@ (defmethod print-method java.util.regex.Pattern [p #^Writer w] (.append w \#) (print-method (str p) w)) + diff --git a/src/jvm/clojure/lang/Agent.java b/src/jvm/clojure/lang/Agent.java index 5b776a70..2ad5d035 100644 --- a/src/jvm/clojure/lang/Agent.java +++ b/src/jvm/clojure/lang/Agent.java @@ -14,11 +14,13 @@ package clojure.lang; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicReference; +import java.util.Map; public class Agent implements IRef{ volatile Object state; volatile IFn validator = null; AtomicReference<IPersistentStack> q = new AtomicReference(PersistentQueue.EMPTY); +AtomicReference<IPersistentMap> watchers = new AtomicReference(PersistentHashMap.EMPTY); volatile ISeq errors = null; @@ -35,7 +37,7 @@ public static void shutdown(){ pooledExecutor.shutdown(); } -static class Action implements Runnable{ +static class Action implements Callable{ final Agent agent; final IFn fn; final ISeq args; @@ -51,21 +53,22 @@ static class Action implements Runnable{ void execute(){ if(solo) - soloExecutor.execute(this); + soloExecutor.submit(this); else - pooledExecutor.execute(this); + pooledExecutor.submit(this); } - static void doRun(Action action){ + static void doRun(Action action) throws Exception{ try { Var.pushThreadBindings(RT.map(RT.AGENT, action.agent)); nested.set(PersistentVector.EMPTY); boolean hadError = false; + boolean changed = false; try { - action.agent.setState(action.fn.applyTo(RT.cons(action.agent.state, action.args))); + changed = action.agent.setState(action.fn.applyTo(RT.cons(action.agent.state, action.args))); } catch(Exception e) { @@ -76,6 +79,11 @@ static class Action implements Runnable{ if(!hadError) { + for(Object o : action.agent.watchers.get()) + { + Map.Entry e = (Map.Entry) o; + ((IFn) e.getValue()).invoke(e.getKey(), action.agent, RT.box(changed)); + } for(ISeq s = nested.get().seq(); s != null; s = s.rest()) { Action a = (Action) s.first(); @@ -103,8 +111,9 @@ static class Action implements Runnable{ } } - public void run(){ + public Object call() throws Exception{ doRun(this); + return this; } } @@ -117,9 +126,11 @@ public Agent(Object state, IFn validator) throws Exception{ setState(state); } -void setState(Object newState) throws Exception{ +boolean setState(Object newState) throws Exception{ validate(getValidator(),newState); + boolean ret = state != newState; state = newState; + return ret; } void validate(IFn vf, Object val){ @@ -198,4 +209,28 @@ public int getQueueCount(){ return q.get().count(); } +public Agent addWatch(Object watcher, IFn callback){ + boolean added = false; + IPersistentMap prior = null; + while(!added) + { + prior = watchers.get(); + added = watchers.compareAndSet(prior, prior.assoc(watcher,callback)); + } + + return this; +} + +public Agent removeWatch(Object watcher) throws Exception{ + boolean removed = false; + IPersistentMap prior = null; + while(!removed) + { + prior = watchers.get(); + removed = watchers.compareAndSet(prior, prior.without(watcher)); + } + + return this; +} + } |