summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/jvm/clojure/lang/LockingTransaction.java44
-rw-r--r--src/jvm/clojure/lang/Ref.java32
2 files changed, 49 insertions, 27 deletions
diff --git a/src/jvm/clojure/lang/LockingTransaction.java b/src/jvm/clojure/lang/LockingTransaction.java
index b9ea39c0..fe0bbd3c 100644
--- a/src/jvm/clojure/lang/LockingTransaction.java
+++ b/src/jvm/clojure/lang/LockingTransaction.java
@@ -203,16 +203,22 @@ Object run(IFn fn) throws Exception{
for(Map.Entry<Ref, Object> e : vals.entrySet())
{
Ref ref = e.getKey();
-// ref.tvals = new Ref.TVal(e.getValue(), commitPoint, msecs, null);
- if(ref.tvals != null)
- ref.tvals.prior = null;
- ref.tvals = new Ref.TVal(e.getValue(), commitPoint, msecs, ref.tvals);
- //auto-trim
-// for(Ref.TVal tv = ref.tvals; tv != null; tv = tv.prior)
-// {
-// if(tv.msecs <= msecs)
-// tv.prior = null;
-// }
+ if(ref.tvals == null)
+ {
+ ref.tvals = new Ref.TVal(e.getValue(), commitPoint, msecs);
+ }
+ else if(ref.faults.get() > 0)
+ {
+ ref.tvals = new Ref.TVal(e.getValue(), commitPoint, msecs, ref.tvals);
+ ref.faults.set(0);
+ }
+ else
+ {
+ ref.tvals = ref.tvals.next;
+ ref.tvals.val = e.getValue();
+ ref.tvals.point = commitPoint;
+ ref.tvals.msecs = msecs;
+ }
}
done = true;
info.status.set(COMMITTED);
@@ -239,8 +245,7 @@ Object run(IFn fn) throws Exception{
Object doGet(Ref ref) throws Exception{
- if(//true ||
- info.running())
+ if(info.running())
{
if(vals.containsKey(ref))
return vals.get(ref);
@@ -249,17 +254,19 @@ Object doGet(Ref ref) throws Exception{
ref.lock.readLock().lock();
if(ref.tvals == null)
throw new IllegalStateException(ref.toString() + " is unbound.");
- for(Ref.TVal ver = ref.tvals; ver != null; ver = ver.prior)
+ Ref.TVal ver = ref.tvals;
+ do
{
if(ver.point <= readPoint)
return ver.val;
- }
+ } while((ver = ver.prior) != ref.tvals);
}
finally
{
ref.lock.readLock().unlock();
}
//no version of val precedes the read point
+ ref.faults.incrementAndGet();
throw retryex;
}
else
@@ -268,8 +275,7 @@ Object doGet(Ref ref) throws Exception{
}
Object doSet(Ref ref, Object val) throws Exception{
- if(//true ||
- info.running())
+ if(info.running())
{
if(commutes.containsKey(ref))
throw new IllegalStateException("Can't set after commute");
@@ -286,16 +292,14 @@ Object doSet(Ref ref, Object val) throws Exception{
}
void doTouch(Ref ref) throws Exception{
- if(//true ||
- info.running())
+ if(info.running())
lock(ref);
else
throw retryex;
}
Object doCommute(Ref ref, IFn fn) throws Exception{
- if(//true ||
- info.running())
+ if(info.running())
{
if(!vals.containsKey(ref))
{
diff --git a/src/jvm/clojure/lang/Ref.java b/src/jvm/clojure/lang/Ref.java
index cd89a7b3..6c1f9bcd 100644
--- a/src/jvm/clojure/lang/Ref.java
+++ b/src/jvm/clojure/lang/Ref.java
@@ -13,6 +13,7 @@
package clojure.lang;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class Ref implements IFn, Comparable<Ref>{
@@ -26,16 +27,28 @@ public int compareTo(Ref o){
}
public static class TVal{
- final Object val;
- final long point;
+ Object val;
+ long point;
long msecs;
TVal prior;
+ TVal next;
TVal(Object val, long point, long msecs, TVal prior){
this.val = val;
this.point = point;
+ this.msecs = msecs;
this.prior = prior;
+ this.next = prior.next;
+ this.prior.next = this;
+ this.next.prior = this;
+ }
+
+ TVal(Object val, long point, long msecs){
+ this.val = val;
+ this.point = point;
this.msecs = msecs;
+ this.next = this;
+ this.prior = this;
}
}
@@ -43,6 +56,9 @@ public static class TVal{
final static AtomicLong ids = new AtomicLong();
TVal tvals;
+
+AtomicInteger faults;
+
transient volatile InheritableThreadLocal<Binding> dvals;
final ReentrantReadWriteLock lock;
LockingTransaction.Info tinfo;
@@ -52,13 +68,14 @@ public Ref(){
this.tvals = null;
this.dvals = null;
this.tinfo = null;
+ faults = new AtomicInteger();
lock = new ReentrantReadWriteLock();
id = ids.getAndIncrement();
}
public Ref(Object initVal){
this();
- tvals = new TVal(initVal, 0, System.currentTimeMillis(), null);
+ tvals = new TVal(initVal, 0, System.currentTimeMillis());
}
//ok out of transaction
@@ -150,15 +167,15 @@ boolean isBound(){
}
}
+
void trimHistory(){
- long ctp = Transaction.completedThroughPoint();
try
{
lock.writeLock().lock();
- for(TVal tv = tvals; tv != null; tv = tv.prior)
+ if(tvals != null)
{
- if(tv.point <= ctp)
- tv.prior = null;
+ tvals.next = tvals;
+ tvals.prior = tvals;
}
}
finally
@@ -167,6 +184,7 @@ void trimHistory(){
}
}
+
final public IFn fn(){
return (IFn) currentVal();
}