diff options
authorDavid Barksdale <>2010-11-07 00:28:46 -0700
committerDavid Barksdale <>2010-11-07 00:28:46 -0700
commitbc32b275367e808abb6afc51cdc320f3ef3dad6a (patch)
parent7bef3c6f9d4c3e53c76762c6938f5a5aaeb125a5 (diff)
Added ThreadPoolExecutors to keep track of CPU and IO load.
Network load should correspond to the selector-continuations-queue. Removed some overzalous use of the exception-m monad.
4 files changed, 218 insertions, 141 deletions
diff --git a/src/org/gnu/clojure/gnunet/core.clj b/src/org/gnu/clojure/gnunet/core.clj
index 0c6442d..8607bc3 100644
--- a/src/org/gnu/clojure/gnunet/core.clj
+++ b/src/org/gnu/clojure/gnunet/core.clj
@@ -161,13 +161,12 @@
(defn emit-messages!
[peer remote-peer messages]
- (send-do-exception-m! (:state-agent remote-peer)
- [is-connected (fetch-val :is-connected)
- :when is-connected
- transport (fetch-val :connected-transport)
- encoded-address (fetch-val :connected-address)]
- ((:emit-messages! transport) transport remote-peer encoded-address nil
- messages)))
+ (let [state (deref (:state-agent remote-peer))
+ transport (:connected-transport state)
+ encoded-address (:connected-address state)]
+ (when (:is-connected state)
+ ((:emit-messages! transport) transport remote-peer encoded-address nil
+ messages))))
(defn send-key!
[peer remote-peer]
@@ -175,96 +174,108 @@
[:when-let [public-key (deref (:public-key-atom remote-peer))]
is-connected (fetch-val :is-connected)
:when is-connected
- _ (update-val :status #(if (= % peer-status-down) peer-status-key-sent %))
+ _ (update-val :status #(if (== % peer-status-down) peer-status-key-sent %))
sender-status (fetch-val :status)
creation-time (fetch-val :encrypt-key-created)
encrypt-key (fetch-val :encrypt-key)
challenge (fetch-val :ping-challenge)]
- (let [set-key {:sender-status sender-status
- :creation-time creation-time
- :peer-id (:id remote-peer)
- :encrypted-key (rsa-encrypt! public-key
- (encode-aes-key encrypt-key)
- (:random peer))}
- signed-material (encode-set-key-signed-material set-key)
- signature (rsa-sign (:private-key peer) signed-material)
- set-key (assoc set-key :signature signature)
- encoded-set-key (encode-set-key set-key signed-material)
- iv-seed (.nextInt (:random peer))
- ping {:iv-seed iv-seed
- :challenge challenge
- :peer-id (:id remote-peer)}
- encoded-ping (encode-core-ping ping encrypt-key (:id remote-peer))]
- (emit-messages! peer remote-peer
- [{:message-type message-type-core-set-key :bytes encoded-set-key}
- {:message-type message-type-core-ping :bytes encoded-ping}]))))
+ (.execute (:cpu-bound-executor peer)
+ (fn []
+ (let [set-key {:sender-status sender-status
+ :creation-time creation-time
+ :peer-id (:id remote-peer)
+ :encrypted-key (rsa-encrypt! public-key
+ (encode-aes-key encrypt-key)
+ (:random peer))}
+ signed-material (encode-set-key-signed-material set-key)
+ signature (rsa-sign (:private-key peer) signed-material)
+ set-key (assoc set-key :signature signature)
+ encoded-set-key (encode-set-key set-key signed-material)
+ iv-seed (.nextInt (:random peer))
+ ping {:iv-seed iv-seed
+ :challenge challenge
+ :peer-id (:id remote-peer)}
+ encoded-ping (encode-core-ping ping encrypt-key
+ (:id remote-peer))]
+ (emit-messages! peer remote-peer
+ [{:message-type message-type-core-set-key :bytes encoded-set-key}
+ {:message-type message-type-core-ping :bytes encoded-ping}]))))))
(defn handle-set-key!
[peer remote-peer message]
- (send-do-exception-m! (:state-agent remote-peer)
- [:when-let [public-key (deref (:public-key-atom remote-peer))]
- :when-let [set-key (first (parse-set-key (:bytes message)))]
- :when (= (:peer-id set-key) (:id peer))
- :when (rsa-verify public-key
- (:signed-material set-key) (:signature set-key))
- status (fetch-val :status)
- decrypt-key-created (fetch-val :decrypt-key-created)
- :when-not (and
- (or (= status peer-status-key-received)
- (= status peer-status-key-confirmed))
- (.after decrypt-key-created (:creation-time set-key)))
- :when-let [decrypted-key (rsa-decrypt (:private-key peer)
- (:encrypted-key set-key))]
- ;; XXX: For some reason we end up with an extra 0 byte at the
- ;; beginning of the decrypted-key when the MSB is 1.
- :let [decrypted-key (drop (- (count decrypted-key) aes-key-size 4)
- decrypted-key)]
- :when-let [decrypt-key (first (parse-aes-key decrypted-key))]
- _ (set-val :decrypt-key decrypt-key)
- :let [creation-time (:creation-time set-key)]
- _ (update-state #(if (= decrypt-key-created creation-time)
- %
- (conj % {:last-sequence-number-received 0
- :last-packets-bitmap (int 0)
- :decrypt-key-created creation-time})))
- :let [sender-status (:sender-status set-key)]
- _ (update-val :status
- #(if (= % peer-status-key-confirmed) % peer-status-key-received))]
- (when (or (= status peer-status-down)
- (and (not (= sender-status peer-status-key-received))
- (not (= sender-status peer-status-key-confirmed))))
- (send-key! peer remote-peer))))
+ (.execute (:cpu-bound-executor peer)
+ (fn []
+ (domonad maybe-m
+ [public-key (deref (:public-key-atom remote-peer))
+ set-key (first (parse-set-key (:bytes message)))
+ :when (= (:peer-id set-key) (:id peer))
+ :when (rsa-verify public-key
+ (:signed-material set-key) (:signature set-key))
+ decrypted-key (rsa-decrypt (:private-key peer)
+ (:encrypted-key set-key))
+ :let [decrypted-key (drop (- (count decrypted-key) aes-key-size 4)
+ decrypted-key)]
+ decrypt-key (first (parse-aes-key decrypted-key))]
+ (send-do-exception-m! (:state-agent remote-peer)
+ [status (fetch-val :status)
+ decrypt-key-created (fetch-val :decrypt-key-created)
+ :when-not (and
+ (or (== status peer-status-key-received)
+ (== status peer-status-key-confirmed))
+ (.after decrypt-key-created (:creation-time set-key)))
+ _ (set-val :decrypt-key decrypt-key)
+ :let [creation-time (:creation-time set-key)]
+ _ (update-state #(if (= decrypt-key-created creation-time)
+ %
+ (conj % {:last-sequence-number-received 0
+ :last-packets-bitmap (int 0)
+ :decrypt-key-created creation-time})))
+ :let [sender-status (:sender-status set-key)]
+ _ (update-val :status
+ #(if (== % peer-status-key-confirmed)
+ %
+ peer-status-key-received))]
+ (when (or (== status peer-status-down)
+ (and (not (== sender-status peer-status-key-received))
+ (not (== sender-status peer-status-key-confirmed))))
+ (send-key! peer remote-peer)))))))
(defn handle-core-ping!
[peer remote-peer message]
- (send-do-exception-m! (:state-agent remote-peer)
- [decrypt-key (fetch-val :decrypt-key)
- :when decrypt-key
- :when-let [ping (first ((parse-core-ping decrypt-key (:id peer))
- (:bytes message)))]
- bw-in (fetch-val :bw-in)
- encrypt-key (fetch-val :encrypt-key)]
- (let [iv-seed (.nextInt (:random peer))
- pong {:iv-seed iv-seed
- :challenge (:challenge ping)
- :inbound-bw-limit bw-in
- :peer-id (:id peer)}
- encoded-pong (encode-core-pong pong encrypt-key (:id remote-peer))]
- (emit-messages! peer remote-peer
- [{:message-type message-type-core-pong :bytes encoded-pong}]))))
+ (.execute (:cpu-bound-executor peer)
+ (fn []
+ (domonad maybe-m
+ [:let [state (deref (:state-agent remote-peer))]
+ decrypt-key (:decrypt-key state)
+ ping (first ((parse-core-ping decrypt-key (:id peer))
+ (:bytes message)))]
+ (let [bw-in (:bw-in state)
+ encrypt-key (:encrypt-key state)
+ iv-seed (.nextInt (:random peer))
+ pong {:iv-seed iv-seed
+ :challenge (:challenge ping)
+ :inbound-bw-limit bw-in
+ :peer-id (:id peer)}
+ encoded-pong (encode-core-pong pong encrypt-key
+ (:id remote-peer))]
+ (emit-messages! peer remote-peer
+ [{:message-type message-type-core-pong :bytes encoded-pong}]))))))
(defn handle-core-pong!
[peer remote-peer message]
- (send-do-exception-m! (:state-agent remote-peer)
- [decrypt-key (fetch-val :decrypt-key)
- :when decrypt-key
- challenge (fetch-val :ping-challenge)
- :when-let [pong (first ((parse-core-pong decrypt-key
- challenge (:id peer)) (:bytes message)))]
- :when (= (:peer-id pong) (:id remote-peer))
- status (fetch-val :status)
- :when (= status peer-status-key-received)
- _ (set-val :status peer-status-key-confirmed)] nil))
+ (.execute (:cpu-bound-executor peer)
+ (fn []
+ (domonad maybe-m
+ [:let [state (deref (:state-agent remote-peer))]
+ decrypt-key (:decrypt-key state)
+ :let [challenge (:ping-challenge state)]
+ pong (first ((parse-core-pong decrypt-key challenge (:id peer))
+ (:bytes message)))
+ :when (= (:peer-id pong) (:id remote-peer))]
+ (send (:state-agent remote-peer)
+ #(if (== peer-status-key-received (:status %))
+ (assoc % :status peer-status-key-confirmed)
+ %))))))
(defn admit-core-message!
[peer remote-peer message]
@@ -277,52 +288,60 @@
(defn handle-core-encrypted-message!
[peer remote-peer message]
- (send-do-exception-m! (:state-agent remote-peer)
- [decrypt-key (fetch-val :decrypt-key)
- :when decrypt-key
- decrypt-key-created (fetch-val :decrypt-key-created)
- :when-let [message (first
- ((parse-core-encrypted-message decrypt-key
- decrypt-key-created (:id peer)) (:bytes message)))]
- last-seqnum (fetch-val :last-sequence-number-received)
- :let [seqnum (:sequence-number message)]
- :when-not (== last-seqnum seqnum)
- :when-not (> last-seqnum (+ 32 seqnum))
- bitmap (fetch-val :last-packets-bitmap)
- :let [bit (- last-seqnum seqnum 1)]
- :when-not (and (> last-seqnum seqnum) (bit-test bitmap bit))
- _ (update-state
- #(if (> last-seqnum seqnum)
- (assoc % :last-packets-bitmap (bit-set bitmap bit))
- (conj %
- {:last-sequence-number-received seqnum
- :last-packets-bitmap (.intValue
- (bit-shift-left (bigint bitmap)
- (- seqnum last-seqnum)))})))
- :when-not (.before (:timestamp message) (message-expiration))
- ;; TODO: update bandwidth tracking
- ]
- (doseq [message (:messages message)]
- (admit-core-message! peer remote-peer message))))
+ (.execute (:cpu-bound-executor peer)
+ (fn []
+ (domonad maybe-m
+ [:let [state (deref (:state-agent remote-peer))]
+ decrypt-key (:decrypt-key state)
+ :let [decrypt-key-created (:decrypt-key-created state)]
+ message (first ((parse-core-encrypted-message decrypt-key
+ decrypt-key-created (:id peer)) (:bytes message)))]
+ (send-do-exception-m! (:state-agent remote-peer)
+ [last-seqnum (fetch-val :last-sequence-number-received)
+ :let [seqnum (:sequence-number message)]
+ :when-not (== last-seqnum seqnum)
+ :when-not (> last-seqnum (+ 32 seqnum))
+ bitmap (fetch-val :last-packets-bitmap)
+ :let [bit (- last-seqnum seqnum 1)]
+ :when-not (and (> last-seqnum seqnum) (bit-test bitmap bit))
+ _ (update-state
+ #(if (> last-seqnum seqnum)
+ (assoc % :last-packets-bitmap (bit-set bitmap bit))
+ (conj %
+ {:last-sequence-number-received seqnum
+ :last-packets-bitmap (.intValue
+ (bit-shift-left (bigint bitmap)
+ (- seqnum last-seqnum)))})))
+ :when-not (.before (:timestamp message) (message-expiration))
+ ;; TODO: update bandwidth tracking
+ ]
+ (doseq [message (:messages message)]
+ (admit-core-message! peer remote-peer message)))))))
+(defn initialize-remote-peer-state
+ [peer state]
+ (conj state
+ {:status peer-status-down
+ :decrypt-key-created (Date. (long 0))
+ :encrypt-key (generate-aes-key! (:random peer))
+ :encrypt-key-created (Date.)
+ :ping-challenge (.nextInt (:random peer))
+ ;; TODO: Make this a real number
+ :bw-in 20000}))
(defn handle-receive!
[peer remote-peer message]
- (send-do-exception-m! (:state-agent remote-peer)
- [_ (update-state
- #(if (contains? % :status)
- %
- (conj %
- {:status peer-status-down
- :decrypt-key-created (Date. (long 0))
- :encrypt-key (generate-aes-key! (:random peer))
- :encrypt-key-created (Date.)
- :ping-challenge (.nextInt (:random peer))
- ;; TODO: Make this a real number
- :bw-in 20000})))]
- (condp = (:message-type message)
- message-type-core-set-key (handle-set-key! peer remote-peer message)
- message-type-core-encrypted-message (handle-core-encrypted-message!
- peer remote-peer message)
- message-type-core-ping (handle-core-ping! peer remote-peer message)
- message-type-core-pong (handle-core-pong! peer remote-peer message)
- nil)))
+ (send (:state-agent remote-peer)
+ (fn [state]
+ (let [state (if (contains? state :status)
+ state
+ (initialize-remote-peer-state peer state))]
+ (.write *out* (str "Core: " message "\n"))
+ (condp = (:message-type message)
+ message-type-core-set-key (handle-set-key! peer remote-peer message)
+ message-type-core-encrypted-message (handle-core-encrypted-message!
+ peer remote-peer message)
+ message-type-core-ping (handle-core-ping! peer remote-peer message)
+ message-type-core-pong (handle-core-pong! peer remote-peer message)
+ nil)
+ state))))
diff --git a/src/org/gnu/clojure/gnunet/filesharing.clj b/src/org/gnu/clojure/gnunet/filesharing.clj
index 525fefd..a12779a 100644
--- a/src/org/gnu/clojure/gnunet/filesharing.clj
+++ b/src/org/gnu/clojure/gnunet/filesharing.clj
@@ -42,6 +42,7 @@
(defn bound-priority
"Monadic function of the exception-m monad. Updates :trust and
:average-priority and returns a bounded priority."
+ ;; TODO: check load, don't charge if we're idle, drop if we're too busy
(fn [state]
(let [priority (min priority (:turst state 0))
@@ -65,13 +66,14 @@
(:return-to get-message))
:when (:is-connected (deref (:state-agent return-to))) ;; TODO: try connect
- ;; TODO: check load and drop message if load is too high
priority (bound-priority (:priority get-message))
:let [ttl (min ttl-max (:ttl get-message)
(* priority ttl-decrement 0.001))]
:let [ttl (- ttl (* 2 ttl-decrement)
(.nextInt (:random peer) ttl-decrement))]
- :when (< 0 ttl)]
+ duplicate (with-state-field :queries
+ (fetch-val (:query get-message)))
+ ]
(defn admit-put!
diff --git a/src/org/gnu/clojure/gnunet/peer.clj b/src/org/gnu/clojure/gnunet/peer.clj
index 2eac978..d8beccc 100644
--- a/src/org/gnu/clojure/gnunet/peer.clj
+++ b/src/org/gnu/clojure/gnunet/peer.clj
@@ -1,7 +1,8 @@
(ns org.gnu.clojure.gnunet.peer
(:use (org.gnu.clojure.gnunet crypto message util))
(:import java.nio.channels.Selector
- java.util.concurrent.ConcurrentLinkedQueue
+ (java.util.concurrent ConcurrentLinkedQueue LinkedBlockingQueue
+ PriorityBlockingQueue ThreadPoolExecutor TimeUnit)
(defstruct remote-peer-struct
@@ -22,7 +23,7 @@
;; :connected-transport (value from peer-struct:transports-agent)
;; :connected-address (byte vector)
;; (core layer)
- ;; :status peer-status-down (int)
+ ;; :status (int)
;; :decrypt-key-created (java.util.Date)
;; :encrypt-key (
;; :encrypt-key-created (java.util.Date)
@@ -30,7 +31,8 @@
;; :bw-in (int)
;; (filesharing layer)
;; :trust (int)
- ;; :average-priority (float)}
+ ;; :average-priority (float)
+ ;; :queries (map of query hashes to get-messages)}
(def peer-struct (apply create-struct (concat
@@ -54,13 +56,34 @@
;; Thread which selects on :selector
- ;; java.util.concurrent.ConcurrentLinkedQueue of continuations, in order to
- ;; access the selector while the selector-thread is running add a
- ;; continuation to this queue and call .wakeup on the selector
+ ;; java.util.concurrent.ConcurrentLinkedQueue of continuations.
+ ;; In order to access the selector while the selector-thread is running add
+ ;; a continuation to this queue and call .wakeup on the selector.
+ ;; The size of this queue is an easy measure our network load.
- :random))))
+ :random
+ ;; java.util.concurrent.ThreadPoolExecutor for executing CPU-bound
+ ;; operations like generating RSA keys, hashes, etc. It has one thread for
+ ;; each processor in the system and a "practically" unbounded FIFO queue.
+ :cpu-bound-executor
+ ;; java.util.concurrent.LinkedBlockingQueue of Integer.MAX_VALUE capacity.
+ ;; The size of this queue is an easy measure of our CPU load.
+ :cpu-bound-queue
+ ;; java.util.concurrent.ThreadPoolExecutor for executing disk-bound
+ ;; operations like quering a database or reading/writing files. It has a
+ ;; single thread and an unbounded priority queue. The priority of a callable
+ ;; object is stored in its metadata under the key :priority.
+ :disk-bound-executor
+ ;; java.util.concurrent.PriorityBlockingQueue of unbounded capacity.
+ ;; The size of this queue is an easy measure of our disk load.
+ :disk-bound-queue
+ ))))
(defstruct peer-options
@@ -72,6 +95,13 @@
(def id-size hash-size)
+(def priority-comparator
+ (reify java.util.Comparator
+ (compare [this o1 o2]
+ (clojure.core/compare (:priority (meta o2)) (:priority (meta o1))))
+ (equals [this obj]
+ (== (:priority (meta this)) (:priority (meta obj))))))
(defn selector-loop!
[selector continuations]
(doseq [continuation (queue-seq! continuations)]
@@ -85,7 +115,13 @@
(defn new-peer [options]
(let [selector (Selector/open)
- continuations (ConcurrentLinkedQueue.)]
+ continuations (ConcurrentLinkedQueue.)
+ cpu-bound-queue (LinkedBlockingQueue.)
+ cpu-bound-executor (ThreadPoolExecutor. 0 (available-processors) 60
+ TimeUnit/SECONDS cpu-bound-queue)
+ disk-bound-queue (PriorityBlockingQueue. 1 priority-comparator)
+ disk-bound-executor (ThreadPoolExecutor. 0 1 60 TimeUnit/SECONDS
+ disk-bound-queue)]
(struct-map peer-struct
:public-key-atom (atom (.getPublic (:keypair options)))
:id (generate-id (.getPublic (:keypair options)))
@@ -97,4 +133,20 @@
:selector selector
:selector-thread (Thread. (partial selector-loop! selector continuations))
:selector-continuations-queue continuations
- :random (:random options))))
+ :random (:random options)
+ :cpu-bound-executor cpu-bound-executor
+ :cpu-bound-queue cpu-bound-queue
+ :disk-bound-executor disk-bound-executor
+ :disk-bound-queue disk-bound-queue)))
+(defn network-load
+ [peer]
+ (.size (:selector-continuations-queue peer)))
+(defn cpu-load
+ [peer]
+ (.size (:cpu-bound-queue peer)))
+(defn disk-load
+ [peer]
+ (.size (:disk-bound-queue peer)))
diff --git a/src/org/gnu/clojure/gnunet/util.clj b/src/org/gnu/clojure/gnunet/util.clj
index 78a02dc..b74354b 100644
--- a/src/org/gnu/clojure/gnunet/util.clj
+++ b/src/org/gnu/clojure/gnunet/util.clj
@@ -14,3 +14,7 @@
"Return the maximum in a collection of comparable values."
[& coll]
(last (sort coll)))
+(defn available-processors
+ []
+ (.availableProcessors (Runtime/getRuntime)))