diff options
author | David Barksdale <amatus.amongus@gmail.com> | 2010-10-05 16:23:21 -0700 |
---|---|---|
committer | David Barksdale <amatus.amongus@gmail.com> | 2010-10-05 16:23:21 -0700 |
commit | 70948418ea93157e1d09da9875685f4a588ea8c6 (patch) | |
tree | 6a59e2d1d01cbb6ec9dd9707f8047adf81580d6e /src | |
parent | 3d4851cb3c3dac0a5c6503c075ac2ef5c3d6c69e (diff) |
UDP transport can now receive packets!
Diffstat (limited to 'src')
-rw-r--r-- | src/org/gnu/clojure/gnunet/peer.clj | 25 | ||||
-rw-r--r-- | src/org/gnu/clojure/gnunet/udp.clj | 53 |
2 files changed, 54 insertions, 24 deletions
diff --git a/src/org/gnu/clojure/gnunet/peer.clj b/src/org/gnu/clojure/gnunet/peer.clj index 61acbf9..934000a 100644 --- a/src/org/gnu/clojure/gnunet/peer.clj +++ b/src/org/gnu/clojure/gnunet/peer.clj @@ -35,7 +35,12 @@ :selector ;; Thread which selects on :selector - :selector-thread)))) + :selector-thread + + ;; ref of list of continuations, in order to access the selector while the + ;; selector-thread is running append a continuation to this list and call + ;; .wakeup on the selector + :selector-continuations-ref)))) (defstruct peer-options :keypair) @@ -48,16 +53,23 @@ (def id-size (count (sha-512 ()))) (defn selector-loop - [selector] + [selector continuations] + (let [pending (ref '())] + (dosync + (alter pending #(identity %2) (deref continuations)) + (alter continuations #(identity %2) '())) + (doseq [continuation (deref pending)] + (continuation))) (.select selector) (let [selected-keys (.selectedKeys selector)] - (for [selection-key (enumeration-seq (.iterator selected-keys))] + (doseq [selection-key selected-keys] ((.attachment selection-key))) (.clear selected-keys)) - (recur selector)) + (recur selector continuations)) (defn new-peer [options] - (let [selector (Selector/open)] + (let [selector (Selector/open) + continuations (ref '())] (struct-map peer :public-key (.getPublic (:keypair options)) :id (generate-id (.getPublic (:keypair options))) @@ -66,4 +78,5 @@ :remote-peers-agent (agent {}) :transports-agent (agent nil) :selector selector - :selector-thread (Thread. (partial selector-loop selector)))) + :selector-thread (Thread. (partial selector-loop selector continuations)) + :selector-continuations-ref continuations))) diff --git a/src/org/gnu/clojure/gnunet/udp.clj b/src/org/gnu/clojure/gnunet/udp.clj index 5934cdf..0381353 100644 --- a/src/org/gnu/clojure/gnunet/udp.clj +++ b/src/org/gnu/clojure/gnunet/udp.clj @@ -2,8 +2,11 @@ (:use (org.gnu.clojure.gnunet inet parser message peer transport) clojure.contrib.monads) (:import (java.util Date Calendar) - java.net.InetSocketAddress - (java.nio.channels DatagramChannel SelectionKey))) + (java.net InetSocketAddress DatagramPacket) + (java.nio.channels DatagramChannel SelectionKey) + java.nio.ByteBuffer)) + +(def max-udp-packet-length 65536) (defn configure-udp-addresses! "Adds new addresses for the udp transport to peer's transports-agent expiring @@ -43,24 +46,38 @@ ) (defn admit-message-udp! - [peer socket] - ) + [peer datagram-channel] + (let [byte-buffer (doto (ByteBuffer/allocate max-udp-packet-length) (.clear)) + source-address (.receive datagram-channel byte-buffer)] + (.flip byte-buffer) + (let [string-builder (StringBuilder. "Received packet of length ")] + (.append string-builder (.limit byte-buffer)) + (.append string-builder " from ") + (.append string-builder source-address) + (.append string-builder "\n") + (.write *out* (.toString string-builder))) + )) -(defn activate-udp! +(defn register-datagram-channel! [peer port] (let [datagram-channel (DatagramChannel/open) - socket (.socket datagram-channel) - selection-key (do - (.configureBlocking datagram-channel false) - (.bind socket (InetSocketAddress. port)) - (.register datagram-channel + socket (.socket datagram-channel)] + (.configureBlocking datagram-channel false) + (.bind socket (InetSocketAddress. port)) + (let [selection-key (.register datagram-channel (:selector peer) SelectionKey/OP_READ - (partial admit-message-udp! peer socket)))] - (send (:transports-agent peer) - (fn [transports] - (assoc transports "udp" - {:connect! connect-udp! - :emit-message! emit-message-udp! - :socket socket - :selection-key selection-key}))))) + (partial admit-message-udp! peer datagram-channel))] + (send (:transports-agent peer) + (fn [transports] + (assoc transports "udp" + {:connect! connect-udp! + :emit-message! emit-message-udp! + :socket socket + :selection-key selection-key})))))) + +(defn activate-udp! + [peer port] + (dosync (alter (:selector-continuations-ref peer) + conj (partial register-datagram-channel! peer port))) + (.wakeup (:selector peer))) |