diff options
author | David Barksdale <amatus.amongus@gmail.com> | 2011-08-13 00:24:43 -0500 |
---|---|---|
committer | David Barksdale <amatus.amongus@gmail.com> | 2011-08-13 00:24:43 -0500 |
commit | c35250c7ab793e569c1ac293035bca8c929fabe6 (patch) | |
tree | d20c37e0331bbe96dd57a9401ffc402d83daf860 | |
parent | 0465f3c418d0682c68d4cdcc3c8bde0f84584513 (diff) |
Re-wrote TCP transport.
I'm pretty sure my use of the word "continuation" is inaccurate
so I've renamed some things "callback".
-rw-r--r-- | src/main/clojure/org/gnu/clojure/gnunet/peer.clj | 22 | ||||
-rw-r--r-- | src/main/clojure/org/gnu/clojure/gnunet/tcp.clj | 407 | ||||
-rw-r--r-- | src/main/clojure/org/gnu/clojure/gnunet/transport.clj | 2 | ||||
-rw-r--r-- | src/main/clojure/org/gnu/clojure/gnunet/util.clj | 2 |
4 files changed, 211 insertions, 222 deletions
diff --git a/src/main/clojure/org/gnu/clojure/gnunet/peer.clj b/src/main/clojure/org/gnu/clojure/gnunet/peer.clj index adb7a2b..446aa9a 100644 --- a/src/main/clojure/org/gnu/clojure/gnunet/peer.clj +++ b/src/main/clojure/org/gnu/clojure/gnunet/peer.clj @@ -65,11 +65,11 @@ ;; Thread which selects on :selector :selector-thread - ;; java.util.concurrent.ConcurrentLinkedQueue of continuations. + ;; java.util.concurrent.ConcurrentLinkedQueue of callbacks. ;; In order to access the selector while the selector-thread is running add - ;; a continuation to this queue and call .wakeup on the selector. + ;; a callback to this queue and call .wakeup on the selector. ;; The size of this queue is an easy measure our network load. - :selector-continuations-queue + :selector-callbacks-queue ;; java.security.SecureRandom :random @@ -117,19 +117,19 @@ (== (:priority (meta this)) (:priority (meta obj)))))) (defn selector-loop! - [selector continuations] - (doseq [continuation (queue-seq! continuations)] - (continuation)) + [selector callbacks] + (doseq [callback (queue-seq! callbacks)] + (callback)) (.select selector) (let [selected-keys (.selectedKeys selector)] (doseq [selection-key selected-keys] ((.attachment selection-key) selection-key)) (.clear selected-keys)) - (recur selector continuations)) + (recur selector callbacks)) (defn new-peer [options] (let [selector (Selector/open) - continuations (ConcurrentLinkedQueue.) + callbacks (ConcurrentLinkedQueue.) cpu-bound-queue (LinkedBlockingQueue.) cpu-bound-executor (ThreadPoolExecutor. 0 (available-processors) 60 TimeUnit/SECONDS cpu-bound-queue) @@ -146,8 +146,8 @@ :transports-agent (agent {}) :dispatch-agent (agent {}) :selector selector - :selector-thread (Thread. (partial selector-loop! selector continuations)) - :selector-continuations-queue continuations + :selector-thread (Thread. (partial selector-loop! selector callbacks)) + :selector-callbacks-queue callbacks :random (:random options) :cpu-bound-executor cpu-bound-executor :cpu-bound-queue cpu-bound-queue @@ -159,7 +159,7 @@ (defn network-load [peer] ;; TODO: figure out if we really need size, it's an O(n) operation - (.size (:selector-continuations-queue peer))) + (.size (:selector-callbacks-queue peer))) (defn cpu-load [peer] diff --git a/src/main/clojure/org/gnu/clojure/gnunet/tcp.clj b/src/main/clojure/org/gnu/clojure/gnunet/tcp.clj index 1246c6e..aa0b76c 100644 --- a/src/main/clojure/org/gnu/clojure/gnunet/tcp.clj +++ b/src/main/clojure/org/gnu/clojure/gnunet/tcp.clj @@ -10,277 +10,264 @@ (def message-type-tcp-welcome 60) (defn encode-welcome - [welcome] - (:peer-id welcome)) + [peer] + (:id peer)) (def parse-welcome (domonad parser-m - [peer-id (items id-size)] - {:peer-id peer-id})) + [id (items id-size)] + {:id id})) (defn generate-welcome-message [peer] (encode-message {:message-type message-type-tcp-welcome - :bytes (encode-welcome {:peer-id (:id peer)})})) + :bytes (encode-welcome peer)})) (defn handle-disconnect! - [peer transport encoded-address selection-key] + [peer transport connection] ;; This is always called from inside the selector thread - (assert-args handle-disconnect! - (vector? encoded-address) "encoded-address as vector") - (.cancel selection-key) - (.close (.channel selection-key)) - (send-do-exception-m! (:sessions-agent transport) - [remote-peer-id (with-state-field [:connection encoded-address] - (fetch-val :remote-peer-id)) - send-queue (fetch-val [encoded-address remote-peer-id]) - _ (update-state #(dissoc % - [:connection encoded-address] - [encoded-address remote-peer-id])) - :when send-queue] - (doseq [packet (queue-seq! send-queue)] - ((:continuation! packet) false)))) + (.cancel (:selection-key connection)) + (.close (:socket-channel connection)) + (send-do-exception-m! + (:connections-agent transport) + [pending-connections (fetch-val nil (hash-set)) + _ (set-val nil (disj pending-connections connection)) + :when-let [remote-peer-id (deref (:remote-peer-id-atom connection))] + remote-peer-connections (fetch-val remote-peer-id) + _ (set-val remote-peer-id (disj remote-peer-connections connection))])) + +(defn update-selection-key! + [selection-key ops & attachment] + ;; This is always called from inside the selector thread + (try + (.interestOps selection-key ops) + (when attachment (.attach selection-key (first attachment))) + (catch Exception e))) (defn update-selection-key-async! - ([peer selection-key ops] - (.add (:selector-continuations-queue peer) - #(try - (.interestOps selection-key ops) - (catch Exception e))) - (.wakeup (:selector peer))) - ([peer selection-key ops attachment] - (.add (:selector-continuations-queue peer) - #(try - (.interestOps selection-key ops) - (.attach selection-key attachment) - (catch Exception e))) - (.wakeup (:selector peer)))) + [peer selection-key ops & attachment] + (.add (:selector-callbacks-queue peer) + #(apply update-selection-key! selection-key ops attachment)) + (.wakeup (:selector peer))) (defn admit-tcp-message! - [peer transport encoded-address selection-key message] - (assert-args admit-tcp-message! - (vector? encoded-address) "encoded-address as vector") + [peer transport connection message] + ;; This is always called from inside the selector thread (if (== message-type-tcp-welcome (:message-type message)) - (send-do-exception-m! (:sessions-agent transport) + (domonad + exception-m [:when-let [welcome (first (parse-welcome (:bytes message)))] - connection (fetch-val [:connection encoded-address]) - :when-not (nil? connection) - ;; XXX: This is weird because for an outgoing connection we don't check - ;; that the peer-id matches who we thought we connected to. - _ (set-val [:connection encoded-address] - (conj connection {:remote-peer-id (:peer-id welcome) - :received-welcome true}))] - (do - (when (nil? (:remote-peer-id connection)) - (.add (:send-queue connection) - {:bytes (generate-welcome-message peer) - :continuation! identity})) - ;; We have to send again to make sure our update to the sessions-agent - ;; is finished before updating the selection-key - (send (:sessions-agent transport) - (fn [sessions] - (update-selection-key-async! peer selection-key - (bit-or SelectionKey/OP_READ SelectionKey/OP_WRITE)) - sessions)))) - (send-do-exception-m! (:sessions-agent transport) - [connection (fetch-val [:connection encoded-address]) - :when-let [remote-peer-id (:remote-peer-id connection)] - :when (:received-welcome connection) - :let [address {:transport "tcp" - :encoded-address encoded-address - :expiration (idle-connection-timeout)}]] - (admit-message! peer remote-peer-id address message)))) + :let [remote-peer-id (:peer-id welcome)] + :when-not (== remote-peer-id (:id peer)) + :when (nil? (deref (:remote-peer-id-atom connection)))] + (let [remote-peer-id (:peer-id welcome)] + (swap! (:remote-peer-id-atom connection) + (fn [_] remote-peer-id)) + (.add (:send-queue connection) + {:bytes (generate-welcome-message peer) + :callback! skip}) + (update-selection-key! + (:selection-key connection) + (bit-or SelectionKey/OP_READ SelectionKey/OP_WRITE)) + (send-do-exception-m + (:connections-agent transport) + [pending-connections (fetch-val nil) + _ (set-val nil (disj pending-connections connection)) + remote-peer-connections (fetch-val remote-peer-id (hash-set)) + _ (set-val remote-peer-id (conj remote-peer-connections + connection))]))) + (when-let [remote-peer-id (deref (:remote-peer-id-atom connection))] + (let [address {:transport "tcp" + :encoded-address encoded-address + :expiration (idle-connection-timeout)}] + (admit-message! peer remote-peer-id address message))))) (defn handle-socket-channel-connectable! - [peer transport encoded-address selection-key] - (.finishConnect (.channel selection-key)) - (.interestOps selection-key + [peer transport connection] + ;; This is always called from inside the selector thread + (.finishConnect (:socket-channel connection)) + (.interestOps (:selection-key connection) (bit-or SelectionKey/OP_READ SelectionKey/OP_WRITE))) (defn handle-socket-channel-readable! - [peer transport encoded-address selection-key] - (assert-args handle-socket-channel-readable! - (vector? encoded-address) "encoded-address as vector") - (let [socket-channel (.channel selection-key) + [peer transport connection] + ;; This is always called from inside the selector thread + (let [socket-channel (:socket-channel connection) socket (.socket socket-channel) buffer-length (.getReceiveBufferSize socket) byte-buffer (doto (ByteBuffer/allocate buffer-length) (.clear)) - bytes-read (.read (.channel selection-key) byte-buffer)] + bytes-read (.read socket-channel byte-buffer)] (if (== -1 bytes-read) - (handle-disconnect! peer transport encoded-address selection-key) - (send-do-exception-m! (:sessions-agent transport) - [connection (fetch-val [:connection encoded-address]) - :when-not (nil? connection) - :let [received-bytes (concat (:received-bytes connection) - (buffer-seq! (.flip byte-buffer)))] - :let [[messages residue] ((one-or-more parse-message) - received-bytes)] - :let [received-bytes (if (nil? residue) received-bytes residue)] - _ (set-val [:connection encoded-address] - (assoc connection :received-bytes (vec received-bytes)))] + (handle-disconnect! peer transport connection) + ;; NB: parse-message will only fail on an incomplete message and + ;; there is a maximum message size. + ;; Therefore received-bytes is bounded by the maximum message size. + ;; If, for example, there were some checksum that it verified then + ;; received-bytes could grow without bound having an invalid message + ;; stuck in the buffer. + (let [received-bytes (concat (deref (:received-bytes-atom connection)) + (buffer-seq! (.flip byte-buffer))) + [messages residue] ((one-or-more parse-message) received-bytes) + received-bytes (if (nil? residue) received-bytes residue)] + (swap! (:received-bytes-atom connection) + (fn [_] received-bytes)) (doseq [message messages] - (admit-tcp-message! peer transport encoded-address - selection-key message)))))) + (admit-tcp-message! peer transport connection message)))))) (defn handle-socket-channel-writable! - [peer transport encoded-address selection-key] + [peer transport connection] ;; We are already in the selector thread, but add ourselves to the end of the - ;; selector-continuations-queue because we want to make sure we set the - ;; interest ops on the selection-key after any other continuations that might + ;; selector-callbacks-queue because we want to make sure we set the + ;; interest ops on the selection-key after any other callbacks that might ;; be setting OP_WRITE. - (assert-args handle-socket-channel-writable! - (vector? encoded-address) "encoded-address as vector") - (.add (:selector-continuations-queue peer) - #(let [sessions (deref (:sessions-agent transport))] - (when-let [connection (sessions [:connection encoded-address])] - (let [packet (.poll (:send-queue connection)) - packet (if (nil? packet) - (when-let [send-queue (sessions - [encoded-address - (:remote-peer-id connection)])] - (.poll send-queue)) - packet)] - (if (nil? packet) - (.interestOps selection-key SelectionKey/OP_READ) - (try - (.write (.channel selection-key) - (ByteBuffer/wrap (byte-array (:bytes packet)))) - ((:continuation! packet) true) - (catch Exception e - ((:continuation! packet) false) - (handle-disconnect! peer transport encoded-address - selection-key))))))))) + (.add (:selector-callbacks-queue peer) + #(let [packet (.poll (:send-queue connection)) + remote-peer-id (deref (:remote-peer-id-atom connection)) + packet (if (and (not (nil? remote-peer-id)) (nil? packet)) + (.poll ((deref (:sessions-agent transport)) remote-peer-id)) + packet)] + (if (nil? packet) + (.interestOps selection-key SelectionKey/OP_READ) + (try + (.write (:socket-channel connection) + (ByteBuffer/wrap (byte-array (:bytes packet)))) + ((:callback! packet) true) + (catch Exception e + ((:callback! packet) false) + (handle-disconnect! peer transport connection))))))) (defn handle-socket-channel-selected! - [peer transport encoded-address selection-key] + [peer transport connection] + ;; This is always called from inside the selector thread (try (if (.isConnectable selection-key) - (handle-socket-channel-connectable! peer transport encoded-address - selection-key)) + (handle-socket-channel-connectable! peer transport connection)) (if (.isReadable selection-key) - (handle-socket-channel-readable! peer transport encoded-address - selection-key)) + (handle-socket-channel-readable! peer transport connection)) (if (.isWritable selection-key) - (handle-socket-channel-writable! peer transport encoded-address - selection-key)) + (handle-socket-channel-writable! peer transport connection)) (catch Exception e - (handle-disconnect! peer transport encoded-address selection-key)))) + (handle-disconnect! peer transport connection)))) -(defn set-connection-writable! - [peer remote-peer connection] - (when (= (:remote-peer-id connection) (:id remote-peer)) - (update-selection-key-async! peer (:selection-key connection) - (bit-or SelectionKey/OP_READ SelectionKey/OP_WRITE)))) +(defn connect! + [peer transport remote-peer encoded-address] + (when-let [address (first (parse-address encoded-address))] + (.add (:selector-callbacks-queue peer) + (fn [] + (let [socket-channel (doto (SocketChannel/open) + (.configureBlocking false)) + selection-key (.register socket-channel (:selector peer) 0) + remote-peer-id (:id remote-peer) + send-queue (ConcurrentLinkedQueue.) + connection {:socket-channel socket-channel + :encoded-address encoded-address + :selection-key selection-key + :send-queue send-queue + :received-bytes-atom (atom nil) + :remote-peer-id-atom (atom remote-peer-id)}] + (enqueue-welcome-message! peer connection) + (.add send-queue + {:bytes (generate-welcome-message peer) + :callback! skip}) + (update-selection-key! + selection-key + SelectionKey/OP_CONNECT + (partial handle-socket-channel-selected! peer transport connection)) + (try + (.connect socket-channel address) + (catch Exception e + (handle-disconnect! peer transport connection))) + (send-do-exception-m! + (:connections-agent transport) + [remote-peer-connections (fetch-val remote-peer-id (hash-set)) + _ (set-val remote-peer-id + (conj remote-peer-connections connection))])))) + (.wakeup (:selector peer)))) -(defn set-connection-writable-or-connect! +(defn set-write-interest-or-connect! [peer transport remote-peer encoded-address] - (assert-args set-connection-writable-or-connect! - (vector? encoded-address) "encoded-address as vector") - (if-let [connection ((deref (:sessions-agent transport)) - [:connection encoded-address])] - (set-connection-writable! peer remote-peer connection) - (when-let [address (first (parse-address encoded-address))] - (.add (:selector-continuations-queue peer) - (fn [] - (let [socket-channel (doto (SocketChannel/open) - (.configureBlocking false)) - selection-key (.register socket-channel (:selector peer) 0) - send-queue (ConcurrentLinkedQueue.)] - (send-do-exception-m! (:sessions-agent transport) - [connection (fetch-val [:connection encoded-address]) - :when (if (nil? connection) - true - (do - (.cancel selection-key) - (set-connection-writable! peer remote-peer connection) - false)) - _ (set-val [:connection encoded-address] - {:socket-channel socket-channel - :selection-key selection-key - :send-queue send-queue - :remote-peer-id (:id remote-peer) - :received-bytes []})] - (try - (.connect socket-channel address) - (.add send-queue - {:bytes (generate-welcome-message peer) - :continuation! identity}) - (update-selection-key-async! peer selection-key - SelectionKey/OP_CONNECT - (partial handle-socket-channel-selected! peer transport - encoded-address)) - (catch Exception e - (.add (:selector-continuations-queue peer) - #(handle-disconnect! peer transport encoded-address - selection-key)) - (.wakeup (:selector peer)))))))) - (.wakeup (:selector peer))))) + (if-let [remote-peer-connections ((deref (:connections-agent transport)) + (:id remote-peer))] + (update-selection-key-async! + peer + (:selection-key (first remote-peer-connections)) + (bit-or SelectionKey/OP_CONNECT + SelectionKey/OP_READ + SelectionKey/OP_WRITE)) + (connect! peer transport remote-peer encoded-address))) + +(defn enqueue-messages! + [peer transport remote-peer encoded-address callback! messages send-queue] + ;; TODO: clean up messages for sessions that are never established. + (.add send-queue + {:bytes (mapcat encode-message messages) + :callback! callback!}) + (set-write-interest-or-connect! peer transport remote-peer + encoded-address)) (defn emit-messages-tcp! - [peer transport remote-peer encoded-address continuation! messages] - (assert-args emit-tcp-message! + [peer transport remote-peer encoded-address callback! messages] + (assert-args emit-messages-tcp! (vector? encoded-address) "encoded-address as vector") - (send-do-exception-m! (:sessions-agent transport) - [:let [continuation! #(do (emit-continuation! peer transport remote-peer - encoded-address %) - (when continuation! (continuation! %)))] - send-queue (fetch-val [encoded-address (:id remote-peer)] - (ConcurrentLinkedQueue.)) - _ (set-val [encoded-address (:id remote-peer)] send-queue)] - (do - ;; TODO: clean up messages for sessions that are never established. - (.add send-queue - {:bytes (mapcat encode-message messages) - :continuation! continuation!}) - (set-connection-writable-or-connect! peer transport remote-peer - encoded-address)))) + (let [callback! #(do (emit-callback! peer transport remote-peer + encoded-address %) + (when callback! (callback! %))) + enqueue! (partial enqueue-messages! peer transport remote-peer + encoded-address callback! messages) + remote-peer-id (:id remote-peer) + send-queue ((deref (:sessions-agent transport)) remote-peer-id)] + (if (nil? send-queue) + (send-do-exception-m! + (:sessions-agent transport) + [send-queue (fetch-val remote-peer-id (ConcurrentLinkedQueue.)) + _ (set-val remote-peer-id send-queue)] + (enqueue! send-queue)) + (enqueue! send-queue)))) (defn handle-channel-acceptable! - [peer transport server-selection-key] + [peer transport server-socket-channel] + ;; This is always called from inside the selector thread (try - (let [socket-channel (doto (.accept (.channel server-selection-key)) + (let [socket-channel (doto (.accept server-socket-channel) (.configureBlocking false)) address (.getRemoteSocketAddress (.socket socket-channel)) encoded-address (encode-address address) - selection-key (.register socket-channel (:selector peer) 0)] - (send-do-exception-m! (:sessions-agent transport) - [connection (fetch-val [:connection encoded-address]) - :when (if (nil? connection) - true - (do - (.cancel selection-key) - (.close (.channel selection-key)) - false)) - _ (set-val [:connection encoded-address] - {:socket-channel socket-channel - :selection-key selection-key - :send-queue (ConcurrentLinkedQueue.) - :received-bytes [] - :incoming true})] - (update-selection-key-async! peer selection-key SelectionKey/OP_READ - (partial handle-socket-channel-selected! peer transport - encoded-address)))) - (catch Exception e - ;; If accept throws an exception, ignore it - (do)))) + selection-key (.register socket-channel (:selector peer) 0) + connection {:socket-channel socket-channel + :encoded-address encoded-address + :selection-key selection-key + :send-queue (ConcurrentLinkedQueue.) + :received-bytes-atom (atom nil) + :remote-peer-id-atom (atom nil) + :incoming true}] + (update-selection-key! + selection-key + SelectionKey/OP_READ + (partial handle-socket-channel-selected! peer transport connection)) + (send-do-exception-m! (:connections-agent transport) + [pending-connections (fetch-val nil (hash-set)) + _ (set-val nil (conj pending-connections connection))])) + (catch Exception e nil))) (defn register-server-channel! [peer port] - (let [server-channel (ServerSocketChannel/open) - socket (.socket server-channel) + ;; This is always called from inside the selector thread + (let [server-socket-channel (ServerSocketChannel/open) + socket (.socket server-socket-channel) transport {:name "tcp" :emit-messages! (partial emit-messages-tcp! peer) + :connections-agent (agent {}) :sessions-agent (agent {})}] - (.configureBlocking server-channel false) + (.configureBlocking server-socket-channel false) (.bind socket (InetSocketAddress. port)) - (.register server-channel (:selector peer) SelectionKey/OP_ACCEPT - (partial handle-channel-acceptable! peer transport)) + (.register server-socket-channel (:selector peer) SelectionKey/OP_ACCEPT + (partial handle-channel-acceptable! peer transport server-socket-channel)) (send (:transports-agent peer) #(assoc % "tcp" transport)))) (defn activate-tcp! [peer port] - (.add (:selector-continuations-queue peer) + (.add (:selector-callbacks-queue peer) (partial register-server-channel! peer port)) (.wakeup (:selector peer))) diff --git a/src/main/clojure/org/gnu/clojure/gnunet/transport.clj b/src/main/clojure/org/gnu/clojure/gnunet/transport.clj index 25d1511..4f8e38a 100644 --- a/src/main/clojure/org/gnu/clojure/gnunet/transport.clj +++ b/src/main/clojure/org/gnu/clojure/gnunet/transport.clj @@ -327,7 +327,7 @@ signature-purpose-pong-using (handle-pong-using! peer remote-peer pong) nil))) -(defn emit-continuation! +(defn emit-callback! [peer transport remote-peer encoded-address result] (when result (let [addresses ((deref (:transport-addresses-agent remote-peer)) diff --git a/src/main/clojure/org/gnu/clojure/gnunet/util.clj b/src/main/clojure/org/gnu/clojure/gnunet/util.clj index 329df62..ea813e3 100644 --- a/src/main/clojure/org/gnu/clojure/gnunet/util.clj +++ b/src/main/clojure/org/gnu/clojure/gnunet/util.clj @@ -26,3 +26,5 @@ (defn available-processors [] (.availableProcessors (Runtime/getRuntime))) + +(defn skip [& _]) |