diff options
author | Rich Hickey <richhickey@gmail.com> | 2007-07-30 15:27:32 +0000 |
---|---|---|
committer | Rich Hickey <richhickey@gmail.com> | 2007-07-30 15:27:32 +0000 |
commit | bd6e6a16c32f15713a351263a07c384a19c2d21d (patch) | |
tree | 23159b394179c64ffcdcbad6a86828a8165ab49b /src | |
parent | 031971d1d80e1e2f6fbc50ac6a6afe81fcbc2e27 (diff) |
added barging to commute locking
Diffstat (limited to 'src')
-rw-r--r-- | src/jvm/clojure/lang/LockingTransaction.java | 159 |
1 files changed, 83 insertions, 76 deletions
diff --git a/src/jvm/clojure/lang/LockingTransaction.java b/src/jvm/clojure/lang/LockingTransaction.java index 1f87fb44..093ad2d6 100644 --- a/src/jvm/clojure/lang/LockingTransaction.java +++ b/src/jvm/clojure/lang/LockingTransaction.java @@ -21,6 +21,9 @@ public class LockingTransaction{ public static int RETRY_LIMIT = 1000; public static int LOCK_WAIT_MSECS = 100; +public static long BARGE_WAIT_NANOS = 100 * 1000000; +//public static int COMMUTE_RETRY_LIMIT = 10; + static final int RUNNING = 0; static final int COMMITTING = 1; static final int RETRY = 2; @@ -78,6 +81,7 @@ void stop(int status){ Info info; long readPoint; long startPoint; +long startTime; final RetryException retryex = new RetryException(); final HashMap<Ref, Object> vals = new HashMap<Ref, Object>(); final HashSet<Ref> sets = new HashSet<Ref>(); @@ -100,19 +104,7 @@ Object lock(Ref ref) throws Exception{ //write lock conflict if(refinfo != null && refinfo != info && refinfo.running()) { - boolean barged = false; - //if this transaction is older - // try to abort the other - if(info.startPoint < refinfo.startPoint) - { - synchronized(refinfo) - { - barged = refinfo.status.compareAndSet(RUNNING, KILLED); - if(barged) - refinfo.notifyAll(); - } - } - if(!barged) + if(!barge(refinfo)) { ref.lock.writeLock().unlock(); unlocked = true; @@ -141,6 +133,25 @@ void abort() throws AbortException{ throw new AbortException(); } +private boolean bargeTimeElapsed(){ + return System.nanoTime() - startTime > BARGE_WAIT_NANOS; +} + +private boolean barge(Info refinfo){ + boolean barged = false; + //if this transaction is older + // try to abort the other + if(bargeTimeElapsed() && startPoint < refinfo.startPoint) + { + synchronized(refinfo) + { + barged = refinfo.status.compareAndSet(RUNNING, KILLED); + if(barged) + refinfo.notifyAll(); + } + } + return barged; +} static LockingTransaction getEx() throws Exception{ LockingTransaction t = transaction.get(); @@ -171,10 +182,13 @@ Object run(IFn fn) throws Exception{ { getReadPoint(); if(i == 0) + { startPoint = readPoint; + startTime = System.nanoTime(); + } info = new Info(RUNNING, startPoint); ret = fn.invoke(); - //make sure no one has killed us before this point, and can't now + //make sure no one has killed us before this point, and can't from now on if(info.status.compareAndSet(RUNNING, COMMITTING)) { for(Map.Entry<Ref, ArrayList<IFn>> e : commutes.entrySet()) @@ -184,7 +198,10 @@ Object run(IFn fn) throws Exception{ locked.add(ref); Info refinfo = ref.tinfo; if(refinfo != null && refinfo != info && refinfo.running()) - throw retryex;//new RetryException(); + { + if(!barge(refinfo)) + throw retryex; + } Object val = ref.tvals == null ? null : ref.tvals.val; if(!sets.contains(ref)) vals.put(ref, val); @@ -204,8 +221,8 @@ Object run(IFn fn) throws Exception{ //at this point, all values calced, all refs to be written locked //no more client code to be called - long commitPoint = getCommitPoint(); long msecs = System.currentTimeMillis(); + long commitPoint = getCommitPoint(); for(Map.Entry<Ref, Object> e : vals.entrySet()) { Ref ref = e.getKey(); @@ -251,86 +268,76 @@ Object run(IFn fn) throws Exception{ Object doGet(Ref ref) throws Exception{ - if(info.running()) + if(!info.running()) + throw retryex; + if(vals.containsKey(ref)) + return vals.get(ref); + try { - if(vals.containsKey(ref)) - return vals.get(ref); - try + ref.lock.readLock().lock(); + if(ref.tvals == null) + throw new IllegalStateException(ref.toString() + " is unbound."); + Ref.TVal ver = ref.tvals; + do { - ref.lock.readLock().lock(); - if(ref.tvals == null) - throw new IllegalStateException(ref.toString() + " is unbound."); - Ref.TVal ver = ref.tvals; - do - { - if(ver.point <= readPoint) - return ver.val; - } while((ver = ver.prior) != ref.tvals); - } - finally - { - ref.lock.readLock().unlock(); - } - //no version of val precedes the read point - ref.faults.incrementAndGet(); - throw retryex; + if(ver.point <= readPoint) + return ver.val; + } while((ver = ver.prior) != ref.tvals); } - else - throw retryex; + finally + { + ref.lock.readLock().unlock(); + } + //no version of val precedes the read point + ref.faults.incrementAndGet(); + throw retryex; } Object doSet(Ref ref, Object val) throws Exception{ - if(info.running()) + if(!info.running()) + throw retryex; + if(commutes.containsKey(ref)) + throw new IllegalStateException("Can't set after commute"); + if(!sets.contains(ref)) { - if(commutes.containsKey(ref)) - throw new IllegalStateException("Can't set after commute"); - if(!sets.contains(ref)) - { - sets.add(ref); - lock(ref); - } - vals.put(ref, val); - return val; + sets.add(ref); + lock(ref); } - else - throw retryex; + vals.put(ref, val); + return val; } void doTouch(Ref ref) throws Exception{ - if(info.running()) - lock(ref); - else + if(!info.running()) throw retryex; + lock(ref); } Object doCommute(Ref ref, IFn fn) throws Exception{ - if(info.running()) + if(!info.running()) + throw retryex; + if(!vals.containsKey(ref)) { - if(!vals.containsKey(ref)) + Object val = null; + try { - Object val = null; - try - { - ref.lock.readLock().lock(); - val = ref.tvals == null ? null : ref.tvals.val; - } - finally - { - ref.lock.readLock().unlock(); - } - vals.put(ref, val); + ref.lock.readLock().lock(); + val = ref.tvals == null ? null : ref.tvals.val; } - ArrayList<IFn> fns = commutes.get(ref); - if(fns == null) - commutes.put(ref, fns = new ArrayList<IFn>()); - fns.add(fn); - Object ret = fn.invoke(vals.get(ref)); - vals.put(ref, ret); - return ret; + finally + { + ref.lock.readLock().unlock(); + } + vals.put(ref, val); } - else - throw retryex; + ArrayList<IFn> fns = commutes.get(ref); + if(fns == null) + commutes.put(ref, fns = new ArrayList<IFn>()); + fns.add(fn); + Object ret = fn.invoke(vals.get(ref)); + vals.put(ref, ret); + return ret; } |