diff options
-rw-r--r-- | src/multicast/gnunet-service-multicast.c | 15 | ||||
-rw-r--r-- | src/multicast/multicast_api.c | 10 | ||||
-rw-r--r-- | src/psyc/gnunet-service-psyc.c | 54 | ||||
-rw-r--r-- | src/psyc/psyc_api.c | 2 | ||||
-rw-r--r-- | src/psyc/test_psyc.c | 95 | ||||
-rw-r--r-- | src/util/client_manager.c | 3 |
6 files changed, 118 insertions, 61 deletions
diff --git a/src/multicast/gnunet-service-multicast.c b/src/multicast/gnunet-service-multicast.c index e7ee92cdfa..d197a3ef0f 100644 --- a/src/multicast/gnunet-service-multicast.c +++ b/src/multicast/gnunet-service-multicast.c @@ -509,7 +509,7 @@ client_notify_disconnect (void *cls, struct GNUNET_SERVER_Client *client) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "%p User context is NULL in client_disconnect()\n", grp); - GNUNET_assert (0); + GNUNET_break (0); return; } @@ -969,7 +969,7 @@ client_recv_member_join (void *cls, struct GNUNET_SERVER_Client *client, mem->join_dcsn, GNUNET_NO); } - else if (grp->clients_head == grp->clients_tail) + else { /* First client of the group, send join request. */ struct GNUNET_PeerIdentity *relays = (struct GNUNET_PeerIdentity *) &msg[1]; uint32_t relay_count = ntohl (msg->relay_count); @@ -1042,16 +1042,7 @@ client_send_join_decision (struct Member *mem, memcpy (mem->join_dcsn, dcsn, dcsn_size); } else - { /* Refused entry, disconnect clients. */ -#if FIXME - struct ClientList *cl = mem->grp.clients_head; - while (NULL != cl) - { - struct GNUNET_SERVER_Client *client = cl->client; - cl = cl->next; - GNUNET_SERVER_client_disconnect (client); - } -#endif + { /* Refused entry, but replay would be still possible for past members. */ } } diff --git a/src/multicast/multicast_api.c b/src/multicast/multicast_api.c index c437b71c94..aa6dd3d988 100644 --- a/src/multicast/multicast_api.c +++ b/src/multicast/multicast_api.c @@ -1033,12 +1033,19 @@ GNUNET_MULTICAST_member_part (struct GNUNET_MULTICAST_Member *mem, GNUNET_ContinuationCallback part_cb, void *part_cls) { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Member parting.\n", mem); struct GNUNET_MULTICAST_Group *grp = &mem->grp; grp->is_disconnecting = GNUNET_YES; grp->disconnect_cb = part_cb; grp->disconnect_cls = part_cls; + mem->join_dcsn_cb = NULL; + grp->join_req_cb = NULL; + grp->message_cb = NULL; + grp->replay_msg_cb = NULL; + grp->replay_frag_cb = NULL; + GNUNET_CLIENT_MANAGER_disconnect (mem->grp.client, GNUNET_YES, member_cleanup, mem); } @@ -1157,7 +1164,8 @@ member_to_origin (struct GNUNET_MULTICAST_Member *mem) || GNUNET_MULTICAST_FRAGMENT_MAX_SIZE < buf_size) { LOG (GNUNET_ERROR_TYPE_ERROR, - "MemberTransmitNotify() returned error or invalid message size.\n"); + "MemberTransmitNotify() returned error or invalid message size. " + "ret=%d, buf_size=%u\n", ret, buf_size); /* FIXME: handle error */ GNUNET_free (req); return; diff --git a/src/psyc/gnunet-service-psyc.c b/src/psyc/gnunet-service-psyc.c index 3671a7d6f9..36e3a7764b 100644 --- a/src/psyc/gnunet-service-psyc.c +++ b/src/psyc/gnunet-service-psyc.c @@ -98,6 +98,16 @@ struct TransmitMessage uint16_t size; /** + * Type of first message part. + */ + uint16_t first_ptype; + + /** + * Type of last message part. + */ + uint16_t last_ptype; + + /** * @see enum MessageState */ uint8_t state; @@ -483,7 +493,7 @@ cleanup_master (struct Master *mst) if (NULL != mst->origin) GNUNET_MULTICAST_origin_stop (mst->origin, NULL, NULL); // FIXME GNUNET_CONTAINER_multihashmap_destroy (mst->join_reqs); - GNUNET_CONTAINER_multihashmap_remove (masters, &chn->pub_key_hash, chn); + GNUNET_CONTAINER_multihashmap_remove (masters, &chn->pub_key_hash, mst); } @@ -523,7 +533,7 @@ cleanup_slave (struct Slave *slv) GNUNET_MULTICAST_member_part (slv->member, NULL, NULL); // FIXME slv->member = NULL; } - GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, chn); + GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, slv); } @@ -582,8 +592,6 @@ client_disconnect (void *cls, struct GNUNET_SERVER_Client *client) chn, (GNUNET_YES == chn->is_master) ? "master" : "slave", GNUNET_h2s (&chn->pub_key_hash)); - chn->is_disconnected = GNUNET_YES; - struct Client *cli = chn->clients_head; while (NULL != cli) { @@ -609,6 +617,11 @@ client_disconnect (void *cls, struct GNUNET_SERVER_Client *client) if (NULL == chn->clients_head) { /* Last client disconnected. */ + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%p Last client (%s) disconnected from channel %s\n", + chn, (GNUNET_YES == chn->is_master) ? "master" : "slave", + GNUNET_h2s (&chn->pub_key_hash)); + chn->is_disconnected = GNUNET_YES; if (NULL != chn->tmit_head) { /* Send pending messages to multicast before cleanup. */ transmit_message (chn); @@ -789,6 +802,11 @@ mcast_recv_join_decision (void *cls, int is_admitted, struct Channel *chn = &slv->chn; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Got join decision: %d\n", slv, is_admitted); + if (GNUNET_YES == chn->is_ready) + { + /* Already admitted */ + return; + } uint16_t join_resp_size = (NULL != join_resp) ? ntohs (join_resp->size) : 0; struct GNUNET_PSYC_JoinDecisionMessage * @@ -805,10 +823,6 @@ mcast_recv_join_decision (void *cls, int is_admitted, { chn->is_ready = GNUNET_YES; } - else - { - slv->member = NULL; - } } @@ -2011,7 +2025,8 @@ transmit_notify (void *cls, size_t *data_size, void *data) { transmit_message (chn); } - else if (GNUNET_YES == chn->is_disconnected) + else if (GNUNET_YES == chn->is_disconnected + && tmit_msg->last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END) { /* FIXME: handle partial message (when still in_transmit) */ return GNUNET_SYSERR; @@ -2106,12 +2121,11 @@ transmit_message (struct Channel *chn) * Queue a message from a channel master for sending to the multicast group. */ static void -master_queue_message (struct Master *mst, struct TransmitMessage *tmit_msg, - uint16_t first_ptype, uint16_t last_ptype) +master_queue_message (struct Master *mst, struct TransmitMessage *tmit_msg) { GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "%p master_queue_message()\n", mst); - if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype) + if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == tmit_msg->first_ptype) { tmit_msg->id = ++mst->max_message_id; struct GNUNET_PSYC_MessageMethod *pmeth @@ -2149,10 +2163,9 @@ master_queue_message (struct Master *mst, struct TransmitMessage *tmit_msg, * Queue a message from a channel slave for sending to the multicast group. */ static void -slave_queue_message (struct Slave *slv, struct TransmitMessage *tmit_msg, - uint16_t first_ptype, uint16_t last_ptype) +slave_queue_message (struct Slave *slv, struct TransmitMessage *tmit_msg) { - if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype) + if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == tmit_msg->first_ptype) { struct GNUNET_PSYC_MessageMethod *pmeth = (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1]; @@ -2185,16 +2198,16 @@ queue_message (struct Channel *chn, tmit_msg->client = client; tmit_msg->size = data_size; tmit_msg->state = chn->tmit_state; + tmit_msg->first_ptype = first_ptype; + tmit_msg->last_ptype = last_ptype; /* FIXME: separate queue per message ID */ GNUNET_CONTAINER_DLL_insert_tail (chn->tmit_head, chn->tmit_tail, tmit_msg); chn->is_master - ? master_queue_message ((struct Master *) chn, tmit_msg, - first_ptype, last_ptype) - : slave_queue_message ((struct Slave *) chn, tmit_msg, - first_ptype, last_ptype); + ? master_queue_message ((struct Master *) chn, tmit_msg) + : slave_queue_message ((struct Slave *) chn, tmit_msg); return tmit_msg; } @@ -2295,7 +2308,8 @@ store_recv_membership_store_result (void *cls, int64_t result, "%p GNUNET_PSYCSTORE_membership_store() returned %" PRId64 " (%.s)\n", op->chn, result, err_msg_size, err_msg); - client_send_result (op->client, op->op_id, result, err_msg, err_msg_size); + if (NULL != op->client) + client_send_result (op->client, op->op_id, result, err_msg, err_msg_size); op_remove (op); } diff --git a/src/psyc/psyc_api.c b/src/psyc/psyc_api.c index 6128e4d828..9392deeef8 100644 --- a/src/psyc/psyc_api.c +++ b/src/psyc/psyc_api.c @@ -464,7 +464,7 @@ master_recv_join_request (void *cls, if (sizeof (*req) + sizeof (*join_msg) <= ntohs (req->header.size)) { join_msg = (struct GNUNET_PSYC_Message *) &req[1]; - LOG (GNUNET_ERROR_TYPE_ERROR, + LOG (GNUNET_ERROR_TYPE_DEBUG, "Received join_msg of type %u and size %u.\n", ntohs (join_msg->header.type), ntohs (join_msg->header.size)); } diff --git a/src/psyc/test_psyc.c b/src/psyc/test_psyc.c index 863eaab88f..e2e6cfc879 100644 --- a/src/psyc/test_psyc.c +++ b/src/psyc/test_psyc.c @@ -86,19 +86,22 @@ uint8_t join_req_count, end_count; enum { - TEST_NONE = 0, - TEST_MASTER_START = 1, - TEST_SLAVE_JOIN = 2, - TEST_SLAVE_TRANSMIT = 3, - TEST_MASTER_TRANSMIT = 4, - TEST_MASTER_HISTORY_REPLAY_LATEST = 5, - TEST_SLAVE_HISTORY_REPLAY_LATEST = 6, - TEST_MASTER_HISTORY_REPLAY = 7, - TEST_SLAVE_HISTORY_REPLAY = 8, - TEST_MASTER_STATE_GET = 9, - TEST_SLAVE_STATE_GET = 10, - TEST_MASTER_STATE_GET_PREFIX = 11, - TEST_SLAVE_STATE_GET_PREFIX = 12, + TEST_NONE = 0, + TEST_MASTER_START = 1, + TEST_SLAVE_JOIN_REJECT = 2, + TEST_SLAVE_JOIN_ACCEPT = 3, + TEST_SLAVE_ADD = 4, + TEST_SLAVE_REMOVE = 5, + TEST_SLAVE_TRANSMIT = 6, + TEST_MASTER_TRANSMIT = 7, + TEST_MASTER_HISTORY_REPLAY_LATEST = 8, + TEST_SLAVE_HISTORY_REPLAY_LATEST = 9, + TEST_MASTER_HISTORY_REPLAY = 10, + TEST_SLAVE_HISTORY_REPLAY = 11, + TEST_MASTER_STATE_GET = 12, + TEST_SLAVE_STATE_GET = 13, + TEST_MASTER_STATE_GET_PREFIX = 14, + TEST_SLAVE_STATE_GET_PREFIX = 15, } test; @@ -204,6 +207,7 @@ void master_message_cb (void *cls, uint64_t message_id, uint32_t flags, const struct GNUNET_PSYC_MessageHeader *msg) { + GNUNET_assert (NULL != msg); GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Test #%d: Master got PSYC message fragment of size %u " "belonging to message ID %" PRIu64 " with flags %x\n", @@ -718,17 +722,46 @@ slave_remove_cb (void *cls, int64_t result, void +slave_remove () +{ + test = TEST_SLAVE_REMOVE; + struct GNUNET_PSYC_Channel *chn = GNUNET_PSYC_master_get_channel (mst); + GNUNET_PSYC_channel_slave_remove (chn, &slave_pub_key, 2, + &slave_remove_cb, chn); +} + + +void slave_add_cb (void *cls, int64_t result, const void *err_msg, uint16_t err_msg_size) { GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "slave_add:\t%" PRId64 " (%.*s)\n", result, err_msg_size, err_msg); + slave_remove (); +} - struct GNUNET_PSYC_Channel *chn = cls; - GNUNET_PSYC_channel_slave_remove (chn, &slave_pub_key, 2, - &slave_remove_cb, chn); +void +slave_add () +{ + test = TEST_SLAVE_ADD; + struct GNUNET_PSYC_Channel *chn = GNUNET_PSYC_master_get_channel (mst); + GNUNET_PSYC_channel_slave_add (chn, &slave_pub_key, 2, 2, &slave_add_cb, chn); +} + + +void first_slave_parted (void *cls) +{ + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "First slave parted.\n"); + slave_join (TEST_SLAVE_JOIN_ACCEPT); +} + + +void +schedule_slave_part (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + GNUNET_PSYC_slave_part (slv, GNUNET_NO, first_slave_parted, NULL); } @@ -741,15 +774,23 @@ join_decision_cb (void *cls, GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Slave got join decision: %d\n", is_admitted); - if (GNUNET_YES != is_admitted) - { /* First join request is refused, retry. */ + switch (test) + { + case TEST_SLAVE_JOIN_REJECT: + GNUNET_assert (0 == is_admitted); GNUNET_assert (1 == join_req_count); - slave_join (); - return; - } + GNUNET_SCHEDULER_add_now (schedule_slave_part, NULL); + break; - struct GNUNET_PSYC_Channel *chn = GNUNET_PSYC_master_get_channel (mst); - GNUNET_PSYC_channel_slave_add (chn, &slave_pub_key, 2, 2, &slave_add_cb, chn); + case TEST_SLAVE_JOIN_ACCEPT: + GNUNET_assert (1 == is_admitted); + GNUNET_assert (2 == join_req_count); + slave_add (); + break; + + default: + GNUNET_break (0); + } } @@ -778,16 +819,16 @@ slave_connect_cb (void *cls, int result, uint64_t max_message_id) GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Slave connected: %d, max_message_id: %" PRIu64 "\n", result, max_message_id); - GNUNET_assert (TEST_SLAVE_JOIN == test); + GNUNET_assert (TEST_SLAVE_JOIN_REJECT == test || TEST_SLAVE_JOIN_ACCEPT == test); GNUNET_assert (GNUNET_OK == result || GNUNET_NO == result); } static void -slave_join () +slave_join (int t) { GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Joining slave.\n"); - test = TEST_SLAVE_JOIN; + test = t; struct GNUNET_PeerIdentity origin = this_peer; struct GNUNET_ENV_Environment *env = GNUNET_ENV_environment_create (); @@ -870,7 +911,7 @@ master_start_cb (void *cls, int result, uint64_t max_message_id) result, max_message_id); GNUNET_assert (TEST_MASTER_START == test); GNUNET_assert (GNUNET_OK == result || GNUNET_NO == result); - slave_join (); + slave_join (TEST_SLAVE_JOIN_REJECT); } diff --git a/src/util/client_manager.c b/src/util/client_manager.c index f38d0f8864..2fd52705e7 100644 --- a/src/util/client_manager.c +++ b/src/util/client_manager.c @@ -196,6 +196,9 @@ recv_message (void *cls, const struct GNUNET_MessageHeader *msg) mgr->client_tmit = NULL; } + if (GNUNET_YES == mgr->is_disconnecting) + return; + size_t i = 0; while (NULL != mgr->handlers[i].callback) { |