summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorRich Hickey <richhickey@gmail.com>2007-07-29 18:31:28 +0000
committerRich Hickey <richhickey@gmail.com>2007-07-29 18:31:28 +0000
commite9aa32282a2b428d12b6409bf4c84ebe7423d16f (patch)
tree427d96512cd67524a0a451448786e10040b3aa90 /src
parent5c719b2f91f723c78d0f066f6ddd7fa558ff8bcc (diff)
older trans wins on lock conflict
Diffstat (limited to 'src')
-rw-r--r--src/jvm/clojure/lang/LockingTransaction.java245
1 files changed, 118 insertions, 127 deletions
diff --git a/src/jvm/clojure/lang/LockingTransaction.java b/src/jvm/clojure/lang/LockingTransaction.java
index 3faf1b0a..21074951 100644
--- a/src/jvm/clojure/lang/LockingTransaction.java
+++ b/src/jvm/clojure/lang/LockingTransaction.java
@@ -16,14 +16,16 @@ import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
+@SuppressWarnings({"SynchronizeOnNonFinalField"})
public class LockingTransaction{
public static int RETRY_LIMIT = 1000;
public static int LOCK_WAIT_MSECS = 100;
static final int RUNNING = 0;
-static final int COMMITTED = 1;
+static final int COMMITTING = 1;
static final int RETRY = 2;
static final int KILLED = 3;
+static final int COMMITTED = 4;
final static ThreadLocal<LockingTransaction> transaction = new ThreadLocal<LockingTransaction>();
@@ -35,54 +37,28 @@ static class AbortException extends Exception{
public static class Info{
final AtomicInteger status;
- final long point;
+ final long startPoint;
- public Info(int status, long point){
+ public Info(int status, long startPoint){
this.status = new AtomicInteger(status);
- this.point = point;
+ this.startPoint = startPoint;
}
public boolean running(){
- return status.get() == RUNNING;
+ int s = status.get();
+ return s == RUNNING || s == COMMITTING;
}
}
//total order on transactions
//transactions will consume a point for init, for each retry, and on commit if writing
final private static AtomicInteger lastPoint = new AtomicInteger();
-//final static PriorityQueue<Long> points = new PriorityQueue<Long>();
long getCommitPoint(){
return lastPoint.incrementAndGet();
-// synchronized(points)
-// {
-// points.remove(readPoint);
-// completedPriorPoint = completedThroughPoint();
-// ++lastPoint;
-// return lastPoint;
-// }
}
-/*
-static long completedThroughPoint(){
- synchronized(points)
- {
- Long p = points.peek();
- if(p != null)
- return p - 1;
- return lastPoint;
- }
-}
-
-void relinquishReadPoint(){
- synchronized(points)
- {
- points.remove(readPoint);
- }
-}
-*/
-
void stop(int status){
if(info != null)
{
@@ -110,24 +86,28 @@ TreeMap<Ref, ArrayList<IFn>> commutes = new TreeMap<Ref, ArrayList<IFn>>();
void getReadPoint(){
readPoint = lastPoint.incrementAndGet();
-// synchronized(points)
-// {
-// ++lastPoint;
-// points.add(lastPoint);
-// readPoint = lastPoint;
-// }
}
//returns the most recent val
-Object lock(Ref ref, boolean ensurePoint) throws Exception{
+Object lock(Ref ref) throws Exception{
boolean unlocked = false;
try
{
ref.lock.writeLock().lock();
Info refinfo = ref.tinfo;
+ //write lock conflict?
if(refinfo != null && refinfo != info && refinfo.running())
{
+ //if this transaction is older, and could continue if it got the lock,
+ // try to abort the other
+ if(info.startPoint < refinfo.startPoint
+ && (ref.tvals == null || ref.tvals.point <= readPoint)
+ && refinfo.status.compareAndSet(RUNNING, KILLED))
+ {
+ ref.tinfo = info;
+ return ref.tvals == null ? null : ref.tvals.val;
+ }
ref.lock.writeLock().unlock();
unlocked = true;
//stop prior to blocking
@@ -137,12 +117,11 @@ Object lock(Ref ref, boolean ensurePoint) throws Exception{
if(refinfo.running())
refinfo.wait(LOCK_WAIT_MSECS);
}
- throw retryex;//new RetryException();
+ throw retryex;
}
- if(ensurePoint && ref.tvals != null && ref.tvals.point > readPoint)
+ if(ref.tvals != null && ref.tvals.point > readPoint)
{
-// stop();
- throw retryex;//throw new RetryException();
+ throw retryex;
}
ref.tinfo = info;
return ref.tvals == null ? null : ref.tvals.val;
@@ -192,56 +171,54 @@ Object run(IFn fn) throws Exception{
startPoint = readPoint;
info = new Info(RUNNING, startPoint);
ret = fn.invoke();
- synchronized(info)
+ //make sure no one has killed us before this point, and can't now
+ if(info.status.compareAndSet(RUNNING, COMMITTING))
{
- if(info.status.get() == RUNNING)
+ for(Map.Entry<Ref, ArrayList<IFn>> e : commutes.entrySet())
{
- for(Map.Entry<Ref, ArrayList<IFn>> e : commutes.entrySet())
+ Ref ref = e.getKey();
+ ref.lock.writeLock().lock();
+ locked.add(ref);
+ Info refinfo = ref.tinfo;
+ if(refinfo != null && refinfo != info && refinfo.running())
+ throw retryex;//new RetryException();
+ Object val = ref.tvals == null ? null : ref.tvals.val;
+ if(!sets.contains(ref))
+ vals.put(ref, val);
+ for(IFn f : e.getValue())
{
- Ref ref = e.getKey();
- ref.lock.writeLock().lock();
- locked.add(ref);
- Info refinfo = ref.tinfo;
- if(refinfo != null && refinfo != info && refinfo.running())
- throw retryex;//new RetryException();
- Object val = ref.tvals == null ? null : ref.tvals.val;
- if(!sets.contains(ref))
- vals.put(ref, val);
- for(IFn f : e.getValue())
- {
- vals.put(ref, f.invoke(vals.get(ref)));
- }
+ vals.put(ref, f.invoke(vals.get(ref)));
}
- for(Ref ref : sets)
+ }
+ for(Ref ref : sets)
+ {
+ if(!commutes.containsKey(ref))
{
- if(!commutes.containsKey(ref))
- {
- ref.lock.writeLock().lock();
- locked.add(ref);
- }
+ ref.lock.writeLock().lock();
+ locked.add(ref);
}
+ }
- //at this point, all values calced, all refs to be written locked
- //no more client code to be called
- long commitPoint = getCommitPoint();
- long msecs = System.currentTimeMillis();
- for(Map.Entry<Ref, Object> e : vals.entrySet())
- {
- Ref ref = e.getKey();
+ //at this point, all values calced, all refs to be written locked
+ //no more client code to be called
+ long commitPoint = getCommitPoint();
+ long msecs = System.currentTimeMillis();
+ for(Map.Entry<Ref, Object> e : vals.entrySet())
+ {
+ Ref ref = e.getKey();
// ref.tvals = new Ref.TVal(e.getValue(), commitPoint, msecs, null);
- if(ref.tvals != null)
- ref.tvals.prior = null;
- ref.tvals = new Ref.TVal(e.getValue(), commitPoint, msecs, ref.tvals);
- //ref.tstatus = null;
- //auto-trim
+ if(ref.tvals != null)
+ ref.tvals.prior = null;
+ ref.tvals = new Ref.TVal(e.getValue(), commitPoint, msecs, ref.tvals);
+ //auto-trim
// for(Ref.TVal tv = ref.tvals; tv != null; tv = tv.prior)
// {
// if(tv.msecs <= msecs)
// tv.prior = null;
// }
- }
- done = true;
}
+ done = true;
+ info.status.set(COMMITTED);
}
}
catch(RetryException retry)
@@ -256,8 +233,6 @@ Object run(IFn fn) throws Exception{
}
locked.clear();
stop(done ? COMMITTED : RETRY);
-// if(!done)
-// relinquishReadPoint();
}
}
if(!done)
@@ -267,65 +242,84 @@ Object run(IFn fn) throws Exception{
Object doGet(Ref ref) throws Exception{
- if(vals.containsKey(ref))
- return vals.get(ref);
- try
+ if(info.running())
{
- ref.lock.readLock().lock();
- if(ref.tvals == null)
- throw new IllegalStateException(ref.toString() + " is unbound.");
- for(Ref.TVal ver = ref.tvals; ver != null; ver = ver.prior)
+ if(vals.containsKey(ref))
+ return vals.get(ref);
+ try
+ {
+ ref.lock.readLock().lock();
+ if(ref.tvals == null)
+ throw new IllegalStateException(ref.toString() + " is unbound.");
+ for(Ref.TVal ver = ref.tvals; ver != null; ver = ver.prior)
+ {
+ if(ver.point <= readPoint)
+ return ver.val;
+ }
+ }
+ finally
{
- if(ver.point <= readPoint)
- return ver.val;
+ ref.lock.readLock().unlock();
}
+ //no version of val precedes the read point
+ throw retryex;//new RetryException();
}
- finally
- {
- ref.lock.readLock().unlock();
- }
- //no version of val precedes the read point
- throw retryex;//new RetryException();
+ else
+ throw retryex;//new RetryException();
+
}
Object doSet(Ref ref, Object val) throws Exception{
- if(commutes.containsKey(ref))
- throw new IllegalStateException("Can't set after commute");
- if(!sets.contains(ref))
+ if(info.running())
{
- sets.add(ref);
- lock(ref, true);
+ if(commutes.containsKey(ref))
+ throw new IllegalStateException("Can't set after commute");
+ if(!sets.contains(ref))
+ {
+ sets.add(ref);
+ lock(ref);
+ }
+ vals.put(ref, val);
+ return val;
}
- vals.put(ref, val);
- return val;
+ else
+ throw retryex;//new RetryException();
}
void doTouch(Ref ref) throws Exception{
- lock(ref, true);
+ if(info.running())
+ lock(ref);
+ else
+ throw retryex;//new RetryException();
}
Object doCommute(Ref ref, IFn fn) throws Exception{
- if(!vals.containsKey(ref))
+ if(info.running())
{
- Object val = null;
- try
- {
- ref.lock.readLock().lock();
- val = ref.tvals == null ? null : ref.tvals.val;
- }
- finally
+ if(!vals.containsKey(ref))
{
- ref.lock.readLock().unlock();
+ Object val = null;
+ try
+ {
+ ref.lock.readLock().lock();
+ val = ref.tvals == null ? null : ref.tvals.val;
+ }
+ finally
+ {
+ ref.lock.readLock().unlock();
+ }
+ vals.put(ref, val);
}
- vals.put(ref, val);
+ ArrayList<IFn> fns = commutes.get(ref);
+ if(fns == null)
+ commutes.put(ref, fns = new ArrayList<IFn>());
+ fns.add(fn);
+ Object ret = fn.invoke(vals.get(ref));
+ vals.put(ref, ret);
+ return ret;
}
- ArrayList<IFn> fns = commutes.get(ref);
- if(fns == null)
- commutes.put(ref, fns = new ArrayList<IFn>());
- fns.add(fn);
- Object ret = fn.invoke(vals.get(ref));
- vals.put(ref, ret);
- return ret;
+ else
+ throw retryex;//new RetryException();
}
@@ -381,8 +375,7 @@ public static void main(String[] args){
{
long start = System.nanoTime();
LockingTransaction.runInTransaction(this);
- long dur = System.nanoTime() - start;
- nanos += dur;
+ nanos += System.nanoTime() - start;
}
return nanos;
}
@@ -417,8 +410,7 @@ public static void main(String[] args){
{
long start = System.nanoTime();
LockingTransaction.runInTransaction(this);
- long dur = System.nanoTime() - start;
- nanos += dur;
+ nanos += System.nanoTime() - start;
}
return nanos;
}
@@ -467,8 +459,7 @@ public static void main(String[] args){
e.shutdown();
for(Future<Long> result : results)
{
- Future<Long> res = (Future<Long>) result;
- System.out.printf("%d, ", res.get() / 1000000);
+ System.out.printf("%d, ", result.get() / 1000000);
}
System.out.println();
System.out.println("waiting for other instances...");