summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/jvm/clojure/lang/TPool.java84
-rw-r--r--src/jvm/clojure/lang/TRef.java26
-rw-r--r--src/jvm/clojure/lang/TStamp.java24
-rw-r--r--src/jvm/clojure/lang/TVal.java24
-rw-r--r--src/jvm/clojure/lang/Transaction.java436
5 files changed, 377 insertions, 217 deletions
diff --git a/src/jvm/clojure/lang/TPool.java b/src/jvm/clojure/lang/TPool.java
new file mode 100644
index 00000000..9a20a325
--- /dev/null
+++ b/src/jvm/clojure/lang/TPool.java
@@ -0,0 +1,84 @@
+/**
+ * Copyright (c) Rich Hickey. All rights reserved.
+ * The use and distribution terms for this software are covered by the
+ * Common Public License 1.0 (http://opensource.org/licenses/cpl.php)
+ * which can be found in the file CPL.TXT at the root of this distribution.
+ * By using this software in any fashion, you are agreeing to be bound by
+ * the terms of this license.
+ * You must not remove this notice, or any other, from this software.
+ **/
+
+/* rich Jun 22, 2007 */
+
+package clojure.lang;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+public class TPool{
+final ConcurrentHashMap<TRef,TVal> hist;
+
+
+public TPool(){
+ this.hist = new ConcurrentHashMap<TRef,TVal>();
+}
+
+TRef createRef(){
+ return new TRef(this);
+}
+
+void pushVal(TRef tref,TVal tval){
+ //note this presumes caller has exclusive rights to tref
+ tval.prior = hist.get(tref);
+ hist.put(tref, tval);
+}
+
+TVal valAsOfPoint(TRef tref, int tpoint){
+ for(TVal tv = hist.get(tref);tv != null;tv = tv.prior)
+ {
+ if(tv.tstamp.tpoint <= tpoint)
+ return tv;
+ }
+ return null;
+}
+
+
+TVal valAsOfTime(TRef tref,long msecs){
+ for(TVal tv = hist.get(tref);tv != null;tv = tv.prior)
+ {
+ if(tv.tstamp.msecs <= msecs)
+ return tv;
+ }
+ return null;
+}
+
+void trimHistory(){
+ int ctp = Transaction.completedThroughPoint();
+ for(TVal tv : hist.values())
+ {
+ while(tv.tstamp.tpoint > ctp)
+ tv = tv.prior;
+ tv.prior = null;
+ }
+}
+
+void trimHistoryPriorToPoint(int tpoint){
+ int ctp = Transaction.completedThroughPoint();
+ for(TVal tv : hist.values())
+ {
+ while(tv.tstamp.tpoint > tpoint || tv.tstamp.tpoint > ctp)
+ tv = tv.prior;
+ tv.prior = null;
+ }
+}
+
+void trimHistoryPriorToTime(long msecs){
+ int ctp = Transaction.completedThroughPoint();
+ for(TVal tv : hist.values())
+ {
+ while(tv.tstamp.msecs > msecs || tv.tstamp.tpoint > ctp)
+ tv = tv.prior;
+ tv.prior = null;
+ }
+}
+
+}
diff --git a/src/jvm/clojure/lang/TRef.java b/src/jvm/clojure/lang/TRef.java
index 0845174a..edf53eee 100644
--- a/src/jvm/clojure/lang/TRef.java
+++ b/src/jvm/clojure/lang/TRef.java
@@ -12,22 +12,34 @@
package clojure.lang;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.atomic.AtomicInteger;
-public class TRef extends TVal implements Comparable{
-static AtomicInteger nextSeq = new AtomicInteger(1);
+public class TRef implements Comparable{
+final static AtomicInteger nextSeq = new AtomicInteger(1);
final int lockSeq;
-Lock lock;
+final AtomicInteger lockedBy;
+final TPool pool;
+volatile TVal tval;
-public TRef() {
+public TRef(TPool pool) {
+ this.pool = pool;
this.lockSeq = nextSeq.getAndIncrement();
- this.lock = new ReentrantLock();
+ this.lockedBy = new AtomicInteger();
+ this.tval = null;
}
+void push(TVal tval){
+ pool.pushVal(this,tval);
+ this.tval = tval;
+}
public int compareTo(Object o){
return lockSeq - ((TRef) o).lockSeq;
}
+
+public Object getLatestVal(){
+ //will NPE if never been set
+ return tval.val;
+}
+
}
diff --git a/src/jvm/clojure/lang/TStamp.java b/src/jvm/clojure/lang/TStamp.java
new file mode 100644
index 00000000..fa9d80ec
--- /dev/null
+++ b/src/jvm/clojure/lang/TStamp.java
@@ -0,0 +1,24 @@
+/**
+ * Copyright (c) Rich Hickey. All rights reserved.
+ * The use and distribution terms for this software are covered by the
+ * Common Public License 1.0 (http://opensource.org/licenses/cpl.php)
+ * which can be found in the file CPL.TXT at the root of this distribution.
+ * By using this software in any fashion, you are agreeing to be bound by
+ * the terms of this license.
+ * You must not remove this notice, or any other, from this software.
+ **/
+
+/* rich Jun 22, 2007 */
+
+package clojure.lang;
+
+public class TStamp{
+final int tpoint;
+final long msecs;
+
+
+public TStamp(int tpoint, long msecs){
+ this.tpoint = tpoint;
+ this.msecs = msecs;
+}
+}
diff --git a/src/jvm/clojure/lang/TVal.java b/src/jvm/clojure/lang/TVal.java
index a9f47136..6d0e78ce 100644
--- a/src/jvm/clojure/lang/TVal.java
+++ b/src/jvm/clojure/lang/TVal.java
@@ -13,27 +13,15 @@
package clojure.lang;
public class TVal{
-volatile Object val;
-volatile Transaction.Info tinfo;
-volatile TVal prior;
-TVal(){
+public final Object val;
+public final TStamp tstamp;
+volatile TVal prior;
-}
-TVal(Object val, Transaction.Info tinfo, TVal prior) {
+TVal(Object val, TStamp tstamp) {
this.val = val;
- this.tinfo = tinfo;
- this.prior = prior;
+ this.tstamp = tstamp;
+ this.prior = null;
}
-
-void push(Object val,Transaction.Info tinfo) throws Exception{
- if(tinfo != null) //not newly created, clone tval part
- {
- this.prior = new TVal(this.val,this.tinfo,this.prior);
- }
- this.tinfo = tinfo;
- this.val = val;
-}
-
}
diff --git a/src/jvm/clojure/lang/Transaction.java b/src/jvm/clojure/lang/Transaction.java
index 0aae1e1b..2f6e8a88 100644
--- a/src/jvm/clojure/lang/Transaction.java
+++ b/src/jvm/clojure/lang/Transaction.java
@@ -14,82 +14,93 @@ package clojure.lang;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.ConcurrentSkipListSet;
public class Transaction{
-public static final int COMMITTED = 0;
-public static final int WORKING = 1;
-static final Object lock = new Object();
-private static ThreadLocal<Transaction> transaction = new ThreadLocal<Transaction>();
-
-static AtomicInteger tcount = new AtomicInteger(0);
-
-static Transaction getTransaction() {
- if(tcount.get() == 0)
- return null;
- return transaction.get();
+final static ThreadLocal<Transaction> transaction = new ThreadLocal<Transaction>();
+
+//total order on transactions
+//transactions will consume a point on init, and another on commit if writing
+final static AtomicInteger nextPoint = new AtomicInteger(1);
+
+final static ConcurrentSkipListSet<Integer> completedPoints = new ConcurrentSkipListSet<Integer>();
+
+static int completedThroughPoint(){
+ Iterator<Integer> i = completedPoints.iterator();
+ if(!i.hasNext())
+ return 0;
+ int base, tp;
+ base = tp = i.next();
+ while(i.hasNext() && i.next() == tp + 1)
+ ++tp;
+ if(tp != base)
+ completedPoints.removeAll(completedPoints.headSet(tp));
+ return tp;
}
-static void setTransaction(Transaction t){
- transaction.set(t);
+static Transaction getTransaction(){
+ return transaction.get();
}
-volatile static int nextSeq = 1;
-
-static int getNextSeq(){
- synchronized(lock){
- return nextSeq++;
- }
-}
-
-public static class Info{
-int seq;
-int status;
-
-
-Info(int seq,int status){
- this.seq = seq;
- this.status = status;
-}
+static void setTransaction(Transaction t){
+ transaction.set(t);
}
-static Info bigbang = new Info(0,COMMITTED);
-
-
-Info info;
-int startSeq;
-
-IdentityHashMap<TRef,Object> sets;
-IdentityHashMap<TRef,ISeq> commutates;
-
-
static public Object runInTransaction(IFn fn) throws Exception{
- if(getTransaction() != null)
- return fn.invoke();
-
- Transaction t = new Transaction();
- setTransaction(t);
- tcount.incrementAndGet();
- try{
- return t.run(fn);
- }
- finally{
- tcount.decrementAndGet();
- setTransaction(null);
- }
+ if(getTransaction() != null)
+ return fn.invoke();
+
+ Transaction t = new Transaction();
+ setTransaction(t);
+ try
+ {
+ return t.run(fn);
+ }
+ finally
+ {
+ if(!t.asOf)
+ {
+ completedPoints.add(t.readPoint);
+ if(t.writePoint > 0)
+ completedPoints.add(t.writePoint);
+ }
+ setTransaction(null);
+ }
}
-static public TRef tref(Object val) throws Exception{
- Transaction trans = getTransaction();
- TRef tref = new TRef();
- if(trans == null)
- tref.push(val,bigbang);
- else
- trans.doSet(tref, val);
- return tref;
+static public Object runInAsOfTransaction(IFn fn, int tpoint) throws Exception{
+ if(getTransaction() != null)
+ throw new Exception("As-of transactions cannot be nested");
+
+ Transaction t = new Transaction(tpoint);
+ setTransaction(t);
+ try
+ {
+ return t.run(fn);
+ }
+ finally
+ {
+ setTransaction(null);
+ }
}
-//*
+static public Object runInAsOfTransaction(IFn fn, long msecs) throws Exception{
+ if(getTransaction() != null)
+ throw new Exception("As-of transactions cannot be nested");
+
+ Transaction t = new Transaction(msecs);
+ setTransaction(t);
+ try
+ {
+ return t.run(fn);
+ }
+ finally
+ {
+ setTransaction(null);
+ }
+}
+/*
static public Object get(TRef tref) throws Exception{
Transaction trans = getTransaction();
if(trans != null)
@@ -105,157 +116,198 @@ static public void touch(TRef tref) throws Exception{
getTransaction().doTouch(tref);
}
-static public void commutate(TRef tref, IFn fn) throws Exception{
+static public void commute(TRef tref, IFn fn) throws Exception{
getTransaction().doCommutate(tref, fn);
}
//*/
-Object run(IFn fn) throws Exception{
- boolean done = false;
- Object ret = null;
- ArrayList<TRef> locks = null;
- ArrayList<TRef> locked = null;
-
- loop:
- while(!done){
- try
- {
- ret = fn.invoke();
- if(locks == null && (sets != null || commutates != null))
- locks = new ArrayList<TRef>();
- if(sets != null)
- locks.addAll(sets.keySet());
- if(commutates != null)
- locks.addAll(commutates.keySet());
- if(locks != null)
- {
- if(locked == null)
- locked = new ArrayList<TRef>(locks.size());
- //lock in order, to avoid deadlocks
- Collections.sort(locks);
- for(TRef tref : locks)
- {
- //will block here
- tref.lock.lock();
- locked.add(tref);
- if(sets.containsKey(tref))
- {
- //try again if the thing we are trying to set has changed since we started
- TVal curr = getCurrent(tref);
- if(curr != null && curr.tinfo.seq > startSeq)
- continue loop;
- }
- }
- }
-
- //at this point all write targets are locked
- //turn commutates into sets
- for(Map.Entry<TRef, ISeq> e : commutates.entrySet())
- {
- TRef tref = e.getKey();
- //note this will npe if tref has never been set, as designed
- Object val = getCurrent(tref).val;
- for(ISeq c = e.getValue();c!=null;c = c.rest())
- {
- IFn f = (IFn) c.first();
- val = f.invoke(val);
- }
- sets.put(tref, val);
- }
-
- //set the new vals
- for(Map.Entry<TRef, Object> entry : sets.entrySet())
- {
- TRef tref = entry.getKey();
- tref.push(entry.getValue(), info);
- }
-
- //atomic commit
- synchronized(lock){
- info.seq = getNextSeq();
- info.status = COMMITTED;
- }
-
- done = true;
- }
- finally{
- if(locked != null)
- {
- for(TRef tref : locked)
- {
- tref.lock.unlock();
- }
- locked.clear();
- }
- reset();
- if(locks != null)
- locks.clear();
- }
- }
- return ret;
+boolean asOf;
+int readPoint;
+long readTimeMsecs = 0;
+int writePoint = 0;
+
+IdentityHashMap<TRef, Object> sets;
+IdentityHashMap<TRef, LinkedList<IFn>> commutes;
+
+Transaction(boolean asOf, int readPoint, long readTimeMsecs){
+ this.asOf = asOf;
+ this.readPoint = readPoint;
+ this.readTimeMsecs = readTimeMsecs;
}
-private void reset(){
- if(sets != null)
- sets.clear();
- if(commutates != null)
- commutates.clear();
+Transaction(){
+ this(false, nextPoint.getAndIncrement(), 0);
+}
+Transaction(int readPoint){
+ this(true, readPoint, 0);
}
+Transaction(long readTimeMsecs){
+ this(true, 0, readTimeMsecs);
+}
-Transaction(){
- synchronized(lock){
- int seq = getNextSeq();
- this.info = new Info(seq, WORKING);
- this.startSeq = seq;
- }
+boolean casLock(TRef tref) throws Exception{
+ //todo - create user-controllable policy
+ for(int i = 0; i < 100; ++i)
+ {
+ if(tref.lockedBy.compareAndSet(0, readPoint))
+ return true;
+ Thread.sleep(10);
+ }
+ return false;
}
-Object doGet(TRef tref) throws Exception{
- if(sets != null && sets.containsKey(tref))
- return sets.get(tref);
+Object run(IFn fn) throws Exception{
+ boolean done = false;
+ Object ret = null;
+ ArrayList<TRef> locks = null;
+ ArrayList<TRef> locked = null;
+
+ loop:
+ //todo - create user-controllable policy
+ for(int i=0;!done && i<100;i++)
+ {
+ try
+ {
+ ret = fn.invoke();
+ //read-only trans, return right away
+ if((sets == null || sets.isEmpty()) && (commutes == null || commutes.isEmpty()))
+ {
+ done = true;
+ return ret;
+ }
+
+ if(locks == null && (sets != null || commutes != null))
+ locks = new ArrayList<TRef>();
+ if(sets != null)
+ locks.addAll(sets.keySet());
+ if(locks != null)
+ {
+ if(locked == null)
+ locked = new ArrayList<TRef>(locks.size());
+ //lock in order
+ Collections.sort(locks);
+ for(TRef tref : locks)
+ {
+ if(!casLock(tref))
+ continue loop;
+ locked.add(tref);
+ if(!commutes.containsKey(tref))
+ {
+ //try again if the thing we are trying to set has changed since we started
+ TVal curr = getCurrent(tref);
+ if(curr != null && curr.tstamp.tpoint > readPoint)
+ continue loop;
+ }
+ }
+ }
+
+ //at this point all write targets are locked
+
+ //turn commutes into sets
+ for(Map.Entry<TRef, LinkedList<IFn>> e : commutes.entrySet())
+ {
+ TRef tref = e.getKey();
+ //note this will npe if tref has never been set, as designed
+ Object val = getCurrent(tref).val;
+ for(IFn f : e.getValue())
+ {
+ val = f.invoke(val);
+ }
+ sets.put(tref, val);
+ }
+
+ //we presume we won't throw an exception after this
+ writePoint = nextPoint.getAndIncrement();
+ TStamp ts = new TStamp(writePoint, System.currentTimeMillis());
+
+ //set the new vals, unlock as we go
+ for(Map.Entry<TRef, Object> entry : sets.entrySet())
+ {
+ TRef tref = entry.getKey();
+ tref.push(new TVal(entry.getValue(), ts));
+ tref.lockedBy.set(0);
+ }
+ done = true;
+ }
+ finally
+ {
+ if(!done)
+ {
+ if(locked != null)
+ {
+ for(TRef tref : locked)
+ {
+ tref.lockedBy.set(0);
+ }
+ locked.clear();
+ }
+ if(sets != null)
+ sets.clear();
+ if(commutes != null)
+ commutes.clear();
+ if(locks != null)
+ locks.clear();
+ }
+ }
+ }
+ if(!done)
+ throw new Exception("Transaction failed after reaching retry limit");
+ return ret;
+}
- for(TVal ver = tref;ver != null;ver = ver.prior)
- {
- //note this will npe if tref has never been set, as designed
- if(ver.tinfo.status == COMMITTED && ver.tinfo.seq <= startSeq)
- return ver.val;
- }
- throw new Exception("Version not found");
+Object doGet(TRef tref) throws Exception{
+ if(sets != null && sets.containsKey(tref))
+ return sets.get(tref);
+
+ for(TVal ver = tref.tval; ver != null; ver = ver.prior)
+ {
+ //note this will npe if tref has never been set, as designed
+ if(readPoint > 0 && ver.tstamp.tpoint <= readPoint
+ ||
+ readTimeMsecs > 0 && ver.tstamp.msecs <= readTimeMsecs)
+ return ver.val;
+ }
+ throw new Exception("Version not found");
}
static TVal getCurrent(TRef tref) throws Exception{
- for(TVal ver = tref;ver != null;ver = ver.prior)
- {
- if(ver.tinfo != null && ver.tinfo.status == COMMITTED)
- return ver;
- }
- //this return only if no value was ever successfully set
- return null;
+ return tref.tval;
}
Object doSet(TRef tref, Object val) throws Exception{
- if(sets == null)
- sets = new IdentityHashMap<TRef,Object>();
- if(commutates != null && commutates.containsKey(tref))
- throw new Exception("Can't commutate and set a TRef in the same transaction");
-
- sets.put(tref,val);
- return val;
- }
+ if(asOf)
+ throw new Exception("Can't set during as-of transaction");
+ if(sets == null)
+ sets = new IdentityHashMap<TRef, Object>();
+ if(commutes != null && commutes.containsKey(tref))
+ throw new Exception("Can't commute and set a TRef in the same transaction");
+
+ sets.put(tref, val);
+ return val;
+}
void doTouch(TRef tref) throws Exception{
- doSet(tref, doGet(tref));
- }
-
-void doCommutate(TRef tref, IFn fn) throws Exception{
- if(commutates == null)
- commutates = new IdentityHashMap<TRef,ISeq>();
- if(sets != null && sets.containsKey(tref))
- throw new Exception("Can't commutate and set a TRef in the same transaction");
- commutates.put(tref, RT.cons(fn, commutates.get(tref)));
- }
+ doSet(tref, doGet(tref));
+}
+
+void doCommute(TRef tref, IFn fn) throws Exception{
+ if(asOf)
+ throw new Exception("Can't commute during as-of transaction");
+ if(commutes == null)
+ commutes = new IdentityHashMap<TRef, LinkedList<IFn>>();
+ LinkedList<IFn> cs = commutes.get(tref);
+ if(cs == null)
+ {
+ if(sets != null && sets.containsKey(tref))
+ throw new Exception("Can't commute and set a TRef in the same transaction");
+ cs = new LinkedList<IFn>();
+ commutes.put(tref, cs);
+ }
+ cs.addLast(fn);
+ sets.put(tref, fn.invoke(doGet(tref)));
+}
}