diff options
Diffstat (limited to 'src/stream/test_stream_local.c')
-rw-r--r-- | src/stream/test_stream_local.c | 272 |
1 files changed, 172 insertions, 100 deletions
diff --git a/src/stream/test_stream_local.c b/src/stream/test_stream_local.c index 4da1258..c9fab84 100644 --- a/src/stream/test_stream_local.c +++ b/src/stream/test_stream_local.c @@ -30,6 +30,7 @@ #include "gnunet_util_lib.h" #include "gnunet_mesh_service.h" #include "gnunet_stream_lib.h" +#include "gnunet_testing_lib.h" #define VERBOSE 1 @@ -44,9 +45,14 @@ struct PeerData struct GNUNET_STREAM_Socket *socket; /** - * Peer's io handle + * Peer's io write handle */ - struct GNUNET_STREAM_IOHandle *io_handle; + struct GNUNET_STREAM_IOWriteHandle *io_write_handle; + + /** + * Peer's io read handle + */ + struct GNUNET_STREAM_IOReadHandle *io_read_handle; /** * Bytes the peer has written @@ -63,14 +69,92 @@ static struct GNUNET_OS_Process *arm_pid; static struct PeerData peer1; static struct PeerData peer2; static struct GNUNET_STREAM_ListenSocket *peer2_listen_socket; +static struct GNUNET_CONFIGURATION_Handle *config_peer1; +static struct GNUNET_CONFIGURATION_Handle *config_peer2; static GNUNET_SCHEDULER_TaskIdentifier abort_task; static GNUNET_SCHEDULER_TaskIdentifier test_task; -static GNUNET_SCHEDULER_TaskIdentifier read_task; static char *data = "ABCD"; static int result; +static int writing_success; +static int reading_success; + +/** + * Input processor + * + * @param cls the closure from GNUNET_STREAM_write/read + * @param status the status of the stream at the time this function is called + * @param data traffic from the other side + * @param size the number of bytes available in data read + * @return number of bytes of processed from 'data' (any data remaining should be + * given to the next time the read processor is called). + */ +static size_t +input_processor (void *cls, + enum GNUNET_STREAM_Status status, + const void *input_data, + size_t size); + +/** + * Task for calling STREAM_read + * + * @param cls the peer data entity + * @param tc the task context + */ +static void +stream_read_task (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct PeerData *peer = cls; + + peer->io_read_handle = GNUNET_STREAM_read (peer->socket, + GNUNET_TIME_relative_multiply + (GNUNET_TIME_UNIT_SECONDS, 5), + &input_processor, + peer); + GNUNET_assert (NULL != peer->io_read_handle); +} + +/** + * The write completion function; called upon writing some data to stream or + * upon error + * + * @param cls the closure from GNUNET_STREAM_write/read + * @param status the status of the stream at the time this function is called + * @param size the number of bytes read or written + */ +static void +write_completion (void *cls, + enum GNUNET_STREAM_Status status, + size_t size); + + +/** + * Task for calling STREAM_write + * + * @param cls the peer data entity + * @param tc the task context + */ +static void +stream_write_task (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct PeerData *peer = cls; + + peer->io_write_handle = + GNUNET_STREAM_write (peer->socket, + (void *) data, + strlen(data) - peer->bytes_wrote, + GNUNET_TIME_relative_multiply + (GNUNET_TIME_UNIT_SECONDS, 5), + &write_completion, + peer); + + GNUNET_assert (NULL != peer->io_write_handle); + } + /** * Shutdown nicely */ @@ -78,7 +162,10 @@ static void do_shutdown (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { GNUNET_STREAM_close (peer1.socket); - GNUNET_STREAM_close (peer2.socket); + if (NULL != peer2.socket) + GNUNET_STREAM_close (peer2.socket); + if (NULL != peer2_listen_socket) + GNUNET_STREAM_listen_close (peer2_listen_socket); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test: shutdown\n"); if (0 != abort_task) { @@ -90,8 +177,11 @@ do_shutdown (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, "kill"); } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test: Wait\n"); + /* Free the duplicated configuration */ + GNUNET_CONFIGURATION_destroy (config_peer1); + GNUNET_CONFIGURATION_destroy (config_peer2); GNUNET_assert (GNUNET_OK == GNUNET_OS_process_wait (arm_pid)); - GNUNET_OS_process_close (arm_pid); + GNUNET_OS_process_destroy (arm_pid); } @@ -106,10 +196,7 @@ do_abort (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { GNUNET_SCHEDULER_cancel (test_task); } - if (0 != read_task) - { - GNUNET_SCHEDULER_cancel (read_task); - } + result = GNUNET_SYSERR; abort_task = 0; do_shutdown (cls, tc); @@ -129,35 +216,31 @@ write_completion (void *cls, enum GNUNET_STREAM_Status status, size_t size) { - struct PeerData *peer; + struct PeerData *peer=cls; - peer = (struct PeerData *) cls; GNUNET_assert (GNUNET_STREAM_OK == status); - GNUNET_assert (size < strlen (data)); + GNUNET_assert (size <= strlen (data)); peer->bytes_wrote += size; if (peer->bytes_wrote < strlen(data)) /* Have more data to send */ { - peer->io_handle = GNUNET_STREAM_write (peer->socket, - (void *) data, - strlen(data) - peer->bytes_wrote, - GNUNET_TIME_relative_multiply - (GNUNET_TIME_UNIT_SECONDS, 5), - &write_completion, - cls); - GNUNET_assert (NULL != peer->io_handle); + GNUNET_SCHEDULER_add_now (&stream_write_task, peer); } else { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Writing completed\n"); + if (&peer1 == peer) /* Peer1 has finished writing; should read now */ { - peer->io_handle = GNUNET_STREAM_read ((struct GNUNET_STREAM_Socket *) - peer->socket, - GNUNET_TIME_relative_multiply - (GNUNET_TIME_UNIT_SECONDS, 5), - &input_processor, - cls); - GNUNET_assert (NULL!=peer->io_handle); + peer->bytes_read = 0; + GNUNET_SCHEDULER_add_now (&stream_read_task, peer); + } + else + { + writing_success = GNUNET_YES; + if (GNUNET_YES == reading_success) + GNUNET_SCHEDULER_add_now (&do_shutdown, NULL); } } } @@ -173,20 +256,15 @@ static void stream_open_cb (void *cls, struct GNUNET_STREAM_Socket *socket) { - struct PeerData *peer; + struct PeerData *peer=cls; - peer = (struct PeerData *) cls; - peer->bytes_wrote = 0; + GNUNET_assert (&peer1 == peer); GNUNET_assert (socket == peer1.socket); GNUNET_assert (socket == peer->socket); - peer->io_handle = GNUNET_STREAM_write (peer->socket, /* socket */ - (void *) data, /* data */ - strlen(data), - GNUNET_TIME_relative_multiply - (GNUNET_TIME_UNIT_SECONDS, 5), - &write_completion, - cls); - GNUNET_assert (NULL != peer->io_handle); + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Stream established from peer1\n"); + peer->bytes_wrote = 0; + GNUNET_SCHEDULER_add_now (&stream_write_task, peer); } @@ -206,44 +284,31 @@ input_processor (void *cls, const void *input_data, size_t size) { - struct PeerData *peer; + struct PeerData *peer = cls; - peer = (struct PeerData *) cls; - - GNUNET_assert (GNUNET_STERAM_OK == status); - GNUNET_assert (size < strlen (data)); - GNUNET_assert (strncmp ((const char *) data + peer->bytes_read, - (const char *) input_data, - size)); + GNUNET_assert (GNUNET_STREAM_OK == status); + GNUNET_assert (size <= strlen (data)); + GNUNET_assert (0 == strncmp ((const char *) data + peer->bytes_read, + (const char *) input_data, + size)); peer->bytes_read += size; if (peer->bytes_read < strlen (data)) { - peer->io_handle = GNUNET_STREAM_read ((struct GNUNET_STREAM_Socket *) - peer->socket, - GNUNET_TIME_relative_multiply - (GNUNET_TIME_UNIT_SECONDS, 5), - &input_processor, - cls); - GNUNET_assert (NULL != peer->io_handle); + GNUNET_SCHEDULER_add_now (&stream_read_task, peer); } else { if (&peer2 == peer) /* Peer2 has completed reading; should write */ { peer->bytes_wrote = 0; - peer->io_handle = GNUNET_STREAM_write ((struct GNUNET_STREAM_Socket *) - peer->socket, - (void *) data, - strlen(data), - GNUNET_TIME_relative_multiply - (GNUNET_TIME_UNIT_SECONDS, 5), - &write_completion, - cls); + GNUNET_SCHEDULER_add_now (&stream_write_task, peer); } else /* Peer1 has completed reading. End of tests */ { - GNUNET_SCHEDULER_add_now (&do_shutdown, NULL); + reading_success = GNUNET_YES; + if (GNUNET_YES == writing_success) + GNUNET_SCHEDULER_add_now (&do_shutdown, NULL); } } return size; @@ -251,29 +316,9 @@ input_processor (void *cls, /** - * Scheduler call back; to be executed when a new stream is connected - * Called from listen connect for peer2 - */ -static void -stream_read (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) -{ - read_task = GNUNET_SCHEDULER_NO_TASK; - GNUNET_assert (NULL != cls); - peer2.bytes_read = 0; - GNUNET_STREAM_listen_close (peer2_listen_socket); /* Close listen socket */ - peer2.io_handle = GNUNET_STREAM_read ((struct GNUNET_STREAM_Socket *) cls, - GNUNET_TIME_relative_multiply - (GNUNET_TIME_UNIT_SECONDS, 5), - &input_processor, - (void *) &peer2); - GNUNET_assert (NULL != peer2.io_handle); -} - - -/** * Functions of this type are called upon new stream connection from other peers * - * @param cls the closure from GNUNET_STREAM_listen + * @param cls the PeerData of peer2 * @param socket the socket representing the stream * @param initiator the identity of the peer who wants to establish a stream * with us @@ -285,35 +330,60 @@ stream_listen_cb (void *cls, struct GNUNET_STREAM_Socket *socket, const struct GNUNET_PeerIdentity *initiator) { + struct PeerData *peer=cls; + struct GNUNET_PeerIdentity self; + GNUNET_assert (NULL != socket); - GNUNET_assert (NULL == initiator); /* Local peer=NULL? */ GNUNET_assert (socket != peer1.socket); + GNUNET_assert (&peer2 == peer); + + /* Get our identity */ + GNUNET_assert (GNUNET_OK == GNUNET_TESTING_get_peer_identity (config_peer1, + &self)); + GNUNET_assert (0 == memcmp (&self, + initiator, + sizeof (struct GNUNET_PeerIdentity))); + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Peer connected: %s\n", GNUNET_i2s(initiator)); - peer2.socket = socket; - read_task = GNUNET_SCHEDULER_add_now (&stream_read, (void *) socket); + peer->socket = socket; + peer->bytes_read = 0; + GNUNET_SCHEDULER_add_now (&stream_read_task, &peer2); return GNUNET_OK; } /** * Testing function + * + * @param cls NULL + * @param tc the task context */ static void test (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { + struct GNUNET_PeerIdentity self; + test_task = GNUNET_SCHEDULER_NO_TASK; + /* Get our identity */ + GNUNET_assert (GNUNET_OK == GNUNET_TESTING_get_peer_identity (config_peer1, + &self)); + + peer2_listen_socket = GNUNET_STREAM_listen (config_peer2, + 10, /* App port */ + &stream_listen_cb, + &peer2); + GNUNET_assert (NULL != peer2_listen_socket); /* Connect to stream library */ - peer1.socket = GNUNET_STREAM_open (NULL, /* Null for local peer? */ + peer1.socket = GNUNET_STREAM_open (config_peer1, + &self, /* Null for local peer? */ 10, /* App port */ &stream_open_cb, - (void *) &peer1); + &peer1, + GNUNET_STREAM_OPTION_END); GNUNET_assert (NULL != peer1.socket); - peer2_listen_socket = GNUNET_STREAM_listen (10 /* App port */ - &stream_listen_cb, - NULL); - GNUNET_assert (NULL != peer2_listen_socket); - } /** @@ -330,6 +400,9 @@ run (void *cls, char *const *args, const char *cfgfile, "WARNING", #endif NULL); + /* Duplicate the configuration */ + config_peer1 = GNUNET_CONFIGURATION_dup (cfg); + config_peer2 = GNUNET_CONFIGURATION_dup (cfg); arm_pid = GNUNET_OS_start_process (GNUNET_YES, NULL, NULL, "gnunet-service-arm", "gnunet-service-arm", @@ -340,11 +413,10 @@ run (void *cls, char *const *args, const char *cfgfile, abort_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply - (GNUNET_TIME_UNIT_SECONDS, 20), &do_abort, + (GNUNET_TIME_UNIT_SECONDS, 60), &do_abort, NULL); - - test_task = GNUNET_SCHEDULER_add_now (&test, (void *) cfg); - + + test_task = GNUNET_SCHEDULER_add_now (&test, NULL); } /** @@ -355,7 +427,7 @@ int main (int argc, char **argv) int ret; char *const argv2[] = { "test-stream-local", - "-c", "test_stream.conf", + "-c", "test_stream_local.conf", #if VERBOSE "-L", "DEBUG", #endif @@ -365,7 +437,7 @@ int main (int argc, char **argv) struct GNUNET_GETOPT_CommandLineOption options[] = { GNUNET_GETOPT_OPTION_END }; - + ret = GNUNET_PROGRAM_run ((sizeof (argv2) / sizeof (char *)) - 1, argv2, "test-stream-local", "nohelp", options, &run, NULL); @@ -381,6 +453,6 @@ int main (int argc, char **argv) GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "test failed\n"); return 1; } - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test ok\n"); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "test ok\n"); return 0; } |