summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRich Hickey <richhickey@gmail.com>2009-07-03 12:55:40 -0400
committerRich Hickey <richhickey@gmail.com>2009-07-03 12:55:40 -0400
commit961743446562b6fa7be25f96de02aacd626169da (patch)
treee887b9dc45822747755e81268eb385fd44ae15b7
parentc4a5cd208aef54ae5b292fa41c4880017315e553 (diff)
support overlapping ensures with no retry, refs #143ensure
-rw-r--r--src/jvm/clojure/lang/LockingTransaction.java98
-rw-r--r--src/jvm/clojure/lang/Ref.java2
2 files changed, 80 insertions, 20 deletions
diff --git a/src/jvm/clojure/lang/LockingTransaction.java b/src/jvm/clojure/lang/LockingTransaction.java
index 4976d169..68d94640 100644
--- a/src/jvm/clojure/lang/LockingTransaction.java
+++ b/src/jvm/clojure/lang/LockingTransaction.java
@@ -16,6 +16,7 @@ import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
@SuppressWarnings({"SynchronizeOnNonFinalField"})
public class LockingTransaction{
@@ -104,13 +105,32 @@ final HashMap<Ref, Object> vals = new HashMap<Ref, Object>();
final HashSet<Ref> sets = new HashSet<Ref>();
final TreeMap<Ref, ArrayList<CFn>> commutes = new TreeMap<Ref, ArrayList<CFn>>();
+final HashSet<Ref> ensures = new HashSet<Ref>(); //all hold readLock
+
+
+void tryWriteLock(Ref ref){
+ try
+ {
+ if(!ref.lock.writeLock().tryLock(LOCK_WAIT_MSECS, TimeUnit.MILLISECONDS))
+ throw retryex;
+ }
+ catch(InterruptedException e)
+ {
+ throw retryex;
+ }
+}
//returns the most recent val
Object lock(Ref ref){
- boolean unlocked = false;
+ //can't upgrade readLock, so release it
+ releaseIfEnsured(ref);
+
+ boolean unlocked = true;
try
{
- ref.lock.writeLock().lock();
+ tryWriteLock(ref);
+ unlocked = false;
+
if(ref.tvals != null && ref.tvals.point > readPoint)
throw retryex;
Info refinfo = ref.tinfo;
@@ -122,9 +142,23 @@ Object lock(Ref ref){
{
ref.lock.writeLock().unlock();
unlocked = true;
- //stop prior to blocking
- stop(RETRY);
- synchronized(refinfo)
+ return blockAndBail(refinfo);
+ }
+ }
+ ref.tinfo = info;
+ return ref.tvals == null ? null : ref.tvals.val;
+ }
+ finally
+ {
+ if(!unlocked)
+ ref.lock.writeLock().unlock();
+ }
+}
+
+private Object blockAndBail(Info refinfo){
+//stop prior to blocking
+ stop(RETRY);
+ synchronized(refinfo)
{
if(refinfo.running())
{
@@ -133,20 +167,18 @@ Object lock(Ref ref){
refinfo.wait(LOCK_WAIT_MSECS);
}
catch(InterruptedException e)
- {
- }
- }
- }
- throw retryex;
+ {
}
}
- ref.tinfo = info;
- return ref.tvals == null ? null : ref.tvals.val;
}
- finally
+ throw retryex;
+}
+
+private void releaseIfEnsured(Ref ref){
+ if(ensures.contains(ref))
{
- if(!unlocked)
- ref.lock.writeLock().unlock();
+ ensures.remove(ref);
+ ref.lock.readLock().unlock();
}
}
@@ -240,8 +272,14 @@ Object run(Callable fn) throws Exception{
for(Map.Entry<Ref, ArrayList<CFn>> e : commutes.entrySet())
{
Ref ref = e.getKey();
- ref.lock.writeLock().lock();
+ boolean wasEnsured = ensures.contains(ref);
+ //can't upgrade readLock, so release it
+ releaseIfEnsured(ref);
+ tryWriteLock(ref);
locked.add(ref);
+ if(wasEnsured && ref.tvals != null && ref.tvals.point > readPoint)
+ throw retryex;
+
Info refinfo = ref.tinfo;
if(refinfo != null && refinfo != info && refinfo.running())
{
@@ -260,7 +298,7 @@ Object run(Callable fn) throws Exception{
{
if(!commutes.containsKey(ref))
{
- ref.lock.writeLock().lock();
+ tryWriteLock(ref);
locked.add(ref);
}
}
@@ -319,6 +357,11 @@ Object run(Callable fn) throws Exception{
locked.get(k).lock.writeLock().unlock();
}
locked.clear();
+ for(Ref r : ensures)
+ {
+ r.lock.readLock().unlock();
+ }
+ ensures.clear();
stop(done ? COMMITTED : RETRY);
try
{
@@ -391,10 +434,27 @@ Object doSet(Ref ref, Object val){
return val;
}
-void doTouch(Ref ref){
+void doEnsure(Ref ref){
if(!info.running())
throw retryex;
- lock(ref);
+ if(ensures.contains(ref))
+ return;
+ ref.lock.readLock().lock();
+
+ Info refinfo = ref.tinfo;
+
+ //writer exists
+ if(refinfo != null && refinfo.running())
+ {
+ ref.lock.readLock().unlock();
+
+ if(refinfo != info) //not us, ensure is doomed
+ {
+ blockAndBail(refinfo);
+ }
+ }
+ else
+ ensures.add(ref);
}
Object doCommute(Ref ref, IFn fn, ISeq args) throws Exception{
diff --git a/src/jvm/clojure/lang/Ref.java b/src/jvm/clojure/lang/Ref.java
index fef7c439..a4626acb 100644
--- a/src/jvm/clojure/lang/Ref.java
+++ b/src/jvm/clojure/lang/Ref.java
@@ -175,7 +175,7 @@ public Object alter(IFn fn, ISeq args) throws Exception{
}
public void touch(){
- LockingTransaction.getEx().doTouch(this);
+ LockingTransaction.getEx().doEnsure(this);
}
//*/