diff options
author | Rich Hickey <richhickey@gmail.com> | 2009-07-03 12:55:40 -0400 |
---|---|---|
committer | Rich Hickey <richhickey@gmail.com> | 2009-07-03 12:55:40 -0400 |
commit | 961743446562b6fa7be25f96de02aacd626169da (patch) | |
tree | e887b9dc45822747755e81268eb385fd44ae15b7 | |
parent | c4a5cd208aef54ae5b292fa41c4880017315e553 (diff) |
support overlapping ensures with no retry, refs #143ensure
-rw-r--r-- | src/jvm/clojure/lang/LockingTransaction.java | 98 | ||||
-rw-r--r-- | src/jvm/clojure/lang/Ref.java | 2 |
2 files changed, 80 insertions, 20 deletions
diff --git a/src/jvm/clojure/lang/LockingTransaction.java b/src/jvm/clojure/lang/LockingTransaction.java index 4976d169..68d94640 100644 --- a/src/jvm/clojure/lang/LockingTransaction.java +++ b/src/jvm/clojure/lang/LockingTransaction.java @@ -16,6 +16,7 @@ import java.util.*; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; @SuppressWarnings({"SynchronizeOnNonFinalField"}) public class LockingTransaction{ @@ -104,13 +105,32 @@ final HashMap<Ref, Object> vals = new HashMap<Ref, Object>(); final HashSet<Ref> sets = new HashSet<Ref>(); final TreeMap<Ref, ArrayList<CFn>> commutes = new TreeMap<Ref, ArrayList<CFn>>(); +final HashSet<Ref> ensures = new HashSet<Ref>(); //all hold readLock + + +void tryWriteLock(Ref ref){ + try + { + if(!ref.lock.writeLock().tryLock(LOCK_WAIT_MSECS, TimeUnit.MILLISECONDS)) + throw retryex; + } + catch(InterruptedException e) + { + throw retryex; + } +} //returns the most recent val Object lock(Ref ref){ - boolean unlocked = false; + //can't upgrade readLock, so release it + releaseIfEnsured(ref); + + boolean unlocked = true; try { - ref.lock.writeLock().lock(); + tryWriteLock(ref); + unlocked = false; + if(ref.tvals != null && ref.tvals.point > readPoint) throw retryex; Info refinfo = ref.tinfo; @@ -122,9 +142,23 @@ Object lock(Ref ref){ { ref.lock.writeLock().unlock(); unlocked = true; - //stop prior to blocking - stop(RETRY); - synchronized(refinfo) + return blockAndBail(refinfo); + } + } + ref.tinfo = info; + return ref.tvals == null ? null : ref.tvals.val; + } + finally + { + if(!unlocked) + ref.lock.writeLock().unlock(); + } +} + +private Object blockAndBail(Info refinfo){ +//stop prior to blocking + stop(RETRY); + synchronized(refinfo) { if(refinfo.running()) { @@ -133,20 +167,18 @@ Object lock(Ref ref){ refinfo.wait(LOCK_WAIT_MSECS); } catch(InterruptedException e) - { - } - } - } - throw retryex; + { } } - ref.tinfo = info; - return ref.tvals == null ? null : ref.tvals.val; } - finally + throw retryex; +} + +private void releaseIfEnsured(Ref ref){ + if(ensures.contains(ref)) { - if(!unlocked) - ref.lock.writeLock().unlock(); + ensures.remove(ref); + ref.lock.readLock().unlock(); } } @@ -240,8 +272,14 @@ Object run(Callable fn) throws Exception{ for(Map.Entry<Ref, ArrayList<CFn>> e : commutes.entrySet()) { Ref ref = e.getKey(); - ref.lock.writeLock().lock(); + boolean wasEnsured = ensures.contains(ref); + //can't upgrade readLock, so release it + releaseIfEnsured(ref); + tryWriteLock(ref); locked.add(ref); + if(wasEnsured && ref.tvals != null && ref.tvals.point > readPoint) + throw retryex; + Info refinfo = ref.tinfo; if(refinfo != null && refinfo != info && refinfo.running()) { @@ -260,7 +298,7 @@ Object run(Callable fn) throws Exception{ { if(!commutes.containsKey(ref)) { - ref.lock.writeLock().lock(); + tryWriteLock(ref); locked.add(ref); } } @@ -319,6 +357,11 @@ Object run(Callable fn) throws Exception{ locked.get(k).lock.writeLock().unlock(); } locked.clear(); + for(Ref r : ensures) + { + r.lock.readLock().unlock(); + } + ensures.clear(); stop(done ? COMMITTED : RETRY); try { @@ -391,10 +434,27 @@ Object doSet(Ref ref, Object val){ return val; } -void doTouch(Ref ref){ +void doEnsure(Ref ref){ if(!info.running()) throw retryex; - lock(ref); + if(ensures.contains(ref)) + return; + ref.lock.readLock().lock(); + + Info refinfo = ref.tinfo; + + //writer exists + if(refinfo != null && refinfo.running()) + { + ref.lock.readLock().unlock(); + + if(refinfo != info) //not us, ensure is doomed + { + blockAndBail(refinfo); + } + } + else + ensures.add(ref); } Object doCommute(Ref ref, IFn fn, ISeq args) throws Exception{ diff --git a/src/jvm/clojure/lang/Ref.java b/src/jvm/clojure/lang/Ref.java index fef7c439..a4626acb 100644 --- a/src/jvm/clojure/lang/Ref.java +++ b/src/jvm/clojure/lang/Ref.java @@ -175,7 +175,7 @@ public Object alter(IFn fn, ISeq args) throws Exception{ } public void touch(){ - LockingTransaction.getEx().doTouch(this); + LockingTransaction.getEx().doEnsure(this); } //*/ |