summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDavid Barksdale <amatus.amongus@gmail.com>2010-11-07 22:49:32 -0800
committerDavid Barksdale <amatus.amongus@gmail.com>2010-11-07 22:49:32 -0800
commit55c00c94ec90c723c04ef0fe79c55c71dada01a7 (patch)
tree09c508f17124377f00d76fdbafd32cee988cebc1 /src
parent467e4963cbaabd826adccf8020ca59bcc4949834 (diff)
Added metrics. More work on filesharing.
Diffstat (limited to 'src')
-rw-r--r--src/org/gnu/clojure/gnunet/core.clj2
-rw-r--r--src/org/gnu/clojure/gnunet/filesharing.clj79
-rw-r--r--src/org/gnu/clojure/gnunet/metrics.clj16
-rw-r--r--src/org/gnu/clojure/gnunet/peer.clj18
-rw-r--r--src/org/gnu/clojure/gnunet/transport.clj6
-rw-r--r--src/org/gnu/clojure/gnunet/udp.clj2
6 files changed, 92 insertions, 31 deletions
diff --git a/src/org/gnu/clojure/gnunet/core.clj b/src/org/gnu/clojure/gnunet/core.clj
index 8607bc3..32e48a7 100644
--- a/src/org/gnu/clojure/gnunet/core.clj
+++ b/src/org/gnu/clojure/gnunet/core.clj
@@ -336,7 +336,7 @@
(let [state (if (contains? state :status)
state
(initialize-remote-peer-state peer state))]
- (.write *out* (str "Core: " message "\n"))
+ ;; (.write *out* (str "Core: " message "\n"))
(condp = (:message-type message)
message-type-core-set-key (handle-set-key! peer remote-peer message)
message-type-core-encrypted-message (handle-core-encrypted-message!
diff --git a/src/org/gnu/clojure/gnunet/filesharing.clj b/src/org/gnu/clojure/gnunet/filesharing.clj
index a12779a..584b705 100644
--- a/src/org/gnu/clojure/gnunet/filesharing.clj
+++ b/src/org/gnu/clojure/gnunet/filesharing.clj
@@ -1,7 +1,8 @@
(ns org.gnu.clojure.gnunet.filesharing
- (:use (org.gnu.clojure.gnunet bloomfilter crypto exception message parser
- peer)
- clojure.contrib.monads))
+ (:use (org.gnu.clojure.gnunet bloomfilter crypto exception message metrics
+ parser peer)
+ clojure.contrib.monads)
+ (:import java.util.Date))
(def message-type-fs-get 137)
(def message-type-fs-put 138)
@@ -42,19 +43,34 @@
(defn bound-priority
"Monadic function of the exception-m monad. Updates :trust and
:average-priority and returns a bounded priority."
- ;; TODO: check load, don't charge if we're idle, drop if we're too busy
- [priority]
- (fn [state]
- (let [priority (min priority (:turst state 0))
- trust (- (:trust state) priority)
- state (assoc state :trust trust)]
- (if (< 0 priority)
- (let [n 128
- average (:average-priority state 0.0)
- p (min priority (+ average n))
- average (/ (+ p (* average (dec n))) n)]
- [priority (assoc state :average-priority average)])
- [priority state]))))
+ [peer priority]
+ (domonad exception-m
+ [;; TODO: come up with a better load-limit
+ :let [load-limit (+ (network-load peer) (cpu-load peer) (disk-load peer))]
+ :let [priority (if (== 0 load-limit)
+ (do
+ (metric-add peer "Filesharing requests done for free" 1)
+ 0)
+ priority)]
+ trust (fetch-val :trust 0)
+ :let [priority (min priority trust)]
+ _ (update-val :average-priority 0.0
+ #(if (< 0 priority)
+ (let [n 128
+ p (min priority (+ % n))]
+ (/ (+ p (* % (dec n))) n))
+ %))
+ :when (if (<= load-limit priority)
+ true
+ (do (metric-add peer
+ "Filesharing requests dropped, priority insufficient" 1)
+ false))
+ _ (set-val :trust (- trust priority))]
+ priority))
+
+(defn forward-request!
+ [peer query-id]
+ (.write *out* (str "Query " query-id " isn't going anywhere\n")))
(defn admit-get!
[peer remote-peer message]
@@ -65,16 +81,35 @@
((deref (:remote-peers-agent peer))
(:return-to get-message))
remote-peer)]
- :when (:is-connected (deref (:state-agent return-to))) ;; TODO: try connect
- priority (bound-priority (:priority get-message))
+ :when (if (:is-connected (deref (:state-agent return-to)))
+ true
+ ;; TODO: try connect
+ (do (metric-add peer
+ "Filesharing requests dropped, missing reverse route" 1)
+ false))
+ priority (bound-priority peer (:priority get-message))
:let [ttl (min ttl-max (:ttl get-message)
(* priority ttl-decrement 0.001))]
:let [ttl (- ttl (* 2 ttl-decrement)
(.nextInt (:random peer) ttl-decrement))]
- duplicate (with-state-field :queries
- (fetch-val (:query get-message)))
- ]
- nil))
+ :let [start-time (Date.)]]
+ (send-do-exception-m! (:state-agent peer)
+ [query (with-state-field :queries
+ (fetch-val (:query get-message) {}))
+ :let [duplicate (query (:id return-to))]
+ :when (if (nil? duplicate)
+ true
+ (do (metric-add peer "Filehsaring requests dropped, duplicate" 1)
+ false))
+ _ (with-state-field :queries
+ (set-val (:query get-message)
+ (assoc query (:id return-to)
+ (conj get-message
+ {:priority priority
+ :ttl ttl
+ :start-time start-time
+ :anonymity 1}))))]
+ (forward-request! peer (:query get-message)))))
(defn admit-put!
[peer remote-peer message])
diff --git a/src/org/gnu/clojure/gnunet/metrics.clj b/src/org/gnu/clojure/gnunet/metrics.clj
new file mode 100644
index 0000000..6aa12f1
--- /dev/null
+++ b/src/org/gnu/clojure/gnunet/metrics.clj
@@ -0,0 +1,16 @@
+(ns org.gnu.clojure.gnunet.metrics)
+
+(defn metric-set
+ [peer metric value]
+ (send (:metrics-agent peer)
+ (fn [metrics]
+ (assoc metrics metric value))))
+
+(defn metric-add
+ ([peer metric value]
+ (metric-add peer metric value 0))
+ ([peer metric value zero]
+ (send (:metrics-agent peer)
+ (fn [metrics]
+ (let [old-value (get metrics metric zero)]
+ (assoc metrics metric (+ old-value value)))))))
diff --git a/src/org/gnu/clojure/gnunet/peer.clj b/src/org/gnu/clojure/gnunet/peer.clj
index d8beccc..cc16b17 100644
--- a/src/org/gnu/clojure/gnunet/peer.clj
+++ b/src/org/gnu/clojure/gnunet/peer.clj
@@ -17,7 +17,8 @@
;; (java.util.Date) :latency (int, if validated)}
:transport-addresses-agent
- ;; agent of a map of state (nil for local peer?)
+ ;; agent of a map of state.
+ ;; Remote-peers:
;; { (shared between layers)
;; :is-connected (boolean)
;; :connected-transport (value from peer-struct:transports-agent)
@@ -32,7 +33,11 @@
;; (filesharing layer)
;; :trust (int)
;; :average-priority (float)
- ;; :queries (map of query hashes to get-messages)}
+ ;; }
+ ;; Local peer:
+ ;; { (filesharing layer)
+ ;; :queries (map of query hashes to maps of return peer ids to queries)
+ ;; }
:state-agent)
(def peer-struct (apply create-struct (concat
@@ -83,7 +88,9 @@
;; java.util.concurrent.PriorityBlockingQueue of unbounded capacity.
;; The size of this queue is an easy measure of our disk load.
:disk-bound-queue
- ))))
+
+ ;; agent of a map of Strings to Numbers.
+ :metrics-agent))))
(defstruct peer-options
:keypair)
@@ -126,6 +133,7 @@
:public-key-atom (atom (.getPublic (:keypair options)))
:id (generate-id (.getPublic (:keypair options)))
:transport-addresses-agent (agent {})
+ :state-agent (agent {})
:private-key (.getPrivate (:keypair options))
:remote-peers-agent (agent {})
:transports-agent (agent {})
@@ -137,10 +145,12 @@
:cpu-bound-executor cpu-bound-executor
:cpu-bound-queue cpu-bound-queue
:disk-bound-executor disk-bound-executor
- :disk-bound-queue disk-bound-queue)))
+ :disk-bound-queue disk-bound-queue
+ :metrics-agent (agent {}))))
(defn network-load
[peer]
+ ;; TODO: figure out if we really need size, it's an O(n) operation
(.size (:selector-continuations-queue peer)))
(defn cpu-load
diff --git a/src/org/gnu/clojure/gnunet/transport.clj b/src/org/gnu/clojure/gnunet/transport.clj
index 02ce676..ac1f923 100644
--- a/src/org/gnu/clojure/gnunet/transport.clj
+++ b/src/org/gnu/clojure/gnunet/transport.clj
@@ -1,6 +1,6 @@
(ns org.gnu.clojure.gnunet.transport
- (:use (org.gnu.clojure.gnunet core crypto exception hello message parser peer
- util)
+ (:use (org.gnu.clojure.gnunet core crypto exception hello message metrics
+ parser peer util)
clojure.contrib.monads)
(:import (java.util Date Calendar)))
@@ -279,7 +279,7 @@
{:expiration (hello-address-expiration)
:latency (- (.getTime (Date.))
(.getTime (:send-time address)))}))]
- nil)))))
+ (metric-add peer "Peer addresses considered valid" 1))))))
(defn handle-pong-using!
[peer remote-peer pong]
diff --git a/src/org/gnu/clojure/gnunet/udp.clj b/src/org/gnu/clojure/gnunet/udp.clj
index e0aac16..81500ca 100644
--- a/src/org/gnu/clojure/gnunet/udp.clj
+++ b/src/org/gnu/clojure/gnunet/udp.clj
@@ -86,7 +86,7 @@
(when-let [{udp :message} (first ((parse-message-types
{message-type-udp parse-udp})
(buffer-seq! byte-buffer)))]
- (if (not (= (:sender-id udp) (:id peer)))
+ (when-not (= (:sender-id udp) (:id peer))
(doseq [message (:messages udp)]
(admit-message! peer (:sender-id udp) address message))))))