summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDavid Barksdale <amatus.amongus@gmail.com>2010-11-07 13:21:16 -0800
committerDavid Barksdale <amatus.amongus@gmail.com>2010-11-07 13:21:16 -0800
commit467e4963cbaabd826adccf8020ca59bcc4949834 (patch)
treeef9d2b16d975367091caba49c83eb3c7c31d7889 /src
parentbc32b275367e808abb6afc51cdc320f3ef3dad6a (diff)
More use of cpu-bound-executor and a few optimisations.
Diffstat (limited to 'src')
-rw-r--r--src/org/gnu/clojure/gnunet/transport.clj120
-rw-r--r--src/org/gnu/clojure/gnunet/udp.clj2
2 files changed, 69 insertions, 53 deletions
diff --git a/src/org/gnu/clojure/gnunet/transport.clj b/src/org/gnu/clojure/gnunet/transport.clj
index 1ad910f..02ce676 100644
--- a/src/org/gnu/clojure/gnunet/transport.clj
+++ b/src/org/gnu/clojure/gnunet/transport.clj
@@ -71,19 +71,20 @@
[expiration parse-date
peer-id (items id-size)
address-length parse-uint32
- transport parse-utf8
- :let [transport-length (count (encode-utf8 transport))]
+ transport (none-or-more (satisfy #(not (== 0 %))))
+ zero item
+ :let [transport-length (inc (count transport))]
:when (>= address-length transport-length)
encoded-address (items (- address-length transport-length))]
{:expiration expiration
:peer-id peer-id
- :transport transport
+ :transport (String. (byte-array transport) "UTF-8")
:encoded-address encoded-address}))
:let [signature-purpose (:purpose signed)]
- :when (or (= signature-purpose signature-purpose-pong-own)
- (= signature-purpose signature-purpose-pong-using))
- residue (none-or-more item)
- :when (empty? residue)]
+ :when (or (== signature-purpose signature-purpose-pong-own)
+ (== signature-purpose signature-purpose-pong-using))
+ residue (optional item)
+ :when (nil? residue)]
(conj {:challenge challenge
:signature signature
:signature-purpose signature-purpose
@@ -221,35 +222,37 @@
(defn send-pong-own!
[peer remote-peer ping]
- (when-let [transport-addresses ((deref (:transport-addresses-agent peer))
- (:transport ping))]
- ;; XXX: Here we're looking for an exact match, gnunet allows transport
- ;; plugins to do inexact matches.
- (when (contains? transport-addresses (:encoded-address ping))
- (let [pong {:challenge (:challenge ping)
- :signature-purpose signature-purpose-pong-own
- :expiration (pong-expiration)
- :peer-id (:id peer)
- :transport (:transport ping)
- :encoded-address (:encoded-address ping)}
- signed-material (encode-pong-signed-material pong)
- signature (rsa-sign (:private-key peer) signed-material)
- pong (assoc pong :signature signature)
- encoded-pong (encode-pong pong signed-material)]
- ;; XXX: gnunet looks for a "reliable" connection for the pong before
- ;; sending to every known address.
- (doseq [transport-addresses (deref
- (:transport-addresses-agent remote-peer))
- address (val transport-addresses)]
- (when-let [transport ((deref (:transports-agent peer))
- (key transport-addresses))]
- ((:emit-messages! transport) transport remote-peer (key address) nil
- [{:message-type message-type-pong :bytes encoded-pong}])))))))
+ (.execute (:cpu-bound-executor peer)
+ (fn []
+ (domonad maybe-m
+ [transport-addresses ((deref (:transport-addresses-agent peer))
+ (:transport ping))
+ ;; XXX: Here we're looking for an exact match, gnunet allows transport
+ ;; plugins to do inexact matches.
+ :when (contains? transport-addresses (:encoded-address ping))]
+ (let [pong {:challenge (:challenge ping)
+ :signature-purpose signature-purpose-pong-own
+ :expiration (pong-expiration)
+ :peer-id (:id peer)
+ :transport (:transport ping)
+ :encoded-address (:encoded-address ping)}
+ signed-material (encode-pong-signed-material pong)
+ signature (rsa-sign (:private-key peer) signed-material)
+ pong (assoc pong :signature signature)
+ encoded-pong (encode-pong pong signed-material)]
+ ;; XXX: gnunet looks for a "reliable" connection for the pong before
+ ;; sending to every known address.
+ (doseq [transports (deref (:transport-addresses-agent remote-peer))
+ address (val transports)]
+ (when-let [transport ((deref (:transports-agent peer))
+ (key transports))]
+ ((:emit-messages! transport) transport remote-peer (key address)
+ nil
+ [{:message-type message-type-pong :bytes encoded-pong}]))))))))
(defn send-pong-using!
[peer remote-peer ping]
- (.write *out* "We don't handle PONG_USING yet!\n")
- )
+ (.write *out* "We don't send PONG_USING yet!\n"))
(defn handle-ping!
[peer remote-peer message]
@@ -259,30 +262,40 @@
(:transport ping) (send-pong-own! peer remote-peer ping)
:else (send-pong-using! peer remote-peer ping))))
+(defn handle-pong-own!
+ [peer remote-peer pong]
+ (.execute (:cpu-bound-executor peer)
+ (fn []
+ (domonad maybe-m
+ [public-key (deref (:public-key-atom remote-peer))
+ :when (rsa-verify public-key (:signed-material pong)
+ (:signature pong))]
+ (send-do-exception-m! (:transport-addresses-agent remote-peer)
+ [address (with-state-field (:transport pong)
+ (fetch-val (:encoded-address pong)))
+ :when (= (:challenge address) (:challenge pong))
+ _ (with-state-field (:transport pong)
+ (set-val (:encoded-address pong)
+ {:expiration (hello-address-expiration)
+ :latency (- (.getTime (Date.))
+ (.getTime (:send-time address)))}))]
+ nil)))))
+
+(defn handle-pong-using!
+ [peer remote-peer pong]
+ ;; TODO - fill in this case
+ (.write *out* "We don't handle PONG_USING yet!\n"))
+
(defn handle-pong!
[peer message]
(domonad maybe-m
[pong (first (parse-pong (:bytes message)))
- :when (not (.after (Date.) (:expiration pong)))
- remote-peer ((deref (:remote-peers-agent peer)) (:peer-id pong))
- public-key (deref (:public-key-atom remote-peer))]
- (send-do-exception-m! (:transport-addresses-agent remote-peer)
- [address (with-state-field (:transport pong)
- (fetch-val (:encoded-address pong)))
- :when (= (:challenge address) (:challenge pong))
- _ (cond
- (= signature-purpose-pong-own (:signature-purpose pong))
- (if (rsa-verify public-key (:signed-material pong) (:signature pong))
- (with-state-field (:transport pong)
- (set-val (:encoded-address pong)
- {:expiration (hello-address-expiration)
- :latency (- (.getTime (Date.))
- (.getTime (:send-time address)))}))
- break)
- (= signature-purpose-pong-using (:signature-purpose pong))
- ;; TODO - fill in this case
- break
- :else break)] nil)))
+ :when-not (.after (Date.) (:expiration pong))
+ remote-peer ((deref (:remote-peers-agent peer)) (:peer-id pong))]
+ (condp == (:signature-purpose pong)
+ signature-purpose-pong-own (handle-pong-own! peer remote-peer pong)
+ signature-purpose-pong-using (handle-pong-using! peer remote-peer pong)
+ nil)))
(defn emit-continuation!
[peer transport remote-peer encoded-address result]
@@ -299,6 +312,7 @@
(defn admit-message!
[peer sender-id address message]
+ ;; (.write *out* (str "Received " message "\n"))
(send (:remote-peers-agent peer)
(fn [remote-peers]
(let [remote-peers (update-remote-peers remote-peers
diff --git a/src/org/gnu/clojure/gnunet/udp.clj b/src/org/gnu/clojure/gnunet/udp.clj
index d16ff1b..e0aac16 100644
--- a/src/org/gnu/clojure/gnunet/udp.clj
+++ b/src/org/gnu/clojure/gnunet/udp.clj
@@ -45,6 +45,8 @@
(defn emit-messages-udp!
[peer transport remote-peer encoded-address continuation! messages]
+ ;;(doseq [message messages]
+ ;; (.write *out* (str "Send " message "\n")))
(if-let [address (first (parse-address encoded-address))]
(let [continuation! #(do
(emit-continuation! peer transport remote-peer