summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDavid Barksdale <amatus.amongus@gmail.com>2010-11-02 07:15:48 -0700
committerDavid Barksdale <amatus.amongus@gmail.com>2010-11-02 07:15:48 -0700
commitfe33193b9e34492d6eb38b067e3686bc9afdf2c8 (patch)
tree3580ee1b268e3ece747fa5e7c1c48165a0fb2036
parent8d14a8036807bb75b66d83dbc605ee8eaa8fd5f4 (diff)
New filesharing message handlers.
-rw-r--r--NOTES3
-rw-r--r--src/org/gnu/clojure/gnunet/bloomfilter.clj19
-rw-r--r--src/org/gnu/clojure/gnunet/core.clj25
-rw-r--r--src/org/gnu/clojure/gnunet/filesharing.clj74
-rw-r--r--src/org/gnu/clojure/gnunet/parser.clj7
-rw-r--r--src/org/gnu/clojure/gnunet/peer.clj6
-rw-r--r--src/org/gnu/clojure/gnunet/transport.clj8
7 files changed, 123 insertions, 19 deletions
diff --git a/NOTES b/NOTES
index 00d950b..f508074 100644
--- a/NOTES
+++ b/NOTES
@@ -1,8 +1,9 @@
-(use '(org.gnu.clojure.gnunet core crypto hello hostlist inet iostream message parser peer tcp transport udp util))
+(use '(org.gnu.clojure.gnunet core crypto filesharing hello hostlist inet iostream message parser peer tcp transport udp util))
(def random (java.security.SecureRandom.))
(def keypair (generate-rsa-keypair! random))
(def my-peer (new-peer {:keypair keypair :random random}))
(.start (:selector-thread my-peer))
(activate-udp! my-peer 5678)
(configure-udp-addresses! my-peer (get-local-addresses) 5678)
+(activate-filesharing! my-peer)
(download-hostlist! (partial admit-hello! my-peer) "http://192.168.8.2:58080")
diff --git a/src/org/gnu/clojure/gnunet/bloomfilter.clj b/src/org/gnu/clojure/gnunet/bloomfilter.clj
new file mode 100644
index 0000000..cd9dd33
--- /dev/null
+++ b/src/org/gnu/clojure/gnunet/bloomfilter.clj
@@ -0,0 +1,19 @@
+(ns org.gnu.clojure.gnunet.bloomfilter
+ (:use (org.gnu.clojure.gnunet message parser)
+ clojure.contrib.monads))
+
+(def bloomfilter-k 16)
+
+(defn make-bloomfilter
+ [bitmap size k]
+ {:bitmap bitmap
+ :size size
+ :k k})
+
+(defn parse-bloomfilter
+ [k]
+ (domonad parser-m
+ [bitmap (one-or-more item)
+ :let [size (count bitmap)]
+ :when (== 0 (bit-and size (dec size)))]
+ (make-bloomfilter (decode-uint bitmap) size k))) \ No newline at end of file
diff --git a/src/org/gnu/clojure/gnunet/core.clj b/src/org/gnu/clojure/gnunet/core.clj
index 10d61b2..f13ec9f 100644
--- a/src/org/gnu/clojure/gnunet/core.clj
+++ b/src/org/gnu/clojure/gnunet/core.clj
@@ -150,11 +150,11 @@
[sequence-number parse-uint32
inbound-bw-limit parse-uint32
timestamp parse-date
- message-bytes (none-or-more item)]
+ messages (none-or-more parse-message)]
{:sequence-number sequence-number
:inbound-bw-limit inbound-bw-limit
:timestamp timestamp
- :bytes message-bytes})
+ :messages messages})
plaintext))]
:when message]
message))
@@ -304,11 +304,14 @@
(defn admit-core-message!
[peer remote-peer message]
- (send (:state-agent remote-peer)
- (fn [state]
- ;; TODO: update bandwidth tracking
- state
- )))
+ (if-let [dispatchers ((deref (:dispatch-agent peer))
+ (:message-type message))]
+ (doseq [dispatcher! dispatchers]
+ (dispatcher! peer remote-peer message))
+ (do
+ (.write *out* "No dispatcher for message type ")
+ (.write *out* (.toString (:message-type message)))
+ (.write *out* "\n"))))
(defn handle-core-encrypted-message!
[peer remote-peer message]
@@ -333,14 +336,18 @@
(if (bit-test bitmap bit)
state
(do
- (admit-core-message! peer remote-peer message)
+ ;; TODO: update bandwidth tracking
+ (doseq [message (:messages message)]
+ (admit-core-message! peer remote-peer message))
(assoc state :last-packets-bitmap (bit-or bitmap bit)))))
(< last-seqnum-received seqnum)
(let [bitmap (.intValue
(bit-shift-left
(bigint (:last-packets-bitmap state))
(- seqnum last-seqnum-received)))]
- (admit-core-message! peer remote-peer message)
+ ;; TODO: update bandwidth tracking
+ (doseq [message (:messages message)]
+ (admit-core-message! peer remote-peer message))
(conj state {:last-packets-bitmap bitmap
:last-sequence-number-received seqnum}))
:else state))
diff --git a/src/org/gnu/clojure/gnunet/filesharing.clj b/src/org/gnu/clojure/gnunet/filesharing.clj
new file mode 100644
index 0000000..16c22b4
--- /dev/null
+++ b/src/org/gnu/clojure/gnunet/filesharing.clj
@@ -0,0 +1,74 @@
+(ns org.gnu.clojure.gnunet.filesharing
+ (:use (org.gnu.clojure.gnunet bloomfilter crypto message parser peer)
+ clojure.contrib.monads))
+
+(def message-type-fs-get 137)
+(def message-type-fs-put 138)
+(def message-type-fs-migration-stop 139)
+
+(def bit-return-to 1)
+(def bit-sks-namespace 2)
+(def bit-transmit-to 3)
+
+(def parse-get-message
+ (domonad parser-m
+ [block-type parse-int32
+ priority parse-uint32
+ ttl parse-int32
+ filter-mutator parse-int32
+ hash-bitmap parse-int32
+ query (items hash-size)
+ return-to (conditional (bit-test hash-bitmap bit-return-to)
+ (items id-size))
+ sks-namespace (conditional (bit-test hash-bitmap bit-sks-namespace)
+ (items hash-size))
+ transmit-to (conditional (bit-test hash-bitmap bit-transmit-to)
+ (items id-size))
+ bloomfilter (optional (parse-bloomfilter bloomfilter-k))]
+ {:block-type block-type
+ :priority priority
+ :ttl ttl
+ :filter-mutator filter-mutator
+ :query query
+ :return-to return-to
+ :sks-namespace sks-namespace
+ :transmit-to transmit-to
+ :bloomfilter bloomfilter}))
+
+(defn bound-priority
+ [priority remote-peer]
+ )
+
+(defn admit-get!
+ [peer remote-peer message]
+ (when-let [get-message (first (parse-get-message (:bytes message)))]
+ (.write *out* (.toString get-message))
+ (.write *out* "\n")
+ )
+ (when-let [return-to (if (:return-to message)
+ ((deref (:remote-peers-agent peer))
+ (:return-to message))
+ remote-peer)]
+ (when-let [priority (bound-priority (:priority message) remote-peer)]
+ )))
+
+
+(defn admit-put!
+ [peer remote-peer message])
+
+(defn admit-migration-stop!
+ [peer remote-peer message])
+
+(defn activate-filesharing!
+ [peer]
+ (send (:dispatch-agent peer)
+ (fn [dispatchers]
+ (let [get-dispatchers (dispatchers message-type-fs-get #{})
+ put-dispatchers (dispatchers message-type-fs-put #{})
+ migration-stop-dispatchers
+ (dispatchers message-type-fs-migration-stop #{})]
+ (conj dispatchers
+ {message-type-fs-get (conj get-dispatchers admit-get!)
+ message-type-fs-put (conj put-dispatchers admit-put!)
+ message-type-fs-migration-stop (conj migration-stop-dispatchers
+ admit-migration-stop!)})))))
diff --git a/src/org/gnu/clojure/gnunet/parser.clj b/src/org/gnu/clojure/gnunet/parser.clj
index b4899fa..c96b293 100644
--- a/src/org/gnu/clojure/gnunet/parser.clj
+++ b/src/org/gnu/clojure/gnunet/parser.clj
@@ -38,6 +38,13 @@
(with-monad parser-m
(m-plus mv (m-result nil))))
+(defn conditional
+ "Returns the parser on a condition, otherwise a parser that returns nil."
+ [condition mv]
+ (if condition
+ mv
+ (with-monad parser-m (m-result nil))))
+
(defn none-or-more
"Makes a parser repeat none or more times."
[mv]
diff --git a/src/org/gnu/clojure/gnunet/peer.clj b/src/org/gnu/clojure/gnunet/peer.clj
index b9fff19..a6c8ba5 100644
--- a/src/org/gnu/clojure/gnunet/peer.clj
+++ b/src/org/gnu/clojure/gnunet/peer.clj
@@ -42,6 +42,9 @@
;; agent of a map of transport names (String) to maps of {:emit-message!}
:transports-agent
+ ;; agent of a map of message types to sets of dispatch handlers
+ :dispatch-agent
+
;; java.nio.channels.Selector
:selector
@@ -86,7 +89,8 @@
:transport-addresses-agent (agent {})
:private-key (.getPrivate (:keypair options))
:remote-peers-agent (agent {})
- :transports-agent (agent nil)
+ :transports-agent (agent {})
+ :dispatch-agent (agent {})
:selector selector
:selector-thread (Thread. (partial selector-loop! selector continuations))
:selector-continuations-queue continuations
diff --git a/src/org/gnu/clojure/gnunet/transport.clj b/src/org/gnu/clojure/gnunet/transport.clj
index 8e368a8..6eca59a 100644
--- a/src/org/gnu/clojure/gnunet/transport.clj
+++ b/src/org/gnu/clojure/gnunet/transport.clj
@@ -304,14 +304,6 @@
(defn admit-message!
[peer sender-id address message]
- (let [string-builder (StringBuilder. "Received message type ")]
- (.append string-builder (:message-type message))
- (.append string-builder " from ")
- (.append string-builder (vec (:encoded-address address)))
- (.append string-builder " id ")
- (.append string-builder sender-id)
- (.append string-builder "\n")
- (.write *out* (.toString string-builder)))
(send (:remote-peers-agent peer)
(fn [remote-peers]
(let [remote-peers (update-remote-peers remote-peers