diff options
author | Rich Hickey <richhickey@gmail.com> | 2007-06-24 15:52:17 +0000 |
---|---|---|
committer | Rich Hickey <richhickey@gmail.com> | 2007-06-24 15:52:17 +0000 |
commit | 32ed38ea8f0dde2b8d3ac5b489510ebd359da979 (patch) | |
tree | b8a18c5aef14669c6afe469bb03874e1a50947d9 | |
parent | 76e5252aaf17760df635115999bc212a860c8b9d (diff) |
interim checkin
-rw-r--r-- | src/jvm/clojure/lang/TObj.java | 5 | ||||
-rw-r--r-- | src/jvm/clojure/lang/TRef.java | 5 | ||||
-rw-r--r-- | src/jvm/clojure/lang/TStamp.java | 10 | ||||
-rw-r--r-- | src/jvm/clojure/lang/Transaction.java | 164 |
4 files changed, 139 insertions, 45 deletions
diff --git a/src/jvm/clojure/lang/TObj.java b/src/jvm/clojure/lang/TObj.java index 50c9337e..e4366f1f 100644 --- a/src/jvm/clojure/lang/TObj.java +++ b/src/jvm/clojure/lang/TObj.java @@ -10,9 +10,9 @@ package clojure.lang;
-public class TObj implements IObj{
+abstract public class TObj implements IObj{
TRef _attrs;
-
+ /*
public TObj() throws Exception{
this._attrs = Transaction.tref(PersistentArrayMap.EMPTY);
}
@@ -44,4 +44,5 @@ public void removeAttr(Object key) throws Exception { t = t.without(key);
Transaction.set(_attrs,t);
}
+*/
}
diff --git a/src/jvm/clojure/lang/TRef.java b/src/jvm/clojure/lang/TRef.java index 24105f7a..33f05faa 100644 --- a/src/jvm/clojure/lang/TRef.java +++ b/src/jvm/clojure/lang/TRef.java @@ -22,6 +22,11 @@ public TRef() { this.tvals = new AtomicReference<TVal>(); } +public TRef(Object initVal) { + this.tvals = new AtomicReference<TVal>(); + tvals.set(new TVal(initVal, Transaction.ZERO_POINT, null)); +} + public Object getCurrentVal(){ TVal current = getCurrentTVal(); if(current != null) diff --git a/src/jvm/clojure/lang/TStamp.java b/src/jvm/clojure/lang/TStamp.java index 88c9e656..ff3607fc 100644 --- a/src/jvm/clojure/lang/TStamp.java +++ b/src/jvm/clojure/lang/TStamp.java @@ -16,13 +16,13 @@ public class TStamp{ public static enum Status {RUNNING,COMMITTED,ABORTED,RETRY} -Status status; -long tpoint; -long msecs; +volatile Status status; +volatile long tpoint; +volatile long msecs; -public TStamp(long tpoint){ - this.status = Status.RUNNING; +public TStamp(long tpoint, Status status){ + this.status = status; this.tpoint = tpoint; } } diff --git a/src/jvm/clojure/lang/Transaction.java b/src/jvm/clojure/lang/Transaction.java index 8a2a6c89..14a9a7ec 100644 --- a/src/jvm/clojure/lang/Transaction.java +++ b/src/jvm/clojure/lang/Transaction.java @@ -14,10 +14,14 @@ package clojure.lang; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import java.util.concurrent.*; +import java.util.ArrayList; +import java.util.List; +import java.util.Iterator; public class Transaction{ -public static int RETRY_LIMIT = 100; +public static int RETRY_LIMIT = 1000; public static int LOCK_WAIT_MSECS = 100; final static ThreadLocal<Transaction> transaction = new ThreadLocal<Transaction>(); @@ -44,10 +48,12 @@ static class PointNode{ this.tpoint = tpoint; this.next = next; } - static final AtomicReferenceFieldUpdater<PointNode, PointNode> nextUpdater = - AtomicReferenceFieldUpdater.newUpdater(PointNode.class, PointNode.class, "next"); + + static final AtomicReferenceFieldUpdater<PointNode, PointNode> nextUpdater = + AtomicReferenceFieldUpdater.newUpdater(PointNode.class, PointNode.class, "next"); } +final static TStamp ZERO_POINT = new TStamp(0, TStamp.Status.COMMITTED); volatile static PointNode completedPoints = new PointNode(0, null); static long completedThroughPoint(){ @@ -57,16 +63,17 @@ static long completedThroughPoint(){ static void relinquish(long tpoint){ PointNode p = completedPoints; //update completedThroughPoint - while(p.next != null && p.next.tpoint == p.tpoint+1) + while(p.next != null && p.next.tpoint == p.tpoint + 1) p = p.next; completedPoints = p; //splice in PointNode n; - do{ - for(n=p.next;n != null && n.tpoint < tpoint;p = n, n = p.next) - ; - }while(!PointNode.nextUpdater.compareAndSet(p,n,new PointNode(tpoint,n))); + do + { + for(n = p.next; n != null && n.tpoint < tpoint; p = n, n = p.next) + ; + } while(!PointNode.nextUpdater.compareAndSet(p, n, new PointNode(tpoint, n))); } static void statusTransition(TStamp tstamp, TStamp.Status newStatus){ @@ -80,22 +87,23 @@ static void statusTransition(TStamp tstamp, TStamp.Status newStatus){ TStamp tstamp; -void lock(TRef tref, boolean ensurePoint) throws Exception{ +TVal lock(TRef tref, boolean ensurePoint) throws Exception{ TVal head = tref.tvals.get(); //already locked by this transaction if(head != null && head.tstamp == tstamp) - return; - boolean locked; + return head; if(head != null && head.tstamp.status == TStamp.Status.RUNNING) { //already locked by another transaction, block a bit + //first drop our locks + statusTransition(tstamp, TStamp.Status.RETRY); synchronized(head.tstamp) { if(head.tstamp.status == TStamp.Status.RUNNING) head.tstamp.wait(LOCK_WAIT_MSECS); } - locked = false; + throw new RetryException(); } else { @@ -104,22 +112,20 @@ void lock(TRef tref, boolean ensurePoint) throws Exception{ prior = head; else //aborted/retried at head, skip over prior = head.prior; - - if(ensurePoint && prior != null && prior.tstamp.tpoint > tstamp.tpoint) - locked = false; - else - locked = tref.tvals.compareAndSet(head, new TVal(prior == null ? null : prior.val, tstamp, prior)); - } - - if(!locked) - { - statusTransition(tstamp,TStamp.Status.RETRY); - throw new RetryException(); + TVal ret = null; + if((ensurePoint && prior != null && prior.tstamp.tpoint > tstamp.tpoint) + || + !tref.tvals.compareAndSet(head, ret = new TVal(prior == null ? null : prior.val, tstamp, prior))) + { + statusTransition(tstamp, TStamp.Status.RETRY); + throw new RetryException(); + } + return ret; } } void abort() throws AbortException{ - statusTransition(tstamp,TStamp.Status.ABORTED); + statusTransition(tstamp, TStamp.Status.ABORTED); throw new AbortException(); } @@ -155,17 +161,18 @@ Object run(IFn fn) throws Exception{ { try { - tstamp = new TStamp(getNextPoint()); + tstamp = new TStamp(getNextPoint(), TStamp.Status.RUNNING); ret = fn.invoke(); done = true; //save the read point long readPoint = tstamp.tpoint; //get a commit point and time - tstamp.tpoint = getNextPoint(); - tstamp.msecs = System.currentTimeMillis(); - //commit! - statusTransition(tstamp, TStamp.Status.COMMITTED); - + synchronized(tstamp) { + tstamp.tpoint = getNextPoint(); + tstamp.msecs = System.currentTimeMillis(); + //commit! + statusTransition(tstamp, TStamp.Status.COMMITTED); + } relinquish(readPoint); relinquish(tstamp.tpoint); } @@ -177,7 +184,7 @@ Object run(IFn fn) throws Exception{ { if(!done) { - statusTransition(tstamp,TStamp.Status.ABORTED); + statusTransition(tstamp, TStamp.Status.ABORTED); relinquish(tstamp.tpoint); } } @@ -188,14 +195,22 @@ Object run(IFn fn) throws Exception{ } - Object doGet(TRef tref) throws Exception{ TVal head = tref.tvals.get(); if(head == null) return null; if(head.tstamp == tstamp) return head.val; - for(TVal ver = head.tstamp.status == TStamp.Status.COMMITTED?head:head.prior; ver != null; ver = ver.prior) + TVal ver = null; + if(head.tstamp.status == TStamp.Status.COMMITTED) + ver = head; + else + { + synchronized(head.tstamp){ + ver = head.tstamp.status == TStamp.Status.COMMITTED ? head : head.prior; + } + } + for(; ver != null; ver = ver.prior) { if(ver.tstamp.tpoint <= tstamp.tpoint) return ver.val; @@ -204,18 +219,17 @@ Object doGet(TRef tref) throws Exception{ } Object doSet(TRef tref, Object val) throws Exception{ - lock(tref,true); - tref.tvals.get().val = val; + TVal head = lock(tref, true); + head.val = val; return val; } void doTouch(TRef tref) throws Exception{ - lock(tref,true); + lock(tref, true); } void doCommute(TRef tref, IFn fn) throws Exception{ - lock(tref,false); - TVal head = tref.tvals.get(); + TVal head = lock(tref, false); head.val = fn.invoke(head.val); } @@ -254,4 +268,78 @@ static public Object runInAsOfTransaction(IFn fn, long msecs) throws Exception{ } */ +public static void main(String[] args){ + try + { + if(args.length != 3) + System.err.println("Usage: Transaction nthreads nitems niters"); + int nthreads = Integer.parseInt(args[0]); + int nitems = Integer.parseInt(args[1]); + int niters = Integer.parseInt(args[2]); + + ArrayList<TRef> items = new ArrayList(nitems); + for(int i = 0; i < nitems; i++) + items.add(new TRef(0)); + + class Incrementer extends AFn implements Callable{ + int niters; + List<TRef> items; + + + public Incrementer(int niters, List<TRef> items){ + this.niters = niters; + this.items = items; + } + + public Object call() throws Exception{ + long nanos = 0; + for(int i = 0; i < niters; i++) + { + long start = System.nanoTime(); + Transaction.runInTransaction(this); + long dur = System.nanoTime() - start; + nanos += dur; + } + return nanos; + } + + public Object invoke() throws Exception{ + for(TRef tref : items) + { + //Transaction.getTransaction().doTouch(tref); + int val = (Integer) Transaction.getTransaction().doGet(tref); + Transaction.getTransaction().doSet(tref, val + 1); + } + return null; + } + } + + ArrayList<Callable<Long>> tasks = new ArrayList(nthreads); + for(int i = 0; i < nthreads; i++) + tasks.add(new Incrementer(niters, items)); + + ExecutorService e = Executors.newFixedThreadPool(nthreads); + + long start = System.nanoTime(); + List<Future<Long>> results = e.invokeAll(tasks); + long estimatedTime = System.nanoTime() - start; + System.out.printf("nthreads: %d, nitems: %d, niters: %d, time: %d%n", nthreads, nitems, niters, + estimatedTime / 1000000); + e.shutdown(); + for(Future<Long> res : results) + { + System.out.printf("%d, ", res.get() / 1000000); + } + System.out.println(); + for(TRef item : items) + { + System.out.printf("%d, ", (Integer) item.getCurrentVal()); + } + } + catch(Exception ex) + { + ex.printStackTrace(); + } +} + } |