aboutsummaryrefslogtreecommitdiff
path: root/src/stream/stream_api.c
diff options
context:
space:
mode:
authorBertrand Marc <beberking@gmail.com>2012-06-06 20:47:48 +0200
committerBertrand Marc <beberking@gmail.com>2012-06-06 20:47:48 +0200
commit740b30688bd745a527f96f9116c19acb3480971a (patch)
tree2709a3f4dba11c174aa9e1ba3612e30c578e76a9 /src/stream/stream_api.c
parent2b81464a43485fcc8ce079fafdee7b7a171835f4 (diff)
Imported Upstream version 0.9.3upstream/0.9.3
Diffstat (limited to 'src/stream/stream_api.c')
-rw-r--r--src/stream/stream_api.c2185
1 files changed, 1648 insertions, 537 deletions
diff --git a/src/stream/stream_api.c b/src/stream/stream_api.c
index 84fcdfd..dadba33 100644
--- a/src/stream/stream_api.c
+++ b/src/stream/stream_api.c
@@ -18,17 +18,31 @@
Boston, MA 02111-1307, USA.
*/
+/* TODO:
+ *
+ * Checks for matching the sender and socket->other_peer in server
+ * message handlers
+ *
+ * Add code for write io timeout
+ *
+ * Include retransmission for control messages
+ **/
+
/**
* @file stream/stream_api.c
* @brief Implementation of the stream library
* @author Sree Harsha Totakura
*/
+
+
#include "platform.h"
#include "gnunet_common.h"
#include "gnunet_crypto_lib.h"
#include "gnunet_stream_lib.h"
#include "stream_protocol.h"
+#define LOG(kind,...) \
+ GNUNET_log_from (kind, "stream-api", __VA_ARGS__)
/**
* The maximum packet size of a stream packet
@@ -36,15 +50,15 @@
#define MAX_PACKET_SIZE 64000
/**
- * The maximum payload a data message packet can carry
+ * Receive buffer
*/
-static size_t max_payload_size =
- MAX_PACKET_SIZE - sizeof (struct GNUNET_STREAM_DataMessage);
+#define RECEIVE_BUFFER_SIZE 4096000
/**
- * Receive buffer
+ * The maximum payload a data message packet can carry
*/
-#define RECEIVE_BUFFER_SIZE 4096000
+static size_t max_payload_size =
+ MAX_PACKET_SIZE - sizeof (struct GNUNET_STREAM_DataMessage);
/**
* states in the Protocol
@@ -150,12 +164,6 @@ struct MessageQueue
*/
struct GNUNET_STREAM_Socket
{
-
- /**
- * The peer identity of the peer at the other end of the stream
- */
- struct GNUNET_PeerIdentity other_peer;
-
/**
* Retransmission timeout
*/
@@ -177,16 +185,6 @@ struct GNUNET_STREAM_Socket
struct GNUNET_TIME_Relative ack_time_deadline;
/**
- * The task for sending timely Acks
- */
- GNUNET_SCHEDULER_TaskIdentifier ack_task_id;
-
- /**
- * Task scheduled to continue a read operation.
- */
- GNUNET_SCHEDULER_TaskIdentifier read_task;
-
- /**
* The mesh handle
*/
struct GNUNET_MESH_Handle *mesh;
@@ -212,6 +210,16 @@ struct GNUNET_STREAM_Socket
struct GNUNET_MESH_TransmitHandle *transmit_handle;
/**
+ * The current act transmit handle (if a pending ack transmit request exists)
+ */
+ struct GNUNET_MESH_TransmitHandle *ack_transmit_handle;
+
+ /**
+ * Pointer to the current ack message using in ack_task
+ */
+ struct GNUNET_STREAM_AckMessage *ack_msg;
+
+ /**
* The current message associated with the transmit handle
*/
struct MessageQueue *queue_head;
@@ -232,14 +240,45 @@ struct GNUNET_STREAM_Socket
struct GNUNET_STREAM_IOReadHandle *read_handle;
/**
+ * The shutdown handle associated with this socket
+ */
+ struct GNUNET_STREAM_ShutdownHandle *shutdown_handle;
+
+ /**
* Buffer for storing received messages
*/
void *receive_buffer;
/**
+ * The listen socket from which this socket is derived. Should be NULL if it
+ * is not a derived socket
+ */
+ struct GNUNET_STREAM_ListenSocket *lsocket;
+
+ /**
+ * The peer identity of the peer at the other end of the stream
+ */
+ struct GNUNET_PeerIdentity other_peer;
+
+ /**
* Task identifier for the read io timeout task
*/
- GNUNET_SCHEDULER_TaskIdentifier read_io_timeout_task;
+ GNUNET_SCHEDULER_TaskIdentifier read_io_timeout_task_id;
+
+ /**
+ * Task identifier for retransmission task after timeout
+ */
+ GNUNET_SCHEDULER_TaskIdentifier retransmission_timeout_task_id;
+
+ /**
+ * The task for sending timely Acks
+ */
+ GNUNET_SCHEDULER_TaskIdentifier ack_task_id;
+
+ /**
+ * Task scheduled to continue a read operation.
+ */
+ GNUNET_SCHEDULER_TaskIdentifier read_task_id;
/**
* The state of the protocol associated with this socket
@@ -257,6 +296,11 @@ struct GNUNET_STREAM_Socket
unsigned int retries;
/**
+ * The application port number (type: uint32_t)
+ */
+ GNUNET_MESH_ApplicationType app_port;
+
+ /**
* The session id associated with this stream connection
* FIXME: Not used currently, may be removed
*/
@@ -286,7 +330,7 @@ struct GNUNET_STREAM_Socket
/**
* receiver's available buffer after the last acknowledged packet
*/
- uint32_t receive_window_available;
+ uint32_t receiver_window_available;
/**
* The offset pointer used during write operation
@@ -310,7 +354,6 @@ struct GNUNET_STREAM_Socket
*/
struct GNUNET_STREAM_ListenSocket
{
-
/**
* The mesh handle
*/
@@ -328,6 +371,7 @@ struct GNUNET_STREAM_ListenSocket
/**
* The service port
+ * FIXME: Remove if not required!
*/
GNUNET_MESH_ApplicationType port;
};
@@ -339,9 +383,24 @@ struct GNUNET_STREAM_ListenSocket
struct GNUNET_STREAM_IOWriteHandle
{
/**
+ * The socket to which this write handle is associated
+ */
+ struct GNUNET_STREAM_Socket *socket;
+
+ /**
* The packet_buffers associated with this Handle
*/
- struct GNUNET_STREAM_DataMessage *messages[64];
+ struct GNUNET_STREAM_DataMessage *messages[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH];
+
+ /**
+ * The write continuation callback
+ */
+ GNUNET_STREAM_CompletionContinuation write_cont;
+
+ /**
+ * Write continuation closure
+ */
+ void *write_cont_cls;
/**
* The bitmap of this IOHandle; Corresponding bit for a message is set when
@@ -350,11 +409,9 @@ struct GNUNET_STREAM_IOWriteHandle
GNUNET_STREAM_AckBitmap ack_bitmap;
/**
- * Number of packets sent before waiting for an ack
- *
- * FIXME: Do we need this?
+ * Number of bytes in this write handle
*/
- unsigned int sent_packets;
+ size_t size;
};
@@ -376,13 +433,45 @@ struct GNUNET_STREAM_IOReadHandle
/**
+ * Handle for Shutdown
+ */
+struct GNUNET_STREAM_ShutdownHandle
+{
+ /**
+ * The socket associated with this shutdown handle
+ */
+ struct GNUNET_STREAM_Socket *socket;
+
+ /**
+ * Shutdown completion callback
+ */
+ GNUNET_STREAM_ShutdownCompletion completion_cb;
+
+ /**
+ * Closure for completion callback
+ */
+ void *completion_cls;
+
+ /**
+ * Close message retransmission task id
+ */
+ GNUNET_SCHEDULER_TaskIdentifier close_msg_retransmission_task_id;
+
+ /**
+ * Which operation to shutdown? SHUT_RD, SHUT_WR or SHUT_RDWR
+ */
+ int operation;
+};
+
+
+/**
* Default value in seconds for various timeouts
*/
-static unsigned int default_timeout = 300;
+static unsigned int default_timeout = 10;
/**
- * Callback function for sending hello message
+ * Callback function for sending queued message
*
* @param cls closure the socket
* @param size number of bytes available in buf
@@ -401,31 +490,32 @@ send_message_notify (void *cls, size_t size, void *buf)
if (NULL == head)
return 0; /* just to be safe */
if (0 == size) /* request timed out */
- {
- socket->retries++;
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Message sending timed out. Retry %d \n",
- socket->retries);
- socket->transmit_handle =
- GNUNET_MESH_notify_transmit_ready (socket->tunnel,
- 0, /* Corking */
- 1, /* Priority */
- /* FIXME: exponential backoff */
- socket->retransmit_timeout,
- &socket->other_peer,
- ntohs (head->message->header.size),
- &send_message_notify,
- socket);
- return 0;
- }
+ {
+ socket->retries++;
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Message sending timed out. Retry %d \n",
+ GNUNET_i2s (&socket->other_peer),
+ socket->retries);
+ socket->transmit_handle =
+ GNUNET_MESH_notify_transmit_ready (socket->tunnel,
+ 0, /* Corking */
+ 1, /* Priority */
+ /* FIXME: exponential backoff */
+ socket->retransmit_timeout,
+ &socket->other_peer,
+ ntohs (head->message->header.size),
+ &send_message_notify,
+ socket);
+ return 0;
+ }
ret = ntohs (head->message->header.size);
GNUNET_assert (size >= ret);
memcpy (buf, head->message, ret);
if (NULL != head->finish_cb)
- {
- head->finish_cb (socket, head->finish_cb_cls);
- }
+ {
+ head->finish_cb (head->finish_cb_cls, socket);
+ }
GNUNET_CONTAINER_DLL_remove (socket->queue_head,
socket->queue_tail,
head);
@@ -433,19 +523,19 @@ send_message_notify (void *cls, size_t size, void *buf)
GNUNET_free (head);
head = socket->queue_head;
if (NULL != head) /* more pending messages to send */
- {
- socket->retries = 0;
- socket->transmit_handle =
- GNUNET_MESH_notify_transmit_ready (socket->tunnel,
- 0, /* Corking */
- 1, /* Priority */
- /* FIXME: exponential backoff */
- socket->retransmit_timeout,
- &socket->other_peer,
- ntohs (head->message->header.size),
- &send_message_notify,
- socket);
- }
+ {
+ socket->retries = 0;
+ socket->transmit_handle =
+ GNUNET_MESH_notify_transmit_ready (socket->tunnel,
+ 0, /* Corking */
+ 1, /* Priority */
+ /* FIXME: exponential backoff */
+ socket->retransmit_timeout,
+ &socket->other_peer,
+ ntohs (head->message->header.size),
+ &send_message_notify,
+ socket);
+ }
return ret;
}
@@ -466,6 +556,16 @@ queue_message (struct GNUNET_STREAM_Socket *socket,
{
struct MessageQueue *queue_entity;
+ GNUNET_assert
+ ((ntohs (message->header.type) >= GNUNET_MESSAGE_TYPE_STREAM_DATA)
+ && (ntohs (message->header.type) <= GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK));
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Queueing message of type %d and size %d\n",
+ GNUNET_i2s (&socket->other_peer),
+ ntohs (message->header.type),
+ ntohs (message->header.size));
+ GNUNET_assert (NULL != message);
queue_entity = GNUNET_malloc (sizeof (struct MessageQueue));
queue_entity->message = message;
queue_entity->finish_cb = finish_cb;
@@ -490,6 +590,31 @@ queue_message (struct GNUNET_STREAM_Socket *socket,
/**
+ * Copies a message and queues it for sending using the mesh connection of
+ * given socket
+ *
+ * @param socket the socket whose mesh connection is used
+ * @param message the message to be sent
+ * @param finish_cb the callback to be called when the message is sent
+ * @param finish_cb_cls the closure for the callback
+ */
+static void
+copy_and_queue_message (struct GNUNET_STREAM_Socket *socket,
+ const struct GNUNET_STREAM_MessageHeader *message,
+ SendFinishCallback finish_cb,
+ void *finish_cb_cls)
+{
+ struct GNUNET_STREAM_MessageHeader *msg_copy;
+ uint16_t size;
+
+ size = ntohs (message->header.size);
+ msg_copy = GNUNET_malloc (size);
+ memcpy (msg_copy, message, size);
+ queue_message (socket, msg_copy, finish_cb, finish_cb_cls);
+}
+
+
+/**
* Callback function for sending ack message
*
* @param cls closure the ACK message created in ack_task
@@ -500,21 +625,56 @@ queue_message (struct GNUNET_STREAM_Socket *socket,
static size_t
send_ack_notify (void *cls, size_t size, void *buf)
{
- struct GNUNET_STREAM_AckMessage *ack_msg = cls;
+ struct GNUNET_STREAM_Socket *socket = cls;
if (0 == size)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%s called with size 0\n", __func__);
- return 0;
- }
- GNUNET_assert (ack_msg->header.header.size <= size);
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s called with size 0\n", __func__);
+ return 0;
+ }
+ GNUNET_assert (ntohs (socket->ack_msg->header.header.size) <= size);
+
+ size = ntohs (socket->ack_msg->header.header.size);
+ memcpy (buf, socket->ack_msg, size);
- size = ack_msg->header.header.size;
- memcpy (buf, ack_msg, size);
+ GNUNET_free (socket->ack_msg);
+ socket->ack_msg = NULL;
+ socket->ack_transmit_handle = NULL;
return size;
}
+/**
+ * Writes data using the given socket. The amount of data written is limited by
+ * the receiver_window_size
+ *
+ * @param socket the socket to use
+ */
+static void
+write_data (struct GNUNET_STREAM_Socket *socket);
+
+/**
+ * Task for retransmitting data messages if they aren't ACK before their ack
+ * deadline
+ *
+ * @param cls the socket
+ * @param tc the Task context
+ */
+static void
+retransmission_timeout_task (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct GNUNET_STREAM_Socket *socket = cls;
+
+ if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
+ return;
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Retransmitting DATA...\n", GNUNET_i2s (&socket->other_peer));
+ socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
+ write_data (socket);
+}
+
/**
* Task for sending ACK message
@@ -530,12 +690,10 @@ ack_task (void *cls,
struct GNUNET_STREAM_AckMessage *ack_msg;
if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
- {
- return;
- }
-
- socket->ack_task_id = 0;
-
+ {
+ return;
+ }
+ socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
/* Create the ACK Message */
ack_msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_AckMessage));
ack_msg->header.header.size = htons (sizeof (struct
@@ -545,18 +703,61 @@ ack_task (void *cls,
ack_msg->base_sequence_number = htonl (socket->read_sequence_number);
ack_msg->receive_window_remaining =
htonl (RECEIVE_BUFFER_SIZE - socket->receive_buffer_size);
-
+ socket->ack_msg = ack_msg;
/* Request MESH for sending ACK */
- GNUNET_MESH_notify_transmit_ready (socket->tunnel,
- 0, /* Corking */
- 1, /* Priority */
- socket->retransmit_timeout,
- &socket->other_peer,
- ntohs (ack_msg->header.header.size),
- &send_ack_notify,
- ack_msg);
+ socket->ack_transmit_handle =
+ GNUNET_MESH_notify_transmit_ready (socket->tunnel,
+ 0, /* Corking */
+ 1, /* Priority */
+ socket->retransmit_timeout,
+ &socket->other_peer,
+ ntohs (ack_msg->header.header.size),
+ &send_ack_notify,
+ socket);
+}
-
+
+/**
+ * Retransmission task for shutdown messages
+ *
+ * @param cls the shutdown handle
+ * @param tc the Task Context
+ */
+static void
+close_msg_retransmission_task (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct GNUNET_STREAM_ShutdownHandle *shutdown_handle = cls;
+ struct GNUNET_STREAM_MessageHeader *msg;
+ struct GNUNET_STREAM_Socket *socket;
+
+ GNUNET_assert (NULL != shutdown_handle);
+ socket = shutdown_handle->socket;
+
+ msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
+ msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
+ switch (shutdown_handle->operation)
+ {
+ case SHUT_RDWR:
+ msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE);
+ break;
+ case SHUT_RD:
+ msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE);
+ break;
+ case SHUT_WR:
+ msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE);
+ break;
+ default:
+ GNUNET_free (msg);
+ shutdown_handle->close_msg_retransmission_task_id =
+ GNUNET_SCHEDULER_NO_TASK;
+ return;
+ }
+ queue_message (socket, msg, NULL, NULL);
+ shutdown_handle->close_msg_retransmission_task_id =
+ GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
+ &close_msg_retransmission_task,
+ shutdown_handle);
}
@@ -572,7 +773,7 @@ ackbitmap_modify_bit (GNUNET_STREAM_AckBitmap *bitmap,
unsigned int bit,
int value)
{
- GNUNET_assert (bit < 64);
+ GNUNET_assert (bit < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH);
if (GNUNET_YES == value)
*bitmap |= (1LL << bit);
else
@@ -591,31 +792,14 @@ static uint8_t
ackbitmap_is_bit_set (const GNUNET_STREAM_AckBitmap *bitmap,
unsigned int bit)
{
- GNUNET_assert (bit < 64);
+ GNUNET_assert (bit < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH);
return 0 != (*bitmap & (1LL << bit));
}
-
-/**
- * Function called when Data Message is sent
- *
- * @param cls the io_handle corresponding to the Data Message
- * @param socket the socket which was used
- */
-static void
-write_data_finish_cb (void *cls,
- struct GNUNET_STREAM_Socket *socket)
-{
- struct GNUNET_STREAM_IOWriteHandle *io_handle = cls;
-
- io_handle->sent_packets++;
-}
-
-
/**
* Writes data using the given socket. The amount of data written is limited by
- * the receive_window_size
+ * the receiver_window_size
*
* @param socket the socket to use
*/
@@ -623,43 +807,59 @@ static void
write_data (struct GNUNET_STREAM_Socket *socket)
{
struct GNUNET_STREAM_IOWriteHandle *io_handle = socket->write_handle;
- unsigned int packet;
+ int packet; /* Although an int, should never be negative */
int ack_packet;
ack_packet = -1;
/* Find the last acknowledged packet */
- for (packet=0; packet < 64; packet++)
- {
- if (GNUNET_YES == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
- packet))
- ack_packet = packet;
- else if (NULL == io_handle->messages[packet])
- break;
- }
+ for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
+ {
+ if (GNUNET_YES == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
+ packet))
+ ack_packet = packet;
+ else if (NULL == io_handle->messages[packet])
+ break;
+ }
/* Resend packets which weren't ack'ed */
for (packet=0; packet < ack_packet; packet++)
+ {
+ if (GNUNET_NO == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
+ packet))
{
- if (GNUNET_NO == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
- packet))
- {
- queue_message (socket,
- &io_handle->messages[packet]->header,
- NULL,
- NULL);
- }
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Placing DATA message with sequence %u in send queue\n",
+ GNUNET_i2s (&socket->other_peer),
+ ntohl (io_handle->messages[packet]->sequence_number));
+ copy_and_queue_message (socket,
+ &io_handle->messages[packet]->header,
+ NULL,
+ NULL);
}
+ }
packet = ack_packet + 1;
/* Now send new packets if there is enough buffer space */
while ( (NULL != io_handle->messages[packet]) &&
- (socket->receive_window_available >= ntohs (io_handle->messages[packet]->header.header.size)) )
- {
- socket->receive_window_available -= ntohs (io_handle->messages[packet]->header.header.size);
- queue_message (socket,
- &io_handle->messages[packet]->header,
- &write_data_finish_cb,
- io_handle);
- packet++;
- }
+ (socket->receiver_window_available
+ >= ntohs (io_handle->messages[packet]->header.header.size)) )
+ {
+ socket->receiver_window_available -=
+ ntohs (io_handle->messages[packet]->header.header.size);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Placing DATA message with sequence %u in send queue\n",
+ GNUNET_i2s (&socket->other_peer),
+ ntohl (io_handle->messages[packet]->sequence_number));
+ copy_and_queue_message (socket,
+ &io_handle->messages[packet]->header,
+ NULL,
+ NULL);
+ packet++;
+ }
+ if (GNUNET_SCHEDULER_NO_TASK == socket->retransmission_timeout_task_id)
+ socket->retransmission_timeout_task_id =
+ GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
+ (GNUNET_TIME_UNIT_SECONDS, 8),
+ &retransmission_timeout_task,
+ socket);
}
@@ -671,7 +871,7 @@ write_data (struct GNUNET_STREAM_Socket *socket)
*/
static void
call_read_processor (void *cls,
- const struct GNUNET_SCHEDULER_TaskContext *tc)
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
{
struct GNUNET_STREAM_Socket *socket = cls;
size_t read_size;
@@ -680,87 +880,95 @@ call_read_processor (void *cls,
uint32_t sequence_increase;
uint32_t offset_increase;
- socket->read_task = GNUNET_SCHEDULER_NO_TASK;
+ socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
return;
+ if (NULL == socket->receive_buffer)
+ return;
+
GNUNET_assert (NULL != socket->read_handle);
GNUNET_assert (NULL != socket->read_handle->proc);
/* Check the bitmap for any holes */
for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
- {
- if (GNUNET_NO == ackbitmap_is_bit_set (&socket->ack_bitmap,
- packet))
- break;
- }
+ {
+ if (GNUNET_NO == ackbitmap_is_bit_set (&socket->ack_bitmap,
+ packet))
+ break;
+ }
/* We only call read processor if we have the first packet */
GNUNET_assert (0 < packet);
-
valid_read_size =
socket->receive_buffer_boundaries[packet-1] - socket->copy_offset;
-
GNUNET_assert (0 != valid_read_size);
-
/* Cancel the read_io_timeout_task */
- GNUNET_SCHEDULER_cancel (socket->read_io_timeout_task);
- socket->read_io_timeout_task = GNUNET_SCHEDULER_NO_TASK;
-
+ GNUNET_SCHEDULER_cancel (socket->read_io_timeout_task_id);
+ socket->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
/* Call the data processor */
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Calling read processor\n",
+ GNUNET_i2s (&socket->other_peer));
read_size =
socket->read_handle->proc (socket->read_handle->proc_cls,
socket->status,
socket->receive_buffer + socket->copy_offset,
valid_read_size);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Read processor read %d bytes\n",
+ GNUNET_i2s (&socket->other_peer), read_size);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Read processor completed successfully\n",
+ GNUNET_i2s (&socket->other_peer));
/* Free the read handle */
GNUNET_free (socket->read_handle);
socket->read_handle = NULL;
-
GNUNET_assert (read_size <= valid_read_size);
socket->copy_offset += read_size;
-
/* Determine upto which packet we can remove from the buffer */
for (packet = 0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
+ {
+ if (socket->copy_offset == socket->receive_buffer_boundaries[packet])
+ { packet++; break; }
if (socket->copy_offset < socket->receive_buffer_boundaries[packet])
break;
+ }
/* If no packets can be removed we can't move the buffer */
if (0 == packet) return;
-
sequence_increase = packet;
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Sequence increase after read processor completion: %u\n",
+ GNUNET_i2s (&socket->other_peer), sequence_increase);
/* Shift the data in the receive buffer */
memmove (socket->receive_buffer,
socket->receive_buffer
+ socket->receive_buffer_boundaries[sequence_increase-1],
- socket->receive_buffer_size - socket->receive_buffer_boundaries[sequence_increase-1]);
-
+ socket->receive_buffer_size
+ - socket->receive_buffer_boundaries[sequence_increase-1]);
/* Shift the bitmap */
socket->ack_bitmap = socket->ack_bitmap >> sequence_increase;
-
/* Set read_sequence_number */
socket->read_sequence_number += sequence_increase;
-
/* Set read_offset */
offset_increase = socket->receive_buffer_boundaries[sequence_increase-1];
socket->read_offset += offset_increase;
-
/* Fix copy_offset */
GNUNET_assert (offset_increase <= socket->copy_offset);
socket->copy_offset -= offset_increase;
-
/* Fix relative boundaries */
for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
+ {
+ if (packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH - sequence_increase)
{
- if (packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH - sequence_increase)
- {
- socket->receive_buffer_boundaries[packet] =
- socket->receive_buffer_boundaries[packet + sequence_increase]
- - offset_increase;
- }
- else
- socket->receive_buffer_boundaries[packet] = 0;
+ socket->receive_buffer_boundaries[packet] =
+ socket->receive_buffer_boundaries[packet + sequence_increase]
+ - offset_increase;
}
+ else
+ socket->receive_buffer_boundaries[packet] = 0;
+ }
}
@@ -772,20 +980,32 @@ call_read_processor (void *cls,
*/
static void
read_io_timeout (void *cls,
- const struct GNUNET_SCHEDULER_TaskContext *tc)
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
{
struct GNUNET_STREAM_Socket *socket = cls;
+ GNUNET_STREAM_DataProcessor proc;
+ void *proc_cls;
- socket->read_io_timeout_task = GNUNET_SCHEDULER_NO_TASK;
- if (socket->read_task != GNUNET_SCHEDULER_NO_TASK)
+ socket->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
+ if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK)
{
- GNUNET_SCHEDULER_cancel (socket->read_task);
- socket->read_task = GNUNET_SCHEDULER_NO_TASK;
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Read task timedout - Cancelling it\n",
+ GNUNET_i2s (&socket->other_peer));
+ GNUNET_SCHEDULER_cancel (socket->read_task_id);
+ socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
}
GNUNET_assert (NULL != socket->read_handle);
-
+ proc = socket->read_handle->proc;
+ proc_cls = socket->read_handle->proc_cls;
+
GNUNET_free (socket->read_handle);
socket->read_handle = NULL;
+ /* Call the read processor to signal timeout */
+ proc (proc_cls,
+ GNUNET_STREAM_TIMEOUT,
+ NULL,
+ 0);
}
@@ -815,96 +1035,150 @@ handle_data (struct GNUNET_STREAM_Socket *socket,
size = htons (msg->header.header.size);
if (size < sizeof (struct GNUNET_STREAM_DataMessage))
- {
- GNUNET_break_op (0);
- return GNUNET_SYSERR;
- }
+ {
+ GNUNET_break_op (0);
+ return GNUNET_SYSERR;
+ }
+
+ if (0 != memcmp (sender,
+ &socket->other_peer,
+ sizeof (struct GNUNET_PeerIdentity)))
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Received DATA from non-confirming peer\n",
+ GNUNET_i2s (&socket->other_peer));
+ return GNUNET_YES;
+ }
switch (socket->state)
+ {
+ case STATE_ESTABLISHED:
+ case STATE_TRANSMIT_CLOSED:
+ case STATE_TRANSMIT_CLOSE_WAIT:
+
+ /* check if the message's sequence number is in the range we are
+ expecting */
+ relative_sequence_number =
+ ntohl (msg->sequence_number) - socket->read_sequence_number;
+ if ( relative_sequence_number > GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH)
{
- case STATE_ESTABLISHED:
- case STATE_TRANSMIT_CLOSED:
- case STATE_TRANSMIT_CLOSE_WAIT:
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Ignoring received message with sequence number %u\n",
+ GNUNET_i2s (&socket->other_peer),
+ ntohl (msg->sequence_number));
+ /* Start ACK sending task if one is not already present */
+ if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
+ {
+ socket->ack_task_id =
+ GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
+ (msg->ack_deadline),
+ &ack_task,
+ socket);
+ }
+ return GNUNET_YES;
+ }
+
+ /* Check if we have already seen this message */
+ if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
+ relative_sequence_number))
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Ignoring already received message with sequence number %u\n",
+ GNUNET_i2s (&socket->other_peer),
+ ntohl (msg->sequence_number));
+ /* Start ACK sending task if one is not already present */
+ if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
+ {
+ socket->ack_task_id =
+ GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
+ (msg->ack_deadline),
+ &ack_task,
+ socket);
+ }
+ return GNUNET_YES;
+ }
- /* check if the message's sequence number is in the range we are
- expecting */
- relative_sequence_number =
- ntohl (msg->sequence_number) - socket->read_sequence_number;
- if ( relative_sequence_number > 64)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Ignoring received message with sequence number %d",
- ntohl (msg->sequence_number));
- return GNUNET_YES;
- }
-
- /* Check if we have to allocate the buffer */
- size -= sizeof (struct GNUNET_STREAM_DataMessage);
- relative_offset = ntohl (msg->offset) - socket->read_offset;
- bytes_needed = relative_offset + size;
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Receiving DATA with sequence number: %u and size: %d from %s\n",
+ GNUNET_i2s (&socket->other_peer),
+ ntohl (msg->sequence_number),
+ ntohs (msg->header.header.size),
+ GNUNET_i2s (&socket->other_peer));
- if (bytes_needed > socket->receive_buffer_size)
- {
- if (bytes_needed <= RECEIVE_BUFFER_SIZE)
- {
- socket->receive_buffer = GNUNET_realloc (socket->receive_buffer,
- bytes_needed);
- socket->receive_buffer_size = bytes_needed;
- }
- else
- {
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Cannot accommodate packet %d as buffer is full\n",
- ntohl (msg->sequence_number));
- return GNUNET_YES;
- }
- }
+ /* Check if we have to allocate the buffer */
+ size -= sizeof (struct GNUNET_STREAM_DataMessage);
+ relative_offset = ntohl (msg->offset) - socket->read_offset;
+ bytes_needed = relative_offset + size;
+ if (bytes_needed > socket->receive_buffer_size)
+ {
+ if (bytes_needed <= RECEIVE_BUFFER_SIZE)
+ {
+ socket->receive_buffer = GNUNET_realloc (socket->receive_buffer,
+ bytes_needed);
+ socket->receive_buffer_size = bytes_needed;
+ }
+ else
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Cannot accommodate packet %d as buffer is full\n",
+ GNUNET_i2s (&socket->other_peer),
+ ntohl (msg->sequence_number));
+ return GNUNET_YES;
+ }
+ }
- /* Copy Data to buffer */
- payload = &msg[1];
- GNUNET_assert (relative_offset + size <= socket->receive_buffer_size);
- memcpy (socket->receive_buffer + relative_offset,
- payload,
- size);
- socket->receive_buffer_boundaries[relative_sequence_number] =
- relative_offset + size;
+ /* Copy Data to buffer */
+ payload = &msg[1];
+ GNUNET_assert (relative_offset + size <= socket->receive_buffer_size);
+ memcpy (socket->receive_buffer + relative_offset,
+ payload,
+ size);
+ socket->receive_buffer_boundaries[relative_sequence_number] =
+ relative_offset + size;
- /* Modify the ACK bitmap */
- ackbitmap_modify_bit (&socket->ack_bitmap,
- relative_sequence_number,
- GNUNET_YES);
+ /* Modify the ACK bitmap */
+ ackbitmap_modify_bit (&socket->ack_bitmap,
+ relative_sequence_number,
+ GNUNET_YES);
- /* Start ACK sending task if one is not already present */
- if (0 == socket->ack_task_id)
- {
- socket->ack_task_id =
- GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
- (msg->ack_deadline),
- &ack_task,
- socket);
- }
-
- if ((NULL != socket->read_handle) /* A read handle is waiting */
- /* There is no current read task */