summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
author Rich Hickey <richhickey@gmail.com> 2008-05-19 01:09:42 +0000
committer Rich Hickey <richhickey@gmail.com> 2008-05-19 01:09:42 +0000
commit af26812e74ee097197b4c4544bdda56f57081723 (patch)
tree 7f207d78311982e699453ae871eae0ca9e709158 /src
parent 00f9c7b50c1b17af85fa58d1d8ac4f4027e88177 (diff)
refined parallel api, added partition
Diffstat (limited to 'src')
-rw-r--r-- src/boot.clj 18
-rw-r--r-- src/parallel.clj 287
2 files changed, 165 insertions, 140 deletions
diff --git a/src/boot.clj b/src/boot.clj
index bc8f916f..10e0ce84 100644
--- a/src/boot.clj
+++ b/src/boot.clj
@@ -2399,4 +2399,20 @@ not-every? (comp not every?))
(defn hash
"Returns the hash code of its argument"
- [x] (. clojure.lang.Util (hash x))) \ No newline at end of file
+ [x] (. clojure.lang.Util (hash x)))
+
+(defn interpose
+ "Returns a lazy seq of the elements of coll separated by sep"
+ [sep coll] (drop 1 (interleave (repeat sep) coll))) ; drop 1 removes the sep emitted ahead of the first element
+
+(defn partition
+  "Returns a lazy sequence of lists of n items each, at offsets step
+  apart. If step is not supplied, defaults to n, i.e. the partitions
+  do not overlap. Stops at the first group with fewer than n items."
+  ([n coll] (partition n n coll))
+  ([n step coll]
+     (when (seq coll)
+       (let [group (take n coll)] ; size-check each group once, not via nested take-while layers
+         (when (= n (count group))
+           (lazy-cons group (partition n step (drop step coll))))))))
+
diff --git a/src/parallel.clj b/src/parallel.clj
index 5c578cef..a60f7bbb 100644
--- a/src/parallel.clj
+++ b/src/parallel.clj
@@ -18,19 +18,21 @@ You'll need jsr166y.jar in your classpath in order to use this
library. The basic idea is that Clojure collections, and most
efficiently vectors, can be turned into parallel arrays for use by
this library with the function par, although most of the functions
-take collections and will call par if needed. Parallel arrays support
-the attachment of bounds, filters and mapping functions prior to
-realization, which happens as the result of any of several operations
-on the array (pall/psort/pfilter-nils/pfilter-dupes/pvec). Rather than
-perform composite operations in steps, as would normally be done with
-sequences, maps and filters are instead attached and thus composed
-with the with-* functions. Note that there is an order sensitivity to
-the attachments - bounds precede filters precede mappings. All
-operations then happen in parallel, using multiple threads and a
-sophisticated work-stealing system supported by fork-join, either when
-the array is realized, or to perform aggregate operations like
-preduce/pmin/pmax etc. A parallel array can be realized into a Clojure
-vector using pvec.
+take collections and will call par if needed, so normally you will
+only need to call par explicitly in order to attach bound/filter/map
+ops. Parallel arrays support the attachment of bounds, filters and
+mapping functions prior to realization/calculation, which happens as
+the result of any of several operations on the
+array (pvec/psort/pfilter-nils/pfilter-dupes). Rather than perform
+composite operations in steps, as would normally be done with
+sequences, maps and filters are instead attached and thus composed by
+providing ops to par. Note that there is an order sensitivity to the
+attachments - bounds precede filters precede mappings. All operations
+then happen in parallel, using multiple threads and a sophisticated
+work-stealing system supported by fork-join, either when the array is
+realized, or to perform aggregate operations like preduce/pmin/pmax
+etc. A parallel array can be realized into a Clojure vector using
+pvec.
")
(import '(jsr166y.forkjoin ParallelArray ParallelArrayWithBounds ParallelArrayWithFilter
@@ -38,30 +40,99 @@ vector using pvec.
Ops$Op Ops$BinaryOp Ops$Reducer Ops$Predicate Ops$BinaryPredicate
Ops$IntAndObjectPredicate Ops$IntAndObjectToObject))
+(defn- op [f] ; adapts a one-argument fn f to the Ops$Op interface
+ (proxy [Ops$Op] []
+ (op [x] (f x))))
+
+(defn- binary-op [f] ; adapts a two-argument fn f to the Ops$BinaryOp interface
+ (proxy [Ops$BinaryOp] []
+ (op [x y] (f x y))))
+
+(defn- int-and-object-to-object [f] ; adapts f to the Ops$IntAndObjectToObject interface
+ (proxy [Ops$IntAndObjectToObject] []
+ (op [i x] (f x i)))) ; proxy receives (index, element); f is called with (element, index)
+
+(defn- reducer [f] ; adapts a two-argument fn f to the Ops$Reducer interface
+ (proxy [Ops$Reducer] []
+ (op [x y] (f x y))))
+
+(defn- predicate [f] ; adapts a one-argument fn f to the Ops$Predicate interface
+ (proxy [Ops$Predicate] []
+ (op [x] (boolean (f x))))) ; f's return value is coerced with boolean
+
+(defn- binary-predicate [f] ; adapts a two-argument fn f to the Ops$BinaryPredicate interface
+ (proxy [Ops$BinaryPredicate] []
+ (op [x y] (boolean (f x y))))) ; f's return value is coerced with boolean
+
+(defn- int-and-object-predicate [f] ; adapts f to the Ops$IntAndObjectPredicate interface
+ (proxy [Ops$IntAndObjectPredicate] []
+ (op [i x] (boolean (f x i))))) ; proxy receives (index, element); f is called with (element, index)
+
(defn par
- "Creates a parallel array from coll if it is not already one"
- [coll]
- (if (instance? ParallelArrayWithMapping coll)
- coll
- (ParallelArray.createUsingHandoff
- (to-array coll)
- (ParallelArray.defaultExecutor))))
+ "Creates a parallel array from coll. ops, if supplied, perform
+ on-the-fly filtering or transformations during parallel realization
+ or calculation. ops form a chain, and bounds must precede filters,
+ must precede maps. ops must be a set of keyword value pairs of the
+ following forms:
-(defn- pa-to-vec [pa]
- (vec (. pa getArray)))
+ :bound [start end]
-(defn pall
- "Realizes a copy of the coll as a parallel array, with any bounds/filters/maps applied"
- [coll]
- (if (instance? ParallelArrayWithMapping coll)
- (. coll all)
- (par coll)))
+ Only elements from start (inclusive) to end (exclusive) will be
+ processed when the array is realized.
-(defn pvec
- "Returns the realized contents of the parallel array pa as a Clojure vector"
- [pa] (pa-to-vec (pall pa)))
+ :filter pred
+
+ Filter preds remove elements from processing when the array is realized. pred
+ must be a function of one argument whose return will be processed
+ via boolean.
+ :filter-index pred2
+ pred2 must be a function of two arguments, which will be an element
+ of the collection and the corresponding index, whose return will be
+ processed via boolean.
+
+ :filter-with [pred2 coll2]
+
+ pred2 must be a function of two arguments, which will be
+ corresponding elements of the 2 collections.
+
+ :map f
+
+ Map fns will be used to transform elements when the array is
+ realized. f must be a function of one argument.
+
+ :map-index f2
+
+ f2 must be a function of two arguments, which will be an element of
+ the collection and the corresponding index.
+
+ :map-with [f2 coll2]
+
+ f2 must be a function of two arguments, which will be corresponding
+ elements of the 2 collections."
+
+ ([coll]
+ (if (instance? ParallelArrayWithMapping coll)
+ coll
+ (ParallelArray.createUsingHandoff
+ (to-array coll)
+ (ParallelArray.defaultExecutor))))
+ ([coll & ops]
+ (reduce (fn [pa [op args]]
+ (cond
+ (= op :bound) (. pa withBounds (args 0) (args 1))
+ (= op :filter) (. pa withFilter (predicate args))
+ (= op :filter-with) (. pa withFilter (binary-predicate (args 0)) (par (args 1)))
+ (= op :filter-index) (. pa withIndexedFilter (int-and-object-predicate args))
+ (= op :map) (. pa withMapping (parallel/op args)) ; namespace-qualified because the local op binding shadows the op adapter fn
+ (= op :map-with) (. pa withMapping (binary-op (args 0)) (par (args 1)))
+ (= op :map-index) (. pa withIndexedMapping (int-and-object-to-object args))
+ :else (throw (Exception. (str "Unsupported par op: " op)))))
+ (par coll)
+ (partition 2 ops)))) ; NOTE(review): partition appears to drop an unpaired trailing op silently - confirm this is intended
+
+;;;;;;;;;;;;;;;;;;;;; aggregate operations ;;;;;;;;;;;;;;;;;;;;;;
(defn pany
"Returns some (random) element of the coll if it satisfies the bound/filter/map"
[coll]
@@ -88,121 +159,56 @@ vector using pvec.
([coll] (summary-map (. (par coll) summary)))
([coll comp] (summary-map (. (par coll) summary comp))))
-(defn pdistinct
- "Returns a parallel array of the distinct elements of coll"
- [coll]
- (pa-to-vec (. (par coll) allUniqueElements)))
-
-(defn- op [f]
- (proxy [Ops$Op] []
- (op [x] (f x))))
-
-(defn- binary-op [f]
- (proxy [Ops$BinaryOp] []
- (op [x y] (f x y))))
+(defn preduce
+ "Returns the reduction of the realized elements of coll
+ using function f. Note f will not necessarily be called
+ consecutively, and so must be commutative. Also note that
+ (f base an-element) might be performed many times, i.e. base is not
+ an initial value as with sequential reduce."
+ [f base coll]
+ (. (par coll) (reduce (reducer f) base)))
-(defn- int-and-object-to-object [f]
- (proxy [Ops$IntAndObjectToObject] []
- (op [i x] (f i x))))
+;;;;;;;;;;;;;;;;;;;;; collection-producing operations ;;;;;;;;;;;;;;;;;;;;;;
-(defn- reducer [f]
- (proxy [Ops$Reducer] []
- (op [x y] (f x y))))
+(defn- pa-to-vec [pa]
+ (vec (. pa getArray)))
-(defn- predicate [f]
- (proxy [Ops$Predicate] []
- (op [x] (boolean (f x)))))
+(defn- pall
+ "Realizes a copy of the coll as a parallel array, with any bounds/filters/maps applied"
+ [coll]
+ (if (instance? ParallelArrayWithMapping coll)
+ (. coll all)
+ (par coll)))
-(defn- binary-predicate [f]
- (proxy [Ops$BinaryPredicate] []
- (op [x y] (boolean (f x y)))))
+(defn pvec
+ "Returns the realized contents of the parallel array pa as a Clojure vector"
+ [pa] (pa-to-vec (pall pa)))
-(defn- int-and-object-predicate [f]
- (proxy [Ops$IntAndObjectPredicate] []
- (op [i x] (boolean (f i x)))))
+(defn pdistinct
+ "Returns a vector of the distinct elements of coll"
+ [coll]
+ (pa-to-vec (. (pall coll) allUniqueElements)))
;this doesn't work, passes null to reducer?
(defn- pcumulate [coll f init]
(.. (pall coll) (precumulate (reducer f) init)))
-(defn preduce
- "Returns a parallel array of the reduction of the realized elements of coll
- using function f. Note f will not necessarily be called
- consecutively, and so must be commutative. Also note that
- (f base an-element) might be performed many times, i.e. base is not
- an initial value as with sequential reduce. Returns a new parallel
- array."
- [f base coll]
- (. (par coll) (reduce (reducer f) base)))
-
(defn psort
- "Returns a new parallel array consisting of the realized items in coll, sorted,
+ "Returns a new vector consisting of the realized items in coll, sorted,
presuming Comparable elements, unless a Comparator comp is supplied"
- ([coll] (. (pall coll) sort))
- ([coll comp] (. (pall coll) sort comp)))
+ ([coll] (pa-to-vec (. (pall coll) sort)))
+ ([coll comp] (pa-to-vec (. (pall coll) sort comp))))
(defn pfilter-nils
- "Returns a parallel array containing the non-nil (realized) elements of coll"
+ "Returns a vector containing the non-nil (realized) elements of coll"
[coll]
- (. (pall coll) removeNulls))
+ (pa-to-vec (. (pall coll) removeNulls)))
(defn pfilter-dupes
- "Returns a parallel array containing the (realized) elements of coll,
+ "Returns a vector containing the (realized) elements of coll,
without any consecutive duplicates"
[coll]
- (. (pall coll) removeConsecutiveDuplicates))
-
-(defn with-bounds
- "Returns a parallel array with the bounds attached. Note bounds must
- be attached before filters or mappings. Only elements from
- start (inclusive) to end (exclusive) will be processed when the
- array is realized."
- [coll start end]
- (. (par coll) withBounds start end))
-
-(defn with-filter
- "Returns a parallel array with the filter attached. Note filters
- must be attached before mappings. pred must be a function of one
- argument whose return will be processed via boolean. pred2 must be a
- function of two arguments, which will be corresponding elements of
- the 2 collections. The predicate will be used to remove elements
- from processing when the array is realized."
- ([coll pred]
- (. (par coll) withFilter (predicate pred)))
- ([coll pred2 coll2]
- (. (par coll) withFilter (binary-predicate pred2) (par coll2))))
-
-(defn with-indexed-filter
- "Returns a parallel array with the filter attached. Note filters
- must be attached before mappings. pred2 must be a function of two
- arguments, which will be an index and the corresponding element of
- the collection, whose return will be processed via boolean. The
- predicate will be used to remove elements from processing when
- the array is realized."
- ([coll pred2]
- (. (par coll) withIndexedFilter (int-and-object-predicate pred2))))
-
-(defn with-mapping
- "Returns a parallel array with the mapping function attached. f
- must be a function of one argument. f2 must be a function of two
- arguments, which will be corresponding elements of the 2
- collections. The mapping fn will be used to transform elements when
- the array is realized."
- ([coll f]
- (. (par coll) withMapping (op f)))
- ([coll f2 coll2]
- (. (par coll) withMapping (binary-op f2) (par coll2))))
-
-(defn with-indexed-mapping
- "Returns a parallel array with the mapping function attached. f2
- must be a function of two arguments, which will be an index and the
- corresponding element of the collection. The mapping fn will be used
- to transform elements when the array is realized."
- ([coll f2]
- (. (par coll) withIndexedMapping (int-and-object-to-object f2))))
-
-
-
+ (pa-to-vec (. (pall coll) removeConsecutiveDuplicates)))
(comment
@@ -219,20 +225,23 @@ vector using pvec.
(preduce + 0 [1 2 3 2 1])
(preduce + 0 (psort a))
-(pall (with-indexed-filter (par [11 2 3 2]) (fn [i x] (> i x))))
-(pall (with-filter (par [11 2 3 2]) (fn [x y] (> y x)) (par [110 2 33 2])))
+(pvec (par [11 2 3 2] :filter-index (fn [x i] (> i x))))
+(pvec (par [11 2 3 2] :filter-with [(fn [x y] (> y x)) [110 2 33 2]]))
(psummary ;or pvec/pmax etc
- (-> (par [11 2 3 2])
- (with-filter (fn [x y] (> y x))
- (par [110 2 33 2]))
- (with-mapping #(* % 2))))
+ (par [11 2 3 2]
+ :filter-with [(fn [x y] (> y x))
+ [110 2 33 2]]
+ :map #(* % 2)))
(preduce + 0
- (-> (par [11 2 3 2])
- (with-filter (fn [x y] (> y x))
- (par [110 2 33 2]))))
-
-(par coll :bound [1 2] :filter fn :map fn)
-
+ (par [11 2 3 2]
+ :filter-with [< [110 2 33 2]]))
+
+(time (reduce + 0 (map #(* % %) (range 1000000))))
+(time (preduce + 0 (par (range 1000000) :map-index *)))
+(def v (range 1000000))
+(time (preduce + 0 (par v :map-index *)))
+(time (preduce + 0 (par v :map #(* % %))))
+(time (reduce + 0 (map #(* % %) v)))
) \ No newline at end of file