diff options
-rw-r--r-- | src/boot.clj | 19 | ||||
-rw-r--r-- | src/jvm/clojure/lang/Actor.java | 37 | ||||
-rw-r--r-- | src/jvm/clojure/lang/IRef.java | 17 | ||||
-rw-r--r-- | src/jvm/clojure/lang/RT.java | 2 | ||||
-rw-r--r-- | src/jvm/clojure/lang/Ref.java | 2 | ||||
-rw-r--r-- | src/jvm/clojure/lang/Var.java | 2 |
6 files changed, 64 insertions, 15 deletions
diff --git a/src/boot.clj b/src/boot.clj index 5401fb4e..5a9b1c4d 100644 --- a/src/boot.clj +++ b/src/boot.clj @@ -35,12 +35,22 @@ ([comparator & args] (. clojure.lang.PersistentTreeMap (create comparator args)))) + +;;;;;;;;;;;;;;;;; metadata ;;;;;;;;;;;;;;;;;;;;;;;;;;; (defn meta [#^clojure.lang.IObj x] (. x (meta))) (defn with-meta [#^clojure.lang.IObj x m] (. x (withMeta m))) +;;;;;;;;;;;;;;;;;;;; actors ;;;;;;;;;;;;;;;;;;;;;;;;;; +(defn actor [state] + (new clojure.lang.Actor state)) + +(defn actor-of [state] + (:actor ^state)) + +;;;;;;;;;;;;;;;;;;;; (def defmacro (fn [name & args] (list 'do (cons 'defn (cons name args)) @@ -300,6 +310,10 @@ ([x form] `(. ~x ~form)) ([x form & more] `(.. (. ~x ~form) ~@more))) +(defmacro -> + ([x form] `(~(first form) ~x ~@(rest form))) + ([x form & more] `(-> ~(thisfn x form) ~@more))) + ;;multimethods (defmacro defmulti ([name dispatch-fn] (thisfn name dispatch-fn :default)) @@ -336,7 +350,7 @@ (defn ref [x] (new clojure.lang.Ref x)) -(defn deref [#^clojure.lang.Ref ref] +(defn deref [#^clojure.lang.IRef ref] (. ref (get))) (defn deref! [#^clojure.lang.Ref ref] @@ -742,7 +756,7 @@ complement constantly identity seq count peek pop nth contains get assoc dissoc find keys vals merge - rseq sym name namespace locking .. + rseq sym name namespace locking .. -> defmulti defmethod remove-method binding find-var ref deref deref! commute set sync @@ -762,5 +776,6 @@ int long float double short byte boolean aget aset aset-boolean aset-int aset-long aset-float aset-double aset-short aset-byte make-array + actor actor-of )) diff --git a/src/jvm/clojure/lang/Actor.java b/src/jvm/clojure/lang/Actor.java index 7c532f51..b2475bca 100644 --- a/src/jvm/clojure/lang/Actor.java +++ b/src/jvm/clojure/lang/Actor.java @@ -18,14 +18,15 @@ import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; -public class Actor extends RestFn{ +public class Actor extends RestFn implements IRef{ volatile Object state; final Queue q = new LinkedList(); boolean busy = false; +final public Queue<Exception> errors = new LinkedBlockingQueue<Exception>(); //todo - make tuneable -final public static Queue errors = new LinkedBlockingQueue(); -final static Executor executor = Executors.newCachedThreadPool(); +final static Executor executor = Executors.newFixedThreadPool(2 + Runtime.getRuntime().availableProcessors()); +//final static Executor executor = Executors.newCachedThreadPool(); final static ThreadLocal<PersistentVector> nested = new ThreadLocal<PersistentVector>(); static class Action implements Runnable{ @@ -45,15 +46,15 @@ static class Action implements Runnable{ boolean hadError = false; try { - actor.state = fn.applyTo(RT.cons(actor, args)); + actor.setState(fn.applyTo(RT.cons(actor.state, args))); } catch(Exception e) { //todo report/callback - errors.add(e); + actor.errors.add(e); hadError = true; } - + if(!hadError) { for(ISeq s = nested.get().seq(); s != null; s = s.rest()) @@ -81,14 +82,30 @@ static class Action implements Runnable{ public Actor(Object state){ super(1); - this.state = state; + setState(state); +} + +void setState(Object newState){ + if(newState instanceof IObj) + { + IObj o = (IObj) newState; + if(RT.get(o.meta(), RT.ACTOR_KEY) != this) + { + newState = o.withMeta((IPersistentMap) RT.assoc(o.meta(), RT.ACTOR_KEY, this)); + } + } + state = newState; } -public Object getState(){ +public Object get(){ return state; } -public Object doInvoke(Object fn, Object args){ +public Object doInvoke(Object fn, Object args) throws Exception{ + if(!errors.isEmpty()) + { + throw new Exception("Actor has errors", errors.peek()); + } Action action = new Action(this, (IFn) fn, (ISeq) args); LockingTransaction trans = LockingTransaction.getRunning(); if(trans != null) @@ -99,7 +116,7 @@ public Object doInvoke(Object fn, Object args){ } else enqueue(action); - + return this; } diff --git a/src/jvm/clojure/lang/IRef.java b/src/jvm/clojure/lang/IRef.java new file mode 100644 index 00000000..42a8185a --- /dev/null +++ b/src/jvm/clojure/lang/IRef.java @@ -0,0 +1,17 @@ +/** + * Copyright (c) Rich Hickey. All rights reserved. + * The use and distribution terms for this software are covered by the + * Common Public License 1.0 (http://opensource.org/licenses/cpl.php) + * which can be found in the file CPL.TXT at the root of this distribution. + * By using this software in any fashion, you are agreeing to be bound by + * the terms of this license. + * You must not remove this notice, or any other, from this software. + **/ + +/* rich Nov 18, 2007 */ + +package clojure.lang; + +public interface IRef{ + Object get(); +} diff --git a/src/jvm/clojure/lang/RT.java b/src/jvm/clojure/lang/RT.java index bdfedecc..82b31e56 100644 --- a/src/jvm/clojure/lang/RT.java +++ b/src/jvm/clojure/lang/RT.java @@ -25,7 +25,7 @@ final static public Var IN = Var.intern(Symbol.create("clojure", "*in*"), new LineNumberingPushbackReader(new InputStreamReader(System.in))); final static Keyword TAG_KEY = Keyword.intern("clojure", "tag"); - +final static Keyword ACTOR_KEY = Keyword.intern("clojure", "actor"); //final static public Var CURRENT_MODULE = Var.intern(Symbol.create("clojure", "current-module"), // Module.findOrCreateModule("clojure/user")); diff --git a/src/jvm/clojure/lang/Ref.java b/src/jvm/clojure/lang/Ref.java index cdbcba8a..cbbfe5a3 100644 --- a/src/jvm/clojure/lang/Ref.java +++ b/src/jvm/clojure/lang/Ref.java @@ -16,7 +16,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.UUID; -public class Ref implements IFn, Comparable<Ref>{ +public class Ref implements IFn, Comparable<Ref>, IRef{ public int compareTo(Ref o){ return uuid.compareTo(o.uuid); diff --git a/src/jvm/clojure/lang/Var.java b/src/jvm/clojure/lang/Var.java index 338e3d95..ed176740 100644 --- a/src/jvm/clojure/lang/Var.java +++ b/src/jvm/clojure/lang/Var.java @@ -16,7 +16,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; -public final class Var implements IFn{ +public final class Var implements IFn, IRef{ static class Frame{ //Var->Box |