summaryrefslogtreecommitdiff
path: root/src/main/clojure/org/gnu
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/clojure/org/gnu')
-rw-r--r--src/main/clojure/org/gnu/clojure/gnunet/tcp.clj51
1 files changed, 25 insertions, 26 deletions
diff --git a/src/main/clojure/org/gnu/clojure/gnunet/tcp.clj b/src/main/clojure/org/gnu/clojure/gnunet/tcp.clj
index d5b3d29..0402fad 100644
--- a/src/main/clojure/org/gnu/clojure/gnunet/tcp.clj
+++ b/src/main/clojure/org/gnu/clojure/gnunet/tcp.clj
@@ -56,11 +56,15 @@
[peer transport connection message]
;; This is always called from inside the selector thread
(if (== message-type-tcp-welcome (:message-type message))
+ ;; XXX We ignore this message if this is an outgoing connection.
+ ;; Seems like we should reset :remote-peer-id-atom and then move it to the
+ ;; correct set in :connections-agent and probably clear out the :send-queue.
+ ;; Or we could just drop the connection.
(domonad
- exception-m
- [:when-let [welcome (first (parse-welcome (:bytes message)))]
+ maybe-m
+ [welcome (first (parse-welcome (:bytes message)))
:let [remote-peer-id (:peer-id welcome)]
- :when-not (== remote-peer-id (:id peer))
+ :when-not (= remote-peer-id (:id peer))
:when (nil? (deref (:remote-peer-id-atom connection)))]
(let [remote-peer-id (:peer-id welcome)]
(swap! (:remote-peer-id-atom connection)
@@ -86,7 +90,7 @@
[peer transport connection]
;; This is always called from inside the selector thread
(.finishConnect (:socket-channel connection))
- (.interestOps (:selection-key connection)
+ (update-selection-key! (:selection-key connection)
(bit-or SelectionKey/OP_READ SelectionKey/OP_WRITE)))
(defn handle-socket-channel-readable!
@@ -131,9 +135,11 @@
(try
(.write (:socket-channel connection)
(ByteBuffer/wrap (byte-array (:bytes packet))))
- ((:callback! packet) true)
+ (when-let [callback! (:callback! packet)]
+ (callback! true))
(catch Exception e
- ((:callback! packet) false)
+ (when-let [callback! (:callback! packet)]
+ (callback! false))
(handle-disconnect! peer transport connection)))))))
(defn handle-socket-channel-selected!
@@ -148,6 +154,7 @@
(when (.isWritable selection-key)
(handle-socket-channel-writable! peer transport connection))
(catch Exception e
+ ;; (.printStackTrace e)
(handle-disconnect! peer transport connection)))))
(defn connect!
@@ -166,26 +173,27 @@
:send-queue send-queue
:received-bytes-atom (atom nil)
:remote-peer-id-atom (atom remote-peer-id)}]
- (.add send-queue
- {:bytes (generate-welcome-message peer)
- :callback! skip})
+ (.add send-queue {:bytes (generate-welcome-message peer)})
(update-selection-key!
selection-key
SelectionKey/OP_CONNECT
(partial handle-socket-channel-selected! peer transport connection))
(try
(.connect socket-channel address)
+ (send
+ (:connections-agent transport)
+ conj-vals
+ (hash-set)
+ [remote-peer-id connection])
(catch Exception e
- (handle-disconnect! peer transport connection)))
- (send
- (:connections-agent transport)
- conj-vals
- (hash-set)
- [remote-peer-id connection]))))
+ (handle-disconnect! peer transport connection))))))
(.wakeup (:selector peer))))
-(defn set-write-interest-or-connect!
- [peer transport remote-peer encoded-address]
+(defn enqueue-messages!
+ [peer transport remote-peer encoded-address callback! messages send-queue]
+ ;; TODO: clean up messages for sessions that are never established.
+ (.add send-queue {:bytes (mapcat encode-message messages)
+ :callback! callback!})
(if-let [remote-peer-connections ((deref (:connections-agent transport))
(:id remote-peer))]
(update-selection-key-async!
@@ -196,15 +204,6 @@
SelectionKey/OP_WRITE))
(connect! peer transport remote-peer encoded-address)))
-(defn enqueue-messages!
- [peer transport remote-peer encoded-address callback! messages send-queue]
- ;; TODO: clean up messages for sessions that are never established.
- (.add send-queue
- {:bytes (mapcat encode-message messages)
- :callback! callback!})
- (set-write-interest-or-connect! peer transport remote-peer
- encoded-address))
-
(defn emit-messages-tcp!
[peer transport remote-peer encoded-address callback! messages]
(assert-args emit-messages-tcp!