summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRich Hickey <richhickey@gmail.com>2009-01-02 02:58:51 +0000
committerRich Hickey <richhickey@gmail.com>2009-01-02 02:58:51 +0000
commite26c24712a9e0f94e25aa6e9bdadea7835a7bec3 (patch)
treefa5390d95391e42566ac84038a98b7f86e6da653
parent8a6c52556d92ab6ab369c7bf2a8c956add582de9 (diff)
Added watcher support for agents/atoms/refs/vars
Watchers must be agents (add-watcher reference :send/:send-off an-agent an-action)
-rw-r--r--src/clj/clojure/core.clj30
-rw-r--r--src/jvm/clojure/lang/ARef.java63
-rw-r--r--src/jvm/clojure/lang/Agent.java40
-rw-r--r--src/jvm/clojure/lang/Atom.java10
-rw-r--r--src/jvm/clojure/lang/IRef.java14
-rw-r--r--src/jvm/clojure/lang/LockingTransaction.java3
-rw-r--r--src/jvm/clojure/lang/Ref.java82
-rw-r--r--src/jvm/clojure/lang/Var.java26
8 files changed, 150 insertions, 118 deletions
diff --git a/src/clj/clojure/core.clj b/src/clj/clojure/core.clj
index 55407a39..41504cd9 100644
--- a/src/clj/clojure/core.clj
+++ b/src/clj/clojure/core.clj
@@ -1100,24 +1100,22 @@
occurring, does nothing. Returns the number of actions dispatched."
[] (clojure.lang.Agent/releasePendingSends))
-(defn add-watch
+(defn add-watcher
"Experimental.
- Adds a watcher to an agent. Whenever the agent runs an action, any
- registered watchers will have their callback function called. The
- callback fn will be passed 3 args, the watcher, the agent and a boolean
- which will be true if the agent's state was (potentially) changed by
- the action. The callback fn is run synchronously with the action,
- and thus derefs of the agent in the callback will see the value set
- during that action. Because it is run on the action thread, the
- callback should not block, but can send messages."
- [#^clojure.lang.Agent a watcher callback]
- (.addWatch a watcher callback))
-
-(defn remove-watch
+ Adds a watcher to an agent/atom/var/ref reference. The watcher must
+ be an Agent, and the action a function of the agent's state and one
+ additional arg, the reference. Whenever the reference's state
+ changes, any registered watchers will have their actions
+ sent. send-type must be one of :send or :send-off. The actions will
+ be sent afer the reference's state is changed."
+ [#^clojure.lang.IRef reference send-type watcher-agent action-fn]
+ (.addWatch reference watcher-agent action-fn (= send-type :send-off)))
+
+(defn remove-watcher
"Experimental.
- Removes a watcher (set by add-watch) from an agent"
- [#^clojure.lang.Agent a watcher]
- (.removeWatch a watcher))
+ Removes a watcher (set by add-watcher) from a reference"
+ [#^clojure.lang.IRef reference watcher-agent]
+ (.removeWatch reference watcher-agent))
(defn agent-errors
"Returns a sequence of the exceptions thrown during asynchronous
diff --git a/src/jvm/clojure/lang/ARef.java b/src/jvm/clojure/lang/ARef.java
index ee04d2e4..3257b9eb 100644
--- a/src/jvm/clojure/lang/ARef.java
+++ b/src/jvm/clojure/lang/ARef.java
@@ -12,8 +12,12 @@
package clojure.lang;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.Map;
+
public abstract class ARef extends AReference implements IRef {
- private volatile IFn validator = null;
+ protected volatile IFn validator = null;
+ private AtomicReference<IPersistentMap> watchers = new AtomicReference(PersistentHashMap.EMPTY);
public ARef() {
super();
@@ -57,4 +61,61 @@ public abstract class ARef extends AReference implements IRef {
public IFn getValidator(){
return validator;
}
+
+ public IPersistentMap getWatches(){
+ return watchers.get();
+ }
+
+ public IRef addWatch(Agent watcher, IFn action, boolean sendOff){
+ boolean added = false;
+ IPersistentMap prior = null;
+ while(!added)
+ {
+ prior = watchers.get();
+ added = watchers.compareAndSet(prior, prior.assoc(watcher,new Object[]{action,sendOff}));
+ }
+
+ return this;
+ }
+
+ public IRef removeWatch(Agent watcher){
+ boolean removed = false;
+ IPersistentMap prior = null;
+ while(!removed)
+ {
+ prior = watchers.get();
+ try
+ {
+ removed = watchers.compareAndSet(prior, prior.without(watcher));
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ return this;
+ }
+
+ public void notifyWatches() {
+ IPersistentMap ws = watchers.get();
+ if (ws != null)
+ {
+ ISeq args = new Cons(this, null);
+ for (ISeq s = RT.seq(ws); s != null; s = s.rest())
+ {
+ Map.Entry e = (Map.Entry) s.first();
+ Object[] a = (Object[]) e.getValue();
+ Agent agent = (Agent) e.getKey();
+ try
+ {
+ agent.dispatch((IFn) a[0], args, (Boolean)a[1]);
+ }
+ catch (Exception e1)
+ {
+ //eat dispatching exceptions and continue
+ }
+ }
+ }
+ }
}
diff --git a/src/jvm/clojure/lang/Agent.java b/src/jvm/clojure/lang/Agent.java
index c2d8613d..50c430c6 100644
--- a/src/jvm/clojure/lang/Agent.java
+++ b/src/jvm/clojure/lang/Agent.java
@@ -19,9 +19,8 @@ import java.util.Map;
public class Agent extends ARef {
volatile Object state;
AtomicReference<IPersistentStack> q = new AtomicReference(PersistentQueue.EMPTY);
-AtomicReference<IPersistentMap> watchers = new AtomicReference(PersistentHashMap.EMPTY);
-volatile ISeq errors = null;
+ volatile ISeq errors = null;
final public static ExecutorService pooledExecutor =
Executors.newFixedThreadPool(2 + Runtime.getRuntime().availableProcessors());
@@ -68,11 +67,8 @@ static class Action implements Runnable{
try
{
changed = action.agent.setState(action.fn.applyTo(RT.cons(action.agent.state, action.args)));
- for(Object o : action.agent.watchers.get())
- {
- Map.Entry e = (Map.Entry) o;
- ((IFn) e.getValue()).invoke(e.getKey(), action.agent, RT.box(changed));
- }
+ if(changed)
+ action.agent.notifyWatches();
}
catch(Exception e)
{
@@ -143,10 +139,10 @@ public void clearErrors(){
errors = null;
}
-public Object dispatch(IFn fn, ISeq args, boolean solo) throws Exception{
+public Object dispatch(IFn fn, ISeq args, boolean solo) {
if(errors != null)
{
- throw new Exception("Agent has errors", (Exception) RT.first(errors));
+ throw new RuntimeException("Agent has errors", (Exception) RT.first(errors));
}
Action action = new Action(this, fn, args, solo);
dispatchAction(action);
@@ -183,31 +179,7 @@ public int getQueueCount(){
return q.get().count();
}
-public Agent addWatch(Object watcher, IFn callback){
- boolean added = false;
- IPersistentMap prior = null;
- while(!added)
- {
- prior = watchers.get();
- added = watchers.compareAndSet(prior, prior.assoc(watcher,callback));
- }
-
- return this;
-}
-
-public Agent removeWatch(Object watcher) throws Exception{
- boolean removed = false;
- IPersistentMap prior = null;
- while(!removed)
- {
- prior = watchers.get();
- removed = watchers.compareAndSet(prior, prior.without(watcher));
- }
-
- return this;
-}
-
-static public int releasePendingSends(){
+ static public int releasePendingSends(){
IPersistentVector sends = nested.get();
if(sends == null)
return 0;
diff --git a/src/jvm/clojure/lang/Atom.java b/src/jvm/clojure/lang/Atom.java
index 7834047a..e194b59c 100644
--- a/src/jvm/clojure/lang/Atom.java
+++ b/src/jvm/clojure/lang/Atom.java
@@ -37,18 +37,26 @@ public class Atom extends ARef{
Object newv = f.applyTo(new Cons(v, args));
validate(newv);
if(state.compareAndSet(v,newv))
+ {
+ if(v != newv)
+ notifyWatches();
return newv;
+ }
}
}
public boolean compareAndSet(Object oldv, Object newv){
validate(newv);
- return state.compareAndSet(oldv, newv);
+ boolean ret = state.compareAndSet(oldv, newv);
+ if (ret && oldv != newv)
+ notifyWatches();
+ return ret;
}
public Object reset(Object newval){
validate(newval);
state.set(newval);
+ notifyWatches();
return newval;
}
}
diff --git a/src/jvm/clojure/lang/IRef.java b/src/jvm/clojure/lang/IRef.java
index 2ad289ff..641edec3 100644
--- a/src/jvm/clojure/lang/IRef.java
+++ b/src/jvm/clojure/lang/IRef.java
@@ -14,9 +14,17 @@ package clojure.lang;
public interface IRef{
-Object get() throws Exception;
+ Object get() throws Exception;
-void setValidator(IFn vf);
+ void setValidator(IFn vf);
-IFn getValidator();
+ IFn getValidator();
+
+ IPersistentMap getWatches();
+
+ IRef addWatch(Agent watcher, IFn action, boolean sendOff);
+
+ IRef removeWatch(Agent watcher);
+
+ void notifyWatches();
}
diff --git a/src/jvm/clojure/lang/LockingTransaction.java b/src/jvm/clojure/lang/LockingTransaction.java
index c152546b..43d72da2 100644
--- a/src/jvm/clojure/lang/LockingTransaction.java
+++ b/src/jvm/clojure/lang/LockingTransaction.java
@@ -252,11 +252,12 @@ Object run(Callable fn) throws Exception{
}
}
- //validate
+ //validate and enqueue notifications
for(Map.Entry<Ref, Object> e : vals.entrySet())
{
Ref ref = e.getKey();
ref.validate(ref.getValidator(), e.getValue());
+ ref.notifyWatches();
}
//at this point, all values calced, all refs to be written locked
diff --git a/src/jvm/clojure/lang/Ref.java b/src/jvm/clojure/lang/Ref.java
index 08700e5c..2a59b185 100644
--- a/src/jvm/clojure/lang/Ref.java
+++ b/src/jvm/clojure/lang/Ref.java
@@ -17,7 +17,7 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.UUID;
-public class Ref extends AReference implements IFn, Comparable<Ref>, IRef{
+public class Ref extends ARef implements IFn, Comparable<Ref>, IRef{
public int compareTo(Ref ref) {
if(this.id == ref.id)
return 0;
@@ -58,7 +58,7 @@ TVal tvals;
final AtomicInteger faults;
final ReentrantReadWriteLock lock;
LockingTransaction.Info tinfo;
-IFn validator;
+//IFn validator;
final long id;
static final AtomicLong ids = new AtomicLong();
@@ -101,45 +101,45 @@ public Object get(){
return t.doGet(this);
}
-void validate(IFn vf, Object val){
- try{
- if(vf != null && !RT.booleanCast(vf.invoke(val)))
- throw new IllegalStateException("Invalid ref state");
- }
- catch(RuntimeException re)
- {
- throw re;
- }
- catch(Exception e)
- {
- throw new IllegalStateException("Invalid ref state", e);
- }
-}
-
-public void setValidator(IFn vf){
- try
- {
- lock.writeLock().lock();
- validate(vf,currentVal());
- validator = vf;
- }
- finally
- {
- lock.writeLock().unlock();
- }
-}
-
-public IFn getValidator(){
- try
- {
- lock.readLock().lock();
- return validator;
- }
- finally
- {
- lock.readLock().unlock();
- }
-}
+//void validate(IFn vf, Object val){
+// try{
+// if(vf != null && !RT.booleanCast(vf.invoke(val)))
+// throw new IllegalStateException("Invalid ref state");
+// }
+// catch(RuntimeException re)
+// {
+// throw re;
+// }
+// catch(Exception e)
+// {
+// throw new IllegalStateException("Invalid ref state", e);
+// }
+//}
+//
+//public void setValidator(IFn vf){
+// try
+// {
+// lock.writeLock().lock();
+// validate(vf,currentVal());
+// validator = vf;
+// }
+// finally
+// {
+// lock.writeLock().unlock();
+// }
+//}
+//
+//public IFn getValidator(){
+// try
+// {
+// lock.readLock().lock();
+// return validator;
+// }
+// finally
+// {
+// lock.readLock().unlock();
+// }
+//}
public Object set(Object val){
return LockingTransaction.getEx().doSet(this, val);
diff --git a/src/jvm/clojure/lang/Var.java b/src/jvm/clojure/lang/Var.java
index a8d29a6e..dfc0436c 100644
--- a/src/jvm/clojure/lang/Var.java
+++ b/src/jvm/clojure/lang/Var.java
@@ -15,7 +15,7 @@ package clojure.lang;
import java.util.concurrent.atomic.AtomicInteger;
-public final class Var extends AReference implements IFn, IRef, Settable{
+public final class Var extends ARef implements IFn, IRef, Settable{
static class Frame{
@@ -55,7 +55,6 @@ volatile Object root;
transient final AtomicInteger count;
public final Symbol sym;
public final Namespace ns;
-volatile IFn validator = null;
//IPersistentMap _meta;
@@ -143,30 +142,11 @@ public void setValidator(IFn vf){
validator = vf;
}
-public IFn getValidator(){
- return validator;
-}
-
public Object alter(IFn fn, ISeq args) throws Exception{
set(fn.applyTo(RT.cons(get(), args)));
return this;
}
-void validate(IFn vf, Object val){
- try{
- if(vf != null && !RT.booleanCast(vf.invoke(val)))
- throw new IllegalStateException("Invalid var state");
- }
- catch(RuntimeException re)
- {
- throw re;
- }
- catch(Exception e)
- {
- throw new IllegalStateException("Invalid var state", e);
- }
-}
-
public Object set(Object val){
validate(getValidator(), val);
Box b = getThreadBinding();
@@ -253,11 +233,13 @@ synchronized public void bindRoot(Object root){
{
throw new RuntimeException(e);
}
+ notifyWatches();
}
synchronized void swapRoot(Object root){
validate(getValidator(), root);
this.root = root;
+ notifyWatches();
}
synchronized public void unbindRoot(){
@@ -268,12 +250,14 @@ synchronized public void commuteRoot(IFn fn) throws Exception{
Object newRoot = fn.invoke(root);
validate(getValidator(), newRoot);
this.root = newRoot;
+ notifyWatches();
}
synchronized public Object alterRoot(IFn fn, ISeq args) throws Exception{
Object newRoot = fn.applyTo(RT.cons(root, args));
validate(getValidator(), newRoot);
this.root = newRoot;
+ notifyWatches();
return newRoot;
}