summaryrefslogtreecommitdiff
path: root/src/jvm/clojure
diff options
context:
space:
mode:
authorRich Hickey <richhickey@gmail.com>2007-06-24 01:28:43 +0000
committerRich Hickey <richhickey@gmail.com>2007-06-24 01:28:43 +0000
commit76e5252aaf17760df635115999bc212a860c8b9d (patch)
tree7ccde66317ec933c0ffc98a4da9ad7b80cb43cf7 /src/jvm/clojure
parent97c180a498e6ca60c36b4dbf08581bc27d8aeb2a (diff)
interim checkin
Diffstat (limited to 'src/jvm/clojure')
-rw-r--r--src/jvm/clojure/lang/PersistentTreeMap.java8
-rw-r--r--src/jvm/clojure/lang/TPool.java84
-rw-r--r--src/jvm/clojure/lang/TRef.java82
-rw-r--r--src/jvm/clojure/lang/TStamp.java12
-rw-r--r--src/jvm/clojure/lang/TVal.java7
-rw-r--r--src/jvm/clojure/lang/Transaction.java382
6 files changed, 240 insertions, 335 deletions
diff --git a/src/jvm/clojure/lang/PersistentTreeMap.java b/src/jvm/clojure/lang/PersistentTreeMap.java
index f2d4b70e..6ad0c6ca 100644
--- a/src/jvm/clojure/lang/PersistentTreeMap.java
+++ b/src/jvm/clojure/lang/PersistentTreeMap.java
@@ -791,7 +791,7 @@ static public void main(String args[]){
long estimatedTime = System.nanoTime() - startTime;
System.out.println();
- System.out.println("_count = " + set.count() + ", time: " + estimatedTime/10000);
+ System.out.println("_count = " + set.count() + ", time: " + estimatedTime/1000000);
System.out.println("Building ht");
Hashtable ht = new Hashtable(1001);
@@ -825,7 +825,7 @@ static public void main(String args[]){
}
estimatedTime = System.nanoTime() - startTime;
System.out.println();
- System.out.println("size = " + ht.size() + ", time: " + estimatedTime/10000);
+ System.out.println("size = " + ht.size() + ", time: " + estimatedTime/1000000);
System.out.println("set lookup");
startTime = System.nanoTime();
@@ -837,7 +837,7 @@ static public void main(String args[]){
++c;
}
estimatedTime = System.nanoTime() - startTime;
- System.out.println("notfound = " + c + ", time: " + estimatedTime/10000);
+ System.out.println("notfound = " + c + ", time: " + estimatedTime/1000000);
System.out.println("ht lookup");
startTime = System.nanoTime();
@@ -849,7 +849,7 @@ static public void main(String args[]){
++c;
}
estimatedTime = System.nanoTime() - startTime;
- System.out.println("notfound = " + c + ", time: " + estimatedTime/10000);
+ System.out.println("notfound = " + c + ", time: " + estimatedTime/1000000);
// System.out.println("_count = " + set._count + ", min: " + set.minKey() + ", max: " + set.maxKey()
// + ", depth: " + set.depth());
diff --git a/src/jvm/clojure/lang/TPool.java b/src/jvm/clojure/lang/TPool.java
deleted file mode 100644
index 9a20a325..00000000
--- a/src/jvm/clojure/lang/TPool.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/**
- * Copyright (c) Rich Hickey. All rights reserved.
- * The use and distribution terms for this software are covered by the
- * Common Public License 1.0 (http://opensource.org/licenses/cpl.php)
- * which can be found in the file CPL.TXT at the root of this distribution.
- * By using this software in any fashion, you are agreeing to be bound by
- * the terms of this license.
- * You must not remove this notice, or any other, from this software.
- **/
-
-/* rich Jun 22, 2007 */
-
-package clojure.lang;
-
-import java.util.concurrent.ConcurrentHashMap;
-
-public class TPool{
-final ConcurrentHashMap<TRef,TVal> hist;
-
-
-public TPool(){
- this.hist = new ConcurrentHashMap<TRef,TVal>();
-}
-
-TRef createRef(){
- return new TRef(this);
-}
-
-void pushVal(TRef tref,TVal tval){
- //note this presumes caller has exclusive rights to tref
- tval.prior = hist.get(tref);
- hist.put(tref, tval);
-}
-
-TVal valAsOfPoint(TRef tref, int tpoint){
- for(TVal tv = hist.get(tref);tv != null;tv = tv.prior)
- {
- if(tv.tstamp.tpoint <= tpoint)
- return tv;
- }
- return null;
-}
-
-
-TVal valAsOfTime(TRef tref,long msecs){
- for(TVal tv = hist.get(tref);tv != null;tv = tv.prior)
- {
- if(tv.tstamp.msecs <= msecs)
- return tv;
- }
- return null;
-}
-
-void trimHistory(){
- int ctp = Transaction.completedThroughPoint();
- for(TVal tv : hist.values())
- {
- while(tv.tstamp.tpoint > ctp)
- tv = tv.prior;
- tv.prior = null;
- }
-}
-
-void trimHistoryPriorToPoint(int tpoint){
- int ctp = Transaction.completedThroughPoint();
- for(TVal tv : hist.values())
- {
- while(tv.tstamp.tpoint > tpoint || tv.tstamp.tpoint > ctp)
- tv = tv.prior;
- tv.prior = null;
- }
-}
-
-void trimHistoryPriorToTime(long msecs){
- int ctp = Transaction.completedThroughPoint();
- for(TVal tv : hist.values())
- {
- while(tv.tstamp.msecs > msecs || tv.tstamp.tpoint > ctp)
- tv = tv.prior;
- tv.prior = null;
- }
-}
-
-}
diff --git a/src/jvm/clojure/lang/TRef.java b/src/jvm/clojure/lang/TRef.java
index edf53eee..24105f7a 100644
--- a/src/jvm/clojure/lang/TRef.java
+++ b/src/jvm/clojure/lang/TRef.java
@@ -12,34 +12,76 @@
package clojure.lang;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
-public class TRef implements Comparable{
-final static AtomicInteger nextSeq = new AtomicInteger(1);
+public class TRef{
+//reference to a chain of TVals, only the head of which may be non-committed
+AtomicReference<TVal> tvals;
-final int lockSeq;
-final AtomicInteger lockedBy;
-final TPool pool;
-volatile TVal tval;
+public TRef() {
+ this.tvals = new AtomicReference<TVal>();
+}
+
+public Object getCurrentVal(){
+ TVal current = getCurrentTVal();
+ if(current != null)
+ return current.val;
+ return null;
+}
+
+TVal getCurrentTVal(){
+ TVal head = tvals.get();
+ if(head == null || head.tstamp.status == TStamp.Status.COMMITTED)
+ return head;
+ return head.prior;
+}
+
+TVal valAsOfPoint(TRef tref, int tpoint){
+ for(TVal tv = getCurrentTVal();tv != null;tv = tv.prior)
+ {
+ if(tv.tstamp.tpoint <= tpoint)
+ return tv;
+ }
+ return null;
+}
-public TRef(TPool pool) {
- this.pool = pool;
- this.lockSeq = nextSeq.getAndIncrement();
- this.lockedBy = new AtomicInteger();
- this.tval = null;
+TVal valAsOfTime(TRef tref,long msecs){
+ for(TVal tv = getCurrentTVal();tv != null;tv = tv.prior)
+ {
+ if(tv.tstamp.msecs <= msecs)
+ return tv;
+ }
+ return null;
}
-void push(TVal tval){
- pool.pushVal(this,tval);
- this.tval = tval;
+void trimHistory(){
+ long ctp = Transaction.completedThroughPoint();
+ for(TVal tv = getCurrentTVal();tv != null;tv = tv.prior)
+ {
+ while(tv.tstamp.tpoint > ctp)
+ tv = tv.prior;
+ tv.prior = null;
+ }
}
-public int compareTo(Object o){
- return lockSeq - ((TRef) o).lockSeq;
+
+void trimHistoryPriorToPoint(int tpoint){
+ long ctp = Transaction.completedThroughPoint();
+ for(TVal tv = getCurrentTVal();tv != null;tv = tv.prior)
+ {
+ while(tv.tstamp.tpoint > tpoint || tv.tstamp.tpoint > ctp)
+ tv = tv.prior;
+ tv.prior = null;
+ }
}
-public Object getLatestVal(){
- //will NPE if never been set
- return tval.val;
+void trimHistoryPriorToTime(long msecs){
+ long ctp = Transaction.completedThroughPoint();
+ for(TVal tv = getCurrentTVal();tv != null;tv = tv.prior)
+ {
+ while(tv.tstamp.msecs > msecs || tv.tstamp.tpoint > ctp)
+ tv = tv.prior;
+ tv.prior = null;
+ }
}
}
diff --git a/src/jvm/clojure/lang/TStamp.java b/src/jvm/clojure/lang/TStamp.java
index fa9d80ec..88c9e656 100644
--- a/src/jvm/clojure/lang/TStamp.java
+++ b/src/jvm/clojure/lang/TStamp.java
@@ -13,12 +13,16 @@
package clojure.lang;
public class TStamp{
-final int tpoint;
-final long msecs;
+public static enum Status {RUNNING,COMMITTED,ABORTED,RETRY}
-public TStamp(int tpoint, long msecs){
+Status status;
+long tpoint;
+long msecs;
+
+
+public TStamp(long tpoint){
+ this.status = Status.RUNNING;
this.tpoint = tpoint;
- this.msecs = msecs;
}
}
diff --git a/src/jvm/clojure/lang/TVal.java b/src/jvm/clojure/lang/TVal.java
index 6d0e78ce..916cbaa0 100644
--- a/src/jvm/clojure/lang/TVal.java
+++ b/src/jvm/clojure/lang/TVal.java
@@ -14,14 +14,13 @@ package clojure.lang;
public class TVal{
-public final Object val;
+volatile Object val;
public final TStamp tstamp;
volatile TVal prior;
-
-TVal(Object val, TStamp tstamp) {
+TVal(Object val, TStamp tstamp, TVal prior) {
this.val = val;
this.tstamp = tstamp;
- this.prior = null;
+ this.prior = prior;
}
}
diff --git a/src/jvm/clojure/lang/Transaction.java b/src/jvm/clojure/lang/Transaction.java
index 2f6e8a88..8a2a6c89 100644
--- a/src/jvm/clojure/lang/Transaction.java
+++ b/src/jvm/clojure/lang/Transaction.java
@@ -12,243 +12,173 @@
package clojure.lang;
-import java.util.*;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
public class Transaction{
+public static int RETRY_LIMIT = 100;
+public static int LOCK_WAIT_MSECS = 100;
+
final static ThreadLocal<Transaction> transaction = new ThreadLocal<Transaction>();
-//total order on transactions
-//transactions will consume a point on init, and another on commit if writing
-final static AtomicInteger nextPoint = new AtomicInteger(1);
-
-final static ConcurrentSkipListSet<Integer> completedPoints = new ConcurrentSkipListSet<Integer>();
-
-static int completedThroughPoint(){
- Iterator<Integer> i = completedPoints.iterator();
- if(!i.hasNext())
- return 0;
- int base, tp;
- base = tp = i.next();
- while(i.hasNext() && i.next() == tp + 1)
- ++tp;
- if(tp != base)
- completedPoints.removeAll(completedPoints.headSet(tp));
- return tp;
+static class RetryException extends Exception{
}
-static Transaction getTransaction(){
- return transaction.get();
+static class AbortException extends Exception{
}
-static void setTransaction(Transaction t){
- transaction.set(t);
+//total order on transactions
+//transactions will consume a point for init, for each retry, and on commit if writing
+final static AtomicLong nextPoint = new AtomicLong(1);
+
+static long getNextPoint(){
+ return nextPoint.getAndIncrement();
}
-static public Object runInTransaction(IFn fn) throws Exception{
- if(getTransaction() != null)
- return fn.invoke();
+static class PointNode{
+ final long tpoint;
+ volatile PointNode next;
- Transaction t = new Transaction();
- setTransaction(t);
- try
- {
- return t.run(fn);
- }
- finally
- {
- if(!t.asOf)
- {
- completedPoints.add(t.readPoint);
- if(t.writePoint > 0)
- completedPoints.add(t.writePoint);
- }
- setTransaction(null);
- }
+ public PointNode(long tpoint, PointNode next){
+ this.tpoint = tpoint;
+ this.next = next;
+ }
+ static final AtomicReferenceFieldUpdater<PointNode, PointNode> nextUpdater =
+ AtomicReferenceFieldUpdater.newUpdater(PointNode.class, PointNode.class, "next");
}
-static public Object runInAsOfTransaction(IFn fn, int tpoint) throws Exception{
- if(getTransaction() != null)
- throw new Exception("As-of transactions cannot be nested");
+volatile static PointNode completedPoints = new PointNode(0, null);
- Transaction t = new Transaction(tpoint);
- setTransaction(t);
- try
- {
- return t.run(fn);
- }
- finally
- {
- setTransaction(null);
- }
+static long completedThroughPoint(){
+ return completedPoints.tpoint;
}
-static public Object runInAsOfTransaction(IFn fn, long msecs) throws Exception{
- if(getTransaction() != null)
- throw new Exception("As-of transactions cannot be nested");
+static void relinquish(long tpoint){
+ PointNode p = completedPoints;
+ //update completedThroughPoint
+ while(p.next != null && p.next.tpoint == p.tpoint+1)
+ p = p.next;
+ completedPoints = p;
+
+ //splice in
+ PointNode n;
+ do{
+ for(n=p.next;n != null && n.tpoint < tpoint;p = n, n = p.next)
+ ;
+ }while(!PointNode.nextUpdater.compareAndSet(p,n,new PointNode(tpoint,n)));
+}
- Transaction t = new Transaction(msecs);
- setTransaction(t);
- try
- {
- return t.run(fn);
- }
- finally
+static void statusTransition(TStamp tstamp, TStamp.Status newStatus){
+ synchronized(tstamp)
{
- setTransaction(null);
+ tstamp.status = newStatus;
+ tstamp.notifyAll();
}
}
-/*
-static public Object get(TRef tref) throws Exception{
- Transaction trans = getTransaction();
- if(trans != null)
- return trans.doGet(tref);
- return getCurrent(tref).val;
-}
-
-static public Object set(TRef tref, Object val) throws Exception{
- return getTransaction().doSet(tref,val);
-}
-static public void touch(TRef tref) throws Exception{
- getTransaction().doTouch(tref);
-}
-static public void commute(TRef tref, IFn fn) throws Exception{
- getTransaction().doCommutate(tref, fn);
-}
-//*/
+TStamp tstamp;
-boolean asOf;
-int readPoint;
-long readTimeMsecs = 0;
-int writePoint = 0;
+void lock(TRef tref, boolean ensurePoint) throws Exception{
+ TVal head = tref.tvals.get();
+ //already locked by this transaction
+ if(head != null && head.tstamp == tstamp)
+ return;
+ boolean locked;
+ if(head != null && head.tstamp.status == TStamp.Status.RUNNING)
+ {
+ //already locked by another transaction, block a bit
+ synchronized(head.tstamp)
+ {
+ if(head.tstamp.status == TStamp.Status.RUNNING)
+ head.tstamp.wait(LOCK_WAIT_MSECS);
+ }
-IdentityHashMap<TRef, Object> sets;
-IdentityHashMap<TRef, LinkedList<IFn>> commutes;
+ locked = false;
+ }
+ else
+ {
+ TVal prior;
+ if(head == null || head.tstamp.status == TStamp.Status.COMMITTED)
+ prior = head;
+ else //aborted/retried at head, skip over
+ prior = head.prior;
+
+ if(ensurePoint && prior != null && prior.tstamp.tpoint > tstamp.tpoint)
+ locked = false;
+ else
+ locked = tref.tvals.compareAndSet(head, new TVal(prior == null ? null : prior.val, tstamp, prior));
+ }
-Transaction(boolean asOf, int readPoint, long readTimeMsecs){
- this.asOf = asOf;
- this.readPoint = readPoint;
- this.readTimeMsecs = readTimeMsecs;
+ if(!locked)
+ {
+ statusTransition(tstamp,TStamp.Status.RETRY);
+ throw new RetryException();
+ }
}
-Transaction(){
- this(false, nextPoint.getAndIncrement(), 0);
+void abort() throws AbortException{
+ statusTransition(tstamp,TStamp.Status.ABORTED);
+ throw new AbortException();
}
-Transaction(int readPoint){
- this(true, readPoint, 0);
+static Transaction getTransaction(){
+ return transaction.get();
}
-Transaction(long readTimeMsecs){
- this(true, 0, readTimeMsecs);
+static void setTransaction(Transaction t){
+ transaction.set(t);
}
-boolean casLock(TRef tref) throws Exception{
- //todo - create user-controllable policy
- for(int i = 0; i < 100; ++i)
+static public Object runInTransaction(IFn fn) throws Exception{
+ if(getTransaction() != null)
+ return fn.invoke();
+
+ Transaction t = new Transaction();
+ setTransaction(t);
+ try
+ {
+ return t.run(fn);
+ }
+ finally
{
- if(tref.lockedBy.compareAndSet(0, readPoint))
- return true;
- Thread.sleep(10);
+ setTransaction(null);
}
- return false;
}
Object run(IFn fn) throws Exception{
boolean done = false;
Object ret = null;
- ArrayList<TRef> locks = null;
- ArrayList<TRef> locked = null;
- loop:
- //todo - create user-controllable policy
- for(int i=0;!done && i<100;i++)
+ for(int i = 0; !done && i < RETRY_LIMIT; i++)
{
try
{
+ tstamp = new TStamp(getNextPoint());
ret = fn.invoke();
- //read-only trans, return right away
- if((sets == null || sets.isEmpty()) && (commutes == null || commutes.isEmpty()))
- {
- done = true;
- return ret;
- }
-
- if(locks == null && (sets != null || commutes != null))
- locks = new ArrayList<TRef>();
- if(sets != null)
- locks.addAll(sets.keySet());
- if(locks != null)
- {
- if(locked == null)
- locked = new ArrayList<TRef>(locks.size());
- //lock in order
- Collections.sort(locks);
- for(TRef tref : locks)
- {
- if(!casLock(tref))
- continue loop;
- locked.add(tref);
- if(!commutes.containsKey(tref))
- {
- //try again if the thing we are trying to set has changed since we started
- TVal curr = getCurrent(tref);
- if(curr != null && curr.tstamp.tpoint > readPoint)
- continue loop;
- }
- }
- }
-
- //at this point all write targets are locked
-
- //turn commutes into sets
- for(Map.Entry<TRef, LinkedList<IFn>> e : commutes.entrySet())
- {
- TRef tref = e.getKey();
- //note this will npe if tref has never been set, as designed
- Object val = getCurrent(tref).val;
- for(IFn f : e.getValue())
- {
- val = f.invoke(val);
- }
- sets.put(tref, val);
- }
-
- //we presume we won't throw an exception after this
- writePoint = nextPoint.getAndIncrement();
- TStamp ts = new TStamp(writePoint, System.currentTimeMillis());
-
- //set the new vals, unlock as we go
- for(Map.Entry<TRef, Object> entry : sets.entrySet())
- {
- TRef tref = entry.getKey();
- tref.push(new TVal(entry.getValue(), ts));
- tref.lockedBy.set(0);
- }
done = true;
+ //save the read point
+ long readPoint = tstamp.tpoint;
+ //get a commit point and time
+ tstamp.tpoint = getNextPoint();
+ tstamp.msecs = System.currentTimeMillis();
+ //commit!
+ statusTransition(tstamp, TStamp.Status.COMMITTED);
+
+ relinquish(readPoint);
+ relinquish(tstamp.tpoint);
+ }
+ catch(RetryException retry)
+ {
+ //eat this so we retry rather than fall out
}
finally
{
if(!done)
{
- if(locked != null)
- {
- for(TRef tref : locked)
- {
- tref.lockedBy.set(0);
- }
- locked.clear();
- }
- if(sets != null)
- sets.clear();
- if(commutes != null)
- commutes.clear();
- if(locks != null)
- locks.clear();
+ statusTransition(tstamp,TStamp.Status.ABORTED);
+ relinquish(tstamp.tpoint);
}
}
}
@@ -258,56 +188,70 @@ Object run(IFn fn) throws Exception{
}
-Object doGet(TRef tref) throws Exception{
- if(sets != null && sets.containsKey(tref))
- return sets.get(tref);
- for(TVal ver = tref.tval; ver != null; ver = ver.prior)
+Object doGet(TRef tref) throws Exception{
+ TVal head = tref.tvals.get();
+ if(head == null)
+ return null;
+ if(head.tstamp == tstamp)
+ return head.val;
+ for(TVal ver = head.tstamp.status == TStamp.Status.COMMITTED?head:head.prior; ver != null; ver = ver.prior)
{
- //note this will npe if tref has never been set, as designed
- if(readPoint > 0 && ver.tstamp.tpoint <= readPoint
- ||
- readTimeMsecs > 0 && ver.tstamp.msecs <= readTimeMsecs)
+ if(ver.tstamp.tpoint <= tstamp.tpoint)
return ver.val;
}
- throw new Exception("Version not found");
-}
-
-static TVal getCurrent(TRef tref) throws Exception{
- return tref.tval;
+ return null;
}
Object doSet(TRef tref, Object val) throws Exception{
- if(asOf)
- throw new Exception("Can't set during as-of transaction");
- if(sets == null)
- sets = new IdentityHashMap<TRef, Object>();
- if(commutes != null && commutes.containsKey(tref))
- throw new Exception("Can't commute and set a TRef in the same transaction");
-
- sets.put(tref, val);
+ lock(tref,true);
+ tref.tvals.get().val = val;
return val;
}
void doTouch(TRef tref) throws Exception{
- doSet(tref, doGet(tref));
+ lock(tref,true);
}
void doCommute(TRef tref, IFn fn) throws Exception{
- if(asOf)
- throw new Exception("Can't commute during as-of transaction");
- if(commutes == null)
- commutes = new IdentityHashMap<TRef, LinkedList<IFn>>();
- LinkedList<IFn> cs = commutes.get(tref);
- if(cs == null)
+ lock(tref,false);
+ TVal head = tref.tvals.get();
+ head.val = fn.invoke(head.val);
+}
+
+
+/*
+static public Object runInAsOfTransaction(IFn fn, int tpoint) throws Exception{
+ if(getTransaction() != null)
+ throw new Exception("As-of transactions cannot be nested");
+
+ Transaction t = new Transaction(tpoint);
+ setTransaction(t);
+ try
{
- if(sets != null && sets.containsKey(tref))
- throw new Exception("Can't commute and set a TRef in the same transaction");
- cs = new LinkedList<IFn>();
- commutes.put(tref, cs);
+ return fn.invoke();//t.run(fn);
+ }
+ finally
+ {
+ setTransaction(null);
+ }
+}
+
+static public Object runInAsOfTransaction(IFn fn, long msecs) throws Exception{
+ if(getTransaction() != null)
+ throw new Exception("As-of transactions cannot be nested");
+
+ Transaction t = new Transaction(msecs);
+ setTransaction(t);
+ try
+ {
+ return fn.invoke();//t.run(fn);
+ }
+ finally
+ {
+ setTransaction(null);
}
- cs.addLast(fn);
- sets.put(tref, fn.invoke(doGet(tref)));
}
+ */
}