diff options
author | Rich Hickey <richhickey@gmail.com> | 2009-01-02 02:58:51 +0000 |
---|---|---|
committer | Rich Hickey <richhickey@gmail.com> | 2009-01-02 02:58:51 +0000 |
commit | e26c24712a9e0f94e25aa6e9bdadea7835a7bec3 (patch) | |
tree | fa5390d95391e42566ac84038a98b7f86e6da653 | |
parent | 8a6c52556d92ab6ab369c7bf2a8c956add582de9 (diff) |
Added watcher support for agents/atoms/refs/vars
Watchers must be agents
(add-watcher reference :send/:send-off an-agent an-action)
-rw-r--r-- | src/clj/clojure/core.clj | 30 | ||||
-rw-r--r-- | src/jvm/clojure/lang/ARef.java | 63 | ||||
-rw-r--r-- | src/jvm/clojure/lang/Agent.java | 40 | ||||
-rw-r--r-- | src/jvm/clojure/lang/Atom.java | 10 | ||||
-rw-r--r-- | src/jvm/clojure/lang/IRef.java | 14 | ||||
-rw-r--r-- | src/jvm/clojure/lang/LockingTransaction.java | 3 | ||||
-rw-r--r-- | src/jvm/clojure/lang/Ref.java | 82 | ||||
-rw-r--r-- | src/jvm/clojure/lang/Var.java | 26 |
8 files changed, 150 insertions, 118 deletions
diff --git a/src/clj/clojure/core.clj b/src/clj/clojure/core.clj index 55407a39..41504cd9 100644 --- a/src/clj/clojure/core.clj +++ b/src/clj/clojure/core.clj @@ -1100,24 +1100,22 @@ occurring, does nothing. Returns the number of actions dispatched." [] (clojure.lang.Agent/releasePendingSends)) -(defn add-watch +(defn add-watcher "Experimental. - Adds a watcher to an agent. Whenever the agent runs an action, any - registered watchers will have their callback function called. The - callback fn will be passed 3 args, the watcher, the agent and a boolean - which will be true if the agent's state was (potentially) changed by - the action. The callback fn is run synchronously with the action, - and thus derefs of the agent in the callback will see the value set - during that action. Because it is run on the action thread, the - callback should not block, but can send messages." - [#^clojure.lang.Agent a watcher callback] - (.addWatch a watcher callback)) - -(defn remove-watch + Adds a watcher to an agent/atom/var/ref reference. The watcher must + be an Agent, and the action a function of the agent's state and one + additional arg, the reference. Whenever the reference's state + changes, any registered watchers will have their actions + sent. send-type must be one of :send or :send-off. The actions will + be sent afer the reference's state is changed." + [#^clojure.lang.IRef reference send-type watcher-agent action-fn] + (.addWatch reference watcher-agent action-fn (= send-type :send-off))) + +(defn remove-watcher "Experimental. - Removes a watcher (set by add-watch) from an agent" - [#^clojure.lang.Agent a watcher] - (.removeWatch a watcher)) + Removes a watcher (set by add-watcher) from a reference" + [#^clojure.lang.IRef reference watcher-agent] + (.removeWatch reference watcher-agent)) (defn agent-errors "Returns a sequence of the exceptions thrown during asynchronous diff --git a/src/jvm/clojure/lang/ARef.java b/src/jvm/clojure/lang/ARef.java index ee04d2e4..3257b9eb 100644 --- a/src/jvm/clojure/lang/ARef.java +++ b/src/jvm/clojure/lang/ARef.java @@ -12,8 +12,12 @@ package clojure.lang; +import java.util.concurrent.atomic.AtomicReference; +import java.util.Map; + public abstract class ARef extends AReference implements IRef { - private volatile IFn validator = null; + protected volatile IFn validator = null; + private AtomicReference<IPersistentMap> watchers = new AtomicReference(PersistentHashMap.EMPTY); public ARef() { super(); @@ -57,4 +61,61 @@ public abstract class ARef extends AReference implements IRef { public IFn getValidator(){ return validator; } + + public IPersistentMap getWatches(){ + return watchers.get(); + } + + public IRef addWatch(Agent watcher, IFn action, boolean sendOff){ + boolean added = false; + IPersistentMap prior = null; + while(!added) + { + prior = watchers.get(); + added = watchers.compareAndSet(prior, prior.assoc(watcher,new Object[]{action,sendOff})); + } + + return this; + } + + public IRef removeWatch(Agent watcher){ + boolean removed = false; + IPersistentMap prior = null; + while(!removed) + { + prior = watchers.get(); + try + { + removed = watchers.compareAndSet(prior, prior.without(watcher)); + } + catch (Exception e) + { + throw new RuntimeException(e); + } + } + + return this; + } + + public void notifyWatches() { + IPersistentMap ws = watchers.get(); + if (ws != null) + { + ISeq args = new Cons(this, null); + for (ISeq s = RT.seq(ws); s != null; s = s.rest()) + { + Map.Entry e = (Map.Entry) s.first(); + Object[] a = (Object[]) e.getValue(); + Agent agent = (Agent) e.getKey(); + try + { + agent.dispatch((IFn) a[0], args, (Boolean)a[1]); + } + catch (Exception e1) + { + //eat dispatching exceptions and continue + } + } + } + } } diff --git a/src/jvm/clojure/lang/Agent.java b/src/jvm/clojure/lang/Agent.java index c2d8613d..50c430c6 100644 --- a/src/jvm/clojure/lang/Agent.java +++ b/src/jvm/clojure/lang/Agent.java @@ -19,9 +19,8 @@ import java.util.Map; public class Agent extends ARef { volatile Object state; AtomicReference<IPersistentStack> q = new AtomicReference(PersistentQueue.EMPTY); -AtomicReference<IPersistentMap> watchers = new AtomicReference(PersistentHashMap.EMPTY); -volatile ISeq errors = null; + volatile ISeq errors = null; final public static ExecutorService pooledExecutor = Executors.newFixedThreadPool(2 + Runtime.getRuntime().availableProcessors()); @@ -68,11 +67,8 @@ static class Action implements Runnable{ try { changed = action.agent.setState(action.fn.applyTo(RT.cons(action.agent.state, action.args))); - for(Object o : action.agent.watchers.get()) - { - Map.Entry e = (Map.Entry) o; - ((IFn) e.getValue()).invoke(e.getKey(), action.agent, RT.box(changed)); - } + if(changed) + action.agent.notifyWatches(); } catch(Exception e) { @@ -143,10 +139,10 @@ public void clearErrors(){ errors = null; } -public Object dispatch(IFn fn, ISeq args, boolean solo) throws Exception{ +public Object dispatch(IFn fn, ISeq args, boolean solo) { if(errors != null) { - throw new Exception("Agent has errors", (Exception) RT.first(errors)); + throw new RuntimeException("Agent has errors", (Exception) RT.first(errors)); } Action action = new Action(this, fn, args, solo); dispatchAction(action); @@ -183,31 +179,7 @@ public int getQueueCount(){ return q.get().count(); } -public Agent addWatch(Object watcher, IFn callback){ - boolean added = false; - IPersistentMap prior = null; - while(!added) - { - prior = watchers.get(); - added = watchers.compareAndSet(prior, prior.assoc(watcher,callback)); - } - - return this; -} - -public Agent removeWatch(Object watcher) throws Exception{ - boolean removed = false; - IPersistentMap prior = null; - while(!removed) - { - prior = watchers.get(); - removed = watchers.compareAndSet(prior, prior.without(watcher)); - } - - return this; -} - -static public int releasePendingSends(){ + static public int releasePendingSends(){ IPersistentVector sends = nested.get(); if(sends == null) return 0; diff --git a/src/jvm/clojure/lang/Atom.java b/src/jvm/clojure/lang/Atom.java index 7834047a..e194b59c 100644 --- a/src/jvm/clojure/lang/Atom.java +++ b/src/jvm/clojure/lang/Atom.java @@ -37,18 +37,26 @@ public class Atom extends ARef{ Object newv = f.applyTo(new Cons(v, args)); validate(newv); if(state.compareAndSet(v,newv)) + { + if(v != newv) + notifyWatches(); return newv; + } } } public boolean compareAndSet(Object oldv, Object newv){ validate(newv); - return state.compareAndSet(oldv, newv); + boolean ret = state.compareAndSet(oldv, newv); + if (ret && oldv != newv) + notifyWatches(); + return ret; } public Object reset(Object newval){ validate(newval); state.set(newval); + notifyWatches(); return newval; } } diff --git a/src/jvm/clojure/lang/IRef.java b/src/jvm/clojure/lang/IRef.java index 2ad289ff..641edec3 100644 --- a/src/jvm/clojure/lang/IRef.java +++ b/src/jvm/clojure/lang/IRef.java @@ -14,9 +14,17 @@ package clojure.lang; public interface IRef{ -Object get() throws Exception; + Object get() throws Exception; -void setValidator(IFn vf); + void setValidator(IFn vf); -IFn getValidator(); + IFn getValidator(); + + IPersistentMap getWatches(); + + IRef addWatch(Agent watcher, IFn action, boolean sendOff); + + IRef removeWatch(Agent watcher); + + void notifyWatches(); } diff --git a/src/jvm/clojure/lang/LockingTransaction.java b/src/jvm/clojure/lang/LockingTransaction.java index c152546b..43d72da2 100644 --- a/src/jvm/clojure/lang/LockingTransaction.java +++ b/src/jvm/clojure/lang/LockingTransaction.java @@ -252,11 +252,12 @@ Object run(Callable fn) throws Exception{ } } - //validate + //validate and enqueue notifications for(Map.Entry<Ref, Object> e : vals.entrySet()) { Ref ref = e.getKey(); ref.validate(ref.getValidator(), e.getValue()); + ref.notifyWatches(); } //at this point, all values calced, all refs to be written locked diff --git a/src/jvm/clojure/lang/Ref.java b/src/jvm/clojure/lang/Ref.java index 08700e5c..2a59b185 100644 --- a/src/jvm/clojure/lang/Ref.java +++ b/src/jvm/clojure/lang/Ref.java @@ -17,7 +17,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.UUID; -public class Ref extends AReference implements IFn, Comparable<Ref>, IRef{ +public class Ref extends ARef implements IFn, Comparable<Ref>, IRef{ public int compareTo(Ref ref) { if(this.id == ref.id) return 0; @@ -58,7 +58,7 @@ TVal tvals; final AtomicInteger faults; final ReentrantReadWriteLock lock; LockingTransaction.Info tinfo; -IFn validator; +//IFn validator; final long id; static final AtomicLong ids = new AtomicLong(); @@ -101,45 +101,45 @@ public Object get(){ return t.doGet(this); } -void validate(IFn vf, Object val){ - try{ - if(vf != null && !RT.booleanCast(vf.invoke(val))) - throw new IllegalStateException("Invalid ref state"); - } - catch(RuntimeException re) - { - throw re; - } - catch(Exception e) - { - throw new IllegalStateException("Invalid ref state", e); - } -} - -public void setValidator(IFn vf){ - try - { - lock.writeLock().lock(); - validate(vf,currentVal()); - validator = vf; - } - finally - { - lock.writeLock().unlock(); - } -} - -public IFn getValidator(){ - try - { - lock.readLock().lock(); - return validator; - } - finally - { - lock.readLock().unlock(); - } -} +//void validate(IFn vf, Object val){ +// try{ +// if(vf != null && !RT.booleanCast(vf.invoke(val))) +// throw new IllegalStateException("Invalid ref state"); +// } +// catch(RuntimeException re) +// { +// throw re; +// } +// catch(Exception e) +// { +// throw new IllegalStateException("Invalid ref state", e); +// } +//} +// +//public void setValidator(IFn vf){ +// try +// { +// lock.writeLock().lock(); +// validate(vf,currentVal()); +// validator = vf; +// } +// finally +// { +// lock.writeLock().unlock(); +// } +//} +// +//public IFn getValidator(){ +// try +// { +// lock.readLock().lock(); +// return validator; +// } +// finally +// { +// lock.readLock().unlock(); +// } +//} public Object set(Object val){ return LockingTransaction.getEx().doSet(this, val); diff --git a/src/jvm/clojure/lang/Var.java b/src/jvm/clojure/lang/Var.java index a8d29a6e..dfc0436c 100644 --- a/src/jvm/clojure/lang/Var.java +++ b/src/jvm/clojure/lang/Var.java @@ -15,7 +15,7 @@ package clojure.lang; import java.util.concurrent.atomic.AtomicInteger; -public final class Var extends AReference implements IFn, IRef, Settable{ +public final class Var extends ARef implements IFn, IRef, Settable{ static class Frame{ @@ -55,7 +55,6 @@ volatile Object root; transient final AtomicInteger count; public final Symbol sym; public final Namespace ns; -volatile IFn validator = null; //IPersistentMap _meta; @@ -143,30 +142,11 @@ public void setValidator(IFn vf){ validator = vf; } -public IFn getValidator(){ - return validator; -} - public Object alter(IFn fn, ISeq args) throws Exception{ set(fn.applyTo(RT.cons(get(), args))); return this; } -void validate(IFn vf, Object val){ - try{ - if(vf != null && !RT.booleanCast(vf.invoke(val))) - throw new IllegalStateException("Invalid var state"); - } - catch(RuntimeException re) - { - throw re; - } - catch(Exception e) - { - throw new IllegalStateException("Invalid var state", e); - } -} - public Object set(Object val){ validate(getValidator(), val); Box b = getThreadBinding(); @@ -253,11 +233,13 @@ synchronized public void bindRoot(Object root){ { throw new RuntimeException(e); } + notifyWatches(); } synchronized void swapRoot(Object root){ validate(getValidator(), root); this.root = root; + notifyWatches(); } synchronized public void unbindRoot(){ @@ -268,12 +250,14 @@ synchronized public void commuteRoot(IFn fn) throws Exception{ Object newRoot = fn.invoke(root); validate(getValidator(), newRoot); this.root = newRoot; + notifyWatches(); } synchronized public Object alterRoot(IFn fn, ISeq args) throws Exception{ Object newRoot = fn.applyTo(RT.cons(root, args)); validate(getValidator(), newRoot); this.root = newRoot; + notifyWatches(); return newRoot; } |