summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorRich Hickey <richhickey@gmail.com>2008-09-15 17:37:29 +0000
committerRich Hickey <richhickey@gmail.com>2008-09-15 17:37:29 +0000
commitc72a12488f0a65bc13f9b31fee73dc43ea33a70d (patch)
tree4be8feaa736c647e3f03df7c26121ff972ae520c /src
parent49c9f8960ff07bf53a3a7eedbc52f7754b9cab3e (diff)
added lazy parallel pmap
Diffstat (limited to 'src')
-rw-r--r--src/clj/clojure/boot.clj63
1 files changed, 24 insertions, 39 deletions
diff --git a/src/clj/clojure/boot.clj b/src/clj/clojure/boot.clj
index 09648451..921a22df 100644
--- a/src/clj/clojure/boot.clj
+++ b/src/clj/clojure/boot.clj
@@ -1779,45 +1779,6 @@
(recur (inc i) (rest xs))))
ret))
-(import '(java.util.concurrent Executors LinkedBlockingQueue))
-
-(defn pmap
- ([f coll]
- (let [nthreads (.. Runtime (getRuntime) (availableProcessors))
- exec (. Executors (newFixedThreadPool nthreads))
- todo (ref (seq coll))
- out (ref 0)
- q (new LinkedBlockingQueue)
- produce (fn []
- (let [job (sync nil
- (when @todo
- (let [item (first @todo)]
- (alter todo rest)
- (commute out inc)
- (list item))))]
- (when job
- (. q (put (f (first job))))
- (recur))))
- tasks (doseq dnu (map (fn [task]
- (. exec (submit #^java.util.concurrent.Callable task)))
- (replicate nthreads produce)))
- consume (fn thisfn []
- (if (sync nil (and (or @todo (pos? @out))
- (commute out dec)))
- (fnseq (. q (take)) thisfn)
- (do
- (. exec (shutdown))
- (doseq x tasks)
- nil)))]
- (consume)))
- ([f coll & colls]
- (pmap (fn [items] (apply f items))
- (let [encl-fn (fn thisfn [collseq]
- (when (every? seq collseq)
- (lazy-cons (map first collseq)
- (thisfn (map rest collseq)))))]
- (encl-fn (cons coll colls))))))
-
(defn macroexpand-1
"If form represents a macro form, returns its expansion,
else returns form."
@@ -3334,3 +3295,27 @@
(defn odd?
"Returns true if n is odd, throws an exception if n is not an integer"
[n] (not (even? n)))
+
+(defn pmap
+ "Like map, except f is applied in parallel. Semi-lazy in that the
+ parallel computation stays ahead of the consumption, but doesn't
+ realize the entire result unless required. Only useful for
+ computationally intensive functions where the time of f dominates
+ the coordination overhead."
+ ([f coll]
+ (let [n (inc (.. Runtime getRuntime availableProcessors))
+ agents (doall (map #(agent (f %)) (take n coll)))
+ wget (fn [a] (await1 a) @a)
+ step (fn step [[x & xs :as s]
+ [a & as :as acycle]]
+ (if s
+ (let [v (wget a)]
+ (send a (fn [_] (f x)))
+ (lazy-cons v (step xs as)))
+ (map wget (take (count agents) acycle))))]
+ (step (drop n coll) (cycle agents))))
+ ([f coll & colls]
+ (let [step (fn step [cs]
+ (when (every? seq cs)
+ (lazy-cons (map first cs) (step (map rest cs)))))]
+ (pmap #(apply f %) (step (cons coll colls))))))