summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorRich Hickey <richhickey@gmail.com>2006-05-31 11:49:41 +0000
committerRich Hickey <richhickey@gmail.com>2006-05-31 11:49:41 +0000
commit8b74fbd52683934c6dde25412c4d3ec86384da87 (patch)
treea7914b42768a130d511c43bc9a21bfe12fc9aded /src
parent1cd2547d56cd7f56c1149f27bd77142e3fc40b50 (diff)
first steps towards STM
Diffstat (limited to 'src')
-rw-r--r--src/org/clojure/runtime/TRef.java50
-rw-r--r--src/org/clojure/runtime/TVal.java26
-rw-r--r--src/org/clojure/runtime/ThreadLocalData.java7
-rw-r--r--src/org/clojure/runtime/Transaction.java204
4 files changed, 287 insertions, 0 deletions
diff --git a/src/org/clojure/runtime/TRef.java b/src/org/clojure/runtime/TRef.java
new file mode 100644
index 00000000..36b2866e
--- /dev/null
+++ b/src/org/clojure/runtime/TRef.java
@@ -0,0 +1,50 @@
+/**
+ * Copyright (c) Rich Hickey. All rights reserved.
+ * The use and distribution terms for this software are covered by the
+ * Common Public License 1.0 (http://opensource.org/licenses/cpl.php)
+ * which can be found in the file CPL.TXT at the root of this distribution.
+ * By using this software in any fashion, you are agreeing to be bound by
+ * the terms of this license.
+ * You must not remove this notice, or any other, from this software.
+ **/
+
+/* rich May 30, 2006 */
+
+package org.clojure.runtime;
+
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class TRef extends TVal implements Comparable{
+static AtomicInteger nextSeq = new AtomicInteger(1);
+
+final int lockSeq;
+Lock lock;
+
+public TRef(ThreadLocalData tld, Object val) throws Exception{
+ this.lockSeq = nextSeq.getAndIncrement();
+ this.lock = new ReentrantLock();
+ set(tld, val);
+}
+
+public Object get(ThreadLocalData tld) throws Exception{
+ return tld.getTransaction().get(this);
+}
+
+public Object set(ThreadLocalData tld, Object val) throws Exception{
+ return tld.getTransaction().set(this,val);
+}
+
+public void touch(ThreadLocalData tld) throws Exception{
+ tld.getTransaction().touch(this);
+}
+
+public void commutate(ThreadLocalData tld, IFn fn) throws Exception{
+ tld.getTransaction().commutate(this, fn);
+}
+
+public int compareTo(Object o){
+ return lockSeq - ((TRef) o).lockSeq;
+}
+}
diff --git a/src/org/clojure/runtime/TVal.java b/src/org/clojure/runtime/TVal.java
new file mode 100644
index 00000000..6d95fcac
--- /dev/null
+++ b/src/org/clojure/runtime/TVal.java
@@ -0,0 +1,26 @@
+/**
+ * Copyright (c) Rich Hickey. All rights reserved.
+ * The use and distribution terms for this software are covered by the
+ * Common Public License 1.0 (http://opensource.org/licenses/cpl.php)
+ * which can be found in the file CPL.TXT at the root of this distribution.
+ * By using this software in any fashion, you are agreeing to be bound by
+ * the terms of this license.
+ * You must not remove this notice, or any other, from this software.
+ **/
+
+/* rich May 30, 2006 */
+
+package org.clojure.runtime;
+
+public class TVal{
+volatile Object val;
+volatile Transaction.Info tinfo;
+volatile TVal prior;
+
+void push(Object val,Transaction.Info tinfo) throws Exception{
+ this.prior = (TVal) this.clone();
+ this.tinfo = tinfo;
+ this.val = val;
+}
+
+}
diff --git a/src/org/clojure/runtime/ThreadLocalData.java b/src/org/clojure/runtime/ThreadLocalData.java
index 67fd0048..0bc9c665 100644
--- a/src/org/clojure/runtime/ThreadLocalData.java
+++ b/src/org/clojure/runtime/ThreadLocalData.java
@@ -21,6 +21,13 @@ public int mvCount = 0;
public Object[] mvArray = new Object[MULTIPLE_VALUES_LIMIT];
IdentityHashMap dynamicBindings = new IdentityHashMap();
+Transaction transaction;
+
+public Transaction getTransaction() throws Exception{
+ if(transaction == null)
+ throw new Exception("No active transaction");
+ return transaction;
+}
public ThreadLocalData(IdentityHashMap dynamicBindings)
{
diff --git a/src/org/clojure/runtime/Transaction.java b/src/org/clojure/runtime/Transaction.java
new file mode 100644
index 00000000..27f56433
--- /dev/null
+++ b/src/org/clojure/runtime/Transaction.java
@@ -0,0 +1,204 @@
+/**
+ * Copyright (c) Rich Hickey. All rights reserved.
+ * The use and distribution terms for this software are covered by the
+ * Common Public License 1.0 (http://opensource.org/licenses/cpl.php)
+ * which can be found in the file CPL.TXT at the root of this distribution.
+ * By using this software in any fashion, you are agreeing to be bound by
+ * the terms of this license.
+ * You must not remove this notice, or any other, from this software.
+ **/
+
+/* rich May 30, 2006 */
+
+package org.clojure.runtime;
+
+import java.util.*;
+
+public class Transaction{
+
+public static final int COMMITTED = 0;
+public static final int WORKING = 1;
+static final Object lock = new Object();
+static int nextSeq = 1;
+
+public static class Info{
+int seq;
+int status;
+
+Info(int seq,int status){
+ this.seq = seq;
+ this.status = status;
+}
+}
+
+Info info;
+int startSeq;
+
+IdentityHashMap<TRef,Object> sets;
+IdentityHashMap<TRef,Cons> commutates;
+ArrayList<TRef> locks;
+ArrayList<TRef> locked;
+
+
+static public Object runInTransaction(ThreadLocalData tld,IFn fn) throws Exception{
+ if(tld.transaction != null)
+ return fn.invoke(tld);
+
+ tld.transaction = new Transaction();
+ return tld.transaction.run(tld, fn);
+}
+
+public Object run(ThreadLocalData tld, IFn fn) throws Exception{
+ boolean done = false;
+ Object ret = null;
+
+ loop:
+ while(!done){
+ try
+ {
+ reset();
+ ret = fn.invoke(tld);
+ if(sets != null)
+ getLocks().addAll(sets.keySet());
+ if(commutates != null)
+ getLocks().addAll(commutates.keySet());
+ if(locks != null)
+ {
+ if(locked == null)
+ locked = new ArrayList<TRef>(locks.size());
+ //lock in order, to avoid deadlocks
+ Collections.sort(locks);
+ for(TRef tref : locks)
+ {
+ //will block here
+ tref.lock.lock();
+ locked.add(tref);
+ if(sets.containsKey(tref))
+ {
+ //try again if the thing we are trying to set has changed since we started
+ TVal curr = getCurrent(tref);
+ if(curr.tinfo.seq > startSeq)
+ continue loop;
+ }
+ }
+ }
+
+ //at this point all write targets are locked
+ //turn commutates into sets
+ for(Map.Entry<TRef, Cons> e : commutates.entrySet())
+ {
+ TRef tref = e.getKey();
+ Object val = getCurrent(tref).val;
+ for(Cons c = e.getValue();c!=null;c = c.rest)
+ {
+ IFn f = (IFn) c.first;
+ val = f.invoke(tld, val);
+ }
+ sets.put(tref, val);
+ }
+
+ //set the new vals
+ for(Map.Entry<TRef, Object> entry : sets.entrySet())
+ {
+ TRef tref = entry.getKey();
+ tref.push(entry.getValue(), info);
+ }
+
+ //atomic commit
+ synchronized(lock){
+ info.seq = getNextSeq();
+ info.status = COMMITTED;
+ }
+
+ done = true;
+ }
+ finally{
+ if(locked != null)
+ {
+ for(TRef tref : locked)
+ {
+ tref.lock.unlock();
+ }
+ }
+ }
+ }
+ return ret;
+}
+
+ArrayList<TRef> getLocks(){
+ if(locks == null)
+ locks = new ArrayList<TRef>();
+ return locks;
+}
+
+private void reset(){
+ if(sets != null)
+ sets.clear();
+ if(commutates != null)
+ commutates.clear();
+ if(locks != null)
+ locks.clear();
+ if(locked != null)
+ locked.clear();
+}
+
+int getNextSeq(){
+ synchronized(lock){
+ return nextSeq++;
+ }
+
+}
+Transaction(){
+ synchronized(lock){
+ int seq = getNextSeq();
+ this.info = new Info(seq, WORKING);
+ this.startSeq = seq;
+ }
+}
+
+Object get(TRef tref) throws Exception{
+ if(sets != null && sets.containsKey(tref))
+ return sets.get(tref);
+
+ for(TVal ver = tref;ver != null;ver = ver.prior)
+ {
+ if(ver.tinfo.status == COMMITTED && ver.tinfo.seq <= startSeq)
+ return ver.val;
+ }
+
+ throw new Exception("Version not found");
+
+}
+
+static TVal getCurrent(TRef tref) throws Exception{
+ for(TVal ver = tref;ver != null;ver = ver.prior)
+ {
+ if(ver.tinfo.status == COMMITTED)
+ return ver;
+ }
+ throw new Exception("Version not found");
+}
+
+Object set(TRef tref, Object val) throws Exception{
+ if(sets == null)
+ sets = new IdentityHashMap<TRef,Object>();
+ if(commutates.containsKey(tref))
+ throw new Exception("Can't commutate and set a TRef in the same transaction");
+
+ sets.put(tref,val);
+ return val;
+}
+
+void touch(TRef tref) throws Exception{
+ set(tref, get(tref));
+}
+
+void commutate(TRef tref, IFn fn) throws Exception{
+ if(commutates == null)
+ commutates = new IdentityHashMap<TRef,Cons>();
+ if(sets.containsKey(tref))
+ throw new Exception("Can't commutate and set a TRef in the same transaction");
+ commutates.put(tref, RT.cons(fn, commutates.get(tref)));
+}
+
+}