summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRich Hickey <richhickey@gmail.com>2007-06-24 15:52:17 +0000
committerRich Hickey <richhickey@gmail.com>2007-06-24 15:52:17 +0000
commit32ed38ea8f0dde2b8d3ac5b489510ebd359da979 (patch)
treeb8a18c5aef14669c6afe469bb03874e1a50947d9
parent76e5252aaf17760df635115999bc212a860c8b9d (diff)
interim checkin
-rw-r--r--src/jvm/clojure/lang/TObj.java5
-rw-r--r--src/jvm/clojure/lang/TRef.java5
-rw-r--r--src/jvm/clojure/lang/TStamp.java10
-rw-r--r--src/jvm/clojure/lang/Transaction.java164
4 files changed, 139 insertions, 45 deletions
diff --git a/src/jvm/clojure/lang/TObj.java b/src/jvm/clojure/lang/TObj.java
index 50c9337e..e4366f1f 100644
--- a/src/jvm/clojure/lang/TObj.java
+++ b/src/jvm/clojure/lang/TObj.java
@@ -10,9 +10,9 @@
package clojure.lang;
-public class TObj implements IObj{
+abstract public class TObj implements IObj{
TRef _attrs;
-
+ /*
public TObj() throws Exception{
this._attrs = Transaction.tref(PersistentArrayMap.EMPTY);
}
@@ -44,4 +44,5 @@ public void removeAttr(Object key) throws Exception {
t = t.without(key);
Transaction.set(_attrs,t);
}
+*/
}
diff --git a/src/jvm/clojure/lang/TRef.java b/src/jvm/clojure/lang/TRef.java
index 24105f7a..33f05faa 100644
--- a/src/jvm/clojure/lang/TRef.java
+++ b/src/jvm/clojure/lang/TRef.java
@@ -22,6 +22,11 @@ public TRef() {
this.tvals = new AtomicReference<TVal>();
}
+public TRef(Object initVal) {
+ this.tvals = new AtomicReference<TVal>();
+ tvals.set(new TVal(initVal, Transaction.ZERO_POINT, null));
+}
+
public Object getCurrentVal(){
TVal current = getCurrentTVal();
if(current != null)
diff --git a/src/jvm/clojure/lang/TStamp.java b/src/jvm/clojure/lang/TStamp.java
index 88c9e656..ff3607fc 100644
--- a/src/jvm/clojure/lang/TStamp.java
+++ b/src/jvm/clojure/lang/TStamp.java
@@ -16,13 +16,13 @@ public class TStamp{
public static enum Status {RUNNING,COMMITTED,ABORTED,RETRY}
-Status status;
-long tpoint;
-long msecs;
+volatile Status status;
+volatile long tpoint;
+volatile long msecs;
-public TStamp(long tpoint){
- this.status = Status.RUNNING;
+public TStamp(long tpoint, Status status){
+ this.status = status;
this.tpoint = tpoint;
}
}
diff --git a/src/jvm/clojure/lang/Transaction.java b/src/jvm/clojure/lang/Transaction.java
index 8a2a6c89..14a9a7ec 100644
--- a/src/jvm/clojure/lang/Transaction.java
+++ b/src/jvm/clojure/lang/Transaction.java
@@ -14,10 +14,14 @@ package clojure.lang;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.concurrent.*;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Iterator;
public class Transaction{
-public static int RETRY_LIMIT = 100;
+public static int RETRY_LIMIT = 1000;
public static int LOCK_WAIT_MSECS = 100;
final static ThreadLocal<Transaction> transaction = new ThreadLocal<Transaction>();
@@ -44,10 +48,12 @@ static class PointNode{
this.tpoint = tpoint;
this.next = next;
}
- static final AtomicReferenceFieldUpdater<PointNode, PointNode> nextUpdater =
- AtomicReferenceFieldUpdater.newUpdater(PointNode.class, PointNode.class, "next");
+
+ static final AtomicReferenceFieldUpdater<PointNode, PointNode> nextUpdater =
+ AtomicReferenceFieldUpdater.newUpdater(PointNode.class, PointNode.class, "next");
}
+final static TStamp ZERO_POINT = new TStamp(0, TStamp.Status.COMMITTED);
volatile static PointNode completedPoints = new PointNode(0, null);
static long completedThroughPoint(){
@@ -57,16 +63,17 @@ static long completedThroughPoint(){
static void relinquish(long tpoint){
PointNode p = completedPoints;
//update completedThroughPoint
- while(p.next != null && p.next.tpoint == p.tpoint+1)
+ while(p.next != null && p.next.tpoint == p.tpoint + 1)
p = p.next;
completedPoints = p;
//splice in
PointNode n;
- do{
- for(n=p.next;n != null && n.tpoint < tpoint;p = n, n = p.next)
- ;
- }while(!PointNode.nextUpdater.compareAndSet(p,n,new PointNode(tpoint,n)));
+ do
+ {
+ for(n = p.next; n != null && n.tpoint < tpoint; p = n, n = p.next)
+ ;
+ } while(!PointNode.nextUpdater.compareAndSet(p, n, new PointNode(tpoint, n)));
}
static void statusTransition(TStamp tstamp, TStamp.Status newStatus){
@@ -80,22 +87,23 @@ static void statusTransition(TStamp tstamp, TStamp.Status newStatus){
TStamp tstamp;
-void lock(TRef tref, boolean ensurePoint) throws Exception{
+TVal lock(TRef tref, boolean ensurePoint) throws Exception{
TVal head = tref.tvals.get();
//already locked by this transaction
if(head != null && head.tstamp == tstamp)
- return;
- boolean locked;
+ return head;
if(head != null && head.tstamp.status == TStamp.Status.RUNNING)
{
//already locked by another transaction, block a bit
+ //first drop our locks
+ statusTransition(tstamp, TStamp.Status.RETRY);
synchronized(head.tstamp)
{
if(head.tstamp.status == TStamp.Status.RUNNING)
head.tstamp.wait(LOCK_WAIT_MSECS);
}
- locked = false;
+ throw new RetryException();
}
else
{
@@ -104,22 +112,20 @@ void lock(TRef tref, boolean ensurePoint) throws Exception{
prior = head;
else //aborted/retried at head, skip over
prior = head.prior;
-
- if(ensurePoint && prior != null && prior.tstamp.tpoint > tstamp.tpoint)
- locked = false;
- else
- locked = tref.tvals.compareAndSet(head, new TVal(prior == null ? null : prior.val, tstamp, prior));
- }
-
- if(!locked)
- {
- statusTransition(tstamp,TStamp.Status.RETRY);
- throw new RetryException();
+ TVal ret = null;
+ if((ensurePoint && prior != null && prior.tstamp.tpoint > tstamp.tpoint)
+ ||
+ !tref.tvals.compareAndSet(head, ret = new TVal(prior == null ? null : prior.val, tstamp, prior)))
+ {
+ statusTransition(tstamp, TStamp.Status.RETRY);
+ throw new RetryException();
+ }
+ return ret;
}
}
void abort() throws AbortException{
- statusTransition(tstamp,TStamp.Status.ABORTED);
+ statusTransition(tstamp, TStamp.Status.ABORTED);
throw new AbortException();
}
@@ -155,17 +161,18 @@ Object run(IFn fn) throws Exception{
{
try
{
- tstamp = new TStamp(getNextPoint());
+ tstamp = new TStamp(getNextPoint(), TStamp.Status.RUNNING);
ret = fn.invoke();
done = true;
//save the read point
long readPoint = tstamp.tpoint;
//get a commit point and time
- tstamp.tpoint = getNextPoint();
- tstamp.msecs = System.currentTimeMillis();
- //commit!
- statusTransition(tstamp, TStamp.Status.COMMITTED);
-
+ synchronized(tstamp) {
+ tstamp.tpoint = getNextPoint();
+ tstamp.msecs = System.currentTimeMillis();
+ //commit!
+ statusTransition(tstamp, TStamp.Status.COMMITTED);
+ }
relinquish(readPoint);
relinquish(tstamp.tpoint);
}
@@ -177,7 +184,7 @@ Object run(IFn fn) throws Exception{
{
if(!done)
{
- statusTransition(tstamp,TStamp.Status.ABORTED);
+ statusTransition(tstamp, TStamp.Status.ABORTED);
relinquish(tstamp.tpoint);
}
}
@@ -188,14 +195,22 @@ Object run(IFn fn) throws Exception{
}
-
Object doGet(TRef tref) throws Exception{
TVal head = tref.tvals.get();
if(head == null)
return null;
if(head.tstamp == tstamp)
return head.val;
- for(TVal ver = head.tstamp.status == TStamp.Status.COMMITTED?head:head.prior; ver != null; ver = ver.prior)
+ TVal ver = null;
+ if(head.tstamp.status == TStamp.Status.COMMITTED)
+ ver = head;
+ else
+ {
+ synchronized(head.tstamp){
+ ver = head.tstamp.status == TStamp.Status.COMMITTED ? head : head.prior;
+ }
+ }
+ for(; ver != null; ver = ver.prior)
{
if(ver.tstamp.tpoint <= tstamp.tpoint)
return ver.val;
@@ -204,18 +219,17 @@ Object doGet(TRef tref) throws Exception{
}
Object doSet(TRef tref, Object val) throws Exception{
- lock(tref,true);
- tref.tvals.get().val = val;
+ TVal head = lock(tref, true);
+ head.val = val;
return val;
}
void doTouch(TRef tref) throws Exception{
- lock(tref,true);
+ lock(tref, true);
}
void doCommute(TRef tref, IFn fn) throws Exception{
- lock(tref,false);
- TVal head = tref.tvals.get();
+ TVal head = lock(tref, false);
head.val = fn.invoke(head.val);
}
@@ -254,4 +268,78 @@ static public Object runInAsOfTransaction(IFn fn, long msecs) throws Exception{
}
*/
+public static void main(String[] args){
+ try
+ {
+ if(args.length != 3)
+ System.err.println("Usage: Transaction nthreads nitems niters");
+ int nthreads = Integer.parseInt(args[0]);
+ int nitems = Integer.parseInt(args[1]);
+ int niters = Integer.parseInt(args[2]);
+
+ ArrayList<TRef> items = new ArrayList(nitems);
+ for(int i = 0; i < nitems; i++)
+ items.add(new TRef(0));
+
+ class Incrementer extends AFn implements Callable{
+ int niters;
+ List<TRef> items;
+
+
+ public Incrementer(int niters, List<TRef> items){
+ this.niters = niters;
+ this.items = items;
+ }
+
+ public Object call() throws Exception{
+ long nanos = 0;
+ for(int i = 0; i < niters; i++)
+ {
+ long start = System.nanoTime();
+ Transaction.runInTransaction(this);
+ long dur = System.nanoTime() - start;
+ nanos += dur;
+ }
+ return nanos;
+ }
+
+ public Object invoke() throws Exception{
+ for(TRef tref : items)
+ {
+ //Transaction.getTransaction().doTouch(tref);
+ int val = (Integer) Transaction.getTransaction().doGet(tref);
+ Transaction.getTransaction().doSet(tref, val + 1);
+ }
+ return null;
+ }
+ }
+
+ ArrayList<Callable<Long>> tasks = new ArrayList(nthreads);
+ for(int i = 0; i < nthreads; i++)
+ tasks.add(new Incrementer(niters, items));
+
+ ExecutorService e = Executors.newFixedThreadPool(nthreads);
+
+ long start = System.nanoTime();
+ List<Future<Long>> results = e.invokeAll(tasks);
+ long estimatedTime = System.nanoTime() - start;
+ System.out.printf("nthreads: %d, nitems: %d, niters: %d, time: %d%n", nthreads, nitems, niters,
+ estimatedTime / 1000000);
+ e.shutdown();
+ for(Future<Long> res : results)
+ {
+ System.out.printf("%d, ", res.get() / 1000000);
+ }
+ System.out.println();
+ for(TRef item : items)
+ {
+ System.out.printf("%d, ", (Integer) item.getCurrentVal());
+ }
+ }
+ catch(Exception ex)
+ {
+ ex.printStackTrace();
+ }
+}
+
}