summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRich Hickey <richhickey@gmail.com>2009-02-26 15:34:41 +0000
committerRich Hickey <richhickey@gmail.com>2009-02-26 15:34:41 +0000
commit2ea56006cb0ee74559ecd5d97e05944c6ee21fc4 (patch)
treead57bf27c2a63d74c93a6b0321cb7a67619de270
parent71334efab758979331735e832b36052bc1bc19de (diff)
interim checkin - needs testing - made watches synchronous, send old+new state, added add-watch, remove-watch, redefined add-watcher in terms of add-watch
-rw-r--r--src/jvm/clojure/lang/ARef.java160
-rw-r--r--src/jvm/clojure/lang/Agent.java8
-rw-r--r--src/jvm/clojure/lang/Atom.java159
-rw-r--r--src/jvm/clojure/lang/IRef.java7
-rw-r--r--src/jvm/clojure/lang/LockingTransaction.java43
-rw-r--r--src/jvm/clojure/lang/Var.java18
6 files changed, 212 insertions, 183 deletions
diff --git a/src/jvm/clojure/lang/ARef.java b/src/jvm/clojure/lang/ARef.java
index 33e88d8c..99c8c692 100644
--- a/src/jvm/clojure/lang/ARef.java
+++ b/src/jvm/clojure/lang/ARef.java
@@ -14,94 +14,94 @@ package clojure.lang;
import java.util.Map;
-public abstract class ARef extends AReference implements IRef {
- protected volatile IFn validator = null;
- private volatile IPersistentMap watchers = PersistentHashMap.EMPTY;
+public abstract class ARef extends AReference implements IRef{
+protected volatile IFn validator = null;
+private volatile IPersistentMap watches = PersistentHashMap.EMPTY;
- public ARef() {
- super();
- }
+public ARef(){
+ super();
+}
- public ARef(IPersistentMap meta) {
- super(meta);
- }
+public ARef(IPersistentMap meta){
+ super(meta);
+}
- void validate(IFn vf, Object val){
- try{
- if(vf != null && !RT.booleanCast(vf.invoke(val)))
- throw new IllegalStateException("Invalid reference state");
- }
- catch(RuntimeException re)
- {
- throw re;
- }
- catch(Exception e)
- {
- throw new IllegalStateException("Invalid reference state", e);
- }
- }
+void validate(IFn vf, Object val){
+ try
+ {
+ if(vf != null && !RT.booleanCast(vf.invoke(val)))
+ throw new IllegalStateException("Invalid reference state");
+ }
+ catch(RuntimeException re)
+ {
+ throw re;
+ }
+ catch(Exception e)
+ {
+ throw new IllegalStateException("Invalid reference state", e);
+ }
+}
- void validate(Object val){
- validate(validator,val);
- }
+void validate(Object val){
+ validate(validator, val);
+}
- public void setValidator(IFn vf){
- try
- {
- validate(vf, deref());
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
- validator = vf;
- }
+public void setValidator(IFn vf){
+ try
+ {
+ validate(vf, deref());
+ }
+ catch(Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+ validator = vf;
+}
- public IFn getValidator(){
- return validator;
- }
+public IFn getValidator(){
+ return validator;
+}
- public IPersistentMap getWatches(){
- return watchers;
- }
-
- synchronized public IRef addWatch(Agent watcher, IFn action, boolean sendOff){
- watchers = watchers.assoc(watcher, new Object[]{action, sendOff});
- return this;
- }
+public IPersistentMap getWatches(){
+ return watches;
+}
- synchronized public IRef removeWatch(Agent watcher){
- try
- {
- watchers = watchers.without(watcher);
- }
- catch(Exception e)
- {
- throw new RuntimeException(e);
- }
+synchronized public IRef addWatch(Object key, IFn callback){
+ watches = watches.assoc(key, callback);
+ return this;
+}
+
+synchronized public IRef removeWatch(Object key){
+ try
+ {
+ watches = watches.without(key);
+ }
+ catch(Exception e)
+ {
+ throw new RuntimeException(e);
+ }
- return this;
- }
+ return this;
+}
- public void notifyWatches() {
- IPersistentMap ws = watchers;
- if (ws.count() > 0)
- {
- ISeq args = new Cons(this, null);
- for (ISeq s = RT.seq(ws); s != null; s = s.next())
- {
- Map.Entry e = (Map.Entry) s.first();
- Object[] a = (Object[]) e.getValue();
- Agent agent = (Agent) e.getKey();
- try
- {
- agent.dispatch((IFn) a[0], args, (Boolean)a[1]);
- }
- catch (Exception e1)
- {
- //eat dispatching exceptions and continue
- }
- }
- }
- }
+public void notifyWatches(Object oldval, Object newval){
+ IPersistentMap ws = watches;
+ if(ws.count() > 0)
+ {
+ for(ISeq s = ws.seq(); s != null; s = s.next())
+ {
+ Map.Entry e = (Map.Entry) s.first();
+ IFn fn = (IFn) e.getValue();
+ try
+ {
+ if(fn != null)
+ fn.invoke(e.getKey(), this, oldval, newval);
+ }
+ catch(Exception e1)
+ {
+ throw new RuntimeException(e1);
+ }
+ }
+ }
+}
}
diff --git a/src/jvm/clojure/lang/Agent.java b/src/jvm/clojure/lang/Agent.java
index 277307e7..3737805d 100644
--- a/src/jvm/clojure/lang/Agent.java
+++ b/src/jvm/clojure/lang/Agent.java
@@ -63,12 +63,12 @@ static class Action implements Runnable{
nested.set(PersistentVector.EMPTY);
boolean hadError = false;
- boolean changed = false;
try
{
- changed = action.agent.setState(action.fn.applyTo(RT.cons(action.agent.state, action.args)));
- if(changed)
- action.agent.notifyWatches();
+ Object oldval = action.agent.state;
+ Object newval = action.fn.applyTo(RT.cons(action.agent.state, action.args));
+ action.agent.setState(newval);
+ action.agent.notifyWatches(oldval,newval);
}
catch(Throwable e)
{
diff --git a/src/jvm/clojure/lang/Atom.java b/src/jvm/clojure/lang/Atom.java
index 050b5ffb..eebe1d98 100644
--- a/src/jvm/clojure/lang/Atom.java
+++ b/src/jvm/clojure/lang/Atom.java
@@ -14,94 +14,91 @@ package clojure.lang;
import java.util.concurrent.atomic.AtomicReference;
-public class Atom extends ARef{
- final AtomicReference state;
+final public class Atom extends ARef{
+final AtomicReference state;
- public Atom(Object state) {
- this.state = new AtomicReference(state);
- }
+public Atom(Object state){
+ this.state = new AtomicReference(state);
+}
- public Atom(Object state, IPersistentMap meta) {
- super(meta);
- this.state = new AtomicReference(state);
- }
+public Atom(Object state, IPersistentMap meta){
+ super(meta);
+ this.state = new AtomicReference(state);
+}
- public Object deref() {
- return state.get();
- }
+public Object deref(){
+ return state.get();
+}
- public Object swap(IFn f) throws Exception {
- for(;;)
- {
- Object v = deref();
- Object newv = f.invoke(v);
- validate(newv);
- if(state.compareAndSet(v,newv))
- {
- if(v != newv)
- notifyWatches();
- return newv;
- }
- }
- }
+public Object swap(IFn f) throws Exception{
+ for(; ;)
+ {
+ Object v = deref();
+ Object newv = f.invoke(v);
+ validate(newv);
+ if(state.compareAndSet(v, newv))
+ {
+ notifyWatches(v, newv);
+ return newv;
+ }
+ }
+}
- public Object swap(IFn f, Object arg) throws Exception {
- for(;;)
- {
- Object v = deref();
- Object newv = f.invoke(v,arg);
- validate(newv);
- if(state.compareAndSet(v,newv))
- {
- if(v != newv)
- notifyWatches();
- return newv;
- }
- }
- }
+public Object swap(IFn f, Object arg) throws Exception{
+ for(; ;)
+ {
+ Object v = deref();
+ Object newv = f.invoke(v, arg);
+ validate(newv);
+ if(state.compareAndSet(v, newv))
+ {
+ notifyWatches(v, newv);
+ return newv;
+ }
+ }
+}
- public Object swap(IFn f, Object arg1, Object arg2) throws Exception {
- for(;;)
- {
- Object v = deref();
- Object newv = f.invoke(v, arg1, arg2);
- validate(newv);
- if(state.compareAndSet(v,newv))
- {
- if(v != newv)
- notifyWatches();
- return newv;
- }
- }
- }
+public Object swap(IFn f, Object arg1, Object arg2) throws Exception{
+ for(; ;)
+ {
+ Object v = deref();
+ Object newv = f.invoke(v, arg1, arg2);
+ validate(newv);
+ if(state.compareAndSet(v, newv))
+ {
+ notifyWatches(v, newv);
+ return newv;
+ }
+ }
+}
- public Object swap(IFn f, Object x, Object y, ISeq args) throws Exception {
- for(;;)
- {
- Object v = deref();
- Object newv = f.applyTo(RT.listStar(v, x, y, args));
- validate(newv);
- if(state.compareAndSet(v,newv))
- {
- if(v != newv)
- notifyWatches();
- return newv;
- }
- }
- }
+public Object swap(IFn f, Object x, Object y, ISeq args) throws Exception{
+ for(; ;)
+ {
+ Object v = deref();
+ Object newv = f.applyTo(RT.listStar(v, x, y, args));
+ validate(newv);
+ if(state.compareAndSet(v, newv))
+ {
+ notifyWatches(v, newv);
+ return newv;
+ }
+ }
+}
- public boolean compareAndSet(Object oldv, Object newv){
- validate(newv);
- boolean ret = state.compareAndSet(oldv, newv);
- if (ret && oldv != newv)
- notifyWatches();
- return ret;
- }
+public boolean compareAndSet(Object oldv, Object newv){
+ validate(newv);
+ boolean ret = state.compareAndSet(oldv, newv);
+ if(ret)
+ notifyWatches(oldv, newv);
+ return ret;
+}
- public Object reset(Object newval){
- validate(newval);
- state.set(newval);
- notifyWatches();
- return newval;
- }
+public Object reset(Object newval){
+ Object oldval = state.get();
+ validate(newval);
+ state.set(newval);
+ notifyWatches(oldval, newval);
+ return newval;
+}
}
diff --git a/src/jvm/clojure/lang/IRef.java b/src/jvm/clojure/lang/IRef.java
index be5687af..2e40aa15 100644
--- a/src/jvm/clojure/lang/IRef.java
+++ b/src/jvm/clojure/lang/IRef.java
@@ -14,15 +14,14 @@ package clojure.lang;
public interface IRef extends IDeref{
-void setValidator(IFn vf);
+ void setValidator(IFn vf);
IFn getValidator();
IPersistentMap getWatches();
- IRef addWatch(Agent watcher, IFn action, boolean sendOff);
+ IRef addWatch(Object key, IFn callback);
- IRef removeWatch(Agent watcher);
+ IRef removeWatch(Object key);
- void notifyWatches();
}
diff --git a/src/jvm/clojure/lang/LockingTransaction.java b/src/jvm/clojure/lang/LockingTransaction.java
index 43d72da2..66a262e7 100644
--- a/src/jvm/clojure/lang/LockingTransaction.java
+++ b/src/jvm/clojure/lang/LockingTransaction.java
@@ -204,10 +204,23 @@ static public Object runInTransaction(Callable fn) throws Exception{
return t.run(fn);
}
+static class Notify{
+ final public Ref ref;
+ final public Object oldval;
+ final public Object newval;
+
+ Notify(Ref ref, Object oldval, Object newval){
+ this.ref = ref;
+ this.oldval = oldval;
+ this.newval = newval;
+ }
+}
+
Object run(Callable fn) throws Exception{
boolean done = false;
Object ret = null;
ArrayList<Ref> locked = new ArrayList<Ref>();
+ ArrayList<Notify> notify = new ArrayList<Notify>();
for(int i = 0; !done && i < RETRY_LIMIT; i++)
{
@@ -257,7 +270,6 @@ Object run(Callable fn) throws Exception{
{
Ref ref = e.getKey();
ref.validate(ref.getValidator(), e.getValue());
- ref.notifyWatches();
}
//at this point, all values calced, all refs to be written locked
@@ -267,22 +279,26 @@ Object run(Callable fn) throws Exception{
for(Map.Entry<Ref, Object> e : vals.entrySet())
{
Ref ref = e.getKey();
+ Object oldval = ref.tvals == null ? null : ref.tvals.val;
+ Object newval = e.getValue();
if(ref.tvals == null)
{
- ref.tvals = new Ref.TVal(e.getValue(), commitPoint, msecs);
+ ref.tvals = new Ref.TVal(newval, commitPoint, msecs);
}
else if(ref.faults.get() > 0)
{
- ref.tvals = new Ref.TVal(e.getValue(), commitPoint, msecs, ref.tvals);
+ ref.tvals = new Ref.TVal(newval, commitPoint, msecs, ref.tvals);
ref.faults.set(0);
}
else
{
ref.tvals = ref.tvals.next;
- ref.tvals.val = e.getValue();
+ ref.tvals.val = newval;
ref.tvals.point = commitPoint;
ref.tvals.msecs = msecs;
}
+ if(ref.getWatches().count() > 0)
+ notify.add(new Notify(ref, oldval, newval));
}
done = true;
@@ -301,14 +317,25 @@ Object run(Callable fn) throws Exception{
}
locked.clear();
stop(done ? COMMITTED : RETRY);
- if(done) //re-dispatch out of transaction
+ try
{
- for(Agent.Action action : actions)
+ if(done) //re-dispatch out of transaction
{
- Agent.dispatchAction(action);
+ for(Notify n : notify)
+ {
+ n.ref.notifyWatches(n.oldval, n.newval);
+ }
+ for(Agent.Action action : actions)
+ {
+ Agent.dispatchAction(action);
+ }
}
}
- actions.clear();
+ finally
+ {
+ notify.clear();
+ actions.clear();
+ }
}
}
if(!done)
diff --git a/src/jvm/clojure/lang/Var.java b/src/jvm/clojure/lang/Var.java
index 02fc2e1b..598a5785 100644
--- a/src/jvm/clojure/lang/Var.java
+++ b/src/jvm/clojure/lang/Var.java
@@ -141,7 +141,7 @@ final public Object deref(){
}
public void setValidator(IFn vf){
- if(isBound())
+ if(hasRoot())
validate(vf, getRoot());
validator = vf;
}
@@ -203,7 +203,9 @@ public boolean isPublic(){
}
public Object getRoot(){
- return root;
+ if(hasRoot())
+ return root;
+ throw new IllegalStateException(String.format("Var %s/%s is unbound.", ns, sym));
}
public Object getTag(){
@@ -228,6 +230,7 @@ final public boolean hasRoot(){
//binding root always clears macro flag
synchronized public void bindRoot(Object root){
validate(getValidator(), root);
+ Object oldroot = hasRoot()?this.root:null;
this.root = root;
try
{
@@ -237,13 +240,14 @@ synchronized public void bindRoot(Object root){
{
throw new RuntimeException(e);
}
- notifyWatches();
+ notifyWatches(oldroot,this.root);
}
synchronized void swapRoot(Object root){
validate(getValidator(), root);
+ Object oldroot = hasRoot()?this.root:null;
this.root = root;
- notifyWatches();
+ notifyWatches(oldroot,root);
}
synchronized public void unbindRoot(){
@@ -253,15 +257,17 @@ synchronized public void unbindRoot(){
synchronized public void commuteRoot(IFn fn) throws Exception{
Object newRoot = fn.invoke(root);
validate(getValidator(), newRoot);
+ Object oldroot = getRoot();
this.root = newRoot;
- notifyWatches();
+ notifyWatches(oldroot,newRoot);
}
synchronized public Object alterRoot(IFn fn, ISeq args) throws Exception{
Object newRoot = fn.applyTo(RT.cons(root, args));
validate(getValidator(), newRoot);
+ Object oldroot = getRoot();
this.root = newRoot;
- notifyWatches();
+ notifyWatches(oldroot,newRoot);
return newRoot;
}