diff options
author | Rich Hickey <richhickey@gmail.com> | 2007-07-27 02:35:23 +0000 |
---|---|---|
committer | Rich Hickey <richhickey@gmail.com> | 2007-07-27 02:35:23 +0000 |
commit | 1ae93cc079b2a8c34537b576abb6a4f3cd302a9f (patch) | |
tree | 6fbb03f8ce4620a2f5549c890a63b3a556a8c0c5 /src | |
parent | f39e88435ebb9bba67ce46a758dce82fc71eb0be (diff) |
lock based STM
Diffstat (limited to 'src')
-rw-r--r-- | src/jvm/clojure/lang/LockingTransaction.java | 443 | ||||
-rw-r--r-- | src/jvm/clojure/lang/Ref.java | 300 |
2 files changed, 743 insertions, 0 deletions
diff --git a/src/jvm/clojure/lang/LockingTransaction.java b/src/jvm/clojure/lang/LockingTransaction.java new file mode 100644 index 00000000..50fd628d --- /dev/null +++ b/src/jvm/clojure/lang/LockingTransaction.java @@ -0,0 +1,443 @@ +/** + * Copyright (c) Rich Hickey. All rights reserved. + * The use and distribution terms for this software are covered by the + * Common Public License 1.0 (http://opensource.org/licenses/cpl.php) + * which can be found in the file CPL.TXT at the root of this distribution. + * By using this software in any fashion, you are agreeing to be bound by + * the terms of this license. + * You must not remove this notice, or any other, from this software. + **/ + +/* rich Jul 26, 2007 */ + +package clojure.lang; + +import java.util.*; +import java.util.concurrent.*; + +public class LockingTransaction{ + +public static int RETRY_LIMIT = 1000; +public static int LOCK_WAIT_MSECS = 100; + +final static ThreadLocal<LockingTransaction> transaction = new ThreadLocal<LockingTransaction>(); + +static class RetryException extends Exception{ +} + +static class AbortException extends Exception{ +} + +//total order on transactions +//transactions will consume a point for init, for each retry, and on commit if writing +private static long lastPoint = 0; +final static PriorityQueue<Long> points = new PriorityQueue<Long>(); + + +long getCommitPoint(){ + synchronized(points) + { + points.remove(readPoint); + completedPriorPoint = completedThroughPoint(); + ++lastPoint; + return lastPoint; + } +} + +static long completedThroughPoint(){ + synchronized(points) + { + Long p = points.peek(); + if(p != null) + return p - 1; + return lastPoint; + } +} + +void relinquishReadPoint(){ + synchronized(points) + { + points.remove(readPoint); + } +} + +void stop(){ + if(tstatus.running) + { + synchronized(tstatus) + { + tstatus.running = false; + tstatus.notifyAll(); + } + vals.clear(); + sets.clear(); + commutes.clear(); + } +} + + +Ref.TStatus tstatus; +long completedPriorPoint; +long readPoint; +HashMap<Ref, Object> vals = new HashMap<Ref, Object>(); +HashSet<Ref> sets = new HashSet<Ref>(); +TreeMap<Ref, ArrayList<IFn>> commutes = new TreeMap<Ref, ArrayList<IFn>>(); + +void getReadPoint(){ + synchronized(points) + { + ++lastPoint; + points.add(lastPoint); + readPoint = lastPoint; + } +} + +//returns the most recent val +Object lock(Ref ref, boolean ensurePoint) throws Exception{ + boolean unlocked = false; + try + { + ref.lock.writeLock().lock(); + Ref.TStatus status = ref.tstatus; + + if(status != null && status != tstatus && status.running) + { + ref.lock.writeLock().unlock(); + unlocked = true; + stop(); + synchronized(status) + { + if(status.running) + status.wait(LOCK_WAIT_MSECS); + } + throw new RetryException(); + } + if(ensurePoint && ref.tvals != null && ref.tvals.point > readPoint) + { + stop(); + throw new RetryException(); + } + ref.tstatus = tstatus; + return ref.tvals == null ? null : ref.tvals.val; + } + finally + { + if(!unlocked) + ref.lock.writeLock().unlock(); + } +} + +void abort() throws AbortException{ + stop(); + throw new AbortException(); +} + + +static LockingTransaction getEx() throws Exception{ + LockingTransaction t = transaction.get(); + if(!t.tstatus.running) + throw new Exception("No transaction running"); + return t; +} + +static public Object runInTransaction(IFn fn) throws Exception{ + LockingTransaction t = transaction.get(); + if(t == null) + transaction.set(t = new LockingTransaction()); + + if(t.tstatus != null && t.tstatus.running) + return fn.invoke(); + + return t.run(fn); +} + +Object run(IFn fn) throws Exception{ + boolean done = false; + Object ret = null; + ArrayList<Ref> locked = new ArrayList<Ref>(); + + for(int i = 0; !done && i < RETRY_LIMIT; i++) + { + try + { + getReadPoint(); + tstatus = new Ref.TStatus(); + ret = fn.invoke(); + + for(Map.Entry<Ref, ArrayList<IFn>> e : commutes.entrySet()) + { + Ref ref = e.getKey(); + ref.lock.writeLock().lock(); + locked.add(ref); + Ref.TStatus status = ref.tstatus; + if(status != null && status != tstatus && status.running) + throw new RetryException(); + Object val = ref.tvals == null ? null : ref.tvals.val; + if(!sets.contains(ref)) + vals.put(ref, val); + for(IFn f : e.getValue()) + { + vals.put(ref, f.invoke(vals.get(ref))); + } + } + for(Ref ref : sets) + { + if(!commutes.containsKey(ref)) + { + ref.lock.writeLock().lock(); + locked.add(ref); + } + } + + //at this point, all values calced, all refs to be written locked + //no more client code to be called + long commitPoint = getCommitPoint(); + long msecs = System.currentTimeMillis(); + for(Map.Entry<Ref, Object> e : vals.entrySet()) + { + Ref ref = e.getKey(); + ref.tvals = new Ref.TVal(e.getValue(), commitPoint, msecs, ref.tvals); + ref.tstatus = null; + //auto-trim + for(Ref.TVal tv = ref.tvals; tv != null; tv = tv.prior) + { + if(tv.point <= completedPriorPoint) + tv.prior = null; + } + } + done = true; + } + catch(RetryException retry) + { + //eat this so we retry rather than fall out + } + finally + { + for(int k = locked.size() - 1; k >= 0; --k) + { + locked.get(k).lock.writeLock().unlock(); + } + locked.clear(); + stop(); + if(!done) + relinquishReadPoint(); + } + } + if(!done) + throw new Exception("Transaction failed after reaching retry limit"); + return ret; +} + + +Object doGet(Ref ref) throws Exception{ + if(vals.containsKey(ref)) + return vals.get(ref); + try + { + ref.lock.readLock().lock(); + for(Ref.TVal ver = ref.tvals; ver != null; ver = ver.prior) + { + if(ver.point <= readPoint) + return ver.val; + } + } + finally + { + ref.lock.readLock().unlock(); + } + + throw new IllegalStateException(ref.toString() + " is unbound."); +} + +Object doSet(Ref ref, Object val) throws Exception{ + if(commutes.containsKey(ref)) + throw new IllegalStateException("Can't set after commute"); + if(!sets.contains(ref)) + { + sets.add(ref); + lock(ref, true); + } + vals.put(ref, val); + return val; +} + +void doTouch(Ref ref) throws Exception{ + lock(ref, true); +} + +Object doCommute(Ref ref, IFn fn) throws Exception{ + if(!vals.containsKey(ref)) + { + Object val = null; + try + { + ref.lock.readLock().lock(); + val = ref.tvals == null ? null : ref.tvals.val; +// for(Ref.TVal ver = ref.tvals; ver != null; ver = ver.prior) +// { +// if(ver.point <= readPoint) +// { +// val = ver.val; +// break; +// } +// } + } + finally + { + ref.lock.readLock().unlock(); + } + vals.put(ref, val); + } + ArrayList<IFn> fns = commutes.get(ref); + if(fns == null) + commutes.put(ref, fns = new ArrayList<IFn>()); + fns.add(fn); + Object ret = fn.invoke(vals.get(ref)); + vals.put(ref, ret); + return ret; +} + + +//for test +static CyclicBarrier barrier; + +public static void main(String[] args){ + try + { + if(args.length != 4) + System.err.println("Usage: LockingTransaction nthreads nitems niters ninstances"); + int nthreads = Integer.parseInt(args[0]); + int nitems = Integer.parseInt(args[1]); + int niters = Integer.parseInt(args[2]); + int ninstances = Integer.parseInt(args[3]); + + final ArrayList<Ref> items = new ArrayList(nitems); + for(int i = 0; i < nitems; i++) + items.add(new Ref(0)); + + + class Incr extends AFn{ + public Object invoke(Object arg1) throws Exception{ + Integer i = (Integer) arg1; + return i + 1; + } + + public Obj withMeta(IPersistentMap meta){ + throw new UnsupportedOperationException(); + + } + } + + class Commuter extends AFn implements Callable{ + int niters; + List<Ref> items; + Incr incr; + + + public Commuter(int niters, List<Ref> items){ + this.niters = niters; + this.items = items; + this.incr = new Incr(); + } + + public Object call() throws Exception{ + long nanos = 0; + for(int i = 0; i < niters; i++) + { + long start = System.nanoTime(); + Transaction.runInTransaction(this); + long dur = System.nanoTime() - start; + nanos += dur; + } + return nanos; + } + + public Object invoke() throws Exception{ + for(Ref tref : items) + { + LockingTransaction.getEx().doCommute(tref, incr); + } + return null; + } + + public Obj withMeta(IPersistentMap meta){ + throw new UnsupportedOperationException(); + + } + } + + class Incrementer extends AFn implements Callable{ + int niters; + List<Ref> items; + + + public Incrementer(int niters, List<Ref> items){ + this.niters = niters; + this.items = items; + } + + public Object call() throws Exception{ + long nanos = 0; + for(int i = 0; i < niters; i++) + { + long start = System.nanoTime(); + LockingTransaction.runInTransaction(this); + long dur = System.nanoTime() - start; + nanos += dur; + } + return nanos; + } + + public Object invoke() throws Exception{ + for(Ref tref : items) + { + //Transaction.get().doTouch(tref); + LockingTransaction t = LockingTransaction.getEx(); + int val = (Integer) t.doGet(tref); + t.doSet(tref, val + 1); + } + return null; + } + + public Obj withMeta(IPersistentMap meta){ + throw new UnsupportedOperationException(); + + } + } + + ArrayList<Callable<Long>> tasks = new ArrayList(nthreads); + for(int i = 0; i < nthreads; i++) + { + ArrayList<Ref> si = (ArrayList<Ref>) items.clone(); + Collections.shuffle(si); + tasks.add(new Incrementer(niters, si)); + } + ExecutorService e = Executors.newFixedThreadPool(nthreads); + + if(barrier == null) + barrier = new CyclicBarrier(ninstances); + barrier.await(); + long start = System.nanoTime(); + List<Future<Long>> results = e.invokeAll(tasks); + long estimatedTime = System.nanoTime() - start; + System.out.printf("nthreads: %d, nitems: %d, niters: %d, time: %d%n", nthreads, nitems, niters, + estimatedTime / 1000000); + e.shutdown(); + barrier.await(); + for(Future<Long> res : results) + { + System.out.printf("%d, ", res.get() / 1000000); + } + System.out.println(); + for(Ref item : items) + { + System.out.printf("%d, ", (Integer) item.currentVal()); + } + } + catch(Exception ex) + { + ex.printStackTrace(); + } +} + +} diff --git a/src/jvm/clojure/lang/Ref.java b/src/jvm/clojure/lang/Ref.java new file mode 100644 index 00000000..1a2f0153 --- /dev/null +++ b/src/jvm/clojure/lang/Ref.java @@ -0,0 +1,300 @@ +/** + * Copyright (c) Rich Hickey. All rights reserved. + * The use and distribution terms for this software are covered by the + * Common Public License 1.0 (http://opensource.org/licenses/cpl.php) + * which can be found in the file CPL.TXT at the root of this distribution. + * By using this software in any fashion, you are agreeing to be bound by + * the terms of this license. + * You must not remove this notice, or any other, from this software. + **/ + +/* rich Jul 25, 2007 */ + +package clojure.lang; + +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +public class Ref implements IFn{ + +public static class TVal{ + final Object val; + final long point; + long msecs; + TVal prior; + + TVal(Object val, long point, long msecs, TVal prior){ + this.val = val; + this.point = point; + this.prior = prior; + this.msecs = msecs; + } + +} + +public static class TStatus{ + boolean running = true; +} + +final static AtomicLong ids = new AtomicLong(); +TVal tvals; +transient volatile InheritableThreadLocal<Binding> dvals; +final ReentrantReadWriteLock lock; +TStatus tstatus; +final long id; + +public Ref(){ + this.tvals = null; + this.dvals = null; + this.tstatus = null; + lock = new ReentrantReadWriteLock(); + id = ids.getAndIncrement(); +} + +public Ref(Object initVal){ + this(); + tvals = new TVal(initVal, 0, System.currentTimeMillis(), null); +} + +//ok out of transaction +public Object currentVal(){ + Binding b = getThreadBinding(); + if(b != null) + return b.val; + try + { + lock.readLock().lock(); + if(tvals != null) + return tvals.val; + throw new IllegalStateException(this.toString() + " is unbound."); + } + finally + { + lock.readLock().unlock(); + } +} + + +final Binding getThreadBinding(){ + if(dvals != null) + return dvals.get(); + return null; +} + +public void pushThreadBinding(Object val){ + if(dvals == null) + { + synchronized(this) + { + if(dvals == null) + dvals = new InheritableThreadLocal(); + } + } + dvals.set(new Binding(val, dvals.get())); +} + +public void popThreadBinding() throws Exception{ + Binding b; + if(dvals == null || (b = dvals.get()) == null) + throw new Exception("Can't pop unbound ref"); + dvals.set(b.rest); +} + +//* + +//must be dynamically bound or transactional read +public Object val() throws Exception{ + Binding b = getThreadBinding(); + if(b != null) + return b.val; + return LockingTransaction.getEx().doGet(this); +} + +public Object set(Object val) throws Exception{ + Binding b = getThreadBinding(); + if(b != null) + return (b.val = val); + //allow out-of-transaction inits via set?? + if(!isBound()) + { + //no + } + return LockingTransaction.getEx().doSet(this, val); +} + +public Object commute(IFn fn) throws Exception{ + Binding b = getThreadBinding(); + if(b != null) + return (b.val = fn.invoke(b.val)); + return LockingTransaction.getEx().doCommute(this, fn); +} + +public void touch() throws Exception{ + Binding b = getThreadBinding(); + if(b == null) + LockingTransaction.getEx().doTouch(this); +} + +//*/ +boolean isBound(){ + if(dvals != null && dvals.get() != null) + return true; + try + { + lock.readLock().lock(); + return tvals != null; + } + finally + { + lock.readLock().unlock(); + } +} + +void trimHistory(){ + long ctp = Transaction.completedThroughPoint(); + try + { + lock.writeLock().lock(); + for(TVal tv = tvals; tv != null; tv = tv.prior) + { + if(tv.point <= ctp) + tv.prior = null; + } + } + finally + { + lock.writeLock().unlock(); + } +} + +final public IFn fn(){ + return (IFn) currentVal(); +} + +public Object invoke() throws Exception{ + return fn().invoke(); +} + +public Object invoke(Object arg1) throws Exception{ + return fn().invoke(arg1); +} + +public Object invoke(Object arg1, Object arg2) throws Exception{ + return fn().invoke(arg1, arg2); +} + +public Object invoke(Object arg1, Object arg2, Object arg3) throws Exception{ + return fn().invoke(arg1, arg2, arg3); +} + +public Object invoke(Object arg1, Object arg2, Object arg3, Object arg4) throws Exception{ + return fn().invoke(arg1, arg2, arg3, arg4); +} + +public Object invoke(Object arg1, Object arg2, Object arg3, Object arg4, Object arg5) throws Exception{ + return fn().invoke(arg1, arg2, arg3, arg4, arg5); +} + +public Object invoke(Object arg1, Object arg2, Object arg3, Object arg4, Object arg5, Object arg6) throws Exception{ + return fn().invoke(arg1, arg2, arg3, arg4, arg5, arg6); +} + +public Object invoke(Object arg1, Object arg2, Object arg3, Object arg4, Object arg5, Object arg6, Object arg7) + throws Exception{ + return fn().invoke(arg1, arg2, arg3, arg4, arg5, arg6, arg7); +} + +public Object invoke(Object arg1, Object arg2, Object arg3, Object arg4, Object arg5, Object arg6, Object arg7, + Object arg8) throws Exception{ + return fn().invoke(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8); +} + +public Object invoke(Object arg1, Object arg2, Object arg3, Object arg4, Object arg5, Object arg6, Object arg7, + Object arg8, Object arg9) throws Exception{ + return fn().invoke(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9); +} + +public Object invoke(Object arg1, Object arg2, Object arg3, Object arg4, Object arg5, Object arg6, Object arg7, + Object arg8, Object arg9, Object arg10) throws Exception{ + return fn().invoke(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10); +} + +public Object invoke(Object arg1, Object arg2, Object arg3, Object arg4, Object arg5, Object arg6, Object arg7, + Object arg8, Object arg9, Object arg10, Object arg11) throws Exception{ + return fn().invoke(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11); +} + +public Object invoke(Object arg1, Object arg2, Object arg3, Object arg4, Object arg5, Object arg6, Object arg7, + Object arg8, Object arg9, Object arg10, Object arg11, Object arg12) throws Exception{ + return fn().invoke(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11, arg12); +} + +public Object invoke(Object arg1, Object arg2, Object arg3, Object arg4, Object arg5, Object arg6, Object arg7, + Object arg8, Object arg9, Object arg10, Object arg11, Object arg12, Object arg13) + throws Exception{ + return fn().invoke(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11, arg12, arg13); +} + +public Object invoke(Object arg1, Object arg2, Object arg3, Object arg4, Object arg5, Object arg6, Object arg7, + Object arg8, Object arg9, Object arg10, Object arg11, Object arg12, Object arg13, Object arg14) + throws Exception{ + return fn().invoke(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11, arg12, arg13, arg14); +} + +public Object invoke(Object arg1, Object arg2, Object arg3, Object arg4, Object arg5, Object arg6, Object arg7, + Object arg8, Object arg9, Object arg10, Object arg11, Object arg12, Object arg13, Object arg14, + Object arg15) throws Exception{ + return fn().invoke(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11, arg12, arg13, arg14, arg15); +} + +public Object invoke(Object arg1, Object arg2, Object arg3, Object arg4, Object arg5, Object arg6, Object arg7, + Object arg8, Object arg9, Object arg10, Object arg11, Object arg12, Object arg13, Object arg14, + Object arg15, Object arg16) throws Exception{ + return fn().invoke(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11, arg12, arg13, arg14, arg15, + arg16); +} + +public Object invoke(Object arg1, Object arg2, Object arg3, Object arg4, Object arg5, Object arg6, Object arg7, + Object arg8, Object arg9, Object arg10, Object arg11, Object arg12, Object arg13, Object arg14, + Object arg15, Object arg16, Object arg17) throws Exception{ + return fn().invoke(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11, arg12, arg13, arg14, arg15, + arg16, arg17); +} + +public Object invoke(Object arg1, Object arg2, Object arg3, Object arg4, Object arg5, Object arg6, Object arg7, + Object arg8, Object arg9, Object arg10, Object arg11, Object arg12, Object arg13, Object arg14, + Object arg15, Object arg16, Object arg17, Object arg18) throws Exception{ + return fn().invoke(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11, arg12, arg13, arg14, arg15, + arg16, arg17, arg18); +} + +public Object invoke(Object arg1, Object arg2, Object arg3, Object arg4, Object arg5, Object arg6, Object arg7, + Object arg8, Object arg9, Object arg10, Object arg11, Object arg12, Object arg13, Object arg14, + Object arg15, Object arg16, Object arg17, Object arg18, Object arg19) throws Exception{ + return fn().invoke(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11, arg12, arg13, arg14, arg15, + arg16, arg17, arg18, arg19); +} + +public Object invoke(Object arg1, Object arg2, Object arg3, Object arg4, Object arg5, Object arg6, Object arg7, + Object arg8, Object arg9, Object arg10, Object arg11, Object arg12, Object arg13, Object arg14, + Object arg15, Object arg16, Object arg17, Object arg18, Object arg19, Object arg20) + throws Exception{ + return fn().invoke(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11, arg12, arg13, arg14, arg15, + arg16, arg17, arg18, arg19, arg20); +} + +public Object invoke(Object arg1, Object arg2, Object arg3, Object arg4, Object arg5, Object arg6, Object arg7, + Object arg8, Object arg9, Object arg10, Object arg11, Object arg12, Object arg13, Object arg14, + Object arg15, Object arg16, Object arg17, Object arg18, Object arg19, Object arg20, + Object... args) + throws Exception{ + return fn().invoke(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11, arg12, arg13, arg14, arg15, + arg16, arg17, arg18, arg19, arg20, args); +} + +public Object applyTo(ISeq arglist) throws Exception{ + return AFn.applyToHelper(this, arglist); +} + +} |