summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorRich Hickey <richhickey@gmail.com>2009-08-03 10:32:14 -0400
committerRich Hickey <richhickey@gmail.com>2009-08-03 10:32:14 -0400
commite456322f0ae2bd16704887db0c0d42f93406b35d (patch)
tree63b1601ee31e62d54fa26c75644a5b5c943488a9 /src
parent309161d5d20ae9af2907cf9bc07a8647398e0289 (diff)
parent961743446562b6fa7be25f96de02aacd626169da (diff)
merge ensure
Diffstat (limited to 'src')
-rw-r--r--src/jvm/clojure/lang/LockingTransaction.java108
-rw-r--r--src/jvm/clojure/lang/Ref.java2
2 files changed, 86 insertions, 24 deletions
diff --git a/src/jvm/clojure/lang/LockingTransaction.java b/src/jvm/clojure/lang/LockingTransaction.java
index a067987c..68d94640 100644
--- a/src/jvm/clojure/lang/LockingTransaction.java
+++ b/src/jvm/clojure/lang/LockingTransaction.java
@@ -16,6 +16,7 @@ import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
@SuppressWarnings({"SynchronizeOnNonFinalField"})
public class LockingTransaction{
@@ -104,13 +105,32 @@ final HashMap<Ref, Object> vals = new HashMap<Ref, Object>();
final HashSet<Ref> sets = new HashSet<Ref>();
final TreeMap<Ref, ArrayList<CFn>> commutes = new TreeMap<Ref, ArrayList<CFn>>();
+final HashSet<Ref> ensures = new HashSet<Ref>(); //all hold readLock
+
+
+void tryWriteLock(Ref ref){
+ try
+ {
+ if(!ref.lock.writeLock().tryLock(LOCK_WAIT_MSECS, TimeUnit.MILLISECONDS))
+ throw retryex;
+ }
+ catch(InterruptedException e)
+ {
+ throw retryex;
+ }
+}
//returns the most recent val
Object lock(Ref ref){
- boolean unlocked = false;
+ //can't upgrade readLock, so release it
+ releaseIfEnsured(ref);
+
+ boolean unlocked = true;
try
{
- ref.lock.writeLock().lock();
+ tryWriteLock(ref);
+ unlocked = false;
+
if(ref.tvals != null && ref.tvals.point > readPoint)
throw retryex;
Info refinfo = ref.tinfo;
@@ -122,9 +142,23 @@ Object lock(Ref ref){
{
ref.lock.writeLock().unlock();
unlocked = true;
- //stop prior to blocking
- stop(RETRY);
- synchronized(refinfo)
+ return blockAndBail(refinfo);
+ }
+ }
+ ref.tinfo = info;
+ return ref.tvals == null ? null : ref.tvals.val;
+ }
+ finally
+ {
+ if(!unlocked)
+ ref.lock.writeLock().unlock();
+ }
+}
+
+private Object blockAndBail(Info refinfo){
+//stop prior to blocking
+ stop(RETRY);
+ synchronized(refinfo)
{
if(refinfo.running())
{
@@ -133,20 +167,18 @@ Object lock(Ref ref){
refinfo.wait(LOCK_WAIT_MSECS);
}
catch(InterruptedException e)
- {
- }
- }
- }
- throw retryex;
+ {
}
}
- ref.tinfo = info;
- return ref.tvals == null ? null : ref.tvals.val;
}
- finally
+ throw retryex;
+}
+
+private void releaseIfEnsured(Ref ref){
+ if(ensures.contains(ref))
{
- if(!unlocked)
- ref.lock.writeLock().unlock();
+ ensures.remove(ref);
+ ref.lock.readLock().unlock();
}
}
@@ -240,10 +272,14 @@ Object run(Callable fn) throws Exception{
for(Map.Entry<Ref, ArrayList<CFn>> e : commutes.entrySet())
{
Ref ref = e.getKey();
- if(sets.contains(ref)) continue;
-
- ref.lock.writeLock().lock();
+ boolean wasEnsured = ensures.contains(ref);
+ //can't upgrade readLock, so release it
+ releaseIfEnsured(ref);
+ tryWriteLock(ref);
locked.add(ref);
+ if(wasEnsured && ref.tvals != null && ref.tvals.point > readPoint)
+ throw retryex;
+
Info refinfo = ref.tinfo;
if(refinfo != null && refinfo != info && refinfo.running())
{
@@ -251,7 +287,8 @@ Object run(Callable fn) throws Exception{
throw retryex;
}
Object val = ref.tvals == null ? null : ref.tvals.val;
- vals.put(ref, val);
+ if(!sets.contains(ref))
+ vals.put(ref, val);
for(CFn f : e.getValue())
{
vals.put(ref, f.fn.applyTo(RT.cons(vals.get(ref), f.args)));
@@ -259,8 +296,11 @@ Object run(Callable fn) throws Exception{
}
for(Ref ref : sets)
{
- ref.lock.writeLock().lock();
- locked.add(ref);
+ if(!commutes.containsKey(ref))
+ {
+ tryWriteLock(ref);
+ locked.add(ref);
+ }
}
//validate and enqueue notifications
@@ -317,6 +357,11 @@ Object run(Callable fn) throws Exception{
locked.get(k).lock.writeLock().unlock();
}
locked.clear();
+ for(Ref r : ensures)
+ {
+ r.lock.readLock().unlock();
+ }
+ ensures.clear();
stop(done ? COMMITTED : RETRY);
try
{
@@ -389,10 +434,27 @@ Object doSet(Ref ref, Object val){
return val;
}
-void doTouch(Ref ref){
+void doEnsure(Ref ref){
if(!info.running())
throw retryex;
- lock(ref);
+ if(ensures.contains(ref))
+ return;
+ ref.lock.readLock().lock();
+
+ Info refinfo = ref.tinfo;
+
+ //writer exists
+ if(refinfo != null && refinfo.running())
+ {
+ ref.lock.readLock().unlock();
+
+ if(refinfo != info) //not us, ensure is doomed
+ {
+ blockAndBail(refinfo);
+ }
+ }
+ else
+ ensures.add(ref);
}
Object doCommute(Ref ref, IFn fn, ISeq args) throws Exception{
diff --git a/src/jvm/clojure/lang/Ref.java b/src/jvm/clojure/lang/Ref.java
index fef7c439..a4626acb 100644
--- a/src/jvm/clojure/lang/Ref.java
+++ b/src/jvm/clojure/lang/Ref.java
@@ -175,7 +175,7 @@ public Object alter(IFn fn, ISeq args) throws Exception{
}
public void touch(){
- LockingTransaction.getEx().doTouch(this);
+ LockingTransaction.getEx().doEnsure(this);
}
//*/