diff options
author | David Barksdale <amatus.amongus@gmail.com> | 2010-11-21 23:39:44 -0800 |
---|---|---|
committer | David Barksdale <amatus.amongus@gmail.com> | 2010-11-21 23:39:44 -0800 |
commit | e94bc6c4b07c0cba4780d0a75a1083a187bcf38e (patch) | |
tree | d6ca86b24198cbb8a6314d0fb751793a9b7d731d /src | |
parent | 10ffdbf1363507c250c1148340063a8e0741a8ac (diff) |
Did some work on the TCP transport.
Diffstat (limited to 'src')
-rw-r--r-- | src/org/gnu/clojure/gnunet/tcp.clj | 140 | ||||
-rw-r--r-- | src/org/gnu/clojure/gnunet/udp.clj | 9 |
2 files changed, 143 insertions, 6 deletions
diff --git a/src/org/gnu/clojure/gnunet/tcp.clj b/src/org/gnu/clojure/gnunet/tcp.clj index 832072f..91b0d7a 100644 --- a/src/org/gnu/clojure/gnunet/tcp.clj +++ b/src/org/gnu/clojure/gnunet/tcp.clj @@ -1,5 +1,11 @@ (ns org.gnu.clojure.gnunet.tcp - (:use (org.gnu.clojure.gnunet peer parser) clojure.contrib.monads)) + (:use (org.gnu.clojure.gnunet exception inet message parser peer transport + util) + clojure.contrib.monads) + (:import java.net.InetSocketAddress + (java.nio.channels SelectionKey ServerSocketChannel SocketChannel) + java.nio.ByteBuffer + java.util.concurrent.ConcurrentLinkedQueue)) (def message-type-tcp-welcome 60) @@ -9,3 +15,135 @@ (def parse-welcome (domonad parser-m [my-id (items id-size)] my-id)) + +(defn handle-disconnect! + [peer transport encoded-address send-queue selection-key] + (send-do-exception-m! (:sessions-agent transport) + [sessions (fetch-state) + :when (contains? encoded-address) + _ (set-state (dissoc sessions encoded-address))] + ;; Send another message to the agent to make sure no new packets have been + ;; added to the send-queue in the mean time. + (send (:sessions-agent transport) + (fn [sessions] + (.add (:selector-continuations-queue peer) + #(do + (doseq [packet (queue-seq! send-queue)] + ((:continuation! packet) false)) + (.cancel selection-key))) + (.wakeup (:selector peer)) + sessions)))) + +(defn handle-channel-connectable! + [peer transport encoded-address send-queue selection-key] + (try + (.finishConnect (.channel selection-key)) + ;; TODO: update interestOps and send welcome + (catch Exception e + (handle-disconnect! peer transport encoded-address send-queue + selection-key)))) + +(defn handle-channel-writable! + [peer send-queue selection-key] + (.add (:selector-continuations-queue peer) + #(let [packet (.poll send-queue)] + (if (nil? packet) + (.interestOps selection-key SelectionKey/OP_READ) + (try + (let [byte-buffer (ByteBuffer/wrap (byte-array (:bytes packet)))] + (.write (.channel selection-key) byte-buffer)) + ((:continuation! packet) true) + (catch Exception e ((:continuation! packet) false))))))) + +(defn handle-channel-selected! + [peer transport encoded-address send-queue selection-key] + (if (.isConnectable selection-key) + (handle-channel-connectable! peer transport encoded-address send-queue + selection-key)) + (if (.isReadable selection-key) + nil) + (if (.isWritable selection-key) + (handle-channel-writable! peer send-queue selection-key))) + + +(defn new-session-from-address + [peer transport remote-peer encoded-address] + (when-let [address (first (parse-address encoded-address))] + (let [socket-channel (SocketChannel/open) + send-queue (ConcurrentLinkedQueue.)] + (.configureBlocking socket-channel false) + (.connect socket-channel address) + (.add (:selector-continuations-queue peer) + (fn [] + (let [selection-key (.register socket-channel + (:selector peer) + SelectionKey/OP_CONNECT + (partial handle-channel-selected! + peer transport encoded-address send-queue))] + (send-do-exception-m! (:session-agent transport) + [_ (update-val encoded-address + #(assoc % :selection-key selection-key))] + nil)))) + (.wakeup (:selector peer)) + {:socket-channel socket-channel + :remote-peer-id (:id remote-peer) + :send-queue send-queue + :expecting-welcome true}))) + +(defn emit-messages-tcp! + [peer transport remote-peer encoded-address continuation! messages] + (doseq [message messages] + (.write *out* (str "Send " message "\n"))) + (send-do-exception-m! (:sessions-agent transport) + [:let [continuation! #(do (emit-continuation! peer transport remote-peer + encoded-address %) + (when continuation! (continuation! %)))] + session (fetch-val encoded-address) + :let [session (if (nil? session) + (new-session-from-address peer transport remote-peer + encoded-address) + session)] + :when (if (nil? session) + (do (continuation! false) + false) + true) + _ (set-val encoded-address session)] + (do + (.add (:send-queue session) + {:bytes (mapcat encode-message messages) + :continuation! continuation!}) + (when-not (:expecting-welcome session) + (.add (:selector-continuations-queue peer) + #(.interestOps (:selection-key session) + (bit-or SelectionKey/OP_READ SelectionKey/OP_WRITE))) + (.wakeup (:selector peer)))))) + +(defn handle-channel-acceptable! + [peer server-channel] + ) + + +(defn register-server-channel! + [peer port] + (let [server-channel (ServerSocketChannel/open) + socket (.socket server-channel)] + (.configureBlocking server-channel false) + (.bind socket (InetSocketAddress. port)) + (let [selection-key (.register server-channel + (:selector peer) + SelectionKey/OP_ACCEPT + (partial handle-channel-acceptable! + peer server-channel))] + (send (:transports-agent peer) + (fn [transports] + (assoc transports "tcp" + {:name "tcp" + :emit-messages! (partial emit-messages-tcp! peer) + :selection-key selection-key + :sessions-agent (agent {})})))))) + +(defn activate-tcp! + [peer port] + (.add (:selector-continuations-queue peer) + (partial register-server-channel! peer port)) + (.wakeup (:selector peer))) diff --git a/src/org/gnu/clojure/gnunet/udp.clj b/src/org/gnu/clojure/gnunet/udp.clj index 81500ca..f06db1c 100644 --- a/src/org/gnu/clojure/gnunet/udp.clj +++ b/src/org/gnu/clojure/gnunet/udp.clj @@ -2,7 +2,7 @@ (:use (org.gnu.clojure.gnunet inet parser message peer transport util) clojure.contrib.monads) (:import (java.util Date Calendar) - (java.net InetSocketAddress DatagramPacket) + java.net.InetSocketAddress (java.nio.channels DatagramChannel SelectionKey) java.nio.ByteBuffer java.util.concurrent.ConcurrentLinkedQueue)) @@ -67,13 +67,13 @@ (let [transport ((deref (:transports-agent peer)) "udp")] (.add (:selector-continuations-queue peer) #(let [packet (.poll (:send-queue transport))] - (if (not (nil? packet)) + (if (nil? packet) + (.interestOps (:selection-key transport) SelectionKey/OP_READ) (try (let [byte-buffer (ByteBuffer/wrap (byte-array (:bytes packet)))] (.send datagram-channel byte-buffer (:address packet))) ((:continuation! packet) true) - (catch Exception e ((:continuation! packet) false))) - (.interestOps (:selection-key transport) SelectionKey/OP_READ)))))) + (catch Exception e ((:continuation! packet) false)))))))) (defn handle-channel-readable! [peer datagram-channel] @@ -113,7 +113,6 @@ (assoc transports "udp" {:name "udp" :emit-messages! (partial emit-messages-udp! peer) - :socket socket :selection-key selection-key :send-queue (ConcurrentLinkedQueue.)})))))) |