summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRich Hickey <richhickey@gmail.com>2009-07-22 15:43:40 -0400
committerRich Hickey <richhickey@gmail.com>2009-07-22 15:43:40 -0400
commit26f5aed73c9cc2959beee0dd43a0d434fce83631 (patch)
tree99b69f47fcc112e142b77a32595a8604523da76f
parenteb6207db21c8a8c4be78478b0ceb72c9a3efd359 (diff)
initial fork/join support
incorporate jsr166y.jar in build uses fj pool in agent added clojure.par ns with pvmap and pvreduce
-rw-r--r--build.xml5
-rw-r--r--src/clj/clojure/par.clj60
-rw-r--r--src/jvm/clojure/lang/Agent.java23
-rw-r--r--src/jvm/clojure/lang/FJTask.java51
4 files changed, 134 insertions, 5 deletions
diff --git a/build.xml b/build.xml
index b37adefc..ebbffe03 100644
--- a/build.xml
+++ b/build.xml
@@ -79,13 +79,13 @@
<target name="compile-java" depends="init"
description="Compile Java sources.">
<javac srcdir="${jsrc}" destdir="${build}" includeJavaRuntime="yes"
- debug="true" target="1.5"/>
+ debug="true" target="1.5" classpath="jsr166y.jar"/>
</target>
<target name="compile-clojure" depends="compile-java"
description="Compile Clojure sources.">
<java classname="clojure.lang.Compile"
- classpath="${build}:${cljsrc}">
+ classpath="${build}:${cljsrc}:jsr166y.jar">
<sysproperty key="clojure.compile.path" value="${build}"/>
<arg value="clojure.core"/>
<arg value="clojure.main"/>
@@ -98,6 +98,7 @@
<arg value="clojure.template"/>
<arg value="clojure.test"/>
<arg value="clojure.test.tap"/>
+ <arg value="clojure.par"/>
</java>
</target>
diff --git a/src/clj/clojure/par.clj b/src/clj/clojure/par.clj
new file mode 100644
index 00000000..8fad17c8
--- /dev/null
+++ b/src/clj/clojure/par.clj
@@ -0,0 +1,60 @@
+; Copyright (c) Rich Hickey. All rights reserved.
+; The use and distribution terms for this software are covered by the
+; Eclipse Public License 1.0 (http://opensource.org/licenses/eclipse-1.0.php)
+; which can be found in the file epl-v10.html at the root of this distribution.
+; By using this software in any fashion, you are agreeing to be bound by
+; the terms of this license.
+; You must not remove this notice, or any other, from this software.
+
+(ns clojure.par
+ (:import (jsr166y ForkJoinTask ForkJoinPool RecursiveTask)
+ (clojure.lang PersistentVector PersistentVector$Node FJTask)
+ (java.util Collection)))
+
+(def #^{:tag ForkJoinPool :private true} pool (ForkJoinPool.))
+
+(defmacro fjtask [& body]
+ `(FJTask. (fn [] ~@body)))
+
+(defn- fjvtree [#^PersistentVector v combine-fn leaf-fn]
+ (let [tfn (fn tfn [shift #^PersistentVector$Node node]
+ (fjtask
+ (let [nodes (remove nil? (.array node))]
+ (if (= shift PersistentVector/SHIFT)
+ (let [lts (reduce #(cons (doto (fjtask (leaf-fn %2)) .fork) %1)
+ () nodes)]
+ (combine-fn (reduce (fn [rets #^FJTask t]
+ (cons (if (.tryUnfork t)
+ (.compute t)
+ (do (.join t) (.get t))) rets))
+ () lts)))
+ (let [tasks (map #(tfn (- shift PersistentVector/SHIFT) %) nodes)]
+ (ForkJoinTask/invokeAll #^Collection tasks)
+ (combine-fn (map #(.get #^ForkJoinTask %) tasks)))))))
+ task #^ForkJoinTask (tfn (.shift v) (.root v))]
+ (if (ForkJoinTask/getPool) ;nested task
+ (.invoke task)
+ (.invoke pool task))))
+
+(defn pvmap [f #^PersistentVector v]
+ (let [new-node #(PersistentVector$Node. (.. v root edit) %)
+ new-root (fjvtree v
+ #(new-node (to-array %))
+ #(new-node
+ (amap (.array #^PersistentVector$Node %) i a
+ (f (aget a i)))))
+ new-tail (to-array (map f (.tail v)))]
+ (PersistentVector. (.cnt v) (.shift v) new-root new-tail)))
+
+(defn pvreduce [f #^PersistentVector v]
+ (if (<= (count v) PersistentVector/CHUNK)
+ (reduce f v)
+ (let [tr (fjvtree v
+ #(reduce f %)
+ #(let [a (.array #^PersistentVector$Node %)]
+ (loop [ret (aget a 0) i (int 1)]
+ (if (< i PersistentVector/CHUNK)
+ (recur (f ret (aget a i)) (inc i))
+ ret))))]
+ (f tr (reduce f (.tail v))))))
+
diff --git a/src/jvm/clojure/lang/Agent.java b/src/jvm/clojure/lang/Agent.java
index 310c826b..df06acd8 100644
--- a/src/jvm/clojure/lang/Agent.java
+++ b/src/jvm/clojure/lang/Agent.java
@@ -12,6 +12,8 @@
package clojure.lang;
+import jsr166y.ForkJoinPool;
+
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;
import java.util.Map;
@@ -22,13 +24,28 @@ volatile Object state;
volatile ISeq errors = null;
-final public static ExecutorService pooledExecutor =
- Executors.newFixedThreadPool(2 + Runtime.getRuntime().availableProcessors());
-
+//these are public for diagnostic reasons, don't mess with them otherwise
+volatile public static ExecutorService pooledExecutor;
final public static ExecutorService soloExecutor = Executors.newCachedThreadPool();
final static ThreadLocal<IPersistentVector> nested = new ThreadLocal<IPersistentVector>();
+static
+ {
+ boolean gotfj = true;
+ try
+ {
+ Class fjpool = Class.forName("jsr166y.ForkJoinPool");
+ pooledExecutor = (ExecutorService) fjpool.newInstance();
+ ((ForkJoinPool)pooledExecutor).setAsyncMode(true);
+ }
+ catch(Exception e)
+ {
+ gotfj = false;
+ }
+ if(!gotfj)
+ pooledExecutor = Executors.newFixedThreadPool(2 + Runtime.getRuntime().availableProcessors());
+ }
public static void shutdown(){
soloExecutor.shutdown();
diff --git a/src/jvm/clojure/lang/FJTask.java b/src/jvm/clojure/lang/FJTask.java
new file mode 100644
index 00000000..c9f33789
--- /dev/null
+++ b/src/jvm/clojure/lang/FJTask.java
@@ -0,0 +1,51 @@
+/**
+ * Copyright (c) Rich Hickey. All rights reserved.
+ * The use and distribution terms for this software are covered by the
+ * Eclipse Public License 1.0 (http://opensource.org/licenses/eclipse-1.0.php)
+ * which can be found in the file epl-v10.html at the root of this distribution.
+ * By using this software in any fashion, you are agreeing to be bound by
+ * the terms of this license.
+ * You must not remove this notice, or any other, from this software.
+ **/
+
+/* rich Jul 21, 2009 */
+
+package clojure.lang;
+
+import jsr166y.ForkJoinTask;
+
+import java.util.concurrent.Callable;
+
+public class FJTask extends ForkJoinTask<Object>{
+final Callable f;
+Object result;
+
+public FJTask(Callable f){
+ this.f = f;
+}
+
+public Object compute(){
+ try
+ {
+ return f.call();
+ }
+ catch(Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+}
+
+public final Object getRawResult(){
+ return result;
+}
+
+protected final void setRawResult(Object value){
+ result = value;
+}
+
+protected final boolean exec(){
+ result = compute();
+ return true;
+}
+
+}