diff options
author | Rich Hickey <richhickey@gmail.com> | 2007-07-29 14:25:48 +0000 |
---|---|---|
committer | Rich Hickey <richhickey@gmail.com> | 2007-07-29 14:25:48 +0000 |
commit | 5c719b2f91f723c78d0f066f6ddd7fa558ff8bcc (patch) | |
tree | 19b13d2b0a7812b8daa040ef60380a4cd5a65a1e /src | |
parent | 4294c94279eecee549d4969e194a40b5ea50649f (diff) |
lock based STM
Diffstat (limited to 'src')
-rw-r--r-- | src/jvm/clojure/lang/LockingTransaction.java | 210 | ||||
-rw-r--r-- | src/jvm/clojure/lang/Ref.java | 9 |
2 files changed, 124 insertions, 95 deletions
diff --git a/src/jvm/clojure/lang/LockingTransaction.java b/src/jvm/clojure/lang/LockingTransaction.java index a0ae1c88..3faf1b0a 100644 --- a/src/jvm/clojure/lang/LockingTransaction.java +++ b/src/jvm/clojure/lang/LockingTransaction.java @@ -14,11 +14,16 @@ package clojure.lang; import java.util.*; import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; public class LockingTransaction{ public static int RETRY_LIMIT = 1000; public static int LOCK_WAIT_MSECS = 100; +static final int RUNNING = 0; +static final int COMMITTED = 1; +static final int RETRY = 2; +static final int KILLED = 3; final static ThreadLocal<LockingTransaction> transaction = new ThreadLocal<LockingTransaction>(); @@ -28,22 +33,38 @@ static class RetryException extends Exception{ static class AbortException extends Exception{ } +public static class Info{ + final AtomicInteger status; + final long point; + + + public Info(int status, long point){ + this.status = new AtomicInteger(status); + this.point = point; + } + + public boolean running(){ + return status.get() == RUNNING; + } +} //total order on transactions //transactions will consume a point for init, for each retry, and on commit if writing -private static long lastPoint; -final static PriorityQueue<Long> points = new PriorityQueue<Long>(); +final private static AtomicInteger lastPoint = new AtomicInteger(); +//final static PriorityQueue<Long> points = new PriorityQueue<Long>(); long getCommitPoint(){ - synchronized(points) - { - points.remove(readPoint); - completedPriorPoint = completedThroughPoint(); - ++lastPoint; - return lastPoint; - } + return lastPoint.incrementAndGet(); +// synchronized(points) +// { +// points.remove(readPoint); +// completedPriorPoint = completedThroughPoint(); +// ++lastPoint; +// return lastPoint; +// } } +/* static long completedThroughPoint(){ synchronized(points) { @@ -60,15 +81,17 @@ void relinquishReadPoint(){ points.remove(readPoint); } } +*/ -void stop(){ - if(tstatus.running) +void stop(int status){ + if(info != null) { - synchronized(tstatus) + synchronized(info) { - tstatus.running = false; - tstatus.notifyAll(); + info.status.set(status); + info.notifyAll(); } + info = null; vals.clear(); sets.clear(); commutes.clear(); @@ -76,20 +99,23 @@ void stop(){ } -volatile Ref.TStatus tstatus; -long completedPriorPoint; +Info info; +//long completedPriorPoint; long readPoint; +long startPoint; +RetryException retryex = new RetryException(); HashMap<Ref, Object> vals = new HashMap<Ref, Object>(); HashSet<Ref> sets = new HashSet<Ref>(); TreeMap<Ref, ArrayList<IFn>> commutes = new TreeMap<Ref, ArrayList<IFn>>(); void getReadPoint(){ - synchronized(points) - { - ++lastPoint; - points.add(lastPoint); - readPoint = lastPoint; - } + readPoint = lastPoint.incrementAndGet(); +// synchronized(points) +// { +// ++lastPoint; +// points.add(lastPoint); +// readPoint = lastPoint; +// } } //returns the most recent val @@ -98,26 +124,27 @@ Object lock(Ref ref, boolean ensurePoint) throws Exception{ try { ref.lock.writeLock().lock(); - Ref.TStatus status = ref.tstatus; + Info refinfo = ref.tinfo; - if(status != null && status != tstatus && status.running) + if(refinfo != null && refinfo != info && refinfo.running()) { ref.lock.writeLock().unlock(); unlocked = true; - stop(); - synchronized(status) + //stop prior to blocking + stop(RETRY); + synchronized(refinfo) { - if(status.running) - status.wait(LOCK_WAIT_MSECS); + if(refinfo.running()) + refinfo.wait(LOCK_WAIT_MSECS); } - throw new RetryException(); + throw retryex;//new RetryException(); } if(ensurePoint && ref.tvals != null && ref.tvals.point > readPoint) { - stop(); - throw new RetryException(); +// stop(); + throw retryex;//throw new RetryException(); } - ref.tstatus = tstatus; + ref.tinfo = info; return ref.tvals == null ? null : ref.tvals.val; } finally @@ -128,14 +155,14 @@ Object lock(Ref ref, boolean ensurePoint) throws Exception{ } void abort() throws AbortException{ - stop(); + stop(KILLED); throw new AbortException(); } static LockingTransaction getEx() throws Exception{ LockingTransaction t = transaction.get(); - if(!t.tstatus.running) + if(t.info == null) throw new Exception("No transaction running"); return t; } @@ -145,7 +172,7 @@ static public Object runInTransaction(IFn fn) throws Exception{ if(t == null) transaction.set(t = new LockingTransaction()); - if(t.tstatus != null && t.tstatus.running) + if(t.info != null) return fn.invoke(); return t.run(fn); @@ -161,51 +188,61 @@ Object run(IFn fn) throws Exception{ try { getReadPoint(); - tstatus = new Ref.TStatus(); + if(i == 0) + startPoint = readPoint; + info = new Info(RUNNING, startPoint); ret = fn.invoke(); - - for(Map.Entry<Ref, ArrayList<IFn>> e : commutes.entrySet()) + synchronized(info) { - Ref ref = e.getKey(); - ref.lock.writeLock().lock(); - locked.add(ref); - Ref.TStatus status = ref.tstatus; - if(status != null && status != tstatus && status.running) - throw new RetryException(); - Object val = ref.tvals == null ? null : ref.tvals.val; - if(!sets.contains(ref)) - vals.put(ref, val); - for(IFn f : e.getValue()) + if(info.status.get() == RUNNING) { - vals.put(ref, f.invoke(vals.get(ref))); - } - } - for(Ref ref : sets) - { - if(!commutes.containsKey(ref)) - { - ref.lock.writeLock().lock(); - locked.add(ref); - } - } - - //at this point, all values calced, all refs to be written locked - //no more client code to be called - long commitPoint = getCommitPoint(); - long msecs = System.currentTimeMillis(); - for(Map.Entry<Ref, Object> e : vals.entrySet()) - { - Ref ref = e.getKey(); - ref.tvals = new Ref.TVal(e.getValue(), commitPoint, msecs, ref.tvals); - ref.tstatus = null; - //auto-trim - for(Ref.TVal tv = ref.tvals; tv != null; tv = tv.prior) - { - if(tv.point <= completedPriorPoint) - tv.prior = null; + for(Map.Entry<Ref, ArrayList<IFn>> e : commutes.entrySet()) + { + Ref ref = e.getKey(); + ref.lock.writeLock().lock(); + locked.add(ref); + Info refinfo = ref.tinfo; + if(refinfo != null && refinfo != info && refinfo.running()) + throw retryex;//new RetryException(); + Object val = ref.tvals == null ? null : ref.tvals.val; + if(!sets.contains(ref)) + vals.put(ref, val); + for(IFn f : e.getValue()) + { + vals.put(ref, f.invoke(vals.get(ref))); + } + } + for(Ref ref : sets) + { + if(!commutes.containsKey(ref)) + { + ref.lock.writeLock().lock(); + locked.add(ref); + } + } + + //at this point, all values calced, all refs to be written locked + //no more client code to be called + long commitPoint = getCommitPoint(); + long msecs = System.currentTimeMillis(); + for(Map.Entry<Ref, Object> e : vals.entrySet()) + { + Ref ref = e.getKey(); +// ref.tvals = new Ref.TVal(e.getValue(), commitPoint, msecs, null); + if(ref.tvals != null) + ref.tvals.prior = null; + ref.tvals = new Ref.TVal(e.getValue(), commitPoint, msecs, ref.tvals); + //ref.tstatus = null; + //auto-trim +// for(Ref.TVal tv = ref.tvals; tv != null; tv = tv.prior) +// { +// if(tv.msecs <= msecs) +// tv.prior = null; +// } + } + done = true; } } - done = true; } catch(RetryException retry) { @@ -218,9 +255,9 @@ Object run(IFn fn) throws Exception{ locked.get(k).lock.writeLock().unlock(); } locked.clear(); - stop(); - if(!done) - relinquishReadPoint(); + stop(done ? COMMITTED : RETRY); +// if(!done) +// relinquishReadPoint(); } } if(!done) @@ -235,6 +272,8 @@ Object doGet(Ref ref) throws Exception{ try { ref.lock.readLock().lock(); + if(ref.tvals == null) + throw new IllegalStateException(ref.toString() + " is unbound."); for(Ref.TVal ver = ref.tvals; ver != null; ver = ver.prior) { if(ver.point <= readPoint) @@ -245,8 +284,8 @@ Object doGet(Ref ref) throws Exception{ { ref.lock.readLock().unlock(); } - - throw new IllegalStateException(ref.toString() + " is unbound."); + //no version of val precedes the read point + throw retryex;//new RetryException(); } Object doSet(Ref ref, Object val) throws Exception{ @@ -273,14 +312,6 @@ Object doCommute(Ref ref, IFn fn) throws Exception{ { ref.lock.readLock().lock(); val = ref.tvals == null ? null : ref.tvals.val; -// for(Ref.TVal ver = ref.tvals; ver != null; ver = ver.prior) -// { -// if(ver.point <= readPoint) -// { -// val = ver.val; -// break; -// } -// } } finally { @@ -434,8 +465,9 @@ public static void main(String[] args){ System.out.printf("nthreads: %d, nitems: %d, niters: %d, time: %d%n", nthreads, nitems, niters, estimatedTime / 1000000); e.shutdown(); - for(Future<Long> res : results) + for(Future<Long> result : results) { + Future<Long> res = (Future<Long>) result; System.out.printf("%d, ", res.get() / 1000000); } System.out.println(); diff --git a/src/jvm/clojure/lang/Ref.java b/src/jvm/clojure/lang/Ref.java index 7e7d29d1..cd89a7b3 100644 --- a/src/jvm/clojure/lang/Ref.java +++ b/src/jvm/clojure/lang/Ref.java @@ -12,11 +12,11 @@ package clojure.lang; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantReadWriteLock; public class Ref implements IFn, Comparable<Ref>{ + public int compareTo(Ref o){ if(o.id == id) return 0; @@ -40,21 +40,18 @@ public static class TVal{ } -public static class TStatus{ - boolean running = true; -} final static AtomicLong ids = new AtomicLong(); TVal tvals; transient volatile InheritableThreadLocal<Binding> dvals; final ReentrantReadWriteLock lock; -TStatus tstatus; +LockingTransaction.Info tinfo; final long id; public Ref(){ this.tvals = null; this.dvals = null; - this.tstatus = null; + this.tinfo = null; lock = new ReentrantReadWriteLock(); id = ids.getAndIncrement(); } |