summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--NOTES4
-rw-r--r--src/org/gnu/clojure/gnunet/inet.clj20
-rw-r--r--src/org/gnu/clojure/gnunet/message.clj6
-rw-r--r--src/org/gnu/clojure/gnunet/tcp.clj130
-rw-r--r--src/org/gnu/clojure/gnunet/udp.clj30
5 files changed, 134 insertions, 56 deletions
diff --git a/NOTES b/NOTES
index 88f6f0e..067f034 100644
--- a/NOTES
+++ b/NOTES
@@ -3,7 +3,7 @@
(def keypair (generate-rsa-keypair! random))
(def my-peer (new-peer {:keypair keypair :random random}))
(.start (:selector-thread my-peer))
-(activate-udp! my-peer 5678)
-(configure-udp-addresses! my-peer (get-local-addresses) 5678)
+(activate-tcp! my-peer 5678)
+(configure-inet-addresses! my-peer "tcp" (get-local-addresses) 5678)
(activate-filesharing! my-peer)
(download-hostlist! (partial admit-hello! my-peer) "http://192.168.8.2:58080")
diff --git a/src/org/gnu/clojure/gnunet/inet.clj b/src/org/gnu/clojure/gnunet/inet.clj
index ff8fa02..376f73d 100644
--- a/src/org/gnu/clojure/gnunet/inet.clj
+++ b/src/org/gnu/clojure/gnunet/inet.clj
@@ -1,7 +1,8 @@
(ns org.gnu.clojure.gnunet.inet
- (:use (org.gnu.clojure.gnunet parser message)
+ (:use (org.gnu.clojure.gnunet message parser transport)
clojure.contrib.monads)
- (:import (java.net InetAddress InetSocketAddress NetworkInterface)))
+ (:import (java.net InetAddress InetSocketAddress NetworkInterface)
+ java.util.Date))
(defn encode-address
[inet-socket-address]
@@ -29,3 +30,18 @@
(for [interface (enumeration-seq (NetworkInterface/getNetworkInterfaces))
address (enumeration-seq (.getInetAddresses interface))]
address))
+
+(defn configure-inet-addresses!
+ "Adds new addresses for the transport to peer's transports-agent and
+ removes expired addresses."
+ [peer transport reachable-addresses port]
+ (send (:transport-addresses-agent peer)
+ (fn [addresses]
+ (merge-transport-addresses {}
+ (expire-transport-addresses (Date.)
+ (concat (list-transport-addresses addresses)
+ (for [address reachable-addresses]
+ {:transport transport
+ :encoded-address (encode-address
+ (InetSocketAddress. address port))
+ :expiration (hello-address-expiration)})))))))
diff --git a/src/org/gnu/clojure/gnunet/message.clj b/src/org/gnu/clojure/gnunet/message.clj
index ccf15d7..124070f 100644
--- a/src/org/gnu/clojure/gnunet/message.clj
+++ b/src/org/gnu/clojure/gnunet/message.clj
@@ -110,9 +110,7 @@
[parser-map]
(domonad parser-m
[message parse-message
- :let [parser (parser-map (:message-type message))]
- :when parser
- :let [parsed (first (parser (:bytes message)))]
- :when parsed]
+ :when-let [parser (parser-map (:message-type message))]
+ :when-let [parsed (first (parser (:bytes message)))]]
{:message-type (:message-type message)
:message parsed}))
diff --git a/src/org/gnu/clojure/gnunet/tcp.clj b/src/org/gnu/clojure/gnunet/tcp.clj
index 91b0d7a..1ed2469 100644
--- a/src/org/gnu/clojure/gnunet/tcp.clj
+++ b/src/org/gnu/clojure/gnunet/tcp.clj
@@ -16,11 +16,19 @@
(def parse-welcome
(domonad parser-m [my-id (items id-size)] my-id))
+(defn generate-welcome-message
+ [peer]
+ (encode-message
+ {:message-type message-type-tcp-welcome
+ :bytes (encode-welcome (:id peer))}))
+
(defn handle-disconnect!
[peer transport encoded-address send-queue selection-key]
+ (.write *out* (str "disconnect " encoded-address "\n"))
(send-do-exception-m! (:sessions-agent transport)
[sessions (fetch-state)
- :when (contains? encoded-address)
+ :when (contains? sessions encoded-address)
+ :let [_ (.write *out* (str "removing session " encoded-address "\n"))]
_ (set-state (dissoc sessions encoded-address))]
;; Send another message to the agent to make sure no new packets have been
;; added to the send-queue in the mean time.
@@ -28,45 +36,117 @@
(fn [sessions]
(.add (:selector-continuations-queue peer)
#(do
+ (.write *out* (str "canceling packets " send-queue "\n"))
(doseq [packet (queue-seq! send-queue)]
((:continuation! packet) false))
+ (.write *out* (str "closing channel " (.channel selection-key) "\n"))
+ (.close (.channel selection-key))
+ (.write *out* (str "canceling key " selection-key "\n"))
(.cancel selection-key)))
(.wakeup (:selector peer))
sessions))))
-(defn handle-channel-connectable!
+(def handle-socket-channel-selected!)
+
+(defn admit-tcp-message!
+ [peer transport encoded-address send-queue selection-key message]
+ (if (== message-type-tcp-welcome (:message-type message))
+ (send-do-exception-m! (:sessions-agent transport)
+ [:when-let [welcome (first (parse-welcome (:bytes message)))]
+ remote-peer-id (with-state-field encoded-address
+ (fetch-val :remote-peer-id))
+ :when (= remote-peer-id welcome)
+ _ (with-state-field encoded-address
+ (set-val :expecting-welcome false))]
+ (.add (:selector-continuations-queue peer)
+ #(.attach selection-key
+ (partial handle-socket-channel-selected! peer transport
+ encoded-address send-queue))))
+ (send-do-exception-m! (:sessions-agent transport)
+ [remote-peer-id (with-state-field encoded-address
+ (fetch-val :remote-peer-id))
+ :let [address {:transport "tcp"
+ :encoded-address encoded-address
+ :expiration (idle-connection-timeout)}]]
+ (admit-message! peer remote-peer-id address message))))
+
+(defn handle-socket-channel-readable!
[peer transport encoded-address send-queue selection-key]
- (try
- (.finishConnect (.channel selection-key))
- ;; TODO: update interestOps and send welcome
- (catch Exception e
+ (let [socket-channel (.channel selection-key)
+ socket (.socket socket-channel)
+ buffer-length (.getReceiveBufferSize socket)
+ byte-buffer (doto (ByteBuffer/allocate buffer-length) (.clear))
+ bytes-read (.read (.channel selection-key) byte-buffer)]
+ (if (== -1 bytes-read)
(handle-disconnect! peer transport encoded-address send-queue
- selection-key))))
+ selection-key)
+ (send-do-exception-m! (:sessions-agent transport)
+ [received-bytes (with-state-field encoded-address
+ (fetch-val :received-bytes))
+ :let [received-bytes (concat received-bytes
+ (buffer-seq! (.flip byte-buffer)))]
+ :let [[messages residue] ((one-or-more parse-message)
+ received-bytes)]
+ :let [received-bytes (if (nil? residue) received-bytes residue)]
+ _ (with-state-field encoded-address
+ (set-val :received-bytes (vec received-bytes)))]
+ (doseq [message messages]
+ (admit-tcp-message! peer transport encoded-address send-queue
+ selection-key message))))))
-(defn handle-channel-writable!
+(defn handle-socket-channel-writable!
[peer send-queue selection-key]
(.add (:selector-continuations-queue peer)
#(let [packet (.poll send-queue)]
(if (nil? packet)
(.interestOps selection-key SelectionKey/OP_READ)
(try
- (let [byte-buffer (ByteBuffer/wrap (byte-array (:bytes packet)))]
- (.write (.channel selection-key) byte-buffer))
+ (.write (.channel selection-key)
+ (ByteBuffer/wrap (byte-array (:bytes packet))))
((:continuation! packet) true)
(catch Exception e ((:continuation! packet) false)))))))
-(defn handle-channel-selected!
+(defn handle-socket-channel-selected!
[peer transport encoded-address send-queue selection-key]
- (if (.isConnectable selection-key)
- (handle-channel-connectable! peer transport encoded-address send-queue
- selection-key))
- (if (.isReadable selection-key)
- nil)
- (if (.isWritable selection-key)
- (handle-channel-writable! peer send-queue selection-key)))
+ (try
+ (if (.isReadable selection-key)
+ (handle-socket-channel-readable! peer transport encoded-address send-queue
+ selection-key))
+ (if (.isWritable selection-key)
+ (handle-socket-channel-writable! peer send-queue selection-key))
+ (catch Exception e
+ (handle-disconnect! peer transport encoded-address send-queue
+ selection-key))))
+(defn handle-connecting-channel-connectable!
+ [peer transport encoded-address send-queue selection-key]
+ (.finishConnect (.channel selection-key))
+ (.add (:selector-continuations-queue peer)
+ #(.interestOps selection-key
+ (bit-or SelectionKey/OP_READ SelectionKey/OP_WRITE))))
+
+(defn handle-connecting-channel-writable!
+ [peer selection-key]
+ (.write (.channel selection-key)
+ (ByteBuffer/wrap (byte-array (generate-welcome-message peer))))
+ (.interestOps selection-key SelectionKey/OP_READ))
+
+(defn handle-connecting-channel-selected!
+ [peer transport encoded-address send-queue selection-key]
+ (try
+ (if (.isConnectable selection-key)
+ (handle-connecting-channel-connectable! peer transport encoded-address
+ send-queue selection-key))
+ (if (.isReadable selection-key)
+ (handle-socket-channel-readable! peer transport encoded-address
+ send-queue selection-key))
+ (if (.isWritable selection-key)
+ (handle-connecting-channel-writable! peer selection-key))
+ (catch Exception e
+ (handle-disconnect! peer transport encoded-address send-queue
+ selection-key))))
-(defn new-session-from-address
+(defn new-session-from-address!
[peer transport remote-peer encoded-address]
(when-let [address (first (parse-address encoded-address))]
(let [socket-channel (SocketChannel/open)
@@ -78,16 +158,17 @@
(let [selection-key (.register socket-channel
(:selector peer)
SelectionKey/OP_CONNECT
- (partial handle-channel-selected!
+ (partial handle-socket-channel-selected!
peer transport encoded-address send-queue))]
- (send-do-exception-m! (:session-agent transport)
- [_ (update-val encoded-address
- #(assoc % :selection-key selection-key))]
+ (send-do-exception-m! (:sessions-agent transport)
+ [_ (with-state-field encoded-address
+ (set-val :selection-key selection-key))]
nil))))
(.wakeup (:selector peer))
{:socket-channel socket-channel
:remote-peer-id (:id remote-peer)
:send-queue send-queue
+ :received-bytes []
:expecting-welcome true})))
(defn emit-messages-tcp!
@@ -100,7 +181,7 @@
(when continuation! (continuation! %)))]
session (fetch-val encoded-address)
:let [session (if (nil? session)
- (new-session-from-address peer transport remote-peer
+ (new-session-from-address! peer transport remote-peer
encoded-address)
session)]
:when (if (nil? session)
@@ -122,7 +203,6 @@
[peer server-channel]
)
-
(defn register-server-channel!
[peer port]
(let [server-channel (ServerSocketChannel/open)
diff --git a/src/org/gnu/clojure/gnunet/udp.clj b/src/org/gnu/clojure/gnunet/udp.clj
index f06db1c..4c40d74 100644
--- a/src/org/gnu/clojure/gnunet/udp.clj
+++ b/src/org/gnu/clojure/gnunet/udp.clj
@@ -1,8 +1,7 @@
(ns org.gnu.clojure.gnunet.udp
(:use (org.gnu.clojure.gnunet inet parser message peer transport util)
clojure.contrib.monads)
- (:import (java.util Date Calendar)
- java.net.InetSocketAddress
+ (:import java.net.InetSocketAddress
(java.nio.channels DatagramChannel SelectionKey)
java.nio.ByteBuffer
java.util.concurrent.ConcurrentLinkedQueue))
@@ -28,21 +27,6 @@
:bytes (encode-udp
{:sender-id (:id peer) :messages messages})}))
-(defn configure-udp-addresses!
- "Adds new addresses for the udp transport to peer's transports-agent and
- removes expired addresses."
- [peer reachable-addresses port]
- (send (:transport-addresses-agent peer)
- (fn [addresses]
- (merge-transport-addresses {}
- (expire-transport-addresses (Date.)
- (concat (list-transport-addresses addresses)
- (for [address reachable-addresses]
- {:transport "udp"
- :encoded-address (encode-address (InetSocketAddress. address
- port))
- :expiration (hello-address-expiration)})))))))
-
(defn emit-messages-udp!
[peer transport remote-peer encoded-address continuation! messages]
;;(doseq [message messages]
@@ -62,7 +46,7 @@
(.wakeup (:selector peer)))
(when continuation! (continuation! false))))
-(defn handle-channel-writable!
+(defn handle-datagram-channel-writable!
[peer datagram-channel]
(let [transport ((deref (:transports-agent peer)) "udp")]
(.add (:selector-continuations-queue peer)
@@ -75,7 +59,7 @@
((:continuation! packet) true)
(catch Exception e ((:continuation! packet) false))))))))
-(defn handle-channel-readable!
+(defn handle-datagram-channel-readable!
[peer datagram-channel]
(let [byte-buffer (doto (ByteBuffer/allocate max-udp-packet-length) (.clear))
source-address (.receive datagram-channel byte-buffer)
@@ -90,12 +74,12 @@
(doseq [message (:messages udp)]
(admit-message! peer (:sender-id udp) address message))))))
-(defn handle-channel-selected!
+(defn handle-datagram-channel-selected!
[peer datagram-channel selection-key]
(if (.isReadable selection-key)
- (handle-channel-readable! peer datagram-channel))
+ (handle-datagram-channel-readable! peer datagram-channel))
(if (.isWritable selection-key)
- (handle-channel-writable! peer datagram-channel)))
+ (handle-datagram-channel-writable! peer datagram-channel)))
(defn register-datagram-channel!
[peer port]
@@ -106,7 +90,7 @@
(let [selection-key (.register datagram-channel
(:selector peer)
SelectionKey/OP_READ
- (partial handle-channel-selected!
+ (partial handle-datagram-channel-selected!
peer datagram-channel))]
(send (:transports-agent peer)
(fn [transports]