diff options
author | David Barksdale <amatus.amongus@gmail.com> | 2010-11-07 22:49:32 -0800 |
---|---|---|
committer | David Barksdale <amatus.amongus@gmail.com> | 2010-11-07 22:49:32 -0800 |
commit | 55c00c94ec90c723c04ef0fe79c55c71dada01a7 (patch) | |
tree | 09c508f17124377f00d76fdbafd32cee988cebc1 /src | |
parent | 467e4963cbaabd826adccf8020ca59bcc4949834 (diff) |
Added metrics. More work on filesharing.
Diffstat (limited to 'src')
-rw-r--r-- | src/org/gnu/clojure/gnunet/core.clj | 2 | ||||
-rw-r--r-- | src/org/gnu/clojure/gnunet/filesharing.clj | 79 | ||||
-rw-r--r-- | src/org/gnu/clojure/gnunet/metrics.clj | 16 | ||||
-rw-r--r-- | src/org/gnu/clojure/gnunet/peer.clj | 18 | ||||
-rw-r--r-- | src/org/gnu/clojure/gnunet/transport.clj | 6 | ||||
-rw-r--r-- | src/org/gnu/clojure/gnunet/udp.clj | 2 |
6 files changed, 92 insertions, 31 deletions
diff --git a/src/org/gnu/clojure/gnunet/core.clj b/src/org/gnu/clojure/gnunet/core.clj index 8607bc3..32e48a7 100644 --- a/src/org/gnu/clojure/gnunet/core.clj +++ b/src/org/gnu/clojure/gnunet/core.clj @@ -336,7 +336,7 @@ (let [state (if (contains? state :status) state (initialize-remote-peer-state peer state))] - (.write *out* (str "Core: " message "\n")) + ;; (.write *out* (str "Core: " message "\n")) (condp = (:message-type message) message-type-core-set-key (handle-set-key! peer remote-peer message) message-type-core-encrypted-message (handle-core-encrypted-message! diff --git a/src/org/gnu/clojure/gnunet/filesharing.clj b/src/org/gnu/clojure/gnunet/filesharing.clj index a12779a..584b705 100644 --- a/src/org/gnu/clojure/gnunet/filesharing.clj +++ b/src/org/gnu/clojure/gnunet/filesharing.clj @@ -1,7 +1,8 @@ (ns org.gnu.clojure.gnunet.filesharing - (:use (org.gnu.clojure.gnunet bloomfilter crypto exception message parser - peer) - clojure.contrib.monads)) + (:use (org.gnu.clojure.gnunet bloomfilter crypto exception message metrics + parser peer) + clojure.contrib.monads) + (:import java.util.Date)) (def message-type-fs-get 137) (def message-type-fs-put 138) @@ -42,19 +43,34 @@ (defn bound-priority "Monadic function of the exception-m monad. Updates :trust and :average-priority and returns a bounded priority." - ;; TODO: check load, don't charge if we're idle, drop if we're too busy - [priority] - (fn [state] - (let [priority (min priority (:turst state 0)) - trust (- (:trust state) priority) - state (assoc state :trust trust)] - (if (< 0 priority) - (let [n 128 - average (:average-priority state 0.0) - p (min priority (+ average n)) - average (/ (+ p (* average (dec n))) n)] - [priority (assoc state :average-priority average)]) - [priority state])))) + [peer priority] + (domonad exception-m + [;; TODO: come up with a better load-limit + :let [load-limit (+ (network-load peer) (cpu-load peer) (disk-load peer))] + :let [priority (if (== 0 load-limit) + (do + (metric-add peer "Filesharing requests done for free" 1) + 0) + priority)] + trust (fetch-val :trust 0) + :let [priority (min priority trust)] + _ (update-val :average-priority 0.0 + #(if (< 0 priority) + (let [n 128 + p (min priority (+ % n))] + (/ (+ p (* % (dec n))) n)) + %)) + :when (if (<= load-limit priority) + true + (do (metric-add peer + "Filesharing requests dropped, priority insufficient" 1) + false)) + _ (set-val :trust (- trust priority))] + priority)) + +(defn forward-request! + [peer query-id] + (.write *out* (str "Query " query-id " isn't going anywhere\n"))) (defn admit-get! [peer remote-peer message] @@ -65,16 +81,35 @@ ((deref (:remote-peers-agent peer)) (:return-to get-message)) remote-peer)] - :when (:is-connected (deref (:state-agent return-to))) ;; TODO: try connect - priority (bound-priority (:priority get-message)) + :when (if (:is-connected (deref (:state-agent return-to))) + true + ;; TODO: try connect + (do (metric-add peer + "Filesharing requests dropped, missing reverse route" 1) + false)) + priority (bound-priority peer (:priority get-message)) :let [ttl (min ttl-max (:ttl get-message) (* priority ttl-decrement 0.001))] :let [ttl (- ttl (* 2 ttl-decrement) (.nextInt (:random peer) ttl-decrement))] - duplicate (with-state-field :queries - (fetch-val (:query get-message))) - ] - nil)) + :let [start-time (Date.)]] + (send-do-exception-m! (:state-agent peer) + [query (with-state-field :queries + (fetch-val (:query get-message) {})) + :let [duplicate (query (:id return-to))] + :when (if (nil? duplicate) + true + (do (metric-add peer "Filehsaring requests dropped, duplicate" 1) + false)) + _ (with-state-field :queries + (set-val (:query get-message) + (assoc query (:id return-to) + (conj get-message + {:priority priority + :ttl ttl + :start-time start-time + :anonymity 1}))))] + (forward-request! peer (:query get-message))))) (defn admit-put! [peer remote-peer message]) diff --git a/src/org/gnu/clojure/gnunet/metrics.clj b/src/org/gnu/clojure/gnunet/metrics.clj new file mode 100644 index 0000000..6aa12f1 --- /dev/null +++ b/src/org/gnu/clojure/gnunet/metrics.clj @@ -0,0 +1,16 @@ +(ns org.gnu.clojure.gnunet.metrics) + +(defn metric-set + [peer metric value] + (send (:metrics-agent peer) + (fn [metrics] + (assoc metrics metric value)))) + +(defn metric-add + ([peer metric value] + (metric-add peer metric value 0)) + ([peer metric value zero] + (send (:metrics-agent peer) + (fn [metrics] + (let [old-value (get metrics metric zero)] + (assoc metrics metric (+ old-value value))))))) diff --git a/src/org/gnu/clojure/gnunet/peer.clj b/src/org/gnu/clojure/gnunet/peer.clj index d8beccc..cc16b17 100644 --- a/src/org/gnu/clojure/gnunet/peer.clj +++ b/src/org/gnu/clojure/gnunet/peer.clj @@ -17,7 +17,8 @@ ;; (java.util.Date) :latency (int, if validated)} :transport-addresses-agent - ;; agent of a map of state (nil for local peer?) + ;; agent of a map of state. + ;; Remote-peers: ;; { (shared between layers) ;; :is-connected (boolean) ;; :connected-transport (value from peer-struct:transports-agent) @@ -32,7 +33,11 @@ ;; (filesharing layer) ;; :trust (int) ;; :average-priority (float) - ;; :queries (map of query hashes to get-messages)} + ;; } + ;; Local peer: + ;; { (filesharing layer) + ;; :queries (map of query hashes to maps of return peer ids to queries) + ;; } :state-agent) (def peer-struct (apply create-struct (concat @@ -83,7 +88,9 @@ ;; java.util.concurrent.PriorityBlockingQueue of unbounded capacity. ;; The size of this queue is an easy measure of our disk load. :disk-bound-queue - )))) + + ;; agent of a map of Strings to Numbers. + :metrics-agent)))) (defstruct peer-options :keypair) @@ -126,6 +133,7 @@ :public-key-atom (atom (.getPublic (:keypair options))) :id (generate-id (.getPublic (:keypair options))) :transport-addresses-agent (agent {}) + :state-agent (agent {}) :private-key (.getPrivate (:keypair options)) :remote-peers-agent (agent {}) :transports-agent (agent {}) @@ -137,10 +145,12 @@ :cpu-bound-executor cpu-bound-executor :cpu-bound-queue cpu-bound-queue :disk-bound-executor disk-bound-executor - :disk-bound-queue disk-bound-queue))) + :disk-bound-queue disk-bound-queue + :metrics-agent (agent {})))) (defn network-load [peer] + ;; TODO: figure out if we really need size, it's an O(n) operation (.size (:selector-continuations-queue peer))) (defn cpu-load diff --git a/src/org/gnu/clojure/gnunet/transport.clj b/src/org/gnu/clojure/gnunet/transport.clj index 02ce676..ac1f923 100644 --- a/src/org/gnu/clojure/gnunet/transport.clj +++ b/src/org/gnu/clojure/gnunet/transport.clj @@ -1,6 +1,6 @@ (ns org.gnu.clojure.gnunet.transport - (:use (org.gnu.clojure.gnunet core crypto exception hello message parser peer - util) + (:use (org.gnu.clojure.gnunet core crypto exception hello message metrics + parser peer util) clojure.contrib.monads) (:import (java.util Date Calendar))) @@ -279,7 +279,7 @@ {:expiration (hello-address-expiration) :latency (- (.getTime (Date.)) (.getTime (:send-time address)))}))] - nil))))) + (metric-add peer "Peer addresses considered valid" 1)))))) (defn handle-pong-using! [peer remote-peer pong] diff --git a/src/org/gnu/clojure/gnunet/udp.clj b/src/org/gnu/clojure/gnunet/udp.clj index e0aac16..81500ca 100644 --- a/src/org/gnu/clojure/gnunet/udp.clj +++ b/src/org/gnu/clojure/gnunet/udp.clj @@ -86,7 +86,7 @@ (when-let [{udp :message} (first ((parse-message-types {message-type-udp parse-udp}) (buffer-seq! byte-buffer)))] - (if (not (= (:sender-id udp) (:id peer))) + (when-not (= (:sender-id udp) (:id peer)) (doseq [message (:messages udp)] (admit-message! peer (:sender-id udp) address message)))))) |