diff options
-rw-r--r-- | src/testbed/gnunet-service-testbed.h | 107 | ||||
-rw-r--r-- | src/testbed/gnunet-service-testbed_cache.c | 791 | ||||
-rw-r--r-- | src/testbed/gnunet-service-testbed_connectionpool.c | 59 | ||||
-rw-r--r-- | src/testbed/gnunet-service-testbed_oc.c | 57 |
4 files changed, 78 insertions, 936 deletions
diff --git a/src/testbed/gnunet-service-testbed.h b/src/testbed/gnunet-service-testbed.h index 55861faba0..7966864b6e 100644 --- a/src/testbed/gnunet-service-testbed.h +++ b/src/testbed/gnunet-service-testbed.h @@ -834,113 +834,6 @@ GST_cache_add_hello (const unsigned int peer_id, /** - * Functions of this type are called when the needed handle is available for - * usage. These functions are to be registered with either of the functions - * GST_cache_get_handle_transport() or GST_cache_get_handle_core(). The - * corresponding handles will be set and if they are not, then it signals an - * error while opening the handles. - * - * @param cls the closure passed to GST_cache_get_handle_transport() or - * GST_cache_get_handle_core() - * @param ch the handle to CORE. Can be NULL if it is not requested - * @param th the handle to TRANSPORT. Can be NULL if it is not requested - * @param peer_id the identity of the peer. Will be NULL if ch is NULL. In other - * cases, its value being NULL means that CORE connection has failed. - */ -typedef void (*GST_cache_handle_ready_cb) (void *cls, - struct GNUNET_CORE_Handle * ch, - struct GNUNET_TRANSPORT_Handle * th, - const struct GNUNET_PeerIdentity * - peer_id); - - -/** - * Callback to notify when the target peer given to - * GST_cache_get_handle_transport() is connected. Note that this callback may - * not be called if the target peer is already connected. Use - * GNUNET_TRANSPORT_check_neighbour_connected() to check if the target peer is - * already connected or not. This callback will be called only once or never (in - * case the target cannot be connected). - * - * @param cls the closure given to GST_cache_get_handle_done() for this callback - * @param target the peer identity of the target peer. The pointer should be - * valid until GST_cache_get_handle_done() is called. - */ -typedef void (*GST_cache_peer_connect_notify) (void *cls, - const struct GNUNET_PeerIdentity - * target); - - -/** - * Get a transport handle with the given configuration. If the handle is already - * cached before, it will be retured in the given callback; the peer_id is used to lookup in the - * cache. If not a new operation is started to open the transport handle and - * will be given in the callback when it is available. - * - * @param peer_id the index of the peer - * @param cfg the configuration with which the transport handle has to be - * created if it was not present in the cache - * @param cb the callback to notify when the transport handle is available - * @param cb_cls the closure for the above callback - * @param target the peer identify of the peer whose connection to our TRANSPORT - * subsystem will be notified through the connect_notify_cb. Can be NULL - * @param connect_notify_cb the callback to call when the given target peer is - * connected. This callback will only be called once or never again (in - * case the target peer cannot be connected). Can be NULL - * @param connect_notify_cb_cls the closure for the above callback - * @return the handle which can be used cancel or mark that the handle is no - * longer being used - */ -struct GSTCacheGetHandle * -GST_cache_get_handle_transport (unsigned int peer_id, - const struct GNUNET_CONFIGURATION_Handle *cfg, - GST_cache_handle_ready_cb cb, void *cb_cls, - const struct GNUNET_PeerIdentity *target, - GST_cache_peer_connect_notify connect_notify_cb, - void *connect_notify_cb_cls); - - -/** - * Get a CORE handle with the given configuration. If the handle is already - * cached before, it will be retured in the given callback; the peer_id is used - * to lookup in the cache. If the handle is not cached before, a new operation - * is started to open the CORE handle and will be given in the callback when it - * is available along with the peer identity - * - * @param peer_id the index of the peer - * @param cfg the configuration with which the transport handle has to be - * created if it was not present in the cache - * @param cb the callback to notify when the transport handle is available - * @param cb_cls the closure for the above callback - * @param target the peer identify of the peer whose connection to our CORE - * subsystem will be notified through the connect_notify_cb. Can be NULL - * @param connect_notify_cb the callback to call when the given target peer is - * connected. This callback will only be called once or never again (in - * case the target peer cannot be connected). Can be NULL - * @param connect_notify_cb_cls the closure for the above callback - * @return the handle which can be used cancel or mark that the handle is no - * longer being used - */ -struct GSTCacheGetHandle * -GST_cache_get_handle_core (unsigned int peer_id, - const struct GNUNET_CONFIGURATION_Handle *cfg, - GST_cache_handle_ready_cb cb, void *cb_cls, - const struct GNUNET_PeerIdentity *target, - GST_cache_peer_connect_notify connect_notify_cb, - void *connect_notify_cb_cls); - - -/** - * Mark the GetCacheHandle as being done if a handle has been provided already - * or as being cancelled if the callback for the handle hasn't been called. - * - * @param cgh the CacheGetHandle handle - */ -void -GST_cache_get_handle_done (struct GSTCacheGetHandle *cgh); - - -/** * Initialize logging CPU and IO statisticfs. Checks the configuration for * "STATS_DIR" and logs to a file in that directory. The file is name is * generated from the hostname and the process's PID. diff --git a/src/testbed/gnunet-service-testbed_cache.c b/src/testbed/gnunet-service-testbed_cache.c index 9db2155fc9..9f2b15579e 100644 --- a/src/testbed/gnunet-service-testbed_cache.c +++ b/src/testbed/gnunet-service-testbed_cache.c @@ -43,115 +43,6 @@ /** - * Type of cache-get requests - */ -enum CacheGetType -{ - /** - * Get transport handle - */ - CGT_TRANSPORT_HANDLE = 1, - - /** - * Get core handle - */ - CGT_CORE_HANDLE -}; - - -/** - * The cache-get request handle - */ -struct GSTCacheGetHandle; - - -/** - * This context structure is used to maintain a queue of notifications to check - * which of them are to be notified when a peer is connected. - */ -struct ConnectNotifyContext -{ - /** - * The next ptr for the DLL - */ - struct ConnectNotifyContext *next; - - /** - * The prev ptr for the DLL - */ - struct ConnectNotifyContext *prev; - - /** - * The peer identity of the target peer. When this target peer is connected, - * call the notify callback - */ - const struct GNUNET_PeerIdentity *target; - - /** - * The notify callback to be called when the target peer is connected - */ - GST_cache_peer_connect_notify cb; - - /** - * The closure for the notify callback - */ - void *cb_cls; - - /** - * The GSTCacheGetHandle reposible for creating this context - */ - struct GSTCacheGetHandle *cgh; - -}; - - -/** - * The cache-get request handle - */ -struct GSTCacheGetHandle -{ - /** - * The next ptr for the DLL. Used in struct CacheEntry - */ - struct GSTCacheGetHandle *next; - - /** - * The prev ptr for the DLL. Used in struct CacheEntry - */ - struct GSTCacheGetHandle *prev; - - /** - * The cache entry object this handle corresponds to - */ - struct CacheEntry *entry; - - /** - * The cache callback to call when a handle is available - */ - GST_cache_handle_ready_cb cb; - - /** - * The closure for the above callback - */ - void *cb_cls; - - /** - * The peer connect notify context created for this handle; can be NULL - */ - struct ConnectNotifyContext *nctxt; - - /** - * The type of this cache-get request - */ - enum CacheGetType type; - - /** - * Did we call the cache callback already? - */ - int notify_called; -}; - -/** * Cache entry */ struct CacheEntry @@ -167,26 +58,6 @@ struct CacheEntry struct CacheEntry *prev; /** - * The transport handle to the peer corresponding to this entry; can be NULL - */ - struct GNUNET_TRANSPORT_Handle *transport_handle; - - /** - * The operation handle for transport handle - */ - struct GNUNET_TESTBED_Operation *transport_op; - - /** - * The core handle to the peer corresponding to this entry; can be NULL - */ - struct GNUNET_CORE_Handle *core_handle; - - /** - * The operation handle for core handle - */ - struct GNUNET_TESTBED_Operation *core_op; - - /** * The peer identity of this peer. Will be set upon opening a connection to * the peers CORE service. Will be NULL until then and after the CORE * connection is closed @@ -194,12 +65,6 @@ struct CacheEntry struct GNUNET_PeerIdentity *peer_identity; /** - * The configuration of the peer. Should be not NULL as long as the core_handle - * or transport_handle are valid - */ - struct GNUNET_CONFIGURATION_Handle *cfg; - - /** * The key for this entry */ struct GNUNET_HashCode key; @@ -210,44 +75,6 @@ struct CacheEntry struct GNUNET_MessageHeader *hello; /** - * the head of the CacheGetHandle queue - */ - struct GSTCacheGetHandle *cgh_qhead; - - /** - * the tail of the CacheGetHandle queue - */ - struct GSTCacheGetHandle *cgh_qtail; - - /** - * DLL head for the queue of notifications contexts to check which of them are to - * be notified when a peer is connected. - */ - struct ConnectNotifyContext *nctxt_qhead; - - /** - * DLL tail for the queue of notifications contexts to check which of them are to - * be notified when a peer is connected. - */ - struct ConnectNotifyContext *nctxt_qtail; - - /** - * The task that calls the cache callback - */ - GNUNET_SCHEDULER_TaskIdentifier notify_task; - - /** - * The task to expire this cache entry, free any handlers it has opened and - * mark their corresponding operations as done. - */ - GNUNET_SCHEDULER_TaskIdentifier expire_task; - - /** - * Number of operations this cache entry is being used - */ - unsigned int demand; - - /** * The id of the peer this entry corresponds to */ unsigned int peer_id; @@ -316,77 +143,6 @@ cache_lookup (const struct GNUNET_HashCode *key) /** - * Function to disconnect the core and transport handles; free the existing - * configuration; and remove from the LRU cache list. The entry is left to be in - * the hash table so that the HELLO can still be found later - * - * @param entry the cache entry - */ -static void -close_handles (struct CacheEntry *entry) -{ - struct ConnectNotifyContext *ctxt; - - GNUNET_assert (0 == entry->demand); - if (GNUNET_YES == entry->in_lru) - { - GNUNET_assert (0 < lru_cache_size); - if (GNUNET_SCHEDULER_NO_TASK != entry->expire_task) - { - GNUNET_SCHEDULER_cancel (entry->expire_task); - entry->expire_task = GNUNET_SCHEDULER_NO_TASK; - } - GNUNET_CONTAINER_DLL_remove (lru_cache_head, lru_cache_tail, entry); - lru_cache_size--; - entry->in_lru = GNUNET_NO; - } - GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == entry->expire_task); - while (NULL != (ctxt = entry->nctxt_qhead)) - { - GNUNET_CONTAINER_DLL_remove (entry->nctxt_qhead, entry->nctxt_qtail, ctxt); - GNUNET_free (ctxt); - } - LOG_DEBUG ("Cleaning up handles from an entry in cache\n"); - if (NULL != entry->transport_handle) - GNUNET_assert (NULL != entry->transport_op); - if (NULL != entry->transport_op) - { - GNUNET_TESTBED_operation_done (entry->transport_op); - entry->transport_op = NULL; - } - if (NULL != entry->core_op) - { - GNUNET_TESTBED_operation_done (entry->core_op); - entry->core_op = NULL; - } - GNUNET_assert (NULL == entry->core_handle); - if (NULL != entry->cfg) - { - GNUNET_CONFIGURATION_destroy (entry->cfg); - entry->cfg = NULL; - } -} - - -/** - * The task to expire this cache entry, free any handlers it has opened and - * mark their corresponding operations as done. - * - * @param cls the CacheEntry - * @param tc the scheduler task context - */ -static void -expire_cache_entry (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) -{ - struct CacheEntry *entry = cls; - - GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != entry->expire_task); - entry->expire_task = GNUNET_SCHEDULER_NO_TASK; - close_handles (entry); -} - - -/** * Creates a new cache entry and then puts it into the cache's hashtable. * * @param key the hash code to use for inserting the newly created entry @@ -410,414 +166,6 @@ add_entry (const struct GNUNET_HashCode *key, unsigned int peer_id) /** - * Function to find a suitable GSTCacheGetHandle which is waiting for one of the - * handles in given entry to be available. - * - * @param entry the cache entry whose GSTCacheGetHandle list has to be searched - * @param head the starting list element in the GSTCacheGetHandle where the - * search has to be begin - * @return a suitable GSTCacheGetHandle whose handle ready notify callback - * hasn't been called yet. NULL if no such suitable GSTCacheGetHandle - * is found - */ -static struct GSTCacheGetHandle * -search_suitable_cgh (const struct CacheEntry *entry, - const struct GSTCacheGetHandle *head) -{ - const struct GSTCacheGetHandle *cgh; - - for (cgh = head; NULL != cgh; cgh = cgh->next) - { - if (GNUNET_YES == cgh->notify_called) - return NULL; - switch (cgh->type) - { - case CGT_TRANSPORT_HANDLE: - if (NULL == entry->transport_handle) - continue; - break; - case CGT_CORE_HANDLE: - if (NULL == entry->core_handle) - continue; - if (NULL == entry->peer_identity) /* Our CORE connection isn't ready yet */ - continue; - break; - } - break; - } - return (struct GSTCacheGetHandle *) cgh; -} - - -/** - * Task to call the handle ready notify callback of a queued GSTCacheGetHandle - * of an entry when one or all of its handles are available. - * - * @param cls the cache entry - * @param tc the task context from scheduler - */ -static void -call_cgh_cb (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) -{ - struct CacheEntry *entry = cls; - struct GSTCacheGetHandle *cgh; - const struct GSTCacheGetHandle *cgh2; - - GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != entry->notify_task); - entry->notify_task = GNUNET_SCHEDULER_NO_TASK; - cgh = search_suitable_cgh (entry, entry->cgh_qhead); - GNUNET_assert (NULL != cgh); - cgh2 = NULL; - if (NULL != cgh->next) - cgh2 = search_suitable_cgh (entry, cgh->next); - GNUNET_CONTAINER_DLL_remove (entry->cgh_qhead, entry->cgh_qtail, cgh); - cgh->notify_called = GNUNET_YES; - GNUNET_CONTAINER_DLL_insert_tail (entry->cgh_qhead, entry->cgh_qtail, cgh); - if (NULL != cgh2) - entry->notify_task = GNUNET_SCHEDULER_add_now (&call_cgh_cb, entry); - if (NULL != cgh->nctxt) - { /* Register the peer connect notify callback */ - GNUNET_CONTAINER_DLL_insert_tail (entry->nctxt_qhead, entry->nctxt_qtail, - cgh->nctxt); - } - LOG_DEBUG ("Calling notify for handle type %u\n", cgh->type); - cgh->cb (cgh->cb_cls, entry->core_handle, entry->transport_handle, - entry->peer_identity); -} - - -/** - * Function called from peer connect notify callbacks from CORE and TRANSPORT - * connections. This function calls the pendning peer connect notify callbacks - * which are queued in an entry. - * - * @param cls the cache entry - * @param peer the peer that connected - * @param type the type of the handle this notification corresponds to - */ -static void -peer_connect_notify_cb (void *cls, const struct GNUNET_PeerIdentity *peer, - const enum CacheGetType type) -{ - struct CacheEntry *entry = cls; - struct ConnectNotifyContext *ctxt; - struct ConnectNotifyContext *ctxt2; - GST_cache_peer_connect_notify cb; - void *cb_cls; - - - for (ctxt = entry->nctxt_qhead; NULL != ctxt;) - { - GNUNET_assert (NULL != ctxt->cgh); - if (type != ctxt->cgh->type) - { - ctxt = ctxt->next; - continue; - } - if (0 != memcmp (ctxt->target, peer, sizeof (struct GNUNET_PeerIdentity))) - { - ctxt = ctxt->next; - continue; - } - cb = ctxt->cb; - cb_cls = ctxt->cb_cls; - ctxt->cgh->nctxt = NULL; - ctxt2 = ctxt->next; - GNUNET_CONTAINER_DLL_remove (entry->nctxt_qhead, entry->nctxt_qtail, ctxt); - GNUNET_free (ctxt); - ctxt = ctxt2; - cb (cb_cls, peer); - } - if (NULL == ctxt) - return; - -} - - -/** - * Function called to notify transport users that another - * peer connected to us. - * - * @param cls closure - * @param peer the peer that connected - */ -static void -transport_peer_connect_notify_cb (void *cls, - const struct GNUNET_PeerIdentity *peer) -{ - peer_connect_notify_cb (cls, peer, CGT_TRANSPORT_HANDLE); -} - - -/** - * Function called when resources for opening a connection to TRANSPORT are - * available. - * - * @param cls the cache entry - */ -static void -opstart_get_handle_transport (void *cls) -{ - struct CacheEntry *entry = cls; - - GNUNET_assert (NULL != entry); - LOG_DEBUG ("Opening a transport connection to peer %u\n", entry->peer_id); - entry->transport_handle = - GNUNET_TRANSPORT_connect (entry->cfg, NULL, entry, NULL, - &transport_peer_connect_notify_cb, NULL); - if (NULL == entry->transport_handle) - { - GNUNET_break (0); - return; - } - if (0 == entry->demand) - return; - if (GNUNET_SCHEDULER_NO_TASK != entry->notify_task) - return; - if (NULL != search_suitable_cgh (entry, entry->cgh_qhead)) - entry->notify_task = GNUNET_SCHEDULER_add_now (&call_cgh_cb, entry); -} - - -/** - * Function called when the operation responsible for opening a TRANSPORT - * connection is marked as done. - * - * @param cls the cache entry - */ -static void -oprelease_get_handle_transport (void *cls) -{ - struct CacheEntry *entry = cls; - - if (NULL == entry->transport_handle) - return; - GNUNET_TRANSPORT_disconnect (entry->transport_handle); - entry->transport_handle = NULL; -} - - -/** - * Function called after GNUNET_CORE_connect has succeeded (or failed - * for good). Note that the private key of the peer is intentionally - * not exposed here; if you need it, your process should try to read - * the private key file directly (which should work if you are - * authorized...). Implementations of this function must not call - * GNUNET_CORE_disconnect (other than by scheduling a new task to - * do this later). - * - * @param cls closure - * @param my_identity ID of this peer, NULL if we failed - */ -static void -core_startup_cb (void *cls, - const struct GNUNET_PeerIdentity *my_identity) -{ - struct CacheEntry *entry = cls; - - if (NULL == my_identity) - { - GNUNET_break (0); - return; - } - GNUNET_assert (NULL == entry->peer_identity); - // FIXME: why is this dynamically allocated? - entry->peer_identity = GNUNET_new (struct GNUNET_PeerIdentity); - memcpy (entry->peer_identity, my_identity, - sizeof (struct GNUNET_PeerIdentity)); - if (0 == entry->demand) - return; - if (GNUNET_SCHEDULER_NO_TASK != entry->notify_task) - return; - if (NULL != search_suitable_cgh (entry, entry->cgh_qhead)) - entry->notify_task = GNUNET_SCHEDULER_add_now (&call_cgh_cb, entry); -} - - -/** - * Method called whenever a given peer connects at CORE level - * - * @param cls closure - * @param peer peer identity this notification is about - */ -static void -core_peer_connect_cb (void *cls, const struct GNUNET_PeerIdentity *peer) -{ - peer_connect_notify_cb (cls, peer, CGT_CORE_HANDLE); -} - - -/** - * Function called when resources for opening a connection to CORE are - * available. - * - * @param cls the cache entry - */ -static void -opstart_get_handle_core (void *cls) -{ - struct CacheEntry *entry = cls; - - const struct GNUNET_CORE_MessageHandler no_handlers[] = { - {NULL, 0, 0} - }; - - GNUNET_assert (NULL != entry); - LOG_DEBUG ("Opening a CORE connection to peer %u\n", entry->peer_id); - entry->core_handle = - GNUNET_CORE_connect (entry->cfg, entry, /* closure */ - &core_startup_cb, /* core startup notify */ - &core_peer_connect_cb, /* peer connect notify */ - NULL, /* peer disconnect notify */ - NULL, /* inbound notify */ - GNUNET_NO, /* inbound header only? */ - NULL, /* outbound notify */ - GNUNET_NO, /* outbound header only? */ - no_handlers); -} - - -/** - * Function called when the operation responsible for opening a TRANSPORT - * connection is marked as done. - * - * @param cls the cache entry - */ -static void -oprelease_get_handle_core (void *cls) -{ - struct CacheEntry *entry = cls; - - if (NULL == entry->core_handle) - return; - GNUNET_CORE_disconnect (entry->core_handle); - entry->core_handle = NULL; - GNUNET_free_non_null (entry->peer_identity); - entry->peer_identity = NULL; -} - - -/** - * Function to get a handle with given configuration. The type of the handle is - * implicitly provided in the GSTCacheGetHandle. If the handle is already cached - * before, it will be retured in the given callback; the peer_id is used to - * lookup in the cache; if not, a new operation is started to open the transport - * handle and will be given in the callback when it is available. - * - * @param peer_id the index of the peer - * @param cgh the CacheGetHandle - * @param cfg the configuration with which the transport handle has to be - * created if it was not present in the cache - * @param target the peer identify of the peer whose connection to - * TRANSPORT/CORE (depending on the type of 'cgh') subsystem will be - * notified through the connect_notify_cb. Can be NULL - * @param connect_notify_cb the callback to call when the given target peer is - * connected. This callback will only be called once or never again (in - * case the target peer cannot be connected). Can be NULL - * @param connect_notify_cb_cls the closure for the above callback - * @return the handle which can be used to cancel or mark that the handle is no - * longer being used - */ -static struct GSTCacheGetHandle * -cache_get_handle (unsigned int peer_id, struct GSTCacheGetHandle *cgh, - const struct GNUNET_CONFIGURATION_Handle *cfg, - const struct GNUNET_PeerIdentity *target, - GST_cache_peer_connect_notify connect_notify_cb, - void *connect_notify_cb_cls) -{ - struct GNUNET_HashCode key; - void *handle; - struct CacheEntry *entry; - struct ConnectNotifyContext *ctxt; - struct GNUNET_TESTBED_Operation *op; - - GNUNET_assert (0 != cgh->type); - GNUNET_CRYPTO_hash (&peer_id, sizeof (peer_id), &key); - handle = NULL; - entry = cache_lookup (&key); - if (NULL != entry) - { - if (GNUNET_YES == entry->in_lru) - { - GNUNET_assert (0 == entry->demand); - GNUNET_assert (0 < lru_cache_size); - if (GNUNET_SCHEDULER_NO_TASK != entry->expire_task) - { - GNUNET_SCHEDULER_cancel (entry->expire_task); - entry->expire_task = GNUNET_SCHEDULER_NO_TASK; - } - GNUNET_CONTAINER_DLL_remove (lru_cache_head, lru_cache_tail, entry); - lru_cache_size--; - entry->in_lru = GNUNET_NO; - } - switch (cgh->type) - { - case CGT_TRANSPORT_HANDLE: - handle = entry->transport_handle; - if (NULL != handle) - LOG_DEBUG ("Found TRANSPORT handle in cache for peer %u\n", - entry->peer_id); - break; - case CGT_CORE_HANDLE: - handle = entry->core_handle; - if (NULL != handle) - LOG_DEBUG ("Found CORE handle in cache for peer %u\n", entry->peer_id); - break; - } - } - if (NULL == entry) - entry = add_entry (&key, peer_id); - if (NULL == entry->cfg) - entry->cfg = GNUNET_CONFIGURATION_dup (cfg); - entry->demand++; - cgh->entry = entry; - GNUNET_CONTAINER_DLL_insert (entry->cgh_qhead, entry->cgh_qtail, cgh); - if ((NULL != target) && (NULL != connect_notify_cb)) - { - ctxt = GNUNET_malloc (sizeof (struct ConnectNotifyContext)); - ctxt->target = target; - ctxt->cb = connect_notify_cb; - ctxt->cb_cls = connect_notify_cb_cls; - GNUNET_assert (NULL == cgh->nctxt); - cgh->nctxt = ctxt; - ctxt->cgh = cgh; - } - if (NULL != handle) - { - if (GNUNET_SCHEDULER_NO_TASK == entry->notify_task) - { - if (NULL != search_suitable_cgh (entry, entry->cgh_qhead)) - entry->notify_task = GNUNET_SCHEDULER_add_now (&call_cgh_cb, entry); - } - return cgh; - } - op = NULL; - switch (cgh->type) - { - case CGT_TRANSPORT_HANDLE: - if (NULL != entry->transport_op) - return cgh; - op = GNUNET_TESTBED_operation_create_ (entry, &opstart_get_handle_transport, - &oprelease_get_handle_transport); - entry->transport_op = op; - break; - case CGT_CORE_HANDLE: - if (NULL != entry->core_op) - return cgh; - op = GNUNET_TESTBED_operation_create_ (entry, &opstart_get_handle_core, - &oprelease_get_handle_core); - entry->core_op = op; - break; - default: - GNUNET_assert (0); - } - GNUNET_TESTBED_operation_queue_insert_ (GST_opq_openfds, op); - GNUNET_TESTBED_operation_begin_wait_ (op); - return cgh; -} - - -/** * Iterator over hash map entries. * * @param cls closure @@ -834,22 +182,10 @@ cache_clear_iterator (void *cls, const struct GNUNET_HashCode *key, void *value) static unsigned int ncleared; GNUNET_assert (NULL != entry); - GNUNET_break (0 == entry->demand); LOG_DEBUG ("Clearing entry %u of %u\n", ++ncleared, cache_size); GNUNET_assert (GNUNET_YES == GNUNET_CONTAINER_multihashmap_remove (cache, key, value)); - close_handles (entry); GNUNET_free_non_null (entry->hello); - GNUNET_break (GNUNET_SCHEDULER_NO_TASK == entry->expire_task); - GNUNET_assert (NULL == entry->transport_handle); - GNUNET_assert (NULL == entry->transport_op); - GNUNET_assert (NULL == entry->core_handle); - GNUNET_assert (NULL == entry->core_op); - GNUNET_assert (NULL == entry->cfg); - GNUNET_assert (NULL == entry->cgh_qhead); - GNUNET_assert (NULL == entry->cgh_qtail); - GNUNET_assert (NULL == entry->nctxt_qhead); - GNUNET_assert (NULL == entry->nctxt_qtail); GNUNET_free (entry); return GNUNET_YES; } @@ -891,133 +227,6 @@ GST_cache_init (unsigned int size) /** - * Mark the GetCacheHandle as being done if a handle has been provided already - * or as being cancelled if the callback for the handle hasn't been called. - * - * @param cgh the CacheGetHandle handle - */ -void -GST_cache_get_handle_done (struct GSTCacheGetHandle *cgh) -{ - struct CacheEntry *entry; - - entry = cgh->entry; - GNUNET_assert (NULL != entry); - GNUNET_assert (0 < entry->demand); - entry->demand--; - if (GNUNET_SCHEDULER_NO_TASK != entry->notify_task) - { - GNUNET_SCHEDULER_cancel (entry->notify_task); - entry->notify_task = GNUNET_SCHEDULER_NO_TASK; - } - GNUNET_CONTAINER_DLL_remove (entry->cgh_qhead, entry->cgh_qtail, cgh); - if (NULL != cgh->nctxt) - { - GNUNET_assert (cgh == cgh->nctxt->cgh); - if (GNUNET_YES == cgh->notify_called) - GNUNET_CONTAINER_DLL_remove (entry->nctxt_qhead, entry->nctxt_qtail, - cgh->nctxt); - GNUNET_free (cgh->nctxt); - } - GNUNET_free (cgh); - if (0 == entry->demand) - { - entry->expire_task = - GNUNET_SCHEDULER_add_delayed (CACHE_EXPIRY, &expire_cache_entry, entry); - GNUNET_CONTAINER_DLL_insert_tail (lru_cache_head, lru_cache_tail, entry); - lru_cache_size++; - entry->in_lru = GNUNET_YES; - if (lru_cache_size > lru_cache_threshold_size) - close_handles (lru_cache_head); - } - else - { - if (NULL != search_suitable_cgh (entry, entry->cgh_qhead)) - entry->notify_task = GNUNET_SCHEDULER_add_now (&call_cgh_cb, entry); - } -} - - -/** - * Get a transport handle with the given configuration. If the handle is - * already cached before, it will be retured in the given callback; the peer_id - * is used to lookup in the cache; if not, a new operation is started to open the - * transport handle and will be given in the callback when it is available. - * - * @param peer_id the index of the peer - * @param cfg the configuration with which the transport handle has to be - * created if it was not present in the cache - * @param cb the callback to notify when the transport handle is available - * @param cb_cls the closure for the above callback - * @param target the peer identify of the peer whose connection to our TRANSPORT - * subsystem will be notified through the connect_notify_cb. Can be NULL - * @param connect_notify_cb the callback to call when the given target peer is - * connected. This callback will only be called once or never again (in - * case the target peer cannot be connected). Can be NULL - * @param connect_notify_cb_cls the closure for the above callback - * @return the handle which can be used to cancel or mark that the handle is no - * longer being used - */ -struct GSTCacheGetHandle * -GST_cache_get_handle_transport (unsigned int peer_id, - const struct GNUNET_CONFIGURATION_Handle *cfg, - GST_cache_handle_ready_cb cb, void *cb_cls, - const struct GNUNET_PeerIdentity *target, - GST_cache_peer_connect_notify connect_notify_cb, - void *connect_notify_cb_cls) -{ - struct GSTCacheGetHandle *cgh; - - cgh = GNUNET_malloc (sizeof (struct GSTCacheGetHandle)); - cgh->cb = cb; - cgh->cb_cls = cb_cls; - cgh->type = CGT_TRANSPORT_HANDLE; - return cache_get_handle (peer_id, cgh, cfg, target, connect_notify_cb, - connect_notify_cb_cls); -} - - -/** - * Get a CORE handle with the given configuration. If the handle is already - * cached before, it will be retured in the given callback; the peer_id is used - * to lookup in the cache. If the handle is not cached before, a new operation - * is started to open the CORE handle and will be given in the callback when it - * is available along with the peer identity - * - * @param peer_id the index of the peer - * @param cfg the configuration with which the transport handle has to be - * created if it was not present in the cache - * @param cb the callback to notify when the transport handle is available - * @param cb_cls the closure for the above callback - * @param target the peer identify of the peer whose connection to our CORE - * subsystem will be notified through the connect_notify_cb. Can be NULL - * @param connect_notify_cb the callback to call when the given target peer is - * connected. This callback will only be called once or never again (in - * case the target peer cannot be connected). Can be NULL - * @param connect_notify_cb_cls the closure for the above callback - * @return the handle which can be used to cancel or mark that the handle is no - * longer being used - */ -struct GSTCacheGetHandle * -GST_cache_get_handle_core (unsigned int peer_id, - const struct GNUNET_CONFIGURATION_Handle *cfg, - GST_cache_handle_ready_cb cb, void *cb_cls, - const struct GNUNET_PeerIdentity *target, - GST_cache_peer_connect_notify connect_notify_cb, - void *connect_notify_cb_cls) -{ - struct GSTCacheGetHandle *cgh; - - cgh = GNUNET_malloc (sizeof (struct GSTCacheGetHandle)); - cgh->cb = cb; - cgh->cb_cls = cb_cls; - cgh->type = CGT_CORE_HANDLE; - return cache_get_handle (peer_id, cgh, cfg, target, connect_notify_cb, - connect_notify_cb_cls); -} - - -/** * Looks up in the hello cache and returns the HELLO of the given peer * * @param peer_id the index of the peer whose HELLO has to be looked up diff --git a/src/testbed/gnunet-service-testbed_connectionpool.c b/src/testbed/gnunet-service-testbed_connectionpool.c index f4eb737781..158c9ec3a2 100644 --- a/src/testbed/gnunet-service-testbed_connectionpool.c +++ b/src/testbed/gnunet-service-testbed_connectionpool.c @@ -214,6 +214,11 @@ struct GST_ConnectionPool_GetHandle * Did we call the pool_connection_ready_cb already? */ int connection_ready_called; + + /** + * Are we waiting for any peer connect notifications? + */ + int notify_waiting; }; @@ -253,6 +258,15 @@ static unsigned int max_size; /** + * Cancel the expiration task of the give #PooledConnection object + * + * @param entry the #PooledConnection object + */ +static void +expire_task_cancel (struct PooledConnection *entry); + + +/** * Destroy a #PooledConnection object * * @param entry the #PooledConnection object @@ -264,7 +278,7 @@ destroy_pooled_connection (struct PooledConnection *entry) GNUNET_assert ((NULL == entry->head_waiting) && (NULL == entry->tail_waiting)); GNUNET_assert (0 == entry->demand); - GNUNET_free_non_null (entry->peer_identity); + expire_task_cancel (entry); if (entry->in_lru) GNUNET_CONTAINER_DLL_remove (head_lru, tail_lru, entry); if (entry->in_pool) @@ -308,6 +322,11 @@ expire (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) } +/** + * Cancel the expiration task of the give #PooledConnection object + * + * @param entry the #PooledConnection object + */ static void expire_task_cancel (struct PooledConnection *entry) { @@ -399,12 +418,16 @@ connection_ready (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) if (NULL != gh->next) gh_next = search_waiting (entry, gh->next); GNUNET_CONTAINER_DLL_remove (entry->head_waiting, entry->tail_waiting, gh); - gh->connection_ready_called = GNUNET_YES; + gh->connection_ready_called = 1; if (NULL != gh_next) entry->notify_task = GNUNET_SCHEDULER_add_now (&connection_ready, entry); if ( (NULL != gh->target) && (NULL != gh->connect_notify_cb) ) - GNUNET_CONTAINER_DLL_insert_tail (entry->head_notify, entry->tail_notify, gh); - LOG_DEBUG ("Calling notify for handle type %u\n", gh->service); + { + GNUNET_CONTAINER_DLL_insert_tail (entry->head_notify, entry->tail_notify, + gh); + gh->notify_waiting = 1; + } + LOG_DEBUG ("Connection ready for handle type %u\n", gh->service); gh->cb (gh->cb_cls, entry->handle_core, entry->handle_transport, entry->peer_identity); } @@ -448,6 +471,8 @@ peer_connect_notify_cb (void *cls, const struct GNUNET_PeerIdentity *peer, cb_cls = gh->connect_notify_cb_cls; gh_next = gh->next; GNUNET_CONTAINER_DLL_remove (entry->head_notify, entry->tail_notify, gh); + gh->notify_waiting = 0; + LOG_DEBUG ("Peer connected to peer %u at service %u\n", entry->index, gh->service); gh = gh_next; cb (cb_cls, peer); } @@ -644,10 +669,6 @@ cleanup_iterator (void *cls, struct PooledConnection *entry = value; GNUNET_assert (NULL != entry); - GNUNET_assert (GNUNET_OK == - GNUNET_CONTAINER_multihashmap32_remove (map, key, entry)); - if (entry->in_lru) - GNUNET_CONTAINER_DLL_remove (head_lru, tail_lru, entry); destroy_pooled_connection (entry); return GNUNET_YES; } @@ -693,6 +714,7 @@ GST_connection_pool_destroy () GNUNET_CONTAINER_DLL_remove (head_lru, tail_lru, entry); destroy_pooled_connection (entry); } + GNUNET_assert (NULL == head_not_pooled); } @@ -744,6 +766,7 @@ GST_connection_pool_get_handle (unsigned int peer_id, uint32_t peer_id32; peer_id32 = (uint32_t) peer_id; + handle = NULL; entry = NULL; if (NULL != map) entry = GNUNET_CONTAINER_multihashmap32_get (map, peer_id32); @@ -853,10 +876,16 @@ GST_connection_pool_get_handle_done (struct GST_ConnectionPool_GetHandle *gh) struct PooledConnection *entry; entry = gh->entry; + LOG_DEBUG ("Cleaning up get handle %p for service %u, peer %u\n", + gh, + gh->service, entry->index); if (!gh->connection_ready_called) GNUNET_CONTAINER_DLL_remove (entry->head_waiting, entry->tail_waiting, gh); - else if ((NULL != gh->next) || (NULL != gh->prev)) - GNUNET_CONTAINER_DLL_remove (entry->head_notify, entry->head_notify, gh); + if (gh->notify_waiting) + { + GNUNET_CONTAINER_DLL_remove (entry->head_notify, entry->tail_notify, gh); + gh->notify_waiting = 0; + } GNUNET_free (gh); gh = NULL; GNUNET_assert (!entry->in_lru); @@ -865,10 +894,12 @@ GST_connection_pool_get_handle_done (struct GST_ConnectionPool_GetHandle *gh) if (GNUNET_YES == GNUNET_CONTAINER_multihashmap32_contains (map, entry->index)) goto unallocate; - if ((GNUNET_CONTAINER_multihashmap32_size (map) == max_size) - && (NULL == head_lru)) - goto unallocate; - destroy_pooled_connection (head_lru); + if (GNUNET_CONTAINER_multihashmap32_size (map) == max_size) + { + if (NULL == head_lru) + goto unallocate; + destroy_pooled_connection (head_lru); + } GNUNET_CONTAINER_DLL_remove (head_not_pooled, tail_not_pooled, entry); GNUNET_assert (GNUNET_OK == GNUNET_CONTAINER_multihashmap32_put (map, diff --git a/src/testbed/gnunet-service-testbed_oc.c b/src/testbed/gnunet-service-testbed_oc.c index 55f18d251a..8b7a8a0ee9 100644 --- a/src/testbed/gnunet-service-testbed_oc.c +++ b/src/testbed/gnunet-service-testbed_oc.c @@ -25,6 +25,7 @@ */ #include "gnunet-service-testbed.h" +#include "gnunet-service-testbed_connectionpool.h" /** * Redefine LOG with a changed log component string @@ -54,7 +55,7 @@ struct TryConnectContext /** * The GetCacheHandle for the p1th transport handle */ - struct GSTCacheGetHandle *cgh_th; + struct GST_ConnectionPool_GetHandle *cgh_th; /** * the try connect handle @@ -188,15 +189,15 @@ struct OverlayConnectContext struct GNUNET_TRANSPORT_Handle *p1th_; /** - * The CacheGetHandle for the p1th transport handle + * The #GST_ConnectionPool_GetHandle for the peer1's transport handle */ - struct GSTCacheGetHandle *cgh_p1th; + struct GST_ConnectionPool_GetHandle *cgh_p1th; /** - * The GetCacheHandle for registering callback to notify CORE level peer - * connects and to get our identity. + * The #GST_ConnectionPool_GetHandle for registering callback to notify CORE + * level peer connects and to get our identity. */ - struct GSTCacheGetHandle *cgh_ch; + struct GST_ConnectionPool_GetHandle *cgh_ch; /** * HELLO of the first peer. This should be sent to the second peer. @@ -474,7 +475,7 @@ cleanup_occ_lp2c (struct LocalPeer2Context *lp2c) if (NULL != lp2c->ohh) GNUNET_TRANSPORT_offer_hello_cancel (lp2c->ohh); if (NULL != lp2c->tcc.cgh_th) - GST_cache_get_handle_done (lp2c->tcc.cgh_th); + GST_connection_pool_get_handle_done (lp2c->tcc.cgh_th); if (NULL != lp2c->tcc.tch) GNUNET_TRANSPORT_try_connect_cancel (lp2c->tcc.tch); if (GNUNET_SCHEDULER_NO_TASK != lp2c->tcc.task) @@ -529,11 +530,11 @@ cleanup_occ (struct OverlayConnectContext *occ) if (GNUNET_SCHEDULER_NO_TASK != occ->timeout_task) GNUNET_SCHEDULER_cancel (occ->timeout_task); if (NULL != occ->cgh_ch) - GST_cache_get_handle_done (occ->cgh_ch); + GST_connection_pool_get_handle_done (occ->cgh_ch); if (NULL != occ->ghh) GNUNET_TRANSPORT_get_hello_cancel (occ->ghh); if (NULL != occ->cgh_p1th) - GST_cache_get_handle_done (occ->cgh_p1th); + GST_connection_pool_get_handle_done (occ->cgh_p1th); GNUNET_assert (NULL != GST_peer_list); GNUNET_assert (occ->peer->reference_cnt > 0); occ->peer->reference_cnt--; @@ -921,8 +922,9 @@ p2_transport_connect (struct OverlayConnectContext *occ) { GNUNET_assert (NULL != (peer2 = GST_peer_list[occ->other_peer_id])); occ->p2ctx.local.tcc.cgh_th = - GST_cache_get_handle_transport (occ->other_peer_id, + GST_connection_pool_get_handle (occ->other_peer_id, peer2->details.local.cfg, + GST_CONNECTIONPOOL_SERVICE_TRANSPORT, &p2_transport_connect_cache_callback, occ, NULL, NULL, NULL); return; @@ -985,7 +987,7 @@ hello_update_cb (void *cls, const struct GNUNET_MessageHeader *hello) memcpy (occ->hello, hello, msize); GNUNET_TRANSPORT_get_hello_cancel (occ->ghh); occ->ghh = NULL; - GST_cache_get_handle_done (occ->cgh_p1th); + GST_connection_pool_get_handle_done (occ->cgh_p1th); occ->cgh_p1th = NULL; occ->p1th_ = NULL; GNUNET_free_non_null (occ->emsg); @@ -1086,8 +1088,9 @@ occ_cache_get_handle_core_cb (void *cls, struct GNUNET_CORE_Handle *ch, "0x%llx: Timeout while acquiring TRANSPORT of %s from cache", occ->op_id, GNUNET_i2s (&occ->peer_identity)); occ->cgh_p1th = - GST_cache_get_handle_transport (occ->peer->id, + GST_connection_pool_get_handle (occ->peer->id, occ->peer->details.local.cfg, + GST_CONNECTIONPOOL_SERVICE_TRANSPORT, p1_transport_connect_cache_callback, occ, NULL, NULL, NULL); } @@ -1127,10 +1130,12 @@ overlay_connect_get_config (void *cls, const struct GNUNET_MessageHeader *msg) "0x%llx: Timeout while connecting to CORE of peer with " "id: %u", occ->op_id, occ->peer->id); occ->cgh_ch = - GST_cache_get_handle_core (occ->peer->id, occ->peer->details.local.cfg, - occ_cache_get_handle_core_cb, occ, - &occ->other_peer_identity, - &overlay_connect_notify, occ); + GST_connection_pool_get_handle (occ->peer->id, + occ->peer->details.local.cfg, + GST_CONNECTIONPOOL_SERVICE_CORE, + occ_cache_get_handle_core_cb, occ, + &occ->other_peer_identity, + &overlay_connect_notify, occ); return; } @@ -1481,10 +1486,12 @@ GST_handle_overlay_connect (void *cls, struct GNUNET_SERVER_Client *client, "0x%llx: Timeout while connecting to CORE of peer with " "id: %u", occ->op_id, occ->peer->id); occ->cgh_ch = - GST_cache_get_handle_core (occ->peer->id, occ->peer->details.local.cfg, - occ_cache_get_handle_core_cb, occ, - &occ->other_peer_identity, - &overlay_connect_notify, occ); + GST_connection_pool_get_handle (occ->peer->id, + occ->peer->details.local.cfg, + GST_CONNECTIONPOOL_SERVICE_CORE, + occ_cache_get_handle_core_cb, occ, + &occ->other_peer_identity, + &overlay_connect_notify, occ); break; } GNUNET_SERVER_receive_done (client, GNUNET_OK); @@ -1511,8 +1518,7 @@ cleanup_rocc (struct RemoteOverlayConnectCtx *rocc) GNUNET_TRANSPORT_try_connect_cancel (rocc->tcc.tch); if (GNUNET_SCHEDULER_NO_TASK != rocc->tcc.task) GNUNET_SCHEDULER_cancel (rocc->tcc.task); - //GNUNET_TRANSPORT_disconnect (rocc->tcc.th_); - GST_cache_get_handle_done (rocc->tcc.cgh_th); + GST_connection_pool_get_handle_done (rocc->tcc.cgh_th); GNUNET_assert (rocc->peer->reference_cnt > 0); rocc->peer->reference_cnt--; if ((GNUNET_YES == rocc->peer->destroy_flag) && @@ -1752,8 +1758,11 @@ GST_handle_remote_overlay_connect (void *cls, memcpy (rocc->hello, msg->hello, hsize); rocc->tcc.op_id = rocc->op_id; rocc->tcc.cgh_th = - GST_cache_get_handle_transport (peer_id, rocc->peer->details.local.cfg, - &rocc_cache_get_handle_transport_cb, rocc, + GST_connection_pool_get_handle (peer_id, + rocc->peer->details.local.cfg, + GST_CONNECTIONPOOL_SERVICE_TRANSPORT, + &rocc_cache_get_handle_transport_cb, + rocc, &rocc->a_id, &cache_transport_peer_connect_notify, rocc); |