diff options
author | Rich Hickey <richhickey@gmail.com> | 2008-09-15 17:37:29 +0000 |
---|---|---|
committer | Rich Hickey <richhickey@gmail.com> | 2008-09-15 17:37:29 +0000 |
commit | c72a12488f0a65bc13f9b31fee73dc43ea33a70d (patch) | |
tree | 4be8feaa736c647e3f03df7c26121ff972ae520c /src | |
parent | 49c9f8960ff07bf53a3a7eedbc52f7754b9cab3e (diff) |
added lazy parallel pmap
Diffstat (limited to 'src')
-rw-r--r-- | src/clj/clojure/boot.clj | 63 |
1 files changed, 24 insertions, 39 deletions
diff --git a/src/clj/clojure/boot.clj b/src/clj/clojure/boot.clj index 09648451..921a22df 100644 --- a/src/clj/clojure/boot.clj +++ b/src/clj/clojure/boot.clj @@ -1779,45 +1779,6 @@ (recur (inc i) (rest xs)))) ret)) -(import '(java.util.concurrent Executors LinkedBlockingQueue)) - -(defn pmap - ([f coll] - (let [nthreads (.. Runtime (getRuntime) (availableProcessors)) - exec (. Executors (newFixedThreadPool nthreads)) - todo (ref (seq coll)) - out (ref 0) - q (new LinkedBlockingQueue) - produce (fn [] - (let [job (sync nil - (when @todo - (let [item (first @todo)] - (alter todo rest) - (commute out inc) - (list item))))] - (when job - (. q (put (f (first job)))) - (recur)))) - tasks (doseq dnu (map (fn [task] - (. exec (submit #^java.util.concurrent.Callable task))) - (replicate nthreads produce))) - consume (fn thisfn [] - (if (sync nil (and (or @todo (pos? @out)) - (commute out dec))) - (fnseq (. q (take)) thisfn) - (do - (. exec (shutdown)) - (doseq x tasks) - nil)))] - (consume))) - ([f coll & colls] - (pmap (fn [items] (apply f items)) - (let [encl-fn (fn thisfn [collseq] - (when (every? seq collseq) - (lazy-cons (map first collseq) - (thisfn (map rest collseq)))))] - (encl-fn (cons coll colls)))))) - (defn macroexpand-1 "If form represents a macro form, returns its expansion, else returns form." @@ -3334,3 +3295,27 @@ (defn odd? "Returns true if n is odd, throws an exception if n is not an integer" [n] (not (even? n))) + +(defn pmap + "Like map, except f is applied in parallel. Semi-lazy in that the + parallel computation stays ahead of the consumption, but doesn't + realize the entire result unless required. Only useful for + computationally intensive functions where the time of f dominates + the coordination overhead." + ([f coll] + (let [n (inc (.. Runtime getRuntime availableProcessors)) + agents (doall (map #(agent (f %)) (take n coll))) + wget (fn [a] (await1 a) @a) + step (fn step [[x & xs :as s] + [a & as :as acycle]] + (if s + (let [v (wget a)] + (send a (fn [_] (f x))) + (lazy-cons v (step xs as))) + (map wget (take (count agents) acycle))))] + (step (drop n coll) (cycle agents)))) + ([f coll & colls] + (let [step (fn step [cs] + (when (every? seq cs) + (lazy-cons (map first cs) (step (map rest cs)))))] + (pmap #(apply f %) (step (cons coll colls)))))) |