diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/cli/runtime/ThreadLocalData.cs | 9 | ||||
-rw-r--r-- | src/cli/runtime/Transaction.cs | 44 | ||||
-rw-r--r-- | src/jvm/clojure/lang/ThreadLocalData.java | 8 | ||||
-rw-r--r-- | src/jvm/clojure/lang/Transaction.java | 306 |
4 files changed, 198 insertions, 169 deletions
diff --git a/src/cli/runtime/ThreadLocalData.cs b/src/cli/runtime/ThreadLocalData.cs index 61baa210..8d866bda 100644 --- a/src/cli/runtime/ThreadLocalData.cs +++ b/src/cli/runtime/ThreadLocalData.cs @@ -18,8 +18,6 @@ namespace clojure.lang public class ThreadLocalData{
[ThreadStatic]
-private static Transaction transaction;
-[ThreadStatic]
private static Object[] values;
static public Object[] getValues(){
@@ -30,13 +28,6 @@ static public void setValues(Object[] vals) { values = vals;
}
-static public Transaction getTransaction() {
- return transaction;
-}
-
-static public void setTransaction(Transaction t){
- transaction = t;
-}
} diff --git a/src/cli/runtime/Transaction.cs b/src/cli/runtime/Transaction.cs index a38fc13f..b761b9e2 100644 --- a/src/cli/runtime/Transaction.cs +++ b/src/cli/runtime/Transaction.cs @@ -21,6 +21,23 @@ public class Transaction{ public const int COMMITTED = 0;
public const int WORKING = 1;
static readonly Object lockObj = new Object();
+[ThreadStatic]
+private static Transaction transaction;
+
+static volatile int tcount = 0;
+
+static Transaction getTransaction()
+ {
+ if(tcount == 0)
+ return null;
+ return transaction;
+ }
+
+static void setTransaction(Transaction t)
+ {
+ transaction = t;
+ }
+
volatile static int nextSeq = 1;
@@ -42,6 +59,7 @@ internal Info(int seq,int status){ }
}
+static Info bigbang = new Info(0,COMMITTED);
Info info;
int startSeq;
@@ -51,42 +69,48 @@ Dictionary<TRef,ISeq> commutates; static public Object runInTransaction(ThreadLocalData tld,IFn fn) {
- if(ThreadLocalData.getTransaction() != null)
+ if(getTransaction() != null)
return fn.invoke(tld);
Transaction t = new Transaction();
- ThreadLocalData.setTransaction(t);
- try{
+ setTransaction(t);
+ Interlocked.Increment(ref tcount);
+ try
+ {
return t.run(fn);
}
finally{
- ThreadLocalData.setTransaction(null);
+ setTransaction(null);
+ Interlocked.Decrement(ref tcount);
}
}
static public TRef tref(Object val) {
- Transaction trans = ThreadLocalData.getTransaction();
+ Transaction trans = getTransaction();
TRef tref = new TRef();
- trans.doSet(tref, val);
+ if(trans == null)
+ tref.push(val,bigbang);
+ else
+ trans.doSet(tref, val);
return tref;
}
static public Object get(TRef tref) {
- Transaction trans = ThreadLocalData.getTransaction();
+ Transaction trans = getTransaction();
if(trans != null)
return trans.doGet(tref);
return getCurrent(tref).val;
}
static public Object set(TRef tref, Object val) {
- return ThreadLocalData.getTransaction().doSet(tref,val);
+ return getTransaction().doSet(tref,val);
}
static public void touch(TRef tref) {
- ThreadLocalData.getTransaction().doTouch(tref);
+ getTransaction().doTouch(tref);
}
static public void commutate(TRef tref, IFn fn) {
- ThreadLocalData.getTransaction().doCommutate(tref, fn);
+ getTransaction().doCommutate(tref, fn);
}
diff --git a/src/jvm/clojure/lang/ThreadLocalData.java b/src/jvm/clojure/lang/ThreadLocalData.java index b0f3d19e..d4b76c47 100644 --- a/src/jvm/clojure/lang/ThreadLocalData.java +++ b/src/jvm/clojure/lang/ThreadLocalData.java @@ -14,7 +14,6 @@ package clojure.lang; public class ThreadLocalData{ -private static ThreadLocal<Transaction> transaction = new ThreadLocal<Transaction>(); private static ThreadLocal<Object[]> values = new ThreadLocal<Object[]>(); static public Object[] getValues(){ @@ -25,14 +24,7 @@ static public void setValues(Object[] vals) { values.set(vals); } -static public Transaction getTransaction() { - return transaction.get(); -} - -static public void setTransaction(Transaction t){ - transaction.set(t); -} private static ThreadLocal<Integer> tltest = new ThreadLocal<Integer>(); diff --git a/src/jvm/clojure/lang/Transaction.java b/src/jvm/clojure/lang/Transaction.java index 3cfc38e1..0aae1e1b 100644 --- a/src/jvm/clojure/lang/Transaction.java +++ b/src/jvm/clojure/lang/Transaction.java @@ -13,19 +13,33 @@ package clojure.lang; import java.util.*; +import java.util.concurrent.atomic.AtomicInteger; public class Transaction{ public static final int COMMITTED = 0; public static final int WORKING = 1; static final Object lock = new Object(); +private static ThreadLocal<Transaction> transaction = new ThreadLocal<Transaction>(); + +static AtomicInteger tcount = new AtomicInteger(0); + +static Transaction getTransaction() { + if(tcount.get() == 0) + return null; + return transaction.get(); +} + +static void setTransaction(Transaction t){ + transaction.set(t); +} volatile static int nextSeq = 1; static int getNextSeq(){ - synchronized(lock){ - return nextSeq++; - } + synchronized(lock){ + return nextSeq++; + } } public static class Info{ @@ -34,11 +48,13 @@ int status; Info(int seq,int status){ - this.seq = seq; - this.status = status; + this.seq = seq; + this.status = status; } } +static Info bigbang = new Info(0,COMMITTED); + Info info; int startSeq; @@ -48,192 +64,198 @@ IdentityHashMap<TRef,ISeq> commutates; static public Object runInTransaction(IFn fn) throws Exception{ - if(ThreadLocalData.getTransaction() != null) - return fn.invoke(); + if(getTransaction() != null) + return fn.invoke(); + Transaction t = new Transaction(); - ThreadLocalData.setTransaction(t); - try{ - return t.run(fn); - } - finally{ - ThreadLocalData.setTransaction(null); - } + setTransaction(t); + tcount.incrementAndGet(); + try{ + return t.run(fn); + } + finally{ + tcount.decrementAndGet(); + setTransaction(null); + } } static public TRef tref(Object val) throws Exception{ - Transaction trans = ThreadLocalData.getTransaction(); - TRef tref = new TRef(); - trans.doSet(tref, val); - return tref; + Transaction trans = getTransaction(); + TRef tref = new TRef(); + if(trans == null) + tref.push(val,bigbang); + else + trans.doSet(tref, val); + return tref; } //* static public Object get(TRef tref) throws Exception{ - Transaction trans = ThreadLocalData.getTransaction(); + Transaction trans = getTransaction(); if(trans != null) return trans.doGet(tref); return getCurrent(tref).val; } static public Object set(TRef tref, Object val) throws Exception{ - return ThreadLocalData.getTransaction().doSet(tref,val); + return getTransaction().doSet(tref,val); } static public void touch(TRef tref) throws Exception{ - ThreadLocalData.getTransaction().doTouch(tref); + getTransaction().doTouch(tref); } static public void commutate(TRef tref, IFn fn) throws Exception{ - ThreadLocalData.getTransaction().doCommutate(tref, fn); + getTransaction().doCommutate(tref, fn); } //*/ Object run(IFn fn) throws Exception{ - boolean done = false; - Object ret = null; - ArrayList<TRef> locks = null; - ArrayList<TRef> locked = null; - - loop: - while(!done){ - try - { - ret = fn.invoke(); - if(locks == null && (sets != null || commutates != null)) - locks = new ArrayList<TRef>(); - if(sets != null) - locks.addAll(sets.keySet()); - if(commutates != null) - locks.addAll(commutates.keySet()); - if(locks != null) - { - if(locked == null) - locked = new ArrayList<TRef>(locks.size()); - //lock in order, to avoid deadlocks - Collections.sort(locks); - for(TRef tref : locks) - { - //will block here - tref.lock.lock(); - locked.add(tref); - if(sets.containsKey(tref)) - { - //try again if the thing we are trying to set has changed since we started - TVal curr = getCurrent(tref); - if(curr != null && curr.tinfo.seq > startSeq) - continue loop; - } - } - } - - //at this point all write targets are locked - //turn commutates into sets - for(Map.Entry<TRef, ISeq> e : commutates.entrySet()) - { - TRef tref = e.getKey(); - //note this will npe if tref has never been set, as designed - Object val = getCurrent(tref).val; - for(ISeq c = e.getValue();c!=null;c = c.rest()) - { - IFn f = (IFn) c.first(); - val = f.invoke(val); - } - sets.put(tref, val); - } - - //set the new vals - for(Map.Entry<TRef, Object> entry : sets.entrySet()) - { - TRef tref = entry.getKey(); - tref.push(entry.getValue(), info); - } - - //atomic commit - synchronized(lock){ - info.seq = getNextSeq(); - info.status = COMMITTED; - } - - done = true; - } - finally{ - if(locked != null) - { - for(TRef tref : locked) - { - tref.lock.unlock(); - } - locked.clear(); - } - reset(); - if(locks != null) - locks.clear(); - } - } - return ret; + boolean done = false; + Object ret = null; + ArrayList<TRef> locks = null; + ArrayList<TRef> locked = null; + + loop: + while(!done){ + try + { + ret = fn.invoke(); + if(locks == null && (sets != null || commutates != null)) + locks = new ArrayList<TRef>(); + if(sets != null) + locks.addAll(sets.keySet()); + if(commutates != null) + locks.addAll(commutates.keySet()); + if(locks != null) + { + if(locked == null) + locked = new ArrayList<TRef>(locks.size()); + //lock in order, to avoid deadlocks + Collections.sort(locks); + for(TRef tref : locks) + { + //will block here + tref.lock.lock(); + locked.add(tref); + if(sets.containsKey(tref)) + { + //try again if the thing we are trying to set has changed since we started + TVal curr = getCurrent(tref); + if(curr != null && curr.tinfo.seq > startSeq) + continue loop; + } + } + } + + //at this point all write targets are locked + //turn commutates into sets + for(Map.Entry<TRef, ISeq> e : commutates.entrySet()) + { + TRef tref = e.getKey(); + //note this will npe if tref has never been set, as designed + Object val = getCurrent(tref).val; + for(ISeq c = e.getValue();c!=null;c = c.rest()) + { + IFn f = (IFn) c.first(); + val = f.invoke(val); + } + sets.put(tref, val); + } + + //set the new vals + for(Map.Entry<TRef, Object> entry : sets.entrySet()) + { + TRef tref = entry.getKey(); + tref.push(entry.getValue(), info); + } + + //atomic commit + synchronized(lock){ + info.seq = getNextSeq(); + info.status = COMMITTED; + } + + done = true; + } + finally{ + if(locked != null) + { + for(TRef tref : locked) + { + tref.lock.unlock(); + } + locked.clear(); + } + reset(); + if(locks != null) + locks.clear(); + } + } + return ret; } private void reset(){ - if(sets != null) - sets.clear(); - if(commutates != null) - commutates.clear(); + if(sets != null) + sets.clear(); + if(commutates != null) + commutates.clear(); } Transaction(){ - synchronized(lock){ - int seq = getNextSeq(); - this.info = new Info(seq, WORKING); - this.startSeq = seq; - } + synchronized(lock){ + int seq = getNextSeq(); + this.info = new Info(seq, WORKING); + this.startSeq = seq; + } } Object doGet(TRef tref) throws Exception{ - if(sets != null && sets.containsKey(tref)) - return sets.get(tref); + if(sets != null && sets.containsKey(tref)) + return sets.get(tref); for(TVal ver = tref;ver != null;ver = ver.prior) - { - //note this will npe if tref has never been set, as designed - if(ver.tinfo.status == COMMITTED && ver.tinfo.seq <= startSeq) - return ver.val; - } + { + //note this will npe if tref has never been set, as designed + if(ver.tinfo.status == COMMITTED && ver.tinfo.seq <= startSeq) + return ver.val; + } - throw new Exception("Version not found"); + throw new Exception("Version not found"); } static TVal getCurrent(TRef tref) throws Exception{ for(TVal ver = tref;ver != null;ver = ver.prior) - { - if(ver.tinfo != null && ver.tinfo.status == COMMITTED) - return ver; - } - //this return only if no value was ever successfully set - return null; + { + if(ver.tinfo != null && ver.tinfo.status == COMMITTED) + return ver; + } + //this return only if no value was ever successfully set + return null; } Object doSet(TRef tref, Object val) throws Exception{ - if(sets == null) - sets = new IdentityHashMap<TRef,Object>(); - if(commutates != null && commutates.containsKey(tref)) - throw new Exception("Can't commutate and set a TRef in the same transaction"); + if(sets == null) + sets = new IdentityHashMap<TRef,Object>(); + if(commutates != null && commutates.containsKey(tref)) + throw new Exception("Can't commutate and set a TRef in the same transaction"); - sets.put(tref,val); - return val; - } + sets.put(tref,val); + return val; + } void doTouch(TRef tref) throws Exception{ - doSet(tref, doGet(tref)); - } + doSet(tref, doGet(tref)); + } void doCommutate(TRef tref, IFn fn) throws Exception{ - if(commutates == null) - commutates = new IdentityHashMap<TRef,ISeq>(); - if(sets != null && sets.containsKey(tref)) - throw new Exception("Can't commutate and set a TRef in the same transaction"); - commutates.put(tref, RT.cons(fn, commutates.get(tref))); - } + if(commutates == null) + commutates = new IdentityHashMap<TRef,ISeq>(); + if(sets != null && sets.containsKey(tref)) + throw new Exception("Can't commutate and set a TRef in the same transaction"); + commutates.put(tref, RT.cons(fn, commutates.get(tref))); + } } |