diff options
-rw-r--r-- | src/include/gnunet_protocols.h | 2 | ||||
-rw-r--r-- | src/include/gnunet_psyc_service.h | 43 | ||||
-rw-r--r-- | src/multicast/multicast_api.c | 10 | ||||
-rw-r--r-- | src/psyc/gnunet-service-psyc.c | 724 | ||||
-rw-r--r-- | src/psyc/psyc.h | 29 | ||||
-rw-r--r-- | src/psyc/psyc_api.c | 4 | ||||
-rw-r--r-- | src/psyc/psyc_common.c | 38 | ||||
-rw-r--r-- | src/psyc/test_psyc.c | 13 | ||||
-rw-r--r-- | src/psycstore/psycstore_api.c | 4 |
9 files changed, 592 insertions, 275 deletions
diff --git a/src/include/gnunet_protocols.h b/src/include/gnunet_protocols.h index 715f918094..93965fd9cd 100644 --- a/src/include/gnunet_protocols.h +++ b/src/include/gnunet_protocols.h @@ -2148,7 +2148,7 @@ extern "C" /** * C: client * S: service - * M: muticast + * M: multicast */ /** S->C: result of an operation */ diff --git a/src/include/gnunet_psyc_service.h b/src/include/gnunet_psyc_service.h index 018f012f49..928e05242f 100644 --- a/src/include/gnunet_psyc_service.h +++ b/src/include/gnunet_psyc_service.h @@ -167,7 +167,23 @@ enum GNUNET_PSYC_MessageFlags /** * Request from slave to master. */ - GNUNET_PSYC_MESSAGE_REQUEST = 1 << 1 + GNUNET_PSYC_MESSAGE_REQUEST = 1 << 1, + + /** + * Message can be delivered out of order. + */ + GNUNET_PSYC_MESSAGE_ORDER_ANY = 1 << 2 +}; + + +/** + * Values for the @a state_delta field of GNUNET_PSYC_MessageHeader. + */ +enum GNUNET_PSYC_StateDeltaValues +{ + GNUNET_PSYC_STATE_RESET = 0, + + GNUNET_PSYC_STATE_NOT_MODIFIED = UINT64_MAX }; @@ -175,6 +191,8 @@ GNUNET_NETWORK_STRUCT_BEGIN /** * Header of a PSYC message. + * + * Only present when receiving a message. */ struct GNUNET_PSYC_MessageHeader { @@ -223,6 +241,12 @@ struct GNUNET_PSYC_MessageMethod */ uint32_t flags GNUNET_PACKED; + /** + * Number of message IDs since the last message that contained state + * operations. @see enum GNUNET_PSYC_StateDeltaValues + */ + uint64_t state_delta GNUNET_PACKED; + /* Followed by NUL-terminated method name. */ }; @@ -479,22 +503,29 @@ typedef int enum GNUNET_PSYC_MasterTransmitFlags { GNUNET_PSYC_MASTER_TRANSMIT_NONE = 0, + /** * Whether this message should reset the channel state, * i.e. remove all previously stored state variables. */ - GNUNET_PSYC_MASTER_TRANSMIT_RESET_STATE = 1 << 0, + + GNUNET_PSYC_MASTER_TRANSMIT_STATE_RESET = 1 << 0, /** - * Whether we need to increment the group generation counter after - * transmitting this message. + * Whether this message contains any state modifiers. */ - GNUNET_PSYC_MASTER_TRANSMIT_INC_GROUP_GEN = 1 << 1, + GNUNET_PSYC_MASTER_TRANSMIT_STATE_MODIFY = 1 << 1, /** * Add PSYC header variable with the hash of the current channel state. */ - GNUNET_PSYC_MASTER_TRANSMIT_ADD_STATE_HASH = 1 << 2 + GNUNET_PSYC_MASTER_TRANSMIT_STATE_HASH = 1 << 2, + + /** + * Whether we need to increment the group generation counter after + * transmitting this message. + */ + GNUNET_PSYC_MASTER_TRANSMIT_INC_GROUP_GEN = 1 << 3 }; diff --git a/src/multicast/multicast_api.c b/src/multicast/multicast_api.c index b23320c725..da81de4862 100644 --- a/src/multicast/multicast_api.c +++ b/src/multicast/multicast_api.c @@ -182,7 +182,7 @@ message_callback (void *cls, const struct GNUNET_HashCode *chan_key_hash, { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Calling origin's message callback " - "for a message of type %u and size %u.\n", + "with a message of type %u and size %u.\n", ntohs (msg->type), ntohs (msg->size)); struct GNUNET_MULTICAST_Origin *orig = (struct GNUNET_MULTICAST_Origin *) grp; orig->message_cb (orig->cls, msg); @@ -190,8 +190,8 @@ message_callback (void *cls, const struct GNUNET_HashCode *chan_key_hash, else { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Calling slave's message callback " - "for a message of type %u and size %u.\n", + "Calling member's message callback " + "with a message of type %u and size %u.\n", ntohs (msg->type), ntohs (msg->size)); struct GNUNET_MULTICAST_Member *mem = (struct GNUNET_MULTICAST_Member *) grp; mem->message_cb (mem->cls, msg); @@ -477,8 +477,8 @@ schedule_origin_to_all (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc msg->group_generation = mh->group_generation; /* FIXME: add fragment ID and signature in the service instead of here */ - msg->fragment_id = GNUNET_ntohll (orig->next_fragment_id++); - msg->fragment_offset = GNUNET_ntohll (mh->fragment_offset); + msg->fragment_id = GNUNET_htonll (orig->next_fragment_id++); + msg->fragment_offset = GNUNET_htonll (mh->fragment_offset); mh->fragment_offset += sizeof (*msg) + buf_size; msg->purpose.size = htonl (sizeof (*msg) + buf_size - sizeof (msg->header) diff --git a/src/psyc/gnunet-service-psyc.c b/src/psyc/gnunet-service-psyc.c index fb3aa65f7c..3a29c8ffdc 100644 --- a/src/psyc/gnunet-service-psyc.c +++ b/src/psyc/gnunet-service-psyc.c @@ -73,19 +73,21 @@ struct TransmitMessage struct TransmitMessage *next; /** - * Buffer with message to be transmitted. + * ID assigned to the message. */ - char *buf; + uint64_t id; /** * Size of @a buf */ - uint16_t size -; + uint16_t size; + /** * @see enum MessageState */ uint8_t state; + + /* Followed by message */ }; @@ -100,9 +102,9 @@ static struct GNUNET_CONTAINER_MultiHashMap *recv_cache; /** * Entry in the chan_msgs hashmap of @a recv_cache: - * fragment_id -> FragmentEntry + * fragment_id -> RecvCacheEntry */ -struct FragmentEntry +struct RecvCacheEntry { struct GNUNET_MULTICAST_MessageHeader *mmsg; uint16_t ref_count; @@ -110,20 +112,48 @@ struct FragmentEntry /** - * Entry in the @a recv_msgs hash map of a @a Channel. - * message_id -> FragmentCache + * Entry in the @a recv_frags hash map of a @a Channel. + * message_id -> FragmentQueue */ -struct FragmentCache +struct FragmentQueue { /** - * Total size of header fragments (METHOD & MODIFIERs) + * Fragment IDs stored in @a recv_cache. + */ + struct GNUNET_CONTAINER_Heap *fragments; + + /** + * Total size of received fragments. + */ + uint64_t size; + + /** + * Total size of received header fragments (METHOD & MODIFIERs) */ uint64_t header_size; /** - * Fragment IDs stored in @a recv_cache. + * The @a state_delta field from struct GNUNET_PSYC_MessageMethod. */ - struct GNUNET_CONTAINER_Heap *fragments; + uint64_t state_delta; + + /** + * The @a flags field from struct GNUNET_PSYC_MessageMethod. + */ + uint32_t flags; + + /** + * Receive state of message. + * + * @see MessageFragmentState + */ + uint8_t state; + + /** + * Is the message queued for delivery to the client? + * i.e. added to the recv_msgs queue + */ + uint8_t queued; }; @@ -139,12 +169,17 @@ struct Channel /** * Received fragments not yet sent to the client. - * message_id -> FragmentCache + * message_id -> FragmentQueue + */ + struct GNUNET_CONTAINER_MultiHashMap *recv_frags; + + /** + * Received message IDs not yet sent to the client. */ - struct GNUNET_CONTAINER_MultiHashMap *recv_msgs; + struct GNUNET_CONTAINER_Heap *recv_msgs; /** - * FIXME + * FIXME: needed? */ GNUNET_SCHEDULER_TaskIdentifier tmit_task; @@ -159,6 +194,19 @@ struct Channel struct GNUNET_HashCode pub_key_hash; /** + * Last message ID sent to the client. + * 0 if there is no such message. + */ + uint64_t max_message_id; + + /** + * ID of the last stateful message, where the state operations has been + * processed and saved to PSYCstore and which has been sent to the client. + * 0 if there is no such message. + */ + uint64_t max_state_message_id; + + /** * Expected value size for the modifier being received from the PSYC service. */ uint32_t tmit_mod_value_size_expected; @@ -174,7 +222,7 @@ struct Channel uint8_t tmit_state; /** - * FIXME + * FIXME: needed? */ uint8_t in_transmit; @@ -221,7 +269,7 @@ struct Master struct GNUNET_MULTICAST_OriginMessageHandle *tmit_handle; /** - * Maximum message ID for this channel. + * Last message ID transmitted to this channel. * * Incremented before sending a message, thus the message_id in messages sent * starts from 1. @@ -229,13 +277,13 @@ struct Master uint64_t max_message_id; /** - * ID of the last message that contains any state operations. + * ID of the last message with state operations transmitted to the channel. * 0 if there is no such message. */ uint64_t max_state_message_id; /** - * Maximum group generation for this channel. + * Maximum group generation transmitted to the channel. */ uint64_t max_group_generation; @@ -292,11 +340,6 @@ struct Slave struct GNUNET_MessageHeader *join_req; /** - * Maximum message ID for this channel. - */ - uint64_t max_message_id; - - /** * Maximum request ID for this channel. */ uint64_t max_request_id; @@ -304,7 +347,7 @@ struct Slave static inline void -transmit_message (struct Channel *ch, uint8_t inc_msg_id); +transmit_message (struct Channel *ch); /** @@ -386,7 +429,7 @@ client_disconnect (void *cls, struct GNUNET_SERVER_Client *client) /* Send pending messages to multicast before cleanup. */ if (NULL != ch->tmit_head) { - transmit_message (ch, GNUNET_NO); + transmit_message (ch); } else { @@ -484,6 +527,7 @@ static inline void hash_key_from_nll (struct GNUNET_HashCode *key, uint64_t n) { /* use little-endian order, as idx_of MultiHashMap casts key to unsigned int */ + /* TODO: use built-in byte swap functions if available */ n = ((n << 8) & 0xFF00FF00FF00FF00ULL) | ((n >> 8) & 0x00FF00FF00FF00FFULL); n = ((n << 16) & 0xFFFF0000FFFF0000ULL) | ((n >> 16) & 0x0000FFFF0000FFFFULL); @@ -512,29 +556,40 @@ hash_key_from_hll (struct GNUNET_HashCode *key, uint64_t n) } +/** + * Insert a multicast message fragment into the queue belonging to the message. + * + * @param ch Channel. + * @param mmsg Multicast message fragment. + * @param msg_id_hash Message ID of @a mmsg in a struct GNUNET_HashCode. + * @param first_ptype First PSYC message part type in @a mmsg. + * @param last_ptype Last PSYC message part type in @a mmsg. + */ static void -fragment_cache_insert (struct Channel *ch, - const struct GNUNET_HashCode *msg_id, - struct FragmentCache *frag_cache, +fragment_queue_insert (struct Channel *ch, const struct GNUNET_MULTICAST_MessageHeader *mmsg, - uint16_t last_part_type) + uint16_t first_ptype, uint16_t last_ptype) { - uint16_t size = ntohs (mmsg->header.size); + const uint16_t size = ntohs (mmsg->header.size); + const uint64_t frag_offset = GNUNET_ntohll (mmsg->fragment_offset); struct GNUNET_CONTAINER_MultiHashMap *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache, &ch->pub_key_hash); - if (NULL == frag_cache) + struct GNUNET_HashCode msg_id_hash; + hash_key_from_nll (&msg_id_hash, mmsg->message_id); + + struct FragmentQueue + *fragq = GNUNET_CONTAINER_multihashmap_get (ch->recv_frags, &msg_id_hash); + + if (NULL == fragq) { - frag_cache = GNUNET_new (struct FragmentCache); - frag_cache->fragments + fragq = GNUNET_new (struct FragmentQueue); + fragq->state = MSG_FRAG_STATE_HEADER; + fragq->fragments = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); - if (NULL == ch->recv_msgs) - { - ch->recv_msgs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO); - } - GNUNET_CONTAINER_multihashmap_put (ch->recv_msgs, msg_id, frag_cache, + GNUNET_CONTAINER_multihashmap_put (ch->recv_frags, &msg_id_hash, fragq, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST); if (NULL == chan_msgs) @@ -545,190 +600,335 @@ fragment_cache_insert (struct Channel *ch, } } - struct GNUNET_HashCode *frag_id = GNUNET_new (struct GNUNET_HashCode); - hash_key_from_nll (frag_id, mmsg->fragment_id); - struct FragmentEntry - *frag_entry = GNUNET_CONTAINER_multihashmap_get (chan_msgs, frag_id); - if (NULL == frag_entry) + struct GNUNET_HashCode frag_id_hash; + hash_key_from_nll (&frag_id_hash, mmsg->fragment_id); + struct RecvCacheEntry + *cache_entry = GNUNET_CONTAINER_multihashmap_get (chan_msgs, &frag_id_hash); + if (NULL == cache_entry) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Adding message fragment to cache. " - "fragment_id: %" PRIu64 ", " - "header_size: %" PRIu64 " + %" PRIu64 ").\n", - ch, GNUNET_ntohll (mmsg->fragment_id), - frag_cache->header_size, size); - frag_entry = GNUNET_new (struct FragmentEntry); - frag_entry->ref_count = 1; - frag_entry->mmsg = GNUNET_malloc (size); - memcpy (frag_entry->mmsg, mmsg, size); - GNUNET_CONTAINER_multihashmap_put (chan_msgs, frag_id, frag_entry, + "message_id: %" PRIu64 ", fragment_id: %" PRIu64 ", " + "header_size: %" PRIu64 " + %u).\n", + ch, GNUNET_ntohll (mmsg->message_id), + GNUNET_ntohll (mmsg->fragment_id), + fragq->header_size, size); + cache_entry = GNUNET_new (struct RecvCacheEntry); + cache_entry->ref_count = 1; + cache_entry->mmsg = GNUNET_malloc (size); + memcpy (cache_entry->mmsg, mmsg, size); + GNUNET_CONTAINER_multihashmap_put (chan_msgs, &frag_id_hash, cache_entry, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST); } else { - frag_entry->ref_count++; + cache_entry->ref_count++; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%p Message fragment already in cache. " - "fragment_id: %" PRIu64 ", ref_count: %u\n", - ch, GNUNET_ntohll (mmsg->fragment_id), frag_entry->ref_count); + "%p Message fragment is already in cache. " + "message_id: %" PRIu64 ", fragment_id: %" PRIu64 + ", ref_count: %u\n", + ch, GNUNET_ntohll (mmsg->message_id), + GNUNET_ntohll (mmsg->fragment_id), cache_entry->ref_count); + } + + if (MSG_FRAG_STATE_HEADER == fragq->state) + { + if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype) + { + struct GNUNET_PSYC_MessageMethod * + pmeth = (struct GNUNET_PSYC_MessageMethod *) &mmsg[1]; + fragq->state_delta = GNUNET_ntohll (pmeth->state_delta); + fragq->flags = ntohl (pmeth->flags); + } + + if (last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA) + { + fragq->header_size += size; + } + else if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype + || frag_offset == fragq->header_size) + { /* header is now complete */ + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "%p Header of message %" PRIu64 " is complete.\n", + ch, GNUNET_ntohll (mmsg->message_id)); + + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "%p Adding message %" PRIu64 " to queue.\n", + ch, GNUNET_ntohll (mmsg->message_id)); + fragq->state = MSG_FRAG_STATE_DATA; + } + else + { + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "%p Header of message %" PRIu64 " is NOT complete yet: " + "%" PRIu64 " != %" PRIu64 "\n", + ch, GNUNET_ntohll (mmsg->message_id), frag_offset, + fragq->header_size); + } } - switch (last_part_type) + switch (last_ptype) { - case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD: - case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER: - case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT: - frag_cache->header_size += size; + case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END: + if (frag_offset == fragq->size) + fragq->state = MSG_FRAG_STATE_END; + else + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "%p Message %" PRIu64 " is NOT complete yet: " + "%" PRIu64 " != %" PRIu64 "\n", + ch, GNUNET_ntohll (mmsg->message_id), frag_offset, + fragq->size); + break; + + case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL: + /* Drop message without delivering to client if it's a single fragment */ + fragq->state = + (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype) + ? MSG_FRAG_STATE_DROP + : MSG_FRAG_STATE_CANCEL; + } + + switch (fragq->state) + { + case MSG_FRAG_STATE_DATA: + case MSG_FRAG_STATE_END: + case MSG_FRAG_STATE_CANCEL: + if (GNUNET_NO == fragq->queued) + { + GNUNET_CONTAINER_heap_insert (ch->recv_msgs, NULL, + GNUNET_ntohll (mmsg->message_id)); + fragq->queued = GNUNET_YES; + } } - GNUNET_CONTAINER_heap_insert (frag_cache->fragments, frag_id, + + fragq->size += size; + GNUNET_CONTAINER_heap_insert (fragq->fragments, NULL, GNUNET_ntohll (mmsg->fragment_id)); } +/** + * Run fragment queue of a message. + * + * Send fragments of a message in order to client, after all modifiers arrived + * from multicast. + * + * @param ch Channel. + * @param msg_id ID of the message @a fragq belongs to. + * @param fragq Fragment queue of the message. + * @param drop Drop message without delivering to client? + * #GNUNET_YES or #GNUNET_NO. + */ static void -fragment_cache_clear (struct Channel *ch, - const struct GNUNET_HashCode *msg_id, - struct FragmentCache *frag_cache, - uint8_t send_to_client) +fragment_queue_run (struct Channel *ch, uint64_t msg_id, + struct FragmentQueue *fragq, uint8_t drop) { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%p Clearing message fragment cache.\n", ch); + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "%p Running message fragment queue for message %" PRIu64 + " (state: %u).\n", + ch, msg_id, fragq->state); struct GNUNET_CONTAINER_MultiHashMap *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache, &ch->pub_key_hash); GNUNET_assert (NULL != chan_msgs); - struct GNUNET_HashCode *frag_id; + uint64_t frag_id; - while ((frag_id = GNUNET_CONTAINER_heap_remove_root (frag_cache->fragments))) + while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (fragq->fragments, NULL, + &frag_id)) { - struct FragmentEntry - *frag_entry = GNUNET_CONTAINER_multihashmap_get (chan_msgs, frag_id); - if (frag_entry != NULL) + struct GNUNET_HashCode frag_id_hash; + hash_key_from_hll (&frag_id_hash, frag_id); + struct RecvCacheEntry *cache_entry + = GNUNET_CONTAINER_multihashmap_get (chan_msgs, &frag_id_hash); + if (cache_entry != NULL) { - if (GNUNET_YES == send_to_client) + if (GNUNET_NO == drop) { - message_to_client (ch, frag_entry->mmsg); + message_to_client (ch, cache_entry->mmsg); } - if (1 == frag_entry->ref_count) + if (cache_entry->ref_count <= 1) { - GNUNET_CONTAINER_multihashmap_remove (chan_msgs, frag_id, frag_entry); - GNUNET_free (frag_entry->mmsg); - GNUNET_free (frag_entry); + GNUNET_CONTAINER_multihashmap_remove (chan_msgs, &frag_id_hash, + cache_entry); + GNUNET_free (cache_entry->mmsg); + GNUNET_free (cache_entry); } else { - frag_entry->ref_count--; + cache_entry->ref_count--; } } - GNUNET_free (frag_id); +#if CACHE_AGING_IMPLEMENTED + else if (GNUNET_NO == drop) + { + /* TODO: fragment not in cache anymore, retrieve it from PSYCstore */ + } +#endif + + GNUNET_CONTAINER_heap_remove_root (fragq->fragments); } - GNUNET_CONTAINER_multihashmap_remove (ch->recv_msgs, msg_id, frag_cache); - GNUNET_CONTAINER_heap_destroy (frag_cache->fragments); - GNUNET_free (frag_cache); + if (MSG_FRAG_STATE_END <= fragq->state) + { + struct GNUNET_HashCode msg_id_hash; + hash_key_from_nll (&msg_id_hash, msg_id); + + GNUNET_CONTAINER_multihashmap_remove (ch->recv_frags, &msg_id_hash, fragq); + GNUNET_CONTAINER_heap_destroy (fragq->fragments); + GNUNET_free (fragq); + } + else + { + fragq->queued = GNUNET_NO; + } } /** - * Incoming message fragment from multicast. + * Run message queue. * - * Store it using PSYCstore and send it to the client of the channel. + * Send messages in queue to client in order after a message has arrived from + * multicast, according to the following: + * - A message is only sent if all of its modifiers arrived. + * - A stateful message is only sent if the previous stateful message + * has already been delivered to the client. + * + * @param ch Channel. + * @return Number of messages removed from queue and sent to client. */ -static void -message_cb (void *cls, const struct GNUNET_MessageHeader *msg) +static uint64_t +message_queue_run (struct Channel *ch) { - struct Channel *ch = cls; - uint16_t type = ntohs (msg->type); - uint16_t size = ntohs (msg->size); - - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%p Received message of type %u and size %u from multicast.\n", - ch, type, size); - - switch (type) - { - case GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE: + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "%p Running message queue.\n", ch); + uint64_t n = 0; + uint64_t msg_id; + while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (ch->recv_msgs, NULL, + &msg_id)) { - GNUNET_PSYCSTORE_fragment_store (store, &ch->pub_key, - (const struct - GNUNET_MULTICAST_MessageHeader *) msg, - 0, NULL, NULL); - -#if TODO - /* FIXME: apply modifiers to state in PSYCstore */ - GNUNET_PSYCSTORE_state_modify (store, &ch->pub_key, - GNUNET_ntohll (mmsg->message_id), - meth->mod_count, mods, - rcb, rcb_cls); -#endif - - const struct GNUNET_MULTICAST_MessageHeader - *mmsg = (const struct GNUNET_MULTICAST_MessageHeader *) msg; + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "%p Processing message %" PRIu64 " in queue.\n", ch, msg_id); + struct GNUNET_HashCode msg_id_hash; + hash_key_from_hll (&msg_id_hash, msg_id); - uint16_t ptype = GNUNET_PSYC_message_last_part (size - sizeof (*mmsg), - (const char *) &mmsg[1]); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Last message part type %u\n", ptype); + struct FragmentQueue * + fragq = GNUNET_CONTAINER_multihashmap_get (ch->recv_frags, &msg_id_hash); - if (GNUNET_NO == ptype) + if (NULL == fragq || fragq->state <= MSG_FRAG_STATE_HEADER) { GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "%p Received message with invalid parts from multicast. " - "Dropping message.\n", ch); - GNUNET_break_op (0); + "%p No fragq (%p) or header not complete.\n", + ch, fragq); break; } - struct GNUNET_HashCode msg_id; - hash_key_from_nll (&msg_id, mmsg->message_id); - - struct FragmentCache *frag_cache = NULL; - if (NULL != ch->recv_msgs) - frag_cache = GNUNET_CONTAINER_multihashmap_get (ch->recv_msgs, &msg_id); - - switch (ptype) + if (MSG_FRAG_STATE_HEADER == fragq->state) { - case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA: - case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END: - /* FIXME: check state flag / max_state_message_id */ - if (NULL == frag_cache) + /* Check if there's a missing message before the current one */ + if (GNUNET_PSYC_STATE_NOT_MODIFIED == fragq->state_delta) { - message_to_client (ch, mmsg); - break; + if (!(fragq->flags & GNUNET_PSYC_MESSAGE_ORDER_ANY) + && msg_id - 1 != ch->max_message_id) + { + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "%p Out of order message. " + "(%" PRIu64 " - 1 != %" PRIu64 ")\n", + ch, msg_id, ch->max_message_id); + break; + } } else { - if (GNUNET_ntohll (mmsg->fragment_offset) == frag_cache->header_size) - { /* first data fragment after the header, send cached fragments */ - fragment_cache_clear (ch, &msg_id, frag_cache, GNUNET_YES); - message_to_client (ch, mmsg); + if (msg_id - fragq->state_delta != ch->max_state_message_id) + { + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "%p Out of order stateful message. " + "(%" PRIu64 " - %" PRIu64 " != %" PRIu64 ")\n", + ch, msg_id, fragq->state_delta, ch->max_state_message_id); break; } - else - { /* still missing fragments from the header, cache data fragment */ - /* fall thru */ - } +#if TODO + /* FIXME: apply modifiers to state in PSYCstore */ + GNUNET_PSYCSTORE_state_modify (store, &ch->pub_key, message_id, + state_modify_result_cb, cls); +#endif + ch->max_state_message_id = msg_id; } + ch->max_message_id = msg_id; + } + fragment_queue_run (ch, msg_id, fragq, MSG_FRAG_STATE_DROP == fragq->state); + GNUNET_CONTAINER_heap_remove_root (ch->recv_msgs); + n++; + } + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%p Removed %" PRIu64 " messages from queue.\n", ch, n); + return n; +} - case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD: - case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER: - case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT: - /* not all modifiers arrived yet, cache fragment */ - fragment_cache_insert (ch, &msg_id, frag_cache, mmsg, ptype); - break; - case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL: - if (NULL != frag_cache) - { /* fragments not yet sent to client, remove from cache */ - fragment_cache_clear (ch, &msg_id, frag_cache, GNUNET_NO); - } - else - { - message_to_client (ch, mmsg); - } - break; - } +/** + * Handle incoming message from multicast. + * + * @param ch Channel. + * @param mmsg Multicast message. + * + * @return #GNUNET_OK or #GNUNET_SYSERR + */ +static int +handle_multicast_message (struct Channel *ch, + const struct GNUNET_MULTICAST_MessageHeader *mmsg) +{ + GNUNET_PSYCSTORE_fragment_store (store, &ch->pub_key, mmsg, 0, NULL, NULL); + + uint16_t size = ntohs (mmsg->header.size); + uint16_t first_ptype = 0, last_ptype = 0; + + if (GNUNET_SYSERR + == GNUNET_PSYC_check_message_parts (size - sizeof (*mmsg), + (const char *) &mmsg[1], + &first_ptype, &last_ptype)) + { + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "%p Received message with invalid parts from multicast. " + "Dropping message.\n", ch); + GNUNET_break_op (0); + return GNUNET_SYSERR; + } + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Message parts: first: type %u, last: type %u\n", + first_ptype, last_ptype); + + fragment_queue_insert (ch, mmsg, first_ptype, last_ptype); + message_queue_run (ch); + + return GNUNET_OK; +} + + +/** + * Incoming message fragment from multicast. + * + * Store it using PSYCstore and send it to the client of the channel. + */ +static void +message_cb (void *cls, const struct GNUNET_MessageHeader *msg) +{ + struct Channel *ch = cls; + uint16_t type = ntohs (msg->type); + uint16_t size = ntohs (msg->size); + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%p Received message of type %u and size %u from multicast.\n", + ch, type, size); + + switch (type) + { + case GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE: + { + handle_multicast_message (ch, (const struct + GNUNET_MULTICAST_MessageHeader *) msg); break; } default: @@ -770,8 +970,9 @@ request_cb (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key, = (const struct GNUNET_MULTICAST_RequestHeader *) msg; /* FIXME: see message_cb() */ - if (GNUNET_NO == GNUNET_PSYC_message_last_part (size - sizeof (*req), - (const char *) &req[1])) + if (GNUNET_SYSERR == GNUNET_PSYC_check_message_parts (size - sizeof (*req), + (const char *) &req[1], + NULL, NULL)) { GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "%p Dropping message with invalid parts " @@ -825,7 +1026,8 @@ master_counters_cb (void *cls, int result, uint64_t max_fragment_id, if (GNUNET_OK == result || GNUNET_NO == result) { mst->max_message_id = max_message_id; - mst->max_state_message_id = max_state_message_id; + ch->max_message_id = max_message_id; + ch->max_state_message_id = max_state_message_id; mst->max_group_generation = max_group_generation; mst->origin = GNUNET_MULTICAST_origin_start (cfg, &mst->priv_key, @@ -860,7 +1062,8 @@ slave_counters_cb (void *cls, int result, uint64_t max_fragment_id, if (GNUNET_OK == result || GNUNET_NO == result) { - slv->max_message_id = max_message_id; + ch->max_message_id = max_message_id; + ch->max_state_message_id = max_state_message_id; slv->member = GNUNET_MULTICAST_member_join (cfg, &ch->pub_key, &slv->slave_key, &slv->origin, @@ -879,6 +1082,15 @@ slave_counters_cb (void *cls, int result, uint64_t max_fragment_id, } +static void +channel_init (struct Channel *ch) +{ + ch->recv_msgs + = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); + ch->recv_frags = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO); +} + + /** * Handle a connecting client starting a channel master. */ @@ -888,14 +1100,18 @@ handle_master_start (void *cls, struct GNUNET_SERVER_Client *client, { const struct MasterStartRequest *req = (const struct MasterStartRequest *) msg; + struct Master *mst = GNUNET_new (struct Master); + mst->policy = ntohl (req->policy); + mst->priv_key = req->channel_key; + struct Channel *ch = &mst->channel; ch->client = client; ch->is_master = GNUNET_YES; - mst->policy = ntohl (req->policy); - mst->priv_key = req->channel_key; GNUNET_CRYPTO_eddsa_key_get_public (&mst->priv_key, &ch->pub_key); GNUNET_CRYPTO_hash (&ch->pub_key, sizeof (ch->pub_key), &ch->pub_key_hash); + channel_init (ch); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Master connected to channel %s.\n", mst, GNUNET_h2s (&ch->pub_key_hash)); @@ -919,13 +1135,7 @@ handle_slave_join (void *cls, struct GNUNET_SERVER_Client *client, const struct SlaveJoinRequest *req = (const struct SlaveJoinRequest *) msg; struct Slave *slv = GNUNET_new (struct Slave); - struct Channel *ch = &slv->channel; - slv->channel.client = client; - slv->channel.is_master = GNUNET_NO; slv->slave_key = req->slave_key; - ch->pub_key = req->channel_key; - GNUNET_CRYPTO_hash (&ch->pub_key, sizeof (ch->pub_key), - &ch->pub_key_hash); slv->origin = req->origin; slv->relay_count = ntohl (req->relay_count); if (0 < slv->relay_count) @@ -939,6 +1149,14 @@ handle_slave_join (void *cls, struct GNUNET_SERVER_Client *client, memcpy (&slv->relays[i], &relays[i], sizeof (*relays)); } + struct Channel *ch = &slv->channel; + ch->client = client; + ch->is_master = GNUNET_NO; + ch->pub_key = req->channel_key; + GNUNET_CRYPTO_hash (&ch->pub_key, sizeof (ch->pub_key), + &ch->pub_key_hash); + channel_init (ch); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Slave connected to channel %s.\n", slv, GNUNET_h2s (&ch->pub_key_hash)); @@ -991,7 +1209,7 @@ transmit_notify (void *cls, size_t *data_size, void *data) "%p transmit_notify: sending %u bytes.\n", ch, tmit_msg->size); *data_size = tmit_msg->size; - memcpy (data, tmit_msg->buf, *data_size); + memcpy (data, &tmit_msg[1], *data_size); GNUNET_CONTAINER_DLL_remove (ch->tmit_head, ch->tmit_tail, tmit_msg); GNUNET_free (tmit_msg); @@ -1003,7 +1221,7 @@ transmit_notify (void *cls, size_t *data_size, void *data) { if (NULL != ch->tmit_head) { - transmit_message (ch, GNUNET_NO); + transmit_message (ch); } else if (ch->disconnected) { @@ -1054,14 +1272,12 @@ slave_transmit_notify (void *cls, size_t *data_size, void *data) * Transmit a message from a channel master to the multicast group. */ static void -master_transmit_message (struct Master *mst, uint8_t inc_msg_id) +master_transmit_message (struct Master *mst) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p master_transmit_message()\n", mst); mst->channel.tmit_task = 0; if (NULL == mst->tmit_handle) { - if (GNUNET_YES == inc_msg_id) - mst->max_message_id++; mst->tmit_handle = GNUNET_MULTICAST_origin_to_all (mst->origin, mst->max_message_id, mst->max_group_generation, @@ -1078,13 +1294,11 @@ master_transmit_message (struct Master *mst, uint8_t inc_msg_id) * Transmit a message from a channel slave to the multicast group. */ static void -slave_transmit_message (struct Slave *slv, uint8_t inc_msg_id) +slave_transmit_message (struct Slave *slv) { slv->channel.tmit_task = 0; if (NULL == slv->tmit_handle) { - if (GNUNET_YES == inc_msg_id) - slv->max_message_id++; slv->tmit_handle = GNUNET_MULTICAST_member_to_origin (slv->member, slv->max_request_id, slave_transmit_notify, slv); @@ -1097,29 +1311,94 @@ slave_transmit_message (struct Slave *slv, uint8_t inc_msg_id) static inline void -transmit_message (struct Channel *ch, uint8_t inc_msg_id) +transmit_message (struct Channel *ch) { ch->is_master - ? master_transmit_message ((struct Master *) ch, inc_msg_id) - : slave_transmit_message ((struct Slave *) ch, inc_msg_id); + ? master_transmit_message ((struct Master *) ch) + : slave_transmit_message ((struct Slave *) ch); } +/** + * Queue a message from a channel master for sending to the multicast group. + */ static void -transmit_error (struct Channel *ch) +master_queue_message (struct Master *mst, struct TransmitMessage *tmit_msg, + uint16_t first_ptype, uint16_t last_ptype) +{ + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p master_queue_message()\n", mst); + + if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype) + { + tmit_msg->id = ++mst->max_message_id; + struct GNUNET_PSYC_MessageMethod *pmeth + = (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1]; + + if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_RESET) + { + pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_RESET); + } + else if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_MODIFY) + { + pmeth->state_delta = GNUNET_htonll (tmit_msg->id + - mst->max_state_message_id); + } + else + { + pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED); + } + } +} + + +/** + * Queue a message from a channel slave for sending to the multicast group. + */ +static void +slave_queue_message (struct Slave *slv, struct TransmitMessage *tmit_msg, + uint16_t first_ptype, uint16_t last_ptype) +{ + if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype) + { + struct GNUNET_PSYC_MessageMethod *pmeth + = (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1]; + pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED); + tmit_msg->id = ++slv->max_request_id; + } +} + + +static void +queue_message (struct Channel *ch, const struct GNUNET_MessageHeader *msg, + uint16_t first_ptype, uint16_t last_ptype) { - struct GNUNET_MessageHeader *msg; - struct TransmitMessage *tmit_msg = GNUNET_malloc (sizeof (*tmit_msg) - + sizeof (*msg)); - msg = (struct GNUNET_MessageHeader *) &tmit_msg[1]; - msg->size = ntohs (sizeof (*msg)); - msg->type = ntohs (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL); - - tmit_msg->buf = (char *) &tmit_msg[1]; - tmit_msg->size = sizeof (*msg); + uint16_t size = ntohs (msg->size) - sizeof (*msg); + struct TransmitMessage *tmit_msg = GNUNET_malloc (sizeof (*tmit_msg) + size); + memcpy (&tmit_msg[1], &msg[1], size); + tmit_msg->size = size; tmit_msg->state = ch->tmit_state; + GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, tmit_msg); - transmit_message (ch, GNUNET_NO); + + ch->is_master + ? master_queue_message ((struct Master *) ch, tmit_msg, + first_ptype, last_ptype) + : slave_queue_message ((struct Slave *) ch, tmit_msg, + first_ptype, last_ptype); +} + + +static void +transmit_error (struct Channel *ch) +{ + uint16_t type = GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL; + + struct GNUNET_MessageHeader msg; + msg.size = ntohs (sizeof (msg)); + msg.type = ntohs (type); + + queue_message (ch, &msg, type, type); + transmit_message (ch); /* FIXME: cleanup */ } @@ -1136,6 +1415,10 @@ handle_psyc_message (void *cls, struct GNUNET_SERVER_Client *client, = GNUNET_SERVER_client_get_user_context (client, struct Channel); GNUNET_assert (NULL != ch); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%p Received message from client.\n", ch); + GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, msg); + if (GNUNET_YES != ch->ready) { GNUNET_log (GNUNET_ERROR_TYPE_WARNING, @@ -1145,10 +1428,7 @@ handle_psyc_message (void *cls, struct GNUNET_SERVER_Client *client, return; } - uint8_t inc_msg_id = GNUNET_NO; uint16_t size = ntohs (msg->size); - uint16_t psize = 0, ptype = 0, pos = 0; - if (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < size - sizeof (*msg)) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "%p Message payload too large\n", ch); @@ -1158,42 +1438,22 @@ handle_psyc_message (void *cls, struct GNUNET_SERVER_Client *client, return; } - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%p Received message from client.\n", ch); - GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, msg); - - for (pos = 0; sizeof (*msg) + pos < size; pos += psize) + uint16_t first_ptype = 0, last_ptype = 0; + if (GNUNET_SYSERR + == GNUNET_PSYC_check_message_parts (size - sizeof (*msg), + (const char *) &msg[1], + &first_ptype, &last_ptype)) { - const struct GNUNET_MessageHeader *pmsg - = (const struct GNUNET_MessageHeader *) ((char *) &msg[1] + pos); - psize = ntohs (pmsg->size); - ptype = ntohs (pmsg->type); - if (psize < sizeof (*pmsg) || sizeof (*msg) + pos + psize > size) - { - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "%p Received invalid message part of type %u and size %u " - "from client.\n", ch, ptype, psize); - GNUNET_break (0); - transmit_error (ch); - GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); - return; - } - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%p Received message part from client.\n", ch); - GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, pmsg); - - if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == ptype) - inc_msg_id = GNUNET_YES; + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "%p Received invalid message part from client.\n", ch); + GNUNET_break (0); + transmit_error (ch); + GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); + return; } - size -= sizeof (*msg); - struct TransmitMessage *tmit_msg = GNUNET_malloc (sizeof (*tmit_msg) + size); - tmit_msg->buf = (char *) &tmit_msg[1]; - memcpy (tmit_msg->buf, &msg[1], size); - tmit_msg->size = size; - tmit_msg->state = ch->tmit_state; - GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, tmit_msg); - transmit_message (ch, inc_msg_id); + queue_message (ch, msg, first_ptype, last_ptype); + transmit_message (ch); GNUNET_SERVER_receive_done (client, GNUNET_OK); }; diff --git a/src/psyc/psyc.h b/src/psyc/psyc.h index 582a8e168b..f2d3865481 100644 --- a/src/psyc/psyc.h +++ b/src/psyc/psyc.h @@ -31,8 +31,9 @@ #include "gnunet_psyc_service.h" -uint16_t -GNUNET_PSYC_message_last_part (uint16_t data_size, const char *data); +int +GNUNET_PSYC_check_message_parts (uint16_t data_size, const char *data, + uint16_t *first_ptype, uint16_t *last_ptype); void GNUNET_PSYC_log_message (enum GNUNET_ErrorType kind, @@ -41,14 +42,26 @@ GNUNET_PSYC_log_message (enum GNUNET_ErrorType kind, enum MessageState { - MSG_STATE_START = 0, - MSG_STATE_HEADER = 1, - MSG_STATE_METHOD = 2, + MSG_STATE_START = 0, + MSG_STATE_HEADER = 1, + MSG_STATE_METHOD = 2, MSG_STATE_MODIFIER = 3, MSG_STATE_MOD_CONT = 4, - MSG_STATE_DATA = 5, - MSG_STATE_END = 6, - MSG_STATE_CANCEL = 7, + MSG_STATE_DATA = 5, + MSG_STATE_END = 6, + MSG_STATE_CANCEL = 7, + MSG_STATE_ERROR = 8, +}; + + +enum MessageFragmentState +{ + MSG_FRAG_STATE_START = 0, + MSG_FRAG_STATE_HEADER = 1, + MSG_FRAG_STATE_DATA = 2, + MSG_FRAG_STATE_END = 3, + MSG_FRAG_STATE_CANCEL = 4, + MSG_FRAG_STATE_DROP = 5, }; diff --git a/src/psyc/psyc_api.c b/src/psyc/psyc_api.c index 16e8106d45..22f1da0695 100644 --- a/src/psyc/psyc_api.c +++ b/src/psyc/psyc_api.c @@ -1502,7 +1502,7 @@ GNUNET_PSYC_channel_slave_add (struct GNUNET_PSYC_Channel *channel, slvadd = (struct ChannelSlaveAdd *) &op[1]; op->msg = (struct GNUNET_MessageHeader *) slvadd; - slvadd->header.type = GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_ADD; + slvadd->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_ADD); slvadd->header.size = htons (sizeof (*slvadd)); slvadd->announced_at = GNUNET_htonll (announced_at); slvadd->effective_since = GNUNET_htonll (effective_since); @@ -1544,7 +1544,7 @@ GNUNET_PSYC_channel_slave_remove (struct GNUNET_PSYC_Channel *channel, slvrm = (struct ChannelSlaveRemove *) &op[1]; op->msg = (struct GNUNET_MessageHeader *) slvrm; - slvrm->header.type = GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_RM; + slvrm->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_RM); slvrm->header.size = htons (sizeof (*slvrm)); slvrm->announced_at = GNUNET_htonll (announced_at); GNUNET_CONTAINER_DLL_insert_tail (channel->tmit_head, diff --git a/src/psyc/psyc_common.c b/src/psyc/psyc_common.c index bf5643ff23..74729aca27 100644 --- a/src/psyc/psyc_common.c +++ b/src/psyc/psyc_common.c @@ -30,36 +30,48 @@ /** * Check if @a data contains a series of valid message parts. * - * @param data_size Size of @a data. - * @param data Data. + * @param data_size Size of @a data. + * @param data Data. + * @param[out] first_ptype Type of first message part. + * @param[out] last_ptype Type of last message part. * - * @return Message type number - * or GNUNET_NO if the message contains invalid or no parts. + * @return Number of message parts found in @a data. + * or GNUNET_SYSERR if the message contains invalid parts. */ -uint16_t -GNUNET_PSYC_message_last_part (uint16_t data_size, const char *data) +int +GNUNET_PSYC_check_message_parts (uint16_t data_size, const char *data, + uint16_t *first_ptype, uint16_t *last_ptype) { const struct GNUNET_MessageHeader *pmsg; - uint16_t ptype = GNUNET_NO; - uint16_t psize = 0; - uint16_t pos = 0; + uint16_t parts = 0, ptype = 0, psize = 0, pos = 0; + if (NULL != first_ptype) + *first_ptype = 0; + if (NULL != last_ptype) + *last_ptype = 0; - for (pos = 0; pos < data_size; pos += psize) + for (pos = 0; pos < data_size; pos += psize, parts++) { pmsg = (const struct GNUNET_MessageHeader *) (data + pos); psize = ntohs (pmsg->size); ptype = ntohs (pmsg->type); - if (psize < sizeof (*pmsg) || pos + psize > data_size + if (0 == parts && NULL != first_ptype) + *first_ptype = ptype; + if (NULL != last_ptype + && *last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END) + *last_ptype = ptype; + if (psize < sizeof (*pmsg) + || pos + psize > data_size || ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD || GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL < ptype) { GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Invalid message part of type %u and size %u.\n", ptype, psize); - return GNUNET_NO; + return GNUNET_SYSERR; } + /* FIXME: check message part order */ } - return ptype; + return parts; } diff --git a/src/psyc/test_psyc.c b/src/psyc/test_psyc.c index 360d56c064..f58ecb7f67 100644 --- a/src/psyc/test_psyc.c +++ b/src/psyc/test_psyc.c @@ -35,9 +35,9 @@ #include "gnunet_env_lib.h" #include "gnunet_psyc_service.h" -#define TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30) +#define TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10) -#define DEBUG_SERVICE 1 +#define DEBUG_SERVICE 0 /** @@ -120,7 +120,7 @@ cleanup () /** - * Terminate the testcase (failure). + * Terminate the test case (failure). * * @param cls NULL * @param tc scheduler context @@ -134,7 +134,7 @@ end_badly (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) /** - * Terminate the testcase (success). + * Terminate the test case (success). * * @param cls NULL * @param tc scheduler context @@ -148,7 +148,7 @@ end_normally (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) /** - * Finish the testcase (successfully). + * Finish the test case (successfully). */ static void end () @@ -518,7 +518,8 @@ run (void *cls, GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Starting master.\n"); mst = GNUNET_PSYC_master_start (cfg, channel_key, GNUNET_PSYC_CHANNEL_PRIVATE, - &master_message, &join_request, &master_started, NULL); + &master_message, &join_request, + &master_started, NULL); } diff --git a/src/psycstore/psycstore_api.c b/src/psycstore/psycstore_api.c index 88ae1185b9..a5fdea10b8 100644 --- a/src/psycstore/psycstore_api.c +++ b/src/psycstore/psycstore_api.c @@ -1099,9 +1099,9 @@ GNUNET_PSYCSTORE_state_sync (struct GNUNET_PSYCSTORE_Handle *h, req->message_id = GNUNET_htonll (message_id); req->name_size = htons (name_size); req->flags - = 0 == i + = (0 == i) ? STATE_OP_FIRST - : modifier_count - 1 == i + : (modifier_count - 1 == i) ? STATE_OP_LAST : 0; |