diff options
author | Rich Hickey <richhickey@gmail.com> | 2007-06-24 18:08:23 +0000 |
---|---|---|
committer | Rich Hickey <richhickey@gmail.com> | 2007-06-24 18:08:23 +0000 |
commit | bc13a0cb50ef1f6da7b3c72e2024ff511b5af631 (patch) | |
tree | 75f5ca8c3eaf006dbf81bcffbd37b22441b3022c /src | |
parent | 32ed38ea8f0dde2b8d3ac5b489510ebd359da979 (diff) |
snapshot MVCC transactions
Diffstat (limited to 'src')
-rw-r--r-- | src/jvm/clojure/lang/Transaction.java | 101 |
1 files changed, 89 insertions, 12 deletions
diff --git a/src/jvm/clojure/lang/Transaction.java b/src/jvm/clojure/lang/Transaction.java index 14a9a7ec..25ca109b 100644 --- a/src/jvm/clojure/lang/Transaction.java +++ b/src/jvm/clojure/lang/Transaction.java @@ -18,6 +18,7 @@ import java.util.concurrent.*; import java.util.ArrayList; import java.util.List; import java.util.Iterator; +import java.util.Collections; public class Transaction{ @@ -166,8 +167,9 @@ Object run(IFn fn) throws Exception{ done = true; //save the read point long readPoint = tstamp.tpoint; - //get a commit point and time - synchronized(tstamp) { + //get a commit point and time, bundle with state transition + synchronized(tstamp) + { tstamp.tpoint = getNextPoint(); tstamp.msecs = System.currentTimeMillis(); //commit! @@ -201,15 +203,25 @@ Object doGet(TRef tref) throws Exception{ return null; if(head.tstamp == tstamp) return head.val; - TVal ver = null; - if(head.tstamp.status == TStamp.Status.COMMITTED) - ver = head; - else + TVal ver; + switch(head.tstamp.status) { - synchronized(head.tstamp){ - ver = head.tstamp.status == TStamp.Status.COMMITTED ? head : head.prior; - } + case COMMITTED: + ver = head; + break; + case RETRY: + case ABORTED: + ver = head.prior; + break; + default: + //ensure a running->commit transition happens before/after our read point + synchronized(head.tstamp) + { + ver = head.tstamp.status == TStamp.Status.COMMITTED ? head : head.prior; + } + break; } + for(; ver != null; ver = ver.prior) { if(ver.tstamp.tpoint <= tstamp.tpoint) @@ -277,10 +289,71 @@ public static void main(String[] args){ int nitems = Integer.parseInt(args[1]); int niters = Integer.parseInt(args[2]); - ArrayList<TRef> items = new ArrayList(nitems); + final ArrayList<TRef> items = new ArrayList(nitems); for(int i = 0; i < nitems; i++) items.add(new TRef(0)); + Thread trimThread = new Thread(new Runnable(){ + public void run(){ + for(; ;) + { + for(TRef tref : items) + { + tref.trimHistory(); + } + try + { + Thread.sleep(10); + } + catch(InterruptedException e) + { + return; + } + } + } + }); + trimThread.start(); + + class Incr extends AFn{ + public Object invoke(Object arg1) throws Exception{ + Integer i = (Integer) arg1; + return i + 1; + } + } + + class Commuter extends AFn implements Callable{ + int niters; + List<TRef> items; + Incr incr; + + + public Commuter(int niters, List<TRef> items){ + this.niters = niters; + this.items = items; + this.incr = new Incr(); + } + + public Object call() throws Exception{ + long nanos = 0; + for(int i = 0; i < niters; i++) + { + long start = System.nanoTime(); + Transaction.runInTransaction(this); + long dur = System.nanoTime() - start; + nanos += dur; + } + return nanos; + } + + public Object invoke() throws Exception{ + for(TRef tref : items) + { + Transaction.getTransaction().doCommute(tref, incr); + } + return null; + } + } + class Incrementer extends AFn implements Callable{ int niters; List<TRef> items; @@ -316,8 +389,11 @@ public static void main(String[] args){ ArrayList<Callable<Long>> tasks = new ArrayList(nthreads); for(int i = 0; i < nthreads; i++) - tasks.add(new Incrementer(niters, items)); - + { + ArrayList<TRef> si = (ArrayList<TRef>) items.clone(); + Collections.shuffle(si); + tasks.add(new Incrementer(niters, si)); + } ExecutorService e = Executors.newFixedThreadPool(nthreads); long start = System.nanoTime(); @@ -326,6 +402,7 @@ public static void main(String[] args){ System.out.printf("nthreads: %d, nitems: %d, niters: %d, time: %d%n", nthreads, nitems, niters, estimatedTime / 1000000); e.shutdown(); + trimThread.interrupt(); for(Future<Long> res : results) { System.out.printf("%d, ", res.get() / 1000000); |