diff options
author | David Barksdale <amatus.amongus@gmail.com> | 2012-07-29 22:20:09 -0500 |
---|---|---|
committer | David Barksdale <amatus.amongus@gmail.com> | 2012-07-29 22:20:09 -0500 |
commit | 1d3754e72e09e5f395d5ace1b6bf91e4c7293a54 (patch) | |
tree | bf04dda82266a5f8e9a9edf1192236c9e2f063cd | |
parent | fd9befa6bb8b13d703c4e65c3d409156f6b26db6 (diff) |
-rw-r--r-- | src/clojure/foofs/localbackend.clj | 2 | ||||
-rw-r--r-- | src/clojure/foofs/storage/scheduler.clj | 47 |
2 files changed, 37 insertions, 12 deletions
diff --git a/src/clojure/foofs/localbackend.clj b/src/clojure/foofs/localbackend.clj index a4ce223..f2e5e85 100644 --- a/src/clojure/foofs/localbackend.clj +++ b/src/clojure/foofs/localbackend.clj @@ -233,7 +233,7 @@ #(continuation! (read-file scheduler salt file offset size))))))) (writefile [_ nodeid offset size data continuation!] ;; TODO: Write out all the complete blocks before bothering the state agent - (send + (send-off state-agent (fn [state] (let [inode (get-in state [:inode-table nodeid])] diff --git a/src/clojure/foofs/storage/scheduler.clj b/src/clojure/foofs/storage/scheduler.clj index acb5cdc..d6dbcaa 100644 --- a/src/clojure/foofs/storage/scheduler.clj +++ b/src/clojure/foofs/storage/scheduler.clj @@ -22,7 +22,8 @@ (store-block [this salt block-bytes n k continuation!])) (defrecord BasicScheduler - [^Executor executor + [^clojure.lang.Agent state-agent + ^Executor executor read-block write-block] Scheduler @@ -30,16 +31,40 @@ (.execute executor (fn [] - (let [[e-hash e-key] block - e-block (read-block e-hash block-size n k) - test-hash (sha-512 e-block)] - (if (= (seq e-hash) (seq test-hash)) - (let [f-block (decode-block e-block e-key salt) - test-hash (sha-512 f-block)] - (if (= (seq e-key) (seq test-hash)) - (continuation! (ByteBuffer/wrap f-block)) - (continuation! nil))) - (continuation! nil)))))) + (send + state-agent + (fn [state] + ;; check to see if this block is already being fetched + (if-let [continuations (get-in state [:fetching block])] + ;; add ourselves to the continuation list + (assoc-in state [:fetching block] + (conj continuations continuation!)) + ;; else start fetching it using the executor + (do + (.execute + executor + (fn [] + (let [result + (let [[e-hash e-key] block + e-block (read-block e-hash block-size n k) + test-hash (sha-512 e-block)] + (when (= (seq e-hash) (seq test-hash)) + (let [f-block (decode-block e-block e-key salt) + test-hash (sha-512 f-block)] + (when (= (seq e-key) (seq test-hash)) + (ByteBuffer/wrap f-block)))))] + ;; now that we've got it, call all of the continuations + (send + state-agent + (fn [state] + (doseq [continuation! (get-in state + [:fetching block])] + (continuation! result)) + ;; and remove the fetching state + (assoc state :fetching + (dissoc (:fetching state) block))))))) + ;; and add ourselves to the continuation list + (assoc-in state [:fetching block] [continuation!])))))))) (store-block [_ salt f-block n k continuation!] (.execute executor |