summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/jvm/clojure/lang/TRef.java2
-rw-r--r--src/jvm/clojure/lang/TStamp.java7
-rw-r--r--src/jvm/clojure/lang/Transaction.java126
3 files changed, 60 insertions, 75 deletions
diff --git a/src/jvm/clojure/lang/TRef.java b/src/jvm/clojure/lang/TRef.java
index a72f1480..1f25ac2a 100644
--- a/src/jvm/clojure/lang/TRef.java
+++ b/src/jvm/clojure/lang/TRef.java
@@ -165,7 +165,7 @@ void trimHistory(){
}
}
-void trimHistoryPriorToPoint(int tpoint){
+void trimHistoryPriorToPoint(long tpoint){
long ctp = Transaction.completedThroughPoint();
for(TVal tv = getCurrentTVal(); tv != null; tv = tv.prior)
{
diff --git a/src/jvm/clojure/lang/TStamp.java b/src/jvm/clojure/lang/TStamp.java
index ff3607fc..50879cd9 100644
--- a/src/jvm/clojure/lang/TStamp.java
+++ b/src/jvm/clojure/lang/TStamp.java
@@ -14,15 +14,16 @@ package clojure.lang;
public class TStamp{
-public static enum Status {RUNNING,COMMITTED,ABORTED,RETRY}
+public static enum Status{
+ RUNNING, COMMITTED, ABORTED, RETRY
+}
volatile Status status;
volatile long tpoint;
volatile long msecs;
-public TStamp(long tpoint, Status status){
+public TStamp(Status status){
this.status = status;
- this.tpoint = tpoint;
}
}
diff --git a/src/jvm/clojure/lang/Transaction.java b/src/jvm/clojure/lang/Transaction.java
index 90a1a402..c501c346 100644
--- a/src/jvm/clojure/lang/Transaction.java
+++ b/src/jvm/clojure/lang/Transaction.java
@@ -12,12 +12,9 @@
package clojure.lang;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.concurrent.*;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Collections;
+import java.util.*;
public class Transaction{
@@ -34,52 +31,42 @@ static class AbortException extends Exception{
//total order on transactions
//transactions will consume a point for init, for each retry, and on commit if writing
-final static Object tlock = new Object();
-static long nextPoint = 1;
-//final static AtomicLong nextPoint = new AtomicLong(1);
+private static long nextPoint = 1;
+final static PriorityQueue<Long> points = new PriorityQueue<Long>();
-static long getNextPoint(){
-// return nextPoint.getAndIncrement();
- synchronized(tlock)
+void getReadPoint(){
+ synchronized(points)
{
- return nextPoint++;
+ completedPriorPoint = completedThroughPoint();
+ points.add(nextPoint);
+ readPoint = nextPoint++;
}
}
-static class PointNode{
- final long tpoint;
- volatile PointNode next;
-
- public PointNode(long tpoint, PointNode next){
- this.tpoint = tpoint;
- this.next = next;
- }
-
- static final AtomicReferenceFieldUpdater<PointNode, PointNode> nextUpdater =
- AtomicReferenceFieldUpdater.newUpdater(PointNode.class, PointNode.class, "next");
+static long getCommitPoint(){
+ synchronized(points)
+ {
+ return nextPoint++;
+ }
}
-final static TStamp ZERO_POINT = new TStamp(0, TStamp.Status.COMMITTED);
-volatile static PointNode completedPoints = new PointNode(0, null);
+final static TStamp ZERO_POINT = new TStamp(TStamp.Status.COMMITTED);
static long completedThroughPoint(){
- return completedPoints.tpoint;
+ synchronized(points)
+ {
+ Long p = points.peek();
+ if(p != null)
+ return p - 1;
+ return nextPoint - 1;
+ }
}
static void relinquish(long tpoint){
- PointNode p = completedPoints;
- //update completedThroughPoint
- while(p.next != null && p.next.tpoint == p.tpoint + 1)
- p = p.next;
- completedPoints = p;
-
- //splice in
- PointNode n;
- do
+ synchronized(points)
{
- for(n = p.next; n != null && n.tpoint < tpoint; p = n, n = p.next)
- ;
- } while(!PointNode.nextUpdater.compareAndSet(p, n, new PointNode(tpoint, n)));
+ points.remove(tpoint);
+ }
}
static void statusTransition(TStamp tstamp, TStamp.Status newStatus){
@@ -92,6 +79,8 @@ static void statusTransition(TStamp tstamp, TStamp.Status newStatus){
TStamp tstamp;
+long completedPriorPoint;
+long readPoint;
TVal lock(TRef tref, boolean ensurePoint) throws Exception{
TVal head = (TVal) tref.tvals.get();
@@ -115,17 +104,25 @@ TVal lock(TRef tref, boolean ensurePoint) throws Exception{
{
TVal prior;
if(head == null || head.tstamp.status == TStamp.Status.COMMITTED)
+ {
prior = head;
+ }
else //aborted/retried at head, skip over
prior = head.prior;
TVal ret = null;
- if((ensurePoint && prior != null && prior.tstamp.tpoint > tstamp.tpoint)
+ if((ensurePoint && prior != null && prior.tstamp.tpoint > readPoint)
||
!tref.tvals.compareAndSet(head, ret = new TVal(prior == null ? null : prior.val, tstamp, prior)))
{
statusTransition(tstamp, TStamp.Status.RETRY);
throw new RetryException();
}
+ //auto-trim
+ for(TVal tv = prior; tv != null; tv = tv.prior)
+ {
+ if(tv.tstamp.tpoint <= completedPriorPoint)
+ tv.prior = null;
+ }
return ret;
}
}
@@ -174,21 +171,20 @@ Object run(IFn fn) throws Exception{
{
try
{
- tstamp = new TStamp(getNextPoint(), TStamp.Status.RUNNING);
+ getReadPoint();
+ tstamp = new TStamp(TStamp.Status.RUNNING);
ret = fn.invoke();
done = true;
- //save the read point
- long readPoint = tstamp.tpoint;
tstamp.msecs = System.currentTimeMillis();
//get a commit point + alter status, atomically
- synchronized(tlock)
+ synchronized(points)
{
- tstamp.tpoint = getNextPoint();
+ tstamp.tpoint = getCommitPoint();
//commit!
statusTransition(tstamp, TStamp.Status.COMMITTED);
+ relinquish(readPoint);
+ //relinquish(tstamp.tpoint);
}
- relinquish(readPoint);
- relinquish(tstamp.tpoint);
}
catch(RetryException retry)
{
@@ -199,7 +195,8 @@ Object run(IFn fn) throws Exception{
if(!done)
{
statusTransition(tstamp, TStamp.Status.ABORTED);
- relinquish(tstamp.tpoint);
+ relinquish(readPoint);
+ //relinquish(tstamp.tpoint);
}
}
}
@@ -236,7 +233,7 @@ Object doGet(TRef tref) throws Exception{
for(; ver != null; ver = ver.prior)
{
- if(ver.tstamp.tpoint <= tstamp.tpoint)
+ if(ver.tstamp.tpoint <= readPoint)
return ver.val;
}
throw new IllegalStateException(tref.toString() + " is unbound.");
@@ -257,7 +254,6 @@ Object doCommute(TRef tref, IFn fn) throws Exception{
return head.val = fn.invoke(head.val);
}
-
/*
static public Object runInAsOfTransaction(IFn fn, int tpoint) throws Exception{
if(get() != null)
@@ -292,39 +288,24 @@ static public Object runInAsOfTransaction(IFn fn, long msecs) throws Exception{
}
*/
+
+//for test
+static CyclicBarrier barrier;
+
public static void main(String[] args){
try
{
- if(args.length != 3)
- System.err.println("Usage: Transaction nthreads nitems niters");
+ if(args.length != 4)
+ System.err.println("Usage: Transaction nthreads nitems niters ninstances");
int nthreads = Integer.parseInt(args[0]);
int nitems = Integer.parseInt(args[1]);
int niters = Integer.parseInt(args[2]);
+ int ninstances = Integer.parseInt(args[3]);
final ArrayList<TRef> items = new ArrayList(nitems);
for(int i = 0; i < nitems; i++)
items.add(new TRef(0));
- Thread trimThread = new Thread(new Runnable(){
- public void run(){
- for(; ;)
- {
- for(TRef tref : items)
- {
- tref.trimHistory();
- }
- try
- {
- Thread.sleep(10);
- }
- catch(InterruptedException e)
- {
- return;
- }
- }
- }
- });
- trimThread.start();
class Incr extends AFn{
public Object invoke(Object arg1) throws Exception{
@@ -424,13 +405,16 @@ public static void main(String[] args){
}
ExecutorService e = Executors.newFixedThreadPool(nthreads);
+ if(barrier == null)
+ barrier = new CyclicBarrier(ninstances);
+ barrier.await();
long start = System.nanoTime();
List<Future<Long>> results = e.invokeAll(tasks);
long estimatedTime = System.nanoTime() - start;
System.out.printf("nthreads: %d, nitems: %d, niters: %d, time: %d%n", nthreads, nitems, niters,
estimatedTime / 1000000);
e.shutdown();
- trimThread.interrupt();
+ barrier.await();
for(Future<Long> res : results)
{
System.out.printf("%d, ", res.get() / 1000000);