summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRich Hickey <richhickey@gmail.com>2010-10-15 10:26:05 -0400
committerRich Hickey <richhickey@gmail.com>2010-10-15 10:26:05 -0400
commitb1bed1b1c936855801e6d6dd40e649fddb6c039d (patch)
treec3056aec12974ba2e56d93279e8a5707a68dbd19
parent1f52d7de92bf22a777574dade0c5efdbe5dc5390 (diff)
binding conveyance to future calls and agent sends
-rw-r--r--src/clj/clojure/core.clj33
-rw-r--r--src/jvm/clojure/lang/Var.java86
2 files changed, 72 insertions, 47 deletions
diff --git a/src/clj/clojure/core.clj b/src/clj/clojure/core.clj
index 83297d20..b399f6cf 100644
--- a/src/clj/clojure/core.clj
+++ b/src/clj/clojure/core.clj
@@ -1753,7 +1753,29 @@
nil if no var with that name."
{:added "1.0"
:static true}
- [sym] (. clojure.lang.Var (find sym)))
+ [sym] (. clojure.lang.Var (find sym)))
+
+(defn binding-conveyor-fn
+ {:private true
+ :added "1.3"}
+ [f]
+ (let [frame (clojure.lang.Var/getThreadBindingFrame)]
+ (fn
+ ([]
+ (clojure.lang.Var/resetThreadBindingFrame frame)
+ (f))
+ ([x]
+ (clojure.lang.Var/resetThreadBindingFrame frame)
+ (f x))
+ ([x y]
+ (clojure.lang.Var/resetThreadBindingFrame frame)
+ (f x y))
+ ([x y z]
+ (clojure.lang.Var/resetThreadBindingFrame frame)
+ (f x y z))
+ ([x y z & args]
+ (clojure.lang.Var/resetThreadBindingFrame frame)
+ (apply f x y z args)))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; Refs ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(defn ^{:private true}
@@ -1809,7 +1831,7 @@
{:added "1.0"
:static true}
[^clojure.lang.Agent a f & args]
- (. a (dispatch f args false)))
+ (. a (dispatch (binding-conveyor-fn f) args false)))
(defn send-off
"Dispatch a potentially blocking action to an agent. Returns the
@@ -1820,7 +1842,7 @@
{:added "1.0"
:static true}
[^clojure.lang.Agent a f & args]
- (. a (dispatch f args true)))
+ (. a (dispatch (binding-conveyor-fn f) args true)))
(defn release-pending-sends
"Normally, actions sent directly or indirectly during another action
@@ -5830,8 +5852,9 @@
not yet finished, calls to deref/@ will block."
{:added "1.1"
:static true}
- [^Callable f]
- (let [fut (.submit clojure.lang.Agent/soloExecutor f)]
+ [f]
+ (let [f (binding-conveyor-fn f)
+ fut (.submit clojure.lang.Agent/soloExecutor ^Callable f)]
(reify
clojure.lang.IDeref
(deref [_] (.get fut))
diff --git a/src/jvm/clojure/lang/Var.java b/src/jvm/clojure/lang/Var.java
index 12114222..6df652b0 100644
--- a/src/jvm/clojure/lang/Var.java
+++ b/src/jvm/clojure/lang/Var.java
@@ -12,26 +12,36 @@
package clojure.lang;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicBoolean;
public final class Var extends ARef implements IFn, IRef, Settable{
+static class TBox{
+
+volatile Object val;
+final Thread thread;
+
+public TBox(Thread t, Object val){
+ this.thread = t;
+ this.val = val;
+}
+}
static class Frame{
- //Var->Box
+ //Var->TBox
Associative bindings;
//Var->val
- Associative frameBindings;
+// Associative frameBindings;
Frame prev;
public Frame(){
- this(PersistentHashMap.EMPTY, PersistentHashMap.EMPTY, null);
+ this(PersistentHashMap.EMPTY, null);
}
- public Frame(Associative frameBindings, Associative bindings, Frame prev){
- this.frameBindings = frameBindings;
+ public Frame(Associative bindings, Frame prev){
+// this.frameBindings = frameBindings;
this.bindings = bindings;
this.prev = prev;
}
@@ -52,12 +62,23 @@ static Keyword nsKey = Keyword.intern(null, "ns");
//static Keyword tagKey = Keyword.intern(null, "tag");
volatile Object root;
-transient final AtomicInteger count;
+transient final AtomicBoolean threadBound;
public final Symbol sym;
public final Namespace ns;
//IPersistentMap _meta;
+public static Object getThreadBindingFrame(){
+ Frame f = dvals.get();
+ if(f != null)
+ return f;
+ return new Frame();
+}
+
+public static void resetThreadBindingFrame(Object frame){
+ dvals.set((Frame) frame);
+}
+
public static Var intern(Namespace ns, Symbol sym, Object root){
return intern(ns, sym, root, true);
}
@@ -113,7 +134,7 @@ public static Var create(Object root){
Var(Namespace ns, Symbol sym){
this.ns = ns;
this.sym = sym;
- this.count = new AtomicInteger();
+ this.threadBound = new AtomicBoolean(false);
this.root = dvals; //use dvals as magic not-bound value
setMeta(PersistentHashMap.EMPTY);
}
@@ -124,17 +145,17 @@ Var(Namespace ns, Symbol sym, Object root){
}
public boolean isBound(){
- return hasRoot() || (count.get() > 0 && dvals.get().bindings.containsKey(this));
+ return hasRoot() || (threadBound.get() && dvals.get().bindings.containsKey(this));
}
final public Object get(){
- if(count.get() == 0 && root != dvals)
+ if(!threadBound.get() && root != dvals)
return root;
return deref();
}
final public Object deref(){
- Box b = getThreadBinding();
+ TBox b = getThreadBinding();
if(b != null)
return b.val;
if(hasRoot())
@@ -155,15 +176,13 @@ public Object alter(IFn fn, ISeq args) throws Exception{
public Object set(Object val){
validate(getValidator(), val);
- Box b = getThreadBinding();
+ TBox b = getThreadBinding();
if(b != null)
+ {
+ if(Thread.currentThread() != b.thread)
+ throw new IllegalStateException(String.format("Can't set!: %s from non-binding thread", sym));
return (b.val = val);
- //jury still out on this
-// if(hasRoot())
-// {
-// bindRoot(val);
-// return val;
-// }
+ }
throw new IllegalStateException(String.format("Can't change/establish root binding of: %s with set", sym));
}
@@ -285,36 +304,19 @@ public static void pushThreadBindings(Associative bindings){
IMapEntry e = (IMapEntry) bs.first();
Var v = (Var) e.key();
v.validate(v.getValidator(), e.val());
- v.count.incrementAndGet();
- bmap = bmap.assoc(v, new Box(e.val()));
+ v.threadBound.set(true);
+ bmap = bmap.assoc(v, new TBox(Thread.currentThread(), e.val()));
}
- dvals.set(new Frame(bindings, bmap, f));
+ dvals.set(new Frame(bmap, f));
}
public static void popThreadBindings(){
Frame f = dvals.get();
if(f.prev == null)
throw new IllegalStateException("Pop without matching push");
- for(ISeq bs = RT.keys(f.frameBindings); bs != null; bs = bs.next())
- {
- Var v = (Var) bs.first();
- v.count.decrementAndGet();
- }
dvals.set(f.prev);
}
-public static void releaseThreadBindings(){
- Frame f = dvals.get();
- if(f.prev == null)
- throw new IllegalStateException("Release without full unwind");
- for(ISeq bs = RT.keys(f.bindings); bs != null; bs = bs.next())
- {
- Var v = (Var) bs.first();
- v.count.decrementAndGet();
- }
- dvals.set(null);
-}
-
public static Associative getThreadBindings(){
Frame f = dvals.get();
IPersistentMap ret = PersistentHashMap.EMPTY;
@@ -322,18 +324,18 @@ public static Associative getThreadBindings(){
{
IMapEntry e = (IMapEntry) bs.first();
Var v = (Var) e.key();
- Box b = (Box) e.val();
+ TBox b = (TBox) e.val();
ret = ret.assoc(v, b.val);
}
return ret;
}
-public final Box getThreadBinding(){
- if(count.get() > 0)
+public final TBox getThreadBinding(){
+ if(threadBound.get())
{
IMapEntry e = dvals.get().bindings.entryAt(this);
if(e != null)
- return (Box) e.val();
+ return (TBox) e.val();
}
return null;
}