aboutsummaryrefslogtreecommitdiff
path: root/src/util/mq.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/util/mq.c')
-rw-r--r--src/util/mq.c107
1 files changed, 66 insertions, 41 deletions
diff --git a/src/util/mq.c b/src/util/mq.c
index 6f8c042246..4170338add 100644
--- a/src/util/mq.c
+++ b/src/util/mq.c
@@ -74,13 +74,7 @@ struct GNUNET_MQ_Handle
/**
* Handlers array, or NULL if the queue should not receive messages
*/
- const struct GNUNET_MQ_MessageHandler *handlers;
-
- /**
- * Closure for the handler callbacks,
- * as well as for the error handler.
- */
- void *handlers_cls;
+ struct GNUNET_MQ_MessageHandler *handlers;
/**
* Actual implementation of message sending,
@@ -109,6 +103,11 @@ struct GNUNET_MQ_Handle
GNUNET_MQ_ErrorHandler error_handler;
/**
+ * Closure for the error handler.
+ */
+ void *error_handler_cls;
+
+ /**
* Linked list of messages pending to be sent
*/
struct GNUNET_MQ_Envelope *envelope_head;
@@ -133,7 +132,7 @@ struct GNUNET_MQ_Handle
/**
* Task scheduled during #GNUNET_MQ_impl_send_continue.
*/
- struct GNUNET_SCHEDULER_Task * continue_task;
+ struct GNUNET_SCHEDULER_Task *continue_task;
/**
* Next id that should be used for the @e assoc_map,
@@ -206,23 +205,42 @@ GNUNET_MQ_inject_message (struct GNUNET_MQ_Handle *mq,
{
const struct GNUNET_MQ_MessageHandler *handler;
int handled = GNUNET_NO;
-
+ uint16_t ms = ntohs (mh->size);
+
if (NULL == mq->handlers)
- {
- LOG (GNUNET_ERROR_TYPE_WARNING,
- "No handler for message of type %d\n",
- ntohs (mh->type));
- return;
- }
+ goto done;
for (handler = mq->handlers; NULL != handler->cb; handler++)
{
if (handler->type == ntohs (mh->type))
{
- handler->cb (mq->handlers_cls, mh);
handled = GNUNET_YES;
+ if ( (handler->expected_size > ms) ||
+ ( (handler->expected_size != ms) &&
+ (NULL == handler->mv) ) )
+ {
+ /* Too small, or not an exact size and
+ no 'mv' handler to check rest */
+ GNUNET_MQ_inject_error (mq,
+ GNUNET_MQ_ERROR_MALFORMED);
+ break;
+ }
+ if ( (NULL == handler->mv) ||
+ (GNUNET_OK ==
+ handler->mv (handler->cls, mh)) )
+ {
+ /* message well-formed, pass to handler */
+ handler->cb (handler->cls, mh);
+ }
+ else
+ {
+ /* Message rejected by check routine */
+ GNUNET_MQ_inject_error (mq,
+ GNUNET_MQ_ERROR_MALFORMED);
+ }
break;
}
}
+ done:
if (GNUNET_NO == handled)
LOG (GNUNET_ERROR_TYPE_WARNING,
"No handler for message of type %d\n",
@@ -251,7 +269,7 @@ GNUNET_MQ_inject_error (struct GNUNET_MQ_Handle *mq,
(int) error);
return;
}
- mq->error_handler (mq->handlers_cls, error);
+ mq->error_handler (mq->error_handler_cls, error);
}
@@ -355,7 +373,7 @@ GNUNET_MQ_impl_send_continue (struct GNUNET_MQ_Handle *mq)
* @param impl_state for the queue, passed to 'send' and 'destroy'
* @param handlers array of message handlers
* @param error_handler handler for read and write errors
- * @param cls closure for message handlers and error handler
+ * @param error_handler_cls closure for @a error_handler
* @return a new message queue
*/
struct GNUNET_MQ_Handle *
@@ -365,16 +383,26 @@ GNUNET_MQ_queue_for_callbacks (GNUNET_MQ_SendImpl send,
void *impl_state,
const struct GNUNET_MQ_MessageHandler *handlers,
GNUNET_MQ_ErrorHandler error_handler,
- void *cls)
+ void *error_handler_cls)
{
struct GNUNET_MQ_Handle *mq;
+ unsigned int i;
mq = GNUNET_new (struct GNUNET_MQ_Handle);
mq->send_impl = send;
mq->destroy_impl = destroy;
mq->cancel_impl = cancel;
- mq->handlers = handlers;
- mq->handlers_cls = cls;
+ if (NULL != handlers)
+ {
+ for (i=0;NULL != handlers[i].cb; i++) ;
+ mq->handlers = GNUNET_new_array (i,
+ struct GNUNET_MQ_MessageHandler);
+ memcpy (mq->handlers,
+ handlers,
+ i * sizeof (struct GNUNET_MQ_MessageHandler));
+ }
+ mq->error_handler = error_handler;
+ mq->error_handler_cls = error_handler_cls;
mq->impl_state = impl_state;
return mq;
@@ -572,7 +600,6 @@ handle_client_message (void *cls,
struct ClientConnectionState *state;
state = mq->impl_state;
-
if (NULL == msg)
{
GNUNET_MQ_inject_error (mq, GNUNET_MQ_ERROR_READ);
@@ -617,7 +644,9 @@ connection_client_transmit_queued (void *cls,
(GNUNET_NO == state->receive_active) )
{
state->receive_active = GNUNET_YES;
- GNUNET_CLIENT_receive (state->connection, handle_client_message, mq,
+ GNUNET_CLIENT_receive (state->connection,
+ &handle_client_message,
+ mq,
GNUNET_TIME_UNIT_FOREVER_REL);
}
@@ -673,17 +702,24 @@ struct GNUNET_MQ_Handle *
GNUNET_MQ_queue_for_connection_client (struct GNUNET_CLIENT_Connection *connection,
const struct GNUNET_MQ_MessageHandler *handlers,
GNUNET_MQ_ErrorHandler error_handler,
- void *cls)
+ void *error_handler_cls)
{
struct GNUNET_MQ_Handle *mq;
struct ClientConnectionState *state;
-
- GNUNET_assert (NULL != connection);
+ unsigned int i;
mq = GNUNET_new (struct GNUNET_MQ_Handle);
- mq->handlers = handlers;
+ if (NULL != handlers)
+ {
+ for (i=0;NULL != handlers[i].cb; i++) ;
+ mq->handlers = GNUNET_new_array (i,
+ struct GNUNET_MQ_MessageHandler);
+ memcpy (mq->handlers,
+ handlers,
+ i * sizeof (struct GNUNET_MQ_MessageHandler));
+ }
mq->error_handler = error_handler;
- mq->handlers_cls = cls;
+ mq->error_handler_cls = error_handler_cls;
state = GNUNET_new (struct ClientConnectionState);
state->connection = connection;
mq->impl_state = state;
@@ -697,18 +733,6 @@ GNUNET_MQ_queue_for_connection_client (struct GNUNET_CLIENT_Connection *connecti
}
-void
-GNUNET_MQ_replace_handlers (struct GNUNET_MQ_Handle *mq,
- const struct GNUNET_MQ_MessageHandler *new_handlers,
- void *cls)
-{
- /* FIXME: notify implementation? */
- /* FIXME: what about NULL handlers? abort receive? */
- mq->handlers = new_handlers;
- mq->handlers_cls = cls;
-}
-
-
/**
* Associate the assoc_data in mq with a unique request id.
*
@@ -784,6 +808,7 @@ GNUNET_MQ_destroy (struct GNUNET_MQ_Handle *mq)
while (NULL != mq->envelope_head)
{
struct GNUNET_MQ_Envelope *ev;
+
ev = mq->envelope_head;
ev->parent_queue = NULL;
GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
@@ -804,7 +829,7 @@ GNUNET_MQ_destroy (struct GNUNET_MQ_Handle *mq)
GNUNET_CONTAINER_multihashmap32_destroy (mq->assoc_map);
mq->assoc_map = NULL;
}
-
+ GNUNET_free_non_null (mq->handlers);
GNUNET_free (mq);
}