summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorRich Hickey <richhickey@gmail.com>2007-06-24 18:08:23 +0000
committerRich Hickey <richhickey@gmail.com>2007-06-24 18:08:23 +0000
commitbc13a0cb50ef1f6da7b3c72e2024ff511b5af631 (patch)
tree75f5ca8c3eaf006dbf81bcffbd37b22441b3022c /src
parent32ed38ea8f0dde2b8d3ac5b489510ebd359da979 (diff)
snapshot MVCC transactions
Diffstat (limited to 'src')
-rw-r--r--src/jvm/clojure/lang/Transaction.java101
1 files changed, 89 insertions, 12 deletions
diff --git a/src/jvm/clojure/lang/Transaction.java b/src/jvm/clojure/lang/Transaction.java
index 14a9a7ec..25ca109b 100644
--- a/src/jvm/clojure/lang/Transaction.java
+++ b/src/jvm/clojure/lang/Transaction.java
@@ -18,6 +18,7 @@ import java.util.concurrent.*;
import java.util.ArrayList;
import java.util.List;
import java.util.Iterator;
+import java.util.Collections;
public class Transaction{
@@ -166,8 +167,9 @@ Object run(IFn fn) throws Exception{
done = true;
//save the read point
long readPoint = tstamp.tpoint;
- //get a commit point and time
- synchronized(tstamp) {
+ //get a commit point and time, bundle with state transition
+ synchronized(tstamp)
+ {
tstamp.tpoint = getNextPoint();
tstamp.msecs = System.currentTimeMillis();
//commit!
@@ -201,15 +203,25 @@ Object doGet(TRef tref) throws Exception{
return null;
if(head.tstamp == tstamp)
return head.val;
- TVal ver = null;
- if(head.tstamp.status == TStamp.Status.COMMITTED)
- ver = head;
- else
+ TVal ver;
+ switch(head.tstamp.status)
{
- synchronized(head.tstamp){
- ver = head.tstamp.status == TStamp.Status.COMMITTED ? head : head.prior;
- }
+ case COMMITTED:
+ ver = head;
+ break;
+ case RETRY:
+ case ABORTED:
+ ver = head.prior;
+ break;
+ default:
+ //ensure a running->commit transition happens before/after our read point
+ synchronized(head.tstamp)
+ {
+ ver = head.tstamp.status == TStamp.Status.COMMITTED ? head : head.prior;
+ }
+ break;
}
+
for(; ver != null; ver = ver.prior)
{
if(ver.tstamp.tpoint <= tstamp.tpoint)
@@ -277,10 +289,71 @@ public static void main(String[] args){
int nitems = Integer.parseInt(args[1]);
int niters = Integer.parseInt(args[2]);
- ArrayList<TRef> items = new ArrayList(nitems);
+ final ArrayList<TRef> items = new ArrayList(nitems);
for(int i = 0; i < nitems; i++)
items.add(new TRef(0));
+ Thread trimThread = new Thread(new Runnable(){
+ public void run(){
+ for(; ;)
+ {
+ for(TRef tref : items)
+ {
+ tref.trimHistory();
+ }
+ try
+ {
+ Thread.sleep(10);
+ }
+ catch(InterruptedException e)
+ {
+ return;
+ }
+ }
+ }
+ });
+ trimThread.start();
+
+ class Incr extends AFn{
+ public Object invoke(Object arg1) throws Exception{
+ Integer i = (Integer) arg1;
+ return i + 1;
+ }
+ }
+
+ class Commuter extends AFn implements Callable{
+ int niters;
+ List<TRef> items;
+ Incr incr;
+
+
+ public Commuter(int niters, List<TRef> items){
+ this.niters = niters;
+ this.items = items;
+ this.incr = new Incr();
+ }
+
+ public Object call() throws Exception{
+ long nanos = 0;
+ for(int i = 0; i < niters; i++)
+ {
+ long start = System.nanoTime();
+ Transaction.runInTransaction(this);
+ long dur = System.nanoTime() - start;
+ nanos += dur;
+ }
+ return nanos;
+ }
+
+ public Object invoke() throws Exception{
+ for(TRef tref : items)
+ {
+ Transaction.getTransaction().doCommute(tref, incr);
+ }
+ return null;
+ }
+ }
+
class Incrementer extends AFn implements Callable{
int niters;
List<TRef> items;
@@ -316,8 +389,11 @@ public static void main(String[] args){
ArrayList<Callable<Long>> tasks = new ArrayList(nthreads);
for(int i = 0; i < nthreads; i++)
- tasks.add(new Incrementer(niters, items));
-
+ {
+ ArrayList<TRef> si = (ArrayList<TRef>) items.clone();
+ Collections.shuffle(si);
+ tasks.add(new Incrementer(niters, si));
+ }
ExecutorService e = Executors.newFixedThreadPool(nthreads);
long start = System.nanoTime();
@@ -326,6 +402,7 @@ public static void main(String[] args){
System.out.printf("nthreads: %d, nitems: %d, niters: %d, time: %d%n", nthreads, nitems, niters,
estimatedTime / 1000000);
e.shutdown();
+ trimThread.interrupt();
for(Future<Long> res : results)
{
System.out.printf("%d, ", res.get() / 1000000);