summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorRich Hickey <richhickey@gmail.com>2007-10-28 18:48:06 +0000
committerRich Hickey <richhickey@gmail.com>2007-10-28 18:48:06 +0000
commit63eb1b3181e42b7dd827a485a7a7dd10f308574f (patch)
tree0aec9c0fe3decee947dd536468c99cf66b6c6776 /src
parent7b1999c497f9e39c01a2b9f5d628f22df701c0e3 (diff)
added pmap, made doseq return seq
Diffstat (limited to 'src')
-rw-r--r--src/boot.clj52
1 files changed, 46 insertions, 6 deletions
diff --git a/src/boot.clj b/src/boot.clj
index 587d7098..3b29b51f 100644
--- a/src/boot.clj
+++ b/src/boot.clj
@@ -498,11 +498,13 @@
(def *imports* (apply merge imports-maps)))
(defmacro doseq [item list & body]
- `(loop [list# (seq ~list)]
- (when list#
- (let [~item (first list#)]
- ~@body)
- (recur (rest list#)))))
+ `(let [ret# (seq ~list)]
+ (loop [list# ret#]
+ (when list#
+ (let [~item (first list#)]
+ ~@body)
+ (recur (rest list#))))
+ ret#))
(defmacro dotimes [i n & body]
`(loop [~i 0 n# ~n]
@@ -621,6 +623,44 @@
(prn (strcat "Elapsed time: " (/ (- (. System (nanoTime)) start#) 1000000.0) " msecs"))
ret#))
+(import '(java.util.concurrent Executors LinkedBlockingQueue))
+
+(defn pmap
+ ([f coll]
+ (let [nthreads (.. Runtime (getRuntime) (availableProcessors))
+ exec (. Executors (newFixedThreadPool nthreads))
+ todo (ref (seq coll))
+ out (ref 0)
+ q (new LinkedBlockingQueue)
+ produce (fn []
+ (let [job (sync nil
+ (when @todo
+ (let [item (first @todo)]
+ (set todo (rest @todo))
+ (commute out inc)
+ (list item))))]
+ (when job
+ (. q (put (f (first job))))
+ (recur))))
+ tasks (doseq dnu (map (fn [task]
+ (. exec (submit task)))
+ (replicate nthreads produce)))
+ consume (fn []
+ (if (sync nil (and (or @todo (pos? @out)) (commute out dec)))
+ (fnseq (. q (take)) thisfn)
+ (do
+ (. exec (shutdown))
+ (doseq x tasks)
+ nil)))]
+ (consume)))
+ ([f coll & colls]
+ (thisfn (fn [items] (apply f items))
+ ((fn [collseq]
+ (when (every seq collseq)
+ (let [encl-fn thisfn]
+ (lazy-cons (map first collseq) (encl-fn (map rest collseq))))))
+ (cons coll colls)))))
+
(def *exports*
'(clojure
load-file eql-ref?
@@ -642,7 +682,7 @@
ref deref deref! commute set sync
reduce reverse comp appl
every not-every any not-any
- map mapcat filter take take-while drop drop-while
+ map pmap mapcat filter take take-while drop drop-while
zipmap
cycle split-at split-with repeat replicate iterate range
doseq dotimes into