diff options
author | Rich Hickey <richhickey@gmail.com> | 2007-07-26 15:13:50 +0000 |
---|---|---|
committer | Rich Hickey <richhickey@gmail.com> | 2007-07-26 15:13:50 +0000 |
commit | f39e88435ebb9bba67ce46a758dce82fc71eb0be (patch) | |
tree | f675934704e5a2ec14e2e956729570271f7ab5c3 /src | |
parent | 099953cd4057aacf696de026baaee0295fc4db19 (diff) |
priority queue for outstanding points, auto-trim
Diffstat (limited to 'src')
-rw-r--r-- | src/jvm/clojure/lang/TRef.java | 2 | ||||
-rw-r--r-- | src/jvm/clojure/lang/TStamp.java | 7 | ||||
-rw-r--r-- | src/jvm/clojure/lang/Transaction.java | 126 |
3 files changed, 60 insertions, 75 deletions
diff --git a/src/jvm/clojure/lang/TRef.java b/src/jvm/clojure/lang/TRef.java index a72f1480..1f25ac2a 100644 --- a/src/jvm/clojure/lang/TRef.java +++ b/src/jvm/clojure/lang/TRef.java @@ -165,7 +165,7 @@ void trimHistory(){ } } -void trimHistoryPriorToPoint(int tpoint){ +void trimHistoryPriorToPoint(long tpoint){ long ctp = Transaction.completedThroughPoint(); for(TVal tv = getCurrentTVal(); tv != null; tv = tv.prior) { diff --git a/src/jvm/clojure/lang/TStamp.java b/src/jvm/clojure/lang/TStamp.java index ff3607fc..50879cd9 100644 --- a/src/jvm/clojure/lang/TStamp.java +++ b/src/jvm/clojure/lang/TStamp.java @@ -14,15 +14,16 @@ package clojure.lang; public class TStamp{ -public static enum Status {RUNNING,COMMITTED,ABORTED,RETRY} +public static enum Status{ + RUNNING, COMMITTED, ABORTED, RETRY +} volatile Status status; volatile long tpoint; volatile long msecs; -public TStamp(long tpoint, Status status){ +public TStamp(Status status){ this.status = status; - this.tpoint = tpoint; } } diff --git a/src/jvm/clojure/lang/Transaction.java b/src/jvm/clojure/lang/Transaction.java index 90a1a402..c501c346 100644 --- a/src/jvm/clojure/lang/Transaction.java +++ b/src/jvm/clojure/lang/Transaction.java @@ -12,12 +12,9 @@ package clojure.lang; -import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.concurrent.*; -import java.util.ArrayList; -import java.util.List; -import java.util.Collections; +import java.util.*; public class Transaction{ @@ -34,52 +31,42 @@ static class AbortException extends Exception{ //total order on transactions //transactions will consume a point for init, for each retry, and on commit if writing -final static Object tlock = new Object(); -static long nextPoint = 1; -//final static AtomicLong nextPoint = new AtomicLong(1); +private static long nextPoint = 1; +final static PriorityQueue<Long> points = new PriorityQueue<Long>(); -static long getNextPoint(){ -// return nextPoint.getAndIncrement(); - synchronized(tlock) +void getReadPoint(){ + synchronized(points) { - return nextPoint++; + completedPriorPoint = completedThroughPoint(); + points.add(nextPoint); + readPoint = nextPoint++; } } -static class PointNode{ - final long tpoint; - volatile PointNode next; - - public PointNode(long tpoint, PointNode next){ - this.tpoint = tpoint; - this.next = next; - } - - static final AtomicReferenceFieldUpdater<PointNode, PointNode> nextUpdater = - AtomicReferenceFieldUpdater.newUpdater(PointNode.class, PointNode.class, "next"); +static long getCommitPoint(){ + synchronized(points) + { + return nextPoint++; + } } -final static TStamp ZERO_POINT = new TStamp(0, TStamp.Status.COMMITTED); -volatile static PointNode completedPoints = new PointNode(0, null); +final static TStamp ZERO_POINT = new TStamp(TStamp.Status.COMMITTED); static long completedThroughPoint(){ - return completedPoints.tpoint; + synchronized(points) + { + Long p = points.peek(); + if(p != null) + return p - 1; + return nextPoint - 1; + } } static void relinquish(long tpoint){ - PointNode p = completedPoints; - //update completedThroughPoint - while(p.next != null && p.next.tpoint == p.tpoint + 1) - p = p.next; - completedPoints = p; - - //splice in - PointNode n; - do + synchronized(points) { - for(n = p.next; n != null && n.tpoint < tpoint; p = n, n = p.next) - ; - } while(!PointNode.nextUpdater.compareAndSet(p, n, new PointNode(tpoint, n))); + points.remove(tpoint); + } } static void statusTransition(TStamp tstamp, TStamp.Status newStatus){ @@ -92,6 +79,8 @@ static void statusTransition(TStamp tstamp, TStamp.Status newStatus){ TStamp tstamp; +long completedPriorPoint; +long readPoint; TVal lock(TRef tref, boolean ensurePoint) throws Exception{ TVal head = (TVal) tref.tvals.get(); @@ -115,17 +104,25 @@ TVal lock(TRef tref, boolean ensurePoint) throws Exception{ { TVal prior; if(head == null || head.tstamp.status == TStamp.Status.COMMITTED) + { prior = head; + } else //aborted/retried at head, skip over prior = head.prior; TVal ret = null; - if((ensurePoint && prior != null && prior.tstamp.tpoint > tstamp.tpoint) + if((ensurePoint && prior != null && prior.tstamp.tpoint > readPoint) || !tref.tvals.compareAndSet(head, ret = new TVal(prior == null ? null : prior.val, tstamp, prior))) { statusTransition(tstamp, TStamp.Status.RETRY); throw new RetryException(); } + //auto-trim + for(TVal tv = prior; tv != null; tv = tv.prior) + { + if(tv.tstamp.tpoint <= completedPriorPoint) + tv.prior = null; + } return ret; } } @@ -174,21 +171,20 @@ Object run(IFn fn) throws Exception{ { try { - tstamp = new TStamp(getNextPoint(), TStamp.Status.RUNNING); + getReadPoint(); + tstamp = new TStamp(TStamp.Status.RUNNING); ret = fn.invoke(); done = true; - //save the read point - long readPoint = tstamp.tpoint; tstamp.msecs = System.currentTimeMillis(); //get a commit point + alter status, atomically - synchronized(tlock) + synchronized(points) { - tstamp.tpoint = getNextPoint(); + tstamp.tpoint = getCommitPoint(); //commit! statusTransition(tstamp, TStamp.Status.COMMITTED); + relinquish(readPoint); + //relinquish(tstamp.tpoint); } - relinquish(readPoint); - relinquish(tstamp.tpoint); } catch(RetryException retry) { @@ -199,7 +195,8 @@ Object run(IFn fn) throws Exception{ if(!done) { statusTransition(tstamp, TStamp.Status.ABORTED); - relinquish(tstamp.tpoint); + relinquish(readPoint); + //relinquish(tstamp.tpoint); } } } @@ -236,7 +233,7 @@ Object doGet(TRef tref) throws Exception{ for(; ver != null; ver = ver.prior) { - if(ver.tstamp.tpoint <= tstamp.tpoint) + if(ver.tstamp.tpoint <= readPoint) return ver.val; } throw new IllegalStateException(tref.toString() + " is unbound."); @@ -257,7 +254,6 @@ Object doCommute(TRef tref, IFn fn) throws Exception{ return head.val = fn.invoke(head.val); } - /* static public Object runInAsOfTransaction(IFn fn, int tpoint) throws Exception{ if(get() != null) @@ -292,39 +288,24 @@ static public Object runInAsOfTransaction(IFn fn, long msecs) throws Exception{ } */ + +//for test +static CyclicBarrier barrier; + public static void main(String[] args){ try { - if(args.length != 3) - System.err.println("Usage: Transaction nthreads nitems niters"); + if(args.length != 4) + System.err.println("Usage: Transaction nthreads nitems niters ninstances"); int nthreads = Integer.parseInt(args[0]); int nitems = Integer.parseInt(args[1]); int niters = Integer.parseInt(args[2]); + int ninstances = Integer.parseInt(args[3]); final ArrayList<TRef> items = new ArrayList(nitems); for(int i = 0; i < nitems; i++) items.add(new TRef(0)); - Thread trimThread = new Thread(new Runnable(){ - public void run(){ - for(; ;) - { - for(TRef tref : items) - { - tref.trimHistory(); - } - try - { - Thread.sleep(10); - } - catch(InterruptedException e) - { - return; - } - } - } - }); - trimThread.start(); class Incr extends AFn{ public Object invoke(Object arg1) throws Exception{ @@ -424,13 +405,16 @@ public static void main(String[] args){ } ExecutorService e = Executors.newFixedThreadPool(nthreads); + if(barrier == null) + barrier = new CyclicBarrier(ninstances); + barrier.await(); long start = System.nanoTime(); List<Future<Long>> results = e.invokeAll(tasks); long estimatedTime = System.nanoTime() - start; System.out.printf("nthreads: %d, nitems: %d, niters: %d, time: %d%n", nthreads, nitems, niters, estimatedTime / 1000000); e.shutdown(); - trimThread.interrupt(); + barrier.await(); for(Future<Long> res : results) { System.out.printf("%d, ", res.get() / 1000000); |