diff options
author | David Barksdale <amatus.amongus@gmail.com> | 2010-11-09 23:49:18 -0800 |
---|---|---|
committer | David Barksdale <amatus.amongus@gmail.com> | 2010-11-09 23:49:18 -0800 |
commit | 34eda9478fbe9ec22c763afdc8c871836a1a60fd (patch) | |
tree | a6874352b09e15be552252e5478e66f547f907e3 /src | |
parent | 55c00c94ec90c723c04ef0fe79c55c71dada01a7 (diff) |
Added TTL expiration heap. Worked on forward-request!
Diffstat (limited to 'src')
-rw-r--r-- | src/org/gnu/clojure/gnunet/filesharing.clj | 76 | ||||
-rw-r--r-- | src/org/gnu/clojure/gnunet/peer.clj | 15 | ||||
-rw-r--r-- | src/org/gnu/clojure/gnunet/util.clj | 2 |
3 files changed, 73 insertions, 20 deletions
diff --git a/src/org/gnu/clojure/gnunet/filesharing.clj b/src/org/gnu/clojure/gnunet/filesharing.clj index 584b705..5b868e0 100644 --- a/src/org/gnu/clojure/gnunet/filesharing.clj +++ b/src/org/gnu/clojure/gnunet/filesharing.clj @@ -2,18 +2,20 @@ (:use (org.gnu.clojure.gnunet bloomfilter crypto exception message metrics parser peer) clojure.contrib.monads) - (:import java.util.Date)) + (:import (java.util Date PriorityQueue) + java.util.concurrent.TimeUnit)) (def message-type-fs-get 137) (def message-type-fs-put 138) (def message-type-fs-migration-stop 139) -(def bit-return-to 1) -(def bit-sks-namespace 2) -(def bit-transmit-to 3) +(def bit-return-to 0) +(def bit-sks-namespace 1) +(def bit-transmit-to 2) (def ttl-decrement 5000) (def ttl-max 1073741824) +(def max-pending-requests 32768) (def parse-get-message (domonad parser-m @@ -68,9 +70,39 @@ _ (set-val :trust (- trust priority))] priority)) +(defn target-peer-select + [query best candidate] + (if (= (key candidate) (:return-to query)) + best + ;; TODO: come on, seriously? + candidate)) + (defn forward-request! [peer query-id] - (.write *out* (str "Query " query-id " isn't going anywhere\n"))) + (send-do-exception-m! (:state-agent peer) + [query (with-state-field :queries + (fetch-val query-id)) + :when-not (nil? query) + :let [send-to (reduce (partial target-peer-select query) nil + (deref (:remote-peers-agent peer)))] + :when (if (nil? send-to) + (do (.schedule (:scheduled-executor peer) + (partial forward-request! peer query-id) + (+ 1000 (.nextInt (:random peer) ttl-decrement)) + TimeUnit/MILLISECONDS) + false) + true)] + (send-do-exception-m! (:state-agent send-to) + [is-connected (fetch-val :is-connected) + :when is-connected] + nil))) + +(def ttl-comparator + (reify java.util.Comparator + (compare [this o1 o2] + (clojure.core/compare (:ttl (meta o1)) (:ttl (meta o2)))) + (equals [this obj] + (== (:ttl (meta this)) (:ttl (meta obj)))))) (defn admit-get! [peer remote-peer message] @@ -94,21 +126,35 @@ (.nextInt (:random peer) ttl-decrement))] :let [start-time (Date.)]] (send-do-exception-m! (:state-agent peer) - [query (with-state-field :queries - (fetch-val (:query get-message) {})) + [queries (fetch-val :queries) + :let [query (queries (:query get-message) {})] :let [duplicate (query (:id return-to))] :when (if (nil? duplicate) true (do (metric-add peer "Filehsaring requests dropped, duplicate" 1) false)) - _ (with-state-field :queries - (set-val (:query get-message) - (assoc query (:id return-to) - (conj get-message - {:priority priority - :ttl ttl - :start-time start-time - :anonymity 1}))))] + :let [queries (assoc queries (:query get-message) + (assoc query (:id return-to) + (conj get-message + {:priority priority + :ttl ttl + :start-time start-time + :anonymity 1})))] + ttl-queue (fetch-val :ttl-queue (PriorityQueue. 1 ttl-comparator)) + :let [_ (.add ttl-queue (with-meta [(:query get-message) (:id return-to)] + {:ttl (+ ttl (.getTime start-time))}))] + :let [_ (metric-set peer + "Filesharing pending requests" (.size ttl-queue))] + :let [expired (when (< max-pending-requests (.size ttl-queue)) + (.poll ttl-queue))] + :let [queries (if (nil? expired) + queries + (let [query (dissoc (queries (first expired)) + (second expired))] + (if (empty? query) + (dissoc queries (first expired)) + (assoc queries (first expired) query))))] + _ (set-val :queries queries)] (forward-request! peer (:query get-message))))) (defn admit-put! diff --git a/src/org/gnu/clojure/gnunet/peer.clj b/src/org/gnu/clojure/gnunet/peer.clj index cc16b17..f441e1d 100644 --- a/src/org/gnu/clojure/gnunet/peer.clj +++ b/src/org/gnu/clojure/gnunet/peer.clj @@ -2,7 +2,8 @@ (:use (org.gnu.clojure.gnunet crypto message util)) (:import java.nio.channels.Selector (java.util.concurrent ConcurrentLinkedQueue LinkedBlockingQueue - PriorityBlockingQueue ThreadPoolExecutor TimeUnit) + PriorityBlockingQueue ScheduledThreadPoolExecutor ThreadPoolExecutor + TimeUnit) java.security.SecureRandom)) (defstruct remote-peer-struct @@ -36,7 +37,9 @@ ;; } ;; Local peer: ;; { (filesharing layer) - ;; :queries (map of query hashes to maps of return peer ids to queries) + ;; :queries (map of query hashes to maps of return-to peer ids to queries) + ;; :ttl-queue (java.util.PriorityQueue of [query return-to] pairs sorted by + ;; :ttl stored in metadata) ;; } :state-agent) @@ -90,7 +93,10 @@ :disk-bound-queue ;; agent of a map of Strings to Numbers. - :metrics-agent)))) + :metrics-agent + + ;; java.util.concurrent.ScheduledThreadPoolExecutor + :scheduled-executor)))) (defstruct peer-options :keypair) @@ -146,7 +152,8 @@ :cpu-bound-queue cpu-bound-queue :disk-bound-executor disk-bound-executor :disk-bound-queue disk-bound-queue - :metrics-agent (agent {})))) + :metrics-agent (agent {}) + :scheduled-executor (ScheduledThreadPoolExecutor. 1)))) (defn network-load [peer] diff --git a/src/org/gnu/clojure/gnunet/util.clj b/src/org/gnu/clojure/gnunet/util.clj index b74354b..d4de23f 100644 --- a/src/org/gnu/clojure/gnunet/util.clj +++ b/src/org/gnu/clojure/gnunet/util.clj @@ -9,7 +9,7 @@ [buffer] (lazy-seq (when (.hasRemaining buffer) (cons (.get buffer) (buffer-seq! buffer))))) - + (defn my-max "Return the maximum in a collection of comparable values." [& coll] |