diff options
author | Rich Hickey <richhickey@gmail.com> | 2007-06-24 01:28:43 +0000 |
---|---|---|
committer | Rich Hickey <richhickey@gmail.com> | 2007-06-24 01:28:43 +0000 |
commit | 76e5252aaf17760df635115999bc212a860c8b9d (patch) | |
tree | 7ccde66317ec933c0ffc98a4da9ad7b80cb43cf7 /src/jvm/clojure | |
parent | 97c180a498e6ca60c36b4dbf08581bc27d8aeb2a (diff) |
interim checkin
Diffstat (limited to 'src/jvm/clojure')
-rw-r--r-- | src/jvm/clojure/lang/PersistentTreeMap.java | 8 | ||||
-rw-r--r-- | src/jvm/clojure/lang/TPool.java | 84 | ||||
-rw-r--r-- | src/jvm/clojure/lang/TRef.java | 82 | ||||
-rw-r--r-- | src/jvm/clojure/lang/TStamp.java | 12 | ||||
-rw-r--r-- | src/jvm/clojure/lang/TVal.java | 7 | ||||
-rw-r--r-- | src/jvm/clojure/lang/Transaction.java | 382 |
6 files changed, 240 insertions, 335 deletions
diff --git a/src/jvm/clojure/lang/PersistentTreeMap.java b/src/jvm/clojure/lang/PersistentTreeMap.java index f2d4b70e..6ad0c6ca 100644 --- a/src/jvm/clojure/lang/PersistentTreeMap.java +++ b/src/jvm/clojure/lang/PersistentTreeMap.java @@ -791,7 +791,7 @@ static public void main(String args[]){ long estimatedTime = System.nanoTime() - startTime; System.out.println(); - System.out.println("_count = " + set.count() + ", time: " + estimatedTime/10000); + System.out.println("_count = " + set.count() + ", time: " + estimatedTime/1000000); System.out.println("Building ht"); Hashtable ht = new Hashtable(1001); @@ -825,7 +825,7 @@ static public void main(String args[]){ } estimatedTime = System.nanoTime() - startTime; System.out.println(); - System.out.println("size = " + ht.size() + ", time: " + estimatedTime/10000); + System.out.println("size = " + ht.size() + ", time: " + estimatedTime/1000000); System.out.println("set lookup"); startTime = System.nanoTime(); @@ -837,7 +837,7 @@ static public void main(String args[]){ ++c; } estimatedTime = System.nanoTime() - startTime; - System.out.println("notfound = " + c + ", time: " + estimatedTime/10000); + System.out.println("notfound = " + c + ", time: " + estimatedTime/1000000); System.out.println("ht lookup"); startTime = System.nanoTime(); @@ -849,7 +849,7 @@ static public void main(String args[]){ ++c; } estimatedTime = System.nanoTime() - startTime; - System.out.println("notfound = " + c + ", time: " + estimatedTime/10000); + System.out.println("notfound = " + c + ", time: " + estimatedTime/1000000); // System.out.println("_count = " + set._count + ", min: " + set.minKey() + ", max: " + set.maxKey() // + ", depth: " + set.depth()); diff --git a/src/jvm/clojure/lang/TPool.java b/src/jvm/clojure/lang/TPool.java deleted file mode 100644 index 9a20a325..00000000 --- a/src/jvm/clojure/lang/TPool.java +++ /dev/null @@ -1,84 +0,0 @@ -/** - * Copyright (c) Rich Hickey. All rights reserved. - * The use and distribution terms for this software are covered by the - * Common Public License 1.0 (http://opensource.org/licenses/cpl.php) - * which can be found in the file CPL.TXT at the root of this distribution. - * By using this software in any fashion, you are agreeing to be bound by - * the terms of this license. - * You must not remove this notice, or any other, from this software. - **/ - -/* rich Jun 22, 2007 */ - -package clojure.lang; - -import java.util.concurrent.ConcurrentHashMap; - -public class TPool{ -final ConcurrentHashMap<TRef,TVal> hist; - - -public TPool(){ - this.hist = new ConcurrentHashMap<TRef,TVal>(); -} - -TRef createRef(){ - return new TRef(this); -} - -void pushVal(TRef tref,TVal tval){ - //note this presumes caller has exclusive rights to tref - tval.prior = hist.get(tref); - hist.put(tref, tval); -} - -TVal valAsOfPoint(TRef tref, int tpoint){ - for(TVal tv = hist.get(tref);tv != null;tv = tv.prior) - { - if(tv.tstamp.tpoint <= tpoint) - return tv; - } - return null; -} - - -TVal valAsOfTime(TRef tref,long msecs){ - for(TVal tv = hist.get(tref);tv != null;tv = tv.prior) - { - if(tv.tstamp.msecs <= msecs) - return tv; - } - return null; -} - -void trimHistory(){ - int ctp = Transaction.completedThroughPoint(); - for(TVal tv : hist.values()) - { - while(tv.tstamp.tpoint > ctp) - tv = tv.prior; - tv.prior = null; - } -} - -void trimHistoryPriorToPoint(int tpoint){ - int ctp = Transaction.completedThroughPoint(); - for(TVal tv : hist.values()) - { - while(tv.tstamp.tpoint > tpoint || tv.tstamp.tpoint > ctp) - tv = tv.prior; - tv.prior = null; - } -} - -void trimHistoryPriorToTime(long msecs){ - int ctp = Transaction.completedThroughPoint(); - for(TVal tv : hist.values()) - { - while(tv.tstamp.msecs > msecs || tv.tstamp.tpoint > ctp) - tv = tv.prior; - tv.prior = null; - } -} - -} diff --git a/src/jvm/clojure/lang/TRef.java b/src/jvm/clojure/lang/TRef.java index edf53eee..24105f7a 100644 --- a/src/jvm/clojure/lang/TRef.java +++ b/src/jvm/clojure/lang/TRef.java @@ -12,34 +12,76 @@ package clojure.lang; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; -public class TRef implements Comparable{ -final static AtomicInteger nextSeq = new AtomicInteger(1); +public class TRef{ +//reference to a chain of TVals, only the head of which may be non-committed +AtomicReference<TVal> tvals; -final int lockSeq; -final AtomicInteger lockedBy; -final TPool pool; -volatile TVal tval; +public TRef() { + this.tvals = new AtomicReference<TVal>(); +} + +public Object getCurrentVal(){ + TVal current = getCurrentTVal(); + if(current != null) + return current.val; + return null; +} + +TVal getCurrentTVal(){ + TVal head = tvals.get(); + if(head == null || head.tstamp.status == TStamp.Status.COMMITTED) + return head; + return head.prior; +} + +TVal valAsOfPoint(TRef tref, int tpoint){ + for(TVal tv = getCurrentTVal();tv != null;tv = tv.prior) + { + if(tv.tstamp.tpoint <= tpoint) + return tv; + } + return null; +} -public TRef(TPool pool) { - this.pool = pool; - this.lockSeq = nextSeq.getAndIncrement(); - this.lockedBy = new AtomicInteger(); - this.tval = null; +TVal valAsOfTime(TRef tref,long msecs){ + for(TVal tv = getCurrentTVal();tv != null;tv = tv.prior) + { + if(tv.tstamp.msecs <= msecs) + return tv; + } + return null; } -void push(TVal tval){ - pool.pushVal(this,tval); - this.tval = tval; +void trimHistory(){ + long ctp = Transaction.completedThroughPoint(); + for(TVal tv = getCurrentTVal();tv != null;tv = tv.prior) + { + while(tv.tstamp.tpoint > ctp) + tv = tv.prior; + tv.prior = null; + } } -public int compareTo(Object o){ - return lockSeq - ((TRef) o).lockSeq; + +void trimHistoryPriorToPoint(int tpoint){ + long ctp = Transaction.completedThroughPoint(); + for(TVal tv = getCurrentTVal();tv != null;tv = tv.prior) + { + while(tv.tstamp.tpoint > tpoint || tv.tstamp.tpoint > ctp) + tv = tv.prior; + tv.prior = null; + } } -public Object getLatestVal(){ - //will NPE if never been set - return tval.val; +void trimHistoryPriorToTime(long msecs){ + long ctp = Transaction.completedThroughPoint(); + for(TVal tv = getCurrentTVal();tv != null;tv = tv.prior) + { + while(tv.tstamp.msecs > msecs || tv.tstamp.tpoint > ctp) + tv = tv.prior; + tv.prior = null; + } } } diff --git a/src/jvm/clojure/lang/TStamp.java b/src/jvm/clojure/lang/TStamp.java index fa9d80ec..88c9e656 100644 --- a/src/jvm/clojure/lang/TStamp.java +++ b/src/jvm/clojure/lang/TStamp.java @@ -13,12 +13,16 @@ package clojure.lang; public class TStamp{ -final int tpoint; -final long msecs; +public static enum Status {RUNNING,COMMITTED,ABORTED,RETRY} -public TStamp(int tpoint, long msecs){ +Status status; +long tpoint; +long msecs; + + +public TStamp(long tpoint){ + this.status = Status.RUNNING; this.tpoint = tpoint; - this.msecs = msecs; } } diff --git a/src/jvm/clojure/lang/TVal.java b/src/jvm/clojure/lang/TVal.java index 6d0e78ce..916cbaa0 100644 --- a/src/jvm/clojure/lang/TVal.java +++ b/src/jvm/clojure/lang/TVal.java @@ -14,14 +14,13 @@ package clojure.lang; public class TVal{ -public final Object val; +volatile Object val; public final TStamp tstamp; volatile TVal prior; - -TVal(Object val, TStamp tstamp) { +TVal(Object val, TStamp tstamp, TVal prior) { this.val = val; this.tstamp = tstamp; - this.prior = null; + this.prior = prior; } } diff --git a/src/jvm/clojure/lang/Transaction.java b/src/jvm/clojure/lang/Transaction.java index 2f6e8a88..8a2a6c89 100644 --- a/src/jvm/clojure/lang/Transaction.java +++ b/src/jvm/clojure/lang/Transaction.java @@ -12,243 +12,173 @@ package clojure.lang; -import java.util.*; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.ConcurrentSkipListSet; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; public class Transaction{ +public static int RETRY_LIMIT = 100; +public static int LOCK_WAIT_MSECS = 100; + final static ThreadLocal<Transaction> transaction = new ThreadLocal<Transaction>(); -//total order on transactions -//transactions will consume a point on init, and another on commit if writing -final static AtomicInteger nextPoint = new AtomicInteger(1); - -final static ConcurrentSkipListSet<Integer> completedPoints = new ConcurrentSkipListSet<Integer>(); - -static int completedThroughPoint(){ - Iterator<Integer> i = completedPoints.iterator(); - if(!i.hasNext()) - return 0; - int base, tp; - base = tp = i.next(); - while(i.hasNext() && i.next() == tp + 1) - ++tp; - if(tp != base) - completedPoints.removeAll(completedPoints.headSet(tp)); - return tp; +static class RetryException extends Exception{ } -static Transaction getTransaction(){ - return transaction.get(); +static class AbortException extends Exception{ } -static void setTransaction(Transaction t){ - transaction.set(t); +//total order on transactions +//transactions will consume a point for init, for each retry, and on commit if writing +final static AtomicLong nextPoint = new AtomicLong(1); + +static long getNextPoint(){ + return nextPoint.getAndIncrement(); } -static public Object runInTransaction(IFn fn) throws Exception{ - if(getTransaction() != null) - return fn.invoke(); +static class PointNode{ + final long tpoint; + volatile PointNode next; - Transaction t = new Transaction(); - setTransaction(t); - try - { - return t.run(fn); - } - finally - { - if(!t.asOf) - { - completedPoints.add(t.readPoint); - if(t.writePoint > 0) - completedPoints.add(t.writePoint); - } - setTransaction(null); - } + public PointNode(long tpoint, PointNode next){ + this.tpoint = tpoint; + this.next = next; + } + static final AtomicReferenceFieldUpdater<PointNode, PointNode> nextUpdater = + AtomicReferenceFieldUpdater.newUpdater(PointNode.class, PointNode.class, "next"); } -static public Object runInAsOfTransaction(IFn fn, int tpoint) throws Exception{ - if(getTransaction() != null) - throw new Exception("As-of transactions cannot be nested"); +volatile static PointNode completedPoints = new PointNode(0, null); - Transaction t = new Transaction(tpoint); - setTransaction(t); - try - { - return t.run(fn); - } - finally - { - setTransaction(null); - } +static long completedThroughPoint(){ + return completedPoints.tpoint; } -static public Object runInAsOfTransaction(IFn fn, long msecs) throws Exception{ - if(getTransaction() != null) - throw new Exception("As-of transactions cannot be nested"); +static void relinquish(long tpoint){ + PointNode p = completedPoints; + //update completedThroughPoint + while(p.next != null && p.next.tpoint == p.tpoint+1) + p = p.next; + completedPoints = p; + + //splice in + PointNode n; + do{ + for(n=p.next;n != null && n.tpoint < tpoint;p = n, n = p.next) + ; + }while(!PointNode.nextUpdater.compareAndSet(p,n,new PointNode(tpoint,n))); +} - Transaction t = new Transaction(msecs); - setTransaction(t); - try - { - return t.run(fn); - } - finally +static void statusTransition(TStamp tstamp, TStamp.Status newStatus){ + synchronized(tstamp) { - setTransaction(null); + tstamp.status = newStatus; + tstamp.notifyAll(); } } -/* -static public Object get(TRef tref) throws Exception{ - Transaction trans = getTransaction(); - if(trans != null) - return trans.doGet(tref); - return getCurrent(tref).val; -} - -static public Object set(TRef tref, Object val) throws Exception{ - return getTransaction().doSet(tref,val); -} -static public void touch(TRef tref) throws Exception{ - getTransaction().doTouch(tref); -} -static public void commute(TRef tref, IFn fn) throws Exception{ - getTransaction().doCommutate(tref, fn); -} -//*/ +TStamp tstamp; -boolean asOf; -int readPoint; -long readTimeMsecs = 0; -int writePoint = 0; +void lock(TRef tref, boolean ensurePoint) throws Exception{ + TVal head = tref.tvals.get(); + //already locked by this transaction + if(head != null && head.tstamp == tstamp) + return; + boolean locked; + if(head != null && head.tstamp.status == TStamp.Status.RUNNING) + { + //already locked by another transaction, block a bit + synchronized(head.tstamp) + { + if(head.tstamp.status == TStamp.Status.RUNNING) + head.tstamp.wait(LOCK_WAIT_MSECS); + } -IdentityHashMap<TRef, Object> sets; -IdentityHashMap<TRef, LinkedList<IFn>> commutes; + locked = false; + } + else + { + TVal prior; + if(head == null || head.tstamp.status == TStamp.Status.COMMITTED) + prior = head; + else //aborted/retried at head, skip over + prior = head.prior; + + if(ensurePoint && prior != null && prior.tstamp.tpoint > tstamp.tpoint) + locked = false; + else + locked = tref.tvals.compareAndSet(head, new TVal(prior == null ? null : prior.val, tstamp, prior)); + } -Transaction(boolean asOf, int readPoint, long readTimeMsecs){ - this.asOf = asOf; - this.readPoint = readPoint; - this.readTimeMsecs = readTimeMsecs; + if(!locked) + { + statusTransition(tstamp,TStamp.Status.RETRY); + throw new RetryException(); + } } -Transaction(){ - this(false, nextPoint.getAndIncrement(), 0); +void abort() throws AbortException{ + statusTransition(tstamp,TStamp.Status.ABORTED); + throw new AbortException(); } -Transaction(int readPoint){ - this(true, readPoint, 0); +static Transaction getTransaction(){ + return transaction.get(); } -Transaction(long readTimeMsecs){ - this(true, 0, readTimeMsecs); +static void setTransaction(Transaction t){ + transaction.set(t); } -boolean casLock(TRef tref) throws Exception{ - //todo - create user-controllable policy - for(int i = 0; i < 100; ++i) +static public Object runInTransaction(IFn fn) throws Exception{ + if(getTransaction() != null) + return fn.invoke(); + + Transaction t = new Transaction(); + setTransaction(t); + try + { + return t.run(fn); + } + finally { - if(tref.lockedBy.compareAndSet(0, readPoint)) - return true; - Thread.sleep(10); + setTransaction(null); } - return false; } Object run(IFn fn) throws Exception{ boolean done = false; Object ret = null; - ArrayList<TRef> locks = null; - ArrayList<TRef> locked = null; - loop: - //todo - create user-controllable policy - for(int i=0;!done && i<100;i++) + for(int i = 0; !done && i < RETRY_LIMIT; i++) { try { + tstamp = new TStamp(getNextPoint()); ret = fn.invoke(); - //read-only trans, return right away - if((sets == null || sets.isEmpty()) && (commutes == null || commutes.isEmpty())) - { - done = true; - return ret; - } - - if(locks == null && (sets != null || commutes != null)) - locks = new ArrayList<TRef>(); - if(sets != null) - locks.addAll(sets.keySet()); - if(locks != null) - { - if(locked == null) - locked = new ArrayList<TRef>(locks.size()); - //lock in order - Collections.sort(locks); - for(TRef tref : locks) - { - if(!casLock(tref)) - continue loop; - locked.add(tref); - if(!commutes.containsKey(tref)) - { - //try again if the thing we are trying to set has changed since we started - TVal curr = getCurrent(tref); - if(curr != null && curr.tstamp.tpoint > readPoint) - continue loop; - } - } - } - - //at this point all write targets are locked - - //turn commutes into sets - for(Map.Entry<TRef, LinkedList<IFn>> e : commutes.entrySet()) - { - TRef tref = e.getKey(); - //note this will npe if tref has never been set, as designed - Object val = getCurrent(tref).val; - for(IFn f : e.getValue()) - { - val = f.invoke(val); - } - sets.put(tref, val); - } - - //we presume we won't throw an exception after this - writePoint = nextPoint.getAndIncrement(); - TStamp ts = new TStamp(writePoint, System.currentTimeMillis()); - - //set the new vals, unlock as we go - for(Map.Entry<TRef, Object> entry : sets.entrySet()) - { - TRef tref = entry.getKey(); - tref.push(new TVal(entry.getValue(), ts)); - tref.lockedBy.set(0); - } done = true; + //save the read point + long readPoint = tstamp.tpoint; + //get a commit point and time + tstamp.tpoint = getNextPoint(); + tstamp.msecs = System.currentTimeMillis(); + //commit! + statusTransition(tstamp, TStamp.Status.COMMITTED); + + relinquish(readPoint); + relinquish(tstamp.tpoint); + } + catch(RetryException retry) + { + //eat this so we retry rather than fall out } finally { if(!done) { - if(locked != null) - { - for(TRef tref : locked) - { - tref.lockedBy.set(0); - } - locked.clear(); - } - if(sets != null) - sets.clear(); - if(commutes != null) - commutes.clear(); - if(locks != null) - locks.clear(); + statusTransition(tstamp,TStamp.Status.ABORTED); + relinquish(tstamp.tpoint); } } } @@ -258,56 +188,70 @@ Object run(IFn fn) throws Exception{ } -Object doGet(TRef tref) throws Exception{ - if(sets != null && sets.containsKey(tref)) - return sets.get(tref); - for(TVal ver = tref.tval; ver != null; ver = ver.prior) +Object doGet(TRef tref) throws Exception{ + TVal head = tref.tvals.get(); + if(head == null) + return null; + if(head.tstamp == tstamp) + return head.val; + for(TVal ver = head.tstamp.status == TStamp.Status.COMMITTED?head:head.prior; ver != null; ver = ver.prior) { - //note this will npe if tref has never been set, as designed - if(readPoint > 0 && ver.tstamp.tpoint <= readPoint - || - readTimeMsecs > 0 && ver.tstamp.msecs <= readTimeMsecs) + if(ver.tstamp.tpoint <= tstamp.tpoint) return ver.val; } - throw new Exception("Version not found"); -} - -static TVal getCurrent(TRef tref) throws Exception{ - return tref.tval; + return null; } Object doSet(TRef tref, Object val) throws Exception{ - if(asOf) - throw new Exception("Can't set during as-of transaction"); - if(sets == null) - sets = new IdentityHashMap<TRef, Object>(); - if(commutes != null && commutes.containsKey(tref)) - throw new Exception("Can't commute and set a TRef in the same transaction"); - - sets.put(tref, val); + lock(tref,true); + tref.tvals.get().val = val; return val; } void doTouch(TRef tref) throws Exception{ - doSet(tref, doGet(tref)); + lock(tref,true); } void doCommute(TRef tref, IFn fn) throws Exception{ - if(asOf) - throw new Exception("Can't commute during as-of transaction"); - if(commutes == null) - commutes = new IdentityHashMap<TRef, LinkedList<IFn>>(); - LinkedList<IFn> cs = commutes.get(tref); - if(cs == null) + lock(tref,false); + TVal head = tref.tvals.get(); + head.val = fn.invoke(head.val); +} + + +/* +static public Object runInAsOfTransaction(IFn fn, int tpoint) throws Exception{ + if(getTransaction() != null) + throw new Exception("As-of transactions cannot be nested"); + + Transaction t = new Transaction(tpoint); + setTransaction(t); + try { - if(sets != null && sets.containsKey(tref)) - throw new Exception("Can't commute and set a TRef in the same transaction"); - cs = new LinkedList<IFn>(); - commutes.put(tref, cs); + return fn.invoke();//t.run(fn); + } + finally + { + setTransaction(null); + } +} + +static public Object runInAsOfTransaction(IFn fn, long msecs) throws Exception{ + if(getTransaction() != null) + throw new Exception("As-of transactions cannot be nested"); + + Transaction t = new Transaction(msecs); + setTransaction(t); + try + { + return fn.invoke();//t.run(fn); + } + finally + { + setTransaction(null); } - cs.addLast(fn); - sets.put(tref, fn.invoke(doGet(tref))); } + */ } |