diff options
-rw-r--r-- | src/main/clojure/org/gnu/clojure/gnunet/tcp.clj | 51 |
1 files changed, 25 insertions, 26 deletions
diff --git a/src/main/clojure/org/gnu/clojure/gnunet/tcp.clj b/src/main/clojure/org/gnu/clojure/gnunet/tcp.clj index d5b3d29..0402fad 100644 --- a/src/main/clojure/org/gnu/clojure/gnunet/tcp.clj +++ b/src/main/clojure/org/gnu/clojure/gnunet/tcp.clj @@ -56,11 +56,15 @@ [peer transport connection message] ;; This is always called from inside the selector thread (if (== message-type-tcp-welcome (:message-type message)) + ;; XXX We ignore this message if this is an outgoing connection. + ;; Seems like we should reset :remote-peer-id-atom and then move it to the + ;; correct set in :connections-agent and probably clear out the :send-queue. + ;; Or we could just drop the connection. (domonad - exception-m - [:when-let [welcome (first (parse-welcome (:bytes message)))] + maybe-m + [welcome (first (parse-welcome (:bytes message))) :let [remote-peer-id (:peer-id welcome)] - :when-not (== remote-peer-id (:id peer)) + :when-not (= remote-peer-id (:id peer)) :when (nil? (deref (:remote-peer-id-atom connection)))] (let [remote-peer-id (:peer-id welcome)] (swap! (:remote-peer-id-atom connection) @@ -86,7 +90,7 @@ [peer transport connection] ;; This is always called from inside the selector thread (.finishConnect (:socket-channel connection)) - (.interestOps (:selection-key connection) + (update-selection-key! (:selection-key connection) (bit-or SelectionKey/OP_READ SelectionKey/OP_WRITE))) (defn handle-socket-channel-readable! @@ -131,9 +135,11 @@ (try (.write (:socket-channel connection) (ByteBuffer/wrap (byte-array (:bytes packet)))) - ((:callback! packet) true) + (when-let [callback! (:callback! packet)] + (callback! true)) (catch Exception e - ((:callback! packet) false) + (when-let [callback! (:callback! packet)] + (callback! false)) (handle-disconnect! peer transport connection))))))) (defn handle-socket-channel-selected! @@ -148,6 +154,7 @@ (when (.isWritable selection-key) (handle-socket-channel-writable! peer transport connection)) (catch Exception e + ;; (.printStackTrace e) (handle-disconnect! peer transport connection))))) (defn connect! @@ -166,26 +173,27 @@ :send-queue send-queue :received-bytes-atom (atom nil) :remote-peer-id-atom (atom remote-peer-id)}] - (.add send-queue - {:bytes (generate-welcome-message peer) - :callback! skip}) + (.add send-queue {:bytes (generate-welcome-message peer)}) (update-selection-key! selection-key SelectionKey/OP_CONNECT (partial handle-socket-channel-selected! peer transport connection)) (try (.connect socket-channel address) + (send + (:connections-agent transport) + conj-vals + (hash-set) + [remote-peer-id connection]) (catch Exception e - (handle-disconnect! peer transport connection))) - (send - (:connections-agent transport) - conj-vals - (hash-set) - [remote-peer-id connection])))) + (handle-disconnect! peer transport connection)))))) (.wakeup (:selector peer)))) -(defn set-write-interest-or-connect! - [peer transport remote-peer encoded-address] +(defn enqueue-messages! + [peer transport remote-peer encoded-address callback! messages send-queue] + ;; TODO: clean up messages for sessions that are never established. + (.add send-queue {:bytes (mapcat encode-message messages) + :callback! callback!}) (if-let [remote-peer-connections ((deref (:connections-agent transport)) (:id remote-peer))] (update-selection-key-async! @@ -196,15 +204,6 @@ SelectionKey/OP_WRITE)) (connect! peer transport remote-peer encoded-address))) -(defn enqueue-messages! - [peer transport remote-peer encoded-address callback! messages send-queue] - ;; TODO: clean up messages for sessions that are never established. - (.add send-queue - {:bytes (mapcat encode-message messages) - :callback! callback!}) - (set-write-interest-or-connect! peer transport remote-peer - encoded-address)) - (defn emit-messages-tcp! [peer transport remote-peer encoded-address callback! messages] (assert-args emit-messages-tcp! |