aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChouser <chouser@n01se.net>2009-07-31 00:15:05 -0400
committerChouser <chouser@n01se.net>2009-07-31 00:15:05 -0400
commit2d2c2003325cb5e38a7a984d5d349c12d8dafa4d (patch)
tree4a55a0a2282a27a3c24522b3c994648970b44e82
parent18c485c58f02e47b42e68f9e0d6ffec249b019ba (diff)
seq-utils: Add fill-queue
-rw-r--r--src/clojure/contrib/seq_utils.clj41
1 files changed, 40 insertions, 1 deletions
diff --git a/src/clojure/contrib/seq_utils.clj b/src/clojure/contrib/seq_utils.clj
index fae5df10..46069fa6 100644
--- a/src/clojure/contrib/seq_utils.clj
+++ b/src/clojure/contrib/seq_utils.clj
@@ -24,7 +24,9 @@
(ns
#^{:author "Stuart Sierra (and others)",
:doc "Sequence utilities for Clojure"}
- clojure.contrib.seq-utils)
+ clojure.contrib.seq-utils
+ (:import (java.util.concurrent LinkedBlockingQueue TimeUnit)
+ (java.lang.ref WeakReference)))
;; 'flatten' written by Rich Hickey,
@@ -171,3 +173,40 @@
and return nil if no match is found."
[pred coll]
(first (filter pred coll)))
+
+; based on work related to seque. blame Chouser
+(defn fill-queue
+ "filler-func will be called in another thread with a single arg
+ 'fill'. filler-func may call fill repeatedly with one arg each
+ time which will be pushed onto a queue, blocking if needed until
+ this is possible. fill-queue will return a lazy seq of the values
+ filler-func has pushed onto the queue, blocking if needed until each
+ next element becomes available. filler-func's return value is ignored."
+ ([filler-func & optseq]
+ (let [opts (apply array-map optseq)
+ apoll (:alive-poll opts 1)
+ q (LinkedBlockingQueue. (:queue-size opts 1))
+ NIL (Object.) ;nil sentinel since LBQ doesn't support nils
+ weak-target (Object.)
+ alive? (WeakReference. weak-target)
+ fill (fn fill [x]
+ (if (.get alive?)
+ (if (.offer q (if (nil? x) NIL x) apoll TimeUnit/SECONDS)
+ x
+ (recur x))
+ (throw (Exception. "abandoned"))))
+ f (future
+ (try
+ (filler-func fill)
+ (finally
+ (.put q q))) ;q itself is eos sentinel
+ nil)] ; set future's value to nil
+ ((fn drain []
+ weak-target ; force closing over this object
+ (lazy-seq
+ (let [x (.take q)]
+ (if (identical? x q)
+ @f ;will be nil, touch just to propagate errors
+ (cons (if (identical? x NIL) nil x)
+ (drain))))))))))
+