diff options
-rw-r--r-- | src/fs/test_fs_download_persistence.c | 1 | ||||
-rw-r--r-- | src/fs/test_fs_publish_persistence.c | 1 | ||||
-rw-r--r-- | src/include/gnunet_common.h | 15 | ||||
-rw-r--r-- | src/include/gnunet_network_lib.h | 6 | ||||
-rw-r--r-- | src/include/gnunet_scheduler_lib.h | 44 | ||||
-rw-r--r-- | src/util/network.c | 6 | ||||
-rw-r--r-- | src/util/program.c | 12 | ||||
-rw-r--r-- | src/util/scheduler.c | 1535 |
8 files changed, 947 insertions, 673 deletions
diff --git a/src/fs/test_fs_download_persistence.c b/src/fs/test_fs_download_persistence.c index 76a1ea9110..8f27e82af5 100644 --- a/src/fs/test_fs_download_persistence.c +++ b/src/fs/test_fs_download_persistence.c @@ -179,7 +179,6 @@ progress_cb (void *cls, const struct GNUNET_FS_ProgressInfo *event) GNUNET_FS_DOWNLOAD_OPTION_NONE, "download", NULL); break; case GNUNET_FS_STATUS_DOWNLOAD_COMPLETED: - consider_restart (event->status); printf ("Download complete, %llu kbps.\n", (unsigned long long) (FILESIZE * 1000000LL / (1 + diff --git a/src/fs/test_fs_publish_persistence.c b/src/fs/test_fs_publish_persistence.c index be9006d423..103ca01b8b 100644 --- a/src/fs/test_fs_publish_persistence.c +++ b/src/fs/test_fs_publish_persistence.c @@ -134,7 +134,6 @@ progress_cb (void *cls, switch (event->status) { case GNUNET_FS_STATUS_PUBLISH_COMPLETED: - consider_restart (event->status); ret = event->value.publish.cctx; printf ("Publish complete, %llu kbps.\n", (unsigned long long) (FILESIZE * 1000000LL / diff --git a/src/include/gnunet_common.h b/src/include/gnunet_common.h index d7f7b76ff7..7d23e6f9b8 100644 --- a/src/include/gnunet_common.h +++ b/src/include/gnunet_common.h @@ -988,7 +988,8 @@ GNUNET_ntoh_double (double d); * arr is important since size is the number of elements and * not the size in bytes * @param size the number of elements in the existing vector (number - * of elements to copy over) + * of elements to copy over), will be updated with the new + * array size * @param tsize the target size for the resulting vector, use 0 to * free the vector (then, arr will be NULL afterwards). */ @@ -996,8 +997,16 @@ GNUNET_ntoh_double (double d); /** * @ingroup memory - * Append an element to a list (growing the - * list by one). + * Append an element to a list (growing the list by one). + * + * @param arr base-pointer of the vector, may be NULL if size is 0; + * will be updated to reflect the new address. The TYPE of + * arr is important since size is the number of elements and + * not the size in bytes + * @param size the number of elements in the existing vector (number + * of elements to copy over), will be updated with the new + * array size + * @param element the element that will be appended to the array */ #define GNUNET_array_append(arr,size,element) do { GNUNET_array_grow(arr,size,size+1); arr[size-1] = element; } while(0) diff --git a/src/include/gnunet_network_lib.h b/src/include/gnunet_network_lib.h index d9d3d90e77..9e692bbbf6 100644 --- a/src/include/gnunet_network_lib.h +++ b/src/include/gnunet_network_lib.h @@ -464,7 +464,7 @@ GNUNET_NETWORK_fdset_copy (struct GNUNET_NETWORK_FDSet *to, * @return POSIX file descriptor */ int -GNUNET_NETWORK_get_fd (struct GNUNET_NETWORK_Handle *desc); +GNUNET_NETWORK_get_fd (const struct GNUNET_NETWORK_Handle *desc); /** @@ -474,7 +474,7 @@ GNUNET_NETWORK_get_fd (struct GNUNET_NETWORK_Handle *desc); * @return POSIX file descriptor */ struct sockaddr* -GNUNET_NETWORK_get_addr (struct GNUNET_NETWORK_Handle *desc); +GNUNET_NETWORK_get_addr (const struct GNUNET_NETWORK_Handle *desc); /** @@ -484,7 +484,7 @@ GNUNET_NETWORK_get_addr (struct GNUNET_NETWORK_Handle *desc); * @return socklen_t for sockaddr */ socklen_t -GNUNET_NETWORK_get_addrlen (struct GNUNET_NETWORK_Handle *desc); +GNUNET_NETWORK_get_addrlen (const struct GNUNET_NETWORK_Handle *desc); /** diff --git a/src/include/gnunet_scheduler_lib.h b/src/include/gnunet_scheduler_lib.h index a855ab8aba..d2805a6850 100644 --- a/src/include/gnunet_scheduler_lib.h +++ b/src/include/gnunet_scheduler_lib.h @@ -152,14 +152,14 @@ struct GNUNET_SCHEDULER_FdInfo * NULL if this is about a file handle or if no network * handle was given to the scheduler originally. */ - struct GNUNET_NETWORK_Handle *fd; + const struct GNUNET_NETWORK_Handle *fd; /** * GNUnet file handle the event is about, matches @a sock, * NULL if this is about a network socket or if no network * handle was given to the scheduler originally. */ - struct GNUNET_DISK_FileHandle *fh; + const struct GNUNET_DISK_FileHandle *fh; /** * Type of the event that was generated related to @e sock. @@ -216,17 +216,18 @@ struct GNUNET_SCHEDULER_TaskContext /** * Function used by event-loop implementations to signal the scheduler - * that a particular @a task is ready due to an event of type @a et. + * that a particular @a task is ready due to an event specified in the + * et field of @a fdi. * * This function will then queue the task to notify the application * that the task is ready (with the respective priority). * * @param task the task that is ready - * @param et information about why the task is ready + * @param fdi information about the related FD */ void GNUNET_SCHEDULER_task_ready (struct GNUNET_SCHEDULER_Task *task, - enum GNUNET_SCHEDULER_EventType et); + struct GNUNET_SCHEDULER_FdInfo *fdi); /** @@ -241,15 +242,16 @@ struct GNUNET_SCHEDULER_Handle; * there are tasks left to run just to give other tasks a chance as * well. If we return #GNUNET_YES, the driver should call this * function again as soon as possible, while if we return #GNUNET_NO - * it must block until the operating system has more work as the - * scheduler has no more work to do right now. + * it must block until either the operating system has more work (the + * scheduler has no more work to do right now) or the timeout set by + * the scheduler (using the set_wakeup callback) is reached. * * @param sh scheduler handle that was given to the `loop` * @return #GNUNET_OK if there are more tasks that are ready, * and thus we would like to run more (yield to avoid * blocking other activities for too long) * #GNUNET_NO if we are done running tasks (yield to block) - * #GNUNET_SYSERR on error + * #GNUNET_SYSERR on error, e.g. no tasks were ready */ int GNUNET_SCHEDULER_run_from_driver (struct GNUNET_SCHEDULER_Handle *sh); @@ -268,8 +270,11 @@ struct GNUNET_SCHEDULER_Driver void *cls; /** - * Add a @a task to be run if the conditions given - * in @a fdi are satisfied. + * Add a @a task to be run if the conditions specified in the + * et field of the given @a fdi are satisfied. The et field will + * be cleared after this call and the driver is expected to set + * the type of the actual event before passing @a fdi to + * #GNUNET_SCHEDULER_task_ready. * * @param cls closure * @param task task to add @@ -280,21 +285,21 @@ struct GNUNET_SCHEDULER_Driver int (*add)(void *cls, struct GNUNET_SCHEDULER_Task *task, - struct GNUNET_SCHEDULER_FdInfo *fdi); + struct GNUNET_SCHEDULER_FdInfo *fdi); /** - * Delete a @a task from the set of tasks to be run. + * Delete a @a task from the set of tasks to be run. A task may + * comprise multiple FdInfo entries previously added with the add + * function. The driver is expected to delete them all. * * @param cls closure * @param task task to delete - * @param fdi conditions to watch for (must match @e add call) * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure - * (i.e. @a task or @a fdi do not match prior @e add call) + * (i.e. @a task does not match prior @e add call) */ int (*del)(void *cls, - struct GNUNET_SCHEDULER_Task *task, - const struct GNUNET_SCHEDULER_FdInfo *fdi); + struct GNUNET_SCHEDULER_Task *task); /** * Set time at which we definitively want to get a wakeup call. @@ -309,7 +314,10 @@ struct GNUNET_SCHEDULER_Driver /** * Event loop's "main" function, to be called from * #GNUNET_SCHEDULER_run_with_driver() to actually - * launch the loop. + * launch the loop. The loop should run as long as + * tasks (added by the add callback) are available + * OR the wakeup time (added by the set_wakeup + * callback) is not FOREVER. * * @param cls closure * @param sh scheduler handle to pass to @@ -359,7 +367,7 @@ GNUNET_SCHEDULER_run_with_driver (const struct GNUNET_SCHEDULER_Driver *driver, * * @return NULL on error */ -const struct GNUNET_SCHEDULER_Driver * +struct GNUNET_SCHEDULER_Driver * GNUNET_SCHEDULER_driver_select (void); diff --git a/src/util/network.c b/src/util/network.c index 9422886136..cf5ef3e004 100644 --- a/src/util/network.c +++ b/src/util/network.c @@ -1223,7 +1223,7 @@ GNUNET_NETWORK_fdset_copy (struct GNUNET_NETWORK_FDSet *to, * @return POSIX file descriptor */ int -GNUNET_NETWORK_get_fd (struct GNUNET_NETWORK_Handle *desc) +GNUNET_NETWORK_get_fd (const struct GNUNET_NETWORK_Handle *desc) { return desc->fd; } @@ -1236,7 +1236,7 @@ GNUNET_NETWORK_get_fd (struct GNUNET_NETWORK_Handle *desc) * @return sockaddr */ struct sockaddr* -GNUNET_NETWORK_get_addr (struct GNUNET_NETWORK_Handle *desc) +GNUNET_NETWORK_get_addr (const struct GNUNET_NETWORK_Handle *desc) { return desc->addr; } @@ -1249,7 +1249,7 @@ GNUNET_NETWORK_get_addr (struct GNUNET_NETWORK_Handle *desc) * @return socklen_t for sockaddr */ socklen_t -GNUNET_NETWORK_get_addrlen (struct GNUNET_NETWORK_Handle *desc) +GNUNET_NETWORK_get_addrlen (const struct GNUNET_NETWORK_Handle *desc) { return desc->addrlen; } diff --git a/src/util/program.c b/src/util/program.c index 92a9750f3e..2337923877 100644 --- a/src/util/program.c +++ b/src/util/program.c @@ -69,6 +69,16 @@ struct CommandContext /** + * task run when the scheduler shuts down + */ +static void +shutdown_task (void *cls) +{ + GNUNET_SPEEDUP_stop_ (); +} + + +/** * Initial task called by the scheduler for each * program. Runs the program-specific main task. */ @@ -78,6 +88,7 @@ program_main (void *cls) struct CommandContext *cc = cls; GNUNET_SPEEDUP_start_(cc->cfg); + GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL); GNUNET_RESOLVER_connect (cc->cfg); cc->task (cc->task_cls, cc->args, cc->cfgfile, cc->cfg); } @@ -306,7 +317,6 @@ GNUNET_PROGRAM_run2 (int argc, char *const *argv, const char *binaryName, } ret = GNUNET_OK; cleanup: - GNUNET_SPEEDUP_stop_ (); GNUNET_CONFIGURATION_destroy (cfg); GNUNET_free_non_null (cc.cfgfile); GNUNET_free (cfg_fn); diff --git a/src/util/scheduler.c b/src/util/scheduler.c index 4615ecee9b..9bd7765172 100644 --- a/src/util/scheduler.c +++ b/src/util/scheduler.c @@ -89,12 +89,6 @@ struct GNUNET_SCHEDULER_Handle * @deprecated */ struct GNUNET_NETWORK_FDSet *ws; - - /** - * Driver we used for the event loop. - */ - const struct GNUNET_SCHEDULER_Driver *driver; - }; @@ -124,36 +118,40 @@ struct GNUNET_SCHEDULER_Task void *callback_cls; /** - * Handle to the scheduler's state. + * Information about which FDs are ready for this task (and why). */ - const struct GNUNET_SCHEDULER_Handle *sh; + struct GNUNET_SCHEDULER_FdInfo *fds; /** - * Set of file descriptors this task is waiting - * for for reading. Once ready, this is updated - * to reflect the set of file descriptors ready - * for operation. + * Storage location used for @e fds if we want to avoid + * a separate malloc() call in the common case that this + * task is only about a single FD. */ - struct GNUNET_NETWORK_FDSet *read_set; + struct GNUNET_SCHEDULER_FdInfo fdx; /** - * Set of file descriptors this task is waiting for for writing. - * Once ready, this is updated to reflect the set of file - * descriptors ready for operation. + * Size of the @e fds array. */ - struct GNUNET_NETWORK_FDSet *write_set; + unsigned int fds_len; /** - * Information about which FDs are ready for this task (and why). + * if this task is related to multiple FDs this array contains + * all FdInfo structs that were marked as ready by calling + * #GNUNET_SCHEDULER_task_ready */ - const struct GNUNET_SCHEDULER_FdInfo *fds; + struct GNUNET_SCHEDULER_FdInfo *ready_fds; /** - * Storage location used for @e fds if we want to avoid - * a separate malloc() call in the common case that this - * task is only about a single FD. + * Size of the @e ready_fds array */ - struct GNUNET_SCHEDULER_FdInfo fdx; + unsigned int ready_fds_len; + + /** + * Do we own the network and file handles referenced by the FdInfo + * structs in the fds array. This will only be GNUNET_YES if the + * task was created by the #GNUNET_SCHEDULER_add_select function. + */ + int own_handles; /** * Absolute timeout value for the task, or @@ -169,11 +167,6 @@ struct GNUNET_SCHEDULER_Task #endif /** - * Size of the @e fds array. - */ - unsigned int fds_len; - - /** * Why is the task ready? Set after task is added to ready queue. * Initially set to zero. All reasons that have already been * satisfied (i.e. read or write ready) will be set over time. @@ -224,11 +217,72 @@ struct GNUNET_SCHEDULER_Task int num_backtrace_strings; #endif +}; + +/** + * A struct representing an event the select driver is waiting for + */ +struct Scheduled +{ + struct Scheduled *prev; + + struct Scheduled *next; + + /** + * the task, the event is related to + */ + struct GNUNET_SCHEDULER_Task *task; + + /** + * information about the network socket / file descriptor where + * the event is expected to occur + */ + struct GNUNET_SCHEDULER_FdInfo *fdi; + + /** + * the event types (multiple event types can be ORed) the select + * driver is expected to wait for + */ + enum GNUNET_SCHEDULER_EventType et; }; /** + * Driver context used by GNUNET_SCHEDULER_run + */ +struct DriverContext +{ + /** + * the head of a DLL containing information about the events the + * select driver is waiting for + */ + struct Scheduled *scheduled_head; + + /** + * the tail of a DLL containing information about the events the + * select driver is waiting for + */ + struct Scheduled *scheduled_tail; + + /** + * the time until the select driver will wake up again (after + * calling select) + */ + struct GNUNET_TIME_Relative timeout; +}; + + +/** + * The driver used for the event loop. Will be handed over to + * the scheduler in #GNUNET_SCHEDULER_run_from_driver(), peristed + * there in this variable for later use in functions like + * #GNUNET_SCHEDULER_add_select(), #add_without_sets() and + * #GNUNET_SCHEDULER_cancel(). + */ +static const struct GNUNET_SCHEDULER_Driver *scheduler_driver; + +/** * Head of list of tasks waiting for an event. */ static struct GNUNET_SCHEDULER_Task *pending_head; @@ -330,6 +384,11 @@ static struct GNUNET_SCHEDULER_TaskContext tc; */ static void *scheduler_select_cls; +/** + * Scheduler handle used for the driver functions + */ +static struct GNUNET_SCHEDULER_Handle sh; + /** * Sets the select function to use in the scheduler (scheduler_select). @@ -364,115 +423,44 @@ check_priority (enum GNUNET_SCHEDULER_Priority p) /** - * Update all sets and timeout for select. - * - * @param rs read-set, set to all FDs we would like to read (updated) - * @param ws write-set, set to all FDs we would like to write (updated) - * @param timeout next timeout (updated) + * chooses the nearest timeout from all pending tasks, to be used + * to tell the driver the next wakeup time (using its set_wakeup + * callback) */ -static void -update_sets (struct GNUNET_NETWORK_FDSet *rs, - struct GNUNET_NETWORK_FDSet *ws, - struct GNUNET_TIME_Relative *timeout) +struct GNUNET_TIME_Absolute +get_timeout () { struct GNUNET_SCHEDULER_Task *pos; struct GNUNET_TIME_Absolute now; - struct GNUNET_TIME_Relative to; + struct GNUNET_TIME_Absolute timeout; - now = GNUNET_TIME_absolute_get (); pos = pending_timeout_head; + now = GNUNET_TIME_absolute_get (); + timeout = GNUNET_TIME_UNIT_FOREVER_ABS; if (NULL != pos) { - to = GNUNET_TIME_absolute_get_difference (now, pos->timeout); - if (timeout->rel_value_us > to.rel_value_us) - *timeout = to; if (0 != pos->reason) - *timeout = GNUNET_TIME_UNIT_ZERO; + { + timeout = now; + } + else + { + timeout = pos->timeout; + } } for (pos = pending_head; NULL != pos; pos = pos->next) { - if (pos->timeout.abs_value_us != GNUNET_TIME_UNIT_FOREVER_ABS.abs_value_us) + if (0 != pos->reason) { - to = GNUNET_TIME_absolute_get_difference (now, pos->timeout); - if (timeout->rel_value_us > to.rel_value_us) - *timeout = to; + timeout = now; + } + else if ((pos->timeout.abs_value_us != GNUNET_TIME_UNIT_FOREVER_ABS.abs_value_us) && + (timeout.abs_value_us > pos->timeout.abs_value_us)) + { + timeout = pos->timeout; } - if (-1 != pos->read_fd) - GNUNET_NETWORK_fdset_set_native (rs, pos->read_fd); - if (-1 != pos->write_fd) - GNUNET_NETWORK_fdset_set_native (ws, pos->write_fd); - if (NULL != pos->read_set) - GNUNET_NETWORK_fdset_add (rs, pos->read_set); - if (NULL != pos->write_set) - GNUNET_NETWORK_fdset_add (ws, pos->write_set); - if (0 != pos->reason) - *timeout = GNUNET_TIME_UNIT_ZERO; - } -} - - -/** - * Check if the ready set overlaps with the set we want to have ready. - * If so, update the want set (set all FDs that are ready). If not, - * return #GNUNET_NO. - * - * @param ready set that is ready - * @param want set that we want to be ready - * @return #GNUNET_YES if there was some overlap - */ -static int -set_overlaps (const struct GNUNET_NETWORK_FDSet *ready, - struct GNUNET_NETWORK_FDSet *want) -{ - if ((NULL == want) || (NULL == ready)) - return GNUNET_NO; - if (GNUNET_NETWORK_fdset_overlap (ready, want)) - { - /* copy all over (yes, there maybe unrelated bits, - * but this should not hurt well-written clients) */ - GNUNET_NETWORK_fdset_copy (want, ready); - return GNUNET_YES; } - return GNUNET_NO; -} - - -/** - * Check if the given task is eligible to run now. - * Also set the reason why it is eligible. - * - * @param task task to check if it is ready - * @param now the current time - * @param rs set of FDs ready for reading - * @param ws set of FDs ready for writing - * @return #GNUNET_YES if we can run it, #GNUNET_NO if not. - */ -static int -is_ready (struct GNUNET_SCHEDULER_Task *task, - struct GNUNET_TIME_Absolute now, - const struct GNUNET_NETWORK_FDSet *rs, - const struct GNUNET_NETWORK_FDSet *ws) -{ - enum GNUNET_SCHEDULER_Reason reason; - - reason = task->reason; - if (now.abs_value_us >= task->timeout.abs_value_us) - reason |= GNUNET_SCHEDULER_REASON_TIMEOUT; - if ((0 == (reason & GNUNET_SCHEDULER_REASON_READ_READY)) && - (((task->read_fd != -1) && - (GNUNET_YES == GNUNET_NETWORK_fdset_test_native (rs, task->read_fd))) || - (set_overlaps (rs, task->read_set)))) - reason |= GNUNET_SCHEDULER_REASON_READ_READY; - if ((0 == (reason & GNUNET_SCHEDULER_REASON_WRITE_READY)) && - (((task->write_fd != -1) && - (GNUNET_YES == GNUNET_NETWORK_fdset_test_native (ws, task->write_fd))) - || (set_overlaps (ws, task->write_set)))) - reason |= GNUNET_SCHEDULER_REASON_WRITE_READY; - if (0 == reason) - return GNUNET_NO; /* not ready */ - reason |= GNUNET_SCHEDULER_REASON_PREREQ_DONE; - task->reason = reason; - return GNUNET_YES; + return timeout; } @@ -495,51 +483,6 @@ queue_ready_task (struct GNUNET_SCHEDULER_Task *task) /** - * Check which tasks are ready and move them - * to the respective ready queue. - * - * @param rs FDs ready for reading - * @param ws FDs ready for writing - */ -static void -check_ready (const struct GNUNET_NETWORK_FDSet *rs, - const struct GNUNET_NETWORK_FDSet *ws) -{ - struct GNUNET_SCHEDULER_Task *pos; - struct GNUNET_SCHEDULER_Task *next; - struct GNUNET_TIME_Absolute now; - - now = GNUNET_TIME_absolute_get (); - while (NULL != (pos = pending_timeout_head)) - { - if (now.abs_value_us >= pos->timeout.abs_value_us) - pos->reason |= GNUNET_SCHEDULER_REASON_TIMEOUT; - if (0 == pos->reason) - break; - GNUNET_CONTAINER_DLL_remove (pending_timeout_head, - pending_timeout_tail, - pos); - if (pending_timeout_last == pos) - pending_timeout_last = NULL; - queue_ready_task (pos); - } - pos = pending_head; - while (NULL != pos) - { - next = pos->next; - if (GNUNET_YES == is_ready (pos, now, rs, ws)) - { - GNUNET_CONTAINER_DLL_remove (pending_head, - pending_tail, - pos); - queue_ready_task (pos); - } - pos = next; - } -} - - -/** * Request the shutdown of a scheduler. Marks all tasks * awaiting shutdown as ready. Note that tasks * scheduled with #GNUNET_SCHEDULER_add_shutdown() AFTER this call @@ -562,25 +505,6 @@ GNUNET_SCHEDULER_shutdown () /** - * Destroy a task (release associated resources) - * - * @param t task to destroy - */ -static void -destroy_task (struct GNUNET_SCHEDULER_Task *t) -{ - if (NULL != t->read_set) - GNUNET_NETWORK_fdset_destroy (t->read_set); - if (NULL != t->write_set) - GNUNET_NETWORK_fdset_destroy (t->write_set); -#if EXECINFO - GNUNET_free (t->backtrace_strings); -#endif - GNUNET_free (t); -} - - -/** * Output stack trace of task @a t. * * @param t task to dump stack trace of @@ -589,89 +513,62 @@ static void dump_backtrace (struct GNUNET_SCHEDULER_Task *t) { #if EXECINFO - for (unsigned int i = 0; i < t->num_backtrace_strings; i++) - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Task %p trace %u: %s\n", - t, - i, - t->backtrace_strings[i]); + unsigned int i; + + for (i = 0; i < t->num_backtrace_strings; i++) + LOG (GNUNET_ERROR_TYPE_WARNING, + "Task %p trace %u: %s\n", + t, + i, + t->backtrace_strings[i]); #endif } /** - * Run at least one task in the highest-priority queue that is not - * empty. Keep running tasks until we are either no longer running - * "URGENT" tasks or until we have at least one "pending" task (which - * may become ready, hence we should select on it). Naturally, if - * there are no more ready tasks, we also return. + * Destroy a task (release associated resources) * - * @param rs FDs ready for reading - * @param ws FDs ready for writing + * @param t task to destroy */ static void -run_ready (struct GNUNET_NETWORK_FDSet *rs, - struct GNUNET_NETWORK_FDSet *ws) +destroy_task (struct GNUNET_SCHEDULER_Task *t) { - enum GNUNET_SCHEDULER_Priority p; - struct GNUNET_SCHEDULER_Task *pos; + unsigned int i; - max_priority_added = GNUNET_SCHEDULER_PRIORITY_KEEP; - do + LOG (GNUNET_ERROR_TYPE_DEBUG, + "destroying task %p\n", + t); + + if (GNUNET_YES == t->own_handles) { - if (0 == ready_count) - return; - GNUNET_assert (NULL == ready_head[GNUNET_SCHEDULER_PRIORITY_KEEP]); - /* yes, p>0 is correct, 0 is "KEEP" which should - * always be an empty queue (see assertion)! */ - for (p = GNUNET_SCHEDULER_PRIORITY_COUNT - 1; p > 0; p--) - { - pos = ready_head[p]; - if (NULL != pos) - break; - } - GNUNET_assert (NULL != pos); /* ready_count wrong? */ - GNUNET_CONTAINER_DLL_remove (ready_head[p], - ready_tail[p], - pos); - ready_count--; - current_priority = pos->priority; - current_lifeness = pos->lifeness; - active_task = pos; -#if PROFILE_DELAYS - if (GNUNET_TIME_absolute_get_duration (pos->start_time).rel_value_us > - DELAY_THRESHOLD.rel_value_us) + for (i = 0; i != t->fds_len; ++i) { - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Task %p took %s to be scheduled\n", - pos, - GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_duration (pos->start_time), - GNUNET_YES)); + const struct GNUNET_NETWORK_Handle *fd = t->fds[i].fd; + const struct GNUNET_DISK_FileHandle *fh = t->fds[i].fh; + if (fd) + { + GNUNET_NETWORK_socket_free_memory_only_ ((struct GNUNET_NETWORK_Handle *) fd); + } + if (fh) + { + // FIXME: on WIN32 this is not enough! A function + // GNUNET_DISK_file_free_memory_only would be nice + GNUNET_free ((void *) fh); + } } -#endif - tc.reason = pos->reason; - tc.read_ready = (NULL == pos->read_set) ? rs : pos->read_set; - if ((-1 != pos->read_fd) && - (0 != (pos->reason & GNUNET_SCHEDULER_REASON_READ_READY))) - GNUNET_NETWORK_fdset_set_native (rs, pos->read_fd); - tc.write_ready = (NULL == pos->write_set) ? ws : pos->write_set; - if ((-1 != pos->write_fd) && - (0 != (pos->reason & GNUNET_SCHEDULER_REASON_WRITE_READY))) - GNUNET_NETWORK_fdset_set_native (ws, pos->write_fd); - if ((0 != (tc.reason & GNUNET_SCHEDULER_REASON_WRITE_READY)) && - (-1 != pos->write_fd) && - (!GNUNET_NETWORK_fdset_test_native (ws, pos->write_fd))) - GNUNET_assert (0); // added to ready in previous select loop! - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Running task: %p\n", - pos); - pos->callback (pos->callback_cls); - dump_backtrace (pos); - active_task = NULL; - destroy_task (pos); - tasks_run++; } - while ((NULL == pending_head) || (p >= max_priority_added)); + if (t->fds_len > 1) + { + GNUNET_array_grow (t->fds, t->fds_len, 0); + } + if (t->ready_fds_len > 0) + { + GNUNET_array_grow (t->ready_fds, t->ready_fds_len, 0); + } +#if EXECINFO + GNUNET_free (t->backtrace_strings); +#endif + GNUNET_free (t); } @@ -698,22 +595,22 @@ sighandler_pipe () #endif -/** - * Wait for a short time. - * Sleeps for @a ms ms (as that should be long enough for virtually all - * modern systems to context switch and allow another process to do - * some 'real' work). - * - * @param ms how many ms to wait - */ -static void -short_wait (unsigned int ms) -{ - struct GNUNET_TIME_Relative timeout; - - timeout = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, ms); - (void) GNUNET_NETWORK_socket_select (NULL, NULL, NULL, timeout); -} +///** +// * Wait for a short time. +// * Sleeps for @a ms ms (as that should be long enough for virtually all +// * modern systems to context switch and allow another process to do +// * some 'real' work). +// * +// * @param ms how many ms to wait +// */ +//static void +//short_wait (unsigned int ms) +//{ +// struct GNUNET_TIME_Relative timeout; +// +// timeout = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, ms); +// (void) GNUNET_NETWORK_socket_select (NULL, NULL, NULL, timeout); +//} /** @@ -735,35 +632,31 @@ sighandler_shutdown () } -/** - * Check if the system is still alive. Trigger shutdown if we - * have tasks, but none of them give us lifeness. - * - * @return #GNUNET_OK to continue the main loop, - * #GNUNET_NO to exit - */ -static int -check_lifeness () +void +shutdown_if_no_lifeness () { struct GNUNET_SCHEDULER_Task *t; if (ready_count > 0) - return GNUNET_OK; + return; for (t = pending_head; NULL != t; t = t->next) - if (t->lifeness == GNUNET_YES) - return GNUNET_OK; + if (GNUNET_YES == t->lifeness) + return; for (t = shutdown_head; NULL != t; t = t->next) - if (t->lifeness == GNUNET_YES) - return GNUNET_OK; + if (GNUNET_YES == t->lifeness) + return; for (t = pending_timeout_head; NULL != t; t = t->next) - if (t->lifeness == GNUNET_YES) - return GNUNET_OK; - if (NULL != shutdown_head) + if (GNUNET_YES == t->lifeness) + return; + /* No lifeness! Cancel all pending tasks the driver knows about and shutdown */ + t = pending_head; + while (NULL != t) { - GNUNET_SCHEDULER_shutdown (); - return GNUNET_OK; + struct GNUNET_SCHEDULER_Task *next = t->next; + GNUNET_SCHEDULER_cancel (t); + t = next; } - return GNUNET_NO; + GNUNET_SCHEDULER_shutdown (); } @@ -785,204 +678,17 @@ void GNUNET_SCHEDULER_run (GNUNET_SCHEDULER_TaskCallback task, void *task_cls) { - GNUNET_SCHEDULER_run_with_optional_signals(GNUNET_YES, task, task_cls); -} - -void -GNUNET_SCHEDULER_run_with_optional_signals (int install_signals, - GNUNET_SCHEDULER_TaskCallback task, - void *task_cls) -{ - struct GNUNET_NETWORK_FDSet *rs; - struct GNUNET_NETWORK_FDSet *ws; - struct GNUNET_TIME_Relative timeout; - int ret; - struct GNUNET_SIGNAL_Context *shc_int; - struct GNUNET_SIGNAL_Context *shc_term; -#if (SIGTERM != GNUNET_TERM_SIG) - struct GNUNET_SIGNAL_Context *shc_gterm; -#endif - -#ifndef MINGW - struct GNUNET_SIGNAL_Context *shc_quit; - struct GNUNET_SIGNAL_Context *shc_hup; - struct GNUNET_SIGNAL_Context *shc_pipe; -#endif - unsigned long long last_tr; - unsigned int busy_wait_warning; - const struct GNUNET_DISK_FileHandle *pr; - char c; - - GNUNET_assert (NULL == active_task); - rs = GNUNET_NETWORK_fdset_create (); - ws = GNUNET_NETWORK_fdset_create (); - GNUNET_assert (NULL == shutdown_pipe_handle); - shutdown_pipe_handle = GNUNET_DISK_pipe (GNUNET_NO, - GNUNET_NO, - GNUNET_NO, - GNUNET_NO); - GNUNET_assert (NULL != shutdown_pipe_handle); - pr = GNUNET_DISK_pipe_handle (shutdown_pipe_handle, - GNUNET_DISK_PIPE_END_READ); - GNUNET_assert (NULL != pr); - my_pid = getpid (); - - if (GNUNET_YES == install_signals) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Registering signal handlers\n"); - shc_int = GNUNET_SIGNAL_handler_install (SIGINT, - &sighandler_shutdown); - shc_term = GNUNET_SIGNAL_handler_install (SIGTERM, - &sighandler_shutdown); -#if (SIGTERM != GNUNET_TERM_SIG) - shc_gterm = GNUNET_SIGNAL_handler_install (GNUNET_TERM_SIG, - &sighandler_shutdown); -#endif -#ifndef MINGW - shc_pipe = GNUNET_SIGNAL_handler_install (SIGPIPE, - &sighandler_pipe); - shc_quit = GNUNET_SIGNAL_handler_install (SIGQUIT, - &sighandler_shutdown); - shc_hup = GNUNET_SIGNAL_handler_install (SIGHUP, - &sighandler_shutdown); -#endif - } - - current_priority = GNUNET_SCHEDULER_PRIORITY_DEFAULT; - current_lifeness = GNUNET_YES; - GNUNET_SCHEDULER_add_with_reason_and_priority (task, - task_cls, - GNUNET_SCHEDULER_REASON_STARTUP, - GNUNET_SCHEDULER_PRIORITY_DEFAULT); - active_task = (void *) (long) -1; /* force passing of sanity check */ - GNUNET_SCHEDULER_add_now_with_lifeness (GNUNET_NO, - &GNUNET_OS_install_parent_control_handler, - NULL); - active_task = NULL; - last_tr = 0; - busy_wait_warning = 0; - while (GNUNET_OK == check_lifeness ()) - { - GNUNET_NETWORK_fdset_zero (rs); - GNUNET_NETWORK_fdset_zero (ws); - timeout = GNUNET_TIME_UNIT_FOREVER_REL; - update_sets (rs, ws, &timeout); - GNUNET_NETWORK_fdset_handle_set (rs, pr); - if (ready_count > 0) - { - /* no blocking, more work already ready! */ - timeout = GNUNET_TIME_UNIT_ZERO; - } - if (NULL == scheduler_select) - ret = GNUNET_NETWORK_socket_select (rs, - ws, - NULL, - timeout); - else - ret = scheduler_select (scheduler_select_cls, - rs, - ws, - NULL, - timeout); - if (ret == GNUNET_SYSERR) - { - if (errno == EINTR) - continue; - - LOG_STRERROR (GNUNET_ERROR_TYPE_ERROR, "select"); -#ifndef MINGW -#if USE_LSOF - char lsof[512]; - - snprintf (lsof, sizeof (lsof), "lsof -p %d", getpid ()); - (void) close (1); - (void) dup2 (2, 1); - if (0 != system (lsof)) - LOG_STRERROR (GNUNET_ERROR_TYPE_WARNING, - "system"); -#endif -#endif -#if DEBUG_FDS - struct GNUNET_SCHEDULER_Task *t; - - for (t = pending_head; NULL != t; t = t->next) - { - if (-1 != t->read_fd) - { - int flags = fcntl (t->read_fd, F_GETFD); - if ((flags == -1) && (errno == EBADF)) - { - LOG (GNUNET_ERROR_TYPE_ERROR, - "Got invalid file descriptor %d!\n", - t->read_fd); - dump_backtrace (t); - } - } - if (-1 != t->write_fd) - { - int flags = fcntl (t->write_fd, F_GETFD); - if ((flags == -1) && (errno == EBADF)) - { - LOG (GNUNET_ERROR_TYPE_ERROR, - "Got invalid file descriptor %d!\n", - t->write_fd); - dump_backtrace (t); - } - } - } -#endif - GNUNET_assert (0); - break; - } - - if ( (0 == ret) && - (0 == timeout.rel_value_us) && - (busy_wait_warning > 16) ) - { - LOG (GNUNET_ERROR_TYPE_WARNING, - "Looks like we're busy waiting...\n"); - short_wait (100); /* mitigate */ - } - check_ready (rs, ws); - run_ready (rs, ws); - if (GNUNET_NETWORK_fdset_handle_isset (rs, pr)) - { - /* consume the signal */ - GNUNET_DISK_file_read (pr, &c, sizeof (c)); - /* mark all active tasks as ready due to shutdown */ - GNUNET_SCHEDULER_shutdown (); - } - if (last_tr == tasks_run) - { - short_wait (1); - busy_wait_warning++; - } - else - { - last_tr = tasks_run; - busy_wait_warning = 0; - } - } - - if (GNUNET_YES == install_signals) - { - GNUNET_SIGNAL_handler_uninstall (shc_int); - GNUNET_SIGNAL_handler_uninstall (shc_term); -#if (SIGTERM != GNUNET_TERM_SIG) - GNUNET_SIGNAL_handler_uninstall (shc_gterm); -#endif -#ifndef MINGW - GNUNET_SIGNAL_handler_uninstall (shc_pipe); - GNUNET_SIGNAL_handler_uninstall (shc_quit); - GNUNET_SIGNAL_handler_uninstall (shc_hup); -#endif - } - - GNUNET_DISK_pipe_close (shutdown_pipe_handle); - shutdown_pipe_handle = NULL; - GNUNET_NETWORK_fdset_destroy (rs); - GNUNET_NETWORK_fdset_destroy (ws); + struct GNUNET_SCHEDULER_Driver *driver; + struct DriverContext context = {.scheduled_head = NULL, + .scheduled_tail = NULL, + .timeout = GNUNET_TIME_UNIT_FOREVER_REL}; + + driver = GNUNET_SCHEDULER_driver_select (); + driver->cls = &context; + + GNUNET_SCHEDULER_run_with_driver (driver, task, task_cls); + + GNUNET_free (driver); } @@ -1027,9 +733,164 @@ GNUNET_SCHEDULER_get_load (enum GNUNET_SCHEDULER_Priority p) } +void +init_fd_info (struct GNUNET_SCHEDULER_Task *t, + const struct GNUNET_NETWORK_Handle *const *read_nh, + unsigned int read_nh_len, + const struct GNUNET_NETWORK_Handle *const *write_nh, + unsigned int write_nh_len, + const struct GNUNET_DISK_FileHandle *const *read_fh, + unsigned int read_fh_len, + const struct GNUNET_DISK_FileHandle *const *write_fh, + unsigned int write_fh_len) +{ + // FIXME: if we have exactly two network handles / exactly two file handles + // and they are equal, we can make one FdInfo with both + // GNUNET_SCHEDULER_ET_IN and GNUNET_SCHEDULER_ET_OUT set. + struct GNUNET_SCHEDULER_FdInfo *fdi; + + t->fds_len = read_nh_len + write_nh_len + read_fh_len + write_fh_len; + if (1 == t->fds_len) + { + fdi = &t->fdx; + t->fds = fdi; + if (1 == read_nh_len) + { + fdi->fd = *read_nh; + GNUNET_assert (NULL != fdi->fd); + fdi->et = GNUNET_SCHEDULER_ET_IN; + fdi->sock = GNUNET_NETWORK_get_fd (*read_nh); + t->read_fd = fdi->sock; + t->write_fd = -1; + } + else if (1 == write_nh_len) + { + fdi->fd = *write_nh; + GNUNET_assert (NULL != fdi->fd); + fdi->et = GNUNET_SCHEDULER_ET_OUT; + fdi->sock = GNUNET_NETWORK_get_fd (*write_nh); + t->read_fd = -1; + t->write_fd = fdi->sock; + } + else if (1 == read_fh_len) + { + fdi->fh = *read_fh; + GNUNET_assert (NULL != fdi->fh); + fdi->et = GNUNET_SCHEDULER_ET_IN; + fdi->sock = (*read_fh)->fd; // FIXME: does not work under WIN32 + t->read_fd = fdi->sock; + t->write_fd = -1; + } + else + { + fdi->fh = *write_fh; + GNUNET_assert (NULL != fdi->fh); + fdi->et = GNUNET_SCHEDULER_ET_OUT; + fdi->sock = (*write_fh)->fd; // FIXME: does not work under WIN32 + t->read_fd = -1; + t->write_fd = fdi->sock; + } + } + else + { + fdi = GNUNET_new_array (t->fds_len, struct GNUNET_SCHEDULER_FdInfo); + t->fds = fdi; + t->read_fd = -1; + t->write_fd = -1; + unsigned int i; + for (i = 0; i != read_nh_len; ++i) + { + fdi->fd = read_nh[i]; + GNUNET_assert (NULL != fdi->fd); + fdi->et = GNUNET_SCHEDULER_ET_IN; + fdi->sock = GNUNET_NETWORK_get_fd (read_nh[i]); + ++fdi; + } + for (i = 0; i != write_nh_len; ++i) + { + fdi->fd = write_nh[i]; + GNUNET_assert (NULL != fdi->fd); + fdi->et = GNUNET_SCHEDULER_ET_OUT; + fdi->sock = GNUNET_NETWORK_get_fd (write_nh[i]); + ++fdi; + } + for (i = 0; i != read_fh_len; ++i) + { + fdi->fh = read_fh[i]; + GNUNET_assert (NULL != fdi->fh); + fdi->et = GNUNET_SCHEDULER_ET_IN; + fdi->sock = (read_fh[i])->fd; // FIXME: does not work under WIN32 + ++fdi; + } + for (i = 0; i != write_fh_len; ++i) + { + fdi->fh = write_fh[i]; + GNUNET_assert (NULL != fdi->fh); + fdi->et = GNUNET_SCHEDULER_ET_OUT; + fdi->sock = (write_fh[i])->fd; // FIXME: does not work under WIN32 + ++fdi; + } + } +} + + +/** + * calls the given function @a func on each FdInfo related to @a t. + * Optionally updates the event type field in each FdInfo after calling + * @a func. + * + * @param t the task + * @param driver_func the function to call with each FdInfo contained in + * in @a t + * @param if_not_ready only call @a driver_func on FdInfos that are not + * ready + * @param et the event type to be set in each FdInfo after calling + * @a driver_func on it, or -1 if no updating not desired. + */ +void driver_add_multiple (struct GNUNET_SCHEDULER_Task *t, + enum GNUNET_SCHEDULER_EventType et) +{ + struct GNUNET_SCHEDULER_FdInfo *fdi; + int success = GNUNET_YES; + + for (int i = 0; i != t->fds_len; ++i) + { + fdi = &t->fds[i]; + success = scheduler_driver->add (scheduler_driver->cls, t, fdi) && success; + if (et != -1) + { + fdi->et = et; + } + } + if (GNUNET_YES != success) + { + LOG (GNUNET_ERROR_TYPE_ERROR, + "driver could not add task\n"); + } +} + + +void +shutdown_cb (void *cls) +{ + char c; + const struct GNUNET_DISK_FileHandle *pr; + + pr = GNUNET_DISK_pipe_handle (shutdown_pipe_handle, + GNUNET_DISK_PIPE_END_READ); + GNUNET_assert (! GNUNET_DISK_handle_invalid (pr)); + /* consume the signal */ + GNUNET_DISK_file_read (pr, &c, sizeof (c)); + /* mark all active tasks as ready due to shutdown */ + GNUNET_SCHEDULER_shutdown (); +} + + /** * Cancel the task with the specified identifier. - * The task must not yet have run. + * The task must not yet have run. Only allowed to be called as long as the + * scheduler is running (#GNUNET_SCHEDULER_run or + * #GNUNET_SCHEDULER_run_with_driver has been called and has not returned yet). * * @param task id of the task to cancel * @return original closure of the task @@ -1038,34 +899,50 @@ void * GNUNET_SCHEDULER_cancel (struct GNUNET_SCHEDULER_Task *task) { enum GNUNET_SCHEDULER_Priority p; + int is_fd_task; void *ret; + LOG (GNUNET_ERROR_TYPE_DEBUG, + "canceling task %p\n", + task); + + /* scheduler must be running */ + GNUNET_assert (NULL != scheduler_driver); GNUNET_assert ( (NULL != active_task) || - (GNUNET_NO == task->lifeness) ); - if (! task->in_ready_list) + (GNUNET_NO == task->lifeness) ); + is_fd_task = (NULL != task->fds); + if (is_fd_task) { - if ( (-1 == task->read_fd) && - (-1 == task->write_fd) && - (NULL == task->read_set) && - (NULL == task->write_set) ) + int del_result = scheduler_driver->del (scheduler_driver->cls, task); + if (GNUNET_OK != del_result) { - if (GNUNET_YES == task->on_shutdown) - GNUNET_CONTAINER_DLL_remove (shutdown_head, - shutdown_tail, - task); - else - GNUNET_CONTAINER_DLL_remove (pending_timeout_head, - pending_timeout_tail, - task); - if (task == pending_timeout_last) - pending_timeout_last = NULL; + LOG (GNUNET_ERROR_TYPE_ERROR, + "driver could not delete task\n"); + GNUNET_assert (0); } - else + } + if (! task->in_ready_list) + { + if (is_fd_task) { GNUNET_CONTAINER_DLL_remove (pending_head, pending_tail, task); } + else if (GNUNET_YES == task->on_shutdown) + { + GNUNET_CONTAINER_DLL_remove (shutdown_head, + shutdown_tail, + task); + } + else + { + GNUNET_CONTAINER_DLL_remove (pending_timeout_head, + pending_timeout_tail, + task); + if (pending_timeout_last == task) + pending_timeout_last = NULL; + } } else { @@ -1076,9 +953,6 @@ GNUNET_SCHEDULER_cancel (struct GNUNET_SCHEDULER_Task *task) ready_count--; } ret = task->callback_cls; - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Canceling task %p\n", - task); destroy_task (task); return ret; } @@ -1099,7 +973,7 @@ init_backtrace (struct GNUNET_SCHEDULER_Task *t) = backtrace (backtrace_array, MAX_TRACE_DEPTH); t->backtrace_strings = backtrace_symbols (backtrace_array, - t->num_backtrace_strings); + t->num_backtrace_strings); dump_backtrace (t); #endif } @@ -1216,7 +1090,7 @@ GNUNET_SCHEDULER_add_at_with_priority (struct GNUNET_TIME_Absolute at, pending_timeout_last = t; LOG (GNUNET_ERROR_TYPE_DEBUG, - "Adding task: %p\n", + "Adding task %p\n", t); init_backtrace (t); return t; @@ -1236,8 +1110,8 @@ GNUNET_SCHEDULER_add_at_with_priority (struct GNUNET_TIME_Absolute at, */ struct GNUNET_SCHEDULER_Task * GNUNET_SCHEDULER_add_delayed_with_priority (struct GNUNET_TIME_Relative delay, - enum GNUNET_SCHEDULER_Priority priority, - GNUNET_SCHEDULER_TaskCallback task, + enum GNUNET_SCHEDULER_Priority priority, + GNUNET_SCHEDULER_TaskCallback task, void *task_cls) { return GNUNET_SCHEDULER_add_at_with_priority (GNUNET_TIME_relative_to_absolute (delay), @@ -1305,12 +1179,12 @@ GNUNET_SCHEDULER_add_at (struct GNUNET_TIME_Absolute at, struct GNUNET_SCHEDULER_Task * GNUNET_SCHEDULER_add_delayed (struct GNUNET_TIME_Relative delay, GNUNET_SCHEDULER_TaskCallback task, - void *task_cls) + void *task_cls) { return GNUNET_SCHEDULER_add_delayed_with_priority (delay, - GNUNET_SCHEDULER_PRIORITY_DEFAULT, - task, - task_cls); + GNUNET_SCHEDULER_PRIORITY_DEFAULT, + task, + task_cls); } @@ -1331,11 +1205,11 @@ GNUNET_SCHEDULER_add_delayed (struct GNUNET_TIME_Relative delay, */ struct GNUNET_SCHEDULER_Task * GNUNET_SCHEDULER_add_now (GNUNET_SCHEDULER_TaskCallback task, - void *task_cls) + void *task_cls) { return GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_ZERO, - task, - task_cls); + task, + task_cls); } @@ -1351,7 +1225,7 @@ GNUNET_SCHEDULER_add_now (GNUNET_SCHEDULER_TaskCallback task, */ struct GNUNET_SCHEDULER_Task * GNUNET_SCHEDULER_add_shutdown (GNUNET_SCHEDULER_TaskCallback task, - void *task_cls) + void *task_cls) { struct GNUNET_SCHEDULER_Task *t; @@ -1368,12 +1242,12 @@ GNUNET_SCHEDULER_add_shutdown (GNUNET_SCHEDULER_TaskCallback task, t->timeout = GNUNET_TIME_UNIT_FOREVER_ABS; t->priority = GNUNET_SCHEDULER_PRIORITY_SHUTDOWN; t->on_shutdown = GNUNET_YES; - t->lifeness = GNUNET_YES; + t->lifeness = GNUNET_NO; GNUNET_CONTAINER_DLL_insert (shutdown_head, - shutdown_tail, - t); + shutdown_tail, + t); LOG (GNUNET_ERROR_TYPE_DEBUG, - "Adding task: %p\n", + "Adding shutdown task %p\n", t); init_backtrace (t); return t; @@ -1407,6 +1281,33 @@ GNUNET_SCHEDULER_add_now_with_lifeness (int lifeness, } +#if DEBUG_FDS +/** + * check a raw file descriptor and abort if it is bad (for debugging purposes) + * + * @param t the task related to the file descriptor + * @param raw_fd the raw file descriptor to check + */ +void +check_fd (struct GNUNET_SCHEDULER_Task *t, int raw_fd) +{ + if (-1 != raw_fd) + { + int flags = fcntl (raw_fd, F_GETFD); + + if ((flags == -1) && (errno == EBADF)) + { + LOG (GNUNET_ERROR_TYPE_ERROR, + "Got invalid file descriptor %d!\n", + raw_fd); + init_backtrace (t); + GNUNET_assert (0); + } + } +} +#endif + + /** * Schedule a new task to be run with a specified delay or when any of * the specified file descriptor sets is ready. The delay can be used @@ -1435,9 +1336,11 @@ GNUNET_SCHEDULER_add_now_with_lifeness (int lifeness, #ifndef MINGW static struct GNUNET_SCHEDULER_Task * add_without_sets (struct GNUNET_TIME_Relative delay, - enum GNUNET_SCHEDULER_Priority priority, - int rfd, - int wfd, + enum GNUNET_SCHEDULER_Priority priority, + const struct GNUNET_NETWORK_Handle *read_nh, + const struct GNUNET_NETWORK_Handle *write_nh, + const struct GNUNET_DISK_FileHandle *read_fh, + const struct GNUNET_DISK_FileHandle *write_fh, GNUNET_SCHEDULER_TaskCallback task, void *task_cls) { @@ -1446,39 +1349,23 @@ add_without_sets (struct GNUNET_TIME_Relative delay, GNUNET_assert (NULL != active_task); GNUNET_assert (NULL != task); t = GNUNET_new (struct GNUNET_SCHEDULER_Task); + init_fd_info (t, + &read_nh, + read_nh ? 1 : 0, + &write_nh, + write_nh ? 1 : 0, + &read_fh, + read_fh ? 1 : 0, + &write_fh, + write_fh ? 1 : 0); t->callback = task; t->callback_cls = task_cls; #if DEBUG_FDS - if (-1 != rfd) - { - int flags = fcntl (rfd, F_GETFD); - - if ((flags == -1) && (errno == EBADF)) - { - LOG (GNUNET_ERROR_TYPE_ERROR, - "Got invalid file descriptor %d!\n", - rfd); - init_backtrace (t); - GNUNET_assert (0); - } - } - if (-1 != wfd) - { - int flags = fcntl (wfd, F_GETFD); - - if (flags == -1 && errno == EBADF) - { - LOG (GNUNET_ERROR_TYPE_ERROR, - "Got invalid file descriptor %d!\n", - wfd); - init_backtrace (t); - GNUNET_assert (0); - } - } + check_fd (t, NULL != read_nh ? GNUNET_NETWORK_get_fd (read_nh) : -1); + check_fd (t, NULL != write_nh ? GNUNET_NETWORK_get_fd (write_nh) : -1); + check_fd (t, NULL != read_fh ? read_fh->fd : -1); + check_fd (t, NULL != write_fh ? write_fh->fd : -1); #endif - t->read_fd = rfd; - GNUNET_assert (wfd >= -1); - t->write_fd = wfd; #if PROFILE_DELAYS t->start_time = GNUNET_TIME_absolute_get (); #endif @@ -1488,11 +1375,9 @@ add_without_sets (struct GNUNET_TIME_Relative delay, GNUNET_CONTAINER_DLL_insert (pending_head, pending_tail, t); + driver_add_multiple (t, GNUNET_SCHEDULER_ET_NONE); max_priority_added = GNUNET_MAX (max_priority_added, t->priority); - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Adding task %p\n", - t); init_backtrace (t); return t; } @@ -1505,6 +1390,9 @@ add_without_sets (struct GNUNET_TIME_Relative delay, * used as a timeout on the socket being ready. The task will be * scheduled for execution once either the delay has expired or the * socket operation is ready. It will be run with the DEFAULT priority. + * Only allowed to be called as long as the scheduler is running + * (#GNUNET_SCHEDULER_run or #GNUNET_SCHEDULER_run_with_driver has been + * called and has not returned yet). * * @param delay when should this operation time out? * @param rfd read file-descriptor @@ -1520,8 +1408,8 @@ GNUNET_SCHEDULER_add_read_net (struct GNUNET_TIME_Relative delay, void *task_cls) { return GNUNET_SCHEDULER_add_read_net_with_priority (delay, - GNUNET_SCHEDULER_PRIORITY_DEFAULT, - rfd, task, task_cls); + GNUNET_SCHEDULER_PRIORITY_DEFAULT, + rfd, task, task_cls); } @@ -1532,6 +1420,9 @@ GNUNET_SCHEDULER_add_read_net (struct GNUNET_TIME_Relative delay, * socket being ready. The task will be scheduled for execution once * either the delay has expired or the socket operation is ready. It * will be run with the DEFAULT priority. + * Only allowed to be called as long as the scheduler is running + * (#GNUNET_SCHEDULER_run or #GNUNET_SCHEDULER_run_with_driver has been + * called and has not returned yet). * * @param delay when should this operation time out? * @param priority priority to use for the task @@ -1543,9 +1434,9 @@ GNUNET_SCHEDULER_add_read_net (struct GNUNET_TIME_Relative delay, */ struct GNUNET_SCHEDULER_Task * GNUNET_SCHEDULER_add_read_net_with_priority (struct GNUNET_TIME_Relative delay, - enum GNUNET_SCHEDULER_Priority priority, - struct GNUNET_NETWORK_Handle *rfd, - GNUNET_SCHEDULER_TaskCallback task, + enum GNUNET_SCHEDULER_Priority priority, + struct GNUNET_NETWORK_Handle *rfd, + GNUNET_SCHEDULER_TaskCallback task, void *task_cls) { return GNUNET_SCHEDULER_add_net_with_priority (delay, priority, @@ -1563,6 +1454,9 @@ GNUNET_SCHEDULER_add_read_net_with_priority (struct GNUNET_TIME_Relative delay, * scheduled for execution once either the delay has expired or the * socket operation is ready. It will be run with the priority of * the calling task. + * Only allowed to be called as long as the scheduler is running + * (#GNUNET_SCHEDULER_run or #GNUNET_SCHEDULER_run_with_driver has been + * called and has not returned yet). * * @param delay when should this operation time out? * @param wfd write file-descriptor @@ -1590,6 +1484,9 @@ GNUNET_SCHEDULER_add_write_net (struct GNUNET_TIME_Relative delay, * used as a timeout on the socket being ready. The task will be * scheduled for execution once either the delay has expired or the * socket operation is ready. + * Only allowed to be called as long as the scheduler is running + * (#GNUNET_SCHEDULER_run or #GNUNET_SCHEDULER_run_with_driver has been + * called and has not returned yet). * * @param delay when should this operation time out? * @param priority priority of the task @@ -1610,6 +1507,9 @@ GNUNET_SCHEDULER_add_net_with_priority (struct GNUNET_TIME_Relative delay, GNUNET_SCHEDULER_TaskCallback task, void *task_cls) { + /* scheduler must be running */ + GNUNET_assert (NULL != scheduler_driver); + #if MINGW struct GNUNET_NETWORK_FDSet *s; struct GNUNET_SCHEDULER_Task * ret; @@ -1625,10 +1525,13 @@ GNUNET_SCHEDULER_add_net_with_priority (struct GNUNET_TIME_Relative delay, GNUNET_NETWORK_fdset_destroy (s); return ret; #else + GNUNET_assert (on_read || on_write); GNUNET_assert (GNUNET_NETWORK_get_fd (fd) >= 0); return add_without_sets (delay, priority, - on_read ? GNUNET_NETWORK_get_fd (fd) : -1, - on_write ? GNUNET_NETWORK_get_fd (fd) : -1, + on_read ? fd : NULL, + on_write ? fd : NULL, + NULL, + NULL, task, task_cls); #endif } @@ -1640,6 +1543,9 @@ GNUNET_SCHEDULER_add_net_with_priority (struct GNUNET_TIME_Relative delay, * used as a timeout on the socket being ready. The task will be * scheduled for execution once either the delay has expired or the * socket operation is ready. It will be run with the DEFAULT priority. + * Only allowed to be called as long as the scheduler is running + * (#GNUNET_SCHEDULER_run or #GNUNET_SCHEDULER_run_with_driver has been + * called and has not returned yet). * * @param delay when should this operation time out? * @param rfd read file-descriptor @@ -1666,6 +1572,9 @@ GNUNET_SCHEDULER_add_read_file (struct GNUNET_TIME_Relative delay, * used as a timeout on the socket being ready. The task will be * scheduled for execution once either the delay has expired or the * socket operation is ready. It will be run with the DEFAULT priority. + * Only allowed to be called as long as the scheduler is running + * (#GNUNET_SCHEDULER_run or #GNUNET_SCHEDULER_run_with_driver has been + * called and has not returned yet). * * @param delay when should this operation time out? * @param wfd write file-descriptor @@ -1692,6 +1601,9 @@ GNUNET_SCHEDULER_add_write_file (struct GNUNET_TIME_Relative delay, * used as a timeout on the socket being ready. The task will be * scheduled for execution once either the delay has expired or the * socket operation is ready. + * Only allowed to be called as long as the scheduler is running + * (#GNUNET_SCHEDULER_run or #GNUNET_SCHEDULER_run_with_driver has been + * called and has not returned yet). * * @param delay when should this operation time out? * @param priority priority of the task @@ -1710,6 +1622,9 @@ GNUNET_SCHEDULER_add_file_with_priority (struct GNUNET_TIME_Relative delay, int on_read, int on_write, GNUNET_SCHEDULER_TaskCallback task, void *task_cls) { + /* scheduler must be running */ + GNUNET_assert (NULL != scheduler_driver); + #if MINGW struct GNUNET_NETWORK_FDSet *s; struct GNUNET_SCHEDULER_Task * ret; @@ -1725,19 +1640,70 @@ GNUNET_SCHEDULER_add_file_with_priority (struct GNUNET_TIME_Relative delay, GNUNET_NETWORK_fdset_destroy (s); return ret; #else - int real_fd; - - GNUNET_DISK_internal_file_handle_ (fd, &real_fd, sizeof (int)); - GNUNET_assert (real_fd >= 0); - return add_without_sets ( - delay, priority, - on_read ? real_fd : -1, - on_write ? real_fd : -1, - task, task_cls); + GNUNET_assert (on_read || on_write); + GNUNET_assert (fd->fd >= 0); + return add_without_sets (delay, priority, + NULL, + NULL, + on_read ? fd : NULL, + on_write ? fd : NULL, + task, task_cls); #endif } +void +extract_handles (struct GNUNET_SCHEDULER_Task *t, + const struct GNUNET_NETWORK_FDSet *fdset, + const struct GNUNET_NETWORK_Handle ***ntarget, + unsigned int *extracted_nhandles, + const struct GNUNET_DISK_FileHandle ***ftarget, + unsigned int *extracted_fhandles) +{ + // FIXME: this implementation only works for unix, for WIN32 the file handles + // in fdset must be handled separately + const struct GNUNET_NETWORK_Handle **nhandles; + const struct GNUNET_DISK_FileHandle **fhandles; + unsigned int nhandles_len, fhandles_len; + int sock; + + nhandles = NULL; + fhandles = NULL; + nhandles_len = 0; + fhandles_len = 0; + for (sock = 0; sock != fdset->nsds; ++sock) + { + if (GNUNET_YES == GNUNET_NETWORK_fdset_test_native (fdset, sock)) + { + struct GNUNET_NETWORK_Handle *nhandle; + struct GNUNET_DISK_FileHandle *fhandle; + + nhandle = GNUNET_NETWORK_socket_box_native (sock); + if (NULL != nhandle) + { + GNUNET_array_append (nhandles, nhandles_len, nhandle); + } + else + { + fhandle = GNUNET_DISK_get_handle_from_int_fd (sock); + if (NULL != fhandle) + { + GNUNET_array_append (fhandles, fhandles_len, fhandle); + } + else + { + GNUNET_assert (0); + } + } + } + } + *ntarget = nhandles_len > 0 ? nhandles : NULL; + *ftarget = fhandles_len > 0 ? fhandles : NULL; + *extracted_nhandles = nhandles_len; + *extracted_fhandles = fhandles_len; +} + + /** * Schedule a new task to be run with a specified delay or when any of * the specified file descriptor sets is ready. The delay can be used @@ -1753,6 +1719,9 @@ GNUNET_SCHEDULER_add_file_with_priority (struct GNUNET_TIME_Relative delay, * || any-rs-ready * || any-ws-ready) ) * </code> + * Only allowed to be called as long as the scheduler is running + * (#GNUNET_SCHEDULER_run or #GNUNET_SCHEDULER_run_with_driver has been + * called and has not returned yet). * * @param prio how important is this task? * @param delay how long should we wait? @@ -1772,13 +1741,20 @@ GNUNET_SCHEDULER_add_select (enum GNUNET_SCHEDULER_Priority prio, void *task_cls) { struct GNUNET_SCHEDULER_Task *t; - - if ( (NULL == rs) && - (NULL == ws) ) + const struct GNUNET_NETWORK_Handle **read_nhandles; + const struct GNUNET_NETWORK_Handle **write_nhandles; + const struct GNUNET_DISK_FileHandle **read_fhandles; + const struct GNUNET_DISK_FileHandle **write_fhandles; + unsigned int read_nhandles_len, write_nhandles_len, + read_fhandles_len, write_fhandles_len; + + if (((NULL == rs) && (NULL == ws)) || ((0 == rs->nsds) && (0 == ws->nsds))) return GNUNET_SCHEDULER_add_delayed_with_priority (delay, prio, task, task_cls); + /* scheduler must be running */ + GNUNET_assert (NULL != scheduler_driver); GNUNET_assert (NULL != active_task); GNUNET_assert (NULL != task); t = GNUNET_new (struct GNUNET_SCHEDULER_Task); @@ -1786,16 +1762,48 @@ GNUNET_SCHEDULER_add_select (enum GNUNET_SCHEDULER_Priority prio, t->callback_cls = task_cls; t->read_fd = -1; t->write_fd = -1; + t->own_handles = GNUNET_YES; + read_nhandles = NULL; + write_nhandles = NULL; + read_fhandles = NULL; + write_fhandles = NULL; + read_nhandles_len = 0; + write_nhandles_len = 0; + read_fhandles_len = 0; + write_fhandles_len = 0; if (NULL != rs) { - t->read_set = GNUNET_NETWORK_fdset_create (); - GNUNET_NETWORK_fdset_copy (t->read_set, rs); + extract_handles (t, + rs, + &read_nhandles, + &read_nhandles_len, + &read_fhandles, + &read_fhandles_len); } if (NULL != ws) { - t->write_set = GNUNET_NETWORK_fdset_create (); - GNUNET_NETWORK_fdset_copy (t->write_set, ws); + extract_handles (t, + ws, + &write_nhandles, + &write_nhandles_len, + &write_fhandles, + &write_fhandles_len); } + init_fd_info (t, + read_nhandles, + read_nhandles_len, + write_nhandles, + write_nhandles_len, + read_fhandles, + read_fhandles_len, + write_fhandles, + write_fhandles_len); + /* free the arrays of pointers to network / file handles, the actual + * handles will be freed in destroy_task */ + GNUNET_array_grow (read_nhandles, read_nhandles_len, 0); + GNUNET_array_grow (write_nhandles, write_nhandles_len, 0); + GNUNET_array_grow (read_fhandles, read_fhandles_len, 0); + GNUNET_array_grow (write_fhandles, write_fhandles_len, 0); #if PROFILE_DELAYS t->start_time = GNUNET_TIME_absolute_get (); #endif @@ -1808,8 +1816,9 @@ GNUNET_SCHEDULER_add_select (enum GNUNET_SCHEDULER_Priority prio, GNUNET_CONTAINER_DLL_insert (pending_head, pending_tail, t); + driver_add_multiple (t, GNUNET_SCHEDULER_ET_NONE); max_priority_added = GNUNET_MAX (max_priority_added, - t->priority); + t->priority); LOG (GNUNET_ERROR_TYPE_DEBUG, "Adding task %p\n", t); @@ -1820,17 +1829,18 @@ GNUNET_SCHEDULER_add_select (enum GNUNET_SCHEDULER_Priority prio, /** * Function used by event-loop implementations to signal the scheduler - * that a particular @a task is ready due to an event of type @a et. + * that a particular @a task is ready due to an event specified in the + * et field of @a fdi. * * This function will then queue the task to notify the application * that the task is ready (with the respective priority). * - * @param task the task that is ready, NULL for wake up calls - * @param et information about why the task is ready + * @param task the task that is ready + * @param fdi information about the related FD */ void GNUNET_SCHEDULER_task_ready (struct GNUNET_SCHEDULER_Task *task, - enum GNUNET_SCHEDULER_EventType et) + struct GNUNET_SCHEDULER_FdInfo *fdi) { enum GNUNET_SCHEDULER_Reason reason; struct GNUNET_TIME_Absolute now; @@ -1840,17 +1850,20 @@ GNUNET_SCHEDULER_task_ready (struct GNUNET_SCHEDULER_Task *task, if (now.abs_value_us >= task->timeout.abs_value_us) reason |= GNUNET_SCHEDULER_REASON_TIMEOUT; if ( (0 == (reason & GNUNET_SCHEDULER_REASON_READ_READY)) && - (0 != (GNUNET_SCHEDULER_ET_IN & et)) ) + (0 != (GNUNET_SCHEDULER_ET_IN & fdi->et)) ) reason |= GNUNET_SCHEDULER_REASON_READ_READY; if ( (0 == (reason & GNUNET_SCHEDULER_REASON_WRITE_READY)) && - (0 != (GNUNET_SCHEDULER_ET_OUT & et)) ) + (0 != (GNUNET_SCHEDULER_ET_OUT & fdi->et)) ) reason |= GNUNET_SCHEDULER_REASON_WRITE_READY; reason |= GNUNET_SCHEDULER_REASON_PREREQ_DONE; task->reason = reason; - task->fds = &task->fdx; - task->fdx.et = et; - task->fds_len = 1; - queue_ready_task (task); + if (GNUNET_NO == task->in_ready_list) + { + GNUNET_CONTAINER_DLL_remove (pending_head, + pending_tail, + task); + queue_ready_task (task); + } } @@ -1860,15 +1873,16 @@ GNUNET_SCHEDULER_task_ready (struct GNUNET_SCHEDULER_Task *task, * there are tasks left to run just to give other tasks a chance as * well. If we return #GNUNET_YES, the driver should call this * function again as soon as possible, while if we return #GNUNET_NO - * it must block until the operating system has more work as the - * scheduler has no more work to do right now. + * it must block until either the operating system has more work (the + * scheduler has no more work to do right now) or the timeout set by + * the scheduler (using the set_wakeup callback) is reached. * * @param sh scheduler handle that was given to the `loop` * @return #GNUNET_OK if there are more tasks that are ready, * and thus we would like to run more (yield to avoid * blocking other activities for too long) * #GNUNET_NO if we are done running tasks (yield to block) - * #GNUNET_SYSERR on error + * #GNUNET_SYSERR on error, e.g. no tasks were ready */ int GNUNET_SCHEDULER_run_from_driver (struct GNUNET_SCHEDULER_Handle *sh) @@ -1892,9 +1906,27 @@ GNUNET_SCHEDULER_run_from_driver (struct GNUNET_SCHEDULER_Handle *sh) pending_timeout_last = NULL; queue_ready_task (pos); } + pos = pending_head; + while (NULL != pos) + { + struct GNUNET_SCHEDULER_Task *next = pos->next; + if (now.abs_value_us >= pos->timeout.abs_value_us) + { + pos->reason |= GNUNET_SCHEDULER_REASON_TIMEOUT; + GNUNET_CONTAINER_DLL_remove (pending_head, + pending_tail, + pos); + queue_ready_task (pos); + } + pos = next; + } if (0 == ready_count) - return GNUNET_NO; + { + LOG (GNUNET_ERROR_TYPE_ERROR, + "GNUNET_SCHEDULER_run_from_driver was called, but no tasks are ready!\n"); + return GNUNET_SYSERR; + } /* find out which task priority level we are going to process this time */ @@ -1914,49 +1946,74 @@ GNUNET_SCHEDULER_run_from_driver (struct GNUNET_SCHEDULER_Handle *sh) while (NULL != (pos = ready_head[p])) { GNUNET_CONTAINER_DLL_remove (ready_head[p], - ready_tail[p], - pos); + ready_tail[p], + pos); ready_count--; current_priority = pos->priority; current_lifeness = pos->lifeness; active_task = pos; #if PROFILE_DELAYS if (GNUNET_TIME_absolute_get_duration (pos->start_time).rel_value_us > - DELAY_THRESHOLD.rel_value_us) + DELAY_THRESHOLD.rel_value_us) { LOG (GNUNET_ERROR_TYPE_DEBUG, - "Task %p took %s to be scheduled\n", - pos, - GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_duration (pos->start_time), - GNUNET_YES)); + "Task %p took %s to be scheduled\n", + pos, + GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_duration (pos->start_time), + GNUNET_YES)); } #endif tc.reason = pos->reason; GNUNET_NETWORK_fdset_zero (sh->rs); GNUNET_NETWORK_fdset_zero (sh->ws); + // FIXME: do we have to remove FdInfos from fds if they are not ready? tc.fds_len = pos->fds_len; tc.fds = pos->fds; - tc.read_ready = (NULL == pos->read_set) ? sh->rs : pos->read_set; - if ( (-1 != pos->read_fd) && - (0 != (pos->reason & GNUNET_SCHEDULER_REASON_READ_READY)) ) - GNUNET_NETWORK_fdset_set_native (sh->rs, - pos->read_fd); - tc.write_ready = (NULL == pos->write_set) ? sh->ws : pos->write_set; - if ((-1 != pos->write_fd) && - (0 != (pos->reason & GNUNET_SCHEDULER_REASON_WRITE_READY))) - GNUNET_NETWORK_fdset_set_native (sh->ws, - pos->write_fd); + for (int i = 0; i != pos->fds_len; ++i) + { + struct GNUNET_SCHEDULER_FdInfo *fdi = &pos->fds[i]; + if (0 != (GNUNET_SCHEDULER_ET_IN & fdi->et)) + { + GNUNET_NETWORK_fdset_set_native (sh->rs, + fdi->sock); + } + if (0 != (GNUNET_SCHEDULER_ET_OUT & fdi->et)) + { + GNUNET_NETWORK_fdset_set_native (sh->ws, + fdi->sock); + } + } + tc.read_ready = sh->rs; + tc.write_ready = sh->ws; LOG (GNUNET_ERROR_TYPE_DEBUG, - "Running task: %p\n", - pos); + "Running task %p\n", + pos); + GNUNET_assert (NULL != pos->callback); pos->callback (pos->callback_cls); + if (NULL != pos->fds) + { + int del_result = scheduler_driver->del (scheduler_driver->cls, pos); + if (GNUNET_OK != del_result) + { + LOG (GNUNET_ERROR_TYPE_ERROR, + "driver could not delete task\n"); + GNUNET_assert (0); + } + } active_task = NULL; dump_backtrace (pos); destroy_task (pos); tasks_run++; } + shutdown_if_no_lifeness (); if (0 == ready_count) + { + scheduler_driver->set_wakeup (scheduler_driver->cls, + get_timeout ()); return GNUNET_NO; + } + scheduler_driver->set_wakeup (scheduler_driver->cls, + GNUNET_TIME_absolute_get ()); return GNUNET_OK; } @@ -1979,8 +2036,8 @@ GNUNET_SCHEDULER_run_from_driver (struct GNUNET_SCHEDULER_Handle *sh) */ int GNUNET_SCHEDULER_run_with_driver (const struct GNUNET_SCHEDULER_Driver *driver, - GNUNET_SCHEDULER_TaskCallback task, - void *task_cls) + GNUNET_SCHEDULER_TaskCallback task, + void *task_cls) { int ret; struct GNUNET_SIGNAL_Context *shc_int; @@ -1995,7 +2052,6 @@ GNUNET_SCHEDULER_run_with_driver (const struct GNUNET_SCHEDULER_Driver *driver, #endif struct GNUNET_SCHEDULER_Task tsk; const struct GNUNET_DISK_FileHandle *pr; - struct GNUNET_SCHEDULER_Handle sh; /* general set-up */ GNUNET_assert (NULL == active_task); @@ -2007,54 +2063,56 @@ GNUNET_SCHEDULER_run_with_driver (const struct GNUNET_SCHEDULER_Driver *driver, GNUNET_assert (NULL != shutdown_pipe_handle); pr = GNUNET_DISK_pipe_handle (shutdown_pipe_handle, GNUNET_DISK_PIPE_END_READ); - GNUNET_assert (NULL != pr); my_pid = getpid (); + scheduler_driver = driver; /* install signal handlers */ LOG (GNUNET_ERROR_TYPE_DEBUG, "Registering signal handlers\n"); shc_int = GNUNET_SIGNAL_handler_install (SIGINT, - &sighandler_shutdown); + &sighandler_shutdown); shc_term = GNUNET_SIGNAL_handler_install (SIGTERM, - &sighandler_shutdown); + &sighandler_shutdown); #if (SIGTERM != GNUNET_TERM_SIG) shc_gterm = GNUNET_SIGNAL_handler_install (GNUNET_TERM_SIG, - &sighandler_shutdown); + &sighandler_shutdown); #endif #ifndef MINGW shc_pipe = GNUNET_SIGNAL_handler_install (SIGPIPE, - &sighandler_pipe); + &sighandler_pipe); shc_quit = GNUNET_SIGNAL_handler_install (SIGQUIT, - &sighandler_shutdown); + &sighandler_shutdown); shc_hup = GNUNET_SIGNAL_handler_install (SIGHUP, - &sighandler_shutdown); + &sighandler_shutdown); #endif /* Setup initial tasks */ current_priority = GNUNET_SCHEDULER_PRIORITY_DEFAULT; - current_lifeness = GNUNET_YES; + current_lifeness = GNUNET_NO; memset (&tsk, - 0, - sizeof (tsk)); + 0, + sizeof (tsk)); active_task = &tsk; - tsk.sh = &sh; + GNUNET_SCHEDULER_add_now (&GNUNET_OS_install_parent_control_handler, + NULL); + GNUNET_SCHEDULER_add_read_file (GNUNET_TIME_UNIT_FOREVER_REL, + pr, + &shutdown_cb, + NULL); + current_lifeness = GNUNET_YES; GNUNET_SCHEDULER_add_with_reason_and_priority (task, task_cls, GNUNET_SCHEDULER_REASON_STARTUP, GNUNET_SCHEDULER_PRIORITY_DEFAULT); - GNUNET_SCHEDULER_add_now_with_lifeness (GNUNET_NO, - &GNUNET_OS_install_parent_control_handler, - NULL); active_task = NULL; - driver->set_wakeup (driver->cls, - GNUNET_TIME_absolute_get ()); - + scheduler_driver->set_wakeup (scheduler_driver->cls, + get_timeout ()); /* begin main event loop */ sh.rs = GNUNET_NETWORK_fdset_create (); sh.ws = GNUNET_NETWORK_fdset_create (); - sh.driver = driver; + GNUNET_NETWORK_fdset_handle_set (sh.rs, pr); ret = driver->loop (driver->cls, - &sh); + &sh); GNUNET_NETWORK_fdset_destroy (sh.rs); GNUNET_NETWORK_fdset_destroy (sh.ws); @@ -2071,20 +2129,211 @@ GNUNET_SCHEDULER_run_with_driver (const struct GNUNET_SCHEDULER_Driver *driver, #endif GNUNET_DISK_pipe_close (shutdown_pipe_handle); shutdown_pipe_handle = NULL; + scheduler_driver = NULL; return ret; } +int +select_add (void *cls, + struct GNUNET_SCHEDULER_Task *task, + struct GNUNET_SCHEDULER_FdInfo *fdi) +{ + struct DriverContext *context = cls; + GNUNET_assert (NULL != context); + GNUNET_assert (NULL != task); + GNUNET_assert (NULL != fdi); + GNUNET_assert (0 != (GNUNET_SCHEDULER_ET_IN & fdi->et) || + 0 != (GNUNET_SCHEDULER_ET_OUT & fdi->et)); + + if (!((NULL != fdi->fd) ^ (NULL != fdi->fh)) || (fdi->sock < 0)) + { + /* exactly one out of {fd, hf} must be != NULL and the OS handle must be valid */ + return GNUNET_SYSERR; + } + + struct Scheduled *scheduled = GNUNET_new (struct Scheduled); + scheduled->task = task; + scheduled->fdi = fdi; + scheduled->et = fdi->et; + + GNUNET_CONTAINER_DLL_insert (context->scheduled_head, + context->scheduled_tail, + scheduled); + return GNUNET_OK; +} + + +int +select_del (void *cls, + struct GNUNET_SCHEDULER_Task *task) +{ + struct DriverContext *context; + struct Scheduled *pos; + int ret; + + GNUNET_assert (NULL != cls); + + context = cls; + ret = GNUNET_SYSERR; + pos = context->scheduled_head; + while (NULL != pos) + { + struct Scheduled *next = pos->next; + if (pos->task == task) + { + GNUNET_CONTAINER_DLL_remove (context->scheduled_head, + context->scheduled_tail, + pos); + GNUNET_free (pos); + ret = GNUNET_OK; + } + pos = next; + } + return ret; +} + + +int +select_loop (void *cls, + struct GNUNET_SCHEDULER_Handle *sh) +{ + struct GNUNET_NETWORK_FDSet *rs; + struct GNUNET_NETWORK_FDSet *ws; + struct DriverContext *context; + int select_result; + int tasks_ready; + + context = cls; + GNUNET_assert (NULL != context); + rs = GNUNET_NETWORK_fdset_create (); + ws = GNUNET_NETWORK_fdset_create (); + tasks_ready = GNUNET_NO; + while (NULL != context->scheduled_head || + GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us != context->timeout.rel_value_us) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "select timeout = %s\n", + GNUNET_STRINGS_relative_time_to_string (context->timeout, GNUNET_NO)); + + GNUNET_NETWORK_fdset_zero (rs); + GNUNET_NETWORK_fdset_zero (ws); + struct Scheduled *pos; + for (pos = context->scheduled_head; NULL != pos; pos = pos->next) + { + if (0 != (GNUNET_SCHEDULER_ET_IN & pos->et)) + { + GNUNET_NETWORK_fdset_set_native (rs, pos->fdi->sock); + } + if (0 != (GNUNET_SCHEDULER_ET_OUT & pos->et)) + { + GNUNET_NETWORK_fdset_set_native (ws, pos->fdi->sock); + } + } + if (NULL == scheduler_select) + { + select_result = GNUNET_NETWORK_socket_select (rs, + ws, + NULL, + context->timeout); + } + else + { + select_result = scheduler_select (scheduler_select_cls, + rs, + ws, + NULL, + context->timeout); + } + if (select_result == GNUNET_SYSERR) + { + if (errno == EINTR) + continue; + + LOG_STRERROR (GNUNET_ERROR_TYPE_ERROR, "select"); +#ifndef MINGW +#if USE_LSOF + char lsof[512]; + + snprintf (lsof, sizeof (lsof), "lsof -p %d", getpid ()); + (void) close (1); + (void) dup2 (2, 1); + if (0 != system (lsof)) + LOG_STRERROR (GNUNET_ERROR_TYPE_WARNING, + "system"); +#endif +#endif +#if DEBUG_FDS + struct Scheduled *s; + for (s = context->scheduled_head; NULL != s; s = s->next) + { + int flags = fcntl (s->fdi->sock, F_GETFD); + if ((flags == -1) && (errno == EBADF)) + { + LOG (GNUNET_ERROR_TYPE_ERROR, + "Got invalid file descriptor %d!\n", + s->fdi->sock); + } + } +#endif + GNUNET_assert (0); + return GNUNET_SYSERR; + } + for (pos = context->scheduled_head; NULL != pos; pos = pos->next) + { + int is_ready = GNUNET_NO; + if (0 != (GNUNET_SCHEDULER_ET_IN & pos->et) && + GNUNET_YES == GNUNET_NETWORK_fdset_test_native (rs, pos->fdi->sock)) + { + pos->fdi->et |= GNUNET_SCHEDULER_ET_IN; + is_ready = GNUNET_YES; + } + if (0 != (GNUNET_SCHEDULER_ET_OUT & pos->et) && + GNUNET_YES == GNUNET_NETWORK_fdset_test_native (ws, pos->fdi->sock)) + { + pos->fdi->et |= GNUNET_SCHEDULER_ET_OUT; + is_ready = GNUNET_YES; + } + if (GNUNET_YES == is_ready) + { + GNUNET_SCHEDULER_task_ready (pos->task, pos->fdi); + } + } + tasks_ready = GNUNET_SCHEDULER_run_from_driver (sh); + GNUNET_assert (GNUNET_SYSERR != tasks_ready); + } + return GNUNET_OK; +} + + +void +select_set_wakeup(void *cls, + struct GNUNET_TIME_Absolute dt) +{ + struct DriverContext *context = cls; + GNUNET_assert (NULL != context); + + context->timeout = GNUNET_TIME_absolute_get_remaining (dt); +} + + /** * Obtain the driver for using select() as the event loop. * * @return NULL on error */ -const struct GNUNET_SCHEDULER_Driver * +struct GNUNET_SCHEDULER_Driver * GNUNET_SCHEDULER_driver_select () { - GNUNET_break (0); // not implemented - return NULL; + struct GNUNET_SCHEDULER_Driver *select_driver; + select_driver = GNUNET_new (struct GNUNET_SCHEDULER_Driver); + + select_driver->loop = &select_loop; + select_driver->add = &select_add; + select_driver->del = &select_del; + select_driver->set_wakeup = &select_set_wakeup; + + return select_driver; } |