summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorRich Hickey <richhickey@gmail.com>2008-09-23 13:37:42 +0000
committerRich Hickey <richhickey@gmail.com>2008-09-23 13:37:42 +0000
commit1c2a26ad5d07f6cc4175f86d9aa95656cf76c6f1 (patch)
treec155c75b1d3ace0ba60c196e5adaee9350e9153d /src
parentd25ed01f4c028d04e8134daf584c6d5fcc7b858d (diff)
first cut at agent watches - add-watch/remove-watch
Diffstat (limited to 'src')
-rw-r--r--src/clj/clojure/boot.clj20
-rw-r--r--src/jvm/clojure/lang/Agent.java49
2 files changed, 62 insertions, 7 deletions
diff --git a/src/clj/clojure/boot.clj b/src/clj/clojure/boot.clj
index 366386d7..3d8667c1 100644
--- a/src/clj/clojure/boot.clj
+++ b/src/clj/clojure/boot.clj
@@ -1038,6 +1038,25 @@
[#^clojure.lang.Agent a f & args]
(. a (dispatch f args true)))
+(defn add-watch
+ "Experimental.
+ Adds a watcher to an agent. Whenever the agent runs an action, any
+ registered watchers will have their callback function called. The
+ callback fn will be passed 3 args, the watcher, the agent and a boolean
+ which will be true if the agent's state was (potentially) changed by
+ the action. The callback fn is run synchronously with the action,
+ and thus derefs of the agent in the callback will see the value set
+ during that action. Because it is run on the action thread, the
+ callback should not block, but can send messages."
+ [#^clojure.lang.Agent a watcher callback]
+ (.addWatch a watcher callback))
+
+(defn remove-watch
+ "Experimental.
+ Removes a watcher (set by add-watch) from an agent"
+ [#^clojure.lang.Agent a watcher]
+ (.removeWatch a watcher))
+
(defn agent-errors
"Returns a sequence of the exceptions thrown during asynchronous
actions of the agent."
@@ -3484,3 +3503,4 @@
(defmethod print-method java.util.regex.Pattern [p #^Writer w]
(.append w \#)
(print-method (str p) w))
+
diff --git a/src/jvm/clojure/lang/Agent.java b/src/jvm/clojure/lang/Agent.java
index 5b776a70..2ad5d035 100644
--- a/src/jvm/clojure/lang/Agent.java
+++ b/src/jvm/clojure/lang/Agent.java
@@ -14,11 +14,13 @@ package clojure.lang;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.Map;
public class Agent implements IRef{
volatile Object state;
volatile IFn validator = null;
AtomicReference<IPersistentStack> q = new AtomicReference(PersistentQueue.EMPTY);
+AtomicReference<IPersistentMap> watchers = new AtomicReference(PersistentHashMap.EMPTY);
volatile ISeq errors = null;
@@ -35,7 +37,7 @@ public static void shutdown(){
pooledExecutor.shutdown();
}
-static class Action implements Runnable{
+static class Action implements Callable{
final Agent agent;
final IFn fn;
final ISeq args;
@@ -51,21 +53,22 @@ static class Action implements Runnable{
void execute(){
if(solo)
- soloExecutor.execute(this);
+ soloExecutor.submit(this);
else
- pooledExecutor.execute(this);
+ pooledExecutor.submit(this);
}
- static void doRun(Action action){
+ static void doRun(Action action) throws Exception{
try
{
Var.pushThreadBindings(RT.map(RT.AGENT, action.agent));
nested.set(PersistentVector.EMPTY);
boolean hadError = false;
+ boolean changed = false;
try
{
- action.agent.setState(action.fn.applyTo(RT.cons(action.agent.state, action.args)));
+ changed = action.agent.setState(action.fn.applyTo(RT.cons(action.agent.state, action.args)));
}
catch(Exception e)
{
@@ -76,6 +79,11 @@ static class Action implements Runnable{
if(!hadError)
{
+ for(Object o : action.agent.watchers.get())
+ {
+ Map.Entry e = (Map.Entry) o;
+ ((IFn) e.getValue()).invoke(e.getKey(), action.agent, RT.box(changed));
+ }
for(ISeq s = nested.get().seq(); s != null; s = s.rest())
{
Action a = (Action) s.first();
@@ -103,8 +111,9 @@ static class Action implements Runnable{
}
}
- public void run(){
+ public Object call() throws Exception{
doRun(this);
+ return this;
}
}
@@ -117,9 +126,11 @@ public Agent(Object state, IFn validator) throws Exception{
setState(state);
}
-void setState(Object newState) throws Exception{
+boolean setState(Object newState) throws Exception{
validate(getValidator(),newState);
+ boolean ret = state != newState;
state = newState;
+ return ret;
}
void validate(IFn vf, Object val){
@@ -198,4 +209,28 @@ public int getQueueCount(){
return q.get().count();
}
+public Agent addWatch(Object watcher, IFn callback){
+ boolean added = false;
+ IPersistentMap prior = null;
+ while(!added)
+ {
+ prior = watchers.get();
+ added = watchers.compareAndSet(prior, prior.assoc(watcher,callback));
+ }
+
+ return this;
+}
+
+public Agent removeWatch(Object watcher) throws Exception{
+ boolean removed = false;
+ IPersistentMap prior = null;
+ while(!removed)
+ {
+ prior = watchers.get();
+ removed = watchers.compareAndSet(prior, prior.without(watcher));
+ }
+
+ return this;
+}
+
}