summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDavid Barksdale <amatus.amongus@gmail.com>2010-11-09 23:49:18 -0800
committerDavid Barksdale <amatus.amongus@gmail.com>2010-11-09 23:49:18 -0800
commit34eda9478fbe9ec22c763afdc8c871836a1a60fd (patch)
treea6874352b09e15be552252e5478e66f547f907e3 /src
parent55c00c94ec90c723c04ef0fe79c55c71dada01a7 (diff)
Added TTL expiration heap. Worked on forward-request!
Diffstat (limited to 'src')
-rw-r--r--src/org/gnu/clojure/gnunet/filesharing.clj76
-rw-r--r--src/org/gnu/clojure/gnunet/peer.clj15
-rw-r--r--src/org/gnu/clojure/gnunet/util.clj2
3 files changed, 73 insertions, 20 deletions
diff --git a/src/org/gnu/clojure/gnunet/filesharing.clj b/src/org/gnu/clojure/gnunet/filesharing.clj
index 584b705..5b868e0 100644
--- a/src/org/gnu/clojure/gnunet/filesharing.clj
+++ b/src/org/gnu/clojure/gnunet/filesharing.clj
@@ -2,18 +2,20 @@
(:use (org.gnu.clojure.gnunet bloomfilter crypto exception message metrics
parser peer)
clojure.contrib.monads)
- (:import java.util.Date))
+ (:import (java.util Date PriorityQueue)
+ java.util.concurrent.TimeUnit))
(def message-type-fs-get 137)
(def message-type-fs-put 138)
(def message-type-fs-migration-stop 139)
-(def bit-return-to 1)
-(def bit-sks-namespace 2)
-(def bit-transmit-to 3)
+(def bit-return-to 0)
+(def bit-sks-namespace 1)
+(def bit-transmit-to 2)
(def ttl-decrement 5000)
(def ttl-max 1073741824)
+(def max-pending-requests 32768)
(def parse-get-message
(domonad parser-m
@@ -68,9 +70,39 @@
_ (set-val :trust (- trust priority))]
priority))
+(defn target-peer-select
+ [query best candidate]
+ (if (= (key candidate) (:return-to query))
+ best
+ ;; TODO: come on, seriously?
+ candidate))
+
(defn forward-request!
[peer query-id]
- (.write *out* (str "Query " query-id " isn't going anywhere\n")))
+ (send-do-exception-m! (:state-agent peer)
+ [query (with-state-field :queries
+ (fetch-val query-id))
+ :when-not (nil? query)
+ :let [send-to (reduce (partial target-peer-select query) nil
+ (deref (:remote-peers-agent peer)))]
+ :when (if (nil? send-to)
+ (do (.schedule (:scheduled-executor peer)
+ (partial forward-request! peer query-id)
+ (+ 1000 (.nextInt (:random peer) ttl-decrement))
+ TimeUnit/MILLISECONDS)
+ false)
+ true)]
+ (send-do-exception-m! (:state-agent send-to)
+ [is-connected (fetch-val :is-connected)
+ :when is-connected]
+ nil)))
+
+(def ttl-comparator
+ (reify java.util.Comparator
+ (compare [this o1 o2]
+ (clojure.core/compare (:ttl (meta o1)) (:ttl (meta o2))))
+ (equals [this obj]
+ (== (:ttl (meta this)) (:ttl (meta obj))))))
(defn admit-get!
[peer remote-peer message]
@@ -94,21 +126,35 @@
(.nextInt (:random peer) ttl-decrement))]
:let [start-time (Date.)]]
(send-do-exception-m! (:state-agent peer)
- [query (with-state-field :queries
- (fetch-val (:query get-message) {}))
+ [queries (fetch-val :queries)
+ :let [query (queries (:query get-message) {})]
:let [duplicate (query (:id return-to))]
:when (if (nil? duplicate)
true
(do (metric-add peer "Filehsaring requests dropped, duplicate" 1)
false))
- _ (with-state-field :queries
- (set-val (:query get-message)
- (assoc query (:id return-to)
- (conj get-message
- {:priority priority
- :ttl ttl
- :start-time start-time
- :anonymity 1}))))]
+ :let [queries (assoc queries (:query get-message)
+ (assoc query (:id return-to)
+ (conj get-message
+ {:priority priority
+ :ttl ttl
+ :start-time start-time
+ :anonymity 1})))]
+ ttl-queue (fetch-val :ttl-queue (PriorityQueue. 1 ttl-comparator))
+ :let [_ (.add ttl-queue (with-meta [(:query get-message) (:id return-to)]
+ {:ttl (+ ttl (.getTime start-time))}))]
+ :let [_ (metric-set peer
+ "Filesharing pending requests" (.size ttl-queue))]
+ :let [expired (when (< max-pending-requests (.size ttl-queue))
+ (.poll ttl-queue))]
+ :let [queries (if (nil? expired)
+ queries
+ (let [query (dissoc (queries (first expired))
+ (second expired))]
+ (if (empty? query)
+ (dissoc queries (first expired))
+ (assoc queries (first expired) query))))]
+ _ (set-val :queries queries)]
(forward-request! peer (:query get-message)))))
(defn admit-put!
diff --git a/src/org/gnu/clojure/gnunet/peer.clj b/src/org/gnu/clojure/gnunet/peer.clj
index cc16b17..f441e1d 100644
--- a/src/org/gnu/clojure/gnunet/peer.clj
+++ b/src/org/gnu/clojure/gnunet/peer.clj
@@ -2,7 +2,8 @@
(:use (org.gnu.clojure.gnunet crypto message util))
(:import java.nio.channels.Selector
(java.util.concurrent ConcurrentLinkedQueue LinkedBlockingQueue
- PriorityBlockingQueue ThreadPoolExecutor TimeUnit)
+ PriorityBlockingQueue ScheduledThreadPoolExecutor ThreadPoolExecutor
+ TimeUnit)
java.security.SecureRandom))
(defstruct remote-peer-struct
@@ -36,7 +37,9 @@
;; }
;; Local peer:
;; { (filesharing layer)
- ;; :queries (map of query hashes to maps of return peer ids to queries)
+ ;; :queries (map of query hashes to maps of return-to peer ids to queries)
+ ;; :ttl-queue (java.util.PriorityQueue of [query return-to] pairs sorted by
+ ;; :ttl stored in metadata)
;; }
:state-agent)
@@ -90,7 +93,10 @@
:disk-bound-queue
;; agent of a map of Strings to Numbers.
- :metrics-agent))))
+ :metrics-agent
+
+ ;; java.util.concurrent.ScheduledThreadPoolExecutor
+ :scheduled-executor))))
(defstruct peer-options
:keypair)
@@ -146,7 +152,8 @@
:cpu-bound-queue cpu-bound-queue
:disk-bound-executor disk-bound-executor
:disk-bound-queue disk-bound-queue
- :metrics-agent (agent {}))))
+ :metrics-agent (agent {})
+ :scheduled-executor (ScheduledThreadPoolExecutor. 1))))
(defn network-load
[peer]
diff --git a/src/org/gnu/clojure/gnunet/util.clj b/src/org/gnu/clojure/gnunet/util.clj
index b74354b..d4de23f 100644
--- a/src/org/gnu/clojure/gnunet/util.clj
+++ b/src/org/gnu/clojure/gnunet/util.clj
@@ -9,7 +9,7 @@
[buffer]
(lazy-seq (when (.hasRemaining buffer)
(cons (.get buffer) (buffer-seq! buffer)))))
-
+
(defn my-max
"Return the maximum in a collection of comparable values."
[& coll]