diff options
author | David Barksdale <amatus.amongus@gmail.com> | 2010-11-02 07:15:48 -0700 |
---|---|---|
committer | David Barksdale <amatus.amongus@gmail.com> | 2010-11-02 07:15:48 -0700 |
commit | fe33193b9e34492d6eb38b067e3686bc9afdf2c8 (patch) | |
tree | 3580ee1b268e3ece747fa5e7c1c48165a0fb2036 | |
parent | 8d14a8036807bb75b66d83dbc605ee8eaa8fd5f4 (diff) |
New filesharing message handlers.
-rw-r--r-- | NOTES | 3 | ||||
-rw-r--r-- | src/org/gnu/clojure/gnunet/bloomfilter.clj | 19 | ||||
-rw-r--r-- | src/org/gnu/clojure/gnunet/core.clj | 25 | ||||
-rw-r--r-- | src/org/gnu/clojure/gnunet/filesharing.clj | 74 | ||||
-rw-r--r-- | src/org/gnu/clojure/gnunet/parser.clj | 7 | ||||
-rw-r--r-- | src/org/gnu/clojure/gnunet/peer.clj | 6 | ||||
-rw-r--r-- | src/org/gnu/clojure/gnunet/transport.clj | 8 |
7 files changed, 123 insertions, 19 deletions
@@ -1,8 +1,9 @@ -(use '(org.gnu.clojure.gnunet core crypto hello hostlist inet iostream message parser peer tcp transport udp util)) +(use '(org.gnu.clojure.gnunet core crypto filesharing hello hostlist inet iostream message parser peer tcp transport udp util)) (def random (java.security.SecureRandom.)) (def keypair (generate-rsa-keypair! random)) (def my-peer (new-peer {:keypair keypair :random random})) (.start (:selector-thread my-peer)) (activate-udp! my-peer 5678) (configure-udp-addresses! my-peer (get-local-addresses) 5678) +(activate-filesharing! my-peer) (download-hostlist! (partial admit-hello! my-peer) "http://192.168.8.2:58080") diff --git a/src/org/gnu/clojure/gnunet/bloomfilter.clj b/src/org/gnu/clojure/gnunet/bloomfilter.clj new file mode 100644 index 0000000..cd9dd33 --- /dev/null +++ b/src/org/gnu/clojure/gnunet/bloomfilter.clj @@ -0,0 +1,19 @@ +(ns org.gnu.clojure.gnunet.bloomfilter + (:use (org.gnu.clojure.gnunet message parser) + clojure.contrib.monads)) + +(def bloomfilter-k 16) + +(defn make-bloomfilter + [bitmap size k] + {:bitmap bitmap + :size size + :k k}) + +(defn parse-bloomfilter + [k] + (domonad parser-m + [bitmap (one-or-more item) + :let [size (count bitmap)] + :when (== 0 (bit-and size (dec size)))] + (make-bloomfilter (decode-uint bitmap) size k)))
\ No newline at end of file diff --git a/src/org/gnu/clojure/gnunet/core.clj b/src/org/gnu/clojure/gnunet/core.clj index 10d61b2..f13ec9f 100644 --- a/src/org/gnu/clojure/gnunet/core.clj +++ b/src/org/gnu/clojure/gnunet/core.clj @@ -150,11 +150,11 @@ [sequence-number parse-uint32 inbound-bw-limit parse-uint32 timestamp parse-date - message-bytes (none-or-more item)] + messages (none-or-more parse-message)] {:sequence-number sequence-number :inbound-bw-limit inbound-bw-limit :timestamp timestamp - :bytes message-bytes}) + :messages messages}) plaintext))] :when message] message)) @@ -304,11 +304,14 @@ (defn admit-core-message! [peer remote-peer message] - (send (:state-agent remote-peer) - (fn [state] - ;; TODO: update bandwidth tracking - state - ))) + (if-let [dispatchers ((deref (:dispatch-agent peer)) + (:message-type message))] + (doseq [dispatcher! dispatchers] + (dispatcher! peer remote-peer message)) + (do + (.write *out* "No dispatcher for message type ") + (.write *out* (.toString (:message-type message))) + (.write *out* "\n")))) (defn handle-core-encrypted-message! [peer remote-peer message] @@ -333,14 +336,18 @@ (if (bit-test bitmap bit) state (do - (admit-core-message! peer remote-peer message) + ;; TODO: update bandwidth tracking + (doseq [message (:messages message)] + (admit-core-message! peer remote-peer message)) (assoc state :last-packets-bitmap (bit-or bitmap bit))))) (< last-seqnum-received seqnum) (let [bitmap (.intValue (bit-shift-left (bigint (:last-packets-bitmap state)) (- seqnum last-seqnum-received)))] - (admit-core-message! peer remote-peer message) + ;; TODO: update bandwidth tracking + (doseq [message (:messages message)] + (admit-core-message! peer remote-peer message)) (conj state {:last-packets-bitmap bitmap :last-sequence-number-received seqnum})) :else state)) diff --git a/src/org/gnu/clojure/gnunet/filesharing.clj b/src/org/gnu/clojure/gnunet/filesharing.clj new file mode 100644 index 0000000..16c22b4 --- /dev/null +++ b/src/org/gnu/clojure/gnunet/filesharing.clj @@ -0,0 +1,74 @@ +(ns org.gnu.clojure.gnunet.filesharing + (:use (org.gnu.clojure.gnunet bloomfilter crypto message parser peer) + clojure.contrib.monads)) + +(def message-type-fs-get 137) +(def message-type-fs-put 138) +(def message-type-fs-migration-stop 139) + +(def bit-return-to 1) +(def bit-sks-namespace 2) +(def bit-transmit-to 3) + +(def parse-get-message + (domonad parser-m + [block-type parse-int32 + priority parse-uint32 + ttl parse-int32 + filter-mutator parse-int32 + hash-bitmap parse-int32 + query (items hash-size) + return-to (conditional (bit-test hash-bitmap bit-return-to) + (items id-size)) + sks-namespace (conditional (bit-test hash-bitmap bit-sks-namespace) + (items hash-size)) + transmit-to (conditional (bit-test hash-bitmap bit-transmit-to) + (items id-size)) + bloomfilter (optional (parse-bloomfilter bloomfilter-k))] + {:block-type block-type + :priority priority + :ttl ttl + :filter-mutator filter-mutator + :query query + :return-to return-to + :sks-namespace sks-namespace + :transmit-to transmit-to + :bloomfilter bloomfilter})) + +(defn bound-priority + [priority remote-peer] + ) + +(defn admit-get! + [peer remote-peer message] + (when-let [get-message (first (parse-get-message (:bytes message)))] + (.write *out* (.toString get-message)) + (.write *out* "\n") + ) + (when-let [return-to (if (:return-to message) + ((deref (:remote-peers-agent peer)) + (:return-to message)) + remote-peer)] + (when-let [priority (bound-priority (:priority message) remote-peer)] + ))) + + +(defn admit-put! + [peer remote-peer message]) + +(defn admit-migration-stop! + [peer remote-peer message]) + +(defn activate-filesharing! + [peer] + (send (:dispatch-agent peer) + (fn [dispatchers] + (let [get-dispatchers (dispatchers message-type-fs-get #{}) + put-dispatchers (dispatchers message-type-fs-put #{}) + migration-stop-dispatchers + (dispatchers message-type-fs-migration-stop #{})] + (conj dispatchers + {message-type-fs-get (conj get-dispatchers admit-get!) + message-type-fs-put (conj put-dispatchers admit-put!) + message-type-fs-migration-stop (conj migration-stop-dispatchers + admit-migration-stop!)}))))) diff --git a/src/org/gnu/clojure/gnunet/parser.clj b/src/org/gnu/clojure/gnunet/parser.clj index b4899fa..c96b293 100644 --- a/src/org/gnu/clojure/gnunet/parser.clj +++ b/src/org/gnu/clojure/gnunet/parser.clj @@ -38,6 +38,13 @@ (with-monad parser-m (m-plus mv (m-result nil)))) +(defn conditional + "Returns the parser on a condition, otherwise a parser that returns nil." + [condition mv] + (if condition + mv + (with-monad parser-m (m-result nil)))) + (defn none-or-more "Makes a parser repeat none or more times." [mv] diff --git a/src/org/gnu/clojure/gnunet/peer.clj b/src/org/gnu/clojure/gnunet/peer.clj index b9fff19..a6c8ba5 100644 --- a/src/org/gnu/clojure/gnunet/peer.clj +++ b/src/org/gnu/clojure/gnunet/peer.clj @@ -42,6 +42,9 @@ ;; agent of a map of transport names (String) to maps of {:emit-message!} :transports-agent + ;; agent of a map of message types to sets of dispatch handlers + :dispatch-agent + ;; java.nio.channels.Selector :selector @@ -86,7 +89,8 @@ :transport-addresses-agent (agent {}) :private-key (.getPrivate (:keypair options)) :remote-peers-agent (agent {}) - :transports-agent (agent nil) + :transports-agent (agent {}) + :dispatch-agent (agent {}) :selector selector :selector-thread (Thread. (partial selector-loop! selector continuations)) :selector-continuations-queue continuations diff --git a/src/org/gnu/clojure/gnunet/transport.clj b/src/org/gnu/clojure/gnunet/transport.clj index 8e368a8..6eca59a 100644 --- a/src/org/gnu/clojure/gnunet/transport.clj +++ b/src/org/gnu/clojure/gnunet/transport.clj @@ -304,14 +304,6 @@ (defn admit-message! [peer sender-id address message] - (let [string-builder (StringBuilder. "Received message type ")] - (.append string-builder (:message-type message)) - (.append string-builder " from ") - (.append string-builder (vec (:encoded-address address))) - (.append string-builder " id ") - (.append string-builder sender-id) - (.append string-builder "\n") - (.write *out* (.toString string-builder))) (send (:remote-peers-agent peer) (fn [remote-peers] (let [remote-peers (update-remote-peers remote-peers |