summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorRich Hickey <richhickey@gmail.com>2007-12-12 23:24:28 +0000
committerRich Hickey <richhickey@gmail.com>2007-12-12 23:24:28 +0000
commitaec380c321f7fd8d28b2cd45082a3502208a01c2 (patch)
tree7ac8c1a69d0fbbbeb956ec431995e5a60d565eca /src
parent96fcb4d72b6d5214c7da945b948a3da5188e1530 (diff)
thread pool tweaking
Diffstat (limited to 'src')
-rw-r--r--src/jvm/clojure/lang/Agent.java100
1 files changed, 91 insertions, 9 deletions
diff --git a/src/jvm/clojure/lang/Agent.java b/src/jvm/clojure/lang/Agent.java
index 68e3afa6..af494e36 100644
--- a/src/jvm/clojure/lang/Agent.java
+++ b/src/jvm/clojure/lang/Agent.java
@@ -12,8 +12,6 @@
package clojure.lang;
-import java.util.Queue;
-import java.util.LinkedList;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;
@@ -25,17 +23,99 @@ AtomicReference<IPersistentStack> q = new AtomicReference(PersistentQueue.EMPTY)
volatile ISeq errors = null;
//todo - make tuneable
+//final public static ThreadPoolExecutor executor =
+// new ThreadPoolExecutor(
+// 2 * Runtime.getRuntime().availableProcessors(),
+// 2 * Runtime.getRuntime().availableProcessors(),
+// 0L, TimeUnit.MILLISECONDS,
+// new LinkedBlockingQueue<Runnable>());
final public static ThreadPoolExecutor executor =
new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),
-//2 * Runtime.getRuntime().availableProcessors(),
-Integer.MAX_VALUE,//2 * Runtime.getRuntime().availableProcessors(),
-200L, TimeUnit.MILLISECONDS,
-new SynchronousQueue());
+ Integer.MAX_VALUE,
+ 200L, TimeUnit.MILLISECONDS,
+ new SynchronousQueue());
+
// new LinkedBlockingQueue<Runnable>());
//Executors.newFixedThreadPool(2 * Runtime.getRuntime().availableProcessors());
//final static Executor executor = Executors.newCachedThreadPool();
final static ThreadLocal<PersistentVector> nested = new ThreadLocal<PersistentVector>();
+static class ThreadPool{
+ static class Worker implements Runnable{
+ Runnable task;
+
+ public Worker(Runnable task){
+ this.task = task;
+ }
+
+ public void newTask(Runnable task){
+ synchronized(this)
+ {
+ this.task = task;
+ notify();
+ }
+ }
+
+ public void run(){
+ for(; ;)
+ {
+ task.run();
+ synchronized(this)
+ {
+ IPersistentStack workers = ThreadPool.workers.get();
+ if(workers.count() < ThreadPool.workerReserve)
+ {
+ boolean pushed = false;
+ while(!pushed)
+ {
+ IPersistentStack prior = ThreadPool.workers.get();
+ IPersistentStack next = (IPersistentStack) prior.cons(this);
+ pushed = ThreadPool.workers.compareAndSet(prior, next);
+ }
+ task = null;
+ try
+ {
+ while(task == null)
+ wait();
+ }
+ catch(InterruptedException e)
+ {
+ break;
+ }
+ }
+ else //have enough reserve workers, die
+ break;
+ }
+ }
+ }
+ }
+
+ static int workerReserve = 2 * Runtime.getRuntime().availableProcessors();
+ static AtomicReference<IPersistentStack> workers = new AtomicReference(PersistentList.EMPTY);
+
+ static void execute(Runnable r){
+ IPersistentStack ws = null;
+ while(ws == null)
+ {
+ ws = workers.get();
+ if(ws.count() > 0)
+ {
+ IPersistentStack popped = ws.pop();
+ if(!workers.compareAndSet(ws, popped))
+ ws = null;
+ }
+ }
+ if(ws.count() > 0)
+ {
+ Worker worker = (Worker) ws.peek();
+ worker.newTask(r);
+ }
+ else
+ (new Thread(new Worker(r))).start();
+
+ }
+}
+
static class Action implements Runnable{
final Agent agent;
final IFn fn;
@@ -82,8 +162,9 @@ static class Action implements Runnable{
popped = action.agent.q.compareAndSet(prior, next);
}
-// if(next.count() > 0)
-// executor.execute((Runnable) next.peek());
+// if(next.count() > 0)
+// executor.execute((Runnable) next.peek());
+// action = null;
action = (Action) next.peek();
nested.set(null);
}
@@ -155,7 +236,8 @@ void enqueue(Action action){
}
if(prior.count() == 0)
- executor.execute(action);
+// executor.execute(action);
+ ThreadPool.execute(action);
}
}