summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/org/gnu/clojure/gnunet/transport.clj15
-rw-r--r--src/org/gnu/clojure/gnunet/udp.clj31
-rw-r--r--src/org/gnu/clojure/gnunet/util.clj5
3 files changed, 32 insertions, 19 deletions
diff --git a/src/org/gnu/clojure/gnunet/transport.clj b/src/org/gnu/clojure/gnunet/transport.clj
index 3a6d7bd..4800a06 100644
--- a/src/org/gnu/clojure/gnunet/transport.clj
+++ b/src/org/gnu/clojure/gnunet/transport.clj
@@ -116,7 +116,7 @@
(if-let [transport ((deref (:transports-agent peer))
(:transport address))]
(let [challenge (.nextInt (:random peer))]
- ((:emit-messages! transport) remote-peer (:encoded-address address)
+ ((:emit-messages! transport) transport (:encoded-address address)
[(hello-for-peer-message peer)
(ping-message remote-peer address challenge)])
(conj address
@@ -146,8 +146,6 @@
(send (:remote-peers-agent peer) update-remote-peers peer-id hello)
(send (:remote-peers-agent peer) verify-remote-peers peer)))))
-
-
(defn best-transport
[peer remote-peer]
(let [addresses (deref (:transport-addressess-agent remote-peer))
@@ -173,3 +171,14 @@
remote-peer)]
(conj {:transport transport :address address}
((:connect! transport) peer remote-peer address)))))))
+
+(defn admit-message!
+ [peer sender-id source-address message]
+ (let [string-builder (StringBuilder. "Received message type ")]
+ (.append string-builder (:message-type message))
+ (.append string-builder " from ")
+ (.append string-builder source-address)
+ (.append string-builder " id ")
+ (.append string-builder sender-id)
+ (.append string-builder "\n")
+ (.write *out* (.toString string-builder))))
diff --git a/src/org/gnu/clojure/gnunet/udp.clj b/src/org/gnu/clojure/gnunet/udp.clj
index 04ae637..ad1b413 100644
--- a/src/org/gnu/clojure/gnunet/udp.clj
+++ b/src/org/gnu/clojure/gnunet/udp.clj
@@ -1,5 +1,5 @@
(ns org.gnu.clojure.gnunet.udp
- (:use (org.gnu.clojure.gnunet inet parser message peer transport)
+ (:use (org.gnu.clojure.gnunet inet parser message peer transport util)
clojure.contrib.monads)
(:import (java.util Date Calendar)
(java.net InetSocketAddress DatagramPacket)
@@ -13,20 +13,20 @@
(defn encode-udp
[udp]
(concat
- (:peer-id udp)
+ (:sender-id udp)
(mapcat encode-message (:messages udp))))
(def parse-udp
- (domonad parser-m [peer-id (parse-uint id-size)
+ (domonad parser-m [sender-id (items id-size)
messages (none-or-more parse-message)]
- {:peer-id peer-id :messages messages}))
+ {:sender-id sender-id :messages messages}))
(defn generate-udp-message
- [remote-peer messages]
+ [peer messages]
(encode-message
{:message-type message-type-udp
:bytes (encode-udp
- {:peer-id (:id remote-peer) :messages messages})}))
+ {:sender-id (:id peer) :messages messages})}))
(defn configure-udp-addresses!
"Adds new addresses for the udp transport to peer's transports-agent expiring
@@ -45,11 +45,10 @@
(.add Calendar/HOUR_OF_DAY 12)))})))))))
(defn emit-messages-udp!
- [peer remote-peer encoded-address messages]
- (let [address (first (parse-address encoded-address))
- transport ((deref (:transports-agent peer)) "udp")]
+ [peer transport encoded-address messages]
+ (let [address (first (parse-address encoded-address))]
(.add (:send-queue transport)
- {:bytes (generate-udp-message remote-peer messages)
+ {:bytes (generate-udp-message peer messages)
:address address})
(.add (:selector-continuations-queue peer)
#(.interestOps (:selection-key transport)
@@ -71,12 +70,12 @@
(let [byte-buffer (doto (ByteBuffer/allocate max-udp-packet-length) (.clear))
source-address (.receive datagram-channel byte-buffer)]
(.flip byte-buffer)
- (let [string-builder (StringBuilder. "Received packet of length ")]
- (.append string-builder (.limit byte-buffer))
- (.append string-builder " from ")
- (.append string-builder source-address)
- (.append string-builder "\n")
- (.write *out* (.toString string-builder)))))
+ (when-let [{udp :message} (first ((parse-message-types
+ {message-type-udp parse-udp})
+ (buffer-seq! byte-buffer)))]
+ (if (not (= (:sender-id udp) (seq (:id peer))))
+ (doseq [message (:messages udp)]
+ (admit-message! peer (:sender-id udp) source-address message))))))
(defn handle-channel-selected!
[peer datagram-channel selection-key]
diff --git a/src/org/gnu/clojure/gnunet/util.clj b/src/org/gnu/clojure/gnunet/util.clj
index ccaa195..78a02dc 100644
--- a/src/org/gnu/clojure/gnunet/util.clj
+++ b/src/org/gnu/clojure/gnunet/util.clj
@@ -5,6 +5,11 @@
[queue]
(lazy-seq (when-let [c (.poll queue)] (cons c (queue-seq! queue)))))
+(defn buffer-seq!
+ [buffer]
+ (lazy-seq (when (.hasRemaining buffer)
+ (cons (.get buffer) (buffer-seq! buffer)))))
+
(defn my-max
"Return the maximum in a collection of comparable values."
[& coll]