diff options
author | Rich Hickey <richhickey@gmail.com> | 2007-07-29 18:31:28 +0000 |
---|---|---|
committer | Rich Hickey <richhickey@gmail.com> | 2007-07-29 18:31:28 +0000 |
commit | e9aa32282a2b428d12b6409bf4c84ebe7423d16f (patch) | |
tree | 427d96512cd67524a0a451448786e10040b3aa90 | |
parent | 5c719b2f91f723c78d0f066f6ddd7fa558ff8bcc (diff) |
older trans wins on lock conflict
-rw-r--r-- | src/jvm/clojure/lang/LockingTransaction.java | 245 |
1 files changed, 118 insertions, 127 deletions
diff --git a/src/jvm/clojure/lang/LockingTransaction.java b/src/jvm/clojure/lang/LockingTransaction.java index 3faf1b0a..21074951 100644 --- a/src/jvm/clojure/lang/LockingTransaction.java +++ b/src/jvm/clojure/lang/LockingTransaction.java @@ -16,14 +16,16 @@ import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; +@SuppressWarnings({"SynchronizeOnNonFinalField"}) public class LockingTransaction{ public static int RETRY_LIMIT = 1000; public static int LOCK_WAIT_MSECS = 100; static final int RUNNING = 0; -static final int COMMITTED = 1; +static final int COMMITTING = 1; static final int RETRY = 2; static final int KILLED = 3; +static final int COMMITTED = 4; final static ThreadLocal<LockingTransaction> transaction = new ThreadLocal<LockingTransaction>(); @@ -35,54 +37,28 @@ static class AbortException extends Exception{ public static class Info{ final AtomicInteger status; - final long point; + final long startPoint; - public Info(int status, long point){ + public Info(int status, long startPoint){ this.status = new AtomicInteger(status); - this.point = point; + this.startPoint = startPoint; } public boolean running(){ - return status.get() == RUNNING; + int s = status.get(); + return s == RUNNING || s == COMMITTING; } } //total order on transactions //transactions will consume a point for init, for each retry, and on commit if writing final private static AtomicInteger lastPoint = new AtomicInteger(); -//final static PriorityQueue<Long> points = new PriorityQueue<Long>(); long getCommitPoint(){ return lastPoint.incrementAndGet(); -// synchronized(points) -// { -// points.remove(readPoint); -// completedPriorPoint = completedThroughPoint(); -// ++lastPoint; -// return lastPoint; -// } } -/* -static long completedThroughPoint(){ - synchronized(points) - { - Long p = points.peek(); - if(p != null) - return p - 1; - return lastPoint; - } -} - -void relinquishReadPoint(){ - synchronized(points) - { - points.remove(readPoint); - } -} -*/ - void stop(int status){ if(info != null) { @@ -110,24 +86,28 @@ TreeMap<Ref, ArrayList<IFn>> commutes = new TreeMap<Ref, ArrayList<IFn>>(); void getReadPoint(){ readPoint = lastPoint.incrementAndGet(); -// synchronized(points) -// { -// ++lastPoint; -// points.add(lastPoint); -// readPoint = lastPoint; -// } } //returns the most recent val -Object lock(Ref ref, boolean ensurePoint) throws Exception{ +Object lock(Ref ref) throws Exception{ boolean unlocked = false; try { ref.lock.writeLock().lock(); Info refinfo = ref.tinfo; + //write lock conflict? if(refinfo != null && refinfo != info && refinfo.running()) { + //if this transaction is older, and could continue if it got the lock, + // try to abort the other + if(info.startPoint < refinfo.startPoint + && (ref.tvals == null || ref.tvals.point <= readPoint) + && refinfo.status.compareAndSet(RUNNING, KILLED)) + { + ref.tinfo = info; + return ref.tvals == null ? null : ref.tvals.val; + } ref.lock.writeLock().unlock(); unlocked = true; //stop prior to blocking @@ -137,12 +117,11 @@ Object lock(Ref ref, boolean ensurePoint) throws Exception{ if(refinfo.running()) refinfo.wait(LOCK_WAIT_MSECS); } - throw retryex;//new RetryException(); + throw retryex; } - if(ensurePoint && ref.tvals != null && ref.tvals.point > readPoint) + if(ref.tvals != null && ref.tvals.point > readPoint) { -// stop(); - throw retryex;//throw new RetryException(); + throw retryex; } ref.tinfo = info; return ref.tvals == null ? null : ref.tvals.val; @@ -192,56 +171,54 @@ Object run(IFn fn) throws Exception{ startPoint = readPoint; info = new Info(RUNNING, startPoint); ret = fn.invoke(); - synchronized(info) + //make sure no one has killed us before this point, and can't now + if(info.status.compareAndSet(RUNNING, COMMITTING)) { - if(info.status.get() == RUNNING) + for(Map.Entry<Ref, ArrayList<IFn>> e : commutes.entrySet()) { - for(Map.Entry<Ref, ArrayList<IFn>> e : commutes.entrySet()) + Ref ref = e.getKey(); + ref.lock.writeLock().lock(); + locked.add(ref); + Info refinfo = ref.tinfo; + if(refinfo != null && refinfo != info && refinfo.running()) + throw retryex;//new RetryException(); + Object val = ref.tvals == null ? null : ref.tvals.val; + if(!sets.contains(ref)) + vals.put(ref, val); + for(IFn f : e.getValue()) { - Ref ref = e.getKey(); - ref.lock.writeLock().lock(); - locked.add(ref); - Info refinfo = ref.tinfo; - if(refinfo != null && refinfo != info && refinfo.running()) - throw retryex;//new RetryException(); - Object val = ref.tvals == null ? null : ref.tvals.val; - if(!sets.contains(ref)) - vals.put(ref, val); - for(IFn f : e.getValue()) - { - vals.put(ref, f.invoke(vals.get(ref))); - } + vals.put(ref, f.invoke(vals.get(ref))); } - for(Ref ref : sets) + } + for(Ref ref : sets) + { + if(!commutes.containsKey(ref)) { - if(!commutes.containsKey(ref)) - { - ref.lock.writeLock().lock(); - locked.add(ref); - } + ref.lock.writeLock().lock(); + locked.add(ref); } + } - //at this point, all values calced, all refs to be written locked - //no more client code to be called - long commitPoint = getCommitPoint(); - long msecs = System.currentTimeMillis(); - for(Map.Entry<Ref, Object> e : vals.entrySet()) - { - Ref ref = e.getKey(); + //at this point, all values calced, all refs to be written locked + //no more client code to be called + long commitPoint = getCommitPoint(); + long msecs = System.currentTimeMillis(); + for(Map.Entry<Ref, Object> e : vals.entrySet()) + { + Ref ref = e.getKey(); // ref.tvals = new Ref.TVal(e.getValue(), commitPoint, msecs, null); - if(ref.tvals != null) - ref.tvals.prior = null; - ref.tvals = new Ref.TVal(e.getValue(), commitPoint, msecs, ref.tvals); - //ref.tstatus = null; - //auto-trim + if(ref.tvals != null) + ref.tvals.prior = null; + ref.tvals = new Ref.TVal(e.getValue(), commitPoint, msecs, ref.tvals); + //auto-trim // for(Ref.TVal tv = ref.tvals; tv != null; tv = tv.prior) // { // if(tv.msecs <= msecs) // tv.prior = null; // } - } - done = true; } + done = true; + info.status.set(COMMITTED); } } catch(RetryException retry) @@ -256,8 +233,6 @@ Object run(IFn fn) throws Exception{ } locked.clear(); stop(done ? COMMITTED : RETRY); -// if(!done) -// relinquishReadPoint(); } } if(!done) @@ -267,65 +242,84 @@ Object run(IFn fn) throws Exception{ Object doGet(Ref ref) throws Exception{ - if(vals.containsKey(ref)) - return vals.get(ref); - try + if(info.running()) { - ref.lock.readLock().lock(); - if(ref.tvals == null) - throw new IllegalStateException(ref.toString() + " is unbound."); - for(Ref.TVal ver = ref.tvals; ver != null; ver = ver.prior) + if(vals.containsKey(ref)) + return vals.get(ref); + try + { + ref.lock.readLock().lock(); + if(ref.tvals == null) + throw new IllegalStateException(ref.toString() + " is unbound."); + for(Ref.TVal ver = ref.tvals; ver != null; ver = ver.prior) + { + if(ver.point <= readPoint) + return ver.val; + } + } + finally { - if(ver.point <= readPoint) - return ver.val; + ref.lock.readLock().unlock(); } + //no version of val precedes the read point + throw retryex;//new RetryException(); } - finally - { - ref.lock.readLock().unlock(); - } - //no version of val precedes the read point - throw retryex;//new RetryException(); + else + throw retryex;//new RetryException(); + } Object doSet(Ref ref, Object val) throws Exception{ - if(commutes.containsKey(ref)) - throw new IllegalStateException("Can't set after commute"); - if(!sets.contains(ref)) + if(info.running()) { - sets.add(ref); - lock(ref, true); + if(commutes.containsKey(ref)) + throw new IllegalStateException("Can't set after commute"); + if(!sets.contains(ref)) + { + sets.add(ref); + lock(ref); + } + vals.put(ref, val); + return val; } - vals.put(ref, val); - return val; + else + throw retryex;//new RetryException(); } void doTouch(Ref ref) throws Exception{ - lock(ref, true); + if(info.running()) + lock(ref); + else + throw retryex;//new RetryException(); } Object doCommute(Ref ref, IFn fn) throws Exception{ - if(!vals.containsKey(ref)) + if(info.running()) { - Object val = null; - try - { - ref.lock.readLock().lock(); - val = ref.tvals == null ? null : ref.tvals.val; - } - finally + if(!vals.containsKey(ref)) { - ref.lock.readLock().unlock(); + Object val = null; + try + { + ref.lock.readLock().lock(); + val = ref.tvals == null ? null : ref.tvals.val; + } + finally + { + ref.lock.readLock().unlock(); + } + vals.put(ref, val); } - vals.put(ref, val); + ArrayList<IFn> fns = commutes.get(ref); + if(fns == null) + commutes.put(ref, fns = new ArrayList<IFn>()); + fns.add(fn); + Object ret = fn.invoke(vals.get(ref)); + vals.put(ref, ret); + return ret; } - ArrayList<IFn> fns = commutes.get(ref); - if(fns == null) - commutes.put(ref, fns = new ArrayList<IFn>()); - fns.add(fn); - Object ret = fn.invoke(vals.get(ref)); - vals.put(ref, ret); - return ret; + else + throw retryex;//new RetryException(); } @@ -381,8 +375,7 @@ public static void main(String[] args){ { long start = System.nanoTime(); LockingTransaction.runInTransaction(this); - long dur = System.nanoTime() - start; - nanos += dur; + nanos += System.nanoTime() - start; } return nanos; } @@ -417,8 +410,7 @@ public static void main(String[] args){ { long start = System.nanoTime(); LockingTransaction.runInTransaction(this); - long dur = System.nanoTime() - start; - nanos += dur; + nanos += System.nanoTime() - start; } return nanos; } @@ -467,8 +459,7 @@ public static void main(String[] args){ e.shutdown(); for(Future<Long> result : results) { - Future<Long> res = (Future<Long>) result; - System.out.printf("%d, ", res.get() / 1000000); + System.out.printf("%d, ", result.get() / 1000000); } System.out.println(); System.out.println("waiting for other instances..."); |