diff options
author | Rich Hickey <richhickey@gmail.com> | 2007-12-12 23:24:28 +0000 |
---|---|---|
committer | Rich Hickey <richhickey@gmail.com> | 2007-12-12 23:24:28 +0000 |
commit | aec380c321f7fd8d28b2cd45082a3502208a01c2 (patch) | |
tree | 7ac8c1a69d0fbbbeb956ec431995e5a60d565eca /src | |
parent | 96fcb4d72b6d5214c7da945b948a3da5188e1530 (diff) |
thread pool tweaking
Diffstat (limited to 'src')
-rw-r--r-- | src/jvm/clojure/lang/Agent.java | 100 |
1 files changed, 91 insertions, 9 deletions
diff --git a/src/jvm/clojure/lang/Agent.java b/src/jvm/clojure/lang/Agent.java index 68e3afa6..af494e36 100644 --- a/src/jvm/clojure/lang/Agent.java +++ b/src/jvm/clojure/lang/Agent.java @@ -12,8 +12,6 @@ package clojure.lang; -import java.util.Queue; -import java.util.LinkedList; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicReference; @@ -25,17 +23,99 @@ AtomicReference<IPersistentStack> q = new AtomicReference(PersistentQueue.EMPTY) volatile ISeq errors = null; //todo - make tuneable +//final public static ThreadPoolExecutor executor = +// new ThreadPoolExecutor( +// 2 * Runtime.getRuntime().availableProcessors(), +// 2 * Runtime.getRuntime().availableProcessors(), +// 0L, TimeUnit.MILLISECONDS, +// new LinkedBlockingQueue<Runnable>()); final public static ThreadPoolExecutor executor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), -//2 * Runtime.getRuntime().availableProcessors(), -Integer.MAX_VALUE,//2 * Runtime.getRuntime().availableProcessors(), -200L, TimeUnit.MILLISECONDS, -new SynchronousQueue()); + Integer.MAX_VALUE, + 200L, TimeUnit.MILLISECONDS, + new SynchronousQueue()); + // new LinkedBlockingQueue<Runnable>()); //Executors.newFixedThreadPool(2 * Runtime.getRuntime().availableProcessors()); //final static Executor executor = Executors.newCachedThreadPool(); final static ThreadLocal<PersistentVector> nested = new ThreadLocal<PersistentVector>(); +static class ThreadPool{ + static class Worker implements Runnable{ + Runnable task; + + public Worker(Runnable task){ + this.task = task; + } + + public void newTask(Runnable task){ + synchronized(this) + { + this.task = task; + notify(); + } + } + + public void run(){ + for(; ;) + { + task.run(); + synchronized(this) + { + IPersistentStack workers = ThreadPool.workers.get(); + if(workers.count() < ThreadPool.workerReserve) + { + boolean pushed = false; + while(!pushed) + { + IPersistentStack prior = ThreadPool.workers.get(); + IPersistentStack next = (IPersistentStack) prior.cons(this); + pushed = ThreadPool.workers.compareAndSet(prior, next); + } + task = null; + try + { + while(task == null) + wait(); + } + catch(InterruptedException e) + { + break; + } + } + else //have enough reserve workers, die + break; + } + } + } + } + + static int workerReserve = 2 * Runtime.getRuntime().availableProcessors(); + static AtomicReference<IPersistentStack> workers = new AtomicReference(PersistentList.EMPTY); + + static void execute(Runnable r){ + IPersistentStack ws = null; + while(ws == null) + { + ws = workers.get(); + if(ws.count() > 0) + { + IPersistentStack popped = ws.pop(); + if(!workers.compareAndSet(ws, popped)) + ws = null; + } + } + if(ws.count() > 0) + { + Worker worker = (Worker) ws.peek(); + worker.newTask(r); + } + else + (new Thread(new Worker(r))).start(); + + } +} + static class Action implements Runnable{ final Agent agent; final IFn fn; @@ -82,8 +162,9 @@ static class Action implements Runnable{ popped = action.agent.q.compareAndSet(prior, next); } -// if(next.count() > 0) -// executor.execute((Runnable) next.peek()); +// if(next.count() > 0) +// executor.execute((Runnable) next.peek()); +// action = null; action = (Action) next.peek(); nested.set(null); } @@ -155,7 +236,8 @@ void enqueue(Action action){ } if(prior.count() == 0) - executor.execute(action); +// executor.execute(action); + ThreadPool.execute(action); } } |