summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRich Hickey <richhickey@gmail.com>2010-10-15 10:27:14 -0400
committerRich Hickey <richhickey@gmail.com>2010-10-15 10:27:14 -0400
commit64e4a00b25baed8315e17752241a3a3d28b5280c (patch)
treeb923ee3d71e7107d8a9e8e37afe344f48466dc1e
parent479bb230b410cd39f3ca94120729096a38c8df67 (diff)
parentb1bed1b1c936855801e6d6dd40e649fddb6c039d (diff)
Merge branch 'direct'
-rw-r--r--src/clj/clojure/core.clj33
-rw-r--r--src/jvm/clojure/lang/MethodImplCache.java2
-rw-r--r--src/jvm/clojure/lang/Var.java86
3 files changed, 73 insertions, 48 deletions
diff --git a/src/clj/clojure/core.clj b/src/clj/clojure/core.clj
index 89706868..2f5504cc 100644
--- a/src/clj/clojure/core.clj
+++ b/src/clj/clojure/core.clj
@@ -1753,7 +1753,29 @@
nil if no var with that name."
{:added "1.0"
:static true}
- [sym] (. clojure.lang.Var (find sym)))
+ [sym] (. clojure.lang.Var (find sym)))
+
+(defn binding-conveyor-fn
+ {:private true
+ :added "1.3"}
+ [f]
+ (let [frame (clojure.lang.Var/getThreadBindingFrame)]
+ (fn
+ ([]
+ (clojure.lang.Var/resetThreadBindingFrame frame)
+ (f))
+ ([x]
+ (clojure.lang.Var/resetThreadBindingFrame frame)
+ (f x))
+ ([x y]
+ (clojure.lang.Var/resetThreadBindingFrame frame)
+ (f x y))
+ ([x y z]
+ (clojure.lang.Var/resetThreadBindingFrame frame)
+ (f x y z))
+ ([x y z & args]
+ (clojure.lang.Var/resetThreadBindingFrame frame)
+ (apply f x y z args)))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; Refs ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(defn ^{:private true}
@@ -1809,7 +1831,7 @@
{:added "1.0"
:static true}
[^clojure.lang.Agent a f & args]
- (. a (dispatch f args false)))
+ (. a (dispatch (binding-conveyor-fn f) args false)))
(defn send-off
"Dispatch a potentially blocking action to an agent. Returns the
@@ -1820,7 +1842,7 @@
{:added "1.0"
:static true}
[^clojure.lang.Agent a f & args]
- (. a (dispatch f args true)))
+ (. a (dispatch (binding-conveyor-fn f) args true)))
(defn release-pending-sends
"Normally, actions sent directly or indirectly during another action
@@ -5834,8 +5856,9 @@
not yet finished, calls to deref/@ will block."
{:added "1.1"
:static true}
- [^Callable f]
- (let [fut (.submit clojure.lang.Agent/soloExecutor f)]
+ [f]
+ (let [f (binding-conveyor-fn f)
+ fut (.submit clojure.lang.Agent/soloExecutor ^Callable f)]
(reify
clojure.lang.IDeref
(deref [_] (.get fut))
diff --git a/src/jvm/clojure/lang/MethodImplCache.java b/src/jvm/clojure/lang/MethodImplCache.java
index e4ed06d0..be77f72c 100644
--- a/src/jvm/clojure/lang/MethodImplCache.java
+++ b/src/jvm/clojure/lang/MethodImplCache.java
@@ -30,7 +30,7 @@ public final int shift;
public final int mask;
public final Object[] table; //[class, entry. class, entry ...]
-volatile Entry mre = null;
+Entry mre = null;
public MethodImplCache(IPersistentMap protocol, Keyword methodk){
this(protocol, methodk, 0, 0, RT.EMPTY_ARRAY);
diff --git a/src/jvm/clojure/lang/Var.java b/src/jvm/clojure/lang/Var.java
index 3d566ca7..ca3b9533 100644
--- a/src/jvm/clojure/lang/Var.java
+++ b/src/jvm/clojure/lang/Var.java
@@ -12,26 +12,36 @@
package clojure.lang;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicBoolean;
public final class Var extends ARef implements IFn, IRef, Settable{
+static class TBox{
+
+volatile Object val;
+final Thread thread;
+
+public TBox(Thread t, Object val){
+ this.thread = t;
+ this.val = val;
+}
+}
static class Frame{
- //Var->Box
+ //Var->TBox
Associative bindings;
//Var->val
- Associative frameBindings;
+// Associative frameBindings;
Frame prev;
public Frame(){
- this(PersistentHashMap.EMPTY, PersistentHashMap.EMPTY, null);
+ this(PersistentHashMap.EMPTY, null);
}
- public Frame(Associative frameBindings, Associative bindings, Frame prev){
- this.frameBindings = frameBindings;
+ public Frame(Associative bindings, Frame prev){
+// this.frameBindings = frameBindings;
this.bindings = bindings;
this.prev = prev;
}
@@ -52,12 +62,23 @@ static Keyword nsKey = Keyword.intern(null, "ns");
//static Keyword tagKey = Keyword.intern(null, "tag");
volatile Object root;
-transient final AtomicInteger count;
+transient final AtomicBoolean threadBound;
public final Symbol sym;
public final Namespace ns;
//IPersistentMap _meta;
+public static Object getThreadBindingFrame(){
+ Frame f = dvals.get();
+ if(f != null)
+ return f;
+ return new Frame();
+}
+
+public static void resetThreadBindingFrame(Object frame){
+ dvals.set((Frame) frame);
+}
+
public static Var intern(Namespace ns, Symbol sym, Object root){
return intern(ns, sym, root, true);
}
@@ -113,7 +134,7 @@ public static Var create(Object root){
Var(Namespace ns, Symbol sym){
this.ns = ns;
this.sym = sym;
- this.count = new AtomicInteger();
+ this.threadBound = new AtomicBoolean(false);
this.root = dvals; //use dvals as magic not-bound value
setMeta(PersistentHashMap.EMPTY);
}
@@ -124,17 +145,17 @@ Var(Namespace ns, Symbol sym, Object root){
}
public boolean isBound(){
- return hasRoot() || (count.get() > 0 && dvals.get().bindings.containsKey(this));
+ return hasRoot() || (threadBound.get() && dvals.get().bindings.containsKey(this));
}
final public Object get(){
- if(count.get() == 0 && root != dvals)
+ if(!threadBound.get() && root != dvals)
return root;
return deref();
}
final public Object deref(){
- Box b = getThreadBinding();
+ TBox b = getThreadBinding();
if(b != null)
return b.val;
if(hasRoot())
@@ -155,15 +176,13 @@ public Object alter(IFn fn, ISeq args) throws Exception{
public Object set(Object val){
validate(getValidator(), val);
- Box b = getThreadBinding();
+ TBox b = getThreadBinding();
if(b != null)
+ {
+ if(Thread.currentThread() != b.thread)
+ throw new IllegalStateException(String.format("Can't set!: %s from non-binding thread", sym));
return (b.val = val);
- //jury still out on this
-// if(hasRoot())
-// {
-// bindRoot(val);
-// return val;
-// }
+ }
throw new IllegalStateException(String.format("Can't change/establish root binding of: %s with set", sym));
}
@@ -285,36 +304,19 @@ public static void pushThreadBindings(Associative bindings){
IMapEntry e = (IMapEntry) bs.first();
Var v = (Var) e.key();
v.validate(v.getValidator(), e.val());
- v.count.incrementAndGet();
- bmap = bmap.assoc(v, new Box(e.val()));
+ v.threadBound.set(true);
+ bmap = bmap.assoc(v, new TBox(Thread.currentThread(), e.val()));
}
- dvals.set(new Frame(bindings, bmap, f));
+ dvals.set(new Frame(bmap, f));
}
public static void popThreadBindings(){
Frame f = dvals.get();
if(f.prev == null)
throw new IllegalStateException("Pop without matching push");
- for(ISeq bs = RT.keys(f.frameBindings); bs != null; bs = bs.next())
- {
- Var v = (Var) bs.first();
- v.count.decrementAndGet();
- }
dvals.set(f.prev);
}
-public static void releaseThreadBindings(){
- Frame f = dvals.get();
- if(f.prev == null)
- throw new IllegalStateException("Release without full unwind");
- for(ISeq bs = RT.keys(f.bindings); bs != null; bs = bs.next())
- {
- Var v = (Var) bs.first();
- v.count.decrementAndGet();
- }
- dvals.set(null);
-}
-
public static Associative getThreadBindings(){
Frame f = dvals.get();
IPersistentMap ret = PersistentHashMap.EMPTY;
@@ -322,18 +324,18 @@ public static Associative getThreadBindings(){
{
IMapEntry e = (IMapEntry) bs.first();
Var v = (Var) e.key();
- Box b = (Box) e.val();
+ TBox b = (TBox) e.val();
ret = ret.assoc(v, b.val);
}
return ret;
}
-public final Box getThreadBinding(){
- if(count.get() > 0)
+public final TBox getThreadBinding(){
+ if(threadBound.get())
{
IMapEntry e = dvals.get().bindings.entryAt(this);
if(e != null)
- return (Box) e.val();
+ return (TBox) e.val();
}
return null;
}