diff options
author | Florian Dold <florian.dold@gmail.com> | 2013-06-26 10:06:52 +0000 |
---|---|---|
committer | Florian Dold <florian.dold@gmail.com> | 2013-06-26 10:06:52 +0000 |
commit | 23479fc50d94f0c29cad3b92fe8fc53e358d4025 (patch) | |
tree | 240efe571ce83c65ac27b5d885c6c2a71e61117f /src | |
parent | f5a3f1dc90c9949c8c426f2cb2e822603b137dae (diff) |
- fixed tunnel context
- moved logic out of specific operations
Diffstat (limited to 'src')
-rw-r--r-- | src/set/gnunet-service-set.c | 178 | ||||
-rw-r--r-- | src/set/gnunet-service-set.h | 173 | ||||
-rw-r--r-- | src/set/gnunet-service-set_intersection.c | 19 | ||||
-rw-r--r-- | src/set/gnunet-service-set_union.c | 231 | ||||
-rw-r--r-- | src/set/set_protocol.h | 30 |
5 files changed, 351 insertions, 280 deletions
diff --git a/src/set/gnunet-service-set.c b/src/set/gnunet-service-set.c index cfc0068aba..a85093bcd7 100644 --- a/src/set/gnunet-service-set.c +++ b/src/set/gnunet-service-set.c @@ -28,6 +28,55 @@ /** + * Peer that has connected to us, but is not yet evaluating a set operation. + * Once the peer has sent a request, and the client has + * accepted or rejected it, this information will be deleted. + */ +struct Incoming +{ + /** + * Incoming peers are held in a linked list + */ + struct Incoming *next; + + /** + * Incoming peers are held in a linked list + */ + struct Incoming *prev; + + /** + * Detail information about the operation. + */ + struct OperationSpecification *spec; + + /** + * The identity of the requesting peer. + */ + struct GNUNET_PeerIdentity peer; + + /** + * Tunnel to the peer. + */ + struct GNUNET_MESH_Tunnel *tunnel; + + /** + * Unique request id for the request from + * a remote peer, sent to the client, which will + * accept or reject the request. + * Set to '0' iff the request has not been + * suggested yet. + */ + uint32_t suggest_id; + + /** + * Timeout task, if the incoming peer has not been accepted + * after the timeout, it will be disconnected. + */ + GNUNET_SCHEDULER_TaskIdentifier timeout_task; +}; + + +/** * Configuration of our local peer. * (Not declared 'static' as also needed in gnunet-service-set_union.c) */ @@ -77,7 +126,7 @@ static struct Incoming *incoming_tail; * used to identify incoming operation requests from remote peers, * that the client can choose to accept or refuse. */ -static uint32_t accept_id = 1; +static uint32_t suggest_id = 1; /** @@ -131,7 +180,7 @@ get_incoming (uint32_t id) struct Incoming *incoming; for (incoming = incoming_head; NULL != incoming; incoming = incoming->next) - if (incoming->accept_id == id) + if (incoming->suggest_id == id) return incoming; return NULL; } @@ -145,6 +194,14 @@ get_incoming (uint32_t id) static void listener_destroy (struct Listener *listener) { + /* If the client is not dead yet, destroy it. + * The client's destroy callback will destroy the listener again. */ + if (NULL != listener->client) + { + GNUNET_SERVER_client_disconnect (listener->client); + listener->client = NULL; + return; + } if (NULL != listener->client_mq) { GNUNET_MQ_destroy (listener->client_mq); @@ -163,6 +220,14 @@ listener_destroy (struct Listener *listener) static void set_destroy (struct Set *set) { + /* If the client is not dead yet, destroy it. + * The client's destroy callback will destroy the set again. */ + if (NULL != set->client) + { + GNUNET_SERVER_client_disconnect (set->client); + set->client = NULL; + return; + } switch (set->operation) { case GNUNET_SET_OPERATION_INTERSECTION: @@ -195,10 +260,16 @@ handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client) set = set_get (client); if (NULL != set) + { + set->client = NULL; set_destroy (set); + } listener = listener_get (client); if (NULL != listener) + { + listener->client = NULL; listener_destroy (listener); + } } @@ -210,6 +281,13 @@ handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client) static void incoming_destroy (struct Incoming *incoming) { + if (NULL != incoming->tunnel) + { + struct GNUNET_MESH_Tunnel *t = incoming->tunnel; + incoming->tunnel = NULL; + GNUNET_MESH_tunnel_destroy (t); + return; + } GNUNET_CONTAINER_DLL_remove (incoming_head, incoming_tail, incoming); GNUNET_free (incoming); } @@ -246,16 +324,17 @@ incoming_suggest (struct Incoming *incoming, struct Listener *listener) struct GNUNET_MQ_Envelope *mqm; struct GNUNET_SET_RequestMessage *cmsg; - GNUNET_assert (GNUNET_NO == incoming->suggested); - incoming->suggested = GNUNET_YES; + GNUNET_assert (0 == incoming->suggest_id); + GNUNET_assert (NULL != incoming->spec); + incoming->suggest_id = suggest_id++; GNUNET_SCHEDULER_cancel (incoming->timeout_task); mqm = GNUNET_MQ_msg_nested_mh (cmsg, GNUNET_MESSAGE_TYPE_SET_REQUEST, - incoming->context_msg); + incoming->spec->context_msg); GNUNET_assert (NULL != mqm); - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "suggesting request with accept id %u\n", incoming->accept_id); - cmsg->accept_id = htonl (incoming->accept_id); - cmsg->peer_id = incoming->tc->peer; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "suggesting request with accept id %u\n", incoming->suggest_id); + cmsg->accept_id = htonl (incoming->suggest_id); + cmsg->peer_id = incoming->spec->peer; GNUNET_MQ_send (listener->client_mq, mqm); } @@ -280,6 +359,7 @@ handle_p2p_operation_request (void *cls, struct Incoming *incoming; const struct OperationRequestMessage *msg = (const struct OperationRequestMessage *) mh; struct Listener *listener; + struct OperationSpecification *spec; if (CONTEXT_INCOMING != tc->type) { @@ -289,21 +369,27 @@ handle_p2p_operation_request (void *cls, return GNUNET_SYSERR; } - incoming = tc->data; + incoming = tc->data.incoming; - if (GNUNET_YES == incoming->received_request) + if (NULL != incoming->spec) { /* double operation request */ GNUNET_break_op (0); return GNUNET_SYSERR; } - incoming->accept_id = accept_id++; - incoming->context_msg = + spec = GNUNET_new (struct OperationSpecification); + spec->context_msg = GNUNET_copy_message (GNUNET_MQ_extract_nested_mh (msg)); + spec->operation = ntohl (msg->operation); + spec->app_id = msg->app_id; + spec->salt = ntohl (msg->salt); + spec->peer = incoming->peer; + + incoming->spec = spec; - if ( (NULL != incoming->context_msg) && - (ntohs (incoming->context_msg->size) > GNUNET_SET_CONTEXT_MESSAGE_MAX_SIZE) ) + if ( (NULL != spec->context_msg) && + (ntohs (spec->context_msg->size) > GNUNET_SET_CONTEXT_MESSAGE_MAX_SIZE) ) { GNUNET_break_op (0); return GNUNET_SYSERR; @@ -405,12 +491,12 @@ handle_client_listen (void *cls, listener->operation, GNUNET_h2s (&listener->app_id)); for (incoming = incoming_head; NULL != incoming; incoming = incoming->next) { - if ( (GNUNET_NO == incoming->received_request) || - (GNUNET_YES == incoming->suggested) ) + if ( (NULL == incoming->spec) || + (0 != incoming->suggest_id) ) continue; - if (listener->operation != incoming->operation) + if (listener->operation != incoming->spec->operation) continue; - if (0 != GNUNET_CRYPTO_hash_cmp (&listener->app_id, &incoming->app_id)) + if (0 != GNUNET_CRYPTO_hash_cmp (&listener->app_id, &incoming->spec->app_id)) continue; incoming_suggest (incoming, listener); } @@ -483,8 +569,7 @@ handle_client_reject (void *cls, return; } GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer request rejected by client\n"); - /* the incoming peer will be destroyed in the tunnel end handler */ - GNUNET_MESH_tunnel_destroy (incoming->tc->tunnel); + GNUNET_MESH_tunnel_destroy (incoming->tunnel); GNUNET_SERVER_receive_done (client, GNUNET_OK); } @@ -542,6 +627,10 @@ handle_client_evaluate (void *cls, const struct GNUNET_MessageHeader *m) { struct Set *set; + struct TunnelContext *tc; + struct GNUNET_MESH_Tunnel *tunnel; + struct GNUNET_SET_EvaluateMessage *msg; + struct OperationSpecification *spec; set = set_get (client); if (NULL == set) @@ -551,13 +640,27 @@ handle_client_evaluate (void *cls, return; } + msg = (struct GNUNET_SET_EvaluateMessage *) m; + tc = GNUNET_new (struct TunnelContext); + spec = GNUNET_new (struct OperationSpecification); + spec->operation = set->operation; + spec->app_id = msg->app_id; + spec->salt = ntohl (msg->salt); + spec->peer = msg->target_peer; + + tunnel = GNUNET_MESH_tunnel_create (mesh, tc, &msg->target_peer, + GNUNET_APPLICATION_TYPE_SET); + switch (set->operation) { case GNUNET_SET_OPERATION_INTERSECTION: + tc->type = CONTEXT_OPERATION_INTERSECTION; //_GSS_intersection_evaluate ((struct GNUNET_SET_EvaluateMessage *) m, set); break; case GNUNET_SET_OPERATION_UNION: - _GSS_union_evaluate ((struct GNUNET_SET_EvaluateMessage *) m, set); + tc->type = CONTEXT_OPERATION_UNION; + tc->data.union_op = + _GSS_union_evaluate (spec, tunnel); break; default: GNUNET_assert (0); @@ -601,6 +704,9 @@ handle_client_accept (void *cls, struct Set *set; struct Incoming *incoming; struct GNUNET_SET_AcceptRejectMessage *msg = (struct GNUNET_SET_AcceptRejectMessage *) mh; + struct GNUNET_MESH_Tunnel *tunnel; + struct TunnelContext *tc; + struct OperationSpecification *spec; incoming = get_incoming (ntohl (msg->accept_reject_id)); @@ -623,13 +729,20 @@ handle_client_accept (void *cls, return; } + tc = GNUNET_new (struct TunnelContext); + tunnel = GNUNET_MESH_tunnel_create (mesh, tc, &incoming->spec->peer, + GNUNET_APPLICATION_TYPE_SET); + spec = GNUNET_new (struct OperationSpecification); + switch (set->operation) { case GNUNET_SET_OPERATION_INTERSECTION: + tc->type = CONTEXT_OPERATION_INTERSECTION; // _GSS_intersection_accept (msg, set, incoming); break; case GNUNET_SET_OPERATION_UNION: - _GSS_union_accept (msg, set, incoming); + tc->type = CONTEXT_OPERATION_UNION; + tc->data.union_op = _GSS_union_accept (spec, tunnel); break; default: GNUNET_assert (0); @@ -719,11 +832,9 @@ tunnel_new_cb (void *cls, GNUNET_assert (port == GNUNET_APPLICATION_TYPE_SET); tc = GNUNET_new (struct TunnelContext); incoming = GNUNET_new (struct Incoming); - incoming->tc = tc; - tc->peer = *initiator; - tc->tunnel = tunnel; - tc->mq = GNUNET_MESH_mq_create (tunnel); - tc->data = incoming; + incoming->peer = *initiator; + incoming->tunnel = tunnel; + tc->data.incoming = incoming; tc->type = CONTEXT_INCOMING; incoming->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES, incoming_timeout_cb, incoming); @@ -750,22 +861,13 @@ tunnel_end_cb (void *cls, { struct TunnelContext *ctx = tunnel_ctx; - /* tunnel is dead already */ - ctx->tunnel = NULL; - - if (NULL != ctx->mq) - { - GNUNET_MQ_destroy (ctx->mq); - ctx->mq = NULL; - } - switch (ctx->type) { case CONTEXT_INCOMING: - incoming_destroy ((struct Incoming *) ctx->data); + incoming_destroy (ctx->data.incoming); break; case CONTEXT_OPERATION_UNION: - _GSS_union_operation_destroy ((struct UnionEvaluateOperation *) ctx->data); + _GSS_union_operation_destroy (ctx->data.union_op); break; case CONTEXT_OPERATION_INTERSECTION: GNUNET_assert (0); diff --git a/src/set/gnunet-service-set.h b/src/set/gnunet-service-set.h index 533fd0ef72..574b343d6c 100644 --- a/src/set/gnunet-service-set.h +++ b/src/set/gnunet-service-set.h @@ -42,14 +42,22 @@ struct IntersectionState; +/* FIXME: cfuchs */ +struct IntersectionOperation; + + /** * Extra state required for set union. */ struct UnionState; +/** + * State of a union operation being evaluated. + */ struct UnionEvaluateOperation; + /** * A set that supports a specific operation * with other peers. @@ -94,6 +102,50 @@ struct Set /** + * Detail information about an operation. + */ +struct OperationSpecification +{ + /** + * The type of the operation. + */ + enum GNUNET_SET_OperationType operation; + + /** + * The remove peer we evaluate the operation with + */ + struct GNUNET_PeerIdentity peer; + + /** + * Application ID for the operation, used to distinguish + * multiple operations of the same type with the same peer. + */ + struct GNUNET_HashCode app_id; + + /** + * Context message, may be NULL. + */ + struct GNUNET_MessageHeader *context_msg; + + /** + * Salt to use for the operation. + */ + uint32_t salt; + + /** + * ID used to identify responses to a client. + */ + uint32_t client_request_id; + + /** + * Set associated with the operation, NULL until the spec has been associated + * with a set. + */ + struct Set *set; +}; + + +/** * A listener is inhabited by a client, and * waits for evaluation requests from remote peers. */ @@ -121,12 +173,13 @@ struct Listener struct GNUNET_MQ_Handle *client_mq; /** - * Type of operation supported for this set + * The type of the operation. */ enum GNUNET_SET_OperationType operation; /** - * Application id of intereset for this listener. + * Application ID for the operation, used to distinguish + * multiple operations of the same type with the same peer. */ struct GNUNET_HashCode app_id; }; @@ -137,79 +190,51 @@ struct Listener * Once the peer has sent a request, and the client has * accepted or rejected it, this information will be deleted. */ -struct Incoming -{ - /** - * Incoming peers are held in a linked list - */ - struct Incoming *next; - - /** - * Incoming peers are held in a linked list - */ - struct Incoming *prev; +struct Incoming; - /** - * Tunnel context, stores information about - * the tunnel and its peer. - */ - struct TunnelContext *tc; - - /** - * GNUNET_YES if the incoming peer has sent - * an operation request (and we are waiting - * for the client to ack/nack), GNUNET_NO otherwise. - */ - int received_request; +/** + * Different types a tunnel can be. + */ +enum TunnelContextType { /** - * App code, set once the peer has - * requested an operation + * Tunnel is waiting for a set request from the tunnel, + * or for the ack/nack of the client for a received request. */ - struct GNUNET_HashCode app_id; + CONTEXT_INCOMING, /** - * Context message, set once the peer - * has requested an operation. + * The tunnel performs a union operation. */ - struct GNUNET_MessageHeader *context_msg; + CONTEXT_OPERATION_UNION, /** - * Salt the peer has requested to use for the - * operation + * The tunnel performs an intersection operation. */ - uint16_t salt; + CONTEXT_OPERATION_INTERSECTION, +}; - /** - * Operation the other peer wants to do - */ - enum GNUNET_SET_OperationType operation; +/** + * State associated with the tunnel, dependent on + * tunnel type. + */ +union TunnelContextData +{ /** - * Has the incoming request been suggested to - * a client listener yet? + * Valid for tag 'CONTEXT_INCOMING' */ - int suggested; + struct Incoming *incoming; /** - * Unique request id for the request from - * a remote peer, sent to the client, which will - * accept or reject the request. + * Valid for tag 'CONTEXT_OPERATION_UNION' */ - uint32_t accept_id; + struct UnionEvaluateOperation *union_op; /** - * Timeout task, if the incoming peer has not been accepted - * after the timeout, it will be disconnected. + * Valid for tag 'CONTEXT_OPERATION_INTERSECTION' */ - GNUNET_SCHEDULER_TaskIdentifier timeout_task; -}; - - -enum TunnelContextType { - CONTEXT_INCOMING, - CONTEXT_OPERATION_UNION, - CONTEXT_OPERATION_INTERSECTION, + struct IntersectionEvaluateOperation *intersection_op; }; /** @@ -219,21 +244,6 @@ enum TunnelContextType { struct TunnelContext { /** - * The mesh tunnel that has this context - */ - struct GNUNET_MESH_Tunnel *tunnel; - - /** - * The peer on the other side. - */ - struct GNUNET_PeerIdentity peer; - - /** - * Handle to the message queue for the tunnel. - */ - struct GNUNET_MQ_Handle *mq; - - /** * Type of the tunnel. */ enum TunnelContextType type; @@ -242,7 +252,7 @@ struct TunnelContext * State associated with the tunnel, dependent on * tunnel type. */ - void *data; + union TunnelContextData data; }; @@ -268,11 +278,14 @@ _GSS_union_set_create (void); * Evaluate a union operation with * a remote peer. * - * @param m the evaluate request message from the client + * @param spec specification of the operation the evaluate + * @param tunnel tunnel already connected to the partner peer * @param set the set to evaluate the operation with + * @return a handle to the operation */ -void -_GSS_union_evaluate (struct GNUNET_SET_EvaluateMessage *m, struct Set *set); +struct UnionEvaluateOperation * +_GSS_union_evaluate (struct OperationSpecification *spec, + struct GNUNET_MESH_Tunnel *tunnel); /** @@ -308,13 +321,13 @@ _GSS_union_set_destroy (struct Set *set); /** * Accept an union operation request from a remote peer * - * @param m the accept message from the client - * @param set the set of the client - * @param incoming information about the requesting remote peer + * @param spec all necessary information about the operation + * @param tunnel open tunnel to the partner's peer + * @return operation */ -void -_GSS_union_accept (struct GNUNET_SET_AcceptRejectMessage *m, struct Set *set, - struct Incoming *incoming); +struct UnionEvaluateOperation * +_GSS_union_accept (struct OperationSpecification *spec, + struct GNUNET_MESH_Tunnel *tunnel); /** diff --git a/src/set/gnunet-service-set_intersection.c b/src/set/gnunet-service-set_intersection.c index fe3ba56eaf..57028c0dd8 100644 --- a/src/set/gnunet-service-set_intersection.c +++ b/src/set/gnunet-service-set_intersection.c @@ -124,10 +124,9 @@ struct IntersectionEvaluateOperation struct GNUNET_MessageHeader *context_msg; /** - * Tunnel context for the peer we - * evaluate the union operation with. + * Tunnel to the other peer. */ - struct TunnelContext *tc; + struct GNUNET_MESH_Tunnel *tunnel; /** * Request ID to multiplex set operations to @@ -397,12 +396,11 @@ _GSS_union_operation_destroy (struct UnionEvaluateOperation *eo) { GNUNET_log (GNUNET_ERROR_TYPE_INFO, "destroying union op\n"); - if (NULL != eo->tc) + if (NULL != eo->tunnel) { - GNUNET_MQ_destroy (eo->tc->mq); - GNUNET_MESH_tunnel_destroy (eo->tc->tunnel); - GNUNET_free (eo->tc); - eo->tc = NULL; + GNUNET_MESH_tunnel_destroy (eo->tunnel); + /* wait for the final destruction by the tunnel cleaner */ + return; } if (NULL != eo->remote_ibf) @@ -432,10 +430,8 @@ _GSS_union_operation_destroy (struct UnionEvaluateOperation *eo) eo); GNUNET_free (eo); - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "destroying union op done\n"); - /* FIXME: do a garbage collection of the set generations */ } @@ -1355,7 +1351,6 @@ _GSS_intersection_remove (struct GNUNET_SET_ElementMessage *m, struct Set *set) * @param cls closure * @param tunnel mesh tunnel * @param tunnel_ctx tunnel context - * @param sender ??? * @param mh message to process * @return ??? */ @@ -1363,7 +1358,6 @@ int _GSS_union_handle_p2p_message (void *cls, struct GNUNET_MESH_Tunnel *tunnel, void **tunnel_ctx, - const struct GNUNET_PeerIdentity *sender, const struct GNUNET_MessageHeader *mh) { struct TunnelContext *tc = *tunnel_ctx; @@ -1371,7 +1365,6 @@ _GSS_union_handle_p2p_message (void *cls, if (CONTEXT_OPERATION_UNION != tc->type) { - /* FIXME: kill the tunnel */ /* never kill mesh */ return GNUNET_OK; } diff --git a/src/set/gnunet-service-set_union.c b/src/set/gnunet-service-set_union.c index 5b1f28cf43..f9756bd5ba 100644 --- a/src/set/gnunet-service-set_union.c +++ b/src/set/gnunet-service-set_union.c @@ -103,37 +103,20 @@ enum UnionOperationPhase struct UnionEvaluateOperation { /** - * Local set the operation is evaluated on. + * Tunnel to the remote peer. */ - struct Set *set; - - /** - * Peer with the remote set - */ - struct GNUNET_PeerIdentity peer; - - /** - * Application-specific identifier - */ - struct GNUNET_HashCode app_id; + struct GNUNET_MESH_Tunnel *tunnel; /** - * Context message, given to us - * by the client, may be NULL. + * Detail information about the set operation, + * including the set to use. */ - struct GNUNET_MessageHeader *context_msg; + struct OperationSpecification *spec; /** - * Tunnel context for the peer we - * evaluate the union operation with. + * Message queue for the peer. */ - struct TunnelContext *tc; - - /** - * Request ID to multiplex set operations to - * the client inhabiting the set. - */ - uint32_t request_id; + struct GNUNET_MQ_Handle *mq; /** * Number of ibf buckets received @@ -167,11 +150,6 @@ struct UnionEvaluateOperation enum UnionOperationPhase phase; /** - * Salt to use for this operation. - */ - uint16_t salt; - - /** * Generation in which the operation handle * was created. */ @@ -395,16 +373,17 @@ destroy_key_to_element_iter (void *cls, void _GSS_union_operation_destroy (struct UnionEvaluateOperation *eo) { + struct UnionState *st = eo->spec->set->state.u; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "destroying union op\n"); - - if (NULL != eo->tc) + + if (NULL != eo->tunnel) { - GNUNET_MQ_destroy (eo->tc->mq); - GNUNET_MESH_tunnel_destroy (eo->tc->tunnel); - GNUNET_free (eo->tc); - eo->tc = NULL; + struct GNUNET_MESH_Tunnel *t = eo->tunnel; + eo->tunnel = NULL; + GNUNET_MESH_tunnel_destroy (t); } - + if (NULL != eo->remote_ibf) { ibf_destroy (eo->remote_ibf); @@ -427,8 +406,8 @@ _GSS_union_operation_destroy (struct UnionEvaluateOperation *eo) eo->key_to_element = NULL; } - GNUNET_CONTAINER_DLL_remove (eo->set->state.u->ops_head, - eo->set->state.u->ops_tail, + GNUNET_CONTAINER_DLL_remove (st->ops_head, + st->ops_tail, eo); GNUNET_free (eo); @@ -449,13 +428,13 @@ _GSS_union_operation_destroy (struct UnionEvaluateOperation *eo) static void fail_union_operation (struct UnionEvaluateOperation *eo) { - struct GNUNET_MQ_Envelope *mqm; + struct GNUNET_MQ_Envelope *ev; struct GNUNET_SET_ResultMessage *msg; - mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT); + ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT); msg->result_status = htons (GNUNET_SET_STATUS_FAILURE); - msg->request_id = htonl (eo->request_id); - GNUNET_MQ_send (eo->set->client_mq, mqm); + msg->request_id = htonl (eo->spec->client_request_id); + GNUNET_MQ_send (eo->spec->set->client_mq, ev); _GSS_union_operation_destroy (eo); } @@ -490,27 +469,27 @@ get_ibf_key (struct GNUNET_HashCode *src, uint16_t salt) static void send_operation_request (struct UnionEvaluateOperation *eo) { - struct GNUNET_MQ_Envelope *mqm; + struct GNUNET_MQ_Envelope *ev; struct OperationRequestMessage *msg; - mqm = GNUNET_MQ_msg_nested_mh (msg, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, - eo->context_msg); + ev = GNUNET_MQ_msg_nested_mh (msg, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, + eo->spec->context_msg); - if (NULL == mqm) + if (NULL == ev) { /* the context message is too large */ GNUNET_break (0); - GNUNET_SERVER_client_disconnect (eo->set->client); + GNUNET_SERVER_client_disconnect (eo->spec->set->client); return; } msg->operation = htons (GNUNET_SET_OPERATION_UNION); - msg->app_id = eo->app_id; - GNUNET_MQ_send (eo->tc->mq, mqm); + msg->app_id = eo->spec->app_id; + GNUNET_MQ_send (eo->mq, ev); - if (NULL != eo->context_msg) + if (NULL != eo->spec->context_msg) { - GNUNET_free (eo->context_msg); - eo->context_msg = NULL; + GNUNET_free (eo->spec->context_msg); + eo->spec->context_msg = NULL; } GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sent op request\n"); @@ -565,7 +544,7 @@ insert_element (struct UnionEvaluateOperation *eo, struct ElementEntry *ee) struct IBF_Key ibf_key; struct KeyEntry *k; - ibf_key = get_ibf_key (&ee->element_hash, eo->salt); + ibf_key = get_ibf_key (&ee->element_hash, eo->spec->salt); k = GNUNET_new (struct KeyEntry); k->element = ee; k->ibf_key = ibf_key; @@ -644,9 +623,9 @@ prepare_ibf (struct UnionEvaluateOperation *eo, uint16_t size) if (NULL == eo->key_to_element) { unsigned int len; - len = GNUNET_CONTAINER_multihashmap_size (eo->set->state.u->elements); + len = GNUNET_CONTAINER_multihashmap_size (eo->spec->set->state.u->elements); eo->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1); - GNUNET_CONTAINER_multihashmap_iterate (eo->set->state.u->elements, + GNUNET_CONTAINER_multihashmap_iterate (eo->spec->set->state.u->elements, init_key_to_element_iterator, eo); } if (NULL != eo->local_ibf) @@ -678,7 +657,7 @@ send_ibf (struct UnionEvaluateOperation *eo, uint16_t ibf_order) while (buckets_sent < (1 << ibf_order)) { unsigned int buckets_in_message; - struct GNUNET_MQ_Envelope *mqm; + struct GNUNET_MQ_Envelope *ev; struct IBFMessage *msg; buckets_in_message = (1 << ibf_order) - buckets_sent; @@ -686,14 +665,14 @@ send_ibf (struct UnionEvaluateOperation *eo, uint16_t ibf_order) if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE) buckets_in_message = MAX_BUCKETS_PER_MESSAGE; - mqm = GNUNET_MQ_msg_extra (msg, buckets_in_message * IBF_BUCKET_SIZE, + ev = GNUNET_MQ_msg_extra (msg, buckets_in_message * IBF_BUCKET_SIZE, GNUNET_MESSAGE_TYPE_SET_P2P_IBF); msg->order = ibf_order; msg->offset = htons (buckets_sent); ibf_write_slice (ibf, buckets_sent, buckets_in_message, &msg[1]); buckets_sent += buckets_in_message; - GNUNET_MQ_send (eo->tc->mq, mqm); + GNUNET_MQ_send (eo->mq, ev); } eo->phase = PHASE_EXPECT_ELEMENTS_AND_REQUESTS; @@ -708,14 +687,15 @@ send_ibf (struct UnionEvaluateOperation *eo, uint16_t ibf_order) static void send_strata_estimator (struct UnionEvaluateOperation *eo) { - struct GNUNET_MQ_Envelope *mqm; + struct GNUNET_MQ_Envelope *ev; struct GNUNET_MessageHeader *strata_msg; + struct UnionState *st = eo->spec->set->state.u; - mqm = GNUNET_MQ_msg_header_extra (strata_msg, + ev = GNUNET_MQ_msg_header_extra (strata_msg, SE_STRATA_COUNT * IBF_BUCKET_SIZE * SE_IBF_SIZE, GNUNET_MESSAGE_TYPE_SET_P2P_SE); - strata_estimator_write (eo->set->state.u->se, &strata_msg[1]); - GNUNET_MQ_send (eo->tc->mq, mqm); + strata_estimator_write (st->se, &strata_msg[1]); + GNUNET_MQ_send (eo->mq, ev); eo->phase = PHASE_EXPECT_IBF; } @@ -797,12 +777,12 @@ send_element_iterator (void *cls, while (NULL != ke) { const struct GNUNET_SET_Element *const element = &ke->element->element; - struct GNUNET_MQ_Envelope *mqm; + struct GNUNET_MQ_Envelope *ev; struct GNUNET_MessageHeader *mh; GNUNET_assert (ke->ibf_key.key_val == ibf_key.key_val); - mqm = GNUNET_MQ_msg_header_extra (mh, element->size, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS); - if (NULL == mqm) + ev = GNUNET_MQ_msg_header_extra (mh, element->size, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS); + if (NULL == ev) { /* element too large */ GNUNET_break (0); @@ -810,7 +790,7 @@ send_element_iterator (void *cls, } memcpy (&mh[1], element->data, element->size); GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sending element to client\n"); - GNUNET_MQ_send (eo->tc->mq, mqm); + GNUNET_MQ_send (eo->mq, ev); ke = ke->next_colliding; } return GNUNET_NO; @@ -882,11 +862,11 @@ decode_and_send (struct UnionEvaluateOperation *eo) } if (GNUNET_NO == res) { - struct GNUNET_MQ_Envelope *mqm; + struct GNUNET_MQ_Envelope *ev; GNUNET_log (GNUNET_ERROR_TYPE_INFO, "transmitted all values, sending DONE\n"); - mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE); - GNUNET_MQ_send (eo->tc->mq, mqm); + ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE); + GNUNET_MQ_send (eo->mq, ev); break; } if (1 == side) @@ -895,15 +875,15 @@ decode_and_send (struct UnionEvaluateOperation *eo) } else { - struct GNUNET_MQ_Envelope *mqm; + struct GNUNET_MQ_Envelope *ev; struct GNUNET_MessageHeader *msg; /* FIXME: before sending the request, check if we may just have the element */ /* FIXME: merge multiple requests */ - mqm = GNUNET_MQ_msg_header_extra (msg, sizeof (struct IBF_Key), + ev = GNUNET_MQ_msg_header_extra (msg, sizeof (struct IBF_Key), GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS); *(struct IBF_Key *) &msg[1] = key; - GNUNET_MQ_send (eo->tc->mq, mqm); + GNUNET_MQ_send (eo->mq, ev); } } ibf_destroy (diff_ibf); @@ -980,21 +960,21 @@ static void send_client_element (struct UnionEvaluateOperation *eo, struct GNUNET_SET_Element *element) { - struct GNUNET_MQ_Envelope *mqm; + struct GNUNET_MQ_Envelope *ev; struct GNUNET_SET_ResultMessage *rm; - GNUNET_assert (0 != eo->request_id); - mqm = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT); - if (NULL == mqm) + GNUNET_assert (0 != eo->spec->client_request_id); + ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT); + if (NULL == ev) { - GNUNET_MQ_discard (mqm); + GNUNET_MQ_discard (ev); GNUNET_break (0); return; } rm->result_status = htons (GNUNET_SET_STATUS_OK); - rm->request_id = htonl (eo->request_id); + rm->request_id = htonl (eo->spec->client_request_id); memcpy (&rm[1], element->data, element->size); - GNUNET_MQ_send (eo->set->client_mq, mqm); + GNUNET_MQ_send (eo->spec->set->client_mq, ev); } @@ -1009,14 +989,13 @@ send_client_element (struct UnionEvaluateOperation *eo, static void send_client_done_and_destroy (struct UnionEvaluateOperation *eo) { - struct GNUNET_MQ_Envelope *mqm; + struct GNUNET_MQ_Envelope *ev; struct GNUNET_SET_ResultMessage *rm; - GNUNET_assert (0 != eo->request_id); - mqm = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT); - rm->request_id = htonl (eo->request_id); + ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT); + rm->request_id = htonl (eo->spec->client_request_id); rm->result_status = htons (GNUNET_SET_STATUS_DONE); - GNUNET_MQ_send (eo->set->client_mq, mqm); + GNUNET_MQ_send (eo->spec->set->client_mq, ev); } @@ -1123,13 +1102,13 @@ handle_p2p_done (void *cls, const struct GNUNET_MessageHeader *mh) if (eo->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS) { /* we got all requests, but still have to send our elements as response */ - struct GNUNET_MQ_Envelope *mqm; + struct GNUNET_MQ_Envelope *ev; GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got DONE, sending final DONE after elements\n"); eo->phase = PHASE_FINISHED; - mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE); - GNUNET_MQ_notify_sent (mqm, peer_done_sent_cb, eo); - GNUNET_MQ_send (eo->tc->mq, mqm); + ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE); + GNUNET_MQ_notify_sent (ev, peer_done_sent_cb, eo); + GNUNET_MQ_send (eo->mq, ev); return; } if (eo->phase == PHASE_EXPECT_ELEMENTS) @@ -1148,80 +1127,69 @@ handle_p2p_done (void *cls, const struct GNUNET_MessageHeader *mh) * Evaluate a union operation with * a remote peer. * - * @param m the evaluate request message from the client + * @param spec specification of the operation the evaluate + * @param tunnel tunnel already connected to the partner peer * @param set the set to evaluate the operation with + * @return a handle to the operation */ -void -_GSS_union_evaluate (struct GNUNET_SET_EvaluateMessage *m, struct Set *set) +struct UnionEvaluateOperation * +_GSS_union_evaluate (struct OperationSpecification *spec, + struct GNUNET_MESH_Tunnel *tunnel) { struct UnionEvaluateOperation *eo; - struct GNUNET_MessageHeader *context_msg; + struct UnionState *st = spec->set->state.u; eo = GNUNET_new (struct UnionEvaluateOperation); - eo->peer = m->target_peer; - eo->set = set; - eo->request_id = htonl (m->request_id); - GNUNET_assert (0 != eo->request_id); - eo->se = strata_estimator_dup (set->state.u->se); - eo->salt = ntohs (m->salt); - eo->app_id = m->app_id; - - context_msg = GNUNET_MQ_extract_nested_mh (m); - if (NULL != context_msg) - { - eo->context_msg = GNUNET_copy_message (context_msg); - } + eo->se = strata_estimator_dup (spec->set->state.u->se); + eo->spec = spec; + eo->tunnel = tunnel; GNUNET_log (GNUNET_ERROR_TYPE_INFO, "evaluating union operation, (app %s)\n", - GNUNET_h2s (&eo->app_id)); - - eo->tc = GNUNET_new (struct TunnelContext); - eo->tc->tunnel = GNUNET_MESH_tunnel_create (mesh, eo->tc, &eo->peer, - GNUNET_APPLICATION_TYPE_SET); - GNUNET_assert (NULL != eo->tc->tunnel); - eo->tc->peer = eo->peer; - eo->tc->mq = GNUNET_MESH_mq_create (eo->tc->tunnel); + GNUNET_h2s (&eo->spec->app_id)); + /* we started the operation, thus we have to send the operation request */ eo->phase = PHASE_EXPECT_SE; - GNUNET_CONTAINER_DLL_insert (eo->set->state.u->ops_head, - eo->set->state.u->ops_tail, + GNUNET_CONTAINER_DLL_insert (st->ops_head, + st->ops_tail, eo); send_operation_request (eo); + + return eo; } /** * Accept an union operation request from a remote peer * - * @param m the accept message from the client - * @param set the set of the client - * @param incoming information about the requesting remote peer + * @param spec all necessary information about the operation + * @param tunnel open tunnel to the partner's peer + * @return operation */ -void -_GSS_union_accept (struct GNUNET_SET_AcceptRejectMessage *m, struct Set *set, - struct Incoming *incoming) +struct UnionEvaluateOperation * +_GSS_union_accept (struct OperationSpecification *spec, + struct GNUNET_MESH_Tunnel *tunnel) { struct UnionEvaluateOperation *eo; + struct UnionState *st = spec->set->state.u; GNUNET_log (GNUNET_ERROR_TYPE_INFO, "accepting set union operation\n"); eo = GNUNET_new (struct UnionEvaluateOperation); - eo->tc = incoming->tc; - eo->generation_created = set->state.u->current_generation++; - eo->set = set; - eo->salt = ntohs (incoming->salt); - GNUNET_assert (0 != ntohl (m->request_id)); - eo->request_id = ntohl (m->request_id); - eo->se = strata_estimator_dup (set->state.u->se); + eo->generation_created = st->current_generation++; + eo->spec = spec; + eo->tunnel = tunnel; + eo->se = strata_estimator_dup (st->se); /* transfer ownership of mq and socket from incoming to eo */ - GNUNET_CONTAINER_DLL_insert (eo->set->state.u->ops_head, - eo->set->state.u->ops_tail, + GNUNET_CONTAINER_DLL_insert (st->ops_head, + st->ops_tail, eo); /* kick off the operation */ send_strata_estimator (eo); + + return eo; } @@ -1370,11 +1338,10 @@ _GSS_union_handle_p2p_message (void *cls, if (CONTEXT_OPERATION_UNION != tc->type) { - GNUNET_break_op (0); return GNUNET_SYSERR; } - eo = tc->data; + eo = tc->data.union_op; switch (ntohs (mh->type)) { diff --git a/src/set/set_protocol.h b/src/set/set_protocol.h index 543e2a002e..9455421517 100644 --- a/src/set/set_protocol.h +++ b/src/set/set_protocol.h @@ -42,30 +42,21 @@ struct OperationRequestMessage /** * Operation to request, values from 'enum GNUNET_SET_OperationType' */ - uint32_t operation; + uint32_t operation GNUNET_PACKED; /** - * Application-specific identifier of the request. + * Salt to use for this operation. */ - struct GNUNET_HashCode app_id; - - /* rest: optional message */ -}; + uint32_t salt; -struct ElementRequestMessage -{ /** - * Type: GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS + * Application-specific identifier of the request. */ - struct GNUNET_MessageHeader header; + struct GNUNET_HashCode app_id; - /** - * Salt the keys in the body use - */ - uint8_t salt; + /* rest: optional message */ }; - struct IBFMessage { /** @@ -80,15 +71,20 @@ struct IBFMessage uint8_t order; /** - * Salt used when hashing elements for this IBF. + * Padding, must be 0. */ - uint8_t salt; + uint8_t reserved; /** * Offset of the strata in the rest of the message */ uint16_t offset GNUNET_PACKED; + /** + * Salt used when hashing elements for this IBF. + */ + uint32_t salt; + /* rest: strata */ }; |