diff options
author | Rich Hickey <richhickey@gmail.com> | 2007-10-28 18:48:06 +0000 |
---|---|---|
committer | Rich Hickey <richhickey@gmail.com> | 2007-10-28 18:48:06 +0000 |
commit | 63eb1b3181e42b7dd827a485a7a7dd10f308574f (patch) | |
tree | 0aec9c0fe3decee947dd536468c99cf66b6c6776 /src | |
parent | 7b1999c497f9e39c01a2b9f5d628f22df701c0e3 (diff) |
added pmap, made doseq return seq
Diffstat (limited to 'src')
-rw-r--r-- | src/boot.clj | 52 |
1 files changed, 46 insertions, 6 deletions
diff --git a/src/boot.clj b/src/boot.clj index 587d7098..3b29b51f 100644 --- a/src/boot.clj +++ b/src/boot.clj @@ -498,11 +498,13 @@ (def *imports* (apply merge imports-maps))) (defmacro doseq [item list & body] - `(loop [list# (seq ~list)] - (when list# - (let [~item (first list#)] - ~@body) - (recur (rest list#))))) + `(let [ret# (seq ~list)] + (loop [list# ret#] + (when list# + (let [~item (first list#)] + ~@body) + (recur (rest list#)))) + ret#)) (defmacro dotimes [i n & body] `(loop [~i 0 n# ~n] @@ -621,6 +623,44 @@ (prn (strcat "Elapsed time: " (/ (- (. System (nanoTime)) start#) 1000000.0) " msecs")) ret#)) +(import '(java.util.concurrent Executors LinkedBlockingQueue)) + +(defn pmap + ([f coll] + (let [nthreads (.. Runtime (getRuntime) (availableProcessors)) + exec (. Executors (newFixedThreadPool nthreads)) + todo (ref (seq coll)) + out (ref 0) + q (new LinkedBlockingQueue) + produce (fn [] + (let [job (sync nil + (when @todo + (let [item (first @todo)] + (set todo (rest @todo)) + (commute out inc) + (list item))))] + (when job + (. q (put (f (first job)))) + (recur)))) + tasks (doseq dnu (map (fn [task] + (. exec (submit task))) + (replicate nthreads produce))) + consume (fn [] + (if (sync nil (and (or @todo (pos? @out)) (commute out dec))) + (fnseq (. q (take)) thisfn) + (do + (. exec (shutdown)) + (doseq x tasks) + nil)))] + (consume))) + ([f coll & colls] + (thisfn (fn [items] (apply f items)) + ((fn [collseq] + (when (every seq collseq) + (let [encl-fn thisfn] + (lazy-cons (map first collseq) (encl-fn (map rest collseq)))))) + (cons coll colls))))) + (def *exports* '(clojure load-file eql-ref? @@ -642,7 +682,7 @@ ref deref deref! commute set sync reduce reverse comp appl every not-every any not-any - map mapcat filter take take-while drop drop-while + map pmap mapcat filter take take-while drop drop-while zipmap cycle split-at split-with repeat replicate iterate range doseq dotimes into |