summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDavid Barksdale <amatus.amongus@gmail.com>2010-10-05 16:23:21 -0700
committerDavid Barksdale <amatus.amongus@gmail.com>2010-10-05 16:23:21 -0700
commit70948418ea93157e1d09da9875685f4a588ea8c6 (patch)
tree6a59e2d1d01cbb6ec9dd9707f8047adf81580d6e /src
parent3d4851cb3c3dac0a5c6503c075ac2ef5c3d6c69e (diff)
UDP transport can now receive packets!
Diffstat (limited to 'src')
-rw-r--r--src/org/gnu/clojure/gnunet/peer.clj25
-rw-r--r--src/org/gnu/clojure/gnunet/udp.clj53
2 files changed, 54 insertions, 24 deletions
diff --git a/src/org/gnu/clojure/gnunet/peer.clj b/src/org/gnu/clojure/gnunet/peer.clj
index 61acbf9..934000a 100644
--- a/src/org/gnu/clojure/gnunet/peer.clj
+++ b/src/org/gnu/clojure/gnunet/peer.clj
@@ -35,7 +35,12 @@
:selector
;; Thread which selects on :selector
- :selector-thread))))
+ :selector-thread
+
+ ;; ref of list of continuations, in order to access the selector while the
+ ;; selector-thread is running append a continuation to this list and call
+ ;; .wakeup on the selector
+ :selector-continuations-ref))))
(defstruct peer-options
:keypair)
@@ -48,16 +53,23 @@
(def id-size (count (sha-512 ())))
(defn selector-loop
- [selector]
+ [selector continuations]
+ (let [pending (ref '())]
+ (dosync
+ (alter pending #(identity %2) (deref continuations))
+ (alter continuations #(identity %2) '()))
+ (doseq [continuation (deref pending)]
+ (continuation)))
(.select selector)
(let [selected-keys (.selectedKeys selector)]
- (for [selection-key (enumeration-seq (.iterator selected-keys))]
+ (doseq [selection-key selected-keys]
((.attachment selection-key)))
(.clear selected-keys))
- (recur selector))
+ (recur selector continuations))
(defn new-peer [options]
- (let [selector (Selector/open)]
+ (let [selector (Selector/open)
+ continuations (ref '())]
(struct-map peer
:public-key (.getPublic (:keypair options))
:id (generate-id (.getPublic (:keypair options)))
@@ -66,4 +78,5 @@
:remote-peers-agent (agent {})
:transports-agent (agent nil)
:selector selector
- :selector-thread (Thread. (partial selector-loop selector))))
+ :selector-thread (Thread. (partial selector-loop selector continuations))
+ :selector-continuations-ref continuations)))
diff --git a/src/org/gnu/clojure/gnunet/udp.clj b/src/org/gnu/clojure/gnunet/udp.clj
index 5934cdf..0381353 100644
--- a/src/org/gnu/clojure/gnunet/udp.clj
+++ b/src/org/gnu/clojure/gnunet/udp.clj
@@ -2,8 +2,11 @@
(:use (org.gnu.clojure.gnunet inet parser message peer transport)
clojure.contrib.monads)
(:import (java.util Date Calendar)
- java.net.InetSocketAddress
- (java.nio.channels DatagramChannel SelectionKey)))
+ (java.net InetSocketAddress DatagramPacket)
+ (java.nio.channels DatagramChannel SelectionKey)
+ java.nio.ByteBuffer))
+
+(def max-udp-packet-length 65536)
(defn configure-udp-addresses!
"Adds new addresses for the udp transport to peer's transports-agent expiring
@@ -43,24 +46,38 @@
)
(defn admit-message-udp!
- [peer socket]
- )
+ [peer datagram-channel]
+ (let [byte-buffer (doto (ByteBuffer/allocate max-udp-packet-length) (.clear))
+ source-address (.receive datagram-channel byte-buffer)]
+ (.flip byte-buffer)
+ (let [string-builder (StringBuilder. "Received packet of length ")]
+ (.append string-builder (.limit byte-buffer))
+ (.append string-builder " from ")
+ (.append string-builder source-address)
+ (.append string-builder "\n")
+ (.write *out* (.toString string-builder)))
+ ))
-(defn activate-udp!
+(defn register-datagram-channel!
[peer port]
(let [datagram-channel (DatagramChannel/open)
- socket (.socket datagram-channel)
- selection-key (do
- (.configureBlocking datagram-channel false)
- (.bind socket (InetSocketAddress. port))
- (.register datagram-channel
+ socket (.socket datagram-channel)]
+ (.configureBlocking datagram-channel false)
+ (.bind socket (InetSocketAddress. port))
+ (let [selection-key (.register datagram-channel
(:selector peer)
SelectionKey/OP_READ
- (partial admit-message-udp! peer socket)))]
- (send (:transports-agent peer)
- (fn [transports]
- (assoc transports "udp"
- {:connect! connect-udp!
- :emit-message! emit-message-udp!
- :socket socket
- :selection-key selection-key})))))
+ (partial admit-message-udp! peer datagram-channel))]
+ (send (:transports-agent peer)
+ (fn [transports]
+ (assoc transports "udp"
+ {:connect! connect-udp!
+ :emit-message! emit-message-udp!
+ :socket socket
+ :selection-key selection-key}))))))
+
+(defn activate-udp!
+ [peer port]
+ (dosync (alter (:selector-continuations-ref peer)
+ conj (partial register-datagram-channel! peer port)))
+ (.wakeup (:selector peer)))