summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorRich Hickey <richhickey@gmail.com>2007-12-02 18:18:00 +0000
committerRich Hickey <richhickey@gmail.com>2007-12-02 18:18:00 +0000
commit139ddd146f2a272b7ddda397f54b501ff499c643 (patch)
tree02e24ccd90687a514fa25e71589f3c89efd9bbdb /src
parent340b89a20353fb118a923fcf0109b2feeeae88e0 (diff)
interim checkin
Diffstat (limited to 'src')
-rw-r--r--src/boot.clj51
-rw-r--r--src/jvm/clojure/lang/IRef.java35
-rw-r--r--src/jvm/clojure/lang/LispReader.java6
-rw-r--r--src/jvm/clojure/lang/LockingTransaction.java28
-rw-r--r--src/jvm/clojure/lang/Ref.java9
-rw-r--r--src/jvm/clojure/lang/TRef.java17
-rw-r--r--src/jvm/clojure/lang/Var.java9
7 files changed, 103 insertions, 52 deletions
diff --git a/src/boot.clj b/src/boot.clj
index 9f2347e6..72c39b12 100644
--- a/src/boot.clj
+++ b/src/boot.clj
@@ -43,21 +43,7 @@
(defn with-meta [#^clojure.lang.IObj x m]
(. x (withMeta m)))
-;;;;;;;;;;;;;;;;;;;; actors ;;;;;;;;;;;;;;;;;;;;;;;;;;
-(defn iref [state]
- (new clojure.lang.IRef state))
-(defn iref-of [state]
- (:iref ^state))
-
-(defn ! [#^clojure.lang.IRef a f & args]
- (. a (commute f args)))
-
-(defn actor-errors [#^clojure.lang.IRef a]
- (. a (getErrors)))
-
-(defn clear-actor-errors [#^clojure.lang.IRef a]
- (. a (clearErrors)))
;;;;;;;;;;;;;;;;;;;;
(def defmacro (fn [name & args]
@@ -363,24 +349,39 @@
(. clojure.lang.Var (find sym)))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; Refs ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
+(defn iref [state]
+ (new clojure.lang.IRef state))
+
+(defn iref-of [state]
+ (:iref ^state))
+
+(defn ! [#^clojure.lang.IRef a f & args]
+ (. a (commute f args)))
+
+(defn iref-errors [#^clojure.lang.IRef a]
+ (. a (getErrors)))
+
+(defn clear-iref-errors [#^clojure.lang.IRef a]
+ (. a (clearErrors)))
+
(defn tref [x]
(new clojure.lang.TRef x))
(defn deref [#^clojure.lang.Ref ref]
(. ref (get)))
-(defn deref! [#^clojure.lang.TRef ref]
- (. ref (currentVal)))
+(defn commute [#^clojure.lang.Ref ref fun & args]
+ (. ref (commute fun args)))
+
+(defn alter [#^clojure.lang.Ref ref fun & args]
+ (. ref (alter fun args)))
-(defn commute [#^clojure.lang.TRef ref fun]
- (. ref (commute fun)))
+(defn set [#^clojure.lang.Ref ref val]
+ (. ref (set val)))
-(defn set
- ([#^clojure.lang.TRef ref]
+(defn ensure [#^clojure.lang.TRef ref]
(. ref (touch))
(. ref (get)))
- ([#^clojure.lang.TRef ref val]
- (. ref (set val))))
(defmacro sync [flags-ignored-for-now & body]
`(. clojure.lang.LockingTransaction
@@ -733,7 +734,7 @@
(let [job (sync nil
(when @todo
(let [item (first @todo)]
- (set todo (rest @todo))
+ (alter todo rest)
(commute out inc)
(list item))))]
(when job
@@ -778,7 +779,8 @@
rseq sym name namespace locking .. ->
defmulti defmethod remove-method
binding find-var
- tref deref deref! commute set sync
+ tref deref commute alter set ensure sync
+ iref iref-of iref-errors clear-iref-errors
reduce reverse comp appl
every not-every any not-any
map pmap mapcat filter take take-while drop drop-while
@@ -795,6 +797,5 @@
int long float double short byte boolean char
aget aset aset-boolean aset-int aset-long aset-float aset-double aset-short aset-byte
make-array
- iref iref-of !
))
diff --git a/src/jvm/clojure/lang/IRef.java b/src/jvm/clojure/lang/IRef.java
index 81d463d2..e9d8417f 100644
--- a/src/jvm/clojure/lang/IRef.java
+++ b/src/jvm/clojure/lang/IRef.java
@@ -21,14 +21,14 @@ public class IRef implements Ref{
volatile Object state;
final Queue q = new LinkedList();
boolean busy = false;
-boolean commuting = false;
+boolean altering = false;
volatile ISeq errors = null;
//todo - make tuneable
final static Executor executor = Executors.newFixedThreadPool(2 + Runtime.getRuntime().availableProcessors());
//final static Executor executor = Executors.newCachedThreadPool();
final static ThreadLocal<PersistentVector> nested = new ThreadLocal<PersistentVector>();
-final static ThreadLocal inChange = new ThreadLocal();
+final static ThreadLocal inAlter = new ThreadLocal();
static class Action implements Runnable{
final IRef iref;
@@ -112,12 +112,12 @@ public void clearErrors(){
synchronized void doAlter(IFn fn, ISeq args) throws Exception{
try
{
- commuting = true;
+ altering = true;
setState(fn.applyTo(RT.cons(state, args)));
}
finally
{
- commuting = false;
+ altering = false;
}
}
@@ -127,22 +127,22 @@ public Object alter(IFn fn, ISeq args) throws Exception{
throw new Exception("IRef has errors", (Exception) RT.first(errors));
}
//Action action = new Action(this, fn, args);
- if(commuting)
+ if(altering)
throw new Exception("Recursive change");
LockingTransaction trans = LockingTransaction.getRunning();
if(trans != null)
- throw new Exception("Cannot change an IRef in a transaction");
- if(inChange.get() != null)
- throw new Exception("Cannot nest changes, use send");
+ throw new Exception("Cannot alter an IRef in a transaction");
+ if(inAlter.get() != null)
+ throw new Exception("Cannot nest alters, use commute");
try
{
- inChange.set(this);
+ inAlter.set(this);
doAlter(fn, args);
}
finally
{
- inChange.set(null);
+ inAlter.set(null);
}
return this;
@@ -167,6 +167,21 @@ public Object commute(IFn fn, ISeq args) throws Exception{
return this;
}
+public Object set(Object val) throws Exception{
+ synchronized(this)
+ {
+ if(altering)
+ throw new Exception("Recursive change");
+ LockingTransaction trans = LockingTransaction.getRunning();
+ if(trans != null)
+ throw new Exception("Cannot set an IRef in a transaction");
+ if(inAlter.get() != null)
+ throw new Exception("Cannot nest alters, use commute");
+ setState(val);
+ return val;
+ }
+}
+
void enqueue(Action action){
synchronized(this)
{
diff --git a/src/jvm/clojure/lang/LispReader.java b/src/jvm/clojure/lang/LispReader.java
index 9fb0db35..8fd86832 100644
--- a/src/jvm/clojure/lang/LispReader.java
+++ b/src/jvm/clojure/lang/LispReader.java
@@ -31,7 +31,7 @@ static Symbol VECTOR = Symbol.create("clojure", "vector");
static Symbol WITH_META = Symbol.create("clojure", "with-meta");
static Symbol META = Symbol.create("clojure", "meta");
static Symbol DEREF = Symbol.create("clojure", "deref");
-static Symbol DEREF_BANG = Symbol.create("clojure", "deref!");
+//static Symbol DEREF_BANG = Symbol.create("clojure", "deref!");
static Keyword LINE_KEY = Keyword.intern("clojure", "line");
static IFn[] macros = new IFn[256];
@@ -55,7 +55,7 @@ static
macros['"'] = new StringReader();
macros[';'] = new CommentReader();
macros['\''] = new WrappingReader(Compiler.QUOTE);
- macros['@'] = new DerefReader();
+ macros['@'] = new WrappingReader(DEREF);//new DerefReader();
macros['^'] = new WrappingReader(META);
macros['`'] = new SyntaxQuoteReader();
macros['~'] = new UnquoteReader();
@@ -315,6 +315,7 @@ static class WrappingReader extends AFn{
}
+/*
static class DerefReader extends AFn{
public Object invoke(Object reader, Object quote) throws Exception{
@@ -336,6 +337,7 @@ static class DerefReader extends AFn{
}
}
+*/
static class DispatchReader extends AFn{
public Object invoke(Object reader, Object hash) throws Exception{
diff --git a/src/jvm/clojure/lang/LockingTransaction.java b/src/jvm/clojure/lang/LockingTransaction.java
index b09f49ea..79c49829 100644
--- a/src/jvm/clojure/lang/LockingTransaction.java
+++ b/src/jvm/clojure/lang/LockingTransaction.java
@@ -53,6 +53,16 @@ public static class Info{
return s == RUNNING || s == COMMITTING;
}
}
+
+static class CFn{
+ final IFn fn;
+ final ISeq args;
+
+ public CFn(IFn fn, ISeq args){
+ this.fn = fn;
+ this.args = args;
+ }
+}
//total order on transactions
//transactions will consume a point for init, for each retry, and on commit if writing
final private static AtomicInteger lastPoint = new AtomicInteger();
@@ -90,7 +100,7 @@ final RetryException retryex = new RetryException();
final ArrayList<IRef.Action> actions = new ArrayList<IRef.Action>();
final HashMap<TRef, Object> vals = new HashMap<TRef, Object>();
final HashSet<TRef> sets = new HashSet<TRef>();
-final TreeMap<TRef, ArrayList<IFn>> commutes = new TreeMap<TRef, ArrayList<IFn>>();
+final TreeMap<TRef, ArrayList<CFn>> commutes = new TreeMap<TRef, ArrayList<CFn>>();
//returns the most recent val
@@ -208,7 +218,7 @@ Object run(IFn fn) throws Exception{
//make sure no one has killed us before this point, and can't from now on
if(info.status.compareAndSet(RUNNING, COMMITTING))
{
- for(Map.Entry<TRef, ArrayList<IFn>> e : commutes.entrySet())
+ for(Map.Entry<TRef, ArrayList<CFn>> e : commutes.entrySet())
{
TRef ref = e.getKey();
ref.lock.writeLock().lock();
@@ -222,9 +232,9 @@ Object run(IFn fn) throws Exception{
Object val = ref.tvals == null ? null : ref.tvals.val;
if(!sets.contains(ref))
vals.put(ref, val);
- for(IFn f : e.getValue())
+ for(CFn f : e.getValue())
{
- vals.put(ref, f.invoke(vals.get(ref)));
+ vals.put(ref, f.fn.applyTo(RT.cons(vals.get(ref), f.args)));
}
}
for(TRef ref : sets)
@@ -338,7 +348,7 @@ void doTouch(TRef ref){
lock(ref);
}
-Object doCommute(TRef ref, IFn fn) throws Exception{
+Object doCommute(TRef ref, IFn fn, ISeq args) throws Exception{
if(!info.running())
throw retryex;
if(!vals.containsKey(ref))
@@ -355,11 +365,11 @@ Object doCommute(TRef ref, IFn fn) throws Exception{
}
vals.put(ref, val);
}
- ArrayList<IFn> fns = commutes.get(ref);
+ ArrayList<CFn> fns = commutes.get(ref);
if(fns == null)
- commutes.put(ref, fns = new ArrayList<IFn>());
- fns.add(fn);
- Object ret = fn.invoke(vals.get(ref));
+ commutes.put(ref, fns = new ArrayList<CFn>());
+ fns.add(new CFn(fn,args));
+ Object ret = fn.applyTo(RT.cons(vals.get(ref),args));
vals.put(ref, ret);
return ret;
}
diff --git a/src/jvm/clojure/lang/Ref.java b/src/jvm/clojure/lang/Ref.java
index 56f9da0f..4d81efed 100644
--- a/src/jvm/clojure/lang/Ref.java
+++ b/src/jvm/clojure/lang/Ref.java
@@ -13,5 +13,12 @@
package clojure.lang;
public interface Ref{
- Object get();
+
+Object get() throws Exception;
+
+Object alter(IFn fn, ISeq args) throws Exception;
+
+Object commute(IFn fn, ISeq args) throws Exception;
+
+Object set(Object val) throws Exception;
}
diff --git a/src/jvm/clojure/lang/TRef.java b/src/jvm/clojure/lang/TRef.java
index bcaff392..9619fd78 100644
--- a/src/jvm/clojure/lang/TRef.java
+++ b/src/jvm/clojure/lang/TRef.java
@@ -86,7 +86,7 @@ public UUID getUUID(){
//the latest val
// ok out of transaction
-public Object currentVal(){
+Object currentVal(){
try
{
lock.readLock().lock();
@@ -102,17 +102,24 @@ public Object currentVal(){
//*
-//must be dynamically bound or transactional read
public Object get(){
- return LockingTransaction.getEx().doGet(this);
+ LockingTransaction t = LockingTransaction.getRunning();
+ if(t == null)
+ return currentVal();
+ return t.doGet(this);
}
public Object set(Object val){
return LockingTransaction.getEx().doSet(this, val);
}
-public Object commute(IFn fn) throws Exception{
- return LockingTransaction.getEx().doCommute(this, fn);
+public Object commute(IFn fn, ISeq args) throws Exception{
+ return LockingTransaction.getEx().doCommute(this, fn, args);
+}
+
+public Object alter(IFn fn, ISeq args) throws Exception{
+ LockingTransaction t = LockingTransaction.getEx();
+ return t.doSet(this, fn.applyTo(RT.cons(t.doGet(this), args)));
}
public void touch(){
diff --git a/src/jvm/clojure/lang/Var.java b/src/jvm/clojure/lang/Var.java
index 95d4ee1d..ac020867 100644
--- a/src/jvm/clojure/lang/Var.java
+++ b/src/jvm/clojure/lang/Var.java
@@ -128,6 +128,15 @@ final public Object get(){
throw new IllegalStateException(String.format("Var %s is unbound.", sym));
}
+public Object alter(IFn fn, ISeq args) throws Exception{
+ set(fn.applyTo(RT.cons(get(), args)));
+ return this;
+}
+
+public Object commute(IFn fn, ISeq args) throws Exception{
+ return alter(fn,args);
+}
+
public Object set(Object val){
Box b = getThreadBinding();
if(b != null)