diff options
author | Chouser <chouser@n01se.net> | 2009-07-31 00:15:05 -0400 |
---|---|---|
committer | Chouser <chouser@n01se.net> | 2009-07-31 00:15:05 -0400 |
commit | 2d2c2003325cb5e38a7a984d5d349c12d8dafa4d (patch) | |
tree | 4a55a0a2282a27a3c24522b3c994648970b44e82 | |
parent | 18c485c58f02e47b42e68f9e0d6ffec249b019ba (diff) |
seq-utils: Add fill-queue
-rw-r--r-- | src/clojure/contrib/seq_utils.clj | 41 |
1 files changed, 40 insertions, 1 deletions
diff --git a/src/clojure/contrib/seq_utils.clj b/src/clojure/contrib/seq_utils.clj index fae5df10..46069fa6 100644 --- a/src/clojure/contrib/seq_utils.clj +++ b/src/clojure/contrib/seq_utils.clj @@ -24,7 +24,9 @@ (ns #^{:author "Stuart Sierra (and others)", :doc "Sequence utilities for Clojure"} - clojure.contrib.seq-utils) + clojure.contrib.seq-utils + (:import (java.util.concurrent LinkedBlockingQueue TimeUnit) + (java.lang.ref WeakReference))) ;; 'flatten' written by Rich Hickey, @@ -171,3 +173,40 @@ and return nil if no match is found." [pred coll] (first (filter pred coll))) + +; based on work related to seque. blame Chouser +(defn fill-queue + "filler-func will be called in another thread with a single arg + 'fill'. filler-func may call fill repeatedly with one arg each + time which will be pushed onto a queue, blocking if needed until + this is possible. fill-queue will return a lazy seq of the values + filler-func has pushed onto the queue, blocking if needed until each + next element becomes available. filler-func's return value is ignored." + ([filler-func & optseq] + (let [opts (apply array-map optseq) + apoll (:alive-poll opts 1) + q (LinkedBlockingQueue. (:queue-size opts 1)) + NIL (Object.) ;nil sentinel since LBQ doesn't support nils + weak-target (Object.) + alive? (WeakReference. weak-target) + fill (fn fill [x] + (if (.get alive?) + (if (.offer q (if (nil? x) NIL x) apoll TimeUnit/SECONDS) + x + (recur x)) + (throw (Exception. "abandoned")))) + f (future + (try + (filler-func fill) + (finally + (.put q q))) ;q itself is eos sentinel + nil)] ; set future's value to nil + ((fn drain [] + weak-target ; force closing over this object + (lazy-seq + (let [x (.take q)] + (if (identical? x q) + @f ;will be nil, touch just to propagate errors + (cons (if (identical? x NIL) nil x) + (drain)))))))))) + |