summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDavid Barksdale <amatus.amongus@gmail.com>2011-08-23 23:09:01 -0500
committerDavid Barksdale <amatus.amongus@gmail.com>2011-08-23 23:09:01 -0500
commitfdaf13916b30df6daf377dab845ef16e04dc961b (patch)
tree933144234de3ffc1ad23480011c39c448c322bff /src
parent0d5338d650c4bfb2e348a9ba2fc9f96d95338bb4 (diff)
Some work on topology. Fixed tcp so it actually compiles.
Diffstat (limited to 'src')
-rw-r--r--src/main/clojure/org/gnu/clojure/gnunet/core.clj7
-rw-r--r--src/main/clojure/org/gnu/clojure/gnunet/filesharing.clj16
-rw-r--r--src/main/clojure/org/gnu/clojure/gnunet/peer.clj9
-rw-r--r--src/main/clojure/org/gnu/clojure/gnunet/tcp.clj48
-rw-r--r--src/main/clojure/org/gnu/clojure/gnunet/topology.clj14
-rw-r--r--src/main/clojure/org/gnu/clojure/gnunet/transport.clj27
-rw-r--r--src/main/clojure/org/gnu/clojure/gnunet/util.clj18
7 files changed, 88 insertions, 51 deletions
diff --git a/src/main/clojure/org/gnu/clojure/gnunet/core.clj b/src/main/clojure/org/gnu/clojure/gnunet/core.clj
index 3980dc7..b7bf839 100644
--- a/src/main/clojure/org/gnu/clojure/gnunet/core.clj
+++ b/src/main/clojure/org/gnu/clojure/gnunet/core.clj
@@ -1,5 +1,5 @@
(ns org.gnu.clojure.gnunet.core
- (:use (org.gnu.clojure.gnunet crypto exception message parser peer)
+ (:use (org.gnu.clojure.gnunet crypto exception message parser peer util)
clojure.contrib.monads)
(:import (java.util Date Calendar)))
@@ -381,3 +381,8 @@
(emit-messages! peer remote-peer
[{:messsage-type message-type-core-encrypted-message
:bytes encoded-message}]))))))
+
+(defn core-register-dispatchers!
+ [peer new-dispatchers]
+ (send (:dispatch-agent peer)
+ conj-vals #{} new-dispatchers))
diff --git a/src/main/clojure/org/gnu/clojure/gnunet/filesharing.clj b/src/main/clojure/org/gnu/clojure/gnunet/filesharing.clj
index c05c36c..a540406 100644
--- a/src/main/clojure/org/gnu/clojure/gnunet/filesharing.clj
+++ b/src/main/clojure/org/gnu/clojure/gnunet/filesharing.clj
@@ -206,14 +206,8 @@
(defn activate-filesharing!
[peer]
- (send (:dispatch-agent peer)
- (fn [dispatchers]
- (let [get-dispatchers (dispatchers message-type-fs-get #{})
- put-dispatchers (dispatchers message-type-fs-put #{})
- migration-stop-dispatchers
- (dispatchers message-type-fs-migration-stop #{})]
- (conj dispatchers
- {message-type-fs-get (conj get-dispatchers admit-get!)
- message-type-fs-put (conj put-dispatchers admit-put!)
- message-type-fs-migration-stop (conj migration-stop-dispatchers
- admit-migration-stop!)})))))
+ (core-register-dispatchers!
+ peer
+ [message-type-fs-get admit-get!
+ message-type-fs-put admit-put!
+ message-type-migration-stop admit-migration-stop!]))
diff --git a/src/main/clojure/org/gnu/clojure/gnunet/peer.clj b/src/main/clojure/org/gnu/clojure/gnunet/peer.clj
index 8f3b645..45398d8 100644
--- a/src/main/clojure/org/gnu/clojure/gnunet/peer.clj
+++ b/src/main/clojure/org/gnu/clojure/gnunet/peer.clj
@@ -56,6 +56,14 @@
;; agent of a map of transport names (String) to maps of {:emit-message!}
:transports-agent
+ ;; {
+ ;; :new-peer-callbacks
+ ;; :peer-changed-callbacks
+ ;; :new-address-callbacks
+ ;; :address-changed-callbacks
+ ;; }
+ :topology-agent
+
;; agent of a map of message types to sets of dispatch handlers
:dispatch-agent
@@ -141,6 +149,7 @@
:private-key (:private-key options)
:remote-peers-agent (agent {})
:transports-agent (agent {})
+ :topology-agent (agent {})
:dispatch-agent (agent {})
:selector selector
:selector-thread (Thread. (partial selector-loop! selector callbacks))
diff --git a/src/main/clojure/org/gnu/clojure/gnunet/tcp.clj b/src/main/clojure/org/gnu/clojure/gnunet/tcp.clj
index aa0b76c..735bfca 100644
--- a/src/main/clojure/org/gnu/clojure/gnunet/tcp.clj
+++ b/src/main/clojure/org/gnu/clojure/gnunet/tcp.clj
@@ -35,7 +35,8 @@
_ (set-val nil (disj pending-connections connection))
:when-let [remote-peer-id (deref (:remote-peer-id-atom connection))]
remote-peer-connections (fetch-val remote-peer-id)
- _ (set-val remote-peer-id (disj remote-peer-connections connection))]))
+ _ (set-val remote-peer-id (disj remote-peer-connections connection))]
+ nil))
(defn update-selection-key!
[selection-key ops & attachment]
@@ -70,16 +71,17 @@
(update-selection-key!
(:selection-key connection)
(bit-or SelectionKey/OP_READ SelectionKey/OP_WRITE))
- (send-do-exception-m
+ (send-do-exception-m!
(:connections-agent transport)
[pending-connections (fetch-val nil)
_ (set-val nil (disj pending-connections connection))
remote-peer-connections (fetch-val remote-peer-id (hash-set))
_ (set-val remote-peer-id (conj remote-peer-connections
- connection))])))
+ connection))]
+ nil)))
(when-let [remote-peer-id (deref (:remote-peer-id-atom connection))]
(let [address {:transport "tcp"
- :encoded-address encoded-address
+ :encoded-address (:encoded-address connection)
:expiration (idle-connection-timeout)}]
(admit-message! peer remote-peer-id address message)))))
@@ -128,7 +130,7 @@
(.poll ((deref (:sessions-agent transport)) remote-peer-id))
packet)]
(if (nil? packet)
- (.interestOps selection-key SelectionKey/OP_READ)
+ (.interestOps (:selection-key connection) SelectionKey/OP_READ)
(try
(.write (:socket-channel connection)
(ByteBuffer/wrap (byte-array (:bytes packet))))
@@ -140,15 +142,16 @@
(defn handle-socket-channel-selected!
[peer transport connection]
;; This is always called from inside the selector thread
- (try
- (if (.isConnectable selection-key)
- (handle-socket-channel-connectable! peer transport connection))
- (if (.isReadable selection-key)
- (handle-socket-channel-readable! peer transport connection))
- (if (.isWritable selection-key)
- (handle-socket-channel-writable! peer transport connection))
- (catch Exception e
- (handle-disconnect! peer transport connection))))
+ (let [selection-key (:selection-key connection)]
+ (try
+ (when (.isConnectable selection-key)
+ (handle-socket-channel-connectable! peer transport connection))
+ (when (.isReadable selection-key)
+ (handle-socket-channel-readable! peer transport connection))
+ (when (.isWritable selection-key)
+ (handle-socket-channel-writable! peer transport connection))
+ (catch Exception e
+ (handle-disconnect! peer transport connection)))))
(defn connect!
[peer transport remote-peer encoded-address]
@@ -166,7 +169,6 @@
:send-queue send-queue
:received-bytes-atom (atom nil)
:remote-peer-id-atom (atom remote-peer-id)}]
- (enqueue-welcome-message! peer connection)
(.add send-queue
{:bytes (generate-welcome-message peer)
:callback! skip})
@@ -178,11 +180,11 @@
(.connect socket-channel address)
(catch Exception e
(handle-disconnect! peer transport connection)))
- (send-do-exception-m!
+ (send
(:connections-agent transport)
- [remote-peer-connections (fetch-val remote-peer-id (hash-set))
- _ (set-val remote-peer-id
- (conj remote-peer-connections connection))]))))
+ conj-vals
+ (hash-set)
+ [remote-peer-id connection]))))
(.wakeup (:selector peer))))
(defn set-write-interest-or-connect!
@@ -245,9 +247,11 @@
selection-key
SelectionKey/OP_READ
(partial handle-socket-channel-selected! peer transport connection))
- (send-do-exception-m! (:connections-agent transport)
- [pending-connections (fetch-val nil (hash-set))
- _ (set-val nil (conj pending-connections connection))]))
+ (send
+ (:connections-agent transport)
+ conj-vals
+ (hash-set)
+ [nil connection]))
(catch Exception e nil)))
(defn register-server-channel!
diff --git a/src/main/clojure/org/gnu/clojure/gnunet/topology.clj b/src/main/clojure/org/gnu/clojure/gnunet/topology.clj
index 96bbff9..9ac9f70 100644
--- a/src/main/clojure/org/gnu/clojure/gnunet/topology.clj
+++ b/src/main/clojure/org/gnu/clojure/gnunet/topology.clj
@@ -1,5 +1,7 @@
(ns org.gnu.clojure.gnunet.topology
- (:use (org.gnu.clojure.gnunet peer transport)))
+ (:use clojure.contrib.monads
+ (org.gnu.clojure.gnunet exception peer transport util))
+ (:import java.util.Date))
(defn verify-transport-address!
[peer remote-peer address]
@@ -33,6 +35,14 @@
verify-transport-addresses! peer remote-peer))
remote-peers)
+(defn new-peer-callback!
+ [peer remote-peer]
+ )
+
(defn activate-topology!
[peer]
- )
+ (send
+ (:topology-agent peer)
+ conj-vals
+ #{}
+ [:new-peer-callbacks new-peer-callback!]))
diff --git a/src/main/clojure/org/gnu/clojure/gnunet/transport.clj b/src/main/clojure/org/gnu/clojure/gnunet/transport.clj
index 642b362..c323dd8 100644
--- a/src/main/clojure/org/gnu/clojure/gnunet/transport.clj
+++ b/src/main/clojure/org/gnu/clojure/gnunet/transport.clj
@@ -1,6 +1,6 @@
(ns org.gnu.clojure.gnunet.transport
(:use (org.gnu.clojure.gnunet core crypto exception message metrics parser
- peer util)
+ peer topology_events util)
clojure.contrib.monads)
(:import (java.util Date Calendar)))
@@ -183,7 +183,7 @@
(concat (list-transport-addresses addresses) new-addresses))))
(defn update-remote-peers!
- [remote-peers peer-id hello]
+ [remote-peers peer peer-id hello]
(if-let [remote-peer (remote-peers peer-id)]
(do
(if (:public-key hello)
@@ -193,21 +193,26 @@
update-transport-addresses
(:transport-addresses hello))
remote-peers)
- (assoc remote-peers peer-id
- (struct-map remote-peer-struct
- :public-key-atom (atom (:public-key hello))
- :id peer-id
- :transport-addresses-agent (agent
- (merge-transport-addresses {}
- (:transport-addresses hello)))
- :state-agent (agent {:is-connected false})))))
+ (let
+ [remote-peer
+ (struct-map
+ remote-peer-struct
+ :public-key-atom (atom (:public-key hello))
+ :id peer-id
+ :transport-addresses-agent (agent
+ (merge-transport-addresses
+ {} (:transport-addresses hello)))
+ :state-agent (agent {:is-connected false}))]
+ (notify-new-remote-peer! peer remote-peer)
+ (assoc remote-peers peer-id remote-peer))))
(defn admit-hello!
"Updates peer:remote-peers-agent with new information contained in hello."
[peer hello]
(let [peer-id (generate-id (:public-key hello))]
(when-not (= peer-id (:id peer))
- (send (:remote-peers-agent peer) update-remote-peers! peer-id hello))))
+ (send (:remote-peers-agent peer)
+ update-remote-peers! peer peer-id hello))))
(defn handle-hello!
[peer message]
diff --git a/src/main/clojure/org/gnu/clojure/gnunet/util.clj b/src/main/clojure/org/gnu/clojure/gnunet/util.clj
index 34fe34d..9914dfd 100644
--- a/src/main/clojure/org/gnu/clojure/gnunet/util.clj
+++ b/src/main/clojure/org/gnu/clojure/gnunet/util.clj
@@ -36,7 +36,17 @@
(defn assoc-deep
"Associates val with the 'path' of keys in a nested map."
- [map val key & keys]
- (if (nil? keys)
- (assoc map key val)
- (assoc map key (apply assoc-deep (map key) val keys))))
+ [_map _val _key & _keys]
+ (if (nil? _keys)
+ (assoc _map _key _val)
+ (assoc _map _key (apply assoc-deep (_map _key) _val _keys))))
+
+(defn conj-vals
+ [_map zero kvs]
+ (reduce
+ (fn [_map kv]
+ (let [_key (first kv)
+ old-val (_map _key zero)]
+ (assoc _map _key
+ (conj old-val (second kv)))))
+ _map (partition 2 kvs)))