diff options
-rw-r--r-- | NOTES | 4 | ||||
-rw-r--r-- | src/org/gnu/clojure/gnunet/inet.clj | 20 | ||||
-rw-r--r-- | src/org/gnu/clojure/gnunet/message.clj | 6 | ||||
-rw-r--r-- | src/org/gnu/clojure/gnunet/tcp.clj | 130 | ||||
-rw-r--r-- | src/org/gnu/clojure/gnunet/udp.clj | 30 |
5 files changed, 134 insertions, 56 deletions
@@ -3,7 +3,7 @@ (def keypair (generate-rsa-keypair! random)) (def my-peer (new-peer {:keypair keypair :random random})) (.start (:selector-thread my-peer)) -(activate-udp! my-peer 5678) -(configure-udp-addresses! my-peer (get-local-addresses) 5678) +(activate-tcp! my-peer 5678) +(configure-inet-addresses! my-peer "tcp" (get-local-addresses) 5678) (activate-filesharing! my-peer) (download-hostlist! (partial admit-hello! my-peer) "http://192.168.8.2:58080") diff --git a/src/org/gnu/clojure/gnunet/inet.clj b/src/org/gnu/clojure/gnunet/inet.clj index ff8fa02..376f73d 100644 --- a/src/org/gnu/clojure/gnunet/inet.clj +++ b/src/org/gnu/clojure/gnunet/inet.clj @@ -1,7 +1,8 @@ (ns org.gnu.clojure.gnunet.inet - (:use (org.gnu.clojure.gnunet parser message) + (:use (org.gnu.clojure.gnunet message parser transport) clojure.contrib.monads) - (:import (java.net InetAddress InetSocketAddress NetworkInterface))) + (:import (java.net InetAddress InetSocketAddress NetworkInterface) + java.util.Date)) (defn encode-address [inet-socket-address] @@ -29,3 +30,18 @@ (for [interface (enumeration-seq (NetworkInterface/getNetworkInterfaces)) address (enumeration-seq (.getInetAddresses interface))] address)) + +(defn configure-inet-addresses! + "Adds new addresses for the transport to peer's transports-agent and + removes expired addresses." + [peer transport reachable-addresses port] + (send (:transport-addresses-agent peer) + (fn [addresses] + (merge-transport-addresses {} + (expire-transport-addresses (Date.) + (concat (list-transport-addresses addresses) + (for [address reachable-addresses] + {:transport transport + :encoded-address (encode-address + (InetSocketAddress. address port)) + :expiration (hello-address-expiration)}))))))) diff --git a/src/org/gnu/clojure/gnunet/message.clj b/src/org/gnu/clojure/gnunet/message.clj index ccf15d7..124070f 100644 --- a/src/org/gnu/clojure/gnunet/message.clj +++ b/src/org/gnu/clojure/gnunet/message.clj @@ -110,9 +110,7 @@ [parser-map] (domonad parser-m [message parse-message - :let [parser (parser-map (:message-type message))] - :when parser - :let [parsed (first (parser (:bytes message)))] - :when parsed] + :when-let [parser (parser-map (:message-type message))] + :when-let [parsed (first (parser (:bytes message)))]] {:message-type (:message-type message) :message parsed})) diff --git a/src/org/gnu/clojure/gnunet/tcp.clj b/src/org/gnu/clojure/gnunet/tcp.clj index 91b0d7a..1ed2469 100644 --- a/src/org/gnu/clojure/gnunet/tcp.clj +++ b/src/org/gnu/clojure/gnunet/tcp.clj @@ -16,11 +16,19 @@ (def parse-welcome (domonad parser-m [my-id (items id-size)] my-id)) +(defn generate-welcome-message + [peer] + (encode-message + {:message-type message-type-tcp-welcome + :bytes (encode-welcome (:id peer))})) + (defn handle-disconnect! [peer transport encoded-address send-queue selection-key] + (.write *out* (str "disconnect " encoded-address "\n")) (send-do-exception-m! (:sessions-agent transport) [sessions (fetch-state) - :when (contains? encoded-address) + :when (contains? sessions encoded-address) + :let [_ (.write *out* (str "removing session " encoded-address "\n"))] _ (set-state (dissoc sessions encoded-address))] ;; Send another message to the agent to make sure no new packets have been ;; added to the send-queue in the mean time. @@ -28,45 +36,117 @@ (fn [sessions] (.add (:selector-continuations-queue peer) #(do + (.write *out* (str "canceling packets " send-queue "\n")) (doseq [packet (queue-seq! send-queue)] ((:continuation! packet) false)) + (.write *out* (str "closing channel " (.channel selection-key) "\n")) + (.close (.channel selection-key)) + (.write *out* (str "canceling key " selection-key "\n")) (.cancel selection-key))) (.wakeup (:selector peer)) sessions)))) -(defn handle-channel-connectable! +(def handle-socket-channel-selected!) + +(defn admit-tcp-message! + [peer transport encoded-address send-queue selection-key message] + (if (== message-type-tcp-welcome (:message-type message)) + (send-do-exception-m! (:sessions-agent transport) + [:when-let [welcome (first (parse-welcome (:bytes message)))] + remote-peer-id (with-state-field encoded-address + (fetch-val :remote-peer-id)) + :when (= remote-peer-id welcome) + _ (with-state-field encoded-address + (set-val :expecting-welcome false))] + (.add (:selector-continuations-queue peer) + #(.attach selection-key + (partial handle-socket-channel-selected! peer transport + encoded-address send-queue)))) + (send-do-exception-m! (:sessions-agent transport) + [remote-peer-id (with-state-field encoded-address + (fetch-val :remote-peer-id)) + :let [address {:transport "tcp" + :encoded-address encoded-address + :expiration (idle-connection-timeout)}]] + (admit-message! peer remote-peer-id address message)))) + +(defn handle-socket-channel-readable! [peer transport encoded-address send-queue selection-key] - (try - (.finishConnect (.channel selection-key)) - ;; TODO: update interestOps and send welcome - (catch Exception e + (let [socket-channel (.channel selection-key) + socket (.socket socket-channel) + buffer-length (.getReceiveBufferSize socket) + byte-buffer (doto (ByteBuffer/allocate buffer-length) (.clear)) + bytes-read (.read (.channel selection-key) byte-buffer)] + (if (== -1 bytes-read) (handle-disconnect! peer transport encoded-address send-queue - selection-key)))) + selection-key) + (send-do-exception-m! (:sessions-agent transport) + [received-bytes (with-state-field encoded-address + (fetch-val :received-bytes)) + :let [received-bytes (concat received-bytes + (buffer-seq! (.flip byte-buffer)))] + :let [[messages residue] ((one-or-more parse-message) + received-bytes)] + :let [received-bytes (if (nil? residue) received-bytes residue)] + _ (with-state-field encoded-address + (set-val :received-bytes (vec received-bytes)))] + (doseq [message messages] + (admit-tcp-message! peer transport encoded-address send-queue + selection-key message)))))) -(defn handle-channel-writable! +(defn handle-socket-channel-writable! [peer send-queue selection-key] (.add (:selector-continuations-queue peer) #(let [packet (.poll send-queue)] (if (nil? packet) (.interestOps selection-key SelectionKey/OP_READ) (try - (let [byte-buffer (ByteBuffer/wrap (byte-array (:bytes packet)))] - (.write (.channel selection-key) byte-buffer)) + (.write (.channel selection-key) + (ByteBuffer/wrap (byte-array (:bytes packet)))) ((:continuation! packet) true) (catch Exception e ((:continuation! packet) false))))))) -(defn handle-channel-selected! +(defn handle-socket-channel-selected! [peer transport encoded-address send-queue selection-key] - (if (.isConnectable selection-key) - (handle-channel-connectable! peer transport encoded-address send-queue - selection-key)) - (if (.isReadable selection-key) - nil) - (if (.isWritable selection-key) - (handle-channel-writable! peer send-queue selection-key))) + (try + (if (.isReadable selection-key) + (handle-socket-channel-readable! peer transport encoded-address send-queue + selection-key)) + (if (.isWritable selection-key) + (handle-socket-channel-writable! peer send-queue selection-key)) + (catch Exception e + (handle-disconnect! peer transport encoded-address send-queue + selection-key)))) +(defn handle-connecting-channel-connectable! + [peer transport encoded-address send-queue selection-key] + (.finishConnect (.channel selection-key)) + (.add (:selector-continuations-queue peer) + #(.interestOps selection-key + (bit-or SelectionKey/OP_READ SelectionKey/OP_WRITE)))) + +(defn handle-connecting-channel-writable! + [peer selection-key] + (.write (.channel selection-key) + (ByteBuffer/wrap (byte-array (generate-welcome-message peer)))) + (.interestOps selection-key SelectionKey/OP_READ)) + +(defn handle-connecting-channel-selected! + [peer transport encoded-address send-queue selection-key] + (try + (if (.isConnectable selection-key) + (handle-connecting-channel-connectable! peer transport encoded-address + send-queue selection-key)) + (if (.isReadable selection-key) + (handle-socket-channel-readable! peer transport encoded-address + send-queue selection-key)) + (if (.isWritable selection-key) + (handle-connecting-channel-writable! peer selection-key)) + (catch Exception e + (handle-disconnect! peer transport encoded-address send-queue + selection-key)))) -(defn new-session-from-address +(defn new-session-from-address! [peer transport remote-peer encoded-address] (when-let [address (first (parse-address encoded-address))] (let [socket-channel (SocketChannel/open) @@ -78,16 +158,17 @@ (let [selection-key (.register socket-channel (:selector peer) SelectionKey/OP_CONNECT - (partial handle-channel-selected! + (partial handle-socket-channel-selected! peer transport encoded-address send-queue))] - (send-do-exception-m! (:session-agent transport) - [_ (update-val encoded-address - #(assoc % :selection-key selection-key))] + (send-do-exception-m! (:sessions-agent transport) + [_ (with-state-field encoded-address + (set-val :selection-key selection-key))] nil)))) (.wakeup (:selector peer)) {:socket-channel socket-channel :remote-peer-id (:id remote-peer) :send-queue send-queue + :received-bytes [] :expecting-welcome true}))) (defn emit-messages-tcp! @@ -100,7 +181,7 @@ (when continuation! (continuation! %)))] session (fetch-val encoded-address) :let [session (if (nil? session) - (new-session-from-address peer transport remote-peer + (new-session-from-address! peer transport remote-peer encoded-address) session)] :when (if (nil? session) @@ -122,7 +203,6 @@ [peer server-channel] ) - (defn register-server-channel! [peer port] (let [server-channel (ServerSocketChannel/open) diff --git a/src/org/gnu/clojure/gnunet/udp.clj b/src/org/gnu/clojure/gnunet/udp.clj index f06db1c..4c40d74 100644 --- a/src/org/gnu/clojure/gnunet/udp.clj +++ b/src/org/gnu/clojure/gnunet/udp.clj @@ -1,8 +1,7 @@ (ns org.gnu.clojure.gnunet.udp (:use (org.gnu.clojure.gnunet inet parser message peer transport util) clojure.contrib.monads) - (:import (java.util Date Calendar) - java.net.InetSocketAddress + (:import java.net.InetSocketAddress (java.nio.channels DatagramChannel SelectionKey) java.nio.ByteBuffer java.util.concurrent.ConcurrentLinkedQueue)) @@ -28,21 +27,6 @@ :bytes (encode-udp {:sender-id (:id peer) :messages messages})})) -(defn configure-udp-addresses! - "Adds new addresses for the udp transport to peer's transports-agent and - removes expired addresses." - [peer reachable-addresses port] - (send (:transport-addresses-agent peer) - (fn [addresses] - (merge-transport-addresses {} - (expire-transport-addresses (Date.) - (concat (list-transport-addresses addresses) - (for [address reachable-addresses] - {:transport "udp" - :encoded-address (encode-address (InetSocketAddress. address - port)) - :expiration (hello-address-expiration)}))))))) - (defn emit-messages-udp! [peer transport remote-peer encoded-address continuation! messages] ;;(doseq [message messages] @@ -62,7 +46,7 @@ (.wakeup (:selector peer))) (when continuation! (continuation! false)))) -(defn handle-channel-writable! +(defn handle-datagram-channel-writable! [peer datagram-channel] (let [transport ((deref (:transports-agent peer)) "udp")] (.add (:selector-continuations-queue peer) @@ -75,7 +59,7 @@ ((:continuation! packet) true) (catch Exception e ((:continuation! packet) false)))))))) -(defn handle-channel-readable! +(defn handle-datagram-channel-readable! [peer datagram-channel] (let [byte-buffer (doto (ByteBuffer/allocate max-udp-packet-length) (.clear)) source-address (.receive datagram-channel byte-buffer) @@ -90,12 +74,12 @@ (doseq [message (:messages udp)] (admit-message! peer (:sender-id udp) address message)))))) -(defn handle-channel-selected! +(defn handle-datagram-channel-selected! [peer datagram-channel selection-key] (if (.isReadable selection-key) - (handle-channel-readable! peer datagram-channel)) + (handle-datagram-channel-readable! peer datagram-channel)) (if (.isWritable selection-key) - (handle-channel-writable! peer datagram-channel))) + (handle-datagram-channel-writable! peer datagram-channel))) (defn register-datagram-channel! [peer port] @@ -106,7 +90,7 @@ (let [selection-key (.register datagram-channel (:selector peer) SelectionKey/OP_READ - (partial handle-channel-selected! + (partial handle-datagram-channel-selected! peer datagram-channel))] (send (:transports-agent peer) (fn [transports] |