summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorRich Hickey <richhickey@gmail.com>2007-07-30 15:27:32 +0000
committerRich Hickey <richhickey@gmail.com>2007-07-30 15:27:32 +0000
commitbd6e6a16c32f15713a351263a07c384a19c2d21d (patch)
tree23159b394179c64ffcdcbad6a86828a8165ab49b /src
parent031971d1d80e1e2f6fbc50ac6a6afe81fcbc2e27 (diff)
added barging to commute locking
Diffstat (limited to 'src')
-rw-r--r--src/jvm/clojure/lang/LockingTransaction.java159
1 files changed, 83 insertions, 76 deletions
diff --git a/src/jvm/clojure/lang/LockingTransaction.java b/src/jvm/clojure/lang/LockingTransaction.java
index 1f87fb44..093ad2d6 100644
--- a/src/jvm/clojure/lang/LockingTransaction.java
+++ b/src/jvm/clojure/lang/LockingTransaction.java
@@ -21,6 +21,9 @@ public class LockingTransaction{
public static int RETRY_LIMIT = 1000;
public static int LOCK_WAIT_MSECS = 100;
+public static long BARGE_WAIT_NANOS = 100 * 1000000;
+//public static int COMMUTE_RETRY_LIMIT = 10;
+
static final int RUNNING = 0;
static final int COMMITTING = 1;
static final int RETRY = 2;
@@ -78,6 +81,7 @@ void stop(int status){
Info info;
long readPoint;
long startPoint;
+long startTime;
final RetryException retryex = new RetryException();
final HashMap<Ref, Object> vals = new HashMap<Ref, Object>();
final HashSet<Ref> sets = new HashSet<Ref>();
@@ -100,19 +104,7 @@ Object lock(Ref ref) throws Exception{
//write lock conflict
if(refinfo != null && refinfo != info && refinfo.running())
{
- boolean barged = false;
- //if this transaction is older
- // try to abort the other
- if(info.startPoint < refinfo.startPoint)
- {
- synchronized(refinfo)
- {
- barged = refinfo.status.compareAndSet(RUNNING, KILLED);
- if(barged)
- refinfo.notifyAll();
- }
- }
- if(!barged)
+ if(!barge(refinfo))
{
ref.lock.writeLock().unlock();
unlocked = true;
@@ -141,6 +133,25 @@ void abort() throws AbortException{
throw new AbortException();
}
+private boolean bargeTimeElapsed(){
+ return System.nanoTime() - startTime > BARGE_WAIT_NANOS;
+}
+
+private boolean barge(Info refinfo){
+ boolean barged = false;
+ //if this transaction is older
+ // try to abort the other
+ if(bargeTimeElapsed() && startPoint < refinfo.startPoint)
+ {
+ synchronized(refinfo)
+ {
+ barged = refinfo.status.compareAndSet(RUNNING, KILLED);
+ if(barged)
+ refinfo.notifyAll();
+ }
+ }
+ return barged;
+}
static LockingTransaction getEx() throws Exception{
LockingTransaction t = transaction.get();
@@ -171,10 +182,13 @@ Object run(IFn fn) throws Exception{
{
getReadPoint();
if(i == 0)
+ {
startPoint = readPoint;
+ startTime = System.nanoTime();
+ }
info = new Info(RUNNING, startPoint);
ret = fn.invoke();
- //make sure no one has killed us before this point, and can't now
+ //make sure no one has killed us before this point, and can't from now on
if(info.status.compareAndSet(RUNNING, COMMITTING))
{
for(Map.Entry<Ref, ArrayList<IFn>> e : commutes.entrySet())
@@ -184,7 +198,10 @@ Object run(IFn fn) throws Exception{
locked.add(ref);
Info refinfo = ref.tinfo;
if(refinfo != null && refinfo != info && refinfo.running())
- throw retryex;//new RetryException();
+ {
+ if(!barge(refinfo))
+ throw retryex;
+ }
Object val = ref.tvals == null ? null : ref.tvals.val;
if(!sets.contains(ref))
vals.put(ref, val);
@@ -204,8 +221,8 @@ Object run(IFn fn) throws Exception{
//at this point, all values calced, all refs to be written locked
//no more client code to be called
- long commitPoint = getCommitPoint();
long msecs = System.currentTimeMillis();
+ long commitPoint = getCommitPoint();
for(Map.Entry<Ref, Object> e : vals.entrySet())
{
Ref ref = e.getKey();
@@ -251,86 +268,76 @@ Object run(IFn fn) throws Exception{
Object doGet(Ref ref) throws Exception{
- if(info.running())
+ if(!info.running())
+ throw retryex;
+ if(vals.containsKey(ref))
+ return vals.get(ref);
+ try
{
- if(vals.containsKey(ref))
- return vals.get(ref);
- try
+ ref.lock.readLock().lock();
+ if(ref.tvals == null)
+ throw new IllegalStateException(ref.toString() + " is unbound.");
+ Ref.TVal ver = ref.tvals;
+ do
{
- ref.lock.readLock().lock();
- if(ref.tvals == null)
- throw new IllegalStateException(ref.toString() + " is unbound.");
- Ref.TVal ver = ref.tvals;
- do
- {
- if(ver.point <= readPoint)
- return ver.val;
- } while((ver = ver.prior) != ref.tvals);
- }
- finally
- {
- ref.lock.readLock().unlock();
- }
- //no version of val precedes the read point
- ref.faults.incrementAndGet();
- throw retryex;
+ if(ver.point <= readPoint)
+ return ver.val;
+ } while((ver = ver.prior) != ref.tvals);
}
- else
- throw retryex;
+ finally
+ {
+ ref.lock.readLock().unlock();
+ }
+ //no version of val precedes the read point
+ ref.faults.incrementAndGet();
+ throw retryex;
}
Object doSet(Ref ref, Object val) throws Exception{
- if(info.running())
+ if(!info.running())
+ throw retryex;
+ if(commutes.containsKey(ref))
+ throw new IllegalStateException("Can't set after commute");
+ if(!sets.contains(ref))
{
- if(commutes.containsKey(ref))
- throw new IllegalStateException("Can't set after commute");
- if(!sets.contains(ref))
- {
- sets.add(ref);
- lock(ref);
- }
- vals.put(ref, val);
- return val;
+ sets.add(ref);
+ lock(ref);
}
- else
- throw retryex;
+ vals.put(ref, val);
+ return val;
}
void doTouch(Ref ref) throws Exception{
- if(info.running())
- lock(ref);
- else
+ if(!info.running())
throw retryex;
+ lock(ref);
}
Object doCommute(Ref ref, IFn fn) throws Exception{
- if(info.running())
+ if(!info.running())
+ throw retryex;
+ if(!vals.containsKey(ref))
{
- if(!vals.containsKey(ref))
+ Object val = null;
+ try
{
- Object val = null;
- try
- {
- ref.lock.readLock().lock();
- val = ref.tvals == null ? null : ref.tvals.val;
- }
- finally
- {
- ref.lock.readLock().unlock();
- }
- vals.put(ref, val);
+ ref.lock.readLock().lock();
+ val = ref.tvals == null ? null : ref.tvals.val;
}
- ArrayList<IFn> fns = commutes.get(ref);
- if(fns == null)
- commutes.put(ref, fns = new ArrayList<IFn>());
- fns.add(fn);
- Object ret = fn.invoke(vals.get(ref));
- vals.put(ref, ret);
- return ret;
+ finally
+ {
+ ref.lock.readLock().unlock();
+ }
+ vals.put(ref, val);
}
- else
- throw retryex;
+ ArrayList<IFn> fns = commutes.get(ref);
+ if(fns == null)
+ commutes.put(ref, fns = new ArrayList<IFn>());
+ fns.add(fn);
+ Object ret = fn.invoke(vals.get(ref));
+ vals.put(ref, ret);
+ return ret;
}