summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDavid Barksdale <amatus.amongus@gmail.com>2011-08-13 00:24:43 -0500
committerDavid Barksdale <amatus.amongus@gmail.com>2011-08-13 00:24:43 -0500
commitc35250c7ab793e569c1ac293035bca8c929fabe6 (patch)
treed20c37e0331bbe96dd57a9401ffc402d83daf860
parent0465f3c418d0682c68d4cdcc3c8bde0f84584513 (diff)
Re-wrote TCP transport.
I'm pretty sure my use of the word "continuation" is inaccurate so I've renamed some things "callback".
-rw-r--r--src/main/clojure/org/gnu/clojure/gnunet/peer.clj22
-rw-r--r--src/main/clojure/org/gnu/clojure/gnunet/tcp.clj407
-rw-r--r--src/main/clojure/org/gnu/clojure/gnunet/transport.clj2
-rw-r--r--src/main/clojure/org/gnu/clojure/gnunet/util.clj2
4 files changed, 211 insertions, 222 deletions
diff --git a/src/main/clojure/org/gnu/clojure/gnunet/peer.clj b/src/main/clojure/org/gnu/clojure/gnunet/peer.clj
index adb7a2b..446aa9a 100644
--- a/src/main/clojure/org/gnu/clojure/gnunet/peer.clj
+++ b/src/main/clojure/org/gnu/clojure/gnunet/peer.clj
@@ -65,11 +65,11 @@
;; Thread which selects on :selector
:selector-thread
- ;; java.util.concurrent.ConcurrentLinkedQueue of continuations.
+ ;; java.util.concurrent.ConcurrentLinkedQueue of callbacks.
;; In order to access the selector while the selector-thread is running add
- ;; a continuation to this queue and call .wakeup on the selector.
+ ;; a callback to this queue and call .wakeup on the selector.
;; The size of this queue is an easy measure our network load.
- :selector-continuations-queue
+ :selector-callbacks-queue
;; java.security.SecureRandom
:random
@@ -117,19 +117,19 @@
(== (:priority (meta this)) (:priority (meta obj))))))
(defn selector-loop!
- [selector continuations]
- (doseq [continuation (queue-seq! continuations)]
- (continuation))
+ [selector callbacks]
+ (doseq [callback (queue-seq! callbacks)]
+ (callback))
(.select selector)
(let [selected-keys (.selectedKeys selector)]
(doseq [selection-key selected-keys]
((.attachment selection-key) selection-key))
(.clear selected-keys))
- (recur selector continuations))
+ (recur selector callbacks))
(defn new-peer [options]
(let [selector (Selector/open)
- continuations (ConcurrentLinkedQueue.)
+ callbacks (ConcurrentLinkedQueue.)
cpu-bound-queue (LinkedBlockingQueue.)
cpu-bound-executor (ThreadPoolExecutor. 0 (available-processors) 60
TimeUnit/SECONDS cpu-bound-queue)
@@ -146,8 +146,8 @@
:transports-agent (agent {})
:dispatch-agent (agent {})
:selector selector
- :selector-thread (Thread. (partial selector-loop! selector continuations))
- :selector-continuations-queue continuations
+ :selector-thread (Thread. (partial selector-loop! selector callbacks))
+ :selector-callbacks-queue callbacks
:random (:random options)
:cpu-bound-executor cpu-bound-executor
:cpu-bound-queue cpu-bound-queue
@@ -159,7 +159,7 @@
(defn network-load
[peer]
;; TODO: figure out if we really need size, it's an O(n) operation
- (.size (:selector-continuations-queue peer)))
+ (.size (:selector-callbacks-queue peer)))
(defn cpu-load
[peer]
diff --git a/src/main/clojure/org/gnu/clojure/gnunet/tcp.clj b/src/main/clojure/org/gnu/clojure/gnunet/tcp.clj
index 1246c6e..aa0b76c 100644
--- a/src/main/clojure/org/gnu/clojure/gnunet/tcp.clj
+++ b/src/main/clojure/org/gnu/clojure/gnunet/tcp.clj
@@ -10,277 +10,264 @@
(def message-type-tcp-welcome 60)
(defn encode-welcome
- [welcome]
- (:peer-id welcome))
+ [peer]
+ (:id peer))
(def parse-welcome
(domonad parser-m
- [peer-id (items id-size)]
- {:peer-id peer-id}))
+ [id (items id-size)]
+ {:id id}))
(defn generate-welcome-message
[peer]
(encode-message
{:message-type message-type-tcp-welcome
- :bytes (encode-welcome {:peer-id (:id peer)})}))
+ :bytes (encode-welcome peer)}))
(defn handle-disconnect!
- [peer transport encoded-address selection-key]
+ [peer transport connection]
;; This is always called from inside the selector thread
- (assert-args handle-disconnect!
- (vector? encoded-address) "encoded-address as vector")
- (.cancel selection-key)
- (.close (.channel selection-key))
- (send-do-exception-m! (:sessions-agent transport)
- [remote-peer-id (with-state-field [:connection encoded-address]
- (fetch-val :remote-peer-id))
- send-queue (fetch-val [encoded-address remote-peer-id])
- _ (update-state #(dissoc %
- [:connection encoded-address]
- [encoded-address remote-peer-id]))
- :when send-queue]
- (doseq [packet (queue-seq! send-queue)]
- ((:continuation! packet) false))))
+ (.cancel (:selection-key connection))
+ (.close (:socket-channel connection))
+ (send-do-exception-m!
+ (:connections-agent transport)
+ [pending-connections (fetch-val nil (hash-set))
+ _ (set-val nil (disj pending-connections connection))
+ :when-let [remote-peer-id (deref (:remote-peer-id-atom connection))]
+ remote-peer-connections (fetch-val remote-peer-id)
+ _ (set-val remote-peer-id (disj remote-peer-connections connection))]))
+
+(defn update-selection-key!
+ [selection-key ops & attachment]
+ ;; This is always called from inside the selector thread
+ (try
+ (.interestOps selection-key ops)
+ (when attachment (.attach selection-key (first attachment)))
+ (catch Exception e)))
(defn update-selection-key-async!
- ([peer selection-key ops]
- (.add (:selector-continuations-queue peer)
- #(try
- (.interestOps selection-key ops)
- (catch Exception e)))
- (.wakeup (:selector peer)))
- ([peer selection-key ops attachment]
- (.add (:selector-continuations-queue peer)
- #(try
- (.interestOps selection-key ops)
- (.attach selection-key attachment)
- (catch Exception e)))
- (.wakeup (:selector peer))))
+ [peer selection-key ops & attachment]
+ (.add (:selector-callbacks-queue peer)
+ #(apply update-selection-key! selection-key ops attachment))
+ (.wakeup (:selector peer)))
(defn admit-tcp-message!
- [peer transport encoded-address selection-key message]
- (assert-args admit-tcp-message!
- (vector? encoded-address) "encoded-address as vector")
+ [peer transport connection message]
+ ;; This is always called from inside the selector thread
(if (== message-type-tcp-welcome (:message-type message))
- (send-do-exception-m! (:sessions-agent transport)
+ (domonad
+ exception-m
[:when-let [welcome (first (parse-welcome (:bytes message)))]
- connection (fetch-val [:connection encoded-address])
- :when-not (nil? connection)
- ;; XXX: This is weird because for an outgoing connection we don't check
- ;; that the peer-id matches who we thought we connected to.
- _ (set-val [:connection encoded-address]
- (conj connection {:remote-peer-id (:peer-id welcome)
- :received-welcome true}))]
- (do
- (when (nil? (:remote-peer-id connection))
- (.add (:send-queue connection)
- {:bytes (generate-welcome-message peer)
- :continuation! identity}))
- ;; We have to send again to make sure our update to the sessions-agent
- ;; is finished before updating the selection-key
- (send (:sessions-agent transport)
- (fn [sessions]
- (update-selection-key-async! peer selection-key
- (bit-or SelectionKey/OP_READ SelectionKey/OP_WRITE))
- sessions))))
- (send-do-exception-m! (:sessions-agent transport)
- [connection (fetch-val [:connection encoded-address])
- :when-let [remote-peer-id (:remote-peer-id connection)]
- :when (:received-welcome connection)
- :let [address {:transport "tcp"
- :encoded-address encoded-address
- :expiration (idle-connection-timeout)}]]
- (admit-message! peer remote-peer-id address message))))
+ :let [remote-peer-id (:peer-id welcome)]
+ :when-not (== remote-peer-id (:id peer))
+ :when (nil? (deref (:remote-peer-id-atom connection)))]
+ (let [remote-peer-id (:peer-id welcome)]
+ (swap! (:remote-peer-id-atom connection)
+ (fn [_] remote-peer-id))
+ (.add (:send-queue connection)
+ {:bytes (generate-welcome-message peer)
+ :callback! skip})
+ (update-selection-key!
+ (:selection-key connection)
+ (bit-or SelectionKey/OP_READ SelectionKey/OP_WRITE))
+ (send-do-exception-m
+ (:connections-agent transport)
+ [pending-connections (fetch-val nil)
+ _ (set-val nil (disj pending-connections connection))
+ remote-peer-connections (fetch-val remote-peer-id (hash-set))
+ _ (set-val remote-peer-id (conj remote-peer-connections
+ connection))])))
+ (when-let [remote-peer-id (deref (:remote-peer-id-atom connection))]
+ (let [address {:transport "tcp"
+ :encoded-address encoded-address
+ :expiration (idle-connection-timeout)}]
+ (admit-message! peer remote-peer-id address message)))))
(defn handle-socket-channel-connectable!
- [peer transport encoded-address selection-key]
- (.finishConnect (.channel selection-key))
- (.interestOps selection-key
+ [peer transport connection]
+ ;; This is always called from inside the selector thread
+ (.finishConnect (:socket-channel connection))
+ (.interestOps (:selection-key connection)
(bit-or SelectionKey/OP_READ SelectionKey/OP_WRITE)))
(defn handle-socket-channel-readable!
- [peer transport encoded-address selection-key]
- (assert-args handle-socket-channel-readable!
- (vector? encoded-address) "encoded-address as vector")
- (let [socket-channel (.channel selection-key)
+ [peer transport connection]
+ ;; This is always called from inside the selector thread
+ (let [socket-channel (:socket-channel connection)
socket (.socket socket-channel)
buffer-length (.getReceiveBufferSize socket)
byte-buffer (doto (ByteBuffer/allocate buffer-length) (.clear))
- bytes-read (.read (.channel selection-key) byte-buffer)]
+ bytes-read (.read socket-channel byte-buffer)]
(if (== -1 bytes-read)
- (handle-disconnect! peer transport encoded-address selection-key)
- (send-do-exception-m! (:sessions-agent transport)
- [connection (fetch-val [:connection encoded-address])
- :when-not (nil? connection)
- :let [received-bytes (concat (:received-bytes connection)
- (buffer-seq! (.flip byte-buffer)))]
- :let [[messages residue] ((one-or-more parse-message)
- received-bytes)]
- :let [received-bytes (if (nil? residue) received-bytes residue)]
- _ (set-val [:connection encoded-address]
- (assoc connection :received-bytes (vec received-bytes)))]
+ (handle-disconnect! peer transport connection)
+ ;; NB: parse-message will only fail on an incomplete message and
+ ;; there is a maximum message size.
+ ;; Therefore received-bytes is bounded by the maximum message size.
+ ;; If, for example, there were some checksum that it verified then
+ ;; received-bytes could grow without bound having an invalid message
+ ;; stuck in the buffer.
+ (let [received-bytes (concat (deref (:received-bytes-atom connection))
+ (buffer-seq! (.flip byte-buffer)))
+ [messages residue] ((one-or-more parse-message) received-bytes)
+ received-bytes (if (nil? residue) received-bytes residue)]
+ (swap! (:received-bytes-atom connection)
+ (fn [_] received-bytes))
(doseq [message messages]
- (admit-tcp-message! peer transport encoded-address
- selection-key message))))))
+ (admit-tcp-message! peer transport connection message))))))
(defn handle-socket-channel-writable!
- [peer transport encoded-address selection-key]
+ [peer transport connection]
;; We are already in the selector thread, but add ourselves to the end of the
- ;; selector-continuations-queue because we want to make sure we set the
- ;; interest ops on the selection-key after any other continuations that might
+ ;; selector-callbacks-queue because we want to make sure we set the
+ ;; interest ops on the selection-key after any other callbacks that might
;; be setting OP_WRITE.
- (assert-args handle-socket-channel-writable!
- (vector? encoded-address) "encoded-address as vector")
- (.add (:selector-continuations-queue peer)
- #(let [sessions (deref (:sessions-agent transport))]
- (when-let [connection (sessions [:connection encoded-address])]
- (let [packet (.poll (:send-queue connection))
- packet (if (nil? packet)
- (when-let [send-queue (sessions
- [encoded-address
- (:remote-peer-id connection)])]
- (.poll send-queue))
- packet)]
- (if (nil? packet)
- (.interestOps selection-key SelectionKey/OP_READ)
- (try
- (.write (.channel selection-key)
- (ByteBuffer/wrap (byte-array (:bytes packet))))
- ((:continuation! packet) true)
- (catch Exception e
- ((:continuation! packet) false)
- (handle-disconnect! peer transport encoded-address
- selection-key)))))))))
+ (.add (:selector-callbacks-queue peer)
+ #(let [packet (.poll (:send-queue connection))
+ remote-peer-id (deref (:remote-peer-id-atom connection))
+ packet (if (and (not (nil? remote-peer-id)) (nil? packet))
+ (.poll ((deref (:sessions-agent transport)) remote-peer-id))
+ packet)]
+ (if (nil? packet)
+ (.interestOps selection-key SelectionKey/OP_READ)
+ (try
+ (.write (:socket-channel connection)
+ (ByteBuffer/wrap (byte-array (:bytes packet))))
+ ((:callback! packet) true)
+ (catch Exception e
+ ((:callback! packet) false)
+ (handle-disconnect! peer transport connection)))))))
(defn handle-socket-channel-selected!
- [peer transport encoded-address selection-key]
+ [peer transport connection]
+ ;; This is always called from inside the selector thread
(try
(if (.isConnectable selection-key)
- (handle-socket-channel-connectable! peer transport encoded-address
- selection-key))
+ (handle-socket-channel-connectable! peer transport connection))
(if (.isReadable selection-key)
- (handle-socket-channel-readable! peer transport encoded-address
- selection-key))
+ (handle-socket-channel-readable! peer transport connection))
(if (.isWritable selection-key)
- (handle-socket-channel-writable! peer transport encoded-address
- selection-key))
+ (handle-socket-channel-writable! peer transport connection))
(catch Exception e
- (handle-disconnect! peer transport encoded-address selection-key))))
+ (handle-disconnect! peer transport connection))))
-(defn set-connection-writable!
- [peer remote-peer connection]
- (when (= (:remote-peer-id connection) (:id remote-peer))
- (update-selection-key-async! peer (:selection-key connection)
- (bit-or SelectionKey/OP_READ SelectionKey/OP_WRITE))))
+(defn connect!
+ [peer transport remote-peer encoded-address]
+ (when-let [address (first (parse-address encoded-address))]
+ (.add (:selector-callbacks-queue peer)
+ (fn []
+ (let [socket-channel (doto (SocketChannel/open)
+ (.configureBlocking false))
+ selection-key (.register socket-channel (:selector peer) 0)
+ remote-peer-id (:id remote-peer)
+ send-queue (ConcurrentLinkedQueue.)
+ connection {:socket-channel socket-channel
+ :encoded-address encoded-address
+ :selection-key selection-key
+ :send-queue send-queue
+ :received-bytes-atom (atom nil)
+ :remote-peer-id-atom (atom remote-peer-id)}]
+ (enqueue-welcome-message! peer connection)
+ (.add send-queue
+ {:bytes (generate-welcome-message peer)
+ :callback! skip})
+ (update-selection-key!
+ selection-key
+ SelectionKey/OP_CONNECT
+ (partial handle-socket-channel-selected! peer transport connection))
+ (try
+ (.connect socket-channel address)
+ (catch Exception e
+ (handle-disconnect! peer transport connection)))
+ (send-do-exception-m!
+ (:connections-agent transport)
+ [remote-peer-connections (fetch-val remote-peer-id (hash-set))
+ _ (set-val remote-peer-id
+ (conj remote-peer-connections connection))]))))
+ (.wakeup (:selector peer))))
-(defn set-connection-writable-or-connect!
+(defn set-write-interest-or-connect!
[peer transport remote-peer encoded-address]
- (assert-args set-connection-writable-or-connect!
- (vector? encoded-address) "encoded-address as vector")
- (if-let [connection ((deref (:sessions-agent transport))
- [:connection encoded-address])]
- (set-connection-writable! peer remote-peer connection)
- (when-let [address (first (parse-address encoded-address))]
- (.add (:selector-continuations-queue peer)
- (fn []
- (let [socket-channel (doto (SocketChannel/open)
- (.configureBlocking false))
- selection-key (.register socket-channel (:selector peer) 0)
- send-queue (ConcurrentLinkedQueue.)]
- (send-do-exception-m! (:sessions-agent transport)
- [connection (fetch-val [:connection encoded-address])
- :when (if (nil? connection)
- true
- (do
- (.cancel selection-key)
- (set-connection-writable! peer remote-peer connection)
- false))
- _ (set-val [:connection encoded-address]
- {:socket-channel socket-channel
- :selection-key selection-key
- :send-queue send-queue
- :remote-peer-id (:id remote-peer)
- :received-bytes []})]
- (try
- (.connect socket-channel address)
- (.add send-queue
- {:bytes (generate-welcome-message peer)
- :continuation! identity})
- (update-selection-key-async! peer selection-key
- SelectionKey/OP_CONNECT
- (partial handle-socket-channel-selected! peer transport
- encoded-address))
- (catch Exception e
- (.add (:selector-continuations-queue peer)
- #(handle-disconnect! peer transport encoded-address
- selection-key))
- (.wakeup (:selector peer))))))))
- (.wakeup (:selector peer)))))
+ (if-let [remote-peer-connections ((deref (:connections-agent transport))
+ (:id remote-peer))]
+ (update-selection-key-async!
+ peer
+ (:selection-key (first remote-peer-connections))
+ (bit-or SelectionKey/OP_CONNECT
+ SelectionKey/OP_READ
+ SelectionKey/OP_WRITE))
+ (connect! peer transport remote-peer encoded-address)))
+
+(defn enqueue-messages!
+ [peer transport remote-peer encoded-address callback! messages send-queue]
+ ;; TODO: clean up messages for sessions that are never established.
+ (.add send-queue
+ {:bytes (mapcat encode-message messages)
+ :callback! callback!})
+ (set-write-interest-or-connect! peer transport remote-peer
+ encoded-address))
(defn emit-messages-tcp!
- [peer transport remote-peer encoded-address continuation! messages]
- (assert-args emit-tcp-message!
+ [peer transport remote-peer encoded-address callback! messages]
+ (assert-args emit-messages-tcp!
(vector? encoded-address) "encoded-address as vector")
- (send-do-exception-m! (:sessions-agent transport)
- [:let [continuation! #(do (emit-continuation! peer transport remote-peer
- encoded-address %)
- (when continuation! (continuation! %)))]
- send-queue (fetch-val [encoded-address (:id remote-peer)]
- (ConcurrentLinkedQueue.))
- _ (set-val [encoded-address (:id remote-peer)] send-queue)]
- (do
- ;; TODO: clean up messages for sessions that are never established.
- (.add send-queue
- {:bytes (mapcat encode-message messages)
- :continuation! continuation!})
- (set-connection-writable-or-connect! peer transport remote-peer
- encoded-address))))
+ (let [callback! #(do (emit-callback! peer transport remote-peer
+ encoded-address %)
+ (when callback! (callback! %)))
+ enqueue! (partial enqueue-messages! peer transport remote-peer
+ encoded-address callback! messages)
+ remote-peer-id (:id remote-peer)
+ send-queue ((deref (:sessions-agent transport)) remote-peer-id)]
+ (if (nil? send-queue)
+ (send-do-exception-m!
+ (:sessions-agent transport)
+ [send-queue (fetch-val remote-peer-id (ConcurrentLinkedQueue.))
+ _ (set-val remote-peer-id send-queue)]
+ (enqueue! send-queue))
+ (enqueue! send-queue))))
(defn handle-channel-acceptable!
- [peer transport server-selection-key]
+ [peer transport server-socket-channel]
+ ;; This is always called from inside the selector thread
(try
- (let [socket-channel (doto (.accept (.channel server-selection-key))
+ (let [socket-channel (doto (.accept server-socket-channel)
(.configureBlocking false))
address (.getRemoteSocketAddress (.socket socket-channel))
encoded-address (encode-address address)
- selection-key (.register socket-channel (:selector peer) 0)]
- (send-do-exception-m! (:sessions-agent transport)
- [connection (fetch-val [:connection encoded-address])
- :when (if (nil? connection)
- true
- (do
- (.cancel selection-key)
- (.close (.channel selection-key))
- false))
- _ (set-val [:connection encoded-address]
- {:socket-channel socket-channel
- :selection-key selection-key
- :send-queue (ConcurrentLinkedQueue.)
- :received-bytes []
- :incoming true})]
- (update-selection-key-async! peer selection-key SelectionKey/OP_READ
- (partial handle-socket-channel-selected! peer transport
- encoded-address))))
- (catch Exception e
- ;; If accept throws an exception, ignore it
- (do))))
+ selection-key (.register socket-channel (:selector peer) 0)
+ connection {:socket-channel socket-channel
+ :encoded-address encoded-address
+ :selection-key selection-key
+ :send-queue (ConcurrentLinkedQueue.)
+ :received-bytes-atom (atom nil)
+ :remote-peer-id-atom (atom nil)
+ :incoming true}]
+ (update-selection-key!
+ selection-key
+ SelectionKey/OP_READ
+ (partial handle-socket-channel-selected! peer transport connection))
+ (send-do-exception-m! (:connections-agent transport)
+ [pending-connections (fetch-val nil (hash-set))
+ _ (set-val nil (conj pending-connections connection))]))
+ (catch Exception e nil)))
(defn register-server-channel!
[peer port]
- (let [server-channel (ServerSocketChannel/open)
- socket (.socket server-channel)
+ ;; This is always called from inside the selector thread
+ (let [server-socket-channel (ServerSocketChannel/open)
+ socket (.socket server-socket-channel)
transport {:name "tcp"
:emit-messages! (partial emit-messages-tcp! peer)
+ :connections-agent (agent {})
:sessions-agent (agent {})}]
- (.configureBlocking server-channel false)
+ (.configureBlocking server-socket-channel false)
(.bind socket (InetSocketAddress. port))
- (.register server-channel (:selector peer) SelectionKey/OP_ACCEPT
- (partial handle-channel-acceptable! peer transport))
+ (.register server-socket-channel (:selector peer) SelectionKey/OP_ACCEPT
+ (partial handle-channel-acceptable! peer transport server-socket-channel))
(send (:transports-agent peer)
#(assoc % "tcp" transport))))
(defn activate-tcp!
[peer port]
- (.add (:selector-continuations-queue peer)
+ (.add (:selector-callbacks-queue peer)
(partial register-server-channel! peer port))
(.wakeup (:selector peer)))
diff --git a/src/main/clojure/org/gnu/clojure/gnunet/transport.clj b/src/main/clojure/org/gnu/clojure/gnunet/transport.clj
index 25d1511..4f8e38a 100644
--- a/src/main/clojure/org/gnu/clojure/gnunet/transport.clj
+++ b/src/main/clojure/org/gnu/clojure/gnunet/transport.clj
@@ -327,7 +327,7 @@
signature-purpose-pong-using (handle-pong-using! peer remote-peer pong)
nil)))
-(defn emit-continuation!
+(defn emit-callback!
[peer transport remote-peer encoded-address result]
(when result
(let [addresses ((deref (:transport-addresses-agent remote-peer))
diff --git a/src/main/clojure/org/gnu/clojure/gnunet/util.clj b/src/main/clojure/org/gnu/clojure/gnunet/util.clj
index 329df62..ea813e3 100644
--- a/src/main/clojure/org/gnu/clojure/gnunet/util.clj
+++ b/src/main/clojure/org/gnu/clojure/gnunet/util.clj
@@ -26,3 +26,5 @@
(defn available-processors
[]
(.availableProcessors (Runtime/getRuntime)))
+
+(defn skip [& _])