diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/clojure/boot.clj | 23 | ||||
-rw-r--r-- | src/jvm/clojure/lang/Agent.java | 40 | ||||
-rw-r--r-- | src/jvm/clojure/lang/Delay.java | 2 | ||||
-rw-r--r-- | src/jvm/clojure/lang/IRef.java | 3 | ||||
-rw-r--r-- | src/jvm/clojure/lang/LockingTransaction.java | 6 | ||||
-rw-r--r-- | src/jvm/clojure/lang/Ref.java | 65 | ||||
-rw-r--r-- | src/jvm/clojure/lang/Var.java | 29 |
7 files changed, 148 insertions, 20 deletions
diff --git a/src/clojure/boot.clj b/src/clojure/boot.clj index b585c5ca..bfcdafed 100644 --- a/src/clojure/boot.clj +++ b/src/clojure/boot.clj @@ -965,8 +965,9 @@ ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; Refs ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; (defn agent - "Creates and returns an agent with an initial value of state." - [state] (new clojure.lang.Agent state)) + "Creates and returns an agent with an initial value of state and an optional validate fn." + ([state] (new clojure.lang.Agent state)) + ([state validate-fn] (new clojure.lang.Agent state validate-fn))) (defn ! [& args] (throw (new Exception "! is now send. See also send-off"))) @@ -998,9 +999,16 @@ agent, allowing subsequent actions to occur." [#^clojure.lang.Agent a] (. a (clearErrors))) +(defn shutdown-agents + "Initiates a shutdown of the thread pools that back the agent + system. Running actions will complete, but no new actions will be + accepted" + [] (. clojure.lang.Agent shutdown)) + (defn ref - "Creates and returns a Ref with an initial value of x." - [x] (new clojure.lang.Ref x)) + "Creates and returns a Ref with an initial value of x and an optional validate fn." + ([x] (new clojure.lang.Ref x)) + ([x validate-fn] (new clojure.lang.Ref x validate-fn))) (defn deref "Also reader macro: @ref/@agent Within a transaction, returns the @@ -1009,6 +1017,13 @@ returns its current state." [#^clojure.lang.IRef ref] (. ref (get))) +(defn set-validator + "Sets the validator-fn for a var/ref/agent." + [#^clojure.lang.IRef iref validator-fn] (. iref (setValidator validator-fn))) + +(defn get-validator + "Gets the validator-fn for a var/ref/agent." + [#^clojure.lang.IRef iref] (. iref (getValidator))) (defn commute "Must be called in a transaction. Sets the in-transaction-value of diff --git a/src/jvm/clojure/lang/Agent.java b/src/jvm/clojure/lang/Agent.java index cc81ec18..5b776a70 100644 --- a/src/jvm/clojure/lang/Agent.java +++ b/src/jvm/clojure/lang/Agent.java @@ -17,18 +17,24 @@ import java.util.concurrent.atomic.AtomicReference; public class Agent implements IRef{ volatile Object state; +volatile IFn validator = null; AtomicReference<IPersistentStack> q = new AtomicReference(PersistentQueue.EMPTY); volatile ISeq errors = null; -final public static Executor pooledExecutor = +final public static ExecutorService pooledExecutor = Executors.newFixedThreadPool(2 + Runtime.getRuntime().availableProcessors()); -final static Executor soloExecutor = Executors.newCachedThreadPool(); +final static ExecutorService soloExecutor = Executors.newCachedThreadPool(); final static ThreadLocal<IPersistentVector> nested = new ThreadLocal<IPersistentVector>(); +public static void shutdown(){ + soloExecutor.shutdown(); + pooledExecutor.shutdown(); +} + static class Action implements Runnable{ final Agent agent; final IFn fn; @@ -102,14 +108,31 @@ static class Action implements Runnable{ } } -public Agent(Object state){ +public Agent(Object state) throws Exception{ + this(state,null); +} + +public Agent(Object state, IFn validator) throws Exception{ + this.validator = validator; setState(state); } -void setState(Object newState){ +void setState(Object newState) throws Exception{ + validate(getValidator(),newState); state = newState; } +void validate(IFn vf, Object val){ + try{ + if(vf != null) + vf.invoke(val); + } + catch(Exception e) + { + throw new IllegalStateException("Invalid agent state", e); + } +} + public Object get() throws Exception{ if(errors != null) { @@ -118,6 +141,15 @@ public Object get() throws Exception{ return state; } +public void setValidator(IFn vf){ + validate(vf,state); + validator = vf; +} + +public IFn getValidator(){ + return validator; +} + public ISeq getErrors(){ return errors; } diff --git a/src/jvm/clojure/lang/Delay.java b/src/jvm/clojure/lang/Delay.java index ae1cfa2e..18a8d1c1 100644 --- a/src/jvm/clojure/lang/Delay.java +++ b/src/jvm/clojure/lang/Delay.java @@ -12,7 +12,7 @@ package clojure.lang; -public class Delay extends AFn implements IRef{ +public class Delay extends AFn{ Object val; IFn fn; diff --git a/src/jvm/clojure/lang/IRef.java b/src/jvm/clojure/lang/IRef.java index efb1d0b7..b86e8e1b 100644 --- a/src/jvm/clojure/lang/IRef.java +++ b/src/jvm/clojure/lang/IRef.java @@ -16,4 +16,7 @@ public interface IRef{ Object get() throws Exception; +void setValidator(IFn vf); + +IFn getValidator(); } diff --git a/src/jvm/clojure/lang/LockingTransaction.java b/src/jvm/clojure/lang/LockingTransaction.java index 837187d7..3a8eaee2 100644 --- a/src/jvm/clojure/lang/LockingTransaction.java +++ b/src/jvm/clojure/lang/LockingTransaction.java @@ -246,6 +246,12 @@ Object run(IFn fn) throws Exception{ } } + //validate + for(Ref ref : sets) + { + ref.validate(ref.getValidator(), ref.get()); + } + //at this point, all values calced, all refs to be written locked //no more client code to be called long msecs = System.currentTimeMillis(); diff --git a/src/jvm/clojure/lang/Ref.java b/src/jvm/clojure/lang/Ref.java index 7c1c86b2..31b1077b 100644 --- a/src/jvm/clojure/lang/Ref.java +++ b/src/jvm/clojure/lang/Ref.java @@ -54,8 +54,9 @@ final AtomicInteger faults; final ReentrantReadWriteLock lock; LockingTransaction.Info tinfo; final UUID uuid; +IFn validator; -public Ref(){ +Ref(){ this.tvals = null; this.tinfo = null; this.faults = new AtomicInteger(); @@ -63,26 +64,34 @@ public Ref(){ this.uuid = UUID.randomUUID(); } -public Ref(Object initVal){ +public Ref(Object initVal) throws Exception{ + this(initVal, null); +} + +public Ref(Object initVal,IFn validator) throws Exception{ this(); + if(validator != null) + validate(validator,initVal); + this.validator = validator; tvals = new TVal(initVal, 0, System.currentTimeMillis()); } //note - makes no attempt to ensure there is no other Ref with same UUID //use only with a cache/registry -public Ref(UUID uuid, Object initVal){ - tvals = new TVal(initVal, 0, System.currentTimeMillis()); - this.tinfo = null; - this.faults = new AtomicInteger(); - this.lock = new ReentrantReadWriteLock(); - this.uuid = uuid; -} +//public Ref(UUID uuid, Object initVal){ +// tvals = new TVal(initVal, 0, System.currentTimeMillis()); +// this.tinfo = null; +// this.faults = new AtomicInteger(); +// this.lock = new ReentrantReadWriteLock(); +// this.uuid = uuid; +//} public UUID getUUID(){ return uuid; } + //the latest val // ok out of transaction @@ -109,6 +118,42 @@ public Object get(){ return t.doGet(this); } +void validate(IFn vf, Object val){ + try{ + if(vf != null) + vf.invoke(val); + } + catch(Exception e) + { + throw new IllegalStateException("Invalid ref state", e); + } +} + +public void setValidator(IFn vf){ + try + { + lock.writeLock().lock(); + validate(vf,currentVal()); + validator = vf; + } + finally + { + lock.writeLock().unlock(); + } +} + +public IFn getValidator(){ + try + { + lock.readLock().lock(); + return validator; + } + finally + { + lock.readLock().unlock(); + } +} + public Object set(Object val){ return LockingTransaction.getEx().doSet(this, val); } @@ -140,7 +185,7 @@ boolean isBound(){ } -void trimHistory(){ +public void trimHistory(){ try { lock.writeLock().lock(); diff --git a/src/jvm/clojure/lang/Var.java b/src/jvm/clojure/lang/Var.java index 1ac0b302..f7d42c22 100644 --- a/src/jvm/clojure/lang/Var.java +++ b/src/jvm/clojure/lang/Var.java @@ -54,6 +54,7 @@ volatile Object root; transient final AtomicInteger count; public final Symbol sym; public final Namespace ns; +volatile IFn validator = null; IPersistentMap _meta; @@ -128,12 +129,34 @@ final public Object get(){ throw new IllegalStateException(String.format("Var %s is unbound.", sym)); } +public void setValidator(IFn vf){ + if(isBound()) + validate(vf,getRoot()); + validator = vf; +} + +public IFn getValidator(){ + return validator; +} + public Object alter(IFn fn, ISeq args) throws Exception{ set(fn.applyTo(RT.cons(get(), args))); return this; } +void validate(IFn vf, Object val){ + try{ + if(vf != null) + vf.invoke(val); + } + catch(Exception e) + { + throw new IllegalStateException("Invalid var state", e); + } +} + public Object set(Object val){ + validate(getValidator(),val); Box b = getThreadBinding(); if(b != null) return (b.val = val); @@ -193,6 +216,7 @@ final public boolean hasRoot(){ //binding root always clears macro flag synchronized public void bindRoot(Object root){ + validate(getValidator(), root); this.root = root; _meta = _meta.assoc(macroKey, RT.F); } @@ -202,7 +226,9 @@ synchronized public void unbindRoot(){ } synchronized public void commuteRoot(IFn fn) throws Exception{ - this.root = fn.invoke(root); + Object newRoot = fn.invoke(root); + validate(getValidator(),newRoot); + this.root = newRoot; } public static void pushThreadBindings(Associative bindings){ @@ -212,6 +238,7 @@ public static void pushThreadBindings(Associative bindings){ { IMapEntry e = (IMapEntry) bs.first(); Var v = (Var) e.key(); + v.validate(v.getValidator(), e.val()); v.count.incrementAndGet(); bmap = bmap.assoc(v, new Box(e.val())); } |