diff options
author | David Barksdale <amatus.amongus@gmail.com> | 2010-11-07 00:28:46 -0700 |
---|---|---|
committer | David Barksdale <amatus.amongus@gmail.com> | 2010-11-07 00:28:46 -0700 |
commit | bc32b275367e808abb6afc51cdc320f3ef3dad6a (patch) | |
tree | dd294dfd420033fb0fdde455e0598733e62cbe7a | |
parent | 7bef3c6f9d4c3e53c76762c6938f5a5aaeb125a5 (diff) |
Added ThreadPoolExecutors to keep track of CPU and IO load.
Network load should correspond to the selector-continuations-queue.
Removed some overzalous use of the exception-m monad.
-rw-r--r-- | src/org/gnu/clojure/gnunet/core.clj | 279 | ||||
-rw-r--r-- | src/org/gnu/clojure/gnunet/filesharing.clj | 6 | ||||
-rw-r--r-- | src/org/gnu/clojure/gnunet/peer.clj | 70 | ||||
-rw-r--r-- | src/org/gnu/clojure/gnunet/util.clj | 4 |
4 files changed, 218 insertions, 141 deletions
diff --git a/src/org/gnu/clojure/gnunet/core.clj b/src/org/gnu/clojure/gnunet/core.clj index 0c6442d..8607bc3 100644 --- a/src/org/gnu/clojure/gnunet/core.clj +++ b/src/org/gnu/clojure/gnunet/core.clj @@ -161,13 +161,12 @@ (defn emit-messages! [peer remote-peer messages] - (send-do-exception-m! (:state-agent remote-peer) - [is-connected (fetch-val :is-connected) - :when is-connected - transport (fetch-val :connected-transport) - encoded-address (fetch-val :connected-address)] - ((:emit-messages! transport) transport remote-peer encoded-address nil - messages))) + (let [state (deref (:state-agent remote-peer)) + transport (:connected-transport state) + encoded-address (:connected-address state)] + (when (:is-connected state) + ((:emit-messages! transport) transport remote-peer encoded-address nil + messages)))) (defn send-key! [peer remote-peer] @@ -175,96 +174,108 @@ [:when-let [public-key (deref (:public-key-atom remote-peer))] is-connected (fetch-val :is-connected) :when is-connected - _ (update-val :status #(if (= % peer-status-down) peer-status-key-sent %)) + _ (update-val :status #(if (== % peer-status-down) peer-status-key-sent %)) sender-status (fetch-val :status) creation-time (fetch-val :encrypt-key-created) encrypt-key (fetch-val :encrypt-key) challenge (fetch-val :ping-challenge)] - (let [set-key {:sender-status sender-status - :creation-time creation-time - :peer-id (:id remote-peer) - :encrypted-key (rsa-encrypt! public-key - (encode-aes-key encrypt-key) - (:random peer))} - signed-material (encode-set-key-signed-material set-key) - signature (rsa-sign (:private-key peer) signed-material) - set-key (assoc set-key :signature signature) - encoded-set-key (encode-set-key set-key signed-material) - iv-seed (.nextInt (:random peer)) - ping {:iv-seed iv-seed - :challenge challenge - :peer-id (:id remote-peer)} - encoded-ping (encode-core-ping ping encrypt-key (:id remote-peer))] - (emit-messages! peer remote-peer - [{:message-type message-type-core-set-key :bytes encoded-set-key} - {:message-type message-type-core-ping :bytes encoded-ping}])))) + (.execute (:cpu-bound-executor peer) + (fn [] + (let [set-key {:sender-status sender-status + :creation-time creation-time + :peer-id (:id remote-peer) + :encrypted-key (rsa-encrypt! public-key + (encode-aes-key encrypt-key) + (:random peer))} + signed-material (encode-set-key-signed-material set-key) + signature (rsa-sign (:private-key peer) signed-material) + set-key (assoc set-key :signature signature) + encoded-set-key (encode-set-key set-key signed-material) + iv-seed (.nextInt (:random peer)) + ping {:iv-seed iv-seed + :challenge challenge + :peer-id (:id remote-peer)} + encoded-ping (encode-core-ping ping encrypt-key + (:id remote-peer))] + (emit-messages! peer remote-peer + [{:message-type message-type-core-set-key :bytes encoded-set-key} + {:message-type message-type-core-ping :bytes encoded-ping}])))))) (defn handle-set-key! [peer remote-peer message] - (send-do-exception-m! (:state-agent remote-peer) - [:when-let [public-key (deref (:public-key-atom remote-peer))] - :when-let [set-key (first (parse-set-key (:bytes message)))] - :when (= (:peer-id set-key) (:id peer)) - :when (rsa-verify public-key - (:signed-material set-key) (:signature set-key)) - status (fetch-val :status) - decrypt-key-created (fetch-val :decrypt-key-created) - :when-not (and - (or (= status peer-status-key-received) - (= status peer-status-key-confirmed)) - (.after decrypt-key-created (:creation-time set-key))) - :when-let [decrypted-key (rsa-decrypt (:private-key peer) - (:encrypted-key set-key))] - ;; XXX: For some reason we end up with an extra 0 byte at the - ;; beginning of the decrypted-key when the MSB is 1. - :let [decrypted-key (drop (- (count decrypted-key) aes-key-size 4) - decrypted-key)] - :when-let [decrypt-key (first (parse-aes-key decrypted-key))] - _ (set-val :decrypt-key decrypt-key) - :let [creation-time (:creation-time set-key)] - _ (update-state #(if (= decrypt-key-created creation-time) - % - (conj % {:last-sequence-number-received 0 - :last-packets-bitmap (int 0) - :decrypt-key-created creation-time}))) - :let [sender-status (:sender-status set-key)] - _ (update-val :status - #(if (= % peer-status-key-confirmed) % peer-status-key-received))] - (when (or (= status peer-status-down) - (and (not (= sender-status peer-status-key-received)) - (not (= sender-status peer-status-key-confirmed)))) - (send-key! peer remote-peer)))) + (.execute (:cpu-bound-executor peer) + (fn [] + (domonad maybe-m + [public-key (deref (:public-key-atom remote-peer)) + set-key (first (parse-set-key (:bytes message))) + :when (= (:peer-id set-key) (:id peer)) + :when (rsa-verify public-key + (:signed-material set-key) (:signature set-key)) + decrypted-key (rsa-decrypt (:private-key peer) + (:encrypted-key set-key)) + :let [decrypted-key (drop (- (count decrypted-key) aes-key-size 4) + decrypted-key)] + decrypt-key (first (parse-aes-key decrypted-key))] + (send-do-exception-m! (:state-agent remote-peer) + [status (fetch-val :status) + decrypt-key-created (fetch-val :decrypt-key-created) + :when-not (and + (or (== status peer-status-key-received) + (== status peer-status-key-confirmed)) + (.after decrypt-key-created (:creation-time set-key))) + _ (set-val :decrypt-key decrypt-key) + :let [creation-time (:creation-time set-key)] + _ (update-state #(if (= decrypt-key-created creation-time) + % + (conj % {:last-sequence-number-received 0 + :last-packets-bitmap (int 0) + :decrypt-key-created creation-time}))) + :let [sender-status (:sender-status set-key)] + _ (update-val :status + #(if (== % peer-status-key-confirmed) + % + peer-status-key-received))] + (when (or (== status peer-status-down) + (and (not (== sender-status peer-status-key-received)) + (not (== sender-status peer-status-key-confirmed)))) + (send-key! peer remote-peer))))))) (defn handle-core-ping! [peer remote-peer message] - (send-do-exception-m! (:state-agent remote-peer) - [decrypt-key (fetch-val :decrypt-key) - :when decrypt-key - :when-let [ping (first ((parse-core-ping decrypt-key (:id peer)) - (:bytes message)))] - bw-in (fetch-val :bw-in) - encrypt-key (fetch-val :encrypt-key)] - (let [iv-seed (.nextInt (:random peer)) - pong {:iv-seed iv-seed - :challenge (:challenge ping) - :inbound-bw-limit bw-in - :peer-id (:id peer)} - encoded-pong (encode-core-pong pong encrypt-key (:id remote-peer))] - (emit-messages! peer remote-peer - [{:message-type message-type-core-pong :bytes encoded-pong}])))) + (.execute (:cpu-bound-executor peer) + (fn [] + (domonad maybe-m + [:let [state (deref (:state-agent remote-peer))] + decrypt-key (:decrypt-key state) + ping (first ((parse-core-ping decrypt-key (:id peer)) + (:bytes message)))] + (let [bw-in (:bw-in state) + encrypt-key (:encrypt-key state) + iv-seed (.nextInt (:random peer)) + pong {:iv-seed iv-seed + :challenge (:challenge ping) + :inbound-bw-limit bw-in + :peer-id (:id peer)} + encoded-pong (encode-core-pong pong encrypt-key + (:id remote-peer))] + (emit-messages! peer remote-peer + [{:message-type message-type-core-pong :bytes encoded-pong}])))))) (defn handle-core-pong! [peer remote-peer message] - (send-do-exception-m! (:state-agent remote-peer) - [decrypt-key (fetch-val :decrypt-key) - :when decrypt-key - challenge (fetch-val :ping-challenge) - :when-let [pong (first ((parse-core-pong decrypt-key - challenge (:id peer)) (:bytes message)))] - :when (= (:peer-id pong) (:id remote-peer)) - status (fetch-val :status) - :when (= status peer-status-key-received) - _ (set-val :status peer-status-key-confirmed)] nil)) + (.execute (:cpu-bound-executor peer) + (fn [] + (domonad maybe-m + [:let [state (deref (:state-agent remote-peer))] + decrypt-key (:decrypt-key state) + :let [challenge (:ping-challenge state)] + pong (first ((parse-core-pong decrypt-key challenge (:id peer)) + (:bytes message))) + :when (= (:peer-id pong) (:id remote-peer))] + (send (:state-agent remote-peer) + #(if (== peer-status-key-received (:status %)) + (assoc % :status peer-status-key-confirmed) + %)))))) (defn admit-core-message! [peer remote-peer message] @@ -277,52 +288,60 @@ (defn handle-core-encrypted-message! [peer remote-peer message] - (send-do-exception-m! (:state-agent remote-peer) - [decrypt-key (fetch-val :decrypt-key) - :when decrypt-key - decrypt-key-created (fetch-val :decrypt-key-created) - :when-let [message (first - ((parse-core-encrypted-message decrypt-key - decrypt-key-created (:id peer)) (:bytes message)))] - last-seqnum (fetch-val :last-sequence-number-received) - :let [seqnum (:sequence-number message)] - :when-not (== last-seqnum seqnum) - :when-not (> last-seqnum (+ 32 seqnum)) - bitmap (fetch-val :last-packets-bitmap) - :let [bit (- last-seqnum seqnum 1)] - :when-not (and (> last-seqnum seqnum) (bit-test bitmap bit)) - _ (update-state - #(if (> last-seqnum seqnum) - (assoc % :last-packets-bitmap (bit-set bitmap bit)) - (conj % - {:last-sequence-number-received seqnum - :last-packets-bitmap (.intValue - (bit-shift-left (bigint bitmap) - (- seqnum last-seqnum)))}))) - :when-not (.before (:timestamp message) (message-expiration)) - ;; TODO: update bandwidth tracking - ] - (doseq [message (:messages message)] - (admit-core-message! peer remote-peer message)))) + (.execute (:cpu-bound-executor peer) + (fn [] + (domonad maybe-m + [:let [state (deref (:state-agent remote-peer))] + decrypt-key (:decrypt-key state) + :let [decrypt-key-created (:decrypt-key-created state)] + message (first ((parse-core-encrypted-message decrypt-key + decrypt-key-created (:id peer)) (:bytes message)))] + (send-do-exception-m! (:state-agent remote-peer) + [last-seqnum (fetch-val :last-sequence-number-received) + :let [seqnum (:sequence-number message)] + :when-not (== last-seqnum seqnum) + :when-not (> last-seqnum (+ 32 seqnum)) + bitmap (fetch-val :last-packets-bitmap) + :let [bit (- last-seqnum seqnum 1)] + :when-not (and (> last-seqnum seqnum) (bit-test bitmap bit)) + _ (update-state + #(if (> last-seqnum seqnum) + (assoc % :last-packets-bitmap (bit-set bitmap bit)) + (conj % + {:last-sequence-number-received seqnum + :last-packets-bitmap (.intValue + (bit-shift-left (bigint bitmap) + (- seqnum last-seqnum)))}))) + :when-not (.before (:timestamp message) (message-expiration)) + ;; TODO: update bandwidth tracking + ] + (doseq [message (:messages message)] + (admit-core-message! peer remote-peer message))))))) + +(defn initialize-remote-peer-state + [peer state] + (conj state + {:status peer-status-down + :decrypt-key-created (Date. (long 0)) + :encrypt-key (generate-aes-key! (:random peer)) + :encrypt-key-created (Date.) + :ping-challenge (.nextInt (:random peer)) + ;; TODO: Make this a real number + :bw-in 20000})) (defn handle-receive! [peer remote-peer message] - (send-do-exception-m! (:state-agent remote-peer) - [_ (update-state - #(if (contains? % :status) - % - (conj % - {:status peer-status-down - :decrypt-key-created (Date. (long 0)) - :encrypt-key (generate-aes-key! (:random peer)) - :encrypt-key-created (Date.) - :ping-challenge (.nextInt (:random peer)) - ;; TODO: Make this a real number - :bw-in 20000})))] - (condp = (:message-type message) - message-type-core-set-key (handle-set-key! peer remote-peer message) - message-type-core-encrypted-message (handle-core-encrypted-message! - peer remote-peer message) - message-type-core-ping (handle-core-ping! peer remote-peer message) - message-type-core-pong (handle-core-pong! peer remote-peer message) - nil))) + (send (:state-agent remote-peer) + (fn [state] + (let [state (if (contains? state :status) + state + (initialize-remote-peer-state peer state))] + (.write *out* (str "Core: " message "\n")) + (condp = (:message-type message) + message-type-core-set-key (handle-set-key! peer remote-peer message) + message-type-core-encrypted-message (handle-core-encrypted-message! + peer remote-peer message) + message-type-core-ping (handle-core-ping! peer remote-peer message) + message-type-core-pong (handle-core-pong! peer remote-peer message) + nil) + state)))) diff --git a/src/org/gnu/clojure/gnunet/filesharing.clj b/src/org/gnu/clojure/gnunet/filesharing.clj index 525fefd..a12779a 100644 --- a/src/org/gnu/clojure/gnunet/filesharing.clj +++ b/src/org/gnu/clojure/gnunet/filesharing.clj @@ -42,6 +42,7 @@ (defn bound-priority "Monadic function of the exception-m monad. Updates :trust and :average-priority and returns a bounded priority." + ;; TODO: check load, don't charge if we're idle, drop if we're too busy [priority] (fn [state] (let [priority (min priority (:turst state 0)) @@ -65,13 +66,14 @@ (:return-to get-message)) remote-peer)] :when (:is-connected (deref (:state-agent return-to))) ;; TODO: try connect - ;; TODO: check load and drop message if load is too high priority (bound-priority (:priority get-message)) :let [ttl (min ttl-max (:ttl get-message) (* priority ttl-decrement 0.001))] :let [ttl (- ttl (* 2 ttl-decrement) (.nextInt (:random peer) ttl-decrement))] - :when (< 0 ttl)] + duplicate (with-state-field :queries + (fetch-val (:query get-message))) + ] nil)) (defn admit-put! diff --git a/src/org/gnu/clojure/gnunet/peer.clj b/src/org/gnu/clojure/gnunet/peer.clj index 2eac978..d8beccc 100644 --- a/src/org/gnu/clojure/gnunet/peer.clj +++ b/src/org/gnu/clojure/gnunet/peer.clj @@ -1,7 +1,8 @@ (ns org.gnu.clojure.gnunet.peer (:use (org.gnu.clojure.gnunet crypto message util)) (:import java.nio.channels.Selector - java.util.concurrent.ConcurrentLinkedQueue + (java.util.concurrent ConcurrentLinkedQueue LinkedBlockingQueue + PriorityBlockingQueue ThreadPoolExecutor TimeUnit) java.security.SecureRandom)) (defstruct remote-peer-struct @@ -22,7 +23,7 @@ ;; :connected-transport (value from peer-struct:transports-agent) ;; :connected-address (byte vector) ;; (core layer) - ;; :status peer-status-down (int) + ;; :status (int) ;; :decrypt-key-created (java.util.Date) ;; :encrypt-key (java.security.Key) ;; :encrypt-key-created (java.util.Date) @@ -30,7 +31,8 @@ ;; :bw-in (int) ;; (filesharing layer) ;; :trust (int) - ;; :average-priority (float)} + ;; :average-priority (float) + ;; :queries (map of query hashes to get-messages)} :state-agent) (def peer-struct (apply create-struct (concat @@ -54,13 +56,34 @@ ;; Thread which selects on :selector :selector-thread - ;; java.util.concurrent.ConcurrentLinkedQueue of continuations, in order to - ;; access the selector while the selector-thread is running add a - ;; continuation to this queue and call .wakeup on the selector + ;; java.util.concurrent.ConcurrentLinkedQueue of continuations. + ;; In order to access the selector while the selector-thread is running add + ;; a continuation to this queue and call .wakeup on the selector. + ;; The size of this queue is an easy measure our network load. :selector-continuations-queue ;; java.security.SecureRandom - :random)))) + :random + + ;; java.util.concurrent.ThreadPoolExecutor for executing CPU-bound + ;; operations like generating RSA keys, hashes, etc. It has one thread for + ;; each processor in the system and a "practically" unbounded FIFO queue. + :cpu-bound-executor + + ;; java.util.concurrent.LinkedBlockingQueue of Integer.MAX_VALUE capacity. + ;; The size of this queue is an easy measure of our CPU load. + :cpu-bound-queue + + ;; java.util.concurrent.ThreadPoolExecutor for executing disk-bound + ;; operations like quering a database or reading/writing files. It has a + ;; single thread and an unbounded priority queue. The priority of a callable + ;; object is stored in its metadata under the key :priority. + :disk-bound-executor + + ;; java.util.concurrent.PriorityBlockingQueue of unbounded capacity. + ;; The size of this queue is an easy measure of our disk load. + :disk-bound-queue + )))) (defstruct peer-options :keypair) @@ -72,6 +95,13 @@ (def id-size hash-size) +(def priority-comparator + (reify java.util.Comparator + (compare [this o1 o2] + (clojure.core/compare (:priority (meta o2)) (:priority (meta o1)))) + (equals [this obj] + (== (:priority (meta this)) (:priority (meta obj)))))) + (defn selector-loop! [selector continuations] (doseq [continuation (queue-seq! continuations)] @@ -85,7 +115,13 @@ (defn new-peer [options] (let [selector (Selector/open) - continuations (ConcurrentLinkedQueue.)] + continuations (ConcurrentLinkedQueue.) + cpu-bound-queue (LinkedBlockingQueue.) + cpu-bound-executor (ThreadPoolExecutor. 0 (available-processors) 60 + TimeUnit/SECONDS cpu-bound-queue) + disk-bound-queue (PriorityBlockingQueue. 1 priority-comparator) + disk-bound-executor (ThreadPoolExecutor. 0 1 60 TimeUnit/SECONDS + disk-bound-queue)] (struct-map peer-struct :public-key-atom (atom (.getPublic (:keypair options))) :id (generate-id (.getPublic (:keypair options))) @@ -97,4 +133,20 @@ :selector selector :selector-thread (Thread. (partial selector-loop! selector continuations)) :selector-continuations-queue continuations - :random (:random options)))) + :random (:random options) + :cpu-bound-executor cpu-bound-executor + :cpu-bound-queue cpu-bound-queue + :disk-bound-executor disk-bound-executor + :disk-bound-queue disk-bound-queue))) + +(defn network-load + [peer] + (.size (:selector-continuations-queue peer))) + +(defn cpu-load + [peer] + (.size (:cpu-bound-queue peer))) + +(defn disk-load + [peer] + (.size (:disk-bound-queue peer))) diff --git a/src/org/gnu/clojure/gnunet/util.clj b/src/org/gnu/clojure/gnunet/util.clj index 78a02dc..b74354b 100644 --- a/src/org/gnu/clojure/gnunet/util.clj +++ b/src/org/gnu/clojure/gnunet/util.clj @@ -14,3 +14,7 @@ "Return the maximum in a collection of comparable values." [& coll] (last (sort coll))) + +(defn available-processors + [] + (.availableProcessors (Runtime/getRuntime))) |