summaryrefslogtreecommitdiff
path: root/src/jvm/clojure/lang/Agent.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/jvm/clojure/lang/Agent.java')
-rw-r--r--src/jvm/clojure/lang/Agent.java49
1 files changed, 42 insertions, 7 deletions
diff --git a/src/jvm/clojure/lang/Agent.java b/src/jvm/clojure/lang/Agent.java
index 5b776a70..2ad5d035 100644
--- a/src/jvm/clojure/lang/Agent.java
+++ b/src/jvm/clojure/lang/Agent.java
@@ -14,11 +14,13 @@ package clojure.lang;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.Map;
public class Agent implements IRef{
volatile Object state;
volatile IFn validator = null;
AtomicReference<IPersistentStack> q = new AtomicReference(PersistentQueue.EMPTY);
+AtomicReference<IPersistentMap> watchers = new AtomicReference(PersistentHashMap.EMPTY);
volatile ISeq errors = null;
@@ -35,7 +37,7 @@ public static void shutdown(){
pooledExecutor.shutdown();
}
-static class Action implements Runnable{
+static class Action implements Callable{
final Agent agent;
final IFn fn;
final ISeq args;
@@ -51,21 +53,22 @@ static class Action implements Runnable{
void execute(){
if(solo)
- soloExecutor.execute(this);
+ soloExecutor.submit(this);
else
- pooledExecutor.execute(this);
+ pooledExecutor.submit(this);
}
- static void doRun(Action action){
+ static void doRun(Action action) throws Exception{
try
{
Var.pushThreadBindings(RT.map(RT.AGENT, action.agent));
nested.set(PersistentVector.EMPTY);
boolean hadError = false;
+ boolean changed = false;
try
{
- action.agent.setState(action.fn.applyTo(RT.cons(action.agent.state, action.args)));
+ changed = action.agent.setState(action.fn.applyTo(RT.cons(action.agent.state, action.args)));
}
catch(Exception e)
{
@@ -76,6 +79,11 @@ static class Action implements Runnable{
if(!hadError)
{
+ for(Object o : action.agent.watchers.get())
+ {
+ Map.Entry e = (Map.Entry) o;
+ ((IFn) e.getValue()).invoke(e.getKey(), action.agent, RT.box(changed));
+ }
for(ISeq s = nested.get().seq(); s != null; s = s.rest())
{
Action a = (Action) s.first();
@@ -103,8 +111,9 @@ static class Action implements Runnable{
}
}
- public void run(){
+ public Object call() throws Exception{
doRun(this);
+ return this;
}
}
@@ -117,9 +126,11 @@ public Agent(Object state, IFn validator) throws Exception{
setState(state);
}
-void setState(Object newState) throws Exception{
+boolean setState(Object newState) throws Exception{
validate(getValidator(),newState);
+ boolean ret = state != newState;
state = newState;
+ return ret;
}
void validate(IFn vf, Object val){
@@ -198,4 +209,28 @@ public int getQueueCount(){
return q.get().count();
}
+public Agent addWatch(Object watcher, IFn callback){
+ boolean added = false;
+ IPersistentMap prior = null;
+ while(!added)
+ {
+ prior = watchers.get();
+ added = watchers.compareAndSet(prior, prior.assoc(watcher,callback));
+ }
+
+ return this;
+}
+
+public Agent removeWatch(Object watcher) throws Exception{
+ boolean removed = false;
+ IPersistentMap prior = null;
+ while(!removed)
+ {
+ prior = watchers.get();
+ removed = watchers.compareAndSet(prior, prior.without(watcher));
+ }
+
+ return this;
+}
+
}