diff options
author | David Barksdale <amatus.amongus@gmail.com> | 2010-11-07 13:21:16 -0800 |
---|---|---|
committer | David Barksdale <amatus.amongus@gmail.com> | 2010-11-07 13:21:16 -0800 |
commit | 467e4963cbaabd826adccf8020ca59bcc4949834 (patch) | |
tree | ef9d2b16d975367091caba49c83eb3c7c31d7889 /src | |
parent | bc32b275367e808abb6afc51cdc320f3ef3dad6a (diff) |
More use of cpu-bound-executor and a few optimisations.
Diffstat (limited to 'src')
-rw-r--r-- | src/org/gnu/clojure/gnunet/transport.clj | 120 | ||||
-rw-r--r-- | src/org/gnu/clojure/gnunet/udp.clj | 2 |
2 files changed, 69 insertions, 53 deletions
diff --git a/src/org/gnu/clojure/gnunet/transport.clj b/src/org/gnu/clojure/gnunet/transport.clj index 1ad910f..02ce676 100644 --- a/src/org/gnu/clojure/gnunet/transport.clj +++ b/src/org/gnu/clojure/gnunet/transport.clj @@ -71,19 +71,20 @@ [expiration parse-date peer-id (items id-size) address-length parse-uint32 - transport parse-utf8 - :let [transport-length (count (encode-utf8 transport))] + transport (none-or-more (satisfy #(not (== 0 %)))) + zero item + :let [transport-length (inc (count transport))] :when (>= address-length transport-length) encoded-address (items (- address-length transport-length))] {:expiration expiration :peer-id peer-id - :transport transport + :transport (String. (byte-array transport) "UTF-8") :encoded-address encoded-address})) :let [signature-purpose (:purpose signed)] - :when (or (= signature-purpose signature-purpose-pong-own) - (= signature-purpose signature-purpose-pong-using)) - residue (none-or-more item) - :when (empty? residue)] + :when (or (== signature-purpose signature-purpose-pong-own) + (== signature-purpose signature-purpose-pong-using)) + residue (optional item) + :when (nil? residue)] (conj {:challenge challenge :signature signature :signature-purpose signature-purpose @@ -221,35 +222,37 @@ (defn send-pong-own! [peer remote-peer ping] - (when-let [transport-addresses ((deref (:transport-addresses-agent peer)) - (:transport ping))] - ;; XXX: Here we're looking for an exact match, gnunet allows transport - ;; plugins to do inexact matches. - (when (contains? transport-addresses (:encoded-address ping)) - (let [pong {:challenge (:challenge ping) - :signature-purpose signature-purpose-pong-own - :expiration (pong-expiration) - :peer-id (:id peer) - :transport (:transport ping) - :encoded-address (:encoded-address ping)} - signed-material (encode-pong-signed-material pong) - signature (rsa-sign (:private-key peer) signed-material) - pong (assoc pong :signature signature) - encoded-pong (encode-pong pong signed-material)] - ;; XXX: gnunet looks for a "reliable" connection for the pong before - ;; sending to every known address. - (doseq [transport-addresses (deref - (:transport-addresses-agent remote-peer)) - address (val transport-addresses)] - (when-let [transport ((deref (:transports-agent peer)) - (key transport-addresses))] - ((:emit-messages! transport) transport remote-peer (key address) nil - [{:message-type message-type-pong :bytes encoded-pong}]))))))) + (.execute (:cpu-bound-executor peer) + (fn [] + (domonad maybe-m + [transport-addresses ((deref (:transport-addresses-agent peer)) + (:transport ping)) + ;; XXX: Here we're looking for an exact match, gnunet allows transport + ;; plugins to do inexact matches. + :when (contains? transport-addresses (:encoded-address ping))] + (let [pong {:challenge (:challenge ping) + :signature-purpose signature-purpose-pong-own + :expiration (pong-expiration) + :peer-id (:id peer) + :transport (:transport ping) + :encoded-address (:encoded-address ping)} + signed-material (encode-pong-signed-material pong) + signature (rsa-sign (:private-key peer) signed-material) + pong (assoc pong :signature signature) + encoded-pong (encode-pong pong signed-material)] + ;; XXX: gnunet looks for a "reliable" connection for the pong before + ;; sending to every known address. + (doseq [transports (deref (:transport-addresses-agent remote-peer)) + address (val transports)] + (when-let [transport ((deref (:transports-agent peer)) + (key transports))] + ((:emit-messages! transport) transport remote-peer (key address) + nil + [{:message-type message-type-pong :bytes encoded-pong}])))))))) (defn send-pong-using! [peer remote-peer ping] - (.write *out* "We don't handle PONG_USING yet!\n") - ) + (.write *out* "We don't send PONG_USING yet!\n")) (defn handle-ping! [peer remote-peer message] @@ -259,30 +262,40 @@ (:transport ping) (send-pong-own! peer remote-peer ping) :else (send-pong-using! peer remote-peer ping)))) +(defn handle-pong-own! + [peer remote-peer pong] + (.execute (:cpu-bound-executor peer) + (fn [] + (domonad maybe-m + [public-key (deref (:public-key-atom remote-peer)) + :when (rsa-verify public-key (:signed-material pong) + (:signature pong))] + (send-do-exception-m! (:transport-addresses-agent remote-peer) + [address (with-state-field (:transport pong) + (fetch-val (:encoded-address pong))) + :when (= (:challenge address) (:challenge pong)) + _ (with-state-field (:transport pong) + (set-val (:encoded-address pong) + {:expiration (hello-address-expiration) + :latency (- (.getTime (Date.)) + (.getTime (:send-time address)))}))] + nil))))) + +(defn handle-pong-using! + [peer remote-peer pong] + ;; TODO - fill in this case + (.write *out* "We don't handle PONG_USING yet!\n")) + (defn handle-pong! [peer message] (domonad maybe-m [pong (first (parse-pong (:bytes message))) - :when (not (.after (Date.) (:expiration pong))) - remote-peer ((deref (:remote-peers-agent peer)) (:peer-id pong)) - public-key (deref (:public-key-atom remote-peer))] - (send-do-exception-m! (:transport-addresses-agent remote-peer) - [address (with-state-field (:transport pong) - (fetch-val (:encoded-address pong))) - :when (= (:challenge address) (:challenge pong)) - _ (cond - (= signature-purpose-pong-own (:signature-purpose pong)) - (if (rsa-verify public-key (:signed-material pong) (:signature pong)) - (with-state-field (:transport pong) - (set-val (:encoded-address pong) - {:expiration (hello-address-expiration) - :latency (- (.getTime (Date.)) - (.getTime (:send-time address)))})) - break) - (= signature-purpose-pong-using (:signature-purpose pong)) - ;; TODO - fill in this case - break - :else break)] nil))) + :when-not (.after (Date.) (:expiration pong)) + remote-peer ((deref (:remote-peers-agent peer)) (:peer-id pong))] + (condp == (:signature-purpose pong) + signature-purpose-pong-own (handle-pong-own! peer remote-peer pong) + signature-purpose-pong-using (handle-pong-using! peer remote-peer pong) + nil))) (defn emit-continuation! [peer transport remote-peer encoded-address result] @@ -299,6 +312,7 @@ (defn admit-message! [peer sender-id address message] + ;; (.write *out* (str "Received " message "\n")) (send (:remote-peers-agent peer) (fn [remote-peers] (let [remote-peers (update-remote-peers remote-peers diff --git a/src/org/gnu/clojure/gnunet/udp.clj b/src/org/gnu/clojure/gnunet/udp.clj index d16ff1b..e0aac16 100644 --- a/src/org/gnu/clojure/gnunet/udp.clj +++ b/src/org/gnu/clojure/gnunet/udp.clj @@ -45,6 +45,8 @@ (defn emit-messages-udp! [peer transport remote-peer encoded-address continuation! messages] + ;;(doseq [message messages] + ;; (.write *out* (str "Send " message "\n"))) (if-let [address (first (parse-address encoded-address))] (let [continuation! #(do (emit-continuation! peer transport remote-peer |