diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/jvm/clojure/lang/LockingTransaction.java | 44 | ||||
-rw-r--r-- | src/jvm/clojure/lang/Ref.java | 32 |
2 files changed, 49 insertions, 27 deletions
diff --git a/src/jvm/clojure/lang/LockingTransaction.java b/src/jvm/clojure/lang/LockingTransaction.java index b9ea39c0..fe0bbd3c 100644 --- a/src/jvm/clojure/lang/LockingTransaction.java +++ b/src/jvm/clojure/lang/LockingTransaction.java @@ -203,16 +203,22 @@ Object run(IFn fn) throws Exception{ for(Map.Entry<Ref, Object> e : vals.entrySet()) { Ref ref = e.getKey(); -// ref.tvals = new Ref.TVal(e.getValue(), commitPoint, msecs, null); - if(ref.tvals != null) - ref.tvals.prior = null; - ref.tvals = new Ref.TVal(e.getValue(), commitPoint, msecs, ref.tvals); - //auto-trim -// for(Ref.TVal tv = ref.tvals; tv != null; tv = tv.prior) -// { -// if(tv.msecs <= msecs) -// tv.prior = null; -// } + if(ref.tvals == null) + { + ref.tvals = new Ref.TVal(e.getValue(), commitPoint, msecs); + } + else if(ref.faults.get() > 0) + { + ref.tvals = new Ref.TVal(e.getValue(), commitPoint, msecs, ref.tvals); + ref.faults.set(0); + } + else + { + ref.tvals = ref.tvals.next; + ref.tvals.val = e.getValue(); + ref.tvals.point = commitPoint; + ref.tvals.msecs = msecs; + } } done = true; info.status.set(COMMITTED); @@ -239,8 +245,7 @@ Object run(IFn fn) throws Exception{ Object doGet(Ref ref) throws Exception{ - if(//true || - info.running()) + if(info.running()) { if(vals.containsKey(ref)) return vals.get(ref); @@ -249,17 +254,19 @@ Object doGet(Ref ref) throws Exception{ ref.lock.readLock().lock(); if(ref.tvals == null) throw new IllegalStateException(ref.toString() + " is unbound."); - for(Ref.TVal ver = ref.tvals; ver != null; ver = ver.prior) + Ref.TVal ver = ref.tvals; + do { if(ver.point <= readPoint) return ver.val; - } + } while((ver = ver.prior) != ref.tvals); } finally { ref.lock.readLock().unlock(); } //no version of val precedes the read point + ref.faults.incrementAndGet(); throw retryex; } else @@ -268,8 +275,7 @@ Object doGet(Ref ref) throws Exception{ } Object doSet(Ref ref, Object val) throws Exception{ - if(//true || - info.running()) + if(info.running()) { if(commutes.containsKey(ref)) throw new IllegalStateException("Can't set after commute"); @@ -286,16 +292,14 @@ Object doSet(Ref ref, Object val) throws Exception{ } void doTouch(Ref ref) throws Exception{ - if(//true || - info.running()) + if(info.running()) lock(ref); else throw retryex; } Object doCommute(Ref ref, IFn fn) throws Exception{ - if(//true || - info.running()) + if(info.running()) { if(!vals.containsKey(ref)) { diff --git a/src/jvm/clojure/lang/Ref.java b/src/jvm/clojure/lang/Ref.java index cd89a7b3..6c1f9bcd 100644 --- a/src/jvm/clojure/lang/Ref.java +++ b/src/jvm/clojure/lang/Ref.java @@ -13,6 +13,7 @@ package clojure.lang; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReentrantReadWriteLock; public class Ref implements IFn, Comparable<Ref>{ @@ -26,16 +27,28 @@ public int compareTo(Ref o){ } public static class TVal{ - final Object val; - final long point; + Object val; + long point; long msecs; TVal prior; + TVal next; TVal(Object val, long point, long msecs, TVal prior){ this.val = val; this.point = point; + this.msecs = msecs; this.prior = prior; + this.next = prior.next; + this.prior.next = this; + this.next.prior = this; + } + + TVal(Object val, long point, long msecs){ + this.val = val; + this.point = point; this.msecs = msecs; + this.next = this; + this.prior = this; } } @@ -43,6 +56,9 @@ public static class TVal{ final static AtomicLong ids = new AtomicLong(); TVal tvals; + +AtomicInteger faults; + transient volatile InheritableThreadLocal<Binding> dvals; final ReentrantReadWriteLock lock; LockingTransaction.Info tinfo; @@ -52,13 +68,14 @@ public Ref(){ this.tvals = null; this.dvals = null; this.tinfo = null; + faults = new AtomicInteger(); lock = new ReentrantReadWriteLock(); id = ids.getAndIncrement(); } public Ref(Object initVal){ this(); - tvals = new TVal(initVal, 0, System.currentTimeMillis(), null); + tvals = new TVal(initVal, 0, System.currentTimeMillis()); } //ok out of transaction @@ -150,15 +167,15 @@ boolean isBound(){ } } + void trimHistory(){ - long ctp = Transaction.completedThroughPoint(); try { lock.writeLock().lock(); - for(TVal tv = tvals; tv != null; tv = tv.prior) + if(tvals != null) { - if(tv.point <= ctp) - tv.prior = null; + tvals.next = tvals; + tvals.prior = tvals; } } finally @@ -167,6 +184,7 @@ void trimHistory(){ } } + final public IFn fn(){ return (IFn) currentVal(); } |