diff options
author | Rich Hickey <richhickey@gmail.com> | 2010-10-15 10:27:14 -0400 |
---|---|---|
committer | Rich Hickey <richhickey@gmail.com> | 2010-10-15 10:27:14 -0400 |
commit | 64e4a00b25baed8315e17752241a3a3d28b5280c (patch) | |
tree | b923ee3d71e7107d8a9e8e37afe344f48466dc1e | |
parent | 479bb230b410cd39f3ca94120729096a38c8df67 (diff) | |
parent | b1bed1b1c936855801e6d6dd40e649fddb6c039d (diff) |
Merge branch 'direct'
-rw-r--r-- | src/clj/clojure/core.clj | 33 | ||||
-rw-r--r-- | src/jvm/clojure/lang/MethodImplCache.java | 2 | ||||
-rw-r--r-- | src/jvm/clojure/lang/Var.java | 86 |
3 files changed, 73 insertions, 48 deletions
diff --git a/src/clj/clojure/core.clj b/src/clj/clojure/core.clj index 89706868..2f5504cc 100644 --- a/src/clj/clojure/core.clj +++ b/src/clj/clojure/core.clj @@ -1753,7 +1753,29 @@ nil if no var with that name." {:added "1.0" :static true} - [sym] (. clojure.lang.Var (find sym))) + [sym] (. clojure.lang.Var (find sym))) + +(defn binding-conveyor-fn + {:private true + :added "1.3"} + [f] + (let [frame (clojure.lang.Var/getThreadBindingFrame)] + (fn + ([] + (clojure.lang.Var/resetThreadBindingFrame frame) + (f)) + ([x] + (clojure.lang.Var/resetThreadBindingFrame frame) + (f x)) + ([x y] + (clojure.lang.Var/resetThreadBindingFrame frame) + (f x y)) + ([x y z] + (clojure.lang.Var/resetThreadBindingFrame frame) + (f x y z)) + ([x y z & args] + (clojure.lang.Var/resetThreadBindingFrame frame) + (apply f x y z args))))) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; Refs ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; (defn ^{:private true} @@ -1809,7 +1831,7 @@ {:added "1.0" :static true} [^clojure.lang.Agent a f & args] - (. a (dispatch f args false))) + (. a (dispatch (binding-conveyor-fn f) args false))) (defn send-off "Dispatch a potentially blocking action to an agent. Returns the @@ -1820,7 +1842,7 @@ {:added "1.0" :static true} [^clojure.lang.Agent a f & args] - (. a (dispatch f args true))) + (. a (dispatch (binding-conveyor-fn f) args true))) (defn release-pending-sends "Normally, actions sent directly or indirectly during another action @@ -5834,8 +5856,9 @@ not yet finished, calls to deref/@ will block." {:added "1.1" :static true} - [^Callable f] - (let [fut (.submit clojure.lang.Agent/soloExecutor f)] + [f] + (let [f (binding-conveyor-fn f) + fut (.submit clojure.lang.Agent/soloExecutor ^Callable f)] (reify clojure.lang.IDeref (deref [_] (.get fut)) diff --git a/src/jvm/clojure/lang/MethodImplCache.java b/src/jvm/clojure/lang/MethodImplCache.java index e4ed06d0..be77f72c 100644 --- a/src/jvm/clojure/lang/MethodImplCache.java +++ b/src/jvm/clojure/lang/MethodImplCache.java @@ -30,7 +30,7 @@ public final int shift; public final int mask; public final Object[] table; //[class, entry. class, entry ...] -volatile Entry mre = null; +Entry mre = null; public MethodImplCache(IPersistentMap protocol, Keyword methodk){ this(protocol, methodk, 0, 0, RT.EMPTY_ARRAY); diff --git a/src/jvm/clojure/lang/Var.java b/src/jvm/clojure/lang/Var.java index 3d566ca7..ca3b9533 100644 --- a/src/jvm/clojure/lang/Var.java +++ b/src/jvm/clojure/lang/Var.java @@ -12,26 +12,36 @@ package clojure.lang; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicBoolean; public final class Var extends ARef implements IFn, IRef, Settable{ +static class TBox{ + +volatile Object val; +final Thread thread; + +public TBox(Thread t, Object val){ + this.thread = t; + this.val = val; +} +} static class Frame{ - //Var->Box + //Var->TBox Associative bindings; //Var->val - Associative frameBindings; +// Associative frameBindings; Frame prev; public Frame(){ - this(PersistentHashMap.EMPTY, PersistentHashMap.EMPTY, null); + this(PersistentHashMap.EMPTY, null); } - public Frame(Associative frameBindings, Associative bindings, Frame prev){ - this.frameBindings = frameBindings; + public Frame(Associative bindings, Frame prev){ +// this.frameBindings = frameBindings; this.bindings = bindings; this.prev = prev; } @@ -52,12 +62,23 @@ static Keyword nsKey = Keyword.intern(null, "ns"); //static Keyword tagKey = Keyword.intern(null, "tag"); volatile Object root; -transient final AtomicInteger count; +transient final AtomicBoolean threadBound; public final Symbol sym; public final Namespace ns; //IPersistentMap _meta; +public static Object getThreadBindingFrame(){ + Frame f = dvals.get(); + if(f != null) + return f; + return new Frame(); +} + +public static void resetThreadBindingFrame(Object frame){ + dvals.set((Frame) frame); +} + public static Var intern(Namespace ns, Symbol sym, Object root){ return intern(ns, sym, root, true); } @@ -113,7 +134,7 @@ public static Var create(Object root){ Var(Namespace ns, Symbol sym){ this.ns = ns; this.sym = sym; - this.count = new AtomicInteger(); + this.threadBound = new AtomicBoolean(false); this.root = dvals; //use dvals as magic not-bound value setMeta(PersistentHashMap.EMPTY); } @@ -124,17 +145,17 @@ Var(Namespace ns, Symbol sym, Object root){ } public boolean isBound(){ - return hasRoot() || (count.get() > 0 && dvals.get().bindings.containsKey(this)); + return hasRoot() || (threadBound.get() && dvals.get().bindings.containsKey(this)); } final public Object get(){ - if(count.get() == 0 && root != dvals) + if(!threadBound.get() && root != dvals) return root; return deref(); } final public Object deref(){ - Box b = getThreadBinding(); + TBox b = getThreadBinding(); if(b != null) return b.val; if(hasRoot()) @@ -155,15 +176,13 @@ public Object alter(IFn fn, ISeq args) throws Exception{ public Object set(Object val){ validate(getValidator(), val); - Box b = getThreadBinding(); + TBox b = getThreadBinding(); if(b != null) + { + if(Thread.currentThread() != b.thread) + throw new IllegalStateException(String.format("Can't set!: %s from non-binding thread", sym)); return (b.val = val); - //jury still out on this -// if(hasRoot()) -// { -// bindRoot(val); -// return val; -// } + } throw new IllegalStateException(String.format("Can't change/establish root binding of: %s with set", sym)); } @@ -285,36 +304,19 @@ public static void pushThreadBindings(Associative bindings){ IMapEntry e = (IMapEntry) bs.first(); Var v = (Var) e.key(); v.validate(v.getValidator(), e.val()); - v.count.incrementAndGet(); - bmap = bmap.assoc(v, new Box(e.val())); + v.threadBound.set(true); + bmap = bmap.assoc(v, new TBox(Thread.currentThread(), e.val())); } - dvals.set(new Frame(bindings, bmap, f)); + dvals.set(new Frame(bmap, f)); } public static void popThreadBindings(){ Frame f = dvals.get(); if(f.prev == null) throw new IllegalStateException("Pop without matching push"); - for(ISeq bs = RT.keys(f.frameBindings); bs != null; bs = bs.next()) - { - Var v = (Var) bs.first(); - v.count.decrementAndGet(); - } dvals.set(f.prev); } -public static void releaseThreadBindings(){ - Frame f = dvals.get(); - if(f.prev == null) - throw new IllegalStateException("Release without full unwind"); - for(ISeq bs = RT.keys(f.bindings); bs != null; bs = bs.next()) - { - Var v = (Var) bs.first(); - v.count.decrementAndGet(); - } - dvals.set(null); -} - public static Associative getThreadBindings(){ Frame f = dvals.get(); IPersistentMap ret = PersistentHashMap.EMPTY; @@ -322,18 +324,18 @@ public static Associative getThreadBindings(){ { IMapEntry e = (IMapEntry) bs.first(); Var v = (Var) e.key(); - Box b = (Box) e.val(); + TBox b = (TBox) e.val(); ret = ret.assoc(v, b.val); } return ret; } -public final Box getThreadBinding(){ - if(count.get() > 0) +public final TBox getThreadBinding(){ + if(threadBound.get()) { IMapEntry e = dvals.get().bindings.entryAt(this); if(e != null) - return (Box) e.val(); + return (TBox) e.val(); } return null; } |