summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDavid Barksdale <amatus.amongus@gmail.com>2010-12-02 21:52:56 -0800
committerDavid Barksdale <amatus.amongus@gmail.com>2010-12-02 21:52:56 -0800
commit32f3904c1ce4aafac27dd9ac162d3a008e76d878 (patch)
tree4d6f522deb452a7e61fb2423b3e17318075e3273
parent6b7d55268badb3f141e31fdddc431bc83175efec (diff)
Much work on the TCP transport.
Successfully negotiates outgoing connections, incoming should work also.
-rw-r--r--src/org/gnu/clojure/gnunet/tcp.clj322
1 files changed, 175 insertions, 147 deletions
diff --git a/src/org/gnu/clojure/gnunet/tcp.clj b/src/org/gnu/clojure/gnunet/tcp.clj
index 1ed2469..672feb9 100644
--- a/src/org/gnu/clojure/gnunet/tcp.clj
+++ b/src/org/gnu/clojure/gnunet/tcp.clj
@@ -10,217 +10,245 @@
(def message-type-tcp-welcome 60)
(defn encode-welcome
- [my-id]
- my-id)
+ [welcome]
+ (:peer-id welcome))
(def parse-welcome
- (domonad parser-m [my-id (items id-size)] my-id))
+ (domonad parser-m
+ [peer-id (items id-size)]
+ {:peer-id peer-id}))
(defn generate-welcome-message
[peer]
(encode-message
{:message-type message-type-tcp-welcome
- :bytes (encode-welcome (:id peer))}))
+ :bytes (encode-welcome {:peer-id (:id peer)})}))
(defn handle-disconnect!
- [peer transport encoded-address send-queue selection-key]
- (.write *out* (str "disconnect " encoded-address "\n"))
+ [peer transport encoded-address selection-key]
+ ;; This is always called from inside the selector thread
+ (.cancel selection-key)
+ (.close (.channel selection-key))
(send-do-exception-m! (:sessions-agent transport)
- [sessions (fetch-state)
- :when (contains? sessions encoded-address)
- :let [_ (.write *out* (str "removing session " encoded-address "\n"))]
- _ (set-state (dissoc sessions encoded-address))]
- ;; Send another message to the agent to make sure no new packets have been
- ;; added to the send-queue in the mean time.
- (send (:sessions-agent transport)
- (fn [sessions]
- (.add (:selector-continuations-queue peer)
- #(do
- (.write *out* (str "canceling packets " send-queue "\n"))
- (doseq [packet (queue-seq! send-queue)]
- ((:continuation! packet) false))
- (.write *out* (str "closing channel " (.channel selection-key) "\n"))
- (.close (.channel selection-key))
- (.write *out* (str "canceling key " selection-key "\n"))
- (.cancel selection-key)))
- (.wakeup (:selector peer))
- sessions))))
-
-(def handle-socket-channel-selected!)
+ [remote-peer-id (with-state-field [:connection encoded-address]
+ (fetch-val :remote-peer-id))
+ send-queue (fetch-val [encoded-address remote-peer-id])
+ _ (update-state #(dissoc %
+ [:connection encoded-address]
+ [encoded-address remote-peer-id]))
+ :when send-queue]
+ (doseq [packet (queue-seq! send-queue)]
+ ((:continuation! packet) false))))
+
+(defn update-selection-key-async!
+ ([peer selection-key ops]
+ (.add (:selector-continuations-queue peer)
+ #(.interestOps selection-key ops))
+ (.wakeup (:selector peer)))
+ ([peer selection-key ops attachment]
+ (.add (:selector-continuations-queue peer)
+ #(do
+ (.interestOps selection-key ops)
+ (.attach selection-key attachment)))
+ (.wakeup (:selector peer))))
(defn admit-tcp-message!
- [peer transport encoded-address send-queue selection-key message]
+ [peer transport encoded-address selection-key message]
(if (== message-type-tcp-welcome (:message-type message))
(send-do-exception-m! (:sessions-agent transport)
[:when-let [welcome (first (parse-welcome (:bytes message)))]
- remote-peer-id (with-state-field encoded-address
- (fetch-val :remote-peer-id))
- :when (= remote-peer-id welcome)
- _ (with-state-field encoded-address
- (set-val :expecting-welcome false))]
- (.add (:selector-continuations-queue peer)
- #(.attach selection-key
- (partial handle-socket-channel-selected! peer transport
- encoded-address send-queue))))
- (send-do-exception-m! (:sessions-agent transport)
- [remote-peer-id (with-state-field encoded-address
- (fetch-val :remote-peer-id))
- :let [address {:transport "tcp"
- :encoded-address encoded-address
- :expiration (idle-connection-timeout)}]]
- (admit-message! peer remote-peer-id address message))))
+ connection (fetch-val [:connection encoded-address])
+ :when-not (nil? connection)
+ _ (set-val [:connection encoded-address]
+ (assoc connection :remote-peer-id (:peer-id welcome)))]
+ (do
+ (when (and (nil? (:remote-peer-id connection))
+ (:incoming connection))
+ (.add (:send-queue connection)
+ {:bytes (generate-welcome-message peer)
+ :continuation! identity}))
+ (update-selection-key-async! peer selection-key
+ (bit-or SelectionKey/OP_READ SelectionKey/OP_WRITE))))
+ (when-let [remote-peer-id (:remote-peer-id
+ ((deref (:sessions-agent transport))
+ [:connection encoded-address]))]
+ (let [address {:transport "tcp"
+ :encoded-address encoded-address
+ :expiration (idle-connection-timeout)}]
+ (admit-message! peer remote-peer-id address message)))))
+
+(defn handle-socket-channel-connectable!
+ [peer transport encoded-address selection-key]
+ (.finishConnect (.channel selection-key))
+ (.interestOps selection-key
+ (bit-or SelectionKey/OP_READ SelectionKey/OP_WRITE)))
(defn handle-socket-channel-readable!
- [peer transport encoded-address send-queue selection-key]
+ [peer transport encoded-address selection-key]
(let [socket-channel (.channel selection-key)
socket (.socket socket-channel)
buffer-length (.getReceiveBufferSize socket)
byte-buffer (doto (ByteBuffer/allocate buffer-length) (.clear))
bytes-read (.read (.channel selection-key) byte-buffer)]
(if (== -1 bytes-read)
- (handle-disconnect! peer transport encoded-address send-queue
- selection-key)
+ (handle-disconnect! peer transport encoded-address selection-key)
(send-do-exception-m! (:sessions-agent transport)
- [received-bytes (with-state-field encoded-address
- (fetch-val :received-bytes))
- :let [received-bytes (concat received-bytes
+ [connection (fetch-val [:connection encoded-address])
+ :when-not (nil? connection)
+ :let [received-bytes (concat (:received-bytes connection)
(buffer-seq! (.flip byte-buffer)))]
:let [[messages residue] ((one-or-more parse-message)
received-bytes)]
:let [received-bytes (if (nil? residue) received-bytes residue)]
- _ (with-state-field encoded-address
- (set-val :received-bytes (vec received-bytes)))]
+ _ (set-val [:connection encoded-address]
+ (assoc connection :received-bytes (vec received-bytes)))]
(doseq [message messages]
- (admit-tcp-message! peer transport encoded-address send-queue
+ (admit-tcp-message! peer transport encoded-address
selection-key message))))))
(defn handle-socket-channel-writable!
- [peer send-queue selection-key]
+ [peer transport encoded-address selection-key]
+ ;; We are already in the selector thread, but add ourselves to the end of the
+ ;; selector-continuations-queue because we want to make sure we set the
+ ;; interest ops on the selection-key after any other continuations that might
+ ;; be setting OP_WRITE.
(.add (:selector-continuations-queue peer)
- #(let [packet (.poll send-queue)]
- (if (nil? packet)
- (.interestOps selection-key SelectionKey/OP_READ)
- (try
- (.write (.channel selection-key)
- (ByteBuffer/wrap (byte-array (:bytes packet))))
- ((:continuation! packet) true)
- (catch Exception e ((:continuation! packet) false)))))))
+ #(let [sessions (deref (:sessions-agent transport))]
+ (when-let [connection (sessions [:connection encoded-address])]
+ (let [packet (.poll (:send-queue connection))
+ packet (if (nil? packet)
+ (when-let [send-queue (sessions
+ [encoded-address
+ (:remote-peer-id connection)])]
+ (.poll send-queue))
+ packet)]
+ (if (nil? packet)
+ (.interestOps selection-key SelectionKey/OP_READ)
+ (try
+ (.write (.channel selection-key)
+ (ByteBuffer/wrap (byte-array (:bytes packet))))
+ ((:continuation! packet) true)
+ (catch Exception e
+ ((:continuation! packet) false)
+ (handle-disconnect! peer transport encoded-address
+ selection-key)))))))))
(defn handle-socket-channel-selected!
- [peer transport encoded-address send-queue selection-key]
- (try
- (if (.isReadable selection-key)
- (handle-socket-channel-readable! peer transport encoded-address send-queue
- selection-key))
- (if (.isWritable selection-key)
- (handle-socket-channel-writable! peer send-queue selection-key))
- (catch Exception e
- (handle-disconnect! peer transport encoded-address send-queue
- selection-key))))
-
-(defn handle-connecting-channel-connectable!
- [peer transport encoded-address send-queue selection-key]
- (.finishConnect (.channel selection-key))
- (.add (:selector-continuations-queue peer)
- #(.interestOps selection-key
- (bit-or SelectionKey/OP_READ SelectionKey/OP_WRITE))))
-
-(defn handle-connecting-channel-writable!
- [peer selection-key]
- (.write (.channel selection-key)
- (ByteBuffer/wrap (byte-array (generate-welcome-message peer))))
- (.interestOps selection-key SelectionKey/OP_READ))
-
-(defn handle-connecting-channel-selected!
- [peer transport encoded-address send-queue selection-key]
+ [peer transport encoded-address selection-key]
(try
(if (.isConnectable selection-key)
- (handle-connecting-channel-connectable! peer transport encoded-address
- send-queue selection-key))
+ (handle-socket-channel-connectable! peer transport encoded-address
+ selection-key))
(if (.isReadable selection-key)
(handle-socket-channel-readable! peer transport encoded-address
- send-queue selection-key))
+ selection-key))
(if (.isWritable selection-key)
- (handle-connecting-channel-writable! peer selection-key))
+ (handle-socket-channel-writable! peer transport encoded-address
+ selection-key))
(catch Exception e
- (handle-disconnect! peer transport encoded-address send-queue
- selection-key))))
+ (handle-disconnect! peer transport encoded-address selection-key))))
+
+(defn set-connection-writable!
+ [peer remote-peer connection]
+ (when (= (:remote-peer-id connection) (:id remote-peer))
+ (update-selection-key-async! peer (:selection-key connection)
+ (bit-or SelectionKey/OP_READ SelectionKey/OP_WRITE))))
-(defn new-session-from-address!
+(defn set-connection-writable-or-connect!
[peer transport remote-peer encoded-address]
- (when-let [address (first (parse-address encoded-address))]
- (let [socket-channel (SocketChannel/open)
- send-queue (ConcurrentLinkedQueue.)]
- (.configureBlocking socket-channel false)
- (.connect socket-channel address)
+ (if-let [connection ((deref (:sessions-agent transport))
+ [:connection encoded-address])]
+ (set-connection-writable! peer remote-peer connection)
+ (when-let [address (first (parse-address encoded-address))]
(.add (:selector-continuations-queue peer)
(fn []
- (let [selection-key (.register socket-channel
- (:selector peer)
- SelectionKey/OP_CONNECT
- (partial handle-socket-channel-selected!
- peer transport encoded-address send-queue))]
+ (let [socket-channel (doto (SocketChannel/open)
+ (.configureBlocking false))
+ selection-key (.register socket-channel (:selector peer) 0)
+ send-queue (ConcurrentLinkedQueue.)]
(send-do-exception-m! (:sessions-agent transport)
- [_ (with-state-field encoded-address
- (set-val :selection-key selection-key))]
- nil))))
- (.wakeup (:selector peer))
- {:socket-channel socket-channel
- :remote-peer-id (:id remote-peer)
- :send-queue send-queue
- :received-bytes []
- :expecting-welcome true})))
+ [connection (fetch-val [:connection encoded-address])
+ :when (if (nil? connection)
+ true
+ (do
+ (.cancel selection-key)
+ (set-connection-writable! peer remote-peer connection)
+ false))
+ _ (set-val [:connection encoded-address]
+ {:socket-channel socket-channel
+ :selection-key selection-key
+ :send-queue send-queue
+ :received-bytes []})]
+ (do
+ (.connect socket-channel address)
+ (.add send-queue
+ {:bytes (generate-welcome-message peer)
+ :continuation! identity})
+ (update-selection-key-async! peer selection-key
+ SelectionKey/OP_CONNECT
+ (partial handle-socket-channel-selected! peer transport
+ encoded-address)))))))
+ (.wakeup (:selector peer)))))
(defn emit-messages-tcp!
[peer transport remote-peer encoded-address continuation! messages]
- (doseq [message messages]
- (.write *out* (str "Send " message "\n")))
(send-do-exception-m! (:sessions-agent transport)
[:let [continuation! #(do (emit-continuation! peer transport remote-peer
encoded-address %)
(when continuation! (continuation! %)))]
- session (fetch-val encoded-address)
- :let [session (if (nil? session)
- (new-session-from-address! peer transport remote-peer
- encoded-address)
- session)]
- :when (if (nil? session)
- (do (continuation! false)
- false)
- true)
- _ (set-val encoded-address session)]
+ send-queue (fetch-val [encoded-address (:id remote-peer)]
+ (ConcurrentLinkedQueue.))
+ _ (set-val [encoded-address (:id remote-peer)] send-queue)]
(do
- (.add (:send-queue session)
+ ;; TODO: clean up messages for sessions that are never established.
+ (.add send-queue
{:bytes (mapcat encode-message messages)
:continuation! continuation!})
- (when-not (:expecting-welcome session)
- (.add (:selector-continuations-queue peer)
- #(.interestOps (:selection-key session)
- (bit-or SelectionKey/OP_READ SelectionKey/OP_WRITE)))
- (.wakeup (:selector peer))))))
+ (set-connection-writable-or-connect! peer transport remote-peer
+ encoded-address))))
(defn handle-channel-acceptable!
- [peer server-channel]
- )
+ [peer transport server-selection-key]
+ (try
+ (let [socket-channel (doto (.accept (.channel server-selection-key))
+ (.configureBlocking false))
+ address (.getRemoteSocketAddress (.socket socket-channel))
+ encoded-address (encode-address address)
+ selection-key (.register socket-channel (:selector peer) 0)]
+ (send-do-exception-m! (:sessions-agent transport)
+ [connection (fetch-val [:connection encoded-address])
+ :when (if (nil? connection)
+ true
+ (do
+ (.cancel selection-key)
+ (.close (.channel selection-key))
+ false))
+ _ (set-val [:connection encoded-address]
+ {:socket-channel socket-channel
+ :selection-key selection-key
+ :send-queue (ConcurrentLinkedQueue.)
+ :received-bytes []
+ :incoming true})]
+ (update-selection-key-async! peer selection-key SelectionKey/OP_READ
+ (partial handle-socket-channel-selected! peer transport
+ encoded-address))))
+ (catch Exception e
+ ;; If accept throws an exception, ignore it
+ (do))))
(defn register-server-channel!
[peer port]
(let [server-channel (ServerSocketChannel/open)
- socket (.socket server-channel)]
+ socket (.socket server-channel)
+ transport {:name "tcp"
+ :emit-messages! (partial emit-messages-tcp! peer)
+ :sessions-agent (agent {})}]
(.configureBlocking server-channel false)
(.bind socket (InetSocketAddress. port))
- (let [selection-key (.register server-channel
- (:selector peer)
- SelectionKey/OP_ACCEPT
- (partial handle-channel-acceptable!
- peer server-channel))]
- (send (:transports-agent peer)
- (fn [transports]
- (assoc transports "tcp"
- {:name "tcp"
- :emit-messages! (partial emit-messages-tcp! peer)
- :selection-key selection-key
- :sessions-agent (agent {})}))))))
+ (.register server-channel (:selector peer) SelectionKey/OP_ACCEPT
+ (partial handle-channel-acceptable! peer transport))
+ (send (:transports-agent peer)
+ #(assoc % "tcp" transport))))
(defn activate-tcp!
[peer port]