diff options
-rw-r--r-- | src/org/gnu/clojure/gnunet/transport.clj | 15 | ||||
-rw-r--r-- | src/org/gnu/clojure/gnunet/udp.clj | 31 | ||||
-rw-r--r-- | src/org/gnu/clojure/gnunet/util.clj | 5 |
3 files changed, 32 insertions, 19 deletions
diff --git a/src/org/gnu/clojure/gnunet/transport.clj b/src/org/gnu/clojure/gnunet/transport.clj index 3a6d7bd..4800a06 100644 --- a/src/org/gnu/clojure/gnunet/transport.clj +++ b/src/org/gnu/clojure/gnunet/transport.clj @@ -116,7 +116,7 @@ (if-let [transport ((deref (:transports-agent peer)) (:transport address))] (let [challenge (.nextInt (:random peer))] - ((:emit-messages! transport) remote-peer (:encoded-address address) + ((:emit-messages! transport) transport (:encoded-address address) [(hello-for-peer-message peer) (ping-message remote-peer address challenge)]) (conj address @@ -146,8 +146,6 @@ (send (:remote-peers-agent peer) update-remote-peers peer-id hello) (send (:remote-peers-agent peer) verify-remote-peers peer))))) - - (defn best-transport [peer remote-peer] (let [addresses (deref (:transport-addressess-agent remote-peer)) @@ -173,3 +171,14 @@ remote-peer)] (conj {:transport transport :address address} ((:connect! transport) peer remote-peer address))))))) + +(defn admit-message! + [peer sender-id source-address message] + (let [string-builder (StringBuilder. "Received message type ")] + (.append string-builder (:message-type message)) + (.append string-builder " from ") + (.append string-builder source-address) + (.append string-builder " id ") + (.append string-builder sender-id) + (.append string-builder "\n") + (.write *out* (.toString string-builder)))) diff --git a/src/org/gnu/clojure/gnunet/udp.clj b/src/org/gnu/clojure/gnunet/udp.clj index 04ae637..ad1b413 100644 --- a/src/org/gnu/clojure/gnunet/udp.clj +++ b/src/org/gnu/clojure/gnunet/udp.clj @@ -1,5 +1,5 @@ (ns org.gnu.clojure.gnunet.udp - (:use (org.gnu.clojure.gnunet inet parser message peer transport) + (:use (org.gnu.clojure.gnunet inet parser message peer transport util) clojure.contrib.monads) (:import (java.util Date Calendar) (java.net InetSocketAddress DatagramPacket) @@ -13,20 +13,20 @@ (defn encode-udp [udp] (concat - (:peer-id udp) + (:sender-id udp) (mapcat encode-message (:messages udp)))) (def parse-udp - (domonad parser-m [peer-id (parse-uint id-size) + (domonad parser-m [sender-id (items id-size) messages (none-or-more parse-message)] - {:peer-id peer-id :messages messages})) + {:sender-id sender-id :messages messages})) (defn generate-udp-message - [remote-peer messages] + [peer messages] (encode-message {:message-type message-type-udp :bytes (encode-udp - {:peer-id (:id remote-peer) :messages messages})})) + {:sender-id (:id peer) :messages messages})})) (defn configure-udp-addresses! "Adds new addresses for the udp transport to peer's transports-agent expiring @@ -45,11 +45,10 @@ (.add Calendar/HOUR_OF_DAY 12)))}))))))) (defn emit-messages-udp! - [peer remote-peer encoded-address messages] - (let [address (first (parse-address encoded-address)) - transport ((deref (:transports-agent peer)) "udp")] + [peer transport encoded-address messages] + (let [address (first (parse-address encoded-address))] (.add (:send-queue transport) - {:bytes (generate-udp-message remote-peer messages) + {:bytes (generate-udp-message peer messages) :address address}) (.add (:selector-continuations-queue peer) #(.interestOps (:selection-key transport) @@ -71,12 +70,12 @@ (let [byte-buffer (doto (ByteBuffer/allocate max-udp-packet-length) (.clear)) source-address (.receive datagram-channel byte-buffer)] (.flip byte-buffer) - (let [string-builder (StringBuilder. "Received packet of length ")] - (.append string-builder (.limit byte-buffer)) - (.append string-builder " from ") - (.append string-builder source-address) - (.append string-builder "\n") - (.write *out* (.toString string-builder))))) + (when-let [{udp :message} (first ((parse-message-types + {message-type-udp parse-udp}) + (buffer-seq! byte-buffer)))] + (if (not (= (:sender-id udp) (seq (:id peer)))) + (doseq [message (:messages udp)] + (admit-message! peer (:sender-id udp) source-address message)))))) (defn handle-channel-selected! [peer datagram-channel selection-key] diff --git a/src/org/gnu/clojure/gnunet/util.clj b/src/org/gnu/clojure/gnunet/util.clj index ccaa195..78a02dc 100644 --- a/src/org/gnu/clojure/gnunet/util.clj +++ b/src/org/gnu/clojure/gnunet/util.clj @@ -5,6 +5,11 @@ [queue] (lazy-seq (when-let [c (.poll queue)] (cons c (queue-seq! queue))))) +(defn buffer-seq! + [buffer] + (lazy-seq (when (.hasRemaining buffer) + (cons (.get buffer) (buffer-seq! buffer))))) + (defn my-max "Return the maximum in a collection of comparable values." [& coll] |