aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDavid Barksdale <amatus.amongus@gmail.com>2012-07-29 22:20:09 -0500
committerDavid Barksdale <amatus.amongus@gmail.com>2012-07-29 22:20:09 -0500
commit1d3754e72e09e5f395d5ace1b6bf91e4c7293a54 (patch)
treebf04dda82266a5f8e9a9edf1192236c9e2f063cd
parentfd9befa6bb8b13d703c4e65c3d409156f6b26db6 (diff)
Read-combining for the BasicScheduler.HEADmaster
-rw-r--r--src/clojure/foofs/localbackend.clj2
-rw-r--r--src/clojure/foofs/storage/scheduler.clj47
2 files changed, 37 insertions, 12 deletions
diff --git a/src/clojure/foofs/localbackend.clj b/src/clojure/foofs/localbackend.clj
index a4ce223..f2e5e85 100644
--- a/src/clojure/foofs/localbackend.clj
+++ b/src/clojure/foofs/localbackend.clj
@@ -233,7 +233,7 @@
#(continuation! (read-file scheduler salt file offset size)))))))
(writefile [_ nodeid offset size data continuation!]
;; TODO: Write out all the complete blocks before bothering the state agent
- (send
+ (send-off
state-agent
(fn [state]
(let [inode (get-in state [:inode-table nodeid])]
diff --git a/src/clojure/foofs/storage/scheduler.clj b/src/clojure/foofs/storage/scheduler.clj
index acb5cdc..d6dbcaa 100644
--- a/src/clojure/foofs/storage/scheduler.clj
+++ b/src/clojure/foofs/storage/scheduler.clj
@@ -22,7 +22,8 @@
(store-block [this salt block-bytes n k continuation!]))
(defrecord BasicScheduler
- [^Executor executor
+ [^clojure.lang.Agent state-agent
+ ^Executor executor
read-block
write-block]
Scheduler
@@ -30,16 +31,40 @@
(.execute
executor
(fn []
- (let [[e-hash e-key] block
- e-block (read-block e-hash block-size n k)
- test-hash (sha-512 e-block)]
- (if (= (seq e-hash) (seq test-hash))
- (let [f-block (decode-block e-block e-key salt)
- test-hash (sha-512 f-block)]
- (if (= (seq e-key) (seq test-hash))
- (continuation! (ByteBuffer/wrap f-block))
- (continuation! nil)))
- (continuation! nil))))))
+ (send
+ state-agent
+ (fn [state]
+ ;; check to see if this block is already being fetched
+ (if-let [continuations (get-in state [:fetching block])]
+ ;; add ourselves to the continuation list
+ (assoc-in state [:fetching block]
+ (conj continuations continuation!))
+ ;; else start fetching it using the executor
+ (do
+ (.execute
+ executor
+ (fn []
+ (let [result
+ (let [[e-hash e-key] block
+ e-block (read-block e-hash block-size n k)
+ test-hash (sha-512 e-block)]
+ (when (= (seq e-hash) (seq test-hash))
+ (let [f-block (decode-block e-block e-key salt)
+ test-hash (sha-512 f-block)]
+ (when (= (seq e-key) (seq test-hash))
+ (ByteBuffer/wrap f-block)))))]
+ ;; now that we've got it, call all of the continuations
+ (send
+ state-agent
+ (fn [state]
+ (doseq [continuation! (get-in state
+ [:fetching block])]
+ (continuation! result))
+ ;; and remove the fetching state
+ (assoc state :fetching
+ (dissoc (:fetching state) block)))))))
+ ;; and add ourselves to the continuation list
+ (assoc-in state [:fetching block] [continuation!]))))))))
(store-block [_ salt f-block n k continuation!]
(.execute
executor