diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/boot.clj | 27 | ||||
-rw-r--r-- | src/jvm/clojure/lang/Agent.java (renamed from src/jvm/clojure/lang/IRef.java) | 96 | ||||
-rw-r--r-- | src/jvm/clojure/lang/LockingTransaction.java | 12 | ||||
-rw-r--r-- | src/jvm/clojure/lang/RT.java | 2 | ||||
-rw-r--r-- | src/jvm/clojure/lang/Ref.java | 3 |
5 files changed, 41 insertions, 99 deletions
diff --git a/src/boot.clj b/src/boot.clj index f2ebdcc8..8c6a529a 100644 --- a/src/boot.clj +++ b/src/boot.clj @@ -350,19 +350,19 @@ (. clojure.lang.Var (find sym))) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; Refs ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -(defn iref [state] - (new clojure.lang.IRef state)) +(defn agent [state] + (new clojure.lang.Agent state)) -(defn iref-of [state] - (:iref ^state)) +(defn agent-of [state] + (:agent ^state)) -(defn ! [#^clojure.lang.IRef a f & args] - (. a (commute f args))) +(defn ! [#^clojure.lang.Agent a f & args] + (. a (dispatch f args))) -(defn iref-errors [#^clojure.lang.IRef a] +(defn agent-errors [#^clojure.lang.Agent a] (. a (getErrors))) -(defn clear-iref-errors [#^clojure.lang.IRef a] +(defn clear-agent-errors [#^clojure.lang.Agent a] (. a (clearErrors))) (defn tref [x] @@ -374,13 +374,10 @@ (defn commute [#^clojure.lang.TRef ref fun & args] (. ref (commute fun args))) -(defn send [#^clojure.lang.IRef ref fun & args] - (. ref (send fun args))) - -(defn alter [#^clojure.lang.Ref ref fun & args] +(defn alter [#^clojure.lang.TRef ref fun & args] (. ref (alter fun args))) -(defn set [#^clojure.lang.Ref ref val] +(defn set [#^clojure.lang.TRef ref val] (. ref (set val))) (defn ensure [#^clojure.lang.TRef ref] @@ -783,8 +780,8 @@ rseq sym name namespace locking .. -> defmulti defmethod remove-method binding find-var - tref deref commute alter set ensure sync send - iref iref-of iref-errors clear-iref-errors + tref deref commute alter set ensure sync ! + agent agent-of agent-errors clear-agent-errors reduce reverse comp appl every not-every any not-any map pmap mapcat filter take take-while drop drop-while diff --git a/src/jvm/clojure/lang/IRef.java b/src/jvm/clojure/lang/Agent.java index e9acfa57..f886de62 100644 --- a/src/jvm/clojure/lang/IRef.java +++ b/src/jvm/clojure/lang/Agent.java @@ -17,27 +17,25 @@ import java.util.LinkedList; import java.util.concurrent.Executor; import java.util.concurrent.Executors; -public class IRef implements Ref{ +public class Agent implements Ref{ volatile Object state; final Queue q = new LinkedList(); boolean busy = false; -boolean altering = false; volatile ISeq errors = null; //todo - make tuneable -final static Executor executor = Executors.newFixedThreadPool(2 + Runtime.getRuntime().availableProcessors()); +final static Executor executor = Executors.newFixedThreadPool(2 * Runtime.getRuntime().availableProcessors()); //final static Executor executor = Executors.newCachedThreadPool(); final static ThreadLocal<PersistentVector> nested = new ThreadLocal<PersistentVector>(); -final static ThreadLocal inAlter = new ThreadLocal(); static class Action implements Runnable{ - final IRef iref; + final Agent agent; final IFn fn; final ISeq args; - public Action(IRef iref, IFn fn, ISeq args){ - this.iref = iref; + public Action(Agent agent, IFn fn, ISeq args){ + this.agent = agent; this.args = args; this.fn = fn; } @@ -47,12 +45,12 @@ static class Action implements Runnable{ boolean hadError = false; try { - iref.doAlter(fn, args); + agent.setState(fn.applyTo(RT.cons(agent.state, args))); } catch(Exception e) { //todo report/callback - iref.errors = RT.cons(e, iref.errors); + agent.errors = RT.cons(e, agent.errors); hadError = true; } @@ -61,19 +59,19 @@ static class Action implements Runnable{ for(ISeq s = nested.get().seq(); s != null; s = s.rest()) { Action a = (Action) s.first(); - a.iref.enqueue(a); + a.agent.enqueue(a); } } - synchronized(iref) + synchronized(agent) { - if(!iref.q.isEmpty()) + if(!agent.q.isEmpty()) { - executor.execute((Runnable) iref.q.remove()); + executor.execute((Runnable) agent.q.remove()); } else { - iref.busy = false; + agent.busy = false; } } @@ -81,7 +79,7 @@ static class Action implements Runnable{ } } -public IRef(Object state){ +public Agent(Object state){ setState(state); } @@ -89,15 +87,19 @@ void setState(Object newState){ if(newState instanceof IObj) { IObj o = (IObj) newState; - if(RT.get(o.meta(), RT.IREF_KEY) != this) + if(RT.get(o.meta(), RT.AGENT_KEY) != this) { - newState = o.withMeta((IPersistentMap) RT.assoc(o.meta(), RT.IREF_KEY, this)); + newState = o.withMeta((IPersistentMap) RT.assoc(o.meta(), RT.AGENT_KEY, this)); } } state = newState; } -public Object get(){ +public Object get() throws Exception{ + if(errors != null) + { + throw new Exception("Agent has errors", (Exception) RT.first(errors)); + } return state; } @@ -109,49 +111,10 @@ public void clearErrors(){ errors = null; } -synchronized void doAlter(IFn fn, ISeq args) throws Exception{ - try - { - altering = true; - setState(fn.applyTo(RT.cons(state, args))); - } - finally - { - altering = false; - } -} - -public Object alter(IFn fn, ISeq args) throws Exception{ - if(errors != null) - { - throw new Exception("IRef has errors", (Exception) RT.first(errors)); - } - //Action action = new Action(this, fn, args); - if(altering) - throw new Exception("Recursive change"); - LockingTransaction trans = LockingTransaction.getRunning(); - if(trans != null) - throw new Exception("Cannot alter an IRef in a transaction"); - if(inAlter.get() != null) - throw new Exception("Cannot nest alters, use send"); - - try - { - inAlter.set(this); - doAlter(fn, args); - } - finally - { - inAlter.set(null); - } - - return this; -} - -public Object send(IFn fn, ISeq args) throws Exception{ +public Object dispatch(IFn fn, ISeq args) throws Exception{ if(errors != null) { - throw new Exception("IRef has errors", (Exception) RT.first(errors)); + throw new Exception("Agent has errors", (Exception) RT.first(errors)); } Action action = new Action(this, fn, args); LockingTransaction trans = LockingTransaction.getRunning(); @@ -167,21 +130,6 @@ public Object send(IFn fn, ISeq args) throws Exception{ return this; } -public Object set(Object val) throws Exception{ - synchronized(this) - { - if(altering) - throw new Exception("Recursive change"); - LockingTransaction trans = LockingTransaction.getRunning(); - if(trans != null) - throw new Exception("Cannot set an IRef in a transaction"); - if(inAlter.get() != null) - throw new Exception("Cannot nest alters, use send"); - setState(val); - return val; - } -} - void enqueue(Action action){ synchronized(this) { diff --git a/src/jvm/clojure/lang/LockingTransaction.java b/src/jvm/clojure/lang/LockingTransaction.java index 79c49829..ccc8102d 100644 --- a/src/jvm/clojure/lang/LockingTransaction.java +++ b/src/jvm/clojure/lang/LockingTransaction.java @@ -97,7 +97,7 @@ long readPoint; long startPoint; long startTime; final RetryException retryex = new RetryException(); -final ArrayList<IRef.Action> actions = new ArrayList<IRef.Action>(); +final ArrayList<Agent.Action> actions = new ArrayList<Agent.Action>(); final HashMap<TRef, Object> vals = new HashMap<TRef, Object>(); final HashSet<TRef> sets = new HashSet<TRef>(); final TreeMap<TRef, ArrayList<CFn>> commutes = new TreeMap<TRef, ArrayList<CFn>>(); @@ -270,9 +270,9 @@ Object run(IFn fn) throws Exception{ ref.tvals.msecs = msecs; } } - for(IRef.Action action : actions) + for(Agent.Action action : actions) { - action.iref.enqueue(action); + action.agent.enqueue(action); } done = true; info.status.set(COMMITTED); @@ -297,7 +297,7 @@ Object run(IFn fn) throws Exception{ return ret; } -public void enqueue(IRef.Action action){ +public void enqueue(Agent.Action action){ actions.add(action); } @@ -368,8 +368,8 @@ Object doCommute(TRef ref, IFn fn, ISeq args) throws Exception{ ArrayList<CFn> fns = commutes.get(ref); if(fns == null) commutes.put(ref, fns = new ArrayList<CFn>()); - fns.add(new CFn(fn,args)); - Object ret = fn.applyTo(RT.cons(vals.get(ref),args)); + fns.add(new CFn(fn, args)); + Object ret = fn.applyTo(RT.cons(vals.get(ref), args)); vals.put(ref, ret); return ret; } diff --git a/src/jvm/clojure/lang/RT.java b/src/jvm/clojure/lang/RT.java index 1c54b923..30af3c4b 100644 --- a/src/jvm/clojure/lang/RT.java +++ b/src/jvm/clojure/lang/RT.java @@ -25,7 +25,7 @@ final static public Var IN = Var.intern(Symbol.create("clojure", "*in*"), new LineNumberingPushbackReader(new InputStreamReader(System.in))); final static Keyword TAG_KEY = Keyword.intern("clojure", "tag"); -final static Keyword IREF_KEY = Keyword.intern("clojure", "iref"); +final static Keyword AGENT_KEY = Keyword.intern("clojure", "agent"); //final static public Var CURRENT_MODULE = Var.intern(Symbol.create("clojure", "current-module"), // Module.findOrCreateModule("clojure/user")); diff --git a/src/jvm/clojure/lang/Ref.java b/src/jvm/clojure/lang/Ref.java index b5c2b6bd..55397a0a 100644 --- a/src/jvm/clojure/lang/Ref.java +++ b/src/jvm/clojure/lang/Ref.java @@ -16,7 +16,4 @@ public interface Ref{ Object get() throws Exception; -Object alter(IFn fn, ISeq args) throws Exception; - -Object set(Object val) throws Exception; } |