summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorRich Hickey <richhickey@gmail.com>2007-07-29 14:25:48 +0000
committerRich Hickey <richhickey@gmail.com>2007-07-29 14:25:48 +0000
commit5c719b2f91f723c78d0f066f6ddd7fa558ff8bcc (patch)
tree19b13d2b0a7812b8daa040ef60380a4cd5a65a1e /src
parent4294c94279eecee549d4969e194a40b5ea50649f (diff)
lock based STM
Diffstat (limited to 'src')
-rw-r--r--src/jvm/clojure/lang/LockingTransaction.java210
-rw-r--r--src/jvm/clojure/lang/Ref.java9
2 files changed, 124 insertions, 95 deletions
diff --git a/src/jvm/clojure/lang/LockingTransaction.java b/src/jvm/clojure/lang/LockingTransaction.java
index a0ae1c88..3faf1b0a 100644
--- a/src/jvm/clojure/lang/LockingTransaction.java
+++ b/src/jvm/clojure/lang/LockingTransaction.java
@@ -14,11 +14,16 @@ package clojure.lang;
import java.util.*;
import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
public class LockingTransaction{
public static int RETRY_LIMIT = 1000;
public static int LOCK_WAIT_MSECS = 100;
+static final int RUNNING = 0;
+static final int COMMITTED = 1;
+static final int RETRY = 2;
+static final int KILLED = 3;
final static ThreadLocal<LockingTransaction> transaction = new ThreadLocal<LockingTransaction>();
@@ -28,22 +33,38 @@ static class RetryException extends Exception{
static class AbortException extends Exception{
}
+public static class Info{
+ final AtomicInteger status;
+ final long point;
+
+
+ public Info(int status, long point){
+ this.status = new AtomicInteger(status);
+ this.point = point;
+ }
+
+ public boolean running(){
+ return status.get() == RUNNING;
+ }
+}
//total order on transactions
//transactions will consume a point for init, for each retry, and on commit if writing
-private static long lastPoint;
-final static PriorityQueue<Long> points = new PriorityQueue<Long>();
+final private static AtomicInteger lastPoint = new AtomicInteger();
+//final static PriorityQueue<Long> points = new PriorityQueue<Long>();
long getCommitPoint(){
- synchronized(points)
- {
- points.remove(readPoint);
- completedPriorPoint = completedThroughPoint();
- ++lastPoint;
- return lastPoint;
- }
+ return lastPoint.incrementAndGet();
+// synchronized(points)
+// {
+// points.remove(readPoint);
+// completedPriorPoint = completedThroughPoint();
+// ++lastPoint;
+// return lastPoint;
+// }
}
+/*
static long completedThroughPoint(){
synchronized(points)
{
@@ -60,15 +81,17 @@ void relinquishReadPoint(){
points.remove(readPoint);
}
}
+*/
-void stop(){
- if(tstatus.running)
+void stop(int status){
+ if(info != null)
{
- synchronized(tstatus)
+ synchronized(info)
{
- tstatus.running = false;
- tstatus.notifyAll();
+ info.status.set(status);
+ info.notifyAll();
}
+ info = null;
vals.clear();
sets.clear();
commutes.clear();
@@ -76,20 +99,23 @@ void stop(){
}
-volatile Ref.TStatus tstatus;
-long completedPriorPoint;
+Info info;
+//long completedPriorPoint;
long readPoint;
+long startPoint;
+RetryException retryex = new RetryException();
HashMap<Ref, Object> vals = new HashMap<Ref, Object>();
HashSet<Ref> sets = new HashSet<Ref>();
TreeMap<Ref, ArrayList<IFn>> commutes = new TreeMap<Ref, ArrayList<IFn>>();
void getReadPoint(){
- synchronized(points)
- {
- ++lastPoint;
- points.add(lastPoint);
- readPoint = lastPoint;
- }
+ readPoint = lastPoint.incrementAndGet();
+// synchronized(points)
+// {
+// ++lastPoint;
+// points.add(lastPoint);
+// readPoint = lastPoint;
+// }
}
//returns the most recent val
@@ -98,26 +124,27 @@ Object lock(Ref ref, boolean ensurePoint) throws Exception{
try
{
ref.lock.writeLock().lock();
- Ref.TStatus status = ref.tstatus;
+ Info refinfo = ref.tinfo;
- if(status != null && status != tstatus && status.running)
+ if(refinfo != null && refinfo != info && refinfo.running())
{
ref.lock.writeLock().unlock();
unlocked = true;
- stop();
- synchronized(status)
+ //stop prior to blocking
+ stop(RETRY);
+ synchronized(refinfo)
{
- if(status.running)
- status.wait(LOCK_WAIT_MSECS);
+ if(refinfo.running())
+ refinfo.wait(LOCK_WAIT_MSECS);
}
- throw new RetryException();
+ throw retryex;//new RetryException();
}
if(ensurePoint && ref.tvals != null && ref.tvals.point > readPoint)
{
- stop();
- throw new RetryException();
+// stop();
+ throw retryex;//throw new RetryException();
}
- ref.tstatus = tstatus;
+ ref.tinfo = info;
return ref.tvals == null ? null : ref.tvals.val;
}
finally
@@ -128,14 +155,14 @@ Object lock(Ref ref, boolean ensurePoint) throws Exception{
}
void abort() throws AbortException{
- stop();
+ stop(KILLED);
throw new AbortException();
}
static LockingTransaction getEx() throws Exception{
LockingTransaction t = transaction.get();
- if(!t.tstatus.running)
+ if(t.info == null)
throw new Exception("No transaction running");
return t;
}
@@ -145,7 +172,7 @@ static public Object runInTransaction(IFn fn) throws Exception{
if(t == null)
transaction.set(t = new LockingTransaction());
- if(t.tstatus != null && t.tstatus.running)
+ if(t.info != null)
return fn.invoke();
return t.run(fn);
@@ -161,51 +188,61 @@ Object run(IFn fn) throws Exception{
try
{
getReadPoint();
- tstatus = new Ref.TStatus();
+ if(i == 0)
+ startPoint = readPoint;
+ info = new Info(RUNNING, startPoint);
ret = fn.invoke();
-
- for(Map.Entry<Ref, ArrayList<IFn>> e : commutes.entrySet())
+ synchronized(info)
{
- Ref ref = e.getKey();
- ref.lock.writeLock().lock();
- locked.add(ref);
- Ref.TStatus status = ref.tstatus;
- if(status != null && status != tstatus && status.running)
- throw new RetryException();
- Object val = ref.tvals == null ? null : ref.tvals.val;
- if(!sets.contains(ref))
- vals.put(ref, val);
- for(IFn f : e.getValue())
+ if(info.status.get() == RUNNING)
{
- vals.put(ref, f.invoke(vals.get(ref)));
- }
- }
- for(Ref ref : sets)
- {
- if(!commutes.containsKey(ref))
- {
- ref.lock.writeLock().lock();
- locked.add(ref);
- }
- }
-
- //at this point, all values calced, all refs to be written locked
- //no more client code to be called
- long commitPoint = getCommitPoint();
- long msecs = System.currentTimeMillis();
- for(Map.Entry<Ref, Object> e : vals.entrySet())
- {
- Ref ref = e.getKey();
- ref.tvals = new Ref.TVal(e.getValue(), commitPoint, msecs, ref.tvals);
- ref.tstatus = null;
- //auto-trim
- for(Ref.TVal tv = ref.tvals; tv != null; tv = tv.prior)
- {
- if(tv.point <= completedPriorPoint)
- tv.prior = null;
+ for(Map.Entry<Ref, ArrayList<IFn>> e : commutes.entrySet())
+ {
+ Ref ref = e.getKey();
+ ref.lock.writeLock().lock();
+ locked.add(ref);
+ Info refinfo = ref.tinfo;
+ if(refinfo != null && refinfo != info && refinfo.running())
+ throw retryex;//new RetryException();
+ Object val = ref.tvals == null ? null : ref.tvals.val;
+ if(!sets.contains(ref))
+ vals.put(ref, val);
+ for(IFn f : e.getValue())
+ {
+ vals.put(ref, f.invoke(vals.get(ref)));
+ }
+ }
+ for(Ref ref : sets)
+ {
+ if(!commutes.containsKey(ref))
+ {
+ ref.lock.writeLock().lock();
+ locked.add(ref);
+ }
+ }
+
+ //at this point, all values calced, all refs to be written locked
+ //no more client code to be called
+ long commitPoint = getCommitPoint();
+ long msecs = System.currentTimeMillis();
+ for(Map.Entry<Ref, Object> e : vals.entrySet())
+ {
+ Ref ref = e.getKey();
+// ref.tvals = new Ref.TVal(e.getValue(), commitPoint, msecs, null);
+ if(ref.tvals != null)
+ ref.tvals.prior = null;
+ ref.tvals = new Ref.TVal(e.getValue(), commitPoint, msecs, ref.tvals);
+ //ref.tstatus = null;
+ //auto-trim
+// for(Ref.TVal tv = ref.tvals; tv != null; tv = tv.prior)
+// {
+// if(tv.msecs <= msecs)
+// tv.prior = null;
+// }
+ }
+ done = true;
}
}
- done = true;
}
catch(RetryException retry)
{
@@ -218,9 +255,9 @@ Object run(IFn fn) throws Exception{
locked.get(k).lock.writeLock().unlock();
}
locked.clear();
- stop();
- if(!done)
- relinquishReadPoint();
+ stop(done ? COMMITTED : RETRY);
+// if(!done)
+// relinquishReadPoint();
}
}
if(!done)
@@ -235,6 +272,8 @@ Object doGet(Ref ref) throws Exception{
try
{
ref.lock.readLock().lock();
+ if(ref.tvals == null)
+ throw new IllegalStateException(ref.toString() + " is unbound.");
for(Ref.TVal ver = ref.tvals; ver != null; ver = ver.prior)
{
if(ver.point <= readPoint)
@@ -245,8 +284,8 @@ Object doGet(Ref ref) throws Exception{
{
ref.lock.readLock().unlock();
}
-
- throw new IllegalStateException(ref.toString() + " is unbound.");
+ //no version of val precedes the read point
+ throw retryex;//new RetryException();
}
Object doSet(Ref ref, Object val) throws Exception{
@@ -273,14 +312,6 @@ Object doCommute(Ref ref, IFn fn) throws Exception{
{
ref.lock.readLock().lock();
val = ref.tvals == null ? null : ref.tvals.val;
-// for(Ref.TVal ver = ref.tvals; ver != null; ver = ver.prior)
-// {
-// if(ver.point <= readPoint)
-// {
-// val = ver.val;
-// break;
-// }
-// }
}
finally
{
@@ -434,8 +465,9 @@ public static void main(String[] args){
System.out.printf("nthreads: %d, nitems: %d, niters: %d, time: %d%n", nthreads, nitems, niters,
estimatedTime / 1000000);
e.shutdown();
- for(Future<Long> res : results)
+ for(Future<Long> result : results)
{
+ Future<Long> res = (Future<Long>) result;
System.out.printf("%d, ", res.get() / 1000000);
}
System.out.println();
diff --git a/src/jvm/clojure/lang/Ref.java b/src/jvm/clojure/lang/Ref.java
index 7e7d29d1..cd89a7b3 100644
--- a/src/jvm/clojure/lang/Ref.java
+++ b/src/jvm/clojure/lang/Ref.java
@@ -12,11 +12,11 @@
package clojure.lang;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class Ref implements IFn, Comparable<Ref>{
+
public int compareTo(Ref o){
if(o.id == id)
return 0;
@@ -40,21 +40,18 @@ public static class TVal{
}
-public static class TStatus{
- boolean running = true;
-}
final static AtomicLong ids = new AtomicLong();
TVal tvals;
transient volatile InheritableThreadLocal<Binding> dvals;
final ReentrantReadWriteLock lock;
-TStatus tstatus;
+LockingTransaction.Info tinfo;
final long id;
public Ref(){
this.tvals = null;
this.dvals = null;
- this.tstatus = null;
+ this.tinfo = null;
lock = new ReentrantReadWriteLock();
id = ids.getAndIncrement();
}