diff options
Diffstat (limited to 'src/lockmanager/lockmanager_api.c')
-rw-r--r-- | src/lockmanager/lockmanager_api.c | 325 |
1 files changed, 190 insertions, 135 deletions
diff --git a/src/lockmanager/lockmanager_api.c b/src/lockmanager/lockmanager_api.c index 99f5ab5..3b9e70e 100644 --- a/src/lockmanager/lockmanager_api.c +++ b/src/lockmanager/lockmanager_api.c @@ -24,11 +24,6 @@ * @author Sree Harsha Totakura */ -/** - * To be fixed: - * Should the handle be freed when the connection to service is lost? - * Should cancel_request have a call back (else simultaneous calls break) - */ #include "platform.h" #include "gnunet_common.h" @@ -63,11 +58,17 @@ struct MessageQueue * The prev pointer for doubly linked list */ struct MessageQueue *prev; - + /** * The LOCKMANAGER Message */ struct GNUNET_LOCKMANAGER_Message *msg; + + /** + * If this is a AQUIRE_LOCK message, this is the + * affiliated locking request. + */ + struct GNUNET_LOCKMANAGER_LockingRequest *lr; }; @@ -100,6 +101,11 @@ struct GNUNET_LOCKMANAGER_Handle * Double linked list tail for message queue */ struct MessageQueue *mq_tail; + + /** + * Are we currently handling replies? + */ + int in_replies; }; @@ -119,6 +125,12 @@ struct GNUNET_LOCKMANAGER_LockingRequest GNUNET_LOCKMANAGER_StatusCallback status_cb; /** + * Entry in the request message queue for aquiring this + * lock; NULL after request has been sent. + */ + struct MessageQueue *mqe; + + /** * Closure for the status callback */ void *status_cb_cls; @@ -127,7 +139,7 @@ struct GNUNET_LOCKMANAGER_LockingRequest * The locking domain of this request */ char *domain; - + /** * The lock */ @@ -137,6 +149,11 @@ struct GNUNET_LOCKMANAGER_LockingRequest * The status of the lock */ enum GNUNET_LOCKMANAGER_Status status; + + /** + * set to GNUNET_YES if acquire message for this lock is till in messga queue + */ + int acquire_sent; }; @@ -163,6 +180,16 @@ struct LockingRequestMatch /** + * Handler for server replies + * + * @param cls the LOCKMANAGER_Handle + * @param msg received message, NULL on timeout or fatal error + */ +static void +handle_replies (void *cls, const struct GNUNET_MessageHeader *msg); + + +/** * Transmit notify for sending message to server * * @param cls the lockmanager handle @@ -170,7 +197,7 @@ struct LockingRequestMatch * @param buf where the callee should write the message * @return number of bytes written to buf */ -static size_t +static size_t transmit_notify (void *cls, size_t size, void *buf) { struct GNUNET_LOCKMANAGER_Handle *handle = cls; @@ -178,34 +205,48 @@ transmit_notify (void *cls, size_t size, void *buf) uint16_t msg_size; handle->transmit_handle = NULL; + queue_entity = handle->mq_head; + GNUNET_assert (NULL != queue_entity); if ((0 == size) || (NULL == buf)) { - /* FIXME: Timed out -- requeue? */ + handle->transmit_handle = + GNUNET_CLIENT_notify_transmit_ready (handle->conn, + ntohs (queue_entity->msg-> + header.size), + GNUNET_TIME_UNIT_FOREVER_REL, + GNUNET_YES, &transmit_notify, + handle); return 0; } - queue_entity = handle->mq_head; - GNUNET_assert (NULL != queue_entity); msg_size = ntohs (queue_entity->msg->header.size); GNUNET_assert (size >= msg_size); memcpy (buf, queue_entity->msg, msg_size); - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Message of size %u sent\n", msg_size); + LOG (GNUNET_ERROR_TYPE_DEBUG, "Message of size %u sent\n", msg_size); + if (GNUNET_MESSAGE_TYPE_LOCKMANAGER_ACQUIRE == + ntohs (queue_entity->msg->header.type)) + { + GNUNET_break (GNUNET_NO == queue_entity->lr->acquire_sent); + queue_entity->lr->acquire_sent = GNUNET_YES; + queue_entity->lr->mqe = NULL; + } GNUNET_free (queue_entity->msg); - GNUNET_CONTAINER_DLL_remove (handle->mq_head, - handle->mq_tail, - queue_entity); + GNUNET_CONTAINER_DLL_remove (handle->mq_head, handle->mq_tail, queue_entity); GNUNET_free (queue_entity); queue_entity = handle->mq_head; if (NULL != queue_entity) { handle->transmit_handle = - GNUNET_CLIENT_notify_transmit_ready (handle->conn, - ntohs - (queue_entity->msg->header.size), - TIMEOUT, - GNUNET_YES, - &transmit_notify, - handle); + GNUNET_CLIENT_notify_transmit_ready (handle->conn, + ntohs (queue_entity->msg-> + header.size), TIMEOUT, + GNUNET_YES, &transmit_notify, + handle); + } + if (GNUNET_NO == handle->in_replies) + { + handle->in_replies = GNUNET_YES; + GNUNET_CLIENT_receive (handle->conn, &handle_replies, handle, + GNUNET_TIME_UNIT_FOREVER_REL); } return msg_size; } @@ -216,29 +257,31 @@ transmit_notify (void *cls, size_t size, void *buf) * * @param handle the lockmanager handle whose queue will be used * @param msg the message to be queued + * @param request the locking reqeust responsible for queueing this message + * @return the MessageQueue entity that has been queued */ -static void +static struct MessageQueue * queue_message (struct GNUNET_LOCKMANAGER_Handle *handle, - struct GNUNET_LOCKMANAGER_Message *msg) + struct GNUNET_LOCKMANAGER_Message *msg, + struct GNUNET_LOCKMANAGER_LockingRequest *request) { struct MessageQueue *queue_entity; GNUNET_assert (NULL != msg); queue_entity = GNUNET_malloc (sizeof (struct MessageQueue)); queue_entity->msg = msg; - GNUNET_CONTAINER_DLL_insert_tail (handle->mq_head, - handle->mq_tail, + queue_entity->lr = request; + GNUNET_CONTAINER_DLL_insert_tail (handle->mq_head, handle->mq_tail, queue_entity); if (NULL == handle->transmit_handle) { handle->transmit_handle = - GNUNET_CLIENT_notify_transmit_ready (handle->conn, - ntohs (msg->header.size), - TIMEOUT, - GNUNET_YES, - &transmit_notify, - handle); + GNUNET_CLIENT_notify_transmit_ready (handle->conn, + ntohs (msg->header.size), TIMEOUT, + GNUNET_YES, &transmit_notify, + handle); } + return queue_entity; } @@ -250,15 +293,12 @@ queue_message (struct GNUNET_LOCKMANAGER_Handle *handle, * @param key set to the key */ static void -get_key (const char *domain_name, - uint32_t lock_number, - struct GNUNET_HashCode *key) +get_key (const char *domain_name, uint32_t lock_number, + struct GNUNET_HashCode *key) { uint32_t *last_32; - GNUNET_CRYPTO_hash (domain_name, - strlen (domain_name), - key); + GNUNET_CRYPTO_hash (domain_name, strlen (domain_name), key); last_32 = (uint32_t *) key; *last_32 ^= lock_number; } @@ -272,17 +312,17 @@ get_key (const char *domain_name, * @param value value in the hash map (struct GNUNET_LOCKMANAGER_LockingRequest) * @return GNUNET_YES if we should continue to * iterate, - * GNUNET_NO if not. + * GNUNET_NO if not. */ static int -match_iterator (void *cls, const GNUNET_HashCode *key, void *value) +match_iterator (void *cls, const struct GNUNET_HashCode *key, void *value) { struct LockingRequestMatch *match = cls; struct GNUNET_LOCKMANAGER_LockingRequest *lr = value; - if ( (match->lock == lr->lock) && (0 == strcmp (match->domain, lr->domain)) ) + if ((match->lock == lr->lock) && (0 == strcmp (match->domain, lr->domain))) { - match->matched_entry = lr; + match->matched_entry = lr; return GNUNET_NO; } return GNUNET_YES; @@ -297,12 +337,11 @@ match_iterator (void *cls, const GNUNET_HashCode *key, void *value) * @param domain the locking domain name * @param lock the lock number * @return the found LockingRequest; NULL if a matching LockingRequest wasn't - * found + * found */ static struct GNUNET_LOCKMANAGER_LockingRequest * hashmap_find_lockingrequest (const struct GNUNET_CONTAINER_MultiHashMap *map, - const char *domain, - uint32_t lock) + const char *domain, uint32_t lock) { struct GNUNET_HashCode hash; struct LockingRequestMatch lock_match; @@ -311,9 +350,7 @@ hashmap_find_lockingrequest (const struct GNUNET_CONTAINER_MultiHashMap *map, lock_match.domain = domain; lock_match.lock = lock; get_key (domain, lock, &hash); - GNUNET_CONTAINER_multihashmap_get_multiple (map, - &hash, - &match_iterator, + GNUNET_CONTAINER_multihashmap_get_multiple (map, &hash, &match_iterator, &lock_match); return lock_match.matched_entry; } @@ -335,16 +372,40 @@ call_status_cb_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) LOG (GNUNET_ERROR_TYPE_DEBUG, "Calling status change for SUCCESS on lock num: %d, domain: %s\n", r->lock, r->domain); - r->status_cb (r->status_cb_cls, - r->domain, - r->lock, - r->status); + r->status_cb (r->status_cb_cls, r->domain, r->lock, r->status); } } /** - * Iterator to call relase and free all LockingRequest entries + * Function to generate acquire message for a lock + * + * @param domain_name the domain name of the lock + * @param lock the lock number + * @return the generated GNUNET_LOCKMANAGER_Message + */ +static struct GNUNET_LOCKMANAGER_Message * +generate_acquire_msg (const char *domain_name, uint32_t lock) +{ + struct GNUNET_LOCKMANAGER_Message *msg; + size_t domain_name_len; + uint16_t msg_size; + + domain_name_len = strlen (domain_name) + 1; + msg_size = sizeof (struct GNUNET_LOCKMANAGER_Message) + domain_name_len; + msg = GNUNET_malloc (msg_size); + msg->header.type = htons (GNUNET_MESSAGE_TYPE_LOCKMANAGER_ACQUIRE); + msg->header.size = htons (msg_size); + msg->lock = htonl (lock); + memcpy (&msg[1], domain_name, domain_name_len); + return msg; +} + + +/** + * Iterator to call relase on locks; acquire messages are sent for all + * locks. In addition, if a lock is acquired before, it is not released and its + * status callback is called to signal its release * * @param cls the lockmanager handle * @param key current key code @@ -354,29 +415,29 @@ call_status_cb_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) * GNUNET_NO if not. */ static int -release_iterator(void *cls, - const GNUNET_HashCode * key, - void *value) +release_n_retry_iterator (void *cls, const struct GNUNET_HashCode *key, + void *value) { - struct GNUNET_LOCKMANAGER_Handle *h = cls; struct GNUNET_LOCKMANAGER_LockingRequest *r = value; + struct GNUNET_LOCKMANAGER_Handle *h = cls; + struct GNUNET_LOCKMANAGER_Message *msg; + if (GNUNET_NO == r->acquire_sent) /* an acquire is still in queue */ + return GNUNET_YES; + r->acquire_sent = GNUNET_NO; + msg = generate_acquire_msg (r->domain, r->lock); + r->mqe = queue_message (h, msg, r); + if (GNUNET_LOCKMANAGER_RELEASE == r->status) + return GNUNET_YES; if (NULL != r->status_cb) { LOG (GNUNET_ERROR_TYPE_DEBUG, "Calling status change for RELEASE on lock num: %d, domain: %s\n", r->lock, r->domain); - r->status_cb (r->status_cb_cls, - r->domain, - r->lock, + r->status = GNUNET_LOCKMANAGER_RELEASE; + r->status_cb (r->status_cb_cls, r->domain, r->lock, GNUNET_LOCKMANAGER_RELEASE); } - GNUNET_assert (GNUNET_YES == - GNUNET_CONTAINER_multihashmap_remove (h->hashmap, - key, - value)); - GNUNET_free (r->domain); - GNUNET_free (r); return GNUNET_YES; } @@ -387,9 +448,8 @@ release_iterator(void *cls, * @param cls the LOCKMANAGER_Handle * @param msg received message, NULL on timeout or fatal error */ -static void -handle_replies (void *cls, - const struct GNUNET_MessageHeader *msg) +static void +handle_replies (void *cls, const struct GNUNET_MessageHeader *msg) { struct GNUNET_LOCKMANAGER_Handle *handle = cls; const struct GNUNET_LOCKMANAGER_Message *m; @@ -398,22 +458,21 @@ handle_replies (void *cls, struct GNUNET_HashCode hash; uint32_t lock; uint16_t msize; - + + handle->in_replies = GNUNET_NO; if (NULL == msg) { LOG (GNUNET_ERROR_TYPE_DEBUG, "Lockmanager service not available or went down\n"); - /* Should release all locks and free its locking requests */ + /* Should release all locks and retry to acquire them */ GNUNET_CONTAINER_multihashmap_iterate (handle->hashmap, - &release_iterator, - handle); + &release_n_retry_iterator, handle); return; } - GNUNET_CLIENT_receive (handle->conn, - &handle_replies, - handle, + handle->in_replies = GNUNET_YES; + GNUNET_CLIENT_receive (handle->conn, &handle_replies, handle, GNUNET_TIME_UNIT_FOREVER_REL); - if (GNUNET_MESSAGE_TYPE_LOCKMANAGER_SUCCESS != ntohs(msg->type)) + if (GNUNET_MESSAGE_TYPE_LOCKMANAGER_SUCCESS != ntohs (msg->type)) { GNUNET_break (0); return; @@ -427,20 +486,18 @@ handle_replies (void *cls, m = (const struct GNUNET_LOCKMANAGER_Message *) msg; domain = (const char *) &m[1]; msize -= sizeof (struct GNUNET_LOCKMANAGER_Message); - if ('\0' != domain[msize-1]) + if ('\0' != domain[msize - 1]) { GNUNET_break (0); return; } lock = ntohl (m->lock); - get_key (domain, lock, &hash); + get_key (domain, lock, &hash); LOG (GNUNET_ERROR_TYPE_DEBUG, - "Received SUCCESS message for lock: %d, domain %s\n", - lock, domain); - if (NULL == (lr = hashmap_find_lockingrequest (handle->hashmap, - domain, - lock))) + "Received SUCCESS message for lock: %d, domain %s\n", lock, domain); + if (NULL == + (lr = hashmap_find_lockingrequest (handle->hashmap, domain, lock))) { GNUNET_break (0); return; @@ -451,11 +508,10 @@ handle_replies (void *cls, return; } LOG (GNUNET_ERROR_TYPE_DEBUG, - "Changing status for lock: %d in domain: %s to SUCCESS\n", - lr->lock, lr->domain); + "Changing status for lock: %d in domain: %s to SUCCESS\n", lr->lock, + lr->domain); lr->status = GNUNET_LOCKMANAGER_SUCCESS; - GNUNET_SCHEDULER_add_continuation (&call_status_cb_task, - lr, + GNUNET_SCHEDULER_add_continuation (&call_status_cb_task, lr, GNUNET_SCHEDULER_REASON_PREREQ_DONE); } @@ -471,19 +527,14 @@ handle_replies (void *cls, * GNUNET_NO if not. */ static int -free_iterator(void *cls, - const GNUNET_HashCode * key, - void *value) +free_iterator (void *cls, const struct GNUNET_HashCode *key, void *value) { struct GNUNET_LOCKMANAGER_Handle *h = cls; struct GNUNET_LOCKMANAGER_LockingRequest *r = value; - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Clearing locking request\n"); - GNUNET_assert (GNUNET_YES == - GNUNET_CONTAINER_multihashmap_remove (h->hashmap, - key, - value)); + LOG (GNUNET_ERROR_TYPE_DEBUG, "Clearing locking request\n"); + GNUNET_assert (GNUNET_YES == + GNUNET_CONTAINER_multihashmap_remove (h->hashmap, key, value)); GNUNET_free (r->domain); GNUNET_free (r); return GNUNET_YES; @@ -515,14 +566,12 @@ GNUNET_LOCKMANAGER_connect (const struct GNUNET_CONFIGURATION_Handle *cfg) GNUNET_free (h); LOG (GNUNET_ERROR_TYPE_DEBUG, "%s() END\n", __func__); return NULL; - } - h->hashmap = GNUNET_CONTAINER_multihashmap_create (15); + } + h->hashmap = GNUNET_CONTAINER_multihashmap_create (16, GNUNET_NO); GNUNET_assert (NULL != h->hashmap); - GNUNET_CLIENT_receive (h->conn, - &handle_replies, - h, + h->in_replies = GNUNET_YES; + GNUNET_CLIENT_receive (h->conn, &handle_replies, h, GNUNET_TIME_UNIT_FOREVER_REL); - LOG (GNUNET_ERROR_TYPE_DEBUG, "%s() END\n", __func__); return h; } @@ -544,8 +593,7 @@ GNUNET_LOCKMANAGER_disconnect (struct GNUNET_LOCKMANAGER_Handle *handle) LOG (GNUNET_ERROR_TYPE_WARNING, "Some locking requests are still present. Cancel them before " "calling %s\n", __func__); - GNUNET_CONTAINER_multihashmap_iterate (handle->hashmap, - &free_iterator, + GNUNET_CONTAINER_multihashmap_iterate (handle->hashmap, &free_iterator, handle); } GNUNET_CONTAINER_multihashmap_destroy (handle->hashmap); @@ -557,9 +605,7 @@ GNUNET_LOCKMANAGER_disconnect (struct GNUNET_LOCKMANAGER_Handle *handle) head = handle->mq_head; while (NULL != head) { - GNUNET_CONTAINER_DLL_remove (handle->mq_head, - handle->mq_tail, - head); + GNUNET_CONTAINER_DLL_remove (handle->mq_head, handle->mq_tail, head); GNUNET_free (head->msg); GNUNET_free (head); head = handle->mq_head; @@ -595,18 +641,15 @@ GNUNET_LOCKMANAGER_disconnect (struct GNUNET_LOCKMANAGER_Handle *handle) */ struct GNUNET_LOCKMANAGER_LockingRequest * GNUNET_LOCKMANAGER_acquire_lock (struct GNUNET_LOCKMANAGER_Handle *handle, - const char *domain_name, - uint32_t lock, - GNUNET_LOCKMANAGER_StatusCallback - status_cb, + const char *domain_name, uint32_t lock, + GNUNET_LOCKMANAGER_StatusCallback status_cb, void *status_cb_cls) { struct GNUNET_LOCKMANAGER_LockingRequest *r; struct GNUNET_LOCKMANAGER_Message *msg; struct GNUNET_HashCode hash; - uint16_t msg_size; size_t domain_name_length; - + LOG (GNUNET_ERROR_TYPE_DEBUG, "%s()\n", __func__); r = GNUNET_malloc (sizeof (struct GNUNET_LOCKMANAGER_LockingRequest)); domain_name_length = strlen (domain_name) + 1; @@ -616,30 +659,25 @@ GNUNET_LOCKMANAGER_acquire_lock (struct GNUNET_LOCKMANAGER_Handle *handle, r->status = GNUNET_LOCKMANAGER_RELEASE; r->status_cb = status_cb; r->status_cb_cls = status_cb_cls; + r->acquire_sent = GNUNET_NO; memcpy (r->domain, domain_name, domain_name_length); - msg_size = sizeof (struct GNUNET_LOCKMANAGER_Message) + domain_name_length; - msg = GNUNET_malloc (msg_size); - msg->header.type = htons (GNUNET_MESSAGE_TYPE_LOCKMANAGER_ACQUIRE); - msg->header.size = htons (msg_size); - msg->lock = htonl (lock); - memcpy (&msg[1], r->domain, domain_name_length); + msg = generate_acquire_msg (r->domain, r->lock); LOG (GNUNET_ERROR_TYPE_DEBUG, "Queueing ACQUIRE message\n"); - queue_message (handle, msg); + r->mqe = queue_message (handle, msg, r); get_key (r->domain, r->lock, &hash); - GNUNET_CONTAINER_multihashmap_put (r->handle->hashmap, - &hash, - r, - GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); + GNUNET_assert (GNUNET_OK == + GNUNET_CONTAINER_multihashmap_put (r->handle->hashmap, &hash, + r, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE)); LOG (GNUNET_ERROR_TYPE_DEBUG, "%s() END\n", __func__); return r; } - /** * Function to cancel the locking request generated by - * GNUNET_LOCKMANAGER_acquire_lock. If the lock is acquired us then the lock is - * released. GNUNET_LOCKMANAGER_StatusCallback will not be called upon any + * GNUNET_LOCKMANAGER_acquire_lock. If the lock is acquired by us then the lock + * is released. GNUNET_LOCKMANAGER_StatusCallback will not be called upon any * status changes resulting due to this call. * * @param request the LockingRequest to cancel @@ -654,24 +692,41 @@ GNUNET_LOCKMANAGER_cancel_request (struct GNUNET_LOCKMANAGER_LockingRequest size_t domain_name_length; LOG (GNUNET_ERROR_TYPE_DEBUG, "%s()\n", __func__); - /* FIXME: Stop ACQUIRE retransmissions */ + if (GNUNET_NO == request->acquire_sent) + { + GNUNET_assert (NULL != request->mqe); + if ((NULL != request->handle->transmit_handle) && + (request->handle->mq_head == request->mqe)) + { + GNUNET_CLIENT_notify_transmit_ready_cancel (request-> + handle->transmit_handle); + request->handle->transmit_handle = NULL; + } + GNUNET_CONTAINER_DLL_remove (request->handle->mq_head, + request->handle->mq_tail, request->mqe); + GNUNET_free (request->mqe->msg); + GNUNET_free (request->mqe); + request->status = GNUNET_LOCKMANAGER_RELEASE; + } if (GNUNET_LOCKMANAGER_SUCCESS == request->status) { domain_name_length = strlen (request->domain) + 1; - msg_size = sizeof (struct GNUNET_LOCKMANAGER_Message) - + domain_name_length; + msg_size = sizeof (struct GNUNET_LOCKMANAGER_Message) + domain_name_length; msg = GNUNET_malloc (msg_size); msg->header.type = htons (GNUNET_MESSAGE_TYPE_LOCKMANAGER_RELEASE); msg->header.size = htons (msg_size); msg->lock = htonl (request->lock); memcpy (&msg[1], request->domain, domain_name_length); - queue_message (request->handle, msg); + GNUNET_assert (NULL == request->mqe); + (void) queue_message (request->handle, msg, request); } get_key (request->domain, request->lock, &hash); GNUNET_assert (GNUNET_YES == - GNUNET_CONTAINER_multihashmap_remove - (request->handle->hashmap, &hash, request)); + GNUNET_CONTAINER_multihashmap_remove (request->handle->hashmap, + &hash, request)); GNUNET_free (request->domain); GNUNET_free (request); LOG (GNUNET_ERROR_TYPE_DEBUG, "%s() END\n", __func__); } + +/* end of lockmanager_api.c */ |