diff options
Diffstat (limited to 'src/jvm')
-rw-r--r-- | src/jvm/clojure/lang/Agent.java | 49 |
1 files changed, 42 insertions, 7 deletions
diff --git a/src/jvm/clojure/lang/Agent.java b/src/jvm/clojure/lang/Agent.java index 5b776a70..2ad5d035 100644 --- a/src/jvm/clojure/lang/Agent.java +++ b/src/jvm/clojure/lang/Agent.java @@ -14,11 +14,13 @@ package clojure.lang; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicReference; +import java.util.Map; public class Agent implements IRef{ volatile Object state; volatile IFn validator = null; AtomicReference<IPersistentStack> q = new AtomicReference(PersistentQueue.EMPTY); +AtomicReference<IPersistentMap> watchers = new AtomicReference(PersistentHashMap.EMPTY); volatile ISeq errors = null; @@ -35,7 +37,7 @@ public static void shutdown(){ pooledExecutor.shutdown(); } -static class Action implements Runnable{ +static class Action implements Callable{ final Agent agent; final IFn fn; final ISeq args; @@ -51,21 +53,22 @@ static class Action implements Runnable{ void execute(){ if(solo) - soloExecutor.execute(this); + soloExecutor.submit(this); else - pooledExecutor.execute(this); + pooledExecutor.submit(this); } - static void doRun(Action action){ + static void doRun(Action action) throws Exception{ try { Var.pushThreadBindings(RT.map(RT.AGENT, action.agent)); nested.set(PersistentVector.EMPTY); boolean hadError = false; + boolean changed = false; try { - action.agent.setState(action.fn.applyTo(RT.cons(action.agent.state, action.args))); + changed = action.agent.setState(action.fn.applyTo(RT.cons(action.agent.state, action.args))); } catch(Exception e) { @@ -76,6 +79,11 @@ static class Action implements Runnable{ if(!hadError) { + for(Object o : action.agent.watchers.get()) + { + Map.Entry e = (Map.Entry) o; + ((IFn) e.getValue()).invoke(e.getKey(), action.agent, RT.box(changed)); + } for(ISeq s = nested.get().seq(); s != null; s = s.rest()) { Action a = (Action) s.first(); @@ -103,8 +111,9 @@ static class Action implements Runnable{ } } - public void run(){ + public Object call() throws Exception{ doRun(this); + return this; } } @@ -117,9 +126,11 @@ public Agent(Object state, IFn validator) throws Exception{ setState(state); } -void setState(Object newState) throws Exception{ +boolean setState(Object newState) throws Exception{ validate(getValidator(),newState); + boolean ret = state != newState; state = newState; + return ret; } void validate(IFn vf, Object val){ @@ -198,4 +209,28 @@ public int getQueueCount(){ return q.get().count(); } +public Agent addWatch(Object watcher, IFn callback){ + boolean added = false; + IPersistentMap prior = null; + while(!added) + { + prior = watchers.get(); + added = watchers.compareAndSet(prior, prior.assoc(watcher,callback)); + } + + return this; +} + +public Agent removeWatch(Object watcher) throws Exception{ + boolean removed = false; + IPersistentMap prior = null; + while(!removed) + { + prior = watchers.get(); + removed = watchers.compareAndSet(prior, prior.without(watcher)); + } + + return this; +} + } |