diff options
author | Rich Hickey <richhickey@gmail.com> | 2009-02-09 16:08:22 +0000 |
---|---|---|
committer | Rich Hickey <richhickey@gmail.com> | 2009-02-09 16:08:22 +0000 |
commit | f1daffdbae886dab563f5653ffd78db40403115c (patch) | |
tree | 4e18cc244e2b569027a2f97fad1a716e0dcc5401 /src/clj | |
parent | 5577a47a390782d7ab911c2e3c4c8be1b0341aa8 (diff) |
added #_ ignore form reader macro
added IDeref above IRef, made delays and futures implement IDeref
renamed/moved IRef.get() -> IDeref.deref()
deref/@ maps to IDeref/deref
added future-calls and future
implement pmap on future
implement pcalls on pmap
Diffstat (limited to 'src/clj')
-rw-r--r-- | src/clj/clojure/core.clj | 98 |
1 files changed, 58 insertions, 40 deletions
diff --git a/src/clj/clojure/core.clj b/src/clj/clojure/core.clj index 04b11d25..37a063aa 100644 --- a/src/clj/clojure/core.clj +++ b/src/clj/clojure/core.clj @@ -1214,11 +1214,13 @@ ([x & options] (setup-reference (ref x) options))) (defn deref - "Also reader macro: @ref/@agent/@var/@atom Within a transaction, + "Also reader macro: @ref/@agent/@var/@atom/@delay/@future. Within a transaction, returns the in-transaction-value of ref, else returns the most-recently-committed value of ref. When applied to a var, agent - or atom, returns its current state." - [#^clojure.lang.IRef ref] (. ref (get))) + or atom, returns its current state. When applied to a delay, forces + it if not already forced. When applied to a future, will block if + computation not complete" + [#^clojure.lang.IDeref ref] (.deref ref)) (defn atom "Creates and returns an Atom with an initial value of x and zero or @@ -3704,43 +3706,6 @@ "Returns true if coll implements Reversible" [coll] (instance? clojure.lang.Reversible coll)) -(defn pcalls - "Executes the no-arg fns in parallel, returning a lazy sequence of - their values" - [& fns] - (let [futs (.invokeAll clojure.lang.Agent/pooledExecutor fns)] - (map #(.get %) futs))) - -(defmacro pvalues - "Returns a lazy sequence of the values of the exprs, which are - evaluated in parallel" - [& exprs] - `(pcalls ~@(map #(list `fn [] %) exprs))) - -(defn pmap - "Like map, except f is applied in parallel. Semi-lazy in that the - parallel computation stays ahead of the consumption, but doesn't - realize the entire result unless required. Only useful for - computationally intensive functions where the time of f dominates - the coordination overhead." - ([f coll] - (let [n (inc (.. Runtime getRuntime availableProcessors)) - agents (doall (map #(send (agent %) f) (take n coll))) - wget (fn [a] (await1 a) @a) - step (fn step [[x & xs :as s] - [a & as :as acycle]] - (if s - (let [v (wget a)] - (send a (fn [_] (f x))) - (lazy-cons v (step xs as))) - (map wget (take (count agents) acycle))))] - (step (drop n coll) (cycle agents)))) - ([f coll & colls] - (let [step (fn step [cs] - (when (every? seq cs) - (lazy-cons (map first cs) (step (map rest cs)))))] - (pmap #(apply f %) (step (cons coll colls)))))) - (def #^{:doc "bound in a repl thread to the most recent value printed"} *1) @@ -3933,3 +3898,56 @@ (load "genclass") +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; futures (needs proxy);;;;;;;;;;;;;;;;;; +(defn future-call + "Takes a function of no args and yields a future object that will + invoke the function in another thread, and will cache the result and + return it on all subsequent calls to deref/@. If the computation has + not yet finished, calls to deref/@ will block." + [#^Callable f] + (let [fut (.submit clojure.lang.Agent/pooledExecutor f)] + (proxy [clojure.lang.IDeref java.util.concurrent.Future] [] + (deref [] (.get fut)) + (get [] (.get fut)) + (get [timeout unit] (.get fut timeout unit)) + (isCancelled [] (.isCancelled fut)) + (isDone [] (.isDone fut)) + (cancel [interrupt?] (.cancel fut interrupt?))))) + +(defmacro future + "Takes a body of expressions and yields a future object that will + invoke the body in another thread, and will cache the result and + return it on all subsequent calls to deref/@. If the computation has + not yet finished, calls to deref/@ will block." + [& body] `(future-call (fn [] ~@body))) + +(defn pmap + "Like map, except f is applied in parallel. Semi-lazy in that the + parallel computation stays ahead of the consumption, but doesn't + realize the entire result unless required. Only useful for + computationally intensive functions where the time of f dominates + the coordination overhead." + ([f coll] + (let [n (+ 2 (.. Runtime getRuntime availableProcessors)) + rets (map #(future (f %)) coll) + step (fn step [[x & xs :as vs] fs] + (if fs + (lazy-cons (deref x) (step xs (rest fs))) + (map deref vs)))] + (step rets (drop n rets)))) + ([f coll & colls] + (let [step (fn step [cs] + (when (every? seq cs) + (lazy-cons (map first cs) (step (map rest cs)))))] + (pmap #(apply f %) (step (cons coll colls)))))) + +(defn pcalls + "Executes the no-arg fns in parallel, returning a lazy sequence of + their values" + [& fns] (pmap #(%) fns)) + +(defmacro pvalues + "Returns a lazy sequence of the values of the exprs, which are + evaluated in parallel" + [& exprs] + `(pcalls ~@(map #(list `fn [] %) exprs))) |