diff options
author | Rich Hickey <richhickey@gmail.com> | 2007-07-31 15:40:54 +0000 |
---|---|---|
committer | Rich Hickey <richhickey@gmail.com> | 2007-07-31 15:40:54 +0000 |
commit | 4fb15c3fa20e22258c4aa9a47866443fd80da74a (patch) | |
tree | faeb013988ebc828d5828834aae4a426577ee215 /src | |
parent | df5c458d0a4ce0fe590f6ce981bd62c19c15558f (diff) |
removed old transaction code
Diffstat (limited to 'src')
-rw-r--r-- | src/jvm/clojure/lang/TObj.java | 48 | ||||
-rw-r--r-- | src/jvm/clojure/lang/TRef.java | 314 | ||||
-rw-r--r-- | src/jvm/clojure/lang/TStamp.java | 29 | ||||
-rw-r--r-- | src/jvm/clojure/lang/TVal.java | 26 | ||||
-rw-r--r-- | src/jvm/clojure/lang/Transaction.java | 434 |
5 files changed, 0 insertions, 851 deletions
diff --git a/src/jvm/clojure/lang/TObj.java b/src/jvm/clojure/lang/TObj.java deleted file mode 100644 index e4366f1f..00000000 --- a/src/jvm/clojure/lang/TObj.java +++ /dev/null @@ -1,48 +0,0 @@ -/**
- * Copyright (c) Rich Hickey. All rights reserved.
- * The use and distribution terms for this software are covered by the
- * Common Public License 1.0 (http://opensource.org/licenses/cpl.php)
- * which can be found in the file CPL.TXT at the root of this distribution.
- * By using this software in any fashion, you are agreeing to be bound by
- * the terms of this license.
- * You must not remove this notice, or any other, from this software.
- **/
-
-package clojure.lang;
-
-abstract public class TObj implements IObj{
-TRef _attrs;
- /*
-public TObj() throws Exception{
- this._attrs = Transaction.tref(PersistentArrayMap.EMPTY);
-}
-
-
-public Object putAttr( Object key, Object val) throws Exception {
- IPersistentMap t = (IPersistentMap) Transaction.get( _attrs);
- t = t.assoc(key, val);
- Transaction.set(_attrs,t);
- return val;
-}
-
-public Object getAttr( Object key) throws Exception {
- IPersistentMap t = (IPersistentMap) Transaction.get( _attrs);
- return t.get(key);
-}
-
-public boolean hasAttr( Object key) throws Exception {
- IPersistentMap t = (IPersistentMap) Transaction.get( _attrs);
- return t.contains(key);
-}
-
-public IPersistentMap attrs() throws Exception {
- return (IPersistentMap) Transaction.get(_attrs);
-}
-
-public void removeAttr(Object key) throws Exception {
- IPersistentMap t = (IPersistentMap) Transaction.get( _attrs);
- t = t.without(key);
- Transaction.set(_attrs,t);
-}
-*/
-}
diff --git a/src/jvm/clojure/lang/TRef.java b/src/jvm/clojure/lang/TRef.java deleted file mode 100644 index 1f25ac2a..00000000 --- a/src/jvm/clojure/lang/TRef.java +++ /dev/null @@ -1,314 +0,0 @@ -/** - * Copyright (c) Rich Hickey. All rights reserved. - * The use and distribution terms for this software are covered by the - * Common Public License 1.0 (http://opensource.org/licenses/cpl.php) - * which can be found in the file CPL.TXT at the root of this distribution. - * By using this software in any fashion, you are agreeing to be bound by - * the terms of this license. - * You must not remove this notice, or any other, from this software. - **/ - -/* rich May 30, 2006 */ - -package clojure.lang; - -import java.util.concurrent.atomic.AtomicReference; - -public class TRef<T> extends AFn{ -//reference to a chain of TVals, only the head of which may be non-committed -final AtomicReference<TVal> tvals; -final AtomicReference<InheritableThreadLocal> dvals; - - -public TRef(){ - this.tvals = new AtomicReference<TVal>(); - this.dvals = new AtomicReference<InheritableThreadLocal>(); -} - -public Obj withMeta(IPersistentMap meta){ - return new TRef(meta, tvals, dvals); -} - - -private TRef(IPersistentMap meta, AtomicReference<TVal> tvals, AtomicReference<InheritableThreadLocal> dvals){ - super(meta); - this.tvals = tvals; - this.dvals = dvals; -} - -public TRef(T initVal){ - this(); - tvals.set(new TVal(initVal, Transaction.ZERO_POINT, null)); -} - -public boolean equals(Object o){ - if(this == o) return true; - if(o == null || TRef.class != o.getClass()) return false; - - TRef other = (TRef) o; - - return dvals == other.dvals && tvals == other.tvals; -} - -public int hashCode(){ - return RT.hashCombine(dvals.hashCode(), tvals.hashCode()); -} - -public T currentVal(){ - Binding b = getThreadBinding(); - if(b != null) - return (T) b.val; - TVal current = getCurrentTVal(); - if(current != null) - return (T) current.val; - throw new IllegalStateException(this.toString() + " is unbound."); -} - -public T val() throws Exception{ - Binding b = getThreadBinding(); - if(b != null) - return (T) b.val; - Transaction t = Transaction.get(); - if(t != null) - return (T) t.doGet(this); - throw new IllegalStateException(this.toString() + " is unbound."); -} - -final Binding getThreadBinding(){ - InheritableThreadLocal dv = dvals.get(); - if(dv != null) - return (Binding) dv.get(); - return null; -} - -public void pushThreadBinding(T val){ - InheritableThreadLocal dv = dvals.get(); - if(dv == null) - { - dvals.compareAndSet(null, new InheritableThreadLocal()); - dv = dvals.get(); - } - dv.set(new Binding(val, (Binding) dv.get())); -} - -public void popThreadBinding() throws Exception{ - InheritableThreadLocal dv = dvals.get(); - Binding b; - if(dv == null || (b = (Binding) dv.get()) == null) - throw new Exception("Can't pop unbound ref"); - dv.set(b.rest); -} - -public T set(T val) throws Exception{ - Binding b = getThreadBinding(); - if(b != null) - return (T) (b.val = val); - //allow out-of-transaction inits? - if(!isBound()) - { - tvals.set(new TVal(val, Transaction.ZERO_POINT, null)); - return val; - } - return (T) Transaction.getEx().doSet(this, val); -} - -public T commute(IFn fn) throws Exception{ - Binding b = getThreadBinding(); - if(b != null) - return (T) (b.val = fn.invoke(b.val)); - return (T) Transaction.getEx().doCommute(this, fn); -} - -public void touch() throws Exception{ - Transaction.getEx().doTouch(this); -} - -boolean isBound(){ - InheritableThreadLocal dv = dvals.get(); - return (dv != null && dv.get() != null) - || - getCurrentTVal() != null; -} - -TVal getCurrentTVal(){ - TVal head = tvals.get(); - if(head == null || head.tstamp.status == TStamp.Status.COMMITTED) - return head; - return head.prior; -} - -TVal valAsOfPoint(TRef tref, int tpoint){ - for(TVal tv = getCurrentTVal(); tv != null; tv = tv.prior) - { - if(tv.tstamp.tpoint <= tpoint) - return tv; - } - return null; -} - -TVal valAsOfTime(TRef tref, long msecs){ - for(TVal tv = getCurrentTVal(); tv != null; tv = tv.prior) - { - if(tv.tstamp.msecs <= msecs) - return tv; - } - return null; -} - -void trimHistory(){ - long ctp = Transaction.completedThroughPoint(); - for(TVal tv = getCurrentTVal(); tv != null; tv = tv.prior) - { - while(tv.tstamp.tpoint > ctp) - tv = tv.prior; - tv.prior = null; - } -} - -void trimHistoryPriorToPoint(long tpoint){ - long ctp = Transaction.completedThroughPoint(); - for(TVal tv = getCurrentTVal(); tv != null; tv = tv.prior) - { - while(tv.tstamp.tpoint > tpoint || tv.tstamp.tpoint > ctp) - tv = tv.prior; - tv.prior = null; - } -} - -void trimHistoryPriorToTime(long msecs){ - long ctp = Transaction.completedThroughPoint(); - for(TVal tv = getCurrentTVal(); tv != null; tv = tv.prior) - { - while(tv.tstamp.msecs > msecs || tv.tstamp.tpoint > ctp) - tv = tv.prior; - tv.prior = null; - } -} - - -final public IFn fn(){ - return (IFn) currentVal(); -} - -public Object invoke() throws Exception{ - return fn().invoke(); -} - -public Object invoke(Object arg1) throws Exception{ - return fn().invoke(arg1); -} - -public Object invoke(Object arg1, Object arg2) throws Exception{ - return fn().invoke(arg1, arg2); -} - -public Object invoke(Object arg1, Object arg2, Object arg3) throws Exception{ - return fn().invoke(arg1, arg2, arg3); -} - -public Object invoke(Object arg1, Object arg2, Object arg3, Object arg4) throws Exception{ - return fn().invoke(arg1, arg2, arg3, arg4); -} - -public Object invoke(Object arg1, Object arg2, Object arg3, Object arg4, Object arg5) throws Exception{ - return fn().invoke(arg1, arg2, arg3, arg4, arg5); -} - -public Object invoke(Object arg1, Object arg2, Object arg3, Object arg4, Object arg5, Object arg6) throws Exception{ - return fn().invoke(arg1, arg2, arg3, arg4, arg5, arg6); -} - -public Object invoke(Object arg1, Object arg2, Object arg3, Object arg4, Object arg5, Object arg6, Object arg7) - throws Exception{ - return fn().invoke(arg1, arg2, arg3, arg4, arg5, arg6, arg7); -} - -public Object invoke(Object arg1, Object arg2, Object arg3, Object arg4, Object arg5, Object arg6, Object arg7, - Object arg8) throws Exception{ - return fn().invoke(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8); -} - -public Object invoke(Object arg1, Object arg2, Object arg3, Object arg4, Object arg5, Object arg6, Object arg7, - Object arg8, Object arg9) throws Exception{ - return fn().invoke(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9); -} - -public Object invoke(Object arg1, Object arg2, Object arg3, Object arg4, Object arg5, Object arg6, Object arg7, - Object arg8, Object arg9, Object arg10) throws Exception{ - return fn().invoke(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10); -} - -public Object invoke(Object arg1, Object arg2, Object arg3, Object arg4, Object arg5, Object arg6, Object arg7, - Object arg8, Object arg9, Object arg10, Object arg11) throws Exception{ - return fn().invoke(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11); -} - -public Object invoke(Object arg1, Object arg2, Object arg3, Object arg4, Object arg5, Object arg6, Object arg7, - Object arg8, Object arg9, Object arg10, Object arg11, Object arg12) throws Exception{ - return fn().invoke(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11, arg12); -} - -public Object invoke(Object arg1, Object arg2, Object arg3, Object arg4, Object arg5, Object arg6, Object arg7, - Object arg8, Object arg9, Object arg10, Object arg11, Object arg12, Object arg13) - throws Exception{ - return fn().invoke(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11, arg12, arg13); -} - -public Object invoke(Object arg1, Object arg2, Object arg3, Object arg4, Object arg5, Object arg6, Object arg7, - Object arg8, Object arg9, Object arg10, Object arg11, Object arg12, Object arg13, Object arg14) - throws Exception{ - return fn().invoke(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11, arg12, arg13, arg14); -} - -public Object invoke(Object arg1, Object arg2, Object arg3, Object arg4, Object arg5, Object arg6, Object arg7, - Object arg8, Object arg9, Object arg10, Object arg11, Object arg12, Object arg13, Object arg14, - Object arg15) throws Exception{ - return fn().invoke(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11, arg12, arg13, arg14, arg15); -} - -public Object invoke(Object arg1, Object arg2, Object arg3, Object arg4, Object arg5, Object arg6, Object arg7, - Object arg8, Object arg9, Object arg10, Object arg11, Object arg12, Object arg13, Object arg14, - Object arg15, Object arg16) throws Exception{ - return fn().invoke(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11, arg12, arg13, arg14, arg15, - arg16); -} - -public Object invoke(Object arg1, Object arg2, Object arg3, Object arg4, Object arg5, Object arg6, Object arg7, - Object arg8, Object arg9, Object arg10, Object arg11, Object arg12, Object arg13, Object arg14, - Object arg15, Object arg16, Object arg17) throws Exception{ - return fn().invoke(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11, arg12, arg13, arg14, arg15, - arg16, arg17); -} - -public Object invoke(Object arg1, Object arg2, Object arg3, Object arg4, Object arg5, Object arg6, Object arg7, - Object arg8, Object arg9, Object arg10, Object arg11, Object arg12, Object arg13, Object arg14, - Object arg15, Object arg16, Object arg17, Object arg18) throws Exception{ - return fn().invoke(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11, arg12, arg13, arg14, arg15, - arg16, arg17, arg18); -} - -public Object invoke(Object arg1, Object arg2, Object arg3, Object arg4, Object arg5, Object arg6, Object arg7, - Object arg8, Object arg9, Object arg10, Object arg11, Object arg12, Object arg13, Object arg14, - Object arg15, Object arg16, Object arg17, Object arg18, Object arg19) throws Exception{ - return fn().invoke(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11, arg12, arg13, arg14, arg15, - arg16, arg17, arg18, arg19); -} - -public Object invoke(Object arg1, Object arg2, Object arg3, Object arg4, Object arg5, Object arg6, Object arg7, - Object arg8, Object arg9, Object arg10, Object arg11, Object arg12, Object arg13, Object arg14, - Object arg15, Object arg16, Object arg17, Object arg18, Object arg19, Object arg20) - throws Exception{ - return fn().invoke(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11, arg12, arg13, arg14, arg15, - arg16, arg17, arg18, arg19, arg20); -} - -public Object invoke(Object arg1, Object arg2, Object arg3, Object arg4, Object arg5, Object arg6, Object arg7, - Object arg8, Object arg9, Object arg10, Object arg11, Object arg12, Object arg13, Object arg14, - Object arg15, Object arg16, Object arg17, Object arg18, Object arg19, Object arg20, - Object... args) - throws Exception{ - return fn().invoke(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11, arg12, arg13, arg14, arg15, - arg16, arg17, arg18, arg19, arg20, args); -} - -} diff --git a/src/jvm/clojure/lang/TStamp.java b/src/jvm/clojure/lang/TStamp.java deleted file mode 100644 index 50879cd9..00000000 --- a/src/jvm/clojure/lang/TStamp.java +++ /dev/null @@ -1,29 +0,0 @@ -/** - * Copyright (c) Rich Hickey. All rights reserved. - * The use and distribution terms for this software are covered by the - * Common Public License 1.0 (http://opensource.org/licenses/cpl.php) - * which can be found in the file CPL.TXT at the root of this distribution. - * By using this software in any fashion, you are agreeing to be bound by - * the terms of this license. - * You must not remove this notice, or any other, from this software. - **/ - -/* rich Jun 22, 2007 */ - -package clojure.lang; - -public class TStamp{ - -public static enum Status{ - RUNNING, COMMITTED, ABORTED, RETRY -} - -volatile Status status; -volatile long tpoint; -volatile long msecs; - - -public TStamp(Status status){ - this.status = status; -} -} diff --git a/src/jvm/clojure/lang/TVal.java b/src/jvm/clojure/lang/TVal.java deleted file mode 100644 index 916cbaa0..00000000 --- a/src/jvm/clojure/lang/TVal.java +++ /dev/null @@ -1,26 +0,0 @@ -/** - * Copyright (c) Rich Hickey. All rights reserved. - * The use and distribution terms for this software are covered by the - * Common Public License 1.0 (http://opensource.org/licenses/cpl.php) - * which can be found in the file CPL.TXT at the root of this distribution. - * By using this software in any fashion, you are agreeing to be bound by - * the terms of this license. - * You must not remove this notice, or any other, from this software. - **/ - -/* rich May 30, 2006 */ - -package clojure.lang; - -public class TVal{ - -volatile Object val; -public final TStamp tstamp; -volatile TVal prior; - -TVal(Object val, TStamp tstamp, TVal prior) { - this.val = val; - this.tstamp = tstamp; - this.prior = prior; -} -} diff --git a/src/jvm/clojure/lang/Transaction.java b/src/jvm/clojure/lang/Transaction.java deleted file mode 100644 index c501c346..00000000 --- a/src/jvm/clojure/lang/Transaction.java +++ /dev/null @@ -1,434 +0,0 @@ -/** - * Copyright (c) Rich Hickey. All rights reserved. - * The use and distribution terms for this software are covered by the - * Common Public License 1.0 (http://opensource.org/licenses/cpl.php) - * which can be found in the file CPL.TXT at the root of this distribution. - * By using this software in any fashion, you are agreeing to be bound by - * the terms of this license. - * You must not remove this notice, or any other, from this software. - **/ - -/* rich May 30, 2006 */ - -package clojure.lang; - -import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; -import java.util.concurrent.*; -import java.util.*; - -public class Transaction{ - -public static int RETRY_LIMIT = 1000; -public static int LOCK_WAIT_MSECS = 100; - -final static ThreadLocal<Transaction> transaction = new ThreadLocal<Transaction>(); - -static class RetryException extends Exception{ -} - -static class AbortException extends Exception{ -} - -//total order on transactions -//transactions will consume a point for init, for each retry, and on commit if writing -private static long nextPoint = 1; -final static PriorityQueue<Long> points = new PriorityQueue<Long>(); - -void getReadPoint(){ - synchronized(points) - { - completedPriorPoint = completedThroughPoint(); - points.add(nextPoint); - readPoint = nextPoint++; - } -} - -static long getCommitPoint(){ - synchronized(points) - { - return nextPoint++; - } -} - -final static TStamp ZERO_POINT = new TStamp(TStamp.Status.COMMITTED); - -static long completedThroughPoint(){ - synchronized(points) - { - Long p = points.peek(); - if(p != null) - return p - 1; - return nextPoint - 1; - } -} - -static void relinquish(long tpoint){ - synchronized(points) - { - points.remove(tpoint); - } -} - -static void statusTransition(TStamp tstamp, TStamp.Status newStatus){ - synchronized(tstamp) - { - tstamp.status = newStatus; - tstamp.notifyAll(); - } -} - - -TStamp tstamp; -long completedPriorPoint; -long readPoint; - -TVal lock(TRef tref, boolean ensurePoint) throws Exception{ - TVal head = (TVal) tref.tvals.get(); - //already locked by this transaction - if(head != null && head.tstamp == tstamp) - return head; - if(head != null && head.tstamp.status == TStamp.Status.RUNNING) - { - //already locked by another transaction, block a bit - //first drop our locks - statusTransition(tstamp, TStamp.Status.RETRY); - synchronized(head.tstamp) - { - if(head.tstamp.status == TStamp.Status.RUNNING) - head.tstamp.wait(LOCK_WAIT_MSECS); - } - - throw new RetryException(); - } - else - { - TVal prior; - if(head == null || head.tstamp.status == TStamp.Status.COMMITTED) - { - prior = head; - } - else //aborted/retried at head, skip over - prior = head.prior; - TVal ret = null; - if((ensurePoint && prior != null && prior.tstamp.tpoint > readPoint) - || - !tref.tvals.compareAndSet(head, ret = new TVal(prior == null ? null : prior.val, tstamp, prior))) - { - statusTransition(tstamp, TStamp.Status.RETRY); - throw new RetryException(); - } - //auto-trim - for(TVal tv = prior; tv != null; tv = tv.prior) - { - if(tv.tstamp.tpoint <= completedPriorPoint) - tv.prior = null; - } - return ret; - } -} - -void abort() throws AbortException{ - statusTransition(tstamp, TStamp.Status.ABORTED); - throw new AbortException(); -} - -static Transaction get(){ - return transaction.get(); -} - -static Transaction getEx() throws Exception{ - Transaction t = transaction.get(); - if(t == null) - throw new Exception("No transaction running"); - return t; -} - -static void setTransaction(Transaction t){ - transaction.set(t); -} - -static public Object runInTransaction(IFn fn) throws Exception{ - if(get() != null) - return fn.invoke(); - - Transaction t = new Transaction(); - setTransaction(t); - try - { - return t.run(fn); - } - finally - { - setTransaction(null); - } -} - -Object run(IFn fn) throws Exception{ - boolean done = false; - Object ret = null; - - for(int i = 0; !done && i < RETRY_LIMIT; i++) - { - try - { - getReadPoint(); - tstamp = new TStamp(TStamp.Status.RUNNING); - ret = fn.invoke(); - done = true; - tstamp.msecs = System.currentTimeMillis(); - //get a commit point + alter status, atomically - synchronized(points) - { - tstamp.tpoint = getCommitPoint(); - //commit! - statusTransition(tstamp, TStamp.Status.COMMITTED); - relinquish(readPoint); - //relinquish(tstamp.tpoint); - } - } - catch(RetryException retry) - { - //eat this so we retry rather than fall out - } - finally - { - if(!done) - { - statusTransition(tstamp, TStamp.Status.ABORTED); - relinquish(readPoint); - //relinquish(tstamp.tpoint); - } - } - } - if(!done) - throw new Exception("Transaction failed after reaching retry limit"); - return ret; -} - - -Object doGet(TRef tref) throws Exception{ - TVal head = (TVal) tref.tvals.get(); - if(head == null) - return null; - if(head.tstamp == tstamp) - return head.val; - TVal ver; - switch(head.tstamp.status) - { - case COMMITTED: - ver = head; - break; - case RETRY: - case ABORTED: - ver = head.prior; - break; - default: - //ensure a running->commit transition happens before/after our read point - synchronized(head.tstamp) - { - ver = head.tstamp.status == TStamp.Status.COMMITTED ? head : head.prior; - } - break; - } - - for(; ver != null; ver = ver.prior) - { - if(ver.tstamp.tpoint <= readPoint) - return ver.val; - } - throw new IllegalStateException(tref.toString() + " is unbound."); -} - -Object doSet(TRef tref, Object val) throws Exception{ - TVal head = lock(tref, true); - head.val = val; - return val; -} - -void doTouch(TRef tref) throws Exception{ - lock(tref, true); -} - -Object doCommute(TRef tref, IFn fn) throws Exception{ - TVal head = lock(tref, false); - return head.val = fn.invoke(head.val); -} - -/* -static public Object runInAsOfTransaction(IFn fn, int tpoint) throws Exception{ - if(get() != null) - throw new Exception("As-of transactions cannot be nested"); - - Transaction t = new Transaction(tpoint); - setTransaction(t); - try - { - return fn.invoke();//t.run(fn); - } - finally - { - setTransaction(null); - } -} - -static public Object runInAsOfTransaction(IFn fn, long msecs) throws Exception{ - if(get() != null) - throw new Exception("As-of transactions cannot be nested"); - - Transaction t = new Transaction(msecs); - setTransaction(t); - try - { - return fn.invoke();//t.run(fn); - } - finally - { - setTransaction(null); - } -} - - */ - -//for test -static CyclicBarrier barrier; - -public static void main(String[] args){ - try - { - if(args.length != 4) - System.err.println("Usage: Transaction nthreads nitems niters ninstances"); - int nthreads = Integer.parseInt(args[0]); - int nitems = Integer.parseInt(args[1]); - int niters = Integer.parseInt(args[2]); - int ninstances = Integer.parseInt(args[3]); - - final ArrayList<TRef> items = new ArrayList(nitems); - for(int i = 0; i < nitems; i++) - items.add(new TRef(0)); - - - class Incr extends AFn{ - public Object invoke(Object arg1) throws Exception{ - Integer i = (Integer) arg1; - return i + 1; - } - - public Obj withMeta(IPersistentMap meta){ - throw new UnsupportedOperationException(); - - } - } - - class Commuter extends AFn implements Callable{ - int niters; - List<TRef> items; - Incr incr; - - - public Commuter(int niters, List<TRef> items){ - this.niters = niters; - this.items = items; - this.incr = new Incr(); - } - - public Object call() throws Exception{ - long nanos = 0; - for(int i = 0; i < niters; i++) - { - long start = System.nanoTime(); - Transaction.runInTransaction(this); - long dur = System.nanoTime() - start; - nanos += dur; - } - return nanos; - } - - public Object invoke() throws Exception{ - for(TRef tref : items) - { - Transaction.get().doCommute(tref, incr); - } - return null; - } - - public Obj withMeta(IPersistentMap meta){ - throw new UnsupportedOperationException(); - - } - } - - class Incrementer extends AFn implements Callable{ - int niters; - List<TRef> items; - - - public Incrementer(int niters, List<TRef> items){ - this.niters = niters; - this.items = items; - } - - public Object call() throws Exception{ - long nanos = 0; - for(int i = 0; i < niters; i++) - { - long start = System.nanoTime(); - Transaction.runInTransaction(this); - long dur = System.nanoTime() - start; - nanos += dur; - } - return nanos; - } - - public Object invoke() throws Exception{ - for(TRef tref : items) - { - //Transaction.get().doTouch(tref); - Transaction t = Transaction.get(); - int val = (Integer) t.doGet(tref); - t.doSet(tref, val + 1); - } - return null; - } - - public Obj withMeta(IPersistentMap meta){ - throw new UnsupportedOperationException(); - - } - } - - ArrayList<Callable<Long>> tasks = new ArrayList(nthreads); - for(int i = 0; i < nthreads; i++) - { - ArrayList<TRef> si = (ArrayList<TRef>) items.clone(); - Collections.shuffle(si); - tasks.add(new Incrementer(niters, si)); - } - ExecutorService e = Executors.newFixedThreadPool(nthreads); - - if(barrier == null) - barrier = new CyclicBarrier(ninstances); - barrier.await(); - long start = System.nanoTime(); - List<Future<Long>> results = e.invokeAll(tasks); - long estimatedTime = System.nanoTime() - start; - System.out.printf("nthreads: %d, nitems: %d, niters: %d, time: %d%n", nthreads, nitems, niters, - estimatedTime / 1000000); - e.shutdown(); - barrier.await(); - for(Future<Long> res : results) - { - System.out.printf("%d, ", res.get() / 1000000); - } - System.out.println(); - for(TRef item : items) - { - System.out.printf("%d, ", (Integer) item.currentVal()); - } - } - catch(Exception ex) - { - ex.printStackTrace(); - } -} - -} |