diff options
author | Rich Hickey <richhickey@gmail.com> | 2009-02-26 15:34:41 +0000 |
---|---|---|
committer | Rich Hickey <richhickey@gmail.com> | 2009-02-26 15:34:41 +0000 |
commit | 2ea56006cb0ee74559ecd5d97e05944c6ee21fc4 (patch) | |
tree | ad57bf27c2a63d74c93a6b0321cb7a67619de270 | |
parent | 71334efab758979331735e832b36052bc1bc19de (diff) |
interim checkin - needs testing - made watches synchronous, send old+new state, added add-watch, remove-watch, redefined add-watcher in terms of add-watch
-rw-r--r-- | src/jvm/clojure/lang/ARef.java | 160 | ||||
-rw-r--r-- | src/jvm/clojure/lang/Agent.java | 8 | ||||
-rw-r--r-- | src/jvm/clojure/lang/Atom.java | 159 | ||||
-rw-r--r-- | src/jvm/clojure/lang/IRef.java | 7 | ||||
-rw-r--r-- | src/jvm/clojure/lang/LockingTransaction.java | 43 | ||||
-rw-r--r-- | src/jvm/clojure/lang/Var.java | 18 |
6 files changed, 212 insertions, 183 deletions
diff --git a/src/jvm/clojure/lang/ARef.java b/src/jvm/clojure/lang/ARef.java index 33e88d8c..99c8c692 100644 --- a/src/jvm/clojure/lang/ARef.java +++ b/src/jvm/clojure/lang/ARef.java @@ -14,94 +14,94 @@ package clojure.lang; import java.util.Map; -public abstract class ARef extends AReference implements IRef { - protected volatile IFn validator = null; - private volatile IPersistentMap watchers = PersistentHashMap.EMPTY; +public abstract class ARef extends AReference implements IRef{ +protected volatile IFn validator = null; +private volatile IPersistentMap watches = PersistentHashMap.EMPTY; - public ARef() { - super(); - } +public ARef(){ + super(); +} - public ARef(IPersistentMap meta) { - super(meta); - } +public ARef(IPersistentMap meta){ + super(meta); +} - void validate(IFn vf, Object val){ - try{ - if(vf != null && !RT.booleanCast(vf.invoke(val))) - throw new IllegalStateException("Invalid reference state"); - } - catch(RuntimeException re) - { - throw re; - } - catch(Exception e) - { - throw new IllegalStateException("Invalid reference state", e); - } - } +void validate(IFn vf, Object val){ + try + { + if(vf != null && !RT.booleanCast(vf.invoke(val))) + throw new IllegalStateException("Invalid reference state"); + } + catch(RuntimeException re) + { + throw re; + } + catch(Exception e) + { + throw new IllegalStateException("Invalid reference state", e); + } +} - void validate(Object val){ - validate(validator,val); - } +void validate(Object val){ + validate(validator, val); +} - public void setValidator(IFn vf){ - try - { - validate(vf, deref()); - } - catch (Exception e) - { - throw new RuntimeException(e); - } - validator = vf; - } +public void setValidator(IFn vf){ + try + { + validate(vf, deref()); + } + catch(Exception e) + { + throw new RuntimeException(e); + } + validator = vf; +} - public IFn getValidator(){ - return validator; - } +public IFn getValidator(){ + return validator; +} - public IPersistentMap getWatches(){ - return watchers; - } - - synchronized public IRef addWatch(Agent watcher, IFn action, boolean sendOff){ - watchers = watchers.assoc(watcher, new Object[]{action, sendOff}); - return this; - } +public IPersistentMap getWatches(){ + return watches; +} - synchronized public IRef removeWatch(Agent watcher){ - try - { - watchers = watchers.without(watcher); - } - catch(Exception e) - { - throw new RuntimeException(e); - } +synchronized public IRef addWatch(Object key, IFn callback){ + watches = watches.assoc(key, callback); + return this; +} + +synchronized public IRef removeWatch(Object key){ + try + { + watches = watches.without(key); + } + catch(Exception e) + { + throw new RuntimeException(e); + } - return this; - } + return this; +} - public void notifyWatches() { - IPersistentMap ws = watchers; - if (ws.count() > 0) - { - ISeq args = new Cons(this, null); - for (ISeq s = RT.seq(ws); s != null; s = s.next()) - { - Map.Entry e = (Map.Entry) s.first(); - Object[] a = (Object[]) e.getValue(); - Agent agent = (Agent) e.getKey(); - try - { - agent.dispatch((IFn) a[0], args, (Boolean)a[1]); - } - catch (Exception e1) - { - //eat dispatching exceptions and continue - } - } - } - } +public void notifyWatches(Object oldval, Object newval){ + IPersistentMap ws = watches; + if(ws.count() > 0) + { + for(ISeq s = ws.seq(); s != null; s = s.next()) + { + Map.Entry e = (Map.Entry) s.first(); + IFn fn = (IFn) e.getValue(); + try + { + if(fn != null) + fn.invoke(e.getKey(), this, oldval, newval); + } + catch(Exception e1) + { + throw new RuntimeException(e1); + } + } + } +} } diff --git a/src/jvm/clojure/lang/Agent.java b/src/jvm/clojure/lang/Agent.java index 277307e7..3737805d 100644 --- a/src/jvm/clojure/lang/Agent.java +++ b/src/jvm/clojure/lang/Agent.java @@ -63,12 +63,12 @@ static class Action implements Runnable{ nested.set(PersistentVector.EMPTY); boolean hadError = false; - boolean changed = false; try { - changed = action.agent.setState(action.fn.applyTo(RT.cons(action.agent.state, action.args))); - if(changed) - action.agent.notifyWatches(); + Object oldval = action.agent.state; + Object newval = action.fn.applyTo(RT.cons(action.agent.state, action.args)); + action.agent.setState(newval); + action.agent.notifyWatches(oldval,newval); } catch(Throwable e) { diff --git a/src/jvm/clojure/lang/Atom.java b/src/jvm/clojure/lang/Atom.java index 050b5ffb..eebe1d98 100644 --- a/src/jvm/clojure/lang/Atom.java +++ b/src/jvm/clojure/lang/Atom.java @@ -14,94 +14,91 @@ package clojure.lang; import java.util.concurrent.atomic.AtomicReference; -public class Atom extends ARef{ - final AtomicReference state; +final public class Atom extends ARef{ +final AtomicReference state; - public Atom(Object state) { - this.state = new AtomicReference(state); - } +public Atom(Object state){ + this.state = new AtomicReference(state); +} - public Atom(Object state, IPersistentMap meta) { - super(meta); - this.state = new AtomicReference(state); - } +public Atom(Object state, IPersistentMap meta){ + super(meta); + this.state = new AtomicReference(state); +} - public Object deref() { - return state.get(); - } +public Object deref(){ + return state.get(); +} - public Object swap(IFn f) throws Exception { - for(;;) - { - Object v = deref(); - Object newv = f.invoke(v); - validate(newv); - if(state.compareAndSet(v,newv)) - { - if(v != newv) - notifyWatches(); - return newv; - } - } - } +public Object swap(IFn f) throws Exception{ + for(; ;) + { + Object v = deref(); + Object newv = f.invoke(v); + validate(newv); + if(state.compareAndSet(v, newv)) + { + notifyWatches(v, newv); + return newv; + } + } +} - public Object swap(IFn f, Object arg) throws Exception { - for(;;) - { - Object v = deref(); - Object newv = f.invoke(v,arg); - validate(newv); - if(state.compareAndSet(v,newv)) - { - if(v != newv) - notifyWatches(); - return newv; - } - } - } +public Object swap(IFn f, Object arg) throws Exception{ + for(; ;) + { + Object v = deref(); + Object newv = f.invoke(v, arg); + validate(newv); + if(state.compareAndSet(v, newv)) + { + notifyWatches(v, newv); + return newv; + } + } +} - public Object swap(IFn f, Object arg1, Object arg2) throws Exception { - for(;;) - { - Object v = deref(); - Object newv = f.invoke(v, arg1, arg2); - validate(newv); - if(state.compareAndSet(v,newv)) - { - if(v != newv) - notifyWatches(); - return newv; - } - } - } +public Object swap(IFn f, Object arg1, Object arg2) throws Exception{ + for(; ;) + { + Object v = deref(); + Object newv = f.invoke(v, arg1, arg2); + validate(newv); + if(state.compareAndSet(v, newv)) + { + notifyWatches(v, newv); + return newv; + } + } +} - public Object swap(IFn f, Object x, Object y, ISeq args) throws Exception { - for(;;) - { - Object v = deref(); - Object newv = f.applyTo(RT.listStar(v, x, y, args)); - validate(newv); - if(state.compareAndSet(v,newv)) - { - if(v != newv) - notifyWatches(); - return newv; - } - } - } +public Object swap(IFn f, Object x, Object y, ISeq args) throws Exception{ + for(; ;) + { + Object v = deref(); + Object newv = f.applyTo(RT.listStar(v, x, y, args)); + validate(newv); + if(state.compareAndSet(v, newv)) + { + notifyWatches(v, newv); + return newv; + } + } +} - public boolean compareAndSet(Object oldv, Object newv){ - validate(newv); - boolean ret = state.compareAndSet(oldv, newv); - if (ret && oldv != newv) - notifyWatches(); - return ret; - } +public boolean compareAndSet(Object oldv, Object newv){ + validate(newv); + boolean ret = state.compareAndSet(oldv, newv); + if(ret) + notifyWatches(oldv, newv); + return ret; +} - public Object reset(Object newval){ - validate(newval); - state.set(newval); - notifyWatches(); - return newval; - } +public Object reset(Object newval){ + Object oldval = state.get(); + validate(newval); + state.set(newval); + notifyWatches(oldval, newval); + return newval; +} } diff --git a/src/jvm/clojure/lang/IRef.java b/src/jvm/clojure/lang/IRef.java index be5687af..2e40aa15 100644 --- a/src/jvm/clojure/lang/IRef.java +++ b/src/jvm/clojure/lang/IRef.java @@ -14,15 +14,14 @@ package clojure.lang; public interface IRef extends IDeref{ -void setValidator(IFn vf); + void setValidator(IFn vf); IFn getValidator(); IPersistentMap getWatches(); - IRef addWatch(Agent watcher, IFn action, boolean sendOff); + IRef addWatch(Object key, IFn callback); - IRef removeWatch(Agent watcher); + IRef removeWatch(Object key); - void notifyWatches(); } diff --git a/src/jvm/clojure/lang/LockingTransaction.java b/src/jvm/clojure/lang/LockingTransaction.java index 43d72da2..66a262e7 100644 --- a/src/jvm/clojure/lang/LockingTransaction.java +++ b/src/jvm/clojure/lang/LockingTransaction.java @@ -204,10 +204,23 @@ static public Object runInTransaction(Callable fn) throws Exception{ return t.run(fn); } +static class Notify{ + final public Ref ref; + final public Object oldval; + final public Object newval; + + Notify(Ref ref, Object oldval, Object newval){ + this.ref = ref; + this.oldval = oldval; + this.newval = newval; + } +} + Object run(Callable fn) throws Exception{ boolean done = false; Object ret = null; ArrayList<Ref> locked = new ArrayList<Ref>(); + ArrayList<Notify> notify = new ArrayList<Notify>(); for(int i = 0; !done && i < RETRY_LIMIT; i++) { @@ -257,7 +270,6 @@ Object run(Callable fn) throws Exception{ { Ref ref = e.getKey(); ref.validate(ref.getValidator(), e.getValue()); - ref.notifyWatches(); } //at this point, all values calced, all refs to be written locked @@ -267,22 +279,26 @@ Object run(Callable fn) throws Exception{ for(Map.Entry<Ref, Object> e : vals.entrySet()) { Ref ref = e.getKey(); + Object oldval = ref.tvals == null ? null : ref.tvals.val; + Object newval = e.getValue(); if(ref.tvals == null) { - ref.tvals = new Ref.TVal(e.getValue(), commitPoint, msecs); + ref.tvals = new Ref.TVal(newval, commitPoint, msecs); } else if(ref.faults.get() > 0) { - ref.tvals = new Ref.TVal(e.getValue(), commitPoint, msecs, ref.tvals); + ref.tvals = new Ref.TVal(newval, commitPoint, msecs, ref.tvals); ref.faults.set(0); } else { ref.tvals = ref.tvals.next; - ref.tvals.val = e.getValue(); + ref.tvals.val = newval; ref.tvals.point = commitPoint; ref.tvals.msecs = msecs; } + if(ref.getWatches().count() > 0) + notify.add(new Notify(ref, oldval, newval)); } done = true; @@ -301,14 +317,25 @@ Object run(Callable fn) throws Exception{ } locked.clear(); stop(done ? COMMITTED : RETRY); - if(done) //re-dispatch out of transaction + try { - for(Agent.Action action : actions) + if(done) //re-dispatch out of transaction { - Agent.dispatchAction(action); + for(Notify n : notify) + { + n.ref.notifyWatches(n.oldval, n.newval); + } + for(Agent.Action action : actions) + { + Agent.dispatchAction(action); + } } } - actions.clear(); + finally + { + notify.clear(); + actions.clear(); + } } } if(!done) diff --git a/src/jvm/clojure/lang/Var.java b/src/jvm/clojure/lang/Var.java index 02fc2e1b..598a5785 100644 --- a/src/jvm/clojure/lang/Var.java +++ b/src/jvm/clojure/lang/Var.java @@ -141,7 +141,7 @@ final public Object deref(){ } public void setValidator(IFn vf){ - if(isBound()) + if(hasRoot()) validate(vf, getRoot()); validator = vf; } @@ -203,7 +203,9 @@ public boolean isPublic(){ } public Object getRoot(){ - return root; + if(hasRoot()) + return root; + throw new IllegalStateException(String.format("Var %s/%s is unbound.", ns, sym)); } public Object getTag(){ @@ -228,6 +230,7 @@ final public boolean hasRoot(){ //binding root always clears macro flag synchronized public void bindRoot(Object root){ validate(getValidator(), root); + Object oldroot = hasRoot()?this.root:null; this.root = root; try { @@ -237,13 +240,14 @@ synchronized public void bindRoot(Object root){ { throw new RuntimeException(e); } - notifyWatches(); + notifyWatches(oldroot,this.root); } synchronized void swapRoot(Object root){ validate(getValidator(), root); + Object oldroot = hasRoot()?this.root:null; this.root = root; - notifyWatches(); + notifyWatches(oldroot,root); } synchronized public void unbindRoot(){ @@ -253,15 +257,17 @@ synchronized public void unbindRoot(){ synchronized public void commuteRoot(IFn fn) throws Exception{ Object newRoot = fn.invoke(root); validate(getValidator(), newRoot); + Object oldroot = getRoot(); this.root = newRoot; - notifyWatches(); + notifyWatches(oldroot,newRoot); } synchronized public Object alterRoot(IFn fn, ISeq args) throws Exception{ Object newRoot = fn.applyTo(RT.cons(root, args)); validate(getValidator(), newRoot); + Object oldroot = getRoot(); this.root = newRoot; - notifyWatches(); + notifyWatches(oldroot,newRoot); return newRoot; } |