diff options
Diffstat (limited to 'src/stream/stream_api.c')
-rw-r--r-- | src/stream/stream_api.c | 2185 |
1 files changed, 1648 insertions, 537 deletions
diff --git a/src/stream/stream_api.c b/src/stream/stream_api.c index 84fcdfd..dadba33 100644 --- a/src/stream/stream_api.c +++ b/src/stream/stream_api.c @@ -18,17 +18,31 @@ Boston, MA 02111-1307, USA. */ +/* TODO: + * + * Checks for matching the sender and socket->other_peer in server + * message handlers + * + * Add code for write io timeout + * + * Include retransmission for control messages + **/ + /** * @file stream/stream_api.c * @brief Implementation of the stream library * @author Sree Harsha Totakura */ + + #include "platform.h" #include "gnunet_common.h" #include "gnunet_crypto_lib.h" #include "gnunet_stream_lib.h" #include "stream_protocol.h" +#define LOG(kind,...) \ + GNUNET_log_from (kind, "stream-api", __VA_ARGS__) /** * The maximum packet size of a stream packet @@ -36,15 +50,15 @@ #define MAX_PACKET_SIZE 64000 /** - * The maximum payload a data message packet can carry + * Receive buffer */ -static size_t max_payload_size = - MAX_PACKET_SIZE - sizeof (struct GNUNET_STREAM_DataMessage); +#define RECEIVE_BUFFER_SIZE 4096000 /** - * Receive buffer + * The maximum payload a data message packet can carry */ -#define RECEIVE_BUFFER_SIZE 4096000 +static size_t max_payload_size = + MAX_PACKET_SIZE - sizeof (struct GNUNET_STREAM_DataMessage); /** * states in the Protocol @@ -150,12 +164,6 @@ struct MessageQueue */ struct GNUNET_STREAM_Socket { - - /** - * The peer identity of the peer at the other end of the stream - */ - struct GNUNET_PeerIdentity other_peer; - /** * Retransmission timeout */ @@ -177,16 +185,6 @@ struct GNUNET_STREAM_Socket struct GNUNET_TIME_Relative ack_time_deadline; /** - * The task for sending timely Acks - */ - GNUNET_SCHEDULER_TaskIdentifier ack_task_id; - - /** - * Task scheduled to continue a read operation. - */ - GNUNET_SCHEDULER_TaskIdentifier read_task; - - /** * The mesh handle */ struct GNUNET_MESH_Handle *mesh; @@ -212,6 +210,16 @@ struct GNUNET_STREAM_Socket struct GNUNET_MESH_TransmitHandle *transmit_handle; /** + * The current act transmit handle (if a pending ack transmit request exists) + */ + struct GNUNET_MESH_TransmitHandle *ack_transmit_handle; + + /** + * Pointer to the current ack message using in ack_task + */ + struct GNUNET_STREAM_AckMessage *ack_msg; + + /** * The current message associated with the transmit handle */ struct MessageQueue *queue_head; @@ -232,14 +240,45 @@ struct GNUNET_STREAM_Socket struct GNUNET_STREAM_IOReadHandle *read_handle; /** + * The shutdown handle associated with this socket + */ + struct GNUNET_STREAM_ShutdownHandle *shutdown_handle; + + /** * Buffer for storing received messages */ void *receive_buffer; /** + * The listen socket from which this socket is derived. Should be NULL if it + * is not a derived socket + */ + struct GNUNET_STREAM_ListenSocket *lsocket; + + /** + * The peer identity of the peer at the other end of the stream + */ + struct GNUNET_PeerIdentity other_peer; + + /** * Task identifier for the read io timeout task */ - GNUNET_SCHEDULER_TaskIdentifier read_io_timeout_task; + GNUNET_SCHEDULER_TaskIdentifier read_io_timeout_task_id; + + /** + * Task identifier for retransmission task after timeout + */ + GNUNET_SCHEDULER_TaskIdentifier retransmission_timeout_task_id; + + /** + * The task for sending timely Acks + */ + GNUNET_SCHEDULER_TaskIdentifier ack_task_id; + + /** + * Task scheduled to continue a read operation. + */ + GNUNET_SCHEDULER_TaskIdentifier read_task_id; /** * The state of the protocol associated with this socket @@ -257,6 +296,11 @@ struct GNUNET_STREAM_Socket unsigned int retries; /** + * The application port number (type: uint32_t) + */ + GNUNET_MESH_ApplicationType app_port; + + /** * The session id associated with this stream connection * FIXME: Not used currently, may be removed */ @@ -286,7 +330,7 @@ struct GNUNET_STREAM_Socket /** * receiver's available buffer after the last acknowledged packet */ - uint32_t receive_window_available; + uint32_t receiver_window_available; /** * The offset pointer used during write operation @@ -310,7 +354,6 @@ struct GNUNET_STREAM_Socket */ struct GNUNET_STREAM_ListenSocket { - /** * The mesh handle */ @@ -328,6 +371,7 @@ struct GNUNET_STREAM_ListenSocket /** * The service port + * FIXME: Remove if not required! */ GNUNET_MESH_ApplicationType port; }; @@ -339,9 +383,24 @@ struct GNUNET_STREAM_ListenSocket struct GNUNET_STREAM_IOWriteHandle { /** + * The socket to which this write handle is associated + */ + struct GNUNET_STREAM_Socket *socket; + + /** * The packet_buffers associated with this Handle */ - struct GNUNET_STREAM_DataMessage *messages[64]; + struct GNUNET_STREAM_DataMessage *messages[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH]; + + /** + * The write continuation callback + */ + GNUNET_STREAM_CompletionContinuation write_cont; + + /** + * Write continuation closure + */ + void *write_cont_cls; /** * The bitmap of this IOHandle; Corresponding bit for a message is set when @@ -350,11 +409,9 @@ struct GNUNET_STREAM_IOWriteHandle GNUNET_STREAM_AckBitmap ack_bitmap; /** - * Number of packets sent before waiting for an ack - * - * FIXME: Do we need this? + * Number of bytes in this write handle */ - unsigned int sent_packets; + size_t size; }; @@ -376,13 +433,45 @@ struct GNUNET_STREAM_IOReadHandle /** + * Handle for Shutdown + */ +struct GNUNET_STREAM_ShutdownHandle +{ + /** + * The socket associated with this shutdown handle + */ + struct GNUNET_STREAM_Socket *socket; + + /** + * Shutdown completion callback + */ + GNUNET_STREAM_ShutdownCompletion completion_cb; + + /** + * Closure for completion callback + */ + void *completion_cls; + + /** + * Close message retransmission task id + */ + GNUNET_SCHEDULER_TaskIdentifier close_msg_retransmission_task_id; + + /** + * Which operation to shutdown? SHUT_RD, SHUT_WR or SHUT_RDWR + */ + int operation; +}; + + +/** * Default value in seconds for various timeouts */ -static unsigned int default_timeout = 300; +static unsigned int default_timeout = 10; /** - * Callback function for sending hello message + * Callback function for sending queued message * * @param cls closure the socket * @param size number of bytes available in buf @@ -401,31 +490,32 @@ send_message_notify (void *cls, size_t size, void *buf) if (NULL == head) return 0; /* just to be safe */ if (0 == size) /* request timed out */ - { - socket->retries++; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Message sending timed out. Retry %d \n", - socket->retries); - socket->transmit_handle = - GNUNET_MESH_notify_transmit_ready (socket->tunnel, - 0, /* Corking */ - 1, /* Priority */ - /* FIXME: exponential backoff */ - socket->retransmit_timeout, - &socket->other_peer, - ntohs (head->message->header.size), - &send_message_notify, - socket); - return 0; - } + { + socket->retries++; + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Message sending timed out. Retry %d \n", + GNUNET_i2s (&socket->other_peer), + socket->retries); + socket->transmit_handle = + GNUNET_MESH_notify_transmit_ready (socket->tunnel, + 0, /* Corking */ + 1, /* Priority */ + /* FIXME: exponential backoff */ + socket->retransmit_timeout, + &socket->other_peer, + ntohs (head->message->header.size), + &send_message_notify, + socket); + return 0; + } ret = ntohs (head->message->header.size); GNUNET_assert (size >= ret); memcpy (buf, head->message, ret); if (NULL != head->finish_cb) - { - head->finish_cb (socket, head->finish_cb_cls); - } + { + head->finish_cb (head->finish_cb_cls, socket); + } GNUNET_CONTAINER_DLL_remove (socket->queue_head, socket->queue_tail, head); @@ -433,19 +523,19 @@ send_message_notify (void *cls, size_t size, void *buf) GNUNET_free (head); head = socket->queue_head; if (NULL != head) /* more pending messages to send */ - { - socket->retries = 0; - socket->transmit_handle = - GNUNET_MESH_notify_transmit_ready (socket->tunnel, - 0, /* Corking */ - 1, /* Priority */ - /* FIXME: exponential backoff */ - socket->retransmit_timeout, - &socket->other_peer, - ntohs (head->message->header.size), - &send_message_notify, - socket); - } + { + socket->retries = 0; + socket->transmit_handle = + GNUNET_MESH_notify_transmit_ready (socket->tunnel, + 0, /* Corking */ + 1, /* Priority */ + /* FIXME: exponential backoff */ + socket->retransmit_timeout, + &socket->other_peer, + ntohs (head->message->header.size), + &send_message_notify, + socket); + } return ret; } @@ -466,6 +556,16 @@ queue_message (struct GNUNET_STREAM_Socket *socket, { struct MessageQueue *queue_entity; + GNUNET_assert + ((ntohs (message->header.type) >= GNUNET_MESSAGE_TYPE_STREAM_DATA) + && (ntohs (message->header.type) <= GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK)); + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Queueing message of type %d and size %d\n", + GNUNET_i2s (&socket->other_peer), + ntohs (message->header.type), + ntohs (message->header.size)); + GNUNET_assert (NULL != message); queue_entity = GNUNET_malloc (sizeof (struct MessageQueue)); queue_entity->message = message; queue_entity->finish_cb = finish_cb; @@ -490,6 +590,31 @@ queue_message (struct GNUNET_STREAM_Socket *socket, /** + * Copies a message and queues it for sending using the mesh connection of + * given socket + * + * @param socket the socket whose mesh connection is used + * @param message the message to be sent + * @param finish_cb the callback to be called when the message is sent + * @param finish_cb_cls the closure for the callback + */ +static void +copy_and_queue_message (struct GNUNET_STREAM_Socket *socket, + const struct GNUNET_STREAM_MessageHeader *message, + SendFinishCallback finish_cb, + void *finish_cb_cls) +{ + struct GNUNET_STREAM_MessageHeader *msg_copy; + uint16_t size; + + size = ntohs (message->header.size); + msg_copy = GNUNET_malloc (size); + memcpy (msg_copy, message, size); + queue_message (socket, msg_copy, finish_cb, finish_cb_cls); +} + + +/** * Callback function for sending ack message * * @param cls closure the ACK message created in ack_task @@ -500,21 +625,56 @@ queue_message (struct GNUNET_STREAM_Socket *socket, static size_t send_ack_notify (void *cls, size_t size, void *buf) { - struct GNUNET_STREAM_AckMessage *ack_msg = cls; + struct GNUNET_STREAM_Socket *socket = cls; if (0 == size) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%s called with size 0\n", __func__); - return 0; - } - GNUNET_assert (ack_msg->header.header.size <= size); + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s called with size 0\n", __func__); + return 0; + } + GNUNET_assert (ntohs (socket->ack_msg->header.header.size) <= size); + + size = ntohs (socket->ack_msg->header.header.size); + memcpy (buf, socket->ack_msg, size); - size = ack_msg->header.header.size; - memcpy (buf, ack_msg, size); + GNUNET_free (socket->ack_msg); + socket->ack_msg = NULL; + socket->ack_transmit_handle = NULL; return size; } +/** + * Writes data using the given socket. The amount of data written is limited by + * the receiver_window_size + * + * @param socket the socket to use + */ +static void +write_data (struct GNUNET_STREAM_Socket *socket); + +/** + * Task for retransmitting data messages if they aren't ACK before their ack + * deadline + * + * @param cls the socket + * @param tc the Task context + */ +static void +retransmission_timeout_task (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct GNUNET_STREAM_Socket *socket = cls; + + if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason) + return; + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Retransmitting DATA...\n", GNUNET_i2s (&socket->other_peer)); + socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK; + write_data (socket); +} + /** * Task for sending ACK message @@ -530,12 +690,10 @@ ack_task (void *cls, struct GNUNET_STREAM_AckMessage *ack_msg; if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason) - { - return; - } - - socket->ack_task_id = 0; - + { + return; + } + socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK; /* Create the ACK Message */ ack_msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_AckMessage)); ack_msg->header.header.size = htons (sizeof (struct @@ -545,18 +703,61 @@ ack_task (void *cls, ack_msg->base_sequence_number = htonl (socket->read_sequence_number); ack_msg->receive_window_remaining = htonl (RECEIVE_BUFFER_SIZE - socket->receive_buffer_size); - + socket->ack_msg = ack_msg; /* Request MESH for sending ACK */ - GNUNET_MESH_notify_transmit_ready (socket->tunnel, - 0, /* Corking */ - 1, /* Priority */ - socket->retransmit_timeout, - &socket->other_peer, - ntohs (ack_msg->header.header.size), - &send_ack_notify, - ack_msg); + socket->ack_transmit_handle = + GNUNET_MESH_notify_transmit_ready (socket->tunnel, + 0, /* Corking */ + 1, /* Priority */ + socket->retransmit_timeout, + &socket->other_peer, + ntohs (ack_msg->header.header.size), + &send_ack_notify, + socket); +} - + +/** + * Retransmission task for shutdown messages + * + * @param cls the shutdown handle + * @param tc the Task Context + */ +static void +close_msg_retransmission_task (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct GNUNET_STREAM_ShutdownHandle *shutdown_handle = cls; + struct GNUNET_STREAM_MessageHeader *msg; + struct GNUNET_STREAM_Socket *socket; + + GNUNET_assert (NULL != shutdown_handle); + socket = shutdown_handle->socket; + + msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader)); + msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader)); + switch (shutdown_handle->operation) + { + case SHUT_RDWR: + msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE); + break; + case SHUT_RD: + msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE); + break; + case SHUT_WR: + msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE); + break; + default: + GNUNET_free (msg); + shutdown_handle->close_msg_retransmission_task_id = + GNUNET_SCHEDULER_NO_TASK; + return; + } + queue_message (socket, msg, NULL, NULL); + shutdown_handle->close_msg_retransmission_task_id = + GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout, + &close_msg_retransmission_task, + shutdown_handle); } @@ -572,7 +773,7 @@ ackbitmap_modify_bit (GNUNET_STREAM_AckBitmap *bitmap, unsigned int bit, int value) { - GNUNET_assert (bit < 64); + GNUNET_assert (bit < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH); if (GNUNET_YES == value) *bitmap |= (1LL << bit); else @@ -591,31 +792,14 @@ static uint8_t ackbitmap_is_bit_set (const GNUNET_STREAM_AckBitmap *bitmap, unsigned int bit) { - GNUNET_assert (bit < 64); + GNUNET_assert (bit < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH); return 0 != (*bitmap & (1LL << bit)); } - -/** - * Function called when Data Message is sent - * - * @param cls the io_handle corresponding to the Data Message - * @param socket the socket which was used - */ -static void -write_data_finish_cb (void *cls, - struct GNUNET_STREAM_Socket *socket) -{ - struct GNUNET_STREAM_IOWriteHandle *io_handle = cls; - - io_handle->sent_packets++; -} - - /** * Writes data using the given socket. The amount of data written is limited by - * the receive_window_size + * the receiver_window_size * * @param socket the socket to use */ @@ -623,43 +807,59 @@ static void write_data (struct GNUNET_STREAM_Socket *socket) { struct GNUNET_STREAM_IOWriteHandle *io_handle = socket->write_handle; - unsigned int packet; + int packet; /* Although an int, should never be negative */ int ack_packet; ack_packet = -1; /* Find the last acknowledged packet */ - for (packet=0; packet < 64; packet++) - { - if (GNUNET_YES == ackbitmap_is_bit_set (&io_handle->ack_bitmap, - packet)) - ack_packet = packet; - else if (NULL == io_handle->messages[packet]) - break; - } + for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++) + { + if (GNUNET_YES == ackbitmap_is_bit_set (&io_handle->ack_bitmap, + packet)) + ack_packet = packet; + else if (NULL == io_handle->messages[packet]) + break; + } /* Resend packets which weren't ack'ed */ for (packet=0; packet < ack_packet; packet++) + { + if (GNUNET_NO == ackbitmap_is_bit_set (&io_handle->ack_bitmap, + packet)) { - if (GNUNET_NO == ackbitmap_is_bit_set (&io_handle->ack_bitmap, - packet)) - { - queue_message (socket, - &io_handle->messages[packet]->header, - NULL, - NULL); - } + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Placing DATA message with sequence %u in send queue\n", + GNUNET_i2s (&socket->other_peer), + ntohl (io_handle->messages[packet]->sequence_number)); + copy_and_queue_message (socket, + &io_handle->messages[packet]->header, + NULL, + NULL); } + } packet = ack_packet + 1; /* Now send new packets if there is enough buffer space */ while ( (NULL != io_handle->messages[packet]) && - (socket->receive_window_available >= ntohs (io_handle->messages[packet]->header.header.size)) ) - { - socket->receive_window_available -= ntohs (io_handle->messages[packet]->header.header.size); - queue_message (socket, - &io_handle->messages[packet]->header, - &write_data_finish_cb, - io_handle); - packet++; - } + (socket->receiver_window_available + >= ntohs (io_handle->messages[packet]->header.header.size)) ) + { + socket->receiver_window_available -= + ntohs (io_handle->messages[packet]->header.header.size); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Placing DATA message with sequence %u in send queue\n", + GNUNET_i2s (&socket->other_peer), + ntohl (io_handle->messages[packet]->sequence_number)); + copy_and_queue_message (socket, + &io_handle->messages[packet]->header, + NULL, + NULL); + packet++; + } + if (GNUNET_SCHEDULER_NO_TASK == socket->retransmission_timeout_task_id) + socket->retransmission_timeout_task_id = + GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply + (GNUNET_TIME_UNIT_SECONDS, 8), + &retransmission_timeout_task, + socket); } @@ -671,7 +871,7 @@ write_data (struct GNUNET_STREAM_Socket *socket) */ static void call_read_processor (void *cls, - const struct GNUNET_SCHEDULER_TaskContext *tc) + const struct GNUNET_SCHEDULER_TaskContext *tc) { struct GNUNET_STREAM_Socket *socket = cls; size_t read_size; @@ -680,87 +880,95 @@ call_read_processor (void *cls, uint32_t sequence_increase; uint32_t offset_increase; - socket->read_task = GNUNET_SCHEDULER_NO_TASK; + socket->read_task_id = GNUNET_SCHEDULER_NO_TASK; if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) return; + if (NULL == socket->receive_buffer) + return; + GNUNET_assert (NULL != socket->read_handle); GNUNET_assert (NULL != socket->read_handle->proc); /* Check the bitmap for any holes */ for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++) - { - if (GNUNET_NO == ackbitmap_is_bit_set (&socket->ack_bitmap, - packet)) - break; - } + { + if (GNUNET_NO == ackbitmap_is_bit_set (&socket->ack_bitmap, + packet)) + break; + } /* We only call read processor if we have the first packet */ GNUNET_assert (0 < packet); - valid_read_size = socket->receive_buffer_boundaries[packet-1] - socket->copy_offset; - GNUNET_assert (0 != valid_read_size); - /* Cancel the read_io_timeout_task */ - GNUNET_SCHEDULER_cancel (socket->read_io_timeout_task); - socket->read_io_timeout_task = GNUNET_SCHEDULER_NO_TASK; - + GNUNET_SCHEDULER_cancel (socket->read_io_timeout_task_id); + socket->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK; /* Call the data processor */ + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Calling read processor\n", + GNUNET_i2s (&socket->other_peer)); read_size = socket->read_handle->proc (socket->read_handle->proc_cls, socket->status, socket->receive_buffer + socket->copy_offset, valid_read_size); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Read processor read %d bytes\n", + GNUNET_i2s (&socket->other_peer), read_size); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Read processor completed successfully\n", + GNUNET_i2s (&socket->other_peer)); /* Free the read handle */ GNUNET_free (socket->read_handle); socket->read_handle = NULL; - GNUNET_assert (read_size <= valid_read_size); socket->copy_offset += read_size; - /* Determine upto which packet we can remove from the buffer */ for (packet = 0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++) + { + if (socket->copy_offset == socket->receive_buffer_boundaries[packet]) + { packet++; break; } if (socket->copy_offset < socket->receive_buffer_boundaries[packet]) break; + } /* If no packets can be removed we can't move the buffer */ if (0 == packet) return; - sequence_increase = packet; + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Sequence increase after read processor completion: %u\n", + GNUNET_i2s (&socket->other_peer), sequence_increase); /* Shift the data in the receive buffer */ memmove (socket->receive_buffer, socket->receive_buffer + socket->receive_buffer_boundaries[sequence_increase-1], - socket->receive_buffer_size - socket->receive_buffer_boundaries[sequence_increase-1]); - + socket->receive_buffer_size + - socket->receive_buffer_boundaries[sequence_increase-1]); /* Shift the bitmap */ socket->ack_bitmap = socket->ack_bitmap >> sequence_increase; - /* Set read_sequence_number */ socket->read_sequence_number += sequence_increase; - /* Set read_offset */ offset_increase = socket->receive_buffer_boundaries[sequence_increase-1]; socket->read_offset += offset_increase; - /* Fix copy_offset */ GNUNET_assert (offset_increase <= socket->copy_offset); socket->copy_offset -= offset_increase; - /* Fix relative boundaries */ for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++) + { + if (packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH - sequence_increase) { - if (packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH - sequence_increase) - { - socket->receive_buffer_boundaries[packet] = - socket->receive_buffer_boundaries[packet + sequence_increase] - - offset_increase; - } - else - socket->receive_buffer_boundaries[packet] = 0; + socket->receive_buffer_boundaries[packet] = + socket->receive_buffer_boundaries[packet + sequence_increase] + - offset_increase; } + else + socket->receive_buffer_boundaries[packet] = 0; + } } @@ -772,20 +980,32 @@ call_read_processor (void *cls, */ static void read_io_timeout (void *cls, - const struct GNUNET_SCHEDULER_TaskContext *tc) + const struct GNUNET_SCHEDULER_TaskContext *tc) { struct GNUNET_STREAM_Socket *socket = cls; + GNUNET_STREAM_DataProcessor proc; + void *proc_cls; - socket->read_io_timeout_task = GNUNET_SCHEDULER_NO_TASK; - if (socket->read_task != GNUNET_SCHEDULER_NO_TASK) + socket->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK; + if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK) { - GNUNET_SCHEDULER_cancel (socket->read_task); - socket->read_task = GNUNET_SCHEDULER_NO_TASK; + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Read task timedout - Cancelling it\n", + GNUNET_i2s (&socket->other_peer)); + GNUNET_SCHEDULER_cancel (socket->read_task_id); + socket->read_task_id = GNUNET_SCHEDULER_NO_TASK; } GNUNET_assert (NULL != socket->read_handle); - + proc = socket->read_handle->proc; + proc_cls = socket->read_handle->proc_cls; + GNUNET_free (socket->read_handle); socket->read_handle = NULL; + /* Call the read processor to signal timeout */ + proc (proc_cls, + GNUNET_STREAM_TIMEOUT, + NULL, + 0); } @@ -815,96 +1035,150 @@ handle_data (struct GNUNET_STREAM_Socket *socket, size = htons (msg->header.header.size); if (size < sizeof (struct GNUNET_STREAM_DataMessage)) - { - GNUNET_break_op (0); - return GNUNET_SYSERR; - } + { + GNUNET_break_op (0); + return GNUNET_SYSERR; + } + + if (0 != memcmp (sender, + &socket->other_peer, + sizeof (struct GNUNET_PeerIdentity))) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Received DATA from non-confirming peer\n", + GNUNET_i2s (&socket->other_peer)); + return GNUNET_YES; + } switch (socket->state) + { + case STATE_ESTABLISHED: + case STATE_TRANSMIT_CLOSED: + case STATE_TRANSMIT_CLOSE_WAIT: + + /* check if the message's sequence number is in the range we are + expecting */ + relative_sequence_number = + ntohl (msg->sequence_number) - socket->read_sequence_number; + if ( relative_sequence_number > GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH) { - case STATE_ESTABLISHED: - case STATE_TRANSMIT_CLOSED: - case STATE_TRANSMIT_CLOSE_WAIT: + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Ignoring received message with sequence number %u\n", + GNUNET_i2s (&socket->other_peer), + ntohl (msg->sequence_number)); + /* Start ACK sending task if one is not already present */ + if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id) + { + socket->ack_task_id = + GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh + (msg->ack_deadline), + &ack_task, + socket); + } + return GNUNET_YES; + } + + /* Check if we have already seen this message */ + if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap, + relative_sequence_number)) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Ignoring already received message with sequence number %u\n", + GNUNET_i2s (&socket->other_peer), + ntohl (msg->sequence_number)); + /* Start ACK sending task if one is not already present */ + if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id) + { + socket->ack_task_id = + GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh + (msg->ack_deadline), + &ack_task, + socket); + } + return GNUNET_YES; + } - /* check if the message's sequence number is in the range we are - expecting */ - relative_sequence_number = - ntohl (msg->sequence_number) - socket->read_sequence_number; - if ( relative_sequence_number > 64) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Ignoring received message with sequence number %d", - ntohl (msg->sequence_number)); - return GNUNET_YES; - } - - /* Check if we have to allocate the buffer */ - size -= sizeof (struct GNUNET_STREAM_DataMessage); - relative_offset = ntohl (msg->offset) - socket->read_offset; - bytes_needed = relative_offset + size; + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Receiving DATA with sequence number: %u and size: %d from %s\n", + GNUNET_i2s (&socket->other_peer), + ntohl (msg->sequence_number), + ntohs (msg->header.header.size), + GNUNET_i2s (&socket->other_peer)); - if (bytes_needed > socket->receive_buffer_size) - { - if (bytes_needed <= RECEIVE_BUFFER_SIZE) - { - socket->receive_buffer = GNUNET_realloc (socket->receive_buffer, - bytes_needed); - socket->receive_buffer_size = bytes_needed; - } - else - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Cannot accommodate packet %d as buffer is full\n", - ntohl (msg->sequence_number)); - return GNUNET_YES; - } - } + /* Check if we have to allocate the buffer */ + size -= sizeof (struct GNUNET_STREAM_DataMessage); + relative_offset = ntohl (msg->offset) - socket->read_offset; + bytes_needed = relative_offset + size; + if (bytes_needed > socket->receive_buffer_size) + { + if (bytes_needed <= RECEIVE_BUFFER_SIZE) + { + socket->receive_buffer = GNUNET_realloc (socket->receive_buffer, + bytes_needed); + socket->receive_buffer_size = bytes_needed; + } + else + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Cannot accommodate packet %d as buffer is full\n", + GNUNET_i2s (&socket->other_peer), + ntohl (msg->sequence_number)); + return GNUNET_YES; + } + } - /* Copy Data to buffer */ - payload = &msg[1]; - GNUNET_assert (relative_offset + size <= socket->receive_buffer_size); - memcpy (socket->receive_buffer + relative_offset, - payload, - size); - socket->receive_buffer_boundaries[relative_sequence_number] = - relative_offset + size; + /* Copy Data to buffer */ + payload = &msg[1]; + GNUNET_assert (relative_offset + size <= socket->receive_buffer_size); + memcpy (socket->receive_buffer + relative_offset, + payload, + size); + socket->receive_buffer_boundaries[relative_sequence_number] = + relative_offset + size; - /* Modify the ACK bitmap */ - ackbitmap_modify_bit (&socket->ack_bitmap, - relative_sequence_number, - GNUNET_YES); + /* Modify the ACK bitmap */ + ackbitmap_modify_bit (&socket->ack_bitmap, + relative_sequence_number, + GNUNET_YES); - /* Start ACK sending task if one is not already present */ - if (0 == socket->ack_task_id) - { - socket->ack_task_id = - GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh - (msg->ack_deadline), - &ack_task, - socket); - } - - if ((NULL != socket->read_handle) /* A read handle is waiting */ - /* There is no current read task */ - && (GNUNET_SCHEDULER_NO_TASK == socket->read_task) - /* We have the first packet */ - && (GNUNET_YES == ackbitmap_is_bit_set(&socket->ack_bitmap, - 0))) - { - socket->read_task = - GNUNET_SCHEDULER_add_now (&call_read_processor, + /* Start ACK sending task if one is not already present */ + if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id) + { + socket->ack_task_id = + GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh + (msg->ack_deadline), + &ack_task, socket); - } - - break; + } - default: - /* FIXME: call statistics */ - break; + if ((NULL != socket->read_handle) /* A read handle is waiting */ + /* There is no current read task */ + && (GNUNET_SCHEDULER_NO_TASK == socket->read_task_id) + /* We have the first packet */ + && (GNUNET_YES == ackbitmap_is_bit_set(&socket->ack_bitmap, + 0))) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Scheduling read processor\n", + GNUNET_i2s (&socket->other_peer)); + + socket->read_task_id = + GNUNET_SCHEDULER_add_now (&call_read_processor, + socket); } + + break; + + default: + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Received data message when it cannot be handled\n", + GNUNET_i2s (&socket->other_peer)); + break; + } return GNUNET_YES; } + /** * Client's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA * @@ -919,11 +1193,11 @@ handle_data (struct GNUNET_STREAM_Socket *socket, */ static int client_handle_data (void *cls, - struct GNUNET_MESH_Tunnel *tunnel, - void **tunnel_ctx, - const struct GNUNET_PeerIdentity *sender, - const struct GNUNET_MessageHeader *message, - const struct GNUNET_ATS_Information*atsi) + struct GNUNET_MESH_Tunnel *tunnel, + void **tunnel_ctx, + const struct GNUNET_PeerIdentity *sender, + const struct GNUNET_MessageHeader *message, + const struct GNUNET_ATS_Information*atsi) { struct GNUNET_STREAM_Socket *socket = cls; @@ -945,10 +1219,31 @@ static void set_state_established (void *cls, struct GNUNET_STREAM_Socket *socket) { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Attaining ESTABLISHED state\n"); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Attaining ESTABLISHED state\n", + GNUNET_i2s (&socket->other_peer)); socket->write_offset = 0; socket->read_offset = 0; socket->state = STATE_ESTABLISHED; + /* FIXME: What if listen_cb is NULL */ + if (NULL != socket->lsocket) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Calling listen callback\n", + GNUNET_i2s (&socket->other_peer)); + if (GNUNET_SYSERR == + socket->lsocket->listen_cb (socket->lsocket->listen_cb_cls, + socket, + &socket->other_peer)) + { + socket->state = STATE_CLOSED; + /* FIXME: We should close in a decent way */ + GNUNET_MESH_tunnel_destroy (socket->tunnel); /* Destroy the tunnel */ + GNUNET_free (socket); + } + } + else if (socket->open_cb) + socket->open_cb (socket->open_cls, socket); } @@ -963,12 +1258,115 @@ set_state_hello_wait (void *cls, struct GNUNET_STREAM_Socket *socket) { GNUNET_assert (STATE_INIT == socket->state); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Attaining HELLO_WAIT state\n"); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Attaining HELLO_WAIT state\n", + GNUNET_i2s (&socket->other_peer)); socket->state = STATE_HELLO_WAIT; } /** + * Callback to set state to CLOSE_WAIT + * + * @param cls the closure from queue_message + * @param socket the socket requiring state change + */ +static void +set_state_close_wait (void *cls, + struct GNUNET_STREAM_Socket *socket) +{ + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Attaing CLOSE_WAIT state\n", + GNUNET_i2s (&socket->other_peer)); + socket->state = STATE_CLOSE_WAIT; + GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */ + socket->receive_buffer = NULL; + socket->receive_buffer_size = 0; +} + + +/** + * Callback to set state to RECEIVE_CLOSE_WAIT + * + * @param cls the closure from queue_message + * @param socket the socket requiring state change + */ +static void +set_state_receive_close_wait (void *cls, + struct GNUNET_STREAM_Socket *socket) +{ + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Attaing RECEIVE_CLOSE_WAIT state\n", + GNUNET_i2s (&socket->other_peer)); + socket->state = STATE_RECEIVE_CLOSE_WAIT; + GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */ + socket->receive_buffer = NULL; + socket->receive_buffer_size = 0; +} + + +/** + * Callback to set state to TRANSMIT_CLOSE_WAIT + * + * @param cls the closure from queue_message + * @param socket the socket requiring state change + */ +static void +set_state_transmit_close_wait (void *cls, + struct GNUNET_STREAM_Socket *socket) +{ + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Attaing TRANSMIT_CLOSE_WAIT state\n", + GNUNET_i2s (&socket->other_peer)); + socket->state = STATE_TRANSMIT_CLOSE_WAIT; +} + + +/** + * Callback to set state to CLOSED + * + * @param cls the closure from queue_message + * @param socket the socket requiring state change + */ +static void +set_state_closed (void *cls, + struct GNUNET_STREAM_Socket *socket) +{ + socket->state = STATE_CLOSED; +} + +/** + * Returns a new HelloAckMessage. Also sets the write sequence number for the + * socket + * + * @param socket the socket for which this HelloAckMessage has to be generated + * @return the HelloAckMessage + */ +static struct GNUNET_STREAM_HelloAckMessage * +generate_hello_ack_msg (struct GNUNET_STREAM_Socket *socket) +{ + struct GNUNET_STREAM_HelloAckMessage *msg; + + /* Get the random sequence number */ + socket->write_sequence_number = + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Generated write sequence number %u\n", + GNUNET_i2s (&socket->other_peer), + (unsigned int) socket->write_sequence_number); + + msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage)); + msg->header.header.size = + htons (sizeof (struct GNUNET_STREAM_HelloAckMessage)); + msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK); + msg->sequence_number = htonl (socket->write_sequence_number); + msg->receiver_window_size = htonl (RECEIVE_BUFFER_SIZE); + + return msg; +} + + +/** * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK * * @param cls the socket (set from GNUNET_MESH_connect) @@ -992,37 +1390,48 @@ client_handle_hello_ack (void *cls, const struct GNUNET_STREAM_HelloAckMessage *ack_msg; struct GNUNET_STREAM_HelloAckMessage *reply; + if (0 != memcmp (sender, + &socket->other_peer, + sizeof (struct GNUNET_PeerIdentity))) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Received HELLO_ACK from non-confirming peer\n", + GNUNET_i2s (&socket->other_peer)); + return GNUNET_YES; + } ack_msg = (const struct GNUNET_STREAM_HelloAckMessage *) message; + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Received HELLO_ACK from %s\n", + GNUNET_i2s (&socket->other_peer), + GNUNET_i2s (&socket->other_peer)); + GNUNET_assert (socket->tunnel == tunnel); switch (socket->state) { case STATE_HELLO_WAIT: - socket->read_sequence_number = ntohl (ack_msg->sequence_number); - socket->receive_window_available = ntohl (ack_msg->receive_window_size); - /* Get the random sequence number */ - socket->write_sequence_number = - GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX); - reply = - GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage)); - reply->header.header.size = - htons (sizeof (struct GNUNET_STREAM_MessageHeader)); - reply->header.header.type = - htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK); - reply->sequence_number = htonl (socket->write_sequence_number); - reply->receive_window_size = htonl (RECEIVE_BUFFER_SIZE); - queue_message (socket, - &reply->header, - &set_state_established, - NULL); - return GNUNET_OK; + socket->read_sequence_number = ntohl (ack_msg->sequence_number); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Read sequence number %u\n", + GNUNET_i2s (&socket->other_peer), + (unsigned int) socket->read_sequence_number); + socket->receiver_window_available = ntohl (ack_msg->receiver_window_size); + reply = generate_hello_ack_msg (socket); + queue_message (socket, + &reply->header, + &set_state_established, + NULL); + return GNUNET_OK; case STATE_ESTABLISHED: case STATE_RECEIVE_CLOSE_WAIT: // call statistics (# ACKs ignored++) return GNUNET_OK; case STATE_INIT: default: - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Server sent HELLO_ACK when in state %d\n", socket->state); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Server %s sent HELLO_ACK when in state %d\n", + GNUNET_i2s (&socket->other_peer), + GNUNET_i2s (&socket->other_peer), + socket->state); socket->state = STATE_CLOSED; // introduce STATE_ERROR? return GNUNET_SYSERR; } @@ -1050,7 +1459,7 @@ client_handle_reset (void *cls, const struct GNUNET_MessageHeader *message, const struct GNUNET_ATS_Information*atsi) { - struct GNUNET_STREAM_Socket *socket = cls; + // struct GNUNET_STREAM_Socket *socket = cls; return GNUNET_OK; } @@ -1077,22 +1486,22 @@ handle_transmit_close (struct GNUNET_STREAM_Socket *socket, struct GNUNET_STREAM_MessageHeader *reply; switch (socket->state) - { - case STATE_ESTABLISHED: - socket->state = STATE_RECEIVE_CLOSED; + { + case STATE_ESTABLISHED: + socket->state = STATE_RECEIVE_CLOSED; - /* Send TRANSMIT_CLOSE_ACK */ - reply = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader)); - reply->header.type = - htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK); - reply->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader)); - queue_message (socket, reply, NULL, NULL); - break; + /* Send TRANSMIT_CLOSE_ACK */ + reply = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader)); + reply->header.type = + htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK); + reply->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader)); + queue_message (socket, reply, NULL, NULL); + break; - default: - /* FIXME: Call statistics? */ - break; - } + default: + /* FIXME: Call statistics? */ + break; + } return GNUNET_YES; } @@ -1128,6 +1537,141 @@ client_handle_transmit_close (void *cls, /** + * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_*_CLOSE_ACK messages + * + * @param socket the socket + * @param tunnel connection to the other end + * @param sender who sent the message + * @param message the actual message + * @param atsi performance data for the connection + * @param operation the close operation which is being ACK'ed + * @return GNUNET_OK to keep the connection open, + * GNUNET_SYSERR to close it (signal serious error) + */ +static int +handle_generic_close_ack (struct GNUNET_STREAM_Socket *socket, + struct GNUNET_MESH_Tunnel *tunnel, + const struct GNUNET_PeerIdentity *sender, + const struct GNUNET_STREAM_MessageHeader *message, + const struct GNUNET_ATS_Information *atsi, + int operation) +{ + struct GNUNET_STREAM_ShutdownHandle *shutdown_handle; + + shutdown_handle = socket->shutdown_handle; + if (NULL == shutdown_handle) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Received CLOSE_ACK when shutdown handle is NULL\n", + GNUNET_i2s (&socket->other_peer)); + return GNUNET_OK; + } + + switch (operation) + { + case SHUT_RDWR: + switch (socket->state) + { + case STATE_CLOSE_WAIT: + if (SHUT_RDWR != shutdown_handle->operation) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Received CLOSE_ACK when shutdown handle is not for " + "SHUT_RDWR\n", + GNUNET_i2s (&socket->other_peer)); + return GNUNET_OK; + } + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Received CLOSE_ACK from %s\n", + GNUNET_i2s (&socket->other_peer), + GNUNET_i2s (&socket->other_peer)); + socket->state = STATE_CLOSED; + break; + default: + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Received CLOSE_ACK when in it not expected\n", + GNUNET_i2s (&socket->other_peer)); + return GNUNET_OK; + } + break; + + case SHUT_RD: + switch (socket->state) + { + case STATE_RECEIVE_CLOSE_WAIT: + if (SHUT_RD != shutdown_handle->operation) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Received RECEIVE_CLOSE_ACK when shutdown handle " + "is not for SHUT_RD\n", + GNUNET_i2s (&socket->other_peer)); + return GNUNET_OK; + } + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Received RECEIVE_CLOSE_ACK from %s\n", + GNUNET_i2s (&socket->other_peer), + GNUNET_i2s (&socket->other_peer)); + socket->state = STATE_RECEIVE_CLOSED; + break; + default: + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Received RECEIVE_CLOSE_ACK when in it not expected\n", + GNUNET_i2s (&socket->other_peer)); + return GNUNET_OK; + } + + break; + case SHUT_WR: + switch (socket->state) + { + case STATE_TRANSMIT_CLOSE_WAIT: + if (SHUT_WR != shutdown_handle->operation) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Received TRANSMIT_CLOSE_ACK when shutdown handle " + "is not for SHUT_WR\n", + GNUNET_i2s (&socket->other_peer)); + return GNUNET_OK; + } + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Received TRANSMIT_CLOSE_ACK from %s\n", + GNUNET_i2s (&socket->other_peer), + GNUNET_i2s (&socket->other_peer)); + socket->state = STATE_TRANSMIT_CLOSED; + break; + default: + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Received TRANSMIT_CLOSE_ACK when in it not expected\n", + GNUNET_i2s (&socket->other_peer)); + + return GNUNET_OK; + } + break; + default: + GNUNET_assert (0); + } + + if (NULL != shutdown_handle->completion_cb) /* Shutdown completion */ + shutdown_handle->completion_cb(shutdown_handle->completion_cls, + operation); + GNUNET_free (shutdown_handle); /* Free shutdown handle */ + socket->shutdown_handle = NULL; + if (GNUNET_SCHEDULER_NO_TASK + != shutdown_handle->close_msg_retransmission_task_id) + { + GNUNET_SCHEDULER_cancel + (shutdown_handle->close_msg_retransmission_task_id); + shutdown_handle->close_msg_retransmission_task_id = + GNUNET_SCHEDULER_NO_TASK; + } + return GNUNET_OK; +} + + +/** * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK * * @param cls the socket (set from GNUNET_MESH_connect) @@ -1149,6 +1693,67 @@ client_handle_transmit_close_ack (void *cls, { struct GNUNET_STREAM_Socket *socket = cls; + return handle_generic_close_ack (socket, + tunnel, + sender, + (const struct GNUNET_STREAM_MessageHeader *) + message, + atsi, + SHUT_WR); +} + + +/** + * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE + * + * @param socket the socket + * @param tunnel connection to the other end + * @param sender who sent the message + * @param message the actual message + * @param atsi performance data for the connection + * @return GNUNET_OK to keep the connection open, + * GNUNET_SYSERR to close it (signal serious error) + */ +static int +handle_receive_close (struct GNUNET_STREAM_Socket *socket, + struct GNUNET_MESH_Tunnel *tunnel, + const struct GNUNET_PeerIdentity *sender, + const struct GNUNET_STREAM_MessageHeader *message, + const struct GNUNET_ATS_Information *atsi) +{ + struct GNUNET_STREAM_MessageHeader *receive_close_ack; + + switch (socket->state) + { + case STATE_INIT: + case STATE_LISTEN: + case STATE_HELLO_WAIT: + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Ignoring RECEIVE_CLOSE as it cannot be handled now\n", + GNUNET_i2s (&socket->other_peer)); + return GNUNET_OK; + default: + break; + } + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Received RECEIVE_CLOSE from %s\n", + GNUNET_i2s (&socket->other_peer), + GNUNET_i2s (&socket->other_peer)); + receive_close_ack = + GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader)); + receive_close_ack->header.size = + htons (sizeof (struct GNUNET_STREAM_MessageHeader)); + receive_close_ack->header.type = + htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK); + queue_message (socket, + receive_close_ack, + &set_state_closed, + NULL); + + /* FIXME: Handle the case where write handle is present; the write operation + should be deemed as finised and the write continuation callback + has to be called with the stream status GNUNET_STREAM_SHUTDOWN */ return GNUNET_OK; } @@ -1175,7 +1780,12 @@ client_handle_receive_close (void *cls, { struct GNUNET_STREAM_Socket *socket = cls; - return GNUNET_OK; + return + handle_receive_close (socket, + tunnel, + sender, + (const struct GNUNET_STREAM_MessageHeader *) message, + atsi); } @@ -1201,6 +1811,66 @@ client_handle_receive_close_ack (void *cls, { struct GNUNET_STREAM_Socket *socket = cls; + return handle_generic_close_ack (socket, + tunnel, + sender, + (const struct GNUNET_STREAM_MessageHeader *) + message, + atsi, + SHUT_RD); +} + + +/** + * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE + * + * @param socket the socket + * @param tunnel connection to the other end + * @param sender who sent the message + * @param message the actual message + * @param atsi performance data for the connection + * @return GNUNET_OK to keep the connection open, + * GNUNET_SYSERR to close it (signal serious error) + */ +static int +handle_close (struct GNUNET_STREAM_Socket *socket, + struct GNUNET_MESH_Tunnel *tunnel, + const struct GNUNET_PeerIdentity *sender, + const struct GNUNET_STREAM_MessageHeader *message, + const struct GNUNET_ATS_Information*atsi) +{ + struct GNUNET_STREAM_MessageHeader *close_ack; + + switch (socket->state) + { + case STATE_INIT: + case STATE_LISTEN: + case STATE_HELLO_WAIT: + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Ignoring RECEIVE_CLOSE as it cannot be handled now\n", + GNUNET_i2s (&socket->other_peer)); + return GNUNET_OK; + default: + break; + } + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Received CLOSE from %s\n", + GNUNET_i2s (&socket->other_peer), + GNUNET_i2s (&socket->other_peer)); + close_ack = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader)); + close_ack->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader)); + close_ack->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK); + queue_message (socket, + close_ack, + &set_state_closed, + NULL); + if (socket->state == STATE_CLOSED) + return GNUNET_OK; + + GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */ + socket->receive_buffer = NULL; + socket->receive_buffer_size = 0; return GNUNET_OK; } @@ -1227,7 +1897,11 @@ client_handle_close (void *cls, { struct GNUNET_STREAM_Socket *socket = cls; - return GNUNET_OK; + return handle_close (socket, + tunnel, + sender, + (const struct GNUNET_STREAM_MessageHeader *) message, + atsi); } @@ -1249,11 +1923,17 @@ client_handle_close_ack (void *cls, void **tunnel_ctx, const struct GNUNET_PeerIdentity *sender, const struct GNUNET_MessageHeader *message, - const struct GNUNET_ATS_Information*atsi) + const struct GNUNET_ATS_Information *atsi) { struct GNUNET_STREAM_Socket *socket = cls; - return GNUNET_OK; + return handle_generic_close_ack (socket, + tunnel, + sender, + (const struct GNUNET_STREAM_MessageHeader *) + message, + atsi, + SHUT_RDWR); } /*****************************/ @@ -1313,31 +1993,39 @@ server_handle_hello (void *cls, struct GNUNET_STREAM_Socket *socket = *tunnel_ctx; struct GNUNET_STREAM_HelloAckMessage *reply; + if (0 != memcmp (sender, + &socket->other_peer, + sizeof (struct GNUNET_PeerIdentity))) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Received HELLO from non-confirming peer\n", + GNUNET_i2s (&socket->other_peer)); + return GNUNET_YES; + } + + GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO == + ntohs (message->type)); GNUNET_assert (socket->tunnel == tunnel); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Received HELLO from %s\n", + GNUNET_i2s (&socket->other_peer), + GNUNET_i2s (&socket->other_peer)); + if (STATE_INIT == socket->state) - { - /* Get the random sequence number */ - socket->write_sequence_number = - GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX); - reply = - GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage)); - reply->header.header.size = - htons (sizeof (struct GNUNET_STREAM_MessageHeader)); - reply->header.header.type = - htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK); - reply->sequence_number = htonl (socket->write_sequence_number); - queue_message (socket, - &reply->header, - &set_state_hello_wait, - NULL); - } + { + reply = generate_hello_ack_msg (socket); + queue_message (socket, + &reply->header, + &set_state_hello_wait, + NULL); + } else - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Client sent HELLO when in state %d\n", socket->state); - /* FIXME: Send RESET? */ + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Client sent HELLO when in state %d\n", socket->state); + /* FIXME: Send RESET? */ - } + } return GNUNET_OK; } @@ -1365,23 +2053,33 @@ server_handle_hello_ack (void *cls, struct GNUNET_STREAM_Socket *socket = *tunnel_ctx; const struct GNUNET_STREAM_HelloAckMessage *ack_message; - ack_message = (struct GNUNET_STREAM_HelloAckMessage *) message; + GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK == + ntohs (message->type)); GNUNET_assert (socket->tunnel == tunnel); + ack_message = (struct GNUNET_STREAM_HelloAckMessage *) message; if (STATE_HELLO_WAIT == socket->state) - { - socket->read_sequence_number = ntohl (ack_message->sequence_number); - socket->receive_window_available = - ntohl (ack_message->receive_window_size); - /* Attain ESTABLISHED state */ - set_state_established (NULL, socket); - } + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Received HELLO_ACK from %s\n", + GNUNET_i2s (&socket->other_peer), + GNUNET_i2s (&socket->other_peer)); + socket->read_sequence_number = ntohl (ack_message->sequence_number); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Read sequence number %u\n", + GNUNET_i2s (&socket->other_peer), + (unsigned int) socket->read_sequence_number); + socket->receiver_window_available = + ntohl (ack_message->receiver_window_size); + /* Attain ESTABLISHED state */ + set_state_established (NULL, socket); + } else - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Client sent HELLO_ACK when in state %d\n", socket->state); - /* FIXME: Send RESET? */ + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Client sent HELLO_ACK when in state %d\n", socket->state); + /* FIXME: Send RESET? */ - } + } return GNUNET_OK; } @@ -1406,7 +2104,7 @@ server_handle_reset (void *cls, const struct GNUNET_MessageHeader *message, const struct GNUNET_ATS_Information*atsi) { - struct GNUNET_STREAM_Socket *socket = *tunnel_ctx; + // struct GNUNET_STREAM_Socket *socket = *tunnel_ctx; return GNUNET_OK; } @@ -1464,7 +2162,13 @@ server_handle_transmit_close_ack (void *cls, { struct GNUNET_STREAM_Socket *socket = *tunnel_ctx; - return GNUNET_OK; + return handle_generic_close_ack (socket, + tunnel, + sender, + (const struct GNUNET_STREAM_MessageHeader *) + message, + atsi, + SHUT_WR); } @@ -1490,7 +2194,12 @@ server_handle_receive_close (void *cls, { struct GNUNET_STREAM_Socket *socket = *tunnel_ctx; - return GNUNET_OK; + return + handle_receive_close (socket, + tunnel, + sender, + (const struct GNUNET_STREAM_MessageHeader *) message, + atsi); } @@ -1516,14 +2225,21 @@ server_handle_receive_close_ack (void *cls, { struct GNUNET_STREAM_Socket *socket = *tunnel_ctx; - return GNUNET_OK; + return handle_generic_close_ack (socket, + tunnel, + sender, + (const struct GNUNET_STREAM_MessageHeader *) + message, + atsi, + SHUT_RD); } /** * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE * - * @param cls the closure + * @param cls the listen socket (from GNUNET_MESH_connect in + * GNUNET_STREAM_listen) * @param tunnel connection to the other end * @param tunnel_ctx the socket * @param sender who sent the message @@ -1541,8 +2257,12 @@ server_handle_close (void *cls, const struct GNUNET_ATS_Information*atsi) { struct GNUNET_STREAM_Socket *socket = *tunnel_ctx; - - return GNUNET_OK; + + return handle_close (socket, + tunnel, + sender, + (const struct GNUNET_STREAM_MessageHeader *) message, + atsi); } @@ -1568,12 +2288,18 @@ server_handle_close_ack (void *cls, { struct GNUNET_STREAM_Socket *socket = *tunnel_ctx; - return GNUNET_OK; + return handle_generic_close_ack (socket, + tunnel, + sender, + (const struct GNUNET_STREAM_MessageHeader *) + message, + atsi, + SHUT_RDWR); } /** - * Message Handler for mesh + * Handler for DATA_ACK messages * * @param socket the socket through which the ack was received * @param tunnel connection to the other end @@ -1590,30 +2316,132 @@ handle_ack (struct GNUNET_STREAM_Socket *socket, const struct GNUNET_STREAM_AckMessage *ack, const struct GNUNET_ATS_Information*atsi) { + unsigned int packet; + int need_retransmission; + + + if (0 != memcmp (sender, + &socket->other_peer, + sizeof (struct GNUNET_PeerIdentity))) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Received ACK from non-confirming peer\n", + GNUNET_i2s (&socket->other_peer)); + return GNUNET_YES; + } + switch (socket->state) + { + case (STATE_ESTABLISHED): + case (STATE_RECEIVE_CLOSED): + case (STATE_RECEIVE_CLOSE_WAIT): + if (NULL == socket->write_handle) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Received DATA_ACK when write_handle is NULL\n", + GNUNET_i2s (&socket->other_peer)); + return GNUNET_OK; + } + /* FIXME: increment in the base sequence number is breaking current flow + */ + if (!((socket->write_sequence_number + - ntohl (ack->base_sequence_number)) < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH)) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Received DATA_ACK with unexpected base sequence number\n", + GNUNET_i2s (&socket->other_peer)); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Current write sequence: %u; Ack's base sequence: %u\n", + GNUNET_i2s (&socket->other_peer), + socket->write_sequence_number, + ntohl (ack->base_sequence_number)); + return GNUNET_OK; + } + /* FIXME: include the case when write_handle is cancelled - ignore the + acks */ + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Received DATA_ACK from %s\n", + GNUNET_i2s (&socket->other_peer), + GNUNET_i2s (&socket->other_peer)); + + /* Cancel the retransmission task */ + if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id) + { + GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id); + socket->retransmission_timeout_task_id = + GNUNET_SCHEDULER_NO_TASK; + } + + for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++) + { + if (NULL == socket->write_handle->messages[packet]) break; + if (ntohl (ack->base_sequence_number) + >= ntohl (socket->write_handle->messages[packet]->sequence_number)) + ackbitmap_modify_bit (&socket->write_handle->ack_bitmap, + packet, + GNUNET_YES); + else + if (GNUNET_YES == + ackbitmap_is_bit_set (&socket->write_handle->ack_bitmap, + ntohl (socket->write_handle->messages[packet]->sequence_number) + - ntohl (ack->base_sequence_number))) + ackbitmap_modify_bit (&socket->write_handle->ack_bitmap, + packet, + GNUNET_YES); + } + + /* Update the receive window remaining + FIXME : Should update with the value from a data ack with greater + sequence number */ + socket->receiver_window_available = + ntohl (ack->receive_window_remaining); + + /* Check if we have received all acknowledgements */ + need_retransmission = GNUNET_NO; + for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++) + { + if (NULL == socket->write_handle->messages[packet]) break; + if (GNUNET_YES != ackbitmap_is_bit_set + (&socket->write_handle->ack_bitmap,packet)) + { + need_retransmission = GNUNET_YES; + break; + } + } + if (GNUNET_YES == need_retransmission) { - case (STATE_ESTABLISHED): - if (NULL == socket->write_handle) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received DATA ACK when write_handle is NULL\n"); - return GNUNET_OK; - } - - socket->write_handle->ack_bitmap = GNUNET_ntohll (ack->bitmap); - socket->receive_window_available = - ntohl (ack->receive_window_remaining); write_data (socket); - break; - default: - break; } + else /* We have to call the write continuation callback now */ + { + /* Free the packets */ + for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++) + { + GNUNET_free_non_null (socket->write_handle->messages[packet]); + } + if (NULL != socket->write_handle->write_cont) + socket->write_handle->write_cont + (socket->write_handle->write_cont_cls, + socket->status, + socket->write_handle->size); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Write completion callback completed\n", + GNUNET_i2s (&socket->other_peer)); + /* We are done with the write handle - Freeing it */ + GNUNET_free (socket->write_handle); + socket->write_handle = NULL; + } + break; + default: + break; + } return GNUNET_OK; } /** - * Message Handler for mesh + * Handler for DATA_ACK messages * * @param cls the 'struct GNUNET_STREAM_Socket' * @param tunnel connection to the other end @@ -1640,7 +2468,7 @@ client_handle_ack (void *cls, /** - * Message Handler for mesh + * Handler for DATA_ACK messages * * @param cls the server's listen socket * @param tunnel connection to the other end @@ -1738,19 +2566,21 @@ mesh_peer_connect_callback (void *cls, { struct GNUNET_STREAM_Socket *socket = cls; struct GNUNET_STREAM_MessageHeader *message; - - if (0 != memcmp (&socket->other_peer, - peer, + + if (0 != memcmp (peer, + &socket->other_peer, sizeof (struct GNUNET_PeerIdentity))) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "A peer (%s) which is not our target has connected to our tunnel", - GNUNET_i2s (peer)); - return; - } + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: A peer which is not our target has connected to our tunnel\n", + GNUNET_i2s(peer)); + return; + } - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Target peer %s connected\n", GNUNET_i2s (peer)); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Target peer %s connected\n", + GNUNET_i2s (&socket->other_peer), + GNUNET_i2s (&socket->other_peer)); /* Set state to INIT */ socket->state = STATE_INIT; @@ -1766,14 +2596,10 @@ mesh_peer_connect_callback (void *cls, /* Call open callback */ if (NULL == socket->open_cb) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "STREAM_open callback is NULL\n"); - } - else - { - socket->open_cb (socket->open_cls, socket); - } + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "STREAM_open callback is NULL\n"); + } } @@ -1787,7 +2613,112 @@ static void mesh_peer_disconnect_callback (void *cls, const struct GNUNET_PeerIdentity *peer) { + struct GNUNET_STREAM_Socket *socket=cls; + + /* If the state is SHUTDOWN its ok; else set the state of the socket to SYSERR */ + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Other peer %s disconnected \n", + GNUNET_i2s (&socket->other_peer), + GNUNET_i2s (&socket->other_peer)); +} + +/** + * Method called whenever a peer creates a tunnel to us + * + * @param cls closure + * @param tunnel new handle to the tunnel + * @param initiator peer that started the tunnel + * @param atsi performance information for the tunnel + * @return initial tunnel context for the tunnel + * (can be NULL -- that's not an error) + */ +static void * +new_tunnel_notify (void *cls, + struct GNUNET_MESH_Tunnel *tunnel, + const struct GNUNET_PeerIdentity *initiator, + const struct GNUNET_ATS_Information *atsi) +{ + struct GNUNET_STREAM_ListenSocket *lsocket = cls; + struct GNUNET_STREAM_Socket *socket; + + /* FIXME: If a tunnel is already created, we should not accept new tunnels + from the same peer again until the socket is closed */ + + socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket)); + socket->other_peer = *initiator; + socket->tunnel = tunnel; + socket->session_id = 0; /* FIXME */ + socket->state = STATE_INIT; + socket->lsocket = lsocket; + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Peer %s initiated tunnel to us\n", + GNUNET_i2s (&socket->other_peer), + GNUNET_i2s (&socket->other_peer)); + + /* FIXME: Copy MESH handle from lsocket to socket */ + + return socket; +} + + +/** + * Function called whenever an inbound tunnel is destroyed. Should clean up + * any associated state. This function is NOT called if the client has + * explicitly asked for the tunnel to be destroyed using + * GNUNET_MESH_tunnel_destroy. It must NOT call GNUNET_MESH_tunnel_destroy on + * the tunnel. + * + * @param cls closure (set from GNUNET_MESH_connect) + * @param tunnel connection to the other end (henceforth invalid) + * @param tunnel_ctx place where local state associated + * with the tunnel is stored + */ +static void +tunnel_cleaner (void *cls, + const struct GNUNET_MESH_Tunnel *tunnel, + void *tunnel_ctx) +{ + struct GNUNET_STREAM_Socket *socket = tunnel_ctx; + + if (tunnel != socket->tunnel) + return; + + GNUNET_break_op(0); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Peer %s has terminated connection abruptly\n", + GNUNET_i2s (&socket->other_peer), + GNUNET_i2s (&socket->other_peer)); + + socket->status = GNUNET_STREAM_SHUTDOWN; + + /* Clear Transmit handles */ + if (NULL != socket->transmit_handle) + { + GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle); + socket->transmit_handle = NULL; + } + if (NULL != socket->ack_transmit_handle) + { + GNUNET_MESH_notify_transmit_ready_cancel (socket->ack_transmit_handle); + GNUNET_free (socket->ack_msg); + socket->ack_msg = NULL; + socket->ack_transmit_handle = NULL; + } + /* Stop Tasks using socket->tunnel */ + if (GNUNET_SCHEDULER_NO_TASK != socket->ack_task_id) + { + GNUNET_SCHEDULER_cancel (socket->ack_task_id); + socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK; + } + if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id) + { + GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id); + socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK; + } + /* FIXME: Cancel all other tasks using socket->tunnel */ + socket->tunnel = NULL; } @@ -1819,55 +2750,167 @@ GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg, { struct GNUNET_STREAM_Socket *socket; enum GNUNET_STREAM_Option option; + GNUNET_MESH_ApplicationType ports[] = {app_port, 0}; va_list vargs; /* Variable arguments */ + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s\n", __func__); socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket)); socket->other_peer = *target; socket->open_cb = open_cb; socket->open_cls = open_cb_cls; - /* Set defaults */ socket->retransmit_timeout = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, default_timeout); - va_start (vargs, open_cb_cls); /* Parse variable args */ do { option = va_arg (vargs, enum GNUNET_STREAM_Option); switch (option) - { - case GNUNET_STREAM_OPTION_INITIAL_RETRANSMIT_TIMEOUT: - /* Expect struct GNUNET_TIME_Relative */ - socket->retransmit_timeout = va_arg (vargs, - struct GNUNET_TIME_Relative); - break; - case GNUNET_STREAM_OPTION_END: - break; - } + { + case GNUNET_STREAM_OPTION_INITIAL_RETRANSMIT_TIMEOUT: + /* Expect struct GNUNET_TIME_Relative */ + socket->retransmit_timeout = va_arg (vargs, + struct GNUNET_TIME_Relative); + break; + case GNUNET_STREAM_OPTION_END: + break; + } } while (GNUNET_STREAM_OPTION_END != option); va_end (vargs); /* End of variable args parsing */ - socket->mesh = GNUNET_MESH_connect (cfg, /* the configuration handle */ - 1, /* QUEUE size as parameter? */ + 10, /* QUEUE size as parameter? */ socket, /* cls */ NULL, /* No inbound tunnel handler */ - NULL, /* No inbound tunnel cleaner */ + NULL, /* No in-tunnel cleaner */ client_message_handlers, - NULL); /* We don't get inbound tunnels */ - // FIXME: if (NULL == socket->mesh) ... + ports); /* We don't get inbound tunnels */ + if (NULL == socket->mesh) /* Fail if we cannot connect to mesh */ + { + GNUNET_free (socket); + return NULL; + } /* Now create the mesh tunnel to target */ + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Creating MESH Tunnel\n"); socket->tunnel = GNUNET_MESH_tunnel_create (socket->mesh, NULL, /* Tunnel context */ &mesh_peer_connect_callback, &mesh_peer_disconnect_callback, socket); - // FIXME: if (NULL == socket->tunnel) ... - + GNUNET_assert (NULL != socket->tunnel); + GNUNET_MESH_peer_request_connect_add (socket->tunnel, + &socket->other_peer); + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s() END\n", __func__); return socket; } /** + * Shutdown the stream for reading or writing (similar to man 2 shutdown). + * + * @param socket the stream socket + * @param operation SHUT_RD, SHUT_WR or SHUT_RDWR + * @param completion_cb the callback that will be called upon successful + * shutdown of given operation + * @param completion_cls the closure for the completion callback + * @return the shutdown handle + */ +struct GNUNET_STREAM_ShutdownHandle * +GNUNET_STREAM_shutdown (struct GNUNET_STREAM_Socket *socket, + int operation, + GNUNET_STREAM_ShutdownCompletion completion_cb, + void *completion_cls) +{ + struct GNUNET_STREAM_ShutdownHandle *handle; + struct GNUNET_STREAM_MessageHeader *msg; + + GNUNET_assert (NULL == socket->shutdown_handle); + + handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ShutdownHandle)); + handle->socket = socket; + handle->completion_cb = completion_cb; + handle->completion_cls = completion_cls; + socket->shutdown_handle = handle; + + msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader)); + msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader)); + switch (operation) + { + case SHUT_RD: + handle->operation = SHUT_RD; + if (NULL != socket->read_handle) + LOG (GNUNET_ERROR_TYPE_WARNING, + "Existing read handle should be cancelled before shutting" + " down reading\n"); + msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE); + queue_message (socket, + msg, + &set_state_receive_close_wait, + NULL); + break; + case SHUT_WR: + handle->operation = SHUT_WR; + if (NULL != socket->write_handle) + LOG (GNUNET_ERROR_TYPE_WARNING, + "Existing write handle should be cancelled before shutting" + " down writing\n"); + msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE); + queue_message (socket, + msg, + &set_state_transmit_close_wait, + NULL); + break; + case SHUT_RDWR: + handle->operation = SHUT_RDWR; + if (NULL != socket->write_handle) + LOG (GNUNET_ERROR_TYPE_WARNING, + "Existing write handle should be cancelled before shutting" + " down writing\n"); + if (NULL != socket->read_handle) + LOG (GNUNET_ERROR_TYPE_WARNING, + "Existing read handle should be cancelled before shutting" + " down reading\n"); + msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE); + queue_message (socket, + msg, + &set_state_close_wait, + NULL); + break; + default: + LOG (GNUNET_ERROR_TYPE_WARNING, + "GNUNET_STREAM_shutdown called with invalid value for " + "parameter operation -- Ignoring\n"); + GNUNET_free (msg); + GNUNET_free (handle); + return NULL; + } + handle->close_msg_retransmission_task_id = + GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout, + &close_msg_retransmission_task, + handle); + return handle; +} + + +/** + * Cancels a pending shutdown + * + * @param handle the shutdown handle returned from GNUNET_STREAM_shutdown + */ +void +GNUNET_STREAM_shutdown_cancel (struct GNUNET_STREAM_ShutdownHandle *handle) +{ + if (GNUNET_SCHEDULER_NO_TASK != handle->close_msg_retransmission_task_id) + GNUNET_SCHEDULER_cancel (handle->close_msg_retransmission_task_id); + GNUNET_free (handle); + return; +} + + +/** * Closes the stream * * @param socket the stream socket @@ -1877,20 +2920,45 @@ GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket) { struct MessageQueue *head; - if (socket->read_task != GNUNET_SCHEDULER_NO_TASK) + if (NULL != socket->read_handle) + { + LOG (GNUNET_ERROR_TYPE_WARNING, + "Closing STREAM socket when a read handle is pending\n"); + } + if (NULL != socket->write_handle) + { + LOG (GNUNET_ERROR_TYPE_WARNING, + "Closing STREAM socket when a write handle is pending\n"); + } + + if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK) { /* socket closed with read task pending!? */ GNUNET_break (0); - GNUNET_SCHEDULER_cancel (socket->read_task); - socket->read_task = GNUNET_SCHEDULER_NO_TASK; + GNUNET_SCHEDULER_cancel (socket->read_task_id); + socket->read_task_id = GNUNET_SCHEDULER_NO_TASK; + } + + /* Terminate the ack'ing tasks if they are still present */ + if (socket->ack_task_id != GNUNET_SCHEDULER_NO_TASK) + { + GNUNET_SCHEDULER_cancel (socket->ack_task_id); + socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK; } /* Clear Transmit handles */ if (NULL != socket->transmit_handle) - { - GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle); - socket->transmit_handle = NULL; - } + { + GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle); + socket->transmit_handle = NULL; + } + if (NULL != socket->ack_transmit_handle) + { + GNUNET_MESH_notify_transmit_ready_cancel (socket->ack_transmit_handle); + GNUNET_free (socket->ack_msg); + socket->ack_msg = NULL; + socket->ack_transmit_handle = NULL; + } /* Clear existing message queue */ while (NULL != (head = socket->queue_head)) { @@ -1903,101 +2971,29 @@ GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket) /* Close associated tunnel */ if (NULL != socket->tunnel) - { - GNUNET_MESH_tunnel_destroy (socket->tunnel); - socket->tunnel = NULL; - } + { + GNUNET_MESH_tunnel_destroy (socket->tunnel); + socket->tunnel = NULL; + } /* Close mesh connection */ - if (NULL != socket->mesh) - { - GNUNET_MESH_disconnect (socket->mesh); - socket->mesh = NULL; - } + if (NULL != socket->mesh && NULL == socket->lsocket) + { + GNUNET_MESH_disconnect (socket->mesh); + socket->mesh = NULL; + } /* Release receive buffer */ if (NULL != socket->receive_buffer) - { - GNUNET_free (socket->receive_buffer); - } + { + GNUNET_free (socket->receive_buffer); + } GNUNET_free (socket); } /** - * Method called whenever a peer creates a tunnel to us - * - * @param cls closure - * @param tunnel new handle to the tunnel - * @param initiator peer that started the tunnel - * @param atsi performance information for the tunnel - * @return initial tunnel context for the tunnel - * (can be NULL -- that's not an error) - */ -static void * -new_tunnel_notify (void *cls, - struct GNUNET_MESH_Tunnel *tunnel, - const struct GNUNET_PeerIdentity *initiator, - const struct GNUNET_ATS_Information *atsi) -{ - struct GNUNET_STREAM_ListenSocket *lsocket = cls; - struct GNUNET_STREAM_Socket *socket; - - socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket)); - socket->tunnel = tunnel; - socket->session_id = 0; /* FIXME */ - socket->other_peer = *initiator; - socket->state = STATE_INIT; - - if (GNUNET_SYSERR == lsocket->listen_cb (lsocket->listen_cb_cls, - socket, - &socket->other_peer)) - { - socket->state = STATE_CLOSED; - /* FIXME: Send CLOSE message and then free */ - GNUNET_free (socket); - GNUNET_MESH_tunnel_destroy (tunnel); /* Destroy the tunnel */ - } - return socket; -} - - -/** - * Function called whenever an inbound tunnel is destroyed. Should clean up - * any associated state. This function is NOT called if the client has - * explicitly asked for the tunnel to be destroyed using - * GNUNET_MESH_tunnel_destroy. It must NOT call GNUNET_MESH_tunnel_destroy on - * the tunnel. - * - * @param cls closure (set from GNUNET_MESH_connect) - * @param tunnel connection to the other end (henceforth invalid) - * @param tunnel_ctx place where local state associated - * with the tunnel is stored - */ -static void -tunnel_cleaner (void *cls, - const struct GNUNET_MESH_Tunnel *tunnel, - void *tunnel_ctx) -{ - struct GNUNET_STREAM_Socket *socket = tunnel_ctx; - - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Peer %s has terminated connection abruptly\n", - GNUNET_i2s (&socket->other_peer)); - - socket->status = GNUNET_STREAM_SHUTDOWN; - /* Clear Transmit handles */ - if (NULL != socket->transmit_handle) - { - GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle); - socket->transmit_handle = NULL; - } - socket->tunnel = NULL; -} - - -/** * Listens for stream connections for a specific application ports * * @param cfg the configuration to use @@ -2015,10 +3011,8 @@ GNUNET_STREAM_listen (const struct GNUNET_CONFIGURATION_Handle *cfg, { /* FIXME: Add variable args for passing configration options? */ struct GNUNET_STREAM_ListenSocket *lsocket; - GNUNET_MESH_ApplicationType app_types[2]; + GNUNET_MESH_ApplicationType ports[] = {app_port, 0}; - app_types[0] = app_port; - app_types[1] = 0; lsocket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ListenSocket)); lsocket->port = app_port; lsocket->listen_cb = listen_cb; @@ -2029,7 +3023,8 @@ GNUNET_STREAM_listen (const struct GNUNET_CONFIGURATION_Handle *cfg, &new_tunnel_notify, &tunnel_cleaner, server_message_handlers, - app_types); + ports); + GNUNET_assert (NULL != lsocket->mesh); return lsocket; } @@ -2043,6 +3038,7 @@ void GNUNET_STREAM_listen_close (struct GNUNET_STREAM_ListenSocket *lsocket) { /* Close MESH connection */ + GNUNET_assert (NULL != lsocket->mesh); GNUNET_MESH_disconnect (lsocket->mesh); GNUNET_free (lsocket); @@ -2050,15 +3046,22 @@ GNUNET_STREAM_listen_close (struct GNUNET_STREAM_ListenSocket *lsocket) /** - * Tries to write the given data to the stream + * Tries to write the given data to the stream. The maximum size of data that + * can be written as part of a write operation is (64 * (64000 - sizeof (struct + * GNUNET_STREAM_DataMessage))). If size is greater than this it is not an API + * violation, however only the said number of maximum bytes will be written. * * @param socket the socket representing a stream * @param data the data buffer from where the data is written into the stream * @param size the number of bytes to be written from the data buffer * @param timeout the timeout period - * @param write_cont the function to call upon writing some bytes into the stream + * @param write_cont the function to call upon writing some bytes into the + * stream * @param write_cont_cls the closure - * @return handle to cancel the operation + * + * @return handle to cancel the operation; if a previous write is pending or + * the stream has been shutdown for this operation then write_cont is + * immediately called and NULL is returned. */ struct GNUNET_STREAM_IOWriteHandle * GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket, @@ -2075,6 +3078,10 @@ GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket, uint32_t payload_size; struct GNUNET_STREAM_DataMessage *data_msg; const void *sweep; + struct GNUNET_TIME_Relative ack_deadline; + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s\n", __func__); /* Return NULL if there is already a write request pending */ if (NULL != socket->write_handle) @@ -2082,70 +3089,101 @@ GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket, GNUNET_break (0); return NULL; } - if (!((STATE_ESTABLISHED == socket->state) - || (STATE_RECEIVE_CLOSE_WAIT == socket->state) - || (STATE_RECEIVE_CLOSED == socket->state))) - { - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "Attempting to write on a closed (OR) not-yet-established" - "stream\n"); - return NULL; - } + + switch (socket->state) + { + case STATE_TRANSMIT_CLOSED: + case STATE_TRANSMIT_CLOSE_WAIT: + case STATE_CLOSED: + case STATE_CLOSE_WAIT: + if (NULL != write_cont) + write_cont (write_cont_cls, GNUNET_STREAM_SHUTDOWN, 0); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s() END\n", __func__); + return NULL; + case STATE_INIT: + case STATE_LISTEN: + case STATE_HELLO_WAIT: + if (NULL != write_cont) + /* FIXME: GNUNET_STREAM_SYSERR?? */ + write_cont (write_cont_cls, GNUNET_STREAM_SYSERR, 0); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s() END\n", __func__); + return NULL; + case STATE_ESTABLISHED: + case STATE_RECEIVE_CLOSED: + case STATE_RECEIVE_CLOSE_WAIT: + break; + } + if (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * max_payload_size < size) size = GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * max_payload_size; num_needed_packets = (size + (max_payload_size - 1)) / max_payload_size; io_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOWriteHandle)); + io_handle->socket = socket; + io_handle->write_cont = write_cont; + io_handle->write_cont_cls = write_cont_cls; + io_handle->size = size; sweep = data; + /* FIXME: Remove the fixed delay for ack deadline; Set it to the value + determined from RTT */ + ack_deadline = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5); /* Divide the given buffer into packets for sending */ for (packet=0; packet < num_needed_packets; packet++) + { + if ((packet + 1) * max_payload_size < size) + { + payload_size = max_payload_size; + packet_size = MAX_PACKET_SIZE; + } + else { - if ((packet + 1) * max_payload_size < size) - { - payload_size = max_payload_size; - packet_size = MAX_PACKET_SIZE; - } - else - { - payload_size = size - packet * max_payload_size; - packet_size = payload_size + sizeof (struct - GNUNET_STREAM_DataMessage); - } - io_handle->messages[packet] = GNUNET_malloc (packet_size); - io_handle->messages[packet]->header.header.size = htons (packet_size); - io_handle->messages[packet]->header.header.type = - htons (GNUNET_MESSAGE_TYPE_STREAM_DATA); - io_handle->messages[packet]->sequence_number = - htons (socket->write_sequence_number++); - io_handle->messages[packet]->offset = htons (socket->write_offset); - - /* FIXME: Remove the fixed delay for ack deadline; Set it to the value - determined from RTT */ - io_handle->messages[packet]->ack_deadline = - GNUNET_TIME_relative_hton (GNUNET_TIME_relative_multiply - (GNUNET_TIME_UNIT_SECONDS, 5)); - data_msg = io_handle->messages[packet]; - /* Copy data from given buffer to the packet */ - memcpy (&data_msg[1], - sweep, - payload_size); - sweep += payload_size; - socket->write_offset += payload_size; + payload_size = size - packet * max_payload_size; + packet_size = payload_size + sizeof (struct + GNUNET_STREAM_DataMessage); } + io_handle->messages[packet] = GNUNET_malloc (packet_size); + io_handle->messages[packet]->header.header.size = htons (packet_size); + io_handle->messages[packet]->header.header.type = + htons (GNUNET_MESSAGE_TYPE_STREAM_DATA); + io_handle->messages[packet]->sequence_number = + htonl (socket->write_sequence_number++); + io_handle->messages[packet]->offset = htonl (socket->write_offset); + + /* FIXME: Remove the fixed delay for ack deadline; Set it to the value + determined from RTT */ + io_handle->messages[packet]->ack_deadline = + GNUNET_TIME_relative_hton (ack_deadline); + data_msg = io_handle->messages[packet]; + /* Copy data from given buffer to the packet */ + memcpy (&data_msg[1], + sweep, + payload_size); + sweep += payload_size; + socket->write_offset += payload_size; + } socket->write_handle = io_handle; write_data (socket); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s() END\n", __func__); + return io_handle; } + /** - * Tries to read data from the stream + * Tries to read data from the stream. * * @param socket the socket representing a stream * @param timeout the timeout period * @param proc function to call with data (once only) * @param proc_cls the closure for proc - * @return handle to cancel the operation + * + * @return handle to cancel the operation; if the stream has been shutdown for + * this type of opeartion then the DataProcessor is immediately + * called with GNUNET_STREAM_SHUTDOWN as status and NULL if returned */ struct GNUNET_STREAM_IOReadHandle * GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket, @@ -2155,26 +3193,99 @@ GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket, { struct GNUNET_STREAM_IOReadHandle *read_handle; + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: %s()\n", + GNUNET_i2s (&socket->other_peer), + __func__); + /* Return NULL if there is already a read handle; the user has to cancel that - first before continuing or has to wait until it is completed */ + first before continuing or has to wait until it is completed */ if (NULL != socket->read_handle) return NULL; + GNUNET_assert (NULL != proc); + + switch (socket->state) + { + case STATE_RECEIVE_CLOSED: + case STATE_RECEIVE_CLOSE_WAIT: + case STATE_CLOSED: + case STATE_CLOSE_WAIT: + proc (proc_cls, GNUNET_STREAM_SHUTDOWN, NULL, 0); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: %s() END\n", + GNUNET_i2s (&socket->other_peer), + __func__); + return NULL; + default: + break; + } + read_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOReadHandle)); read_handle->proc = proc; + read_handle->proc_cls = proc_cls; socket->read_handle = read_handle; /* Check if we have a packet at bitmap 0 */ if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap, 0)) - { - socket->read_task = GNUNET_SCHEDULER_add_now (&call_read_processor, - socket); + { + socket->read_task_id = GNUNET_SCHEDULER_add_now (&call_read_processor, + socket); - } + } /* Setup the read timeout task */ - socket->read_io_timeout_task = GNUNET_SCHEDULER_add_delayed (timeout, - &read_io_timeout, - socket); + socket->read_io_timeout_task_id = + GNUNET_SCHEDULER_add_delayed (timeout, + &read_io_timeout, + socket); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: %s() END\n", + GNUNET_i2s (&socket->other_peer), + __func__); return read_handle; } + + +/** + * Cancel pending write operation. + * + * @param ioh handle to operation to cancel + */ +void +GNUNET_STREAM_io_write_cancel (struct GNUNET_STREAM_IOWriteHandle *ioh) +{ + struct GNUNET_STREAM_Socket *socket = ioh->socket; + unsigned int packet; + + GNUNET_assert (NULL != socket->write_handle); + GNUNET_assert (socket->write_handle == ioh); + + if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id) + { + GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id); + socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK; + } + + for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++) + { + if (NULL == ioh->messages[packet]) break; + GNUNET_free (ioh->messages[packet]); + } + + GNUNET_free (socket->write_handle); + socket->write_handle = NULL; + return; +} + + +/** + * Cancel pending read operation. + * + * @param ioh handle to operation to cancel + */ +void +GNUNET_STREAM_io_read_cancel (struct GNUNET_STREAM_IOReadHandle *ioh) +{ + return; +} |