summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorRich Hickey <richhickey@gmail.com>2007-07-31 15:40:54 +0000
committerRich Hickey <richhickey@gmail.com>2007-07-31 15:40:54 +0000
commit4fb15c3fa20e22258c4aa9a47866443fd80da74a (patch)
treefaeb013988ebc828d5828834aae4a426577ee215 /src
parentdf5c458d0a4ce0fe590f6ce981bd62c19c15558f (diff)
removed old transaction code
Diffstat (limited to 'src')
-rw-r--r--src/jvm/clojure/lang/TObj.java48
-rw-r--r--src/jvm/clojure/lang/TRef.java314
-rw-r--r--src/jvm/clojure/lang/TStamp.java29
-rw-r--r--src/jvm/clojure/lang/TVal.java26
-rw-r--r--src/jvm/clojure/lang/Transaction.java434
5 files changed, 0 insertions, 851 deletions
diff --git a/src/jvm/clojure/lang/TObj.java b/src/jvm/clojure/lang/TObj.java
deleted file mode 100644
index e4366f1f..00000000
--- a/src/jvm/clojure/lang/TObj.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Copyright (c) Rich Hickey. All rights reserved.
- * The use and distribution terms for this software are covered by the
- * Common Public License 1.0 (http://opensource.org/licenses/cpl.php)
- * which can be found in the file CPL.TXT at the root of this distribution.
- * By using this software in any fashion, you are agreeing to be bound by
- * the terms of this license.
- * You must not remove this notice, or any other, from this software.
- **/
-
-package clojure.lang;
-
-abstract public class TObj implements IObj{
-TRef _attrs;
- /*
-public TObj() throws Exception{
- this._attrs = Transaction.tref(PersistentArrayMap.EMPTY);
-}
-
-
-public Object putAttr( Object key, Object val) throws Exception {
- IPersistentMap t = (IPersistentMap) Transaction.get( _attrs);
- t = t.assoc(key, val);
- Transaction.set(_attrs,t);
- return val;
-}
-
-public Object getAttr( Object key) throws Exception {
- IPersistentMap t = (IPersistentMap) Transaction.get( _attrs);
- return t.get(key);
-}
-
-public boolean hasAttr( Object key) throws Exception {
- IPersistentMap t = (IPersistentMap) Transaction.get( _attrs);
- return t.contains(key);
-}
-
-public IPersistentMap attrs() throws Exception {
- return (IPersistentMap) Transaction.get(_attrs);
-}
-
-public void removeAttr(Object key) throws Exception {
- IPersistentMap t = (IPersistentMap) Transaction.get( _attrs);
- t = t.without(key);
- Transaction.set(_attrs,t);
-}
-*/
-}
diff --git a/src/jvm/clojure/lang/TRef.java b/src/jvm/clojure/lang/TRef.java
deleted file mode 100644
index 1f25ac2a..00000000
--- a/src/jvm/clojure/lang/TRef.java
+++ /dev/null
@@ -1,314 +0,0 @@
-/**
- * Copyright (c) Rich Hickey. All rights reserved.
- * The use and distribution terms for this software are covered by the
- * Common Public License 1.0 (http://opensource.org/licenses/cpl.php)
- * which can be found in the file CPL.TXT at the root of this distribution.
- * By using this software in any fashion, you are agreeing to be bound by
- * the terms of this license.
- * You must not remove this notice, or any other, from this software.
- **/
-
-/* rich May 30, 2006 */
-
-package clojure.lang;
-
-import java.util.concurrent.atomic.AtomicReference;
-
-public class TRef<T> extends AFn{
-//reference to a chain of TVals, only the head of which may be non-committed
-final AtomicReference<TVal> tvals;
-final AtomicReference<InheritableThreadLocal> dvals;
-
-
-public TRef(){
- this.tvals = new AtomicReference<TVal>();
- this.dvals = new AtomicReference<InheritableThreadLocal>();
-}
-
-public Obj withMeta(IPersistentMap meta){
- return new TRef(meta, tvals, dvals);
-}
-
-
-private TRef(IPersistentMap meta, AtomicReference<TVal> tvals, AtomicReference<InheritableThreadLocal> dvals){
- super(meta);
- this.tvals = tvals;
- this.dvals = dvals;
-}
-
-public TRef(T initVal){
- this();
- tvals.set(new TVal(initVal, Transaction.ZERO_POINT, null));
-}
-
-public boolean equals(Object o){
- if(this == o) return true;
- if(o == null || TRef.class != o.getClass()) return false;
-
- TRef other = (TRef) o;
-
- return dvals == other.dvals && tvals == other.tvals;
-}
-
-public int hashCode(){
- return RT.hashCombine(dvals.hashCode(), tvals.hashCode());
-}
-
-public T currentVal(){
- Binding b = getThreadBinding();
- if(b != null)
- return (T) b.val;
- TVal current = getCurrentTVal();
- if(current != null)
- return (T) current.val;
- throw new IllegalStateException(this.toString() + " is unbound.");
-}
-
-public T val() throws Exception{
- Binding b = getThreadBinding();
- if(b != null)
- return (T) b.val;
- Transaction t = Transaction.get();
- if(t != null)
- return (T) t.doGet(this);
- throw new IllegalStateException(this.toString() + " is unbound.");
-}
-
-final Binding getThreadBinding(){
- InheritableThreadLocal dv = dvals.get();
- if(dv != null)
- return (Binding) dv.get();
- return null;
-}
-
-public void pushThreadBinding(T val){
- InheritableThreadLocal dv = dvals.get();
- if(dv == null)
- {
- dvals.compareAndSet(null, new InheritableThreadLocal());
- dv = dvals.get();
- }
- dv.set(new Binding(val, (Binding) dv.get()));
-}
-
-public void popThreadBinding() throws Exception{
- InheritableThreadLocal dv = dvals.get();
- Binding b;
- if(dv == null || (b = (Binding) dv.get()) == null)
- throw new Exception("Can't pop unbound ref");
- dv.set(b.rest);
-}
-
-public T set(T val) throws Exception{
- Binding b = getThreadBinding();
- if(b != null)
- return (T) (b.val = val);
- //allow out-of-transaction inits?
- if(!isBound())
- {
- tvals.set(new TVal(val, Transaction.ZERO_POINT, null));
- return val;
- }
- return (T) Transaction.getEx().doSet(this, val);
-}
-
-public T commute(IFn fn) throws Exception{
- Binding b = getThreadBinding();
- if(b != null)
- return (T) (b.val = fn.invoke(b.val));
- return (T) Transaction.getEx().doCommute(this, fn);
-}
-
-public void touch() throws Exception{
- Transaction.getEx().doTouch(this);
-}
-
-boolean isBound(){
- InheritableThreadLocal dv = dvals.get();
- return (dv != null && dv.get() != null)
- ||
- getCurrentTVal() != null;
-}
-
-TVal getCurrentTVal(){
- TVal head = tvals.get();
- if(head == null || head.tstamp.status == TStamp.Status.COMMITTED)
- return head;
- return head.prior;
-}
-
-TVal valAsOfPoint(TRef tref, int tpoint){
- for(TVal tv = getCurrentTVal(); tv != null; tv = tv.prior)
- {
- if(tv.tstamp.tpoint <= tpoint)
- return tv;
- }
- return null;
-}
-
-TVal valAsOfTime(TRef tref, long msecs){
- for(TVal tv = getCurrentTVal(); tv != null; tv = tv.prior)
- {
- if(tv.tstamp.msecs <= msecs)
- return tv;
- }
- return null;
-}
-
-void trimHistory(){
- long ctp = Transaction.completedThroughPoint();
- for(TVal tv = getCurrentTVal(); tv != null; tv = tv.prior)
- {
- while(tv.tstamp.tpoint > ctp)
- tv = tv.prior;
- tv.prior = null;
- }
-}
-
-void trimHistoryPriorToPoint(long tpoint){
- long ctp = Transaction.completedThroughPoint();
- for(TVal tv = getCurrentTVal(); tv != null; tv = tv.prior)
- {
- while(tv.tstamp.tpoint > tpoint || tv.tstamp.tpoint > ctp)
- tv = tv.prior;
- tv.prior = null;
- }
-}
-
-void trimHistoryPriorToTime(long msecs){
- long ctp = Transaction.completedThroughPoint();
- for(TVal tv = getCurrentTVal(); tv != null; tv = tv.prior)
- {
- while(tv.tstamp.msecs > msecs || tv.tstamp.tpoint > ctp)
- tv = tv.prior;
- tv.prior = null;
- }
-}
-
-
-final public IFn fn(){
- return (IFn) currentVal();
-}
-
-public Object invoke() throws Exception{
- return fn().invoke();
-}
-
-public Object invoke(Object arg1) throws Exception{
- return fn().invoke(arg1);
-}
-
-public Object invoke(Object arg1, Object arg2) throws Exception{
- return fn().invoke(arg1, arg2);
-}
-
-public Object invoke(Object arg1, Object arg2, Object arg3) throws Exception{
- return fn().invoke(arg1, arg2, arg3);
-}
-
-public Object invoke(Object arg1, Object arg2, Object arg3, Object arg4) throws Exception{
- return fn().invoke(arg1, arg2, arg3, arg4);
-}
-
-public Object invoke(Object arg1, Object arg2, Object arg3, Object arg4, Object arg5) throws Exception{
- return fn().invoke(arg1, arg2, arg3, arg4, arg5);
-}
-
-public Object invoke(Object arg1, Object arg2, Object arg3, Object arg4, Object arg5, Object arg6) throws Exception{
- return fn().invoke(arg1, arg2, arg3, arg4, arg5, arg6);
-}
-
-public Object invoke(Object arg1, Object arg2, Object arg3, Object arg4, Object arg5, Object arg6, Object arg7)
- throws Exception{
- return fn().invoke(arg1, arg2, arg3, arg4, arg5, arg6, arg7);
-}
-
-public Object invoke(Object arg1, Object arg2, Object arg3, Object arg4, Object arg5, Object arg6, Object arg7,
- Object arg8) throws Exception{
- return fn().invoke(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8);
-}
-
-public Object invoke(Object arg1, Object arg2, Object arg3, Object arg4, Object arg5, Object arg6, Object arg7,
- Object arg8, Object arg9) throws Exception{
- return fn().invoke(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9);
-}
-
-public Object invoke(Object arg1, Object arg2, Object arg3, Object arg4, Object arg5, Object arg6, Object arg7,
- Object arg8, Object arg9, Object arg10) throws Exception{
- return fn().invoke(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10);
-}
-
-public Object invoke(Object arg1, Object arg2, Object arg3, Object arg4, Object arg5, Object arg6, Object arg7,
- Object arg8, Object arg9, Object arg10, Object arg11) throws Exception{
- return fn().invoke(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11);
-}
-
-public Object invoke(Object arg1, Object arg2, Object arg3, Object arg4, Object arg5, Object arg6, Object arg7,
- Object arg8, Object arg9, Object arg10, Object arg11, Object arg12) throws Exception{
- return fn().invoke(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11, arg12);
-}
-
-public Object invoke(Object arg1, Object arg2, Object arg3, Object arg4, Object arg5, Object arg6, Object arg7,
- Object arg8, Object arg9, Object arg10, Object arg11, Object arg12, Object arg13)
- throws Exception{
- return fn().invoke(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11, arg12, arg13);
-}
-
-public Object invoke(Object arg1, Object arg2, Object arg3, Object arg4, Object arg5, Object arg6, Object arg7,
- Object arg8, Object arg9, Object arg10, Object arg11, Object arg12, Object arg13, Object arg14)
- throws Exception{
- return fn().invoke(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11, arg12, arg13, arg14);
-}
-
-public Object invoke(Object arg1, Object arg2, Object arg3, Object arg4, Object arg5, Object arg6, Object arg7,
- Object arg8, Object arg9, Object arg10, Object arg11, Object arg12, Object arg13, Object arg14,
- Object arg15) throws Exception{
- return fn().invoke(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11, arg12, arg13, arg14, arg15);
-}
-
-public Object invoke(Object arg1, Object arg2, Object arg3, Object arg4, Object arg5, Object arg6, Object arg7,
- Object arg8, Object arg9, Object arg10, Object arg11, Object arg12, Object arg13, Object arg14,
- Object arg15, Object arg16) throws Exception{
- return fn().invoke(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11, arg12, arg13, arg14, arg15,
- arg16);
-}
-
-public Object invoke(Object arg1, Object arg2, Object arg3, Object arg4, Object arg5, Object arg6, Object arg7,
- Object arg8, Object arg9, Object arg10, Object arg11, Object arg12, Object arg13, Object arg14,
- Object arg15, Object arg16, Object arg17) throws Exception{
- return fn().invoke(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11, arg12, arg13, arg14, arg15,
- arg16, arg17);
-}
-
-public Object invoke(Object arg1, Object arg2, Object arg3, Object arg4, Object arg5, Object arg6, Object arg7,
- Object arg8, Object arg9, Object arg10, Object arg11, Object arg12, Object arg13, Object arg14,
- Object arg15, Object arg16, Object arg17, Object arg18) throws Exception{
- return fn().invoke(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11, arg12, arg13, arg14, arg15,
- arg16, arg17, arg18);
-}
-
-public Object invoke(Object arg1, Object arg2, Object arg3, Object arg4, Object arg5, Object arg6, Object arg7,
- Object arg8, Object arg9, Object arg10, Object arg11, Object arg12, Object arg13, Object arg14,
- Object arg15, Object arg16, Object arg17, Object arg18, Object arg19) throws Exception{
- return fn().invoke(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11, arg12, arg13, arg14, arg15,
- arg16, arg17, arg18, arg19);
-}
-
-public Object invoke(Object arg1, Object arg2, Object arg3, Object arg4, Object arg5, Object arg6, Object arg7,
- Object arg8, Object arg9, Object arg10, Object arg11, Object arg12, Object arg13, Object arg14,
- Object arg15, Object arg16, Object arg17, Object arg18, Object arg19, Object arg20)
- throws Exception{
- return fn().invoke(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11, arg12, arg13, arg14, arg15,
- arg16, arg17, arg18, arg19, arg20);
-}
-
-public Object invoke(Object arg1, Object arg2, Object arg3, Object arg4, Object arg5, Object arg6, Object arg7,
- Object arg8, Object arg9, Object arg10, Object arg11, Object arg12, Object arg13, Object arg14,
- Object arg15, Object arg16, Object arg17, Object arg18, Object arg19, Object arg20,
- Object... args)
- throws Exception{
- return fn().invoke(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11, arg12, arg13, arg14, arg15,
- arg16, arg17, arg18, arg19, arg20, args);
-}
-
-}
diff --git a/src/jvm/clojure/lang/TStamp.java b/src/jvm/clojure/lang/TStamp.java
deleted file mode 100644
index 50879cd9..00000000
--- a/src/jvm/clojure/lang/TStamp.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
- * Copyright (c) Rich Hickey. All rights reserved.
- * The use and distribution terms for this software are covered by the
- * Common Public License 1.0 (http://opensource.org/licenses/cpl.php)
- * which can be found in the file CPL.TXT at the root of this distribution.
- * By using this software in any fashion, you are agreeing to be bound by
- * the terms of this license.
- * You must not remove this notice, or any other, from this software.
- **/
-
-/* rich Jun 22, 2007 */
-
-package clojure.lang;
-
-public class TStamp{
-
-public static enum Status{
- RUNNING, COMMITTED, ABORTED, RETRY
-}
-
-volatile Status status;
-volatile long tpoint;
-volatile long msecs;
-
-
-public TStamp(Status status){
- this.status = status;
-}
-}
diff --git a/src/jvm/clojure/lang/TVal.java b/src/jvm/clojure/lang/TVal.java
deleted file mode 100644
index 916cbaa0..00000000
--- a/src/jvm/clojure/lang/TVal.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/**
- * Copyright (c) Rich Hickey. All rights reserved.
- * The use and distribution terms for this software are covered by the
- * Common Public License 1.0 (http://opensource.org/licenses/cpl.php)
- * which can be found in the file CPL.TXT at the root of this distribution.
- * By using this software in any fashion, you are agreeing to be bound by
- * the terms of this license.
- * You must not remove this notice, or any other, from this software.
- **/
-
-/* rich May 30, 2006 */
-
-package clojure.lang;
-
-public class TVal{
-
-volatile Object val;
-public final TStamp tstamp;
-volatile TVal prior;
-
-TVal(Object val, TStamp tstamp, TVal prior) {
- this.val = val;
- this.tstamp = tstamp;
- this.prior = prior;
-}
-}
diff --git a/src/jvm/clojure/lang/Transaction.java b/src/jvm/clojure/lang/Transaction.java
deleted file mode 100644
index c501c346..00000000
--- a/src/jvm/clojure/lang/Transaction.java
+++ /dev/null
@@ -1,434 +0,0 @@
-/**
- * Copyright (c) Rich Hickey. All rights reserved.
- * The use and distribution terms for this software are covered by the
- * Common Public License 1.0 (http://opensource.org/licenses/cpl.php)
- * which can be found in the file CPL.TXT at the root of this distribution.
- * By using this software in any fashion, you are agreeing to be bound by
- * the terms of this license.
- * You must not remove this notice, or any other, from this software.
- **/
-
-/* rich May 30, 2006 */
-
-package clojure.lang;
-
-import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
-import java.util.concurrent.*;
-import java.util.*;
-
-public class Transaction{
-
-public static int RETRY_LIMIT = 1000;
-public static int LOCK_WAIT_MSECS = 100;
-
-final static ThreadLocal<Transaction> transaction = new ThreadLocal<Transaction>();
-
-static class RetryException extends Exception{
-}
-
-static class AbortException extends Exception{
-}
-
-//total order on transactions
-//transactions will consume a point for init, for each retry, and on commit if writing
-private static long nextPoint = 1;
-final static PriorityQueue<Long> points = new PriorityQueue<Long>();
-
-void getReadPoint(){
- synchronized(points)
- {
- completedPriorPoint = completedThroughPoint();
- points.add(nextPoint);
- readPoint = nextPoint++;
- }
-}
-
-static long getCommitPoint(){
- synchronized(points)
- {
- return nextPoint++;
- }
-}
-
-final static TStamp ZERO_POINT = new TStamp(TStamp.Status.COMMITTED);
-
-static long completedThroughPoint(){
- synchronized(points)
- {
- Long p = points.peek();
- if(p != null)
- return p - 1;
- return nextPoint - 1;
- }
-}
-
-static void relinquish(long tpoint){
- synchronized(points)
- {
- points.remove(tpoint);
- }
-}
-
-static void statusTransition(TStamp tstamp, TStamp.Status newStatus){
- synchronized(tstamp)
- {
- tstamp.status = newStatus;
- tstamp.notifyAll();
- }
-}
-
-
-TStamp tstamp;
-long completedPriorPoint;
-long readPoint;
-
-TVal lock(TRef tref, boolean ensurePoint) throws Exception{
- TVal head = (TVal) tref.tvals.get();
- //already locked by this transaction
- if(head != null && head.tstamp == tstamp)
- return head;
- if(head != null && head.tstamp.status == TStamp.Status.RUNNING)
- {
- //already locked by another transaction, block a bit
- //first drop our locks
- statusTransition(tstamp, TStamp.Status.RETRY);
- synchronized(head.tstamp)
- {
- if(head.tstamp.status == TStamp.Status.RUNNING)
- head.tstamp.wait(LOCK_WAIT_MSECS);
- }
-
- throw new RetryException();
- }
- else
- {
- TVal prior;
- if(head == null || head.tstamp.status == TStamp.Status.COMMITTED)
- {
- prior = head;
- }
- else //aborted/retried at head, skip over
- prior = head.prior;
- TVal ret = null;
- if((ensurePoint && prior != null && prior.tstamp.tpoint > readPoint)
- ||
- !tref.tvals.compareAndSet(head, ret = new TVal(prior == null ? null : prior.val, tstamp, prior)))
- {
- statusTransition(tstamp, TStamp.Status.RETRY);
- throw new RetryException();
- }
- //auto-trim
- for(TVal tv = prior; tv != null; tv = tv.prior)
- {
- if(tv.tstamp.tpoint <= completedPriorPoint)
- tv.prior = null;
- }
- return ret;
- }
-}
-
-void abort() throws AbortException{
- statusTransition(tstamp, TStamp.Status.ABORTED);
- throw new AbortException();
-}
-
-static Transaction get(){
- return transaction.get();
-}
-
-static Transaction getEx() throws Exception{
- Transaction t = transaction.get();
- if(t == null)
- throw new Exception("No transaction running");
- return t;
-}
-
-static void setTransaction(Transaction t){
- transaction.set(t);
-}
-
-static public Object runInTransaction(IFn fn) throws Exception{
- if(get() != null)
- return fn.invoke();
-
- Transaction t = new Transaction();
- setTransaction(t);
- try
- {
- return t.run(fn);
- }
- finally
- {
- setTransaction(null);
- }
-}
-
-Object run(IFn fn) throws Exception{
- boolean done = false;
- Object ret = null;
-
- for(int i = 0; !done && i < RETRY_LIMIT; i++)
- {
- try
- {
- getReadPoint();
- tstamp = new TStamp(TStamp.Status.RUNNING);
- ret = fn.invoke();
- done = true;
- tstamp.msecs = System.currentTimeMillis();
- //get a commit point + alter status, atomically
- synchronized(points)
- {
- tstamp.tpoint = getCommitPoint();
- //commit!
- statusTransition(tstamp, TStamp.Status.COMMITTED);
- relinquish(readPoint);
- //relinquish(tstamp.tpoint);
- }
- }
- catch(RetryException retry)
- {
- //eat this so we retry rather than fall out
- }
- finally
- {
- if(!done)
- {
- statusTransition(tstamp, TStamp.Status.ABORTED);
- relinquish(readPoint);
- //relinquish(tstamp.tpoint);
- }
- }
- }
- if(!done)
- throw new Exception("Transaction failed after reaching retry limit");
- return ret;
-}
-
-
-Object doGet(TRef tref) throws Exception{
- TVal head = (TVal) tref.tvals.get();
- if(head == null)
- return null;
- if(head.tstamp == tstamp)
- return head.val;
- TVal ver;
- switch(head.tstamp.status)
- {
- case COMMITTED:
- ver = head;
- break;
- case RETRY:
- case ABORTED:
- ver = head.prior;
- break;
- default:
- //ensure a running->commit transition happens before/after our read point
- synchronized(head.tstamp)
- {
- ver = head.tstamp.status == TStamp.Status.COMMITTED ? head : head.prior;
- }
- break;
- }
-
- for(; ver != null; ver = ver.prior)
- {
- if(ver.tstamp.tpoint <= readPoint)
- return ver.val;
- }
- throw new IllegalStateException(tref.toString() + " is unbound.");
-}
-
-Object doSet(TRef tref, Object val) throws Exception{
- TVal head = lock(tref, true);
- head.val = val;
- return val;
-}
-
-void doTouch(TRef tref) throws Exception{
- lock(tref, true);
-}
-
-Object doCommute(TRef tref, IFn fn) throws Exception{
- TVal head = lock(tref, false);
- return head.val = fn.invoke(head.val);
-}
-
-/*
-static public Object runInAsOfTransaction(IFn fn, int tpoint) throws Exception{
- if(get() != null)
- throw new Exception("As-of transactions cannot be nested");
-
- Transaction t = new Transaction(tpoint);
- setTransaction(t);
- try
- {
- return fn.invoke();//t.run(fn);
- }
- finally
- {
- setTransaction(null);
- }
-}
-
-static public Object runInAsOfTransaction(IFn fn, long msecs) throws Exception{
- if(get() != null)
- throw new Exception("As-of transactions cannot be nested");
-
- Transaction t = new Transaction(msecs);
- setTransaction(t);
- try
- {
- return fn.invoke();//t.run(fn);
- }
- finally
- {
- setTransaction(null);
- }
-}
-
- */
-
-//for test
-static CyclicBarrier barrier;
-
-public static void main(String[] args){
- try
- {
- if(args.length != 4)
- System.err.println("Usage: Transaction nthreads nitems niters ninstances");
- int nthreads = Integer.parseInt(args[0]);
- int nitems = Integer.parseInt(args[1]);
- int niters = Integer.parseInt(args[2]);
- int ninstances = Integer.parseInt(args[3]);
-
- final ArrayList<TRef> items = new ArrayList(nitems);
- for(int i = 0; i < nitems; i++)
- items.add(new TRef(0));
-
-
- class Incr extends AFn{
- public Object invoke(Object arg1) throws Exception{
- Integer i = (Integer) arg1;
- return i + 1;
- }
-
- public Obj withMeta(IPersistentMap meta){
- throw new UnsupportedOperationException();
-
- }
- }
-
- class Commuter extends AFn implements Callable{
- int niters;
- List<TRef> items;
- Incr incr;
-
-
- public Commuter(int niters, List<TRef> items){
- this.niters = niters;
- this.items = items;
- this.incr = new Incr();
- }
-
- public Object call() throws Exception{
- long nanos = 0;
- for(int i = 0; i < niters; i++)
- {
- long start = System.nanoTime();
- Transaction.runInTransaction(this);
- long dur = System.nanoTime() - start;
- nanos += dur;
- }
- return nanos;
- }
-
- public Object invoke() throws Exception{
- for(TRef tref : items)
- {
- Transaction.get().doCommute(tref, incr);
- }
- return null;
- }
-
- public Obj withMeta(IPersistentMap meta){
- throw new UnsupportedOperationException();
-
- }
- }
-
- class Incrementer extends AFn implements Callable{
- int niters;
- List<TRef> items;
-
-
- public Incrementer(int niters, List<TRef> items){
- this.niters = niters;
- this.items = items;
- }
-
- public Object call() throws Exception{
- long nanos = 0;
- for(int i = 0; i < niters; i++)
- {
- long start = System.nanoTime();
- Transaction.runInTransaction(this);
- long dur = System.nanoTime() - start;
- nanos += dur;
- }
- return nanos;
- }
-
- public Object invoke() throws Exception{
- for(TRef tref : items)
- {
- //Transaction.get().doTouch(tref);
- Transaction t = Transaction.get();
- int val = (Integer) t.doGet(tref);
- t.doSet(tref, val + 1);
- }
- return null;
- }
-
- public Obj withMeta(IPersistentMap meta){
- throw new UnsupportedOperationException();
-
- }
- }
-
- ArrayList<Callable<Long>> tasks = new ArrayList(nthreads);
- for(int i = 0; i < nthreads; i++)
- {
- ArrayList<TRef> si = (ArrayList<TRef>) items.clone();
- Collections.shuffle(si);
- tasks.add(new Incrementer(niters, si));
- }
- ExecutorService e = Executors.newFixedThreadPool(nthreads);
-
- if(barrier == null)
- barrier = new CyclicBarrier(ninstances);
- barrier.await();
- long start = System.nanoTime();
- List<Future<Long>> results = e.invokeAll(tasks);
- long estimatedTime = System.nanoTime() - start;
- System.out.printf("nthreads: %d, nitems: %d, niters: %d, time: %d%n", nthreads, nitems, niters,
- estimatedTime / 1000000);
- e.shutdown();
- barrier.await();
- for(Future<Long> res : results)
- {
- System.out.printf("%d, ", res.get() / 1000000);
- }
- System.out.println();
- for(TRef item : items)
- {
- System.out.printf("%d, ", (Integer) item.currentVal());
- }
- }
- catch(Exception ex)
- {
- ex.printStackTrace();
- }
-}
-
-}