summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/clojure/boot.clj23
-rw-r--r--src/jvm/clojure/lang/Agent.java40
-rw-r--r--src/jvm/clojure/lang/Delay.java2
-rw-r--r--src/jvm/clojure/lang/IRef.java3
-rw-r--r--src/jvm/clojure/lang/LockingTransaction.java6
-rw-r--r--src/jvm/clojure/lang/Ref.java65
-rw-r--r--src/jvm/clojure/lang/Var.java29
7 files changed, 148 insertions, 20 deletions
diff --git a/src/clojure/boot.clj b/src/clojure/boot.clj
index b585c5ca..bfcdafed 100644
--- a/src/clojure/boot.clj
+++ b/src/clojure/boot.clj
@@ -965,8 +965,9 @@
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; Refs ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(defn agent
- "Creates and returns an agent with an initial value of state."
- [state] (new clojure.lang.Agent state))
+ "Creates and returns an agent with an initial value of state and an optional validate fn."
+ ([state] (new clojure.lang.Agent state))
+ ([state validate-fn] (new clojure.lang.Agent state validate-fn)))
(defn ! [& args] (throw (new Exception "! is now send. See also send-off")))
@@ -998,9 +999,16 @@
agent, allowing subsequent actions to occur."
[#^clojure.lang.Agent a] (. a (clearErrors)))
+(defn shutdown-agents
+ "Initiates a shutdown of the thread pools that back the agent
+ system. Running actions will complete, but no new actions will be
+ accepted"
+ [] (. clojure.lang.Agent shutdown))
+
(defn ref
- "Creates and returns a Ref with an initial value of x."
- [x] (new clojure.lang.Ref x))
+ "Creates and returns a Ref with an initial value of x and an optional validate fn."
+ ([x] (new clojure.lang.Ref x))
+ ([x validate-fn] (new clojure.lang.Ref x validate-fn)))
(defn deref
"Also reader macro: @ref/@agent Within a transaction, returns the
@@ -1009,6 +1017,13 @@
returns its current state."
[#^clojure.lang.IRef ref] (. ref (get)))
+(defn set-validator
+ "Sets the validator-fn for a var/ref/agent."
+ [#^clojure.lang.IRef iref validator-fn] (. iref (setValidator validator-fn)))
+
+(defn get-validator
+ "Gets the validator-fn for a var/ref/agent."
+ [#^clojure.lang.IRef iref] (. iref (getValidator)))
(defn commute
"Must be called in a transaction. Sets the in-transaction-value of
diff --git a/src/jvm/clojure/lang/Agent.java b/src/jvm/clojure/lang/Agent.java
index cc81ec18..5b776a70 100644
--- a/src/jvm/clojure/lang/Agent.java
+++ b/src/jvm/clojure/lang/Agent.java
@@ -17,18 +17,24 @@ import java.util.concurrent.atomic.AtomicReference;
public class Agent implements IRef{
volatile Object state;
+volatile IFn validator = null;
AtomicReference<IPersistentStack> q = new AtomicReference(PersistentQueue.EMPTY);
volatile ISeq errors = null;
-final public static Executor pooledExecutor =
+final public static ExecutorService pooledExecutor =
Executors.newFixedThreadPool(2 + Runtime.getRuntime().availableProcessors());
-final static Executor soloExecutor = Executors.newCachedThreadPool();
+final static ExecutorService soloExecutor = Executors.newCachedThreadPool();
final static ThreadLocal<IPersistentVector> nested = new ThreadLocal<IPersistentVector>();
+public static void shutdown(){
+ soloExecutor.shutdown();
+ pooledExecutor.shutdown();
+}
+
static class Action implements Runnable{
final Agent agent;
final IFn fn;
@@ -102,14 +108,31 @@ static class Action implements Runnable{
}
}
-public Agent(Object state){
+public Agent(Object state) throws Exception{
+ this(state,null);
+}
+
+public Agent(Object state, IFn validator) throws Exception{
+ this.validator = validator;
setState(state);
}
-void setState(Object newState){
+void setState(Object newState) throws Exception{
+ validate(getValidator(),newState);
state = newState;
}
+void validate(IFn vf, Object val){
+ try{
+ if(vf != null)
+ vf.invoke(val);
+ }
+ catch(Exception e)
+ {
+ throw new IllegalStateException("Invalid agent state", e);
+ }
+}
+
public Object get() throws Exception{
if(errors != null)
{
@@ -118,6 +141,15 @@ public Object get() throws Exception{
return state;
}
+public void setValidator(IFn vf){
+ validate(vf,state);
+ validator = vf;
+}
+
+public IFn getValidator(){
+ return validator;
+}
+
public ISeq getErrors(){
return errors;
}
diff --git a/src/jvm/clojure/lang/Delay.java b/src/jvm/clojure/lang/Delay.java
index ae1cfa2e..18a8d1c1 100644
--- a/src/jvm/clojure/lang/Delay.java
+++ b/src/jvm/clojure/lang/Delay.java
@@ -12,7 +12,7 @@
package clojure.lang;
-public class Delay extends AFn implements IRef{
+public class Delay extends AFn{
Object val;
IFn fn;
diff --git a/src/jvm/clojure/lang/IRef.java b/src/jvm/clojure/lang/IRef.java
index efb1d0b7..b86e8e1b 100644
--- a/src/jvm/clojure/lang/IRef.java
+++ b/src/jvm/clojure/lang/IRef.java
@@ -16,4 +16,7 @@ public interface IRef{
Object get() throws Exception;
+void setValidator(IFn vf);
+
+IFn getValidator();
}
diff --git a/src/jvm/clojure/lang/LockingTransaction.java b/src/jvm/clojure/lang/LockingTransaction.java
index 837187d7..3a8eaee2 100644
--- a/src/jvm/clojure/lang/LockingTransaction.java
+++ b/src/jvm/clojure/lang/LockingTransaction.java
@@ -246,6 +246,12 @@ Object run(IFn fn) throws Exception{
}
}
+ //validate
+ for(Ref ref : sets)
+ {
+ ref.validate(ref.getValidator(), ref.get());
+ }
+
//at this point, all values calced, all refs to be written locked
//no more client code to be called
long msecs = System.currentTimeMillis();
diff --git a/src/jvm/clojure/lang/Ref.java b/src/jvm/clojure/lang/Ref.java
index 7c1c86b2..31b1077b 100644
--- a/src/jvm/clojure/lang/Ref.java
+++ b/src/jvm/clojure/lang/Ref.java
@@ -54,8 +54,9 @@ final AtomicInteger faults;
final ReentrantReadWriteLock lock;
LockingTransaction.Info tinfo;
final UUID uuid;
+IFn validator;
-public Ref(){
+Ref(){
this.tvals = null;
this.tinfo = null;
this.faults = new AtomicInteger();
@@ -63,26 +64,34 @@ public Ref(){
this.uuid = UUID.randomUUID();
}
-public Ref(Object initVal){
+public Ref(Object initVal) throws Exception{
+ this(initVal, null);
+}
+
+public Ref(Object initVal,IFn validator) throws Exception{
this();
+ if(validator != null)
+ validate(validator,initVal);
+ this.validator = validator;
tvals = new TVal(initVal, 0, System.currentTimeMillis());
}
//note - makes no attempt to ensure there is no other Ref with same UUID
//use only with a cache/registry
-public Ref(UUID uuid, Object initVal){
- tvals = new TVal(initVal, 0, System.currentTimeMillis());
- this.tinfo = null;
- this.faults = new AtomicInteger();
- this.lock = new ReentrantReadWriteLock();
- this.uuid = uuid;
-}
+//public Ref(UUID uuid, Object initVal){
+// tvals = new TVal(initVal, 0, System.currentTimeMillis());
+// this.tinfo = null;
+// this.faults = new AtomicInteger();
+// this.lock = new ReentrantReadWriteLock();
+// this.uuid = uuid;
+//}
public UUID getUUID(){
return uuid;
}
+
//the latest val
// ok out of transaction
@@ -109,6 +118,42 @@ public Object get(){
return t.doGet(this);
}
+void validate(IFn vf, Object val){
+ try{
+ if(vf != null)
+ vf.invoke(val);
+ }
+ catch(Exception e)
+ {
+ throw new IllegalStateException("Invalid ref state", e);
+ }
+}
+
+public void setValidator(IFn vf){
+ try
+ {
+ lock.writeLock().lock();
+ validate(vf,currentVal());
+ validator = vf;
+ }
+ finally
+ {
+ lock.writeLock().unlock();
+ }
+}
+
+public IFn getValidator(){
+ try
+ {
+ lock.readLock().lock();
+ return validator;
+ }
+ finally
+ {
+ lock.readLock().unlock();
+ }
+}
+
public Object set(Object val){
return LockingTransaction.getEx().doSet(this, val);
}
@@ -140,7 +185,7 @@ boolean isBound(){
}
-void trimHistory(){
+public void trimHistory(){
try
{
lock.writeLock().lock();
diff --git a/src/jvm/clojure/lang/Var.java b/src/jvm/clojure/lang/Var.java
index 1ac0b302..f7d42c22 100644
--- a/src/jvm/clojure/lang/Var.java
+++ b/src/jvm/clojure/lang/Var.java
@@ -54,6 +54,7 @@ volatile Object root;
transient final AtomicInteger count;
public final Symbol sym;
public final Namespace ns;
+volatile IFn validator = null;
IPersistentMap _meta;
@@ -128,12 +129,34 @@ final public Object get(){
throw new IllegalStateException(String.format("Var %s is unbound.", sym));
}
+public void setValidator(IFn vf){
+ if(isBound())
+ validate(vf,getRoot());
+ validator = vf;
+}
+
+public IFn getValidator(){
+ return validator;
+}
+
public Object alter(IFn fn, ISeq args) throws Exception{
set(fn.applyTo(RT.cons(get(), args)));
return this;
}
+void validate(IFn vf, Object val){
+ try{
+ if(vf != null)
+ vf.invoke(val);
+ }
+ catch(Exception e)
+ {
+ throw new IllegalStateException("Invalid var state", e);
+ }
+}
+
public Object set(Object val){
+ validate(getValidator(),val);
Box b = getThreadBinding();
if(b != null)
return (b.val = val);
@@ -193,6 +216,7 @@ final public boolean hasRoot(){
//binding root always clears macro flag
synchronized public void bindRoot(Object root){
+ validate(getValidator(), root);
this.root = root;
_meta = _meta.assoc(macroKey, RT.F);
}
@@ -202,7 +226,9 @@ synchronized public void unbindRoot(){
}
synchronized public void commuteRoot(IFn fn) throws Exception{
- this.root = fn.invoke(root);
+ Object newRoot = fn.invoke(root);
+ validate(getValidator(),newRoot);
+ this.root = newRoot;
}
public static void pushThreadBindings(Associative bindings){
@@ -212,6 +238,7 @@ public static void pushThreadBindings(Associative bindings){
{
IMapEntry e = (IMapEntry) bs.first();
Var v = (Var) e.key();
+ v.validate(v.getValidator(), e.val());
v.count.incrementAndGet();
bmap = bmap.assoc(v, new Box(e.val()));
}