diff options
author | Bertrand Marc <beberking@gmail.com> | 2012-06-06 20:47:48 +0200 |
---|---|---|
committer | Bertrand Marc <beberking@gmail.com> | 2012-06-06 20:47:48 +0200 |
commit | 740b30688bd745a527f96f9116c19acb3480971a (patch) | |
tree | 2709a3f4dba11c174aa9e1ba3612e30c578e76a9 /src/stream/stream_api.c | |
parent | 2b81464a43485fcc8ce079fafdee7b7a171835f4 (diff) |
Imported Upstream version 0.9.3upstream/0.9.3
Diffstat (limited to 'src/stream/stream_api.c')
-rw-r--r-- | src/stream/stream_api.c | 2185 |
1 files changed, 1648 insertions, 537 deletions
diff --git a/src/stream/stream_api.c b/src/stream/stream_api.c index 84fcdfd..dadba33 100644 --- a/src/stream/stream_api.c +++ b/src/stream/stream_api.c @@ -18,17 +18,31 @@ Boston, MA 02111-1307, USA. */ +/* TODO: + * + * Checks for matching the sender and socket->other_peer in server + * message handlers + * + * Add code for write io timeout + * + * Include retransmission for control messages + **/ + /** * @file stream/stream_api.c * @brief Implementation of the stream library * @author Sree Harsha Totakura */ + + #include "platform.h" #include "gnunet_common.h" #include "gnunet_crypto_lib.h" #include "gnunet_stream_lib.h" #include "stream_protocol.h" +#define LOG(kind,...) \ + GNUNET_log_from (kind, "stream-api", __VA_ARGS__) /** * The maximum packet size of a stream packet @@ -36,15 +50,15 @@ #define MAX_PACKET_SIZE 64000 /** - * The maximum payload a data message packet can carry + * Receive buffer */ -static size_t max_payload_size = - MAX_PACKET_SIZE - sizeof (struct GNUNET_STREAM_DataMessage); +#define RECEIVE_BUFFER_SIZE 4096000 /** - * Receive buffer + * The maximum payload a data message packet can carry */ -#define RECEIVE_BUFFER_SIZE 4096000 +static size_t max_payload_size = + MAX_PACKET_SIZE - sizeof (struct GNUNET_STREAM_DataMessage); /** * states in the Protocol @@ -150,12 +164,6 @@ struct MessageQueue */ struct GNUNET_STREAM_Socket { - - /** - * The peer identity of the peer at the other end of the stream - */ - struct GNUNET_PeerIdentity other_peer; - /** * Retransmission timeout */ @@ -177,16 +185,6 @@ struct GNUNET_STREAM_Socket struct GNUNET_TIME_Relative ack_time_deadline; /** - * The task for sending timely Acks - */ - GNUNET_SCHEDULER_TaskIdentifier ack_task_id; - - /** - * Task scheduled to continue a read operation. - */ - GNUNET_SCHEDULER_TaskIdentifier read_task; - - /** * The mesh handle */ struct GNUNET_MESH_Handle *mesh; @@ -212,6 +210,16 @@ struct GNUNET_STREAM_Socket struct GNUNET_MESH_TransmitHandle *transmit_handle; /** + * The current act transmit handle (if a pending ack transmit request exists) + */ + struct GNUNET_MESH_TransmitHandle *ack_transmit_handle; + + /** + * Pointer to the current ack message using in ack_task + */ + struct GNUNET_STREAM_AckMessage *ack_msg; + + /** * The current message associated with the transmit handle */ struct MessageQueue *queue_head; @@ -232,14 +240,45 @@ struct GNUNET_STREAM_Socket struct GNUNET_STREAM_IOReadHandle *read_handle; /** + * The shutdown handle associated with this socket + */ + struct GNUNET_STREAM_ShutdownHandle *shutdown_handle; + + /** * Buffer for storing received messages */ void *receive_buffer; /** + * The listen socket from which this socket is derived. Should be NULL if it + * is not a derived socket + */ + struct GNUNET_STREAM_ListenSocket *lsocket; + + /** + * The peer identity of the peer at the other end of the stream + */ + struct GNUNET_PeerIdentity other_peer; + + /** * Task identifier for the read io timeout task */ - GNUNET_SCHEDULER_TaskIdentifier read_io_timeout_task; + GNUNET_SCHEDULER_TaskIdentifier read_io_timeout_task_id; + + /** + * Task identifier for retransmission task after timeout + */ + GNUNET_SCHEDULER_TaskIdentifier retransmission_timeout_task_id; + + /** + * The task for sending timely Acks + */ + GNUNET_SCHEDULER_TaskIdentifier ack_task_id; + + /** + * Task scheduled to continue a read operation. + */ + GNUNET_SCHEDULER_TaskIdentifier read_task_id; /** * The state of the protocol associated with this socket @@ -257,6 +296,11 @@ struct GNUNET_STREAM_Socket unsigned int retries; /** + * The application port number (type: uint32_t) + */ + GNUNET_MESH_ApplicationType app_port; + + /** * The session id associated with this stream connection * FIXME: Not used currently, may be removed */ @@ -286,7 +330,7 @@ struct GNUNET_STREAM_Socket /** * receiver's available buffer after the last acknowledged packet */ - uint32_t receive_window_available; + uint32_t receiver_window_available; /** * The offset pointer used during write operation @@ -310,7 +354,6 @@ struct GNUNET_STREAM_Socket */ struct GNUNET_STREAM_ListenSocket { - /** * The mesh handle */ @@ -328,6 +371,7 @@ struct GNUNET_STREAM_ListenSocket /** * The service port + * FIXME: Remove if not required! */ GNUNET_MESH_ApplicationType port; }; @@ -339,9 +383,24 @@ struct GNUNET_STREAM_ListenSocket struct GNUNET_STREAM_IOWriteHandle { /** + * The socket to which this write handle is associated + */ + struct GNUNET_STREAM_Socket *socket; + + /** * The packet_buffers associated with this Handle */ - struct GNUNET_STREAM_DataMessage *messages[64]; + struct GNUNET_STREAM_DataMessage *messages[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH]; + + /** + * The write continuation callback + */ + GNUNET_STREAM_CompletionContinuation write_cont; + + /** + * Write continuation closure + */ + void *write_cont_cls; /** * The bitmap of this IOHandle; Corresponding bit for a message is set when @@ -350,11 +409,9 @@ struct GNUNET_STREAM_IOWriteHandle GNUNET_STREAM_AckBitmap ack_bitmap; /** - * Number of packets sent before waiting for an ack - * - * FIXME: Do we need this? + * Number of bytes in this write handle */ - unsigned int sent_packets; + size_t size; }; @@ -376,13 +433,45 @@ struct GNUNET_STREAM_IOReadHandle /** + * Handle for Shutdown + */ +struct GNUNET_STREAM_ShutdownHandle +{ + /** + * The socket associated with this shutdown handle + */ + struct GNUNET_STREAM_Socket *socket; + + /** + * Shutdown completion callback + */ + GNUNET_STREAM_ShutdownCompletion completion_cb; + + /** + * Closure for completion callback + */ + void *completion_cls; + + /** + * Close message retransmission task id + */ + GNUNET_SCHEDULER_TaskIdentifier close_msg_retransmission_task_id; + + /** + * Which operation to shutdown? SHUT_RD, SHUT_WR or SHUT_RDWR + */ + int operation; +}; + + +/** * Default value in seconds for various timeouts */ -static unsigned int default_timeout = 300; +static unsigned int default_timeout = 10; /** - * Callback function for sending hello message + * Callback function for sending queued message * * @param cls closure the socket * @param size number of bytes available in buf @@ -401,31 +490,32 @@ send_message_notify (void *cls, size_t size, void *buf) if (NULL == head) return 0; /* just to be safe */ if (0 == size) /* request timed out */ - { - socket->retries++; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Message sending timed out. Retry %d \n", - socket->retries); - socket->transmit_handle = - GNUNET_MESH_notify_transmit_ready (socket->tunnel, - 0, /* Corking */ - 1, /* Priority */ - /* FIXME: exponential backoff */ - socket->retransmit_timeout, - &socket->other_peer, - ntohs (head->message->header.size), - &send_message_notify, - socket); - return 0; - } + { + socket->retries++; + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Message sending timed out. Retry %d \n", + GNUNET_i2s (&socket->other_peer), + socket->retries); + socket->transmit_handle = + GNUNET_MESH_notify_transmit_ready (socket->tunnel, + 0, /* Corking */ + 1, /* Priority */ + /* FIXME: exponential backoff */ + socket->retransmit_timeout, + &socket->other_peer, + ntohs (head->message->header.size), + &send_message_notify, + socket); + return 0; + } ret = ntohs (head->message->header.size); GNUNET_assert (size >= ret); memcpy (buf, head->message, ret); if (NULL != head->finish_cb) - { - head->finish_cb (socket, head->finish_cb_cls); - } + { + head->finish_cb (head->finish_cb_cls, socket); + } GNUNET_CONTAINER_DLL_remove (socket->queue_head, socket->queue_tail, head); @@ -433,19 +523,19 @@ send_message_notify (void *cls, size_t size, void *buf) GNUNET_free (head); head = socket->queue_head; if (NULL != head) /* more pending messages to send */ - { - socket->retries = 0; - socket->transmit_handle = - GNUNET_MESH_notify_transmit_ready (socket->tunnel, - 0, /* Corking */ - 1, /* Priority */ - /* FIXME: exponential backoff */ - socket->retransmit_timeout, - &socket->other_peer, - ntohs (head->message->header.size), - &send_message_notify, - socket); - } + { + socket->retries = 0; + socket->transmit_handle = + GNUNET_MESH_notify_transmit_ready (socket->tunnel, + 0, /* Corking */ + 1, /* Priority */ + /* FIXME: exponential backoff */ + socket->retransmit_timeout, + &socket->other_peer, + ntohs (head->message->header.size), + &send_message_notify, + socket); + } return ret; } @@ -466,6 +556,16 @@ queue_message (struct GNUNET_STREAM_Socket *socket, { struct MessageQueue *queue_entity; + GNUNET_assert + ((ntohs (message->header.type) >= GNUNET_MESSAGE_TYPE_STREAM_DATA) + && (ntohs (message->header.type) <= GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK)); + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Queueing message of type %d and size %d\n", + GNUNET_i2s (&socket->other_peer), + ntohs (message->header.type), + ntohs (message->header.size)); + GNUNET_assert (NULL != message); queue_entity = GNUNET_malloc (sizeof (struct MessageQueue)); queue_entity->message = message; queue_entity->finish_cb = finish_cb; @@ -490,6 +590,31 @@ queue_message (struct GNUNET_STREAM_Socket *socket, /** + * Copies a message and queues it for sending using the mesh connection of + * given socket + * + * @param socket the socket whose mesh connection is used + * @param message the message to be sent + * @param finish_cb the callback to be called when the message is sent + * @param finish_cb_cls the closure for the callback + */ +static void +copy_and_queue_message (struct GNUNET_STREAM_Socket *socket, + const struct GNUNET_STREAM_MessageHeader *message, + SendFinishCallback finish_cb, + void *finish_cb_cls) +{ + struct GNUNET_STREAM_MessageHeader *msg_copy; + uint16_t size; + + size = ntohs (message->header.size); + msg_copy = GNUNET_malloc (size); + memcpy (msg_copy, message, size); + queue_message (socket, msg_copy, finish_cb, finish_cb_cls); +} + + +/** * Callback function for sending ack message * * @param cls closure the ACK message created in ack_task @@ -500,21 +625,56 @@ queue_message (struct GNUNET_STREAM_Socket *socket, static size_t send_ack_notify (void *cls, size_t size, void *buf) { - struct GNUNET_STREAM_AckMessage *ack_msg = cls; + struct GNUNET_STREAM_Socket *socket = cls; if (0 == size) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%s called with size 0\n", __func__); - return 0; - } - GNUNET_assert (ack_msg->header.header.size <= size); + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s called with size 0\n", __func__); + return 0; + } + GNUNET_assert (ntohs (socket->ack_msg->header.header.size) <= size); + + size = ntohs (socket->ack_msg->header.header.size); + memcpy (buf, socket->ack_msg, size); - size = ack_msg->header.header.size; - memcpy (buf, ack_msg, size); + GNUNET_free (socket->ack_msg); + socket->ack_msg = NULL; + socket->ack_transmit_handle = NULL; return size; } +/** + * Writes data using the given socket. The amount of data written is limited by + * the receiver_window_size + * + * @param socket the socket to use + */ +static void +write_data (struct GNUNET_STREAM_Socket *socket); + +/** + * Task for retransmitting data messages if they aren't ACK before their ack + * deadline + * + * @param cls the socket + * @param tc the Task context + */ +static void +retransmission_timeout_task (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct GNUNET_STREAM_Socket *socket = cls; + + if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason) + return; + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Retransmitting DATA...\n", GNUNET_i2s (&socket->other_peer)); + socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK; + write_data (socket); +} + /** * Task for sending ACK message @@ -530,12 +690,10 @@ ack_task (void *cls, struct GNUNET_STREAM_AckMessage *ack_msg; if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason) - { - return; - } - - socket->ack_task_id = 0; - + { + return; + } + socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK; /* Create the ACK Message */ ack_msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_AckMessage)); ack_msg->header.header.size = htons (sizeof (struct @@ -545,18 +703,61 @@ ack_task (void *cls, ack_msg->base_sequence_number = htonl (socket->read_sequence_number); ack_msg->receive_window_remaining = htonl (RECEIVE_BUFFER_SIZE - socket->receive_buffer_size); - + socket->ack_msg = ack_msg; /* Request MESH for sending ACK */ - GNUNET_MESH_notify_transmit_ready (socket->tunnel, - 0, /* Corking */ - 1, /* Priority */ - socket->retransmit_timeout, - &socket->other_peer, - ntohs (ack_msg->header.header.size), - &send_ack_notify, - ack_msg); + socket->ack_transmit_handle = + GNUNET_MESH_notify_transmit_ready (socket->tunnel, + 0, /* Corking */ + 1, /* Priority */ + socket->retransmit_timeout, + &socket->other_peer, + ntohs (ack_msg->header.header.size), + &send_ack_notify, + socket); +} - + +/** + * Retransmission task for shutdown messages + * + * @param cls the shutdown handle + * @param tc the Task Context + */ +static void +close_msg_retransmission_task (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct GNUNET_STREAM_ShutdownHandle *shutdown_handle = cls; + struct GNUNET_STREAM_MessageHeader *msg; + struct GNUNET_STREAM_Socket *socket; + + GNUNET_assert (NULL != shutdown_handle); + socket = shutdown_handle->socket; + + msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader)); + msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader)); + switch (shutdown_handle->operation) + { + case SHUT_RDWR: + msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE); + break; + case SHUT_RD: + msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE); + break; + case SHUT_WR: + msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE); + break; + default: + GNUNET_free (msg); + shutdown_handle->close_msg_retransmission_task_id = + GNUNET_SCHEDULER_NO_TASK; + return; + } + queue_message (socket, msg, NULL, NULL); + shutdown_handle->close_msg_retransmission_task_id = + GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout, + &close_msg_retransmission_task, + shutdown_handle); } @@ -572,7 +773,7 @@ ackbitmap_modify_bit (GNUNET_STREAM_AckBitmap *bitmap, unsigned int bit, int value) { - GNUNET_assert (bit < 64); + GNUNET_assert (bit < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH); if (GNUNET_YES == value) *bitmap |= (1LL << bit); else @@ -591,31 +792,14 @@ static uint8_t ackbitmap_is_bit_set (const GNUNET_STREAM_AckBitmap *bitmap, unsigned int bit) { - GNUNET_assert (bit < 64); + GNUNET_assert (bit < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH); return 0 != (*bitmap & (1LL << bit)); } - -/** - * Function called when Data Message is sent - * - * @param cls the io_handle corresponding to the Data Message - * @param socket the socket which was used - */ -static void -write_data_finish_cb (void *cls, - struct GNUNET_STREAM_Socket *socket) -{ - struct GNUNET_STREAM_IOWriteHandle *io_handle = cls; - - io_handle->sent_packets++; -} - - /** * Writes data using the given socket. The amount of data written is limited by - * the receive_window_size + * the receiver_window_size * * @param socket the socket to use */ @@ -623,43 +807,59 @@ static void write_data (struct GNUNET_STREAM_Socket *socket) { struct GNUNET_STREAM_IOWriteHandle *io_handle = socket->write_handle; - unsigned int packet; + int packet; /* Although an int, should never be negative */ int ack_packet; ack_packet = -1; /* Find the last acknowledged packet */ - for (packet=0; packet < 64; packet++) - { - if (GNUNET_YES == ackbitmap_is_bit_set (&io_handle->ack_bitmap, - packet)) - ack_packet = packet; - else if (NULL == io_handle->messages[packet]) - break; - } + for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++) + { + if (GNUNET_YES == ackbitmap_is_bit_set (&io_handle->ack_bitmap, + packet)) + ack_packet = packet; + else if (NULL == io_handle->messages[packet]) + break; + } /* Resend packets which weren't ack'ed */ for (packet=0; packet < ack_packet; packet++) + { + if (GNUNET_NO == ackbitmap_is_bit_set (&io_handle->ack_bitmap, + packet)) { - if (GNUNET_NO == ackbitmap_is_bit_set (&io_handle->ack_bitmap, - packet)) - { - queue_message (socket, - &io_handle->messages[packet]->header, - NULL, - NULL); - } + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Placing DATA message with sequence %u in send queue\n", + GNUNET_i2s (&socket->other_peer), + ntohl (io_handle->messages[packet]->sequence_number)); + copy_and_queue_message (socket, + &io_handle->messages[packet]->header, + NULL, + NULL); } + } packet = ack_packet + 1; /* Now send new packets if there is enough buffer space */ while ( (NULL != io_handle->messages[packet]) && - (socket->receive_window_available >= ntohs (io_handle->messages[packet]->header.header.size)) ) - { - socket->receive_window_available -= ntohs (io_handle->messages[packet]->header.header.size); - queue_message (socket, - &io_handle->messages[packet]->header, - &write_data_finish_cb, - io_handle); - packet++; - } + (socket->receiver_window_available + >= ntohs (io_handle->messages[packet]->header.header.size)) ) + { + socket->receiver_window_available -= + ntohs (io_handle->messages[packet]->header.header.size); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Placing DATA message with sequence %u in send queue\n", + GNUNET_i2s (&socket->other_peer), + ntohl (io_handle->messages[packet]->sequence_number)); + copy_and_queue_message (socket, + &io_handle->messages[packet]->header, + NULL, + NULL); + packet++; + } + if (GNUNET_SCHEDULER_NO_TASK == socket->retransmission_timeout_task_id) + socket->retransmission_timeout_task_id = + GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply + (GNUNET_TIME_UNIT_SECONDS, 8), + &retransmission_timeout_task, + socket); } @@ -671,7 +871,7 @@ write_data (struct GNUNET_STREAM_Socket *socket) */ static void call_read_processor (void *cls, - const struct GNUNET_SCHEDULER_TaskContext *tc) + const struct GNUNET_SCHEDULER_TaskContext *tc) { struct GNUNET_STREAM_Socket *socket = cls; size_t read_size; @@ -680,87 +880,95 @@ call_read_processor (void *cls, uint32_t sequence_increase; uint32_t offset_increase; - socket->read_task = GNUNET_SCHEDULER_NO_TASK; + socket->read_task_id = GNUNET_SCHEDULER_NO_TASK; if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) return; + if (NULL == socket->receive_buffer) + return; + GNUNET_assert (NULL != socket->read_handle); GNUNET_assert (NULL != socket->read_handle->proc); /* Check the bitmap for any holes */ for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++) - { - if (GNUNET_NO == ackbitmap_is_bit_set (&socket->ack_bitmap, - packet)) - break; - } + { + if (GNUNET_NO == ackbitmap_is_bit_set (&socket->ack_bitmap, + packet)) + break; + } /* We only call read processor if we have the first packet */ GNUNET_assert (0 < packet); - valid_read_size = socket->receive_buffer_boundaries[packet-1] - socket->copy_offset; - GNUNET_assert (0 != valid_read_size); - /* Cancel the read_io_timeout_task */ - GNUNET_SCHEDULER_cancel (socket->read_io_timeout_task); - socket->read_io_timeout_task = GNUNET_SCHEDULER_NO_TASK; - + GNUNET_SCHEDULER_cancel (socket->read_io_timeout_task_id); + socket->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK; /* Call the data processor */ + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Calling read processor\n", + GNUNET_i2s (&socket->other_peer)); read_size = socket->read_handle->proc (socket->read_handle->proc_cls, socket->status, socket->receive_buffer + socket->copy_offset, valid_read_size); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Read processor read %d bytes\n", + GNUNET_i2s (&socket->other_peer), read_size); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Read processor completed successfully\n", + GNUNET_i2s (&socket->other_peer)); /* Free the read handle */ GNUNET_free (socket->read_handle); socket->read_handle = NULL; - GNUNET_assert (read_size <= valid_read_size); socket->copy_offset += read_size; - /* Determine upto which packet we can remove from the buffer */ for (packet = 0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++) + { + if (socket->copy_offset == socket->receive_buffer_boundaries[packet]) + { packet++; break; } if (socket->copy_offset < socket->receive_buffer_boundaries[packet]) break; + } /* If no packets can be removed we can't move the buffer */ if (0 == packet) return; - sequence_increase = packet; + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Sequence increase after read processor completion: %u\n", + GNUNET_i2s (&socket->other_peer), sequence_increase); /* Shift the data in the receive buffer */ memmove (socket->receive_buffer, socket->receive_buffer + socket->receive_buffer_boundaries[sequence_increase-1], - socket->receive_buffer_size - socket->receive_buffer_boundaries[sequence_increase-1]); - + socket->receive_buffer_size + - socket->receive_buffer_boundaries[sequence_increase-1]); /* Shift the bitmap */ socket->ack_bitmap = socket->ack_bitmap >> sequence_increase; - /* Set read_sequence_number */ socket->read_sequence_number += sequence_increase; - /* Set read_offset */ offset_increase = socket->receive_buffer_boundaries[sequence_increase-1]; socket->read_offset += offset_increase; - /* Fix copy_offset */ GNUNET_assert (offset_increase <= socket->copy_offset); socket->copy_offset -= offset_increase; - /* Fix relative boundaries */ for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++) + { + if (packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH - sequence_increase) { - if (packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH - sequence_increase) - { - socket->receive_buffer_boundaries[packet] = - socket->receive_buffer_boundaries[packet + sequence_increase] - - offset_increase; - } - else - socket->receive_buffer_boundaries[packet] = 0; + socket->receive_buffer_boundaries[packet] = + socket->receive_buffer_boundaries[packet + sequence_increase] + - offset_increase; } + else + socket->receive_buffer_boundaries[packet] = 0; + } } @@ -772,20 +980,32 @@ call_read_processor (void *cls, */ static void read_io_timeout (void *cls, - const struct GNUNET_SCHEDULER_TaskContext *tc) + const struct GNUNET_SCHEDULER_TaskContext *tc) { struct GNUNET_STREAM_Socket *socket = cls; + GNUNET_STREAM_DataProcessor proc; + void *proc_cls; - socket->read_io_timeout_task = GNUNET_SCHEDULER_NO_TASK; - if (socket->read_task != GNUNET_SCHEDULER_NO_TASK) + socket->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK; + if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK) { - GNUNET_SCHEDULER_cancel (socket->read_task); - socket->read_task = GNUNET_SCHEDULER_NO_TASK; + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Read task timedout - Cancelling it\n", + GNUNET_i2s (&socket->other_peer)); + GNUNET_SCHEDULER_cancel (socket->read_task_id); + socket->read_task_id = GNUNET_SCHEDULER_NO_TASK; } GNUNET_assert (NULL != socket->read_handle); - + proc = socket->read_handle->proc; + proc_cls = socket->read_handle->proc_cls; + GNUNET_free (socket->read_handle); socket->read_handle = NULL; + /* Call the read processor to signal timeout */ + proc (proc_cls, + GNUNET_STREAM_TIMEOUT, + NULL, + 0); } @@ -815,96 +1035,150 @@ handle_data (struct GNUNET_STREAM_Socket *socket, size = htons (msg->header.header.size); if (size < sizeof (struct GNUNET_STREAM_DataMessage)) - { - GNUNET_break_op (0); - return GNUNET_SYSERR; - } + { + GNUNET_break_op (0); + return GNUNET_SYSERR; + } + + if (0 != memcmp (sender, + &socket->other_peer, + sizeof (struct GNUNET_PeerIdentity))) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Received DATA from non-confirming peer\n", + GNUNET_i2s (&socket->other_peer)); + return GNUNET_YES; + } switch (socket->state) + { + case STATE_ESTABLISHED: + case STATE_TRANSMIT_CLOSED: + case STATE_TRANSMIT_CLOSE_WAIT: + + /* check if the message's sequence number is in the range we are + expecting */ + relative_sequence_number = + ntohl (msg->sequence_number) - socket->read_sequence_number; + if ( relative_sequence_number > GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH) { - case STATE_ESTABLISHED: - case STATE_TRANSMIT_CLOSED: - case STATE_TRANSMIT_CLOSE_WAIT: + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Ignoring received message with sequence number %u\n", + GNUNET_i2s (&socket->other_peer), + ntohl (msg->sequence_number)); + /* Start ACK sending task if one is not already present */ + if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id) + { + socket->ack_task_id = + GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh + (msg->ack_deadline), + &ack_task, + socket); + } + return GNUNET_YES; + } + + /* Check if we have already seen this message */ + if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap, + relative_sequence_number)) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Ignoring already received message with sequence number %u\n", + GNUNET_i2s (&socket->other_peer), + ntohl (msg->sequence_number)); + /* Start ACK sending task if one is not already present */ + if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id) + { + socket->ack_task_id = + GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh + (msg->ack_deadline), + &ack_task, + socket); + } + return GNUNET_YES; + } - /* check if the message's sequence number is in the range we are - expecting */ - relative_sequence_number = - ntohl (msg->sequence_number) - socket->read_sequence_number; - if ( relative_sequence_number > 64) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Ignoring received message with sequence number %d", - ntohl (msg->sequence_number)); - return GNUNET_YES; - } - - /* Check if we have to allocate the buffer */ - size -= sizeof (struct GNUNET_STREAM_DataMessage); - relative_offset = ntohl (msg->offset) - socket->read_offset; - bytes_needed = relative_offset + size; + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Receiving DATA with sequence number: %u and size: %d from %s\n", + GNUNET_i2s (&socket->other_peer), + ntohl (msg->sequence_number), + ntohs (msg->header.header.size), + GNUNET_i2s (&socket->other_peer)); - if (bytes_needed > socket->receive_buffer_size) - { - if (bytes_needed <= RECEIVE_BUFFER_SIZE) - { - socket->receive_buffer = GNUNET_realloc (socket->receive_buffer, - bytes_needed); - socket->receive_buffer_size = bytes_needed; - } - else - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Cannot accommodate packet %d as buffer is full\n", - ntohl (msg->sequence_number)); - return GNUNET_YES; - } - } + /* Check if we have to allocate the buffer */ + size -= sizeof (struct GNUNET_STREAM_DataMessage); + relative_offset = ntohl (msg->offset) - socket->read_offset; + bytes_needed = relative_offset + size; + if (bytes_needed > socket->receive_buffer_size) + { + if (bytes_needed <= RECEIVE_BUFFER_SIZE) + { + socket->receive_buffer = GNUNET_realloc (socket->receive_buffer, + bytes_needed); + socket->receive_buffer_size = bytes_needed; + } + else + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Cannot accommodate packet %d as buffer is full\n", + GNUNET_i2s (&socket->other_peer), + ntohl (msg->sequence_number)); + return GNUNET_YES; + } + } - /* Copy Data to buffer */ - payload = &msg[1]; - GNUNET_assert (relative_offset + size <= socket->receive_buffer_size); - memcpy (socket->receive_buffer + relative_offset, - payload, - size); - socket->receive_buffer_boundaries[relative_sequence_number] = - relative_offset + size; + /* Copy Data to buffer */ + payload = &msg[1]; + GNUNET_assert (relative_offset + size <= socket->receive_buffer_size); + memcpy (socket->receive_buffer + relative_offset, + payload, + size); + socket->receive_buffer_boundaries[relative_sequence_number] = + relative_offset + size; - /* Modify the ACK bitmap */ - ackbitmap_modify_bit (&socket->ack_bitmap, - relative_sequence_number, - GNUNET_YES); + /* Modify the ACK bitmap */ + ackbitmap_modify_bit (&socket->ack_bitmap, + relative_sequence_number, + GNUNET_YES); - /* Start ACK sending task if one is not already present */ - if (0 == socket->ack_task_id) - { - socket->ack_task_id = - GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh - (msg->ack_deadline), - &ack_task, - socket); - } - - if ((NULL != socket->read_handle) /* A read handle is waiting */ - /* There is no current read task */ |