diff options
author | Rich Hickey <richhickey@gmail.com> | 2009-07-22 15:43:40 -0400 |
---|---|---|
committer | Rich Hickey <richhickey@gmail.com> | 2009-07-22 15:43:40 -0400 |
commit | 26f5aed73c9cc2959beee0dd43a0d434fce83631 (patch) | |
tree | 99b69f47fcc112e142b77a32595a8604523da76f | |
parent | eb6207db21c8a8c4be78478b0ceb72c9a3efd359 (diff) |
initial fork/join support
incorporate jsr166y.jar in build
uses fj pool in agent
added clojure.par ns with pvmap and pvreduce
-rw-r--r-- | build.xml | 5 | ||||
-rw-r--r-- | src/clj/clojure/par.clj | 60 | ||||
-rw-r--r-- | src/jvm/clojure/lang/Agent.java | 23 | ||||
-rw-r--r-- | src/jvm/clojure/lang/FJTask.java | 51 |
4 files changed, 134 insertions, 5 deletions
@@ -79,13 +79,13 @@ <target name="compile-java" depends="init" description="Compile Java sources."> <javac srcdir="${jsrc}" destdir="${build}" includeJavaRuntime="yes" - debug="true" target="1.5"/> + debug="true" target="1.5" classpath="jsr166y.jar"/> </target> <target name="compile-clojure" depends="compile-java" description="Compile Clojure sources."> <java classname="clojure.lang.Compile" - classpath="${build}:${cljsrc}"> + classpath="${build}:${cljsrc}:jsr166y.jar"> <sysproperty key="clojure.compile.path" value="${build}"/> <arg value="clojure.core"/> <arg value="clojure.main"/> @@ -98,6 +98,7 @@ <arg value="clojure.template"/> <arg value="clojure.test"/> <arg value="clojure.test.tap"/> + <arg value="clojure.par"/> </java> </target> diff --git a/src/clj/clojure/par.clj b/src/clj/clojure/par.clj new file mode 100644 index 00000000..8fad17c8 --- /dev/null +++ b/src/clj/clojure/par.clj @@ -0,0 +1,60 @@ +; Copyright (c) Rich Hickey. All rights reserved. +; The use and distribution terms for this software are covered by the +; Eclipse Public License 1.0 (http://opensource.org/licenses/eclipse-1.0.php) +; which can be found in the file epl-v10.html at the root of this distribution. +; By using this software in any fashion, you are agreeing to be bound by +; the terms of this license. +; You must not remove this notice, or any other, from this software. + +(ns clojure.par + (:import (jsr166y ForkJoinTask ForkJoinPool RecursiveTask) + (clojure.lang PersistentVector PersistentVector$Node FJTask) + (java.util Collection))) + +(def #^{:tag ForkJoinPool :private true} pool (ForkJoinPool.)) + +(defmacro fjtask [& body] + `(FJTask. (fn [] ~@body))) + +(defn- fjvtree [#^PersistentVector v combine-fn leaf-fn] + (let [tfn (fn tfn [shift #^PersistentVector$Node node] + (fjtask + (let [nodes (remove nil? (.array node))] + (if (= shift PersistentVector/SHIFT) + (let [lts (reduce #(cons (doto (fjtask (leaf-fn %2)) .fork) %1) + () nodes)] + (combine-fn (reduce (fn [rets #^FJTask t] + (cons (if (.tryUnfork t) + (.compute t) + (do (.join t) (.get t))) rets)) + () lts))) + (let [tasks (map #(tfn (- shift PersistentVector/SHIFT) %) nodes)] + (ForkJoinTask/invokeAll #^Collection tasks) + (combine-fn (map #(.get #^ForkJoinTask %) tasks))))))) + task #^ForkJoinTask (tfn (.shift v) (.root v))] + (if (ForkJoinTask/getPool) ;nested task + (.invoke task) + (.invoke pool task)))) + +(defn pvmap [f #^PersistentVector v] + (let [new-node #(PersistentVector$Node. (.. v root edit) %) + new-root (fjvtree v + #(new-node (to-array %)) + #(new-node + (amap (.array #^PersistentVector$Node %) i a + (f (aget a i))))) + new-tail (to-array (map f (.tail v)))] + (PersistentVector. (.cnt v) (.shift v) new-root new-tail))) + +(defn pvreduce [f #^PersistentVector v] + (if (<= (count v) PersistentVector/CHUNK) + (reduce f v) + (let [tr (fjvtree v + #(reduce f %) + #(let [a (.array #^PersistentVector$Node %)] + (loop [ret (aget a 0) i (int 1)] + (if (< i PersistentVector/CHUNK) + (recur (f ret (aget a i)) (inc i)) + ret))))] + (f tr (reduce f (.tail v)))))) + diff --git a/src/jvm/clojure/lang/Agent.java b/src/jvm/clojure/lang/Agent.java index 310c826b..df06acd8 100644 --- a/src/jvm/clojure/lang/Agent.java +++ b/src/jvm/clojure/lang/Agent.java @@ -12,6 +12,8 @@ package clojure.lang; +import jsr166y.ForkJoinPool; + import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicReference; import java.util.Map; @@ -22,13 +24,28 @@ volatile Object state; volatile ISeq errors = null; -final public static ExecutorService pooledExecutor = - Executors.newFixedThreadPool(2 + Runtime.getRuntime().availableProcessors()); - +//these are public for diagnostic reasons, don't mess with them otherwise +volatile public static ExecutorService pooledExecutor; final public static ExecutorService soloExecutor = Executors.newCachedThreadPool(); final static ThreadLocal<IPersistentVector> nested = new ThreadLocal<IPersistentVector>(); +static + { + boolean gotfj = true; + try + { + Class fjpool = Class.forName("jsr166y.ForkJoinPool"); + pooledExecutor = (ExecutorService) fjpool.newInstance(); + ((ForkJoinPool)pooledExecutor).setAsyncMode(true); + } + catch(Exception e) + { + gotfj = false; + } + if(!gotfj) + pooledExecutor = Executors.newFixedThreadPool(2 + Runtime.getRuntime().availableProcessors()); + } public static void shutdown(){ soloExecutor.shutdown(); diff --git a/src/jvm/clojure/lang/FJTask.java b/src/jvm/clojure/lang/FJTask.java new file mode 100644 index 00000000..c9f33789 --- /dev/null +++ b/src/jvm/clojure/lang/FJTask.java @@ -0,0 +1,51 @@ +/** + * Copyright (c) Rich Hickey. All rights reserved. + * The use and distribution terms for this software are covered by the + * Eclipse Public License 1.0 (http://opensource.org/licenses/eclipse-1.0.php) + * which can be found in the file epl-v10.html at the root of this distribution. + * By using this software in any fashion, you are agreeing to be bound by + * the terms of this license. + * You must not remove this notice, or any other, from this software. + **/ + +/* rich Jul 21, 2009 */ + +package clojure.lang; + +import jsr166y.ForkJoinTask; + +import java.util.concurrent.Callable; + +public class FJTask extends ForkJoinTask<Object>{ +final Callable f; +Object result; + +public FJTask(Callable f){ + this.f = f; +} + +public Object compute(){ + try + { + return f.call(); + } + catch(Exception e) + { + throw new RuntimeException(e); + } +} + +public final Object getRawResult(){ + return result; +} + +protected final void setRawResult(Object value){ + result = value; +} + +protected final boolean exec(){ + result = compute(); + return true; +} + +} |