summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorRich Hickey <richhickey@gmail.com>2008-03-13 19:24:06 +0000
committerRich Hickey <richhickey@gmail.com>2008-03-13 19:24:06 +0000
commit1527b871c6edb798c7b8877d95864fbba3aca5fc (patch)
treefd1371fbc0fda04d38827a64e78ff75385a13815 /src
parent2e300c1a0a82c39d5e6b5766f1e8df422ebf31b1 (diff)
renamed ! to send, added send-off, *agent* is currently running agent
Diffstat (limited to 'src')
-rw-r--r--src/boot.clj17
-rw-r--r--src/jvm/clojure/lang/Agent.java136
-rw-r--r--src/jvm/clojure/lang/RT.java2
3 files changed, 40 insertions, 115 deletions
diff --git a/src/boot.clj b/src/boot.clj
index 2870a450..ae23b3e4 100644
--- a/src/boot.clj
+++ b/src/boot.clj
@@ -834,18 +834,25 @@
"Creates and returns an agent with an initial value of state."
[state] (new clojure.lang.Agent state))
-(defn agent-of
- {:private true}
- [state] (:agent ^state))
+(defn ! [& args] (throw (new Exception "! is now send. See also send-off")))
-(defn !
+(defn send
"Dispatch an action to an agent. Returns the agent immediately.
Subsequently, in a thread in a thread pool, the state of the will be
set to the value of:
(apply action-fn state-of-agent args)"
[#^clojure.lang.Agent a f & args]
- (. a (dispatch f args)))
+ (. a (dispatch f args false)))
+
+(defn send-off
+ "Dispatch a potentially blocking action to an agent. Returns the
+ agent immediately. Subsequently, in a separate thread, the state of
+ the will be set to the value of:
+
+ (apply action-fn state-of-agent args)"
+ [#^clojure.lang.Agent a f & args]
+ (. a (dispatch f args true)))
(defn agent-errors
"Returns a sequence of the exceptions thrown during asynchronous
diff --git a/src/jvm/clojure/lang/Agent.java b/src/jvm/clojure/lang/Agent.java
index af494e36..0fd4f628 100644
--- a/src/jvm/clojure/lang/Agent.java
+++ b/src/jvm/clojure/lang/Agent.java
@@ -17,121 +17,45 @@ import java.util.concurrent.atomic.AtomicReference;
public class Agent implements IRef{
volatile Object state;
-//final Queue q = new LinkedList();
AtomicReference<IPersistentStack> q = new AtomicReference(PersistentQueue.EMPTY);
-//boolean busy = false;
volatile ISeq errors = null;
-//todo - make tuneable
-//final public static ThreadPoolExecutor executor =
-// new ThreadPoolExecutor(
-// 2 * Runtime.getRuntime().availableProcessors(),
-// 2 * Runtime.getRuntime().availableProcessors(),
-// 0L, TimeUnit.MILLISECONDS,
-// new LinkedBlockingQueue<Runnable>());
-final public static ThreadPoolExecutor executor =
- new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),
- Integer.MAX_VALUE,
- 200L, TimeUnit.MILLISECONDS,
- new SynchronousQueue());
-
-// new LinkedBlockingQueue<Runnable>());
-//Executors.newFixedThreadPool(2 * Runtime.getRuntime().availableProcessors());
-//final static Executor executor = Executors.newCachedThreadPool();
-final static ThreadLocal<PersistentVector> nested = new ThreadLocal<PersistentVector>();
-
-static class ThreadPool{
- static class Worker implements Runnable{
- Runnable task;
- public Worker(Runnable task){
- this.task = task;
- }
-
- public void newTask(Runnable task){
- synchronized(this)
- {
- this.task = task;
- notify();
- }
- }
-
- public void run(){
- for(; ;)
- {
- task.run();
- synchronized(this)
- {
- IPersistentStack workers = ThreadPool.workers.get();
- if(workers.count() < ThreadPool.workerReserve)
- {
- boolean pushed = false;
- while(!pushed)
- {
- IPersistentStack prior = ThreadPool.workers.get();
- IPersistentStack next = (IPersistentStack) prior.cons(this);
- pushed = ThreadPool.workers.compareAndSet(prior, next);
- }
- task = null;
- try
- {
- while(task == null)
- wait();
- }
- catch(InterruptedException e)
- {
- break;
- }
- }
- else //have enough reserve workers, die
- break;
- }
- }
- }
- }
+final public static Executor pooledExecutor =
+ Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
- static int workerReserve = 2 * Runtime.getRuntime().availableProcessors();
- static AtomicReference<IPersistentStack> workers = new AtomicReference(PersistentList.EMPTY);
+final static Executor soloExecutor = Executors.newCachedThreadPool();
- static void execute(Runnable r){
- IPersistentStack ws = null;
- while(ws == null)
- {
- ws = workers.get();
- if(ws.count() > 0)
- {
- IPersistentStack popped = ws.pop();
- if(!workers.compareAndSet(ws, popped))
- ws = null;
- }
- }
- if(ws.count() > 0)
- {
- Worker worker = (Worker) ws.peek();
- worker.newTask(r);
- }
- else
- (new Thread(new Worker(r))).start();
+final static ThreadLocal<PersistentVector> nested = new ThreadLocal<PersistentVector>();
- }
-}
static class Action implements Runnable{
final Agent agent;
final IFn fn;
final ISeq args;
+ final boolean solo;
- public Action(Agent agent, IFn fn, ISeq args){
+ public Action(Agent agent, IFn fn, ISeq args, boolean solo){
this.agent = agent;
this.args = args;
this.fn = fn;
+ this.solo = solo;
+ }
+
+ void execute(){
+ if(solo)
+ soloExecutor.execute(this);
+ else
+ pooledExecutor.execute(this);
}
static void doRun(Action action){
- while(action != null)
+ try
{
+ Var.pushThreadBindings(RT.map(RT.AGENT, action.agent));
nested.set(PersistentVector.EMPTY);
+
boolean hadError = false;
try
{
@@ -162,11 +86,14 @@ static class Action implements Runnable{
popped = action.agent.q.compareAndSet(prior, next);
}
-// if(next.count() > 0)
-// executor.execute((Runnable) next.peek());
-// action = null;
- action = (Action) next.peek();
+ if(next.count() > 0)
+ ((Action) next.peek()).execute();
+
+ }
+ finally
+ {
nested.set(null);
+ Var.popThreadBindings();
}
}
@@ -180,14 +107,6 @@ public Agent(Object state){
}
void setState(Object newState){
- if(newState instanceof IObj)
- {
- IObj o = (IObj) newState;
- if(RT.get(o.meta(), RT.AGENT_KEY) != this)
- {
- newState = o.withMeta((IPersistentMap) RT.assoc(o.meta(), RT.AGENT_KEY, this));
- }
- }
state = newState;
}
@@ -207,12 +126,12 @@ public void clearErrors(){
errors = null;
}
-public Object dispatch(IFn fn, ISeq args) throws Exception{
+public Object dispatch(IFn fn, ISeq args, boolean solo) throws Exception{
if(errors != null)
{
throw new Exception("Agent has errors", (Exception) RT.first(errors));
}
- Action action = new Action(this, fn, args);
+ Action action = new Action(this, fn, args, solo);
LockingTransaction trans = LockingTransaction.getRunning();
if(trans != null)
trans.enqueue(action);
@@ -236,8 +155,7 @@ void enqueue(Action action){
}
if(prior.count() == 0)
-// executor.execute(action);
- ThreadPool.execute(action);
+ action.execute();
}
}
diff --git a/src/jvm/clojure/lang/RT.java b/src/jvm/clojure/lang/RT.java
index 7fecdd49..905f0d84 100644
--- a/src/jvm/clojure/lang/RT.java
+++ b/src/jvm/clojure/lang/RT.java
@@ -107,7 +107,7 @@ final static public Var IN =
Var.intern(CLOJURE_NS, Symbol.create("*in*"),
new LineNumberingPushbackReader(new InputStreamReader(System.in)));
final static Keyword TAG_KEY = Keyword.intern(null, "tag");
-final static Keyword AGENT_KEY = Keyword.intern("clojure", "agent");
+final static public Var AGENT = Var.intern(CLOJURE_NS, Symbol.create("*agent*"), null);
static Keyword LINE_KEY = Keyword.intern(null, "line");
static Keyword FILE_KEY = Keyword.intern(null, "file");
//final static public Var CURRENT_MODULE = Var.intern(Symbol.create("clojure", "current-module"),