aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/multicast/gnunet-service-multicast.c15
-rw-r--r--src/multicast/multicast_api.c10
-rw-r--r--src/psyc/gnunet-service-psyc.c54
-rw-r--r--src/psyc/psyc_api.c2
-rw-r--r--src/psyc/test_psyc.c95
-rw-r--r--src/util/client_manager.c3
6 files changed, 118 insertions, 61 deletions
diff --git a/src/multicast/gnunet-service-multicast.c b/src/multicast/gnunet-service-multicast.c
index e7ee92cdfa..d197a3ef0f 100644
--- a/src/multicast/gnunet-service-multicast.c
+++ b/src/multicast/gnunet-service-multicast.c
@@ -509,7 +509,7 @@ client_notify_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"%p User context is NULL in client_disconnect()\n", grp);
- GNUNET_assert (0);
+ GNUNET_break (0);
return;
}
@@ -969,7 +969,7 @@ client_recv_member_join (void *cls, struct GNUNET_SERVER_Client *client,
mem->join_dcsn,
GNUNET_NO);
}
- else if (grp->clients_head == grp->clients_tail)
+ else
{ /* First client of the group, send join request. */
struct GNUNET_PeerIdentity *relays = (struct GNUNET_PeerIdentity *) &msg[1];
uint32_t relay_count = ntohl (msg->relay_count);
@@ -1042,16 +1042,7 @@ client_send_join_decision (struct Member *mem,
memcpy (mem->join_dcsn, dcsn, dcsn_size);
}
else
- { /* Refused entry, disconnect clients. */
-#if FIXME
- struct ClientList *cl = mem->grp.clients_head;
- while (NULL != cl)
- {
- struct GNUNET_SERVER_Client *client = cl->client;
- cl = cl->next;
- GNUNET_SERVER_client_disconnect (client);
- }
-#endif
+ { /* Refused entry, but replay would be still possible for past members. */
}
}
diff --git a/src/multicast/multicast_api.c b/src/multicast/multicast_api.c
index c437b71c94..aa6dd3d988 100644
--- a/src/multicast/multicast_api.c
+++ b/src/multicast/multicast_api.c
@@ -1033,12 +1033,19 @@ GNUNET_MULTICAST_member_part (struct GNUNET_MULTICAST_Member *mem,
GNUNET_ContinuationCallback part_cb,
void *part_cls)
{
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Member parting.\n", mem);
struct GNUNET_MULTICAST_Group *grp = &mem->grp;
grp->is_disconnecting = GNUNET_YES;
grp->disconnect_cb = part_cb;
grp->disconnect_cls = part_cls;
+ mem->join_dcsn_cb = NULL;
+ grp->join_req_cb = NULL;
+ grp->message_cb = NULL;
+ grp->replay_msg_cb = NULL;
+ grp->replay_frag_cb = NULL;
+
GNUNET_CLIENT_MANAGER_disconnect (mem->grp.client, GNUNET_YES,
member_cleanup, mem);
}
@@ -1157,7 +1164,8 @@ member_to_origin (struct GNUNET_MULTICAST_Member *mem)
|| GNUNET_MULTICAST_FRAGMENT_MAX_SIZE < buf_size)
{
LOG (GNUNET_ERROR_TYPE_ERROR,
- "MemberTransmitNotify() returned error or invalid message size.\n");
+ "MemberTransmitNotify() returned error or invalid message size. "
+ "ret=%d, buf_size=%u\n", ret, buf_size);
/* FIXME: handle error */
GNUNET_free (req);
return;
diff --git a/src/psyc/gnunet-service-psyc.c b/src/psyc/gnunet-service-psyc.c
index 3671a7d6f9..36e3a7764b 100644
--- a/src/psyc/gnunet-service-psyc.c
+++ b/src/psyc/gnunet-service-psyc.c
@@ -98,6 +98,16 @@ struct TransmitMessage
uint16_t size;
/**
+ * Type of first message part.
+ */
+ uint16_t first_ptype;
+
+ /**
+ * Type of last message part.
+ */
+ uint16_t last_ptype;
+
+ /**
* @see enum MessageState
*/
uint8_t state;
@@ -483,7 +493,7 @@ cleanup_master (struct Master *mst)
if (NULL != mst->origin)
GNUNET_MULTICAST_origin_stop (mst->origin, NULL, NULL); // FIXME
GNUNET_CONTAINER_multihashmap_destroy (mst->join_reqs);
- GNUNET_CONTAINER_multihashmap_remove (masters, &chn->pub_key_hash, chn);
+ GNUNET_CONTAINER_multihashmap_remove (masters, &chn->pub_key_hash, mst);
}
@@ -523,7 +533,7 @@ cleanup_slave (struct Slave *slv)
GNUNET_MULTICAST_member_part (slv->member, NULL, NULL); // FIXME
slv->member = NULL;
}
- GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, chn);
+ GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, slv);
}
@@ -582,8 +592,6 @@ client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
chn, (GNUNET_YES == chn->is_master) ? "master" : "slave",
GNUNET_h2s (&chn->pub_key_hash));
- chn->is_disconnected = GNUNET_YES;
-
struct Client *cli = chn->clients_head;
while (NULL != cli)
{
@@ -609,6 +617,11 @@ client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
if (NULL == chn->clients_head)
{ /* Last client disconnected. */
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%p Last client (%s) disconnected from channel %s\n",
+ chn, (GNUNET_YES == chn->is_master) ? "master" : "slave",
+ GNUNET_h2s (&chn->pub_key_hash));
+ chn->is_disconnected = GNUNET_YES;
if (NULL != chn->tmit_head)
{ /* Send pending messages to multicast before cleanup. */
transmit_message (chn);
@@ -789,6 +802,11 @@ mcast_recv_join_decision (void *cls, int is_admitted,
struct Channel *chn = &slv->chn;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"%p Got join decision: %d\n", slv, is_admitted);
+ if (GNUNET_YES == chn->is_ready)
+ {
+ /* Already admitted */
+ return;
+ }
uint16_t join_resp_size = (NULL != join_resp) ? ntohs (join_resp->size) : 0;
struct GNUNET_PSYC_JoinDecisionMessage *
@@ -805,10 +823,6 @@ mcast_recv_join_decision (void *cls, int is_admitted,
{
chn->is_ready = GNUNET_YES;
}
- else
- {
- slv->member = NULL;
- }
}
@@ -2011,7 +2025,8 @@ transmit_notify (void *cls, size_t *data_size, void *data)
{
transmit_message (chn);
}
- else if (GNUNET_YES == chn->is_disconnected)
+ else if (GNUNET_YES == chn->is_disconnected
+ && tmit_msg->last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END)
{
/* FIXME: handle partial message (when still in_transmit) */
return GNUNET_SYSERR;
@@ -2106,12 +2121,11 @@ transmit_message (struct Channel *chn)
* Queue a message from a channel master for sending to the multicast group.
*/
static void
-master_queue_message (struct Master *mst, struct TransmitMessage *tmit_msg,
- uint16_t first_ptype, uint16_t last_ptype)
+master_queue_message (struct Master *mst, struct TransmitMessage *tmit_msg)
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "%p master_queue_message()\n", mst);
- if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
+ if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == tmit_msg->first_ptype)
{
tmit_msg->id = ++mst->max_message_id;
struct GNUNET_PSYC_MessageMethod *pmeth
@@ -2149,10 +2163,9 @@ master_queue_message (struct Master *mst, struct TransmitMessage *tmit_msg,
* Queue a message from a channel slave for sending to the multicast group.
*/
static void
-slave_queue_message (struct Slave *slv, struct TransmitMessage *tmit_msg,
- uint16_t first_ptype, uint16_t last_ptype)
+slave_queue_message (struct Slave *slv, struct TransmitMessage *tmit_msg)
{
- if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
+ if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == tmit_msg->first_ptype)
{
struct GNUNET_PSYC_MessageMethod *pmeth
= (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1];
@@ -2185,16 +2198,16 @@ queue_message (struct Channel *chn,
tmit_msg->client = client;
tmit_msg->size = data_size;
tmit_msg->state = chn->tmit_state;
+ tmit_msg->first_ptype = first_ptype;
+ tmit_msg->last_ptype = last_ptype;
/* FIXME: separate queue per message ID */
GNUNET_CONTAINER_DLL_insert_tail (chn->tmit_head, chn->tmit_tail, tmit_msg);
chn->is_master
- ? master_queue_message ((struct Master *) chn, tmit_msg,
- first_ptype, last_ptype)
- : slave_queue_message ((struct Slave *) chn, tmit_msg,
- first_ptype, last_ptype);
+ ? master_queue_message ((struct Master *) chn, tmit_msg)
+ : slave_queue_message ((struct Slave *) chn, tmit_msg);
return tmit_msg;
}
@@ -2295,7 +2308,8 @@ store_recv_membership_store_result (void *cls, int64_t result,
"%p GNUNET_PSYCSTORE_membership_store() returned %" PRId64 " (%.s)\n",
op->chn, result, err_msg_size, err_msg);
- client_send_result (op->client, op->op_id, result, err_msg, err_msg_size);
+ if (NULL != op->client)
+ client_send_result (op->client, op->op_id, result, err_msg, err_msg_size);
op_remove (op);
}
diff --git a/src/psyc/psyc_api.c b/src/psyc/psyc_api.c
index 6128e4d828..9392deeef8 100644
--- a/src/psyc/psyc_api.c
+++ b/src/psyc/psyc_api.c
@@ -464,7 +464,7 @@ master_recv_join_request (void *cls,
if (sizeof (*req) + sizeof (*join_msg) <= ntohs (req->header.size))
{
join_msg = (struct GNUNET_PSYC_Message *) &req[1];
- LOG (GNUNET_ERROR_TYPE_ERROR,
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
"Received join_msg of type %u and size %u.\n",
ntohs (join_msg->header.type), ntohs (join_msg->header.size));
}
diff --git a/src/psyc/test_psyc.c b/src/psyc/test_psyc.c
index 863eaab88f..e2e6cfc879 100644
--- a/src/psyc/test_psyc.c
+++ b/src/psyc/test_psyc.c
@@ -86,19 +86,22 @@ uint8_t join_req_count, end_count;
enum
{
- TEST_NONE = 0,
- TEST_MASTER_START = 1,
- TEST_SLAVE_JOIN = 2,
- TEST_SLAVE_TRANSMIT = 3,
- TEST_MASTER_TRANSMIT = 4,
- TEST_MASTER_HISTORY_REPLAY_LATEST = 5,
- TEST_SLAVE_HISTORY_REPLAY_LATEST = 6,
- TEST_MASTER_HISTORY_REPLAY = 7,
- TEST_SLAVE_HISTORY_REPLAY = 8,
- TEST_MASTER_STATE_GET = 9,
- TEST_SLAVE_STATE_GET = 10,
- TEST_MASTER_STATE_GET_PREFIX = 11,
- TEST_SLAVE_STATE_GET_PREFIX = 12,
+ TEST_NONE = 0,
+ TEST_MASTER_START = 1,
+ TEST_SLAVE_JOIN_REJECT = 2,
+ TEST_SLAVE_JOIN_ACCEPT = 3,
+ TEST_SLAVE_ADD = 4,
+ TEST_SLAVE_REMOVE = 5,
+ TEST_SLAVE_TRANSMIT = 6,
+ TEST_MASTER_TRANSMIT = 7,
+ TEST_MASTER_HISTORY_REPLAY_LATEST = 8,
+ TEST_SLAVE_HISTORY_REPLAY_LATEST = 9,
+ TEST_MASTER_HISTORY_REPLAY = 10,
+ TEST_SLAVE_HISTORY_REPLAY = 11,
+ TEST_MASTER_STATE_GET = 12,
+ TEST_SLAVE_STATE_GET = 13,
+ TEST_MASTER_STATE_GET_PREFIX = 14,
+ TEST_SLAVE_STATE_GET_PREFIX = 15,
} test;
@@ -204,6 +207,7 @@ void
master_message_cb (void *cls, uint64_t message_id, uint32_t flags,
const struct GNUNET_PSYC_MessageHeader *msg)
{
+ GNUNET_assert (NULL != msg);
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
"Test #%d: Master got PSYC message fragment of size %u "
"belonging to message ID %" PRIu64 " with flags %x\n",
@@ -718,17 +722,46 @@ slave_remove_cb (void *cls, int64_t result,
void
+slave_remove ()
+{
+ test = TEST_SLAVE_REMOVE;
+ struct GNUNET_PSYC_Channel *chn = GNUNET_PSYC_master_get_channel (mst);
+ GNUNET_PSYC_channel_slave_remove (chn, &slave_pub_key, 2,
+ &slave_remove_cb, chn);
+}
+
+
+void
slave_add_cb (void *cls, int64_t result,
const void *err_msg, uint16_t err_msg_size)
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
"slave_add:\t%" PRId64 " (%.*s)\n",
result, err_msg_size, err_msg);
+ slave_remove ();
+}
- struct GNUNET_PSYC_Channel *chn = cls;
- GNUNET_PSYC_channel_slave_remove (chn, &slave_pub_key, 2,
- &slave_remove_cb, chn);
+void
+slave_add ()
+{
+ test = TEST_SLAVE_ADD;
+ struct GNUNET_PSYC_Channel *chn = GNUNET_PSYC_master_get_channel (mst);
+ GNUNET_PSYC_channel_slave_add (chn, &slave_pub_key, 2, 2, &slave_add_cb, chn);
+}
+
+
+void first_slave_parted (void *cls)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "First slave parted.\n");
+ slave_join (TEST_SLAVE_JOIN_ACCEPT);
+}
+
+
+void
+schedule_slave_part (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ GNUNET_PSYC_slave_part (slv, GNUNET_NO, first_slave_parted, NULL);
}
@@ -741,15 +774,23 @@ join_decision_cb (void *cls,
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
"Slave got join decision: %d\n", is_admitted);
- if (GNUNET_YES != is_admitted)
- { /* First join request is refused, retry. */
+ switch (test)
+ {
+ case TEST_SLAVE_JOIN_REJECT:
+ GNUNET_assert (0 == is_admitted);
GNUNET_assert (1 == join_req_count);
- slave_join ();
- return;
- }
+ GNUNET_SCHEDULER_add_now (schedule_slave_part, NULL);
+ break;
- struct GNUNET_PSYC_Channel *chn = GNUNET_PSYC_master_get_channel (mst);
- GNUNET_PSYC_channel_slave_add (chn, &slave_pub_key, 2, 2, &slave_add_cb, chn);
+ case TEST_SLAVE_JOIN_ACCEPT:
+ GNUNET_assert (1 == is_admitted);
+ GNUNET_assert (2 == join_req_count);
+ slave_add ();
+ break;
+
+ default:
+ GNUNET_break (0);
+ }
}
@@ -778,16 +819,16 @@ slave_connect_cb (void *cls, int result, uint64_t max_message_id)
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
"Slave connected: %d, max_message_id: %" PRIu64 "\n",
result, max_message_id);
- GNUNET_assert (TEST_SLAVE_JOIN == test);
+ GNUNET_assert (TEST_SLAVE_JOIN_REJECT == test || TEST_SLAVE_JOIN_ACCEPT == test);
GNUNET_assert (GNUNET_OK == result || GNUNET_NO == result);
}
static void
-slave_join ()
+slave_join (int t)
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Joining slave.\n");
- test = TEST_SLAVE_JOIN;
+ test = t;
struct GNUNET_PeerIdentity origin = this_peer;
struct GNUNET_ENV_Environment *env = GNUNET_ENV_environment_create ();
@@ -870,7 +911,7 @@ master_start_cb (void *cls, int result, uint64_t max_message_id)
result, max_message_id);
GNUNET_assert (TEST_MASTER_START == test);
GNUNET_assert (GNUNET_OK == result || GNUNET_NO == result);
- slave_join ();
+ slave_join (TEST_SLAVE_JOIN_REJECT);
}
diff --git a/src/util/client_manager.c b/src/util/client_manager.c
index f38d0f8864..2fd52705e7 100644
--- a/src/util/client_manager.c
+++ b/src/util/client_manager.c
@@ -196,6 +196,9 @@ recv_message (void *cls, const struct GNUNET_MessageHeader *msg)
mgr->client_tmit = NULL;
}
+ if (GNUNET_YES == mgr->is_disconnecting)
+ return;
+
size_t i = 0;
while (NULL != mgr->handlers[i].callback)
{