From fdaf13916b30df6daf377dab845ef16e04dc961b Mon Sep 17 00:00:00 2001 From: David Barksdale Date: Tue, 23 Aug 2011 23:09:01 -0500 Subject: Some work on topology. Fixed tcp so it actually compiles. --- src/main/clojure/org/gnu/clojure/gnunet/core.clj | 7 +++- .../clojure/org/gnu/clojure/gnunet/filesharing.clj | 16 +++----- src/main/clojure/org/gnu/clojure/gnunet/peer.clj | 9 ++++ src/main/clojure/org/gnu/clojure/gnunet/tcp.clj | 48 ++++++++++++---------- .../clojure/org/gnu/clojure/gnunet/topology.clj | 14 ++++++- .../clojure/org/gnu/clojure/gnunet/transport.clj | 27 +++++++----- src/main/clojure/org/gnu/clojure/gnunet/util.clj | 18 ++++++-- 7 files changed, 88 insertions(+), 51 deletions(-) (limited to 'src') diff --git a/src/main/clojure/org/gnu/clojure/gnunet/core.clj b/src/main/clojure/org/gnu/clojure/gnunet/core.clj index 3980dc7..b7bf839 100644 --- a/src/main/clojure/org/gnu/clojure/gnunet/core.clj +++ b/src/main/clojure/org/gnu/clojure/gnunet/core.clj @@ -1,5 +1,5 @@ (ns org.gnu.clojure.gnunet.core - (:use (org.gnu.clojure.gnunet crypto exception message parser peer) + (:use (org.gnu.clojure.gnunet crypto exception message parser peer util) clojure.contrib.monads) (:import (java.util Date Calendar))) @@ -381,3 +381,8 @@ (emit-messages! peer remote-peer [{:messsage-type message-type-core-encrypted-message :bytes encoded-message}])))))) + +(defn core-register-dispatchers! + [peer new-dispatchers] + (send (:dispatch-agent peer) + conj-vals #{} new-dispatchers)) diff --git a/src/main/clojure/org/gnu/clojure/gnunet/filesharing.clj b/src/main/clojure/org/gnu/clojure/gnunet/filesharing.clj index c05c36c..a540406 100644 --- a/src/main/clojure/org/gnu/clojure/gnunet/filesharing.clj +++ b/src/main/clojure/org/gnu/clojure/gnunet/filesharing.clj @@ -206,14 +206,8 @@ (defn activate-filesharing! [peer] - (send (:dispatch-agent peer) - (fn [dispatchers] - (let [get-dispatchers (dispatchers message-type-fs-get #{}) - put-dispatchers (dispatchers message-type-fs-put #{}) - migration-stop-dispatchers - (dispatchers message-type-fs-migration-stop #{})] - (conj dispatchers - {message-type-fs-get (conj get-dispatchers admit-get!) - message-type-fs-put (conj put-dispatchers admit-put!) - message-type-fs-migration-stop (conj migration-stop-dispatchers - admit-migration-stop!)}))))) + (core-register-dispatchers! + peer + [message-type-fs-get admit-get! + message-type-fs-put admit-put! + message-type-migration-stop admit-migration-stop!])) diff --git a/src/main/clojure/org/gnu/clojure/gnunet/peer.clj b/src/main/clojure/org/gnu/clojure/gnunet/peer.clj index 8f3b645..45398d8 100644 --- a/src/main/clojure/org/gnu/clojure/gnunet/peer.clj +++ b/src/main/clojure/org/gnu/clojure/gnunet/peer.clj @@ -56,6 +56,14 @@ ;; agent of a map of transport names (String) to maps of {:emit-message!} :transports-agent + ;; { + ;; :new-peer-callbacks + ;; :peer-changed-callbacks + ;; :new-address-callbacks + ;; :address-changed-callbacks + ;; } + :topology-agent + ;; agent of a map of message types to sets of dispatch handlers :dispatch-agent @@ -141,6 +149,7 @@ :private-key (:private-key options) :remote-peers-agent (agent {}) :transports-agent (agent {}) + :topology-agent (agent {}) :dispatch-agent (agent {}) :selector selector :selector-thread (Thread. (partial selector-loop! selector callbacks)) diff --git a/src/main/clojure/org/gnu/clojure/gnunet/tcp.clj b/src/main/clojure/org/gnu/clojure/gnunet/tcp.clj index aa0b76c..735bfca 100644 --- a/src/main/clojure/org/gnu/clojure/gnunet/tcp.clj +++ b/src/main/clojure/org/gnu/clojure/gnunet/tcp.clj @@ -35,7 +35,8 @@ _ (set-val nil (disj pending-connections connection)) :when-let [remote-peer-id (deref (:remote-peer-id-atom connection))] remote-peer-connections (fetch-val remote-peer-id) - _ (set-val remote-peer-id (disj remote-peer-connections connection))])) + _ (set-val remote-peer-id (disj remote-peer-connections connection))] + nil)) (defn update-selection-key! [selection-key ops & attachment] @@ -70,16 +71,17 @@ (update-selection-key! (:selection-key connection) (bit-or SelectionKey/OP_READ SelectionKey/OP_WRITE)) - (send-do-exception-m + (send-do-exception-m! (:connections-agent transport) [pending-connections (fetch-val nil) _ (set-val nil (disj pending-connections connection)) remote-peer-connections (fetch-val remote-peer-id (hash-set)) _ (set-val remote-peer-id (conj remote-peer-connections - connection))]))) + connection))] + nil))) (when-let [remote-peer-id (deref (:remote-peer-id-atom connection))] (let [address {:transport "tcp" - :encoded-address encoded-address + :encoded-address (:encoded-address connection) :expiration (idle-connection-timeout)}] (admit-message! peer remote-peer-id address message))))) @@ -128,7 +130,7 @@ (.poll ((deref (:sessions-agent transport)) remote-peer-id)) packet)] (if (nil? packet) - (.interestOps selection-key SelectionKey/OP_READ) + (.interestOps (:selection-key connection) SelectionKey/OP_READ) (try (.write (:socket-channel connection) (ByteBuffer/wrap (byte-array (:bytes packet)))) @@ -140,15 +142,16 @@ (defn handle-socket-channel-selected! [peer transport connection] ;; This is always called from inside the selector thread - (try - (if (.isConnectable selection-key) - (handle-socket-channel-connectable! peer transport connection)) - (if (.isReadable selection-key) - (handle-socket-channel-readable! peer transport connection)) - (if (.isWritable selection-key) - (handle-socket-channel-writable! peer transport connection)) - (catch Exception e - (handle-disconnect! peer transport connection)))) + (let [selection-key (:selection-key connection)] + (try + (when (.isConnectable selection-key) + (handle-socket-channel-connectable! peer transport connection)) + (when (.isReadable selection-key) + (handle-socket-channel-readable! peer transport connection)) + (when (.isWritable selection-key) + (handle-socket-channel-writable! peer transport connection)) + (catch Exception e + (handle-disconnect! peer transport connection))))) (defn connect! [peer transport remote-peer encoded-address] @@ -166,7 +169,6 @@ :send-queue send-queue :received-bytes-atom (atom nil) :remote-peer-id-atom (atom remote-peer-id)}] - (enqueue-welcome-message! peer connection) (.add send-queue {:bytes (generate-welcome-message peer) :callback! skip}) @@ -178,11 +180,11 @@ (.connect socket-channel address) (catch Exception e (handle-disconnect! peer transport connection))) - (send-do-exception-m! + (send (:connections-agent transport) - [remote-peer-connections (fetch-val remote-peer-id (hash-set)) - _ (set-val remote-peer-id - (conj remote-peer-connections connection))])))) + conj-vals + (hash-set) + [remote-peer-id connection])))) (.wakeup (:selector peer)))) (defn set-write-interest-or-connect! @@ -245,9 +247,11 @@ selection-key SelectionKey/OP_READ (partial handle-socket-channel-selected! peer transport connection)) - (send-do-exception-m! (:connections-agent transport) - [pending-connections (fetch-val nil (hash-set)) - _ (set-val nil (conj pending-connections connection))])) + (send + (:connections-agent transport) + conj-vals + (hash-set) + [nil connection])) (catch Exception e nil))) (defn register-server-channel! diff --git a/src/main/clojure/org/gnu/clojure/gnunet/topology.clj b/src/main/clojure/org/gnu/clojure/gnunet/topology.clj index 96bbff9..9ac9f70 100644 --- a/src/main/clojure/org/gnu/clojure/gnunet/topology.clj +++ b/src/main/clojure/org/gnu/clojure/gnunet/topology.clj @@ -1,5 +1,7 @@ (ns org.gnu.clojure.gnunet.topology - (:use (org.gnu.clojure.gnunet peer transport))) + (:use clojure.contrib.monads + (org.gnu.clojure.gnunet exception peer transport util)) + (:import java.util.Date)) (defn verify-transport-address! [peer remote-peer address] @@ -33,6 +35,14 @@ verify-transport-addresses! peer remote-peer)) remote-peers) +(defn new-peer-callback! + [peer remote-peer] + ) + (defn activate-topology! [peer] - ) + (send + (:topology-agent peer) + conj-vals + #{} + [:new-peer-callbacks new-peer-callback!])) diff --git a/src/main/clojure/org/gnu/clojure/gnunet/transport.clj b/src/main/clojure/org/gnu/clojure/gnunet/transport.clj index 642b362..c323dd8 100644 --- a/src/main/clojure/org/gnu/clojure/gnunet/transport.clj +++ b/src/main/clojure/org/gnu/clojure/gnunet/transport.clj @@ -1,6 +1,6 @@ (ns org.gnu.clojure.gnunet.transport (:use (org.gnu.clojure.gnunet core crypto exception message metrics parser - peer util) + peer topology_events util) clojure.contrib.monads) (:import (java.util Date Calendar))) @@ -183,7 +183,7 @@ (concat (list-transport-addresses addresses) new-addresses)))) (defn update-remote-peers! - [remote-peers peer-id hello] + [remote-peers peer peer-id hello] (if-let [remote-peer (remote-peers peer-id)] (do (if (:public-key hello) @@ -193,21 +193,26 @@ update-transport-addresses (:transport-addresses hello)) remote-peers) - (assoc remote-peers peer-id - (struct-map remote-peer-struct - :public-key-atom (atom (:public-key hello)) - :id peer-id - :transport-addresses-agent (agent - (merge-transport-addresses {} - (:transport-addresses hello))) - :state-agent (agent {:is-connected false}))))) + (let + [remote-peer + (struct-map + remote-peer-struct + :public-key-atom (atom (:public-key hello)) + :id peer-id + :transport-addresses-agent (agent + (merge-transport-addresses + {} (:transport-addresses hello))) + :state-agent (agent {:is-connected false}))] + (notify-new-remote-peer! peer remote-peer) + (assoc remote-peers peer-id remote-peer)))) (defn admit-hello! "Updates peer:remote-peers-agent with new information contained in hello." [peer hello] (let [peer-id (generate-id (:public-key hello))] (when-not (= peer-id (:id peer)) - (send (:remote-peers-agent peer) update-remote-peers! peer-id hello)))) + (send (:remote-peers-agent peer) + update-remote-peers! peer peer-id hello)))) (defn handle-hello! [peer message] diff --git a/src/main/clojure/org/gnu/clojure/gnunet/util.clj b/src/main/clojure/org/gnu/clojure/gnunet/util.clj index 34fe34d..9914dfd 100644 --- a/src/main/clojure/org/gnu/clojure/gnunet/util.clj +++ b/src/main/clojure/org/gnu/clojure/gnunet/util.clj @@ -36,7 +36,17 @@ (defn assoc-deep "Associates val with the 'path' of keys in a nested map." - [map val key & keys] - (if (nil? keys) - (assoc map key val) - (assoc map key (apply assoc-deep (map key) val keys)))) + [_map _val _key & _keys] + (if (nil? _keys) + (assoc _map _key _val) + (assoc _map _key (apply assoc-deep (_map _key) _val _keys)))) + +(defn conj-vals + [_map zero kvs] + (reduce + (fn [_map kv] + (let [_key (first kv) + old-val (_map _key zero)] + (assoc _map _key + (conj old-val (second kv))))) + _map (partition 2 kvs))) -- cgit v1.2.3-18-g5258