diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/jvm/clojure/lang/TPool.java | 84 | ||||
-rw-r--r-- | src/jvm/clojure/lang/TRef.java | 26 | ||||
-rw-r--r-- | src/jvm/clojure/lang/TStamp.java | 24 | ||||
-rw-r--r-- | src/jvm/clojure/lang/TVal.java | 24 | ||||
-rw-r--r-- | src/jvm/clojure/lang/Transaction.java | 436 |
5 files changed, 377 insertions, 217 deletions
diff --git a/src/jvm/clojure/lang/TPool.java b/src/jvm/clojure/lang/TPool.java new file mode 100644 index 00000000..9a20a325 --- /dev/null +++ b/src/jvm/clojure/lang/TPool.java @@ -0,0 +1,84 @@ +/** + * Copyright (c) Rich Hickey. All rights reserved. + * The use and distribution terms for this software are covered by the + * Common Public License 1.0 (http://opensource.org/licenses/cpl.php) + * which can be found in the file CPL.TXT at the root of this distribution. + * By using this software in any fashion, you are agreeing to be bound by + * the terms of this license. + * You must not remove this notice, or any other, from this software. + **/ + +/* rich Jun 22, 2007 */ + +package clojure.lang; + +import java.util.concurrent.ConcurrentHashMap; + +public class TPool{ +final ConcurrentHashMap<TRef,TVal> hist; + + +public TPool(){ + this.hist = new ConcurrentHashMap<TRef,TVal>(); +} + +TRef createRef(){ + return new TRef(this); +} + +void pushVal(TRef tref,TVal tval){ + //note this presumes caller has exclusive rights to tref + tval.prior = hist.get(tref); + hist.put(tref, tval); +} + +TVal valAsOfPoint(TRef tref, int tpoint){ + for(TVal tv = hist.get(tref);tv != null;tv = tv.prior) + { + if(tv.tstamp.tpoint <= tpoint) + return tv; + } + return null; +} + + +TVal valAsOfTime(TRef tref,long msecs){ + for(TVal tv = hist.get(tref);tv != null;tv = tv.prior) + { + if(tv.tstamp.msecs <= msecs) + return tv; + } + return null; +} + +void trimHistory(){ + int ctp = Transaction.completedThroughPoint(); + for(TVal tv : hist.values()) + { + while(tv.tstamp.tpoint > ctp) + tv = tv.prior; + tv.prior = null; + } +} + +void trimHistoryPriorToPoint(int tpoint){ + int ctp = Transaction.completedThroughPoint(); + for(TVal tv : hist.values()) + { + while(tv.tstamp.tpoint > tpoint || tv.tstamp.tpoint > ctp) + tv = tv.prior; + tv.prior = null; + } +} + +void trimHistoryPriorToTime(long msecs){ + int ctp = Transaction.completedThroughPoint(); + for(TVal tv : hist.values()) + { + while(tv.tstamp.msecs > msecs || tv.tstamp.tpoint > ctp) + tv = tv.prior; + tv.prior = null; + } +} + +} diff --git a/src/jvm/clojure/lang/TRef.java b/src/jvm/clojure/lang/TRef.java index 0845174a..edf53eee 100644 --- a/src/jvm/clojure/lang/TRef.java +++ b/src/jvm/clojure/lang/TRef.java @@ -12,22 +12,34 @@ package clojure.lang; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.atomic.AtomicInteger; -public class TRef extends TVal implements Comparable{ -static AtomicInteger nextSeq = new AtomicInteger(1); +public class TRef implements Comparable{ +final static AtomicInteger nextSeq = new AtomicInteger(1); final int lockSeq; -Lock lock; +final AtomicInteger lockedBy; +final TPool pool; +volatile TVal tval; -public TRef() { +public TRef(TPool pool) { + this.pool = pool; this.lockSeq = nextSeq.getAndIncrement(); - this.lock = new ReentrantLock(); + this.lockedBy = new AtomicInteger(); + this.tval = null; } +void push(TVal tval){ + pool.pushVal(this,tval); + this.tval = tval; +} public int compareTo(Object o){ return lockSeq - ((TRef) o).lockSeq; } + +public Object getLatestVal(){ + //will NPE if never been set + return tval.val; +} + } diff --git a/src/jvm/clojure/lang/TStamp.java b/src/jvm/clojure/lang/TStamp.java new file mode 100644 index 00000000..fa9d80ec --- /dev/null +++ b/src/jvm/clojure/lang/TStamp.java @@ -0,0 +1,24 @@ +/** + * Copyright (c) Rich Hickey. All rights reserved. + * The use and distribution terms for this software are covered by the + * Common Public License 1.0 (http://opensource.org/licenses/cpl.php) + * which can be found in the file CPL.TXT at the root of this distribution. + * By using this software in any fashion, you are agreeing to be bound by + * the terms of this license. + * You must not remove this notice, or any other, from this software. + **/ + +/* rich Jun 22, 2007 */ + +package clojure.lang; + +public class TStamp{ +final int tpoint; +final long msecs; + + +public TStamp(int tpoint, long msecs){ + this.tpoint = tpoint; + this.msecs = msecs; +} +} diff --git a/src/jvm/clojure/lang/TVal.java b/src/jvm/clojure/lang/TVal.java index a9f47136..6d0e78ce 100644 --- a/src/jvm/clojure/lang/TVal.java +++ b/src/jvm/clojure/lang/TVal.java @@ -13,27 +13,15 @@ package clojure.lang; public class TVal{ -volatile Object val; -volatile Transaction.Info tinfo; -volatile TVal prior; -TVal(){ +public final Object val; +public final TStamp tstamp; +volatile TVal prior; -} -TVal(Object val, Transaction.Info tinfo, TVal prior) { +TVal(Object val, TStamp tstamp) { this.val = val; - this.tinfo = tinfo; - this.prior = prior; + this.tstamp = tstamp; + this.prior = null; } - -void push(Object val,Transaction.Info tinfo) throws Exception{ - if(tinfo != null) //not newly created, clone tval part - { - this.prior = new TVal(this.val,this.tinfo,this.prior); - } - this.tinfo = tinfo; - this.val = val; -} - } diff --git a/src/jvm/clojure/lang/Transaction.java b/src/jvm/clojure/lang/Transaction.java index 0aae1e1b..2f6e8a88 100644 --- a/src/jvm/clojure/lang/Transaction.java +++ b/src/jvm/clojure/lang/Transaction.java @@ -14,82 +14,93 @@ package clojure.lang; import java.util.*; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.ConcurrentSkipListSet; public class Transaction{ -public static final int COMMITTED = 0; -public static final int WORKING = 1; -static final Object lock = new Object(); -private static ThreadLocal<Transaction> transaction = new ThreadLocal<Transaction>(); - -static AtomicInteger tcount = new AtomicInteger(0); - -static Transaction getTransaction() { - if(tcount.get() == 0) - return null; - return transaction.get(); +final static ThreadLocal<Transaction> transaction = new ThreadLocal<Transaction>(); + +//total order on transactions +//transactions will consume a point on init, and another on commit if writing +final static AtomicInteger nextPoint = new AtomicInteger(1); + +final static ConcurrentSkipListSet<Integer> completedPoints = new ConcurrentSkipListSet<Integer>(); + +static int completedThroughPoint(){ + Iterator<Integer> i = completedPoints.iterator(); + if(!i.hasNext()) + return 0; + int base, tp; + base = tp = i.next(); + while(i.hasNext() && i.next() == tp + 1) + ++tp; + if(tp != base) + completedPoints.removeAll(completedPoints.headSet(tp)); + return tp; } -static void setTransaction(Transaction t){ - transaction.set(t); +static Transaction getTransaction(){ + return transaction.get(); } -volatile static int nextSeq = 1; - -static int getNextSeq(){ - synchronized(lock){ - return nextSeq++; - } -} - -public static class Info{ -int seq; -int status; - - -Info(int seq,int status){ - this.seq = seq; - this.status = status; -} +static void setTransaction(Transaction t){ + transaction.set(t); } -static Info bigbang = new Info(0,COMMITTED); - - -Info info; -int startSeq; - -IdentityHashMap<TRef,Object> sets; -IdentityHashMap<TRef,ISeq> commutates; - - static public Object runInTransaction(IFn fn) throws Exception{ - if(getTransaction() != null) - return fn.invoke(); - - Transaction t = new Transaction(); - setTransaction(t); - tcount.incrementAndGet(); - try{ - return t.run(fn); - } - finally{ - tcount.decrementAndGet(); - setTransaction(null); - } + if(getTransaction() != null) + return fn.invoke(); + + Transaction t = new Transaction(); + setTransaction(t); + try + { + return t.run(fn); + } + finally + { + if(!t.asOf) + { + completedPoints.add(t.readPoint); + if(t.writePoint > 0) + completedPoints.add(t.writePoint); + } + setTransaction(null); + } } -static public TRef tref(Object val) throws Exception{ - Transaction trans = getTransaction(); - TRef tref = new TRef(); - if(trans == null) - tref.push(val,bigbang); - else - trans.doSet(tref, val); - return tref; +static public Object runInAsOfTransaction(IFn fn, int tpoint) throws Exception{ + if(getTransaction() != null) + throw new Exception("As-of transactions cannot be nested"); + + Transaction t = new Transaction(tpoint); + setTransaction(t); + try + { + return t.run(fn); + } + finally + { + setTransaction(null); + } } -//* +static public Object runInAsOfTransaction(IFn fn, long msecs) throws Exception{ + if(getTransaction() != null) + throw new Exception("As-of transactions cannot be nested"); + + Transaction t = new Transaction(msecs); + setTransaction(t); + try + { + return t.run(fn); + } + finally + { + setTransaction(null); + } +} +/* static public Object get(TRef tref) throws Exception{ Transaction trans = getTransaction(); if(trans != null) @@ -105,157 +116,198 @@ static public void touch(TRef tref) throws Exception{ getTransaction().doTouch(tref); } -static public void commutate(TRef tref, IFn fn) throws Exception{ +static public void commute(TRef tref, IFn fn) throws Exception{ getTransaction().doCommutate(tref, fn); } //*/ -Object run(IFn fn) throws Exception{ - boolean done = false; - Object ret = null; - ArrayList<TRef> locks = null; - ArrayList<TRef> locked = null; - - loop: - while(!done){ - try - { - ret = fn.invoke(); - if(locks == null && (sets != null || commutates != null)) - locks = new ArrayList<TRef>(); - if(sets != null) - locks.addAll(sets.keySet()); - if(commutates != null) - locks.addAll(commutates.keySet()); - if(locks != null) - { - if(locked == null) - locked = new ArrayList<TRef>(locks.size()); - //lock in order, to avoid deadlocks - Collections.sort(locks); - for(TRef tref : locks) - { - //will block here - tref.lock.lock(); - locked.add(tref); - if(sets.containsKey(tref)) - { - //try again if the thing we are trying to set has changed since we started - TVal curr = getCurrent(tref); - if(curr != null && curr.tinfo.seq > startSeq) - continue loop; - } - } - } - - //at this point all write targets are locked - //turn commutates into sets - for(Map.Entry<TRef, ISeq> e : commutates.entrySet()) - { - TRef tref = e.getKey(); - //note this will npe if tref has never been set, as designed - Object val = getCurrent(tref).val; - for(ISeq c = e.getValue();c!=null;c = c.rest()) - { - IFn f = (IFn) c.first(); - val = f.invoke(val); - } - sets.put(tref, val); - } - - //set the new vals - for(Map.Entry<TRef, Object> entry : sets.entrySet()) - { - TRef tref = entry.getKey(); - tref.push(entry.getValue(), info); - } - - //atomic commit - synchronized(lock){ - info.seq = getNextSeq(); - info.status = COMMITTED; - } - - done = true; - } - finally{ - if(locked != null) - { - for(TRef tref : locked) - { - tref.lock.unlock(); - } - locked.clear(); - } - reset(); - if(locks != null) - locks.clear(); - } - } - return ret; +boolean asOf; +int readPoint; +long readTimeMsecs = 0; +int writePoint = 0; + +IdentityHashMap<TRef, Object> sets; +IdentityHashMap<TRef, LinkedList<IFn>> commutes; + +Transaction(boolean asOf, int readPoint, long readTimeMsecs){ + this.asOf = asOf; + this.readPoint = readPoint; + this.readTimeMsecs = readTimeMsecs; } -private void reset(){ - if(sets != null) - sets.clear(); - if(commutates != null) - commutates.clear(); +Transaction(){ + this(false, nextPoint.getAndIncrement(), 0); +} +Transaction(int readPoint){ + this(true, readPoint, 0); } +Transaction(long readTimeMsecs){ + this(true, 0, readTimeMsecs); +} -Transaction(){ - synchronized(lock){ - int seq = getNextSeq(); - this.info = new Info(seq, WORKING); - this.startSeq = seq; - } +boolean casLock(TRef tref) throws Exception{ + //todo - create user-controllable policy + for(int i = 0; i < 100; ++i) + { + if(tref.lockedBy.compareAndSet(0, readPoint)) + return true; + Thread.sleep(10); + } + return false; } -Object doGet(TRef tref) throws Exception{ - if(sets != null && sets.containsKey(tref)) - return sets.get(tref); +Object run(IFn fn) throws Exception{ + boolean done = false; + Object ret = null; + ArrayList<TRef> locks = null; + ArrayList<TRef> locked = null; + + loop: + //todo - create user-controllable policy + for(int i=0;!done && i<100;i++) + { + try + { + ret = fn.invoke(); + //read-only trans, return right away + if((sets == null || sets.isEmpty()) && (commutes == null || commutes.isEmpty())) + { + done = true; + return ret; + } + + if(locks == null && (sets != null || commutes != null)) + locks = new ArrayList<TRef>(); + if(sets != null) + locks.addAll(sets.keySet()); + if(locks != null) + { + if(locked == null) + locked = new ArrayList<TRef>(locks.size()); + //lock in order + Collections.sort(locks); + for(TRef tref : locks) + { + if(!casLock(tref)) + continue loop; + locked.add(tref); + if(!commutes.containsKey(tref)) + { + //try again if the thing we are trying to set has changed since we started + TVal curr = getCurrent(tref); + if(curr != null && curr.tstamp.tpoint > readPoint) + continue loop; + } + } + } + + //at this point all write targets are locked + + //turn commutes into sets + for(Map.Entry<TRef, LinkedList<IFn>> e : commutes.entrySet()) + { + TRef tref = e.getKey(); + //note this will npe if tref has never been set, as designed + Object val = getCurrent(tref).val; + for(IFn f : e.getValue()) + { + val = f.invoke(val); + } + sets.put(tref, val); + } + + //we presume we won't throw an exception after this + writePoint = nextPoint.getAndIncrement(); + TStamp ts = new TStamp(writePoint, System.currentTimeMillis()); + + //set the new vals, unlock as we go + for(Map.Entry<TRef, Object> entry : sets.entrySet()) + { + TRef tref = entry.getKey(); + tref.push(new TVal(entry.getValue(), ts)); + tref.lockedBy.set(0); + } + done = true; + } + finally + { + if(!done) + { + if(locked != null) + { + for(TRef tref : locked) + { + tref.lockedBy.set(0); + } + locked.clear(); + } + if(sets != null) + sets.clear(); + if(commutes != null) + commutes.clear(); + if(locks != null) + locks.clear(); + } + } + } + if(!done) + throw new Exception("Transaction failed after reaching retry limit"); + return ret; +} - for(TVal ver = tref;ver != null;ver = ver.prior) - { - //note this will npe if tref has never been set, as designed - if(ver.tinfo.status == COMMITTED && ver.tinfo.seq <= startSeq) - return ver.val; - } - throw new Exception("Version not found"); +Object doGet(TRef tref) throws Exception{ + if(sets != null && sets.containsKey(tref)) + return sets.get(tref); + + for(TVal ver = tref.tval; ver != null; ver = ver.prior) + { + //note this will npe if tref has never been set, as designed + if(readPoint > 0 && ver.tstamp.tpoint <= readPoint + || + readTimeMsecs > 0 && ver.tstamp.msecs <= readTimeMsecs) + return ver.val; + } + throw new Exception("Version not found"); } static TVal getCurrent(TRef tref) throws Exception{ - for(TVal ver = tref;ver != null;ver = ver.prior) - { - if(ver.tinfo != null && ver.tinfo.status == COMMITTED) - return ver; - } - //this return only if no value was ever successfully set - return null; + return tref.tval; } Object doSet(TRef tref, Object val) throws Exception{ - if(sets == null) - sets = new IdentityHashMap<TRef,Object>(); - if(commutates != null && commutates.containsKey(tref)) - throw new Exception("Can't commutate and set a TRef in the same transaction"); - - sets.put(tref,val); - return val; - } + if(asOf) + throw new Exception("Can't set during as-of transaction"); + if(sets == null) + sets = new IdentityHashMap<TRef, Object>(); + if(commutes != null && commutes.containsKey(tref)) + throw new Exception("Can't commute and set a TRef in the same transaction"); + + sets.put(tref, val); + return val; +} void doTouch(TRef tref) throws Exception{ - doSet(tref, doGet(tref)); - } - -void doCommutate(TRef tref, IFn fn) throws Exception{ - if(commutates == null) - commutates = new IdentityHashMap<TRef,ISeq>(); - if(sets != null && sets.containsKey(tref)) - throw new Exception("Can't commutate and set a TRef in the same transaction"); - commutates.put(tref, RT.cons(fn, commutates.get(tref))); - } + doSet(tref, doGet(tref)); +} + +void doCommute(TRef tref, IFn fn) throws Exception{ + if(asOf) + throw new Exception("Can't commute during as-of transaction"); + if(commutes == null) + commutes = new IdentityHashMap<TRef, LinkedList<IFn>>(); + LinkedList<IFn> cs = commutes.get(tref); + if(cs == null) + { + if(sets != null && sets.containsKey(tref)) + throw new Exception("Can't commute and set a TRef in the same transaction"); + cs = new LinkedList<IFn>(); + commutes.put(tref, cs); + } + cs.addLast(fn); + sets.put(tref, fn.invoke(doGet(tref))); +} } |