diff options
author | Rich Hickey <richhickey@gmail.com> | 2006-05-31 11:49:41 +0000 |
---|---|---|
committer | Rich Hickey <richhickey@gmail.com> | 2006-05-31 11:49:41 +0000 |
commit | 8b74fbd52683934c6dde25412c4d3ec86384da87 (patch) | |
tree | a7914b42768a130d511c43bc9a21bfe12fc9aded /src | |
parent | 1cd2547d56cd7f56c1149f27bd77142e3fc40b50 (diff) |
first steps towards STM
Diffstat (limited to 'src')
-rw-r--r-- | src/org/clojure/runtime/TRef.java | 50 | ||||
-rw-r--r-- | src/org/clojure/runtime/TVal.java | 26 | ||||
-rw-r--r-- | src/org/clojure/runtime/ThreadLocalData.java | 7 | ||||
-rw-r--r-- | src/org/clojure/runtime/Transaction.java | 204 |
4 files changed, 287 insertions, 0 deletions
diff --git a/src/org/clojure/runtime/TRef.java b/src/org/clojure/runtime/TRef.java new file mode 100644 index 00000000..36b2866e --- /dev/null +++ b/src/org/clojure/runtime/TRef.java @@ -0,0 +1,50 @@ +/** + * Copyright (c) Rich Hickey. All rights reserved. + * The use and distribution terms for this software are covered by the + * Common Public License 1.0 (http://opensource.org/licenses/cpl.php) + * which can be found in the file CPL.TXT at the root of this distribution. + * By using this software in any fashion, you are agreeing to be bound by + * the terms of this license. + * You must not remove this notice, or any other, from this software. + **/ + +/* rich May 30, 2006 */ + +package org.clojure.runtime; + +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.atomic.AtomicInteger; + +public class TRef extends TVal implements Comparable{ +static AtomicInteger nextSeq = new AtomicInteger(1); + +final int lockSeq; +Lock lock; + +public TRef(ThreadLocalData tld, Object val) throws Exception{ + this.lockSeq = nextSeq.getAndIncrement(); + this.lock = new ReentrantLock(); + set(tld, val); +} + +public Object get(ThreadLocalData tld) throws Exception{ + return tld.getTransaction().get(this); +} + +public Object set(ThreadLocalData tld, Object val) throws Exception{ + return tld.getTransaction().set(this,val); +} + +public void touch(ThreadLocalData tld) throws Exception{ + tld.getTransaction().touch(this); +} + +public void commutate(ThreadLocalData tld, IFn fn) throws Exception{ + tld.getTransaction().commutate(this, fn); +} + +public int compareTo(Object o){ + return lockSeq - ((TRef) o).lockSeq; +} +} diff --git a/src/org/clojure/runtime/TVal.java b/src/org/clojure/runtime/TVal.java new file mode 100644 index 00000000..6d95fcac --- /dev/null +++ b/src/org/clojure/runtime/TVal.java @@ -0,0 +1,26 @@ +/** + * Copyright (c) Rich Hickey. All rights reserved. + * The use and distribution terms for this software are covered by the + * Common Public License 1.0 (http://opensource.org/licenses/cpl.php) + * which can be found in the file CPL.TXT at the root of this distribution. + * By using this software in any fashion, you are agreeing to be bound by + * the terms of this license. + * You must not remove this notice, or any other, from this software. + **/ + +/* rich May 30, 2006 */ + +package org.clojure.runtime; + +public class TVal{ +volatile Object val; +volatile Transaction.Info tinfo; +volatile TVal prior; + +void push(Object val,Transaction.Info tinfo) throws Exception{ + this.prior = (TVal) this.clone(); + this.tinfo = tinfo; + this.val = val; +} + +} diff --git a/src/org/clojure/runtime/ThreadLocalData.java b/src/org/clojure/runtime/ThreadLocalData.java index 67fd0048..0bc9c665 100644 --- a/src/org/clojure/runtime/ThreadLocalData.java +++ b/src/org/clojure/runtime/ThreadLocalData.java @@ -21,6 +21,13 @@ public int mvCount = 0; public Object[] mvArray = new Object[MULTIPLE_VALUES_LIMIT]; IdentityHashMap dynamicBindings = new IdentityHashMap(); +Transaction transaction; + +public Transaction getTransaction() throws Exception{ + if(transaction == null) + throw new Exception("No active transaction"); + return transaction; +} public ThreadLocalData(IdentityHashMap dynamicBindings) { diff --git a/src/org/clojure/runtime/Transaction.java b/src/org/clojure/runtime/Transaction.java new file mode 100644 index 00000000..27f56433 --- /dev/null +++ b/src/org/clojure/runtime/Transaction.java @@ -0,0 +1,204 @@ +/** + * Copyright (c) Rich Hickey. All rights reserved. + * The use and distribution terms for this software are covered by the + * Common Public License 1.0 (http://opensource.org/licenses/cpl.php) + * which can be found in the file CPL.TXT at the root of this distribution. + * By using this software in any fashion, you are agreeing to be bound by + * the terms of this license. + * You must not remove this notice, or any other, from this software. + **/ + +/* rich May 30, 2006 */ + +package org.clojure.runtime; + +import java.util.*; + +public class Transaction{ + +public static final int COMMITTED = 0; +public static final int WORKING = 1; +static final Object lock = new Object(); +static int nextSeq = 1; + +public static class Info{ +int seq; +int status; + +Info(int seq,int status){ + this.seq = seq; + this.status = status; +} +} + +Info info; +int startSeq; + +IdentityHashMap<TRef,Object> sets; +IdentityHashMap<TRef,Cons> commutates; +ArrayList<TRef> locks; +ArrayList<TRef> locked; + + +static public Object runInTransaction(ThreadLocalData tld,IFn fn) throws Exception{ + if(tld.transaction != null) + return fn.invoke(tld); + + tld.transaction = new Transaction(); + return tld.transaction.run(tld, fn); +} + +public Object run(ThreadLocalData tld, IFn fn) throws Exception{ + boolean done = false; + Object ret = null; + + loop: + while(!done){ + try + { + reset(); + ret = fn.invoke(tld); + if(sets != null) + getLocks().addAll(sets.keySet()); + if(commutates != null) + getLocks().addAll(commutates.keySet()); + if(locks != null) + { + if(locked == null) + locked = new ArrayList<TRef>(locks.size()); + //lock in order, to avoid deadlocks + Collections.sort(locks); + for(TRef tref : locks) + { + //will block here + tref.lock.lock(); + locked.add(tref); + if(sets.containsKey(tref)) + { + //try again if the thing we are trying to set has changed since we started + TVal curr = getCurrent(tref); + if(curr.tinfo.seq > startSeq) + continue loop; + } + } + } + + //at this point all write targets are locked + //turn commutates into sets + for(Map.Entry<TRef, Cons> e : commutates.entrySet()) + { + TRef tref = e.getKey(); + Object val = getCurrent(tref).val; + for(Cons c = e.getValue();c!=null;c = c.rest) + { + IFn f = (IFn) c.first; + val = f.invoke(tld, val); + } + sets.put(tref, val); + } + + //set the new vals + for(Map.Entry<TRef, Object> entry : sets.entrySet()) + { + TRef tref = entry.getKey(); + tref.push(entry.getValue(), info); + } + + //atomic commit + synchronized(lock){ + info.seq = getNextSeq(); + info.status = COMMITTED; + } + + done = true; + } + finally{ + if(locked != null) + { + for(TRef tref : locked) + { + tref.lock.unlock(); + } + } + } + } + return ret; +} + +ArrayList<TRef> getLocks(){ + if(locks == null) + locks = new ArrayList<TRef>(); + return locks; +} + +private void reset(){ + if(sets != null) + sets.clear(); + if(commutates != null) + commutates.clear(); + if(locks != null) + locks.clear(); + if(locked != null) + locked.clear(); +} + +int getNextSeq(){ + synchronized(lock){ + return nextSeq++; + } + +} +Transaction(){ + synchronized(lock){ + int seq = getNextSeq(); + this.info = new Info(seq, WORKING); + this.startSeq = seq; + } +} + +Object get(TRef tref) throws Exception{ + if(sets != null && sets.containsKey(tref)) + return sets.get(tref); + + for(TVal ver = tref;ver != null;ver = ver.prior) + { + if(ver.tinfo.status == COMMITTED && ver.tinfo.seq <= startSeq) + return ver.val; + } + + throw new Exception("Version not found"); + +} + +static TVal getCurrent(TRef tref) throws Exception{ + for(TVal ver = tref;ver != null;ver = ver.prior) + { + if(ver.tinfo.status == COMMITTED) + return ver; + } + throw new Exception("Version not found"); +} + +Object set(TRef tref, Object val) throws Exception{ + if(sets == null) + sets = new IdentityHashMap<TRef,Object>(); + if(commutates.containsKey(tref)) + throw new Exception("Can't commutate and set a TRef in the same transaction"); + + sets.put(tref,val); + return val; +} + +void touch(TRef tref) throws Exception{ + set(tref, get(tref)); +} + +void commutate(TRef tref, IFn fn) throws Exception{ + if(commutates == null) + commutates = new IdentityHashMap<TRef,Cons>(); + if(sets.containsKey(tref)) + throw new Exception("Can't commutate and set a TRef in the same transaction"); + commutates.put(tref, RT.cons(fn, commutates.get(tref))); +} + +} |