diff options
Diffstat (limited to 'src/fs/fs_api.c')
-rw-r--r-- | src/fs/fs_api.c | 134 |
1 files changed, 118 insertions, 16 deletions
diff --git a/src/fs/fs_api.c b/src/fs/fs_api.c index 1df9b2e..651c174 100644 --- a/src/fs/fs_api.c +++ b/src/fs/fs_api.c @@ -30,6 +30,15 @@ #include "fs_api.h" #include "fs_tree.h" +/** + * How many block requests can we have outstanding in parallel at a time by default? + */ +#define DEFAULT_MAX_PARALLEL_REQUESTS (1024 * 10) + +/** + * How many downloads can we have outstanding in parallel at a time by default? + */ +#define DEFAULT_MAX_PARALLEL_DOWNLOADS 16 /** * Start the given job (send signal, remove from pending queue, update @@ -99,6 +108,8 @@ process_job_queue (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) struct GNUNET_TIME_Absolute end_time; h->queue_job = GNUNET_SCHEDULER_NO_TASK; + restart_at = GNUNET_TIME_UNIT_FOREVER_REL; + /* first, see if we can start all the jobs */ next = h->pending_head; while (NULL != (qe = next)) { @@ -109,7 +120,7 @@ process_job_queue (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) continue; } if ((qe->blocks + h->active_blocks <= h->max_parallel_requests) && - (h->active_downloads + 1 <= h->max_parallel_downloads)) + (h->active_downloads < h->max_parallel_downloads)) { start_job (qe); continue; @@ -117,7 +128,7 @@ process_job_queue (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) } if (h->pending_head == NULL) return; /* no need to stop anything */ - restart_at = GNUNET_TIME_UNIT_FOREVER_REL; + /* then, check if we should stop some jobs */ next = h->running_head; while (NULL != (qe = next)) { @@ -125,6 +136,22 @@ process_job_queue (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) run_time = GNUNET_TIME_relative_multiply (h->avg_block_latency, qe->blocks * qe->start_times); + switch (qe->priority) + { + case GNUNET_FS_QUEUE_PRIORITY_PROBE: + /* run probes for at most 1s * number-of-restarts; note that + as the total runtime of a probe is limited to 2m, we don't + need to additionally limit the total time of a probe to + strictly limit its lifetime. */ + run_time = GNUNET_TIME_relative_min (run_time, + GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, + 1 + qe->start_times)); + break; + case GNUNET_FS_QUEUE_PRIORITY_NORMAL: + break; + default: + GNUNET_break (0); + } end_time = GNUNET_TIME_absolute_add (qe->start_time, run_time); rst = GNUNET_TIME_absolute_get_remaining (end_time); restart_at = GNUNET_TIME_relative_min (rst, restart_at); @@ -132,6 +159,18 @@ process_job_queue (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) continue; stop_job (qe); } + /* finally, start some more tasks if we now have empty slots */ + next = h->pending_head; + while (NULL != (qe = next)) + { + next = qe->next; + if ((qe->blocks + h->active_blocks <= h->max_parallel_requests) && + (h->active_downloads < h->max_parallel_downloads)) + { + start_job (qe); + continue; + } + } h->queue_job = GNUNET_SCHEDULER_add_delayed (restart_at, &process_job_queue, h); } @@ -145,11 +184,13 @@ process_job_queue (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) * @param stop function to call to pause the job, or on dequeue (if the job was running) * @param cls closure for start and stop * @param blocks number of blocks this jobs uses + * @param priority how important is this download * @return queue handle */ struct GNUNET_FS_QueueEntry * GNUNET_FS_queue_ (struct GNUNET_FS_Handle *h, GNUNET_FS_QueueStart start, - GNUNET_FS_QueueStop stop, void *cls, unsigned int blocks) + GNUNET_FS_QueueStop stop, void *cls, unsigned int blocks, + enum GNUNET_FS_QueuePriority priority) { struct GNUNET_FS_QueueEntry *qe; @@ -160,6 +201,7 @@ GNUNET_FS_queue_ (struct GNUNET_FS_Handle *h, GNUNET_FS_QueueStart start, qe->cls = cls; qe->queue_time = GNUNET_TIME_absolute_get (); qe->blocks = blocks; + qe->priority = priority; GNUNET_CONTAINER_DLL_insert_after (h->pending_head, h->pending_tail, h->pending_tail, qe); if (h->queue_job != GNUNET_SCHEDULER_NO_TASK) @@ -249,7 +291,11 @@ struct FileInfo * @param cls closure (points to the file information) * @param offset offset to read from; it is possible * that the caller might need to go backwards - * a bit at times + * a bit at times; set to UINT64_MAX to tell + * the reader that we won't be reading for a while + * (used to close the file descriptor but NOT fully + * clean up the reader's state); in this case, + * a value of '0' for max should be ignored * @param max maximum number of bytes that should be * copied to buf; readers are not allowed * to provide less data unless there is an error; @@ -266,20 +312,29 @@ GNUNET_FS_data_reader_file_ (void *cls, uint64_t offset, size_t max, void *buf, struct FileInfo *fi = cls; ssize_t ret; - if (max == 0) + if (UINT64_MAX == offset) { - if (fi->fd != NULL) + if (NULL != fi->fd) + { + GNUNET_DISK_file_close (fi->fd); + fi->fd = NULL; + } + return 0; + } + if (0 == max) + { + if (NULL != fi->fd) GNUNET_DISK_file_close (fi->fd); GNUNET_free (fi->filename); GNUNET_free (fi); return 0; } - if (fi->fd == NULL) + if (NULL == fi->fd) { fi->fd = GNUNET_DISK_file_open (fi->filename, GNUNET_DISK_OPEN_READ, GNUNET_DISK_PERM_NONE); - if (fi->fd == NULL) + if (NULL == fi->fd) { GNUNET_asprintf (emsg, _("Could not open file `%s': %s"), fi->filename, STRERROR (errno)); @@ -288,7 +343,7 @@ GNUNET_FS_data_reader_file_ (void *cls, uint64_t offset, size_t max, void *buf, } GNUNET_DISK_file_seek (fi->fd, offset, GNUNET_DISK_SEEK_SET); ret = GNUNET_DISK_file_read (fi->fd, buf, max); - if (ret == -1) + if (-1 == ret) { GNUNET_asprintf (emsg, _("Could not read file `%s': %s"), fi->filename, STRERROR (errno)); @@ -332,7 +387,11 @@ GNUNET_FS_make_file_reader_context_ (const char *filename) * @param cls closure (points to the buffer) * @param offset offset to read from; it is possible * that the caller might need to go backwards - * a bit at times + * a bit at times; set to UINT64_MAX to tell + * the reader that we won't be reading for a while + * (used to close the file descriptor but NOT fully + * clean up the reader's state); in this case, + * a value of '0' for max should be ignored * @param max maximum number of bytes that should be * copied to buf; readers are not allowed * to provide less data unless there is an error; @@ -348,6 +407,8 @@ GNUNET_FS_data_reader_copy_ (void *cls, uint64_t offset, size_t max, void *buf, { char *data = cls; + if (UINT64_MAX == offset) + return 0; if (max == 0) { GNUNET_free_non_null (data); @@ -1482,6 +1543,7 @@ void GNUNET_FS_unindex_sync_ (struct GNUNET_FS_UnindexContext *uc) { struct GNUNET_BIO_WriteHandle *wh; + char *uris; if (NULL == uc->serialization) uc->serialization = @@ -1496,10 +1558,18 @@ GNUNET_FS_unindex_sync_ (struct GNUNET_FS_UnindexContext *uc) GNUNET_break (0); goto cleanup; } + if (NULL != uc->ksk_uri) + uris = GNUNET_FS_uri_to_string (uc->ksk_uri); + else + uris = NULL; if ((GNUNET_OK != GNUNET_BIO_write_string (wh, uc->filename)) || (GNUNET_OK != GNUNET_BIO_write_int64 (wh, uc->file_size)) || (GNUNET_OK != write_start_time (wh, uc->start_time)) || (GNUNET_OK != GNUNET_BIO_write_int32 (wh, (uint32_t) uc->state)) || + (GNUNET_OK != + GNUNET_BIO_write (wh, &uc->chk, sizeof (struct ContentHashKey))) || + (GNUNET_OK != GNUNET_BIO_write_string (wh, uris)) || + (GNUNET_OK != GNUNET_BIO_write_int32 (wh, (uint32_t) uc->ksk_offset)) || ((uc->state == UNINDEX_STATE_FS_NOTIFY) && (GNUNET_OK != GNUNET_BIO_write (wh, &uc->file_id, sizeof (GNUNET_HashCode)))) || @@ -1637,9 +1707,8 @@ get_download_sync_filename (struct GNUNET_FS_DownloadContext *dc, if (dc->parent == NULL) return get_serialization_file_name (dc->h, - (dc->search != - NULL) ? - GNUNET_FS_SYNC_PATH_CHILD_DOWNLOAD : + (dc->search != NULL) ? + GNUNET_FS_SYNC_PATH_CHILD_DOWNLOAD : GNUNET_FS_SYNC_PATH_MASTER_DOWNLOAD, uni); if (dc->parent->serialization == NULL) @@ -1669,6 +1738,8 @@ GNUNET_FS_download_sync_ (struct GNUNET_FS_DownloadContext *dc) char *fn; char *dir; + if (0 != (dc->options & GNUNET_FS_DOWNLOAD_IS_PROBE)) + return; /* we don't sync probes */ if (NULL == dc->serialization) { dir = get_download_sync_filename (dc, "", ""); @@ -1920,6 +1991,7 @@ deserialize_unindex_file (void *cls, const char *filename) struct GNUNET_FS_UnindexContext *uc; struct GNUNET_FS_ProgressInfo pi; char *emsg; + char *uris; uint32_t state; uc = GNUNET_malloc (sizeof (struct GNUNET_FS_UnindexContext)); @@ -1931,15 +2003,37 @@ deserialize_unindex_file (void *cls, const char *filename) GNUNET_break (0); goto cleanup; } + uris = NULL; if ((GNUNET_OK != GNUNET_BIO_read_string (rh, "unindex-fn", &uc->filename, 10 * 1024)) || (GNUNET_OK != GNUNET_BIO_read_int64 (rh, &uc->file_size)) || (GNUNET_OK != read_start_time (rh, &uc->start_time)) || - (GNUNET_OK != GNUNET_BIO_read_int32 (rh, &state))) + (GNUNET_OK != GNUNET_BIO_read_int32 (rh, &state)) || + (GNUNET_OK != GNUNET_BIO_read (rh, "uri", &uc->chk, sizeof (struct ContentHashKey))) || + (GNUNET_OK != GNUNET_BIO_read_string (rh, "unindex-kskuri", &uris, 10 * 1024)) || + (GNUNET_OK != GNUNET_BIO_read_int32 (rh, &uc->ksk_offset)) ) { + GNUNET_free_non_null (uris); GNUNET_break (0); goto cleanup; } + if (NULL != uris) + { + uc->ksk_uri = GNUNET_FS_uri_parse (uris, &emsg); + GNUNET_free (uris); + if (NULL == uc->ksk_uri) + { + GNUNET_break (0); + goto cleanup; + } + } + if ( (uc->ksk_offset > 0) && + ( (NULL == uc->ksk_uri) || + (uc->ksk_offset > uc->ksk_uri->data.ksk.keywordCount) ) ) + { + GNUNET_break (0); + goto cleanup; + } uc->state = (enum UnindexState) state; switch (state) { @@ -1955,6 +2049,8 @@ deserialize_unindex_file (void *cls, const char *filename) } break; case UNINDEX_STATE_DS_REMOVE: + case UNINDEX_STATE_EXTRACT_KEYWORDS: + case UNINDEX_STATE_DS_REMOVE_KBLOCKS: break; case UNINDEX_STATE_COMPLETE: break; @@ -1991,6 +2087,12 @@ deserialize_unindex_file (void *cls, const char *filename) case UNINDEX_STATE_DS_REMOVE: GNUNET_FS_unindex_do_remove_ (uc); break; + case UNINDEX_STATE_EXTRACT_KEYWORDS: + GNUNET_FS_unindex_do_extract_keywords_ (uc); + break; + case UNINDEX_STATE_DS_REMOVE_KBLOCKS: + GNUNET_FS_unindex_do_remove_kblocks_ (uc); + break; case UNINDEX_STATE_COMPLETE: case UNINDEX_STATE_ERROR: /* no need to resume any operation, we were done */ @@ -2762,8 +2864,8 @@ GNUNET_FS_start (const struct GNUNET_CONFIGURATION_Handle *cfg, ret->upcb = upcb; ret->upcb_cls = upcb_cls; ret->flags = flags; - ret->max_parallel_downloads = 1; - ret->max_parallel_requests = 1; + ret->max_parallel_downloads = DEFAULT_MAX_PARALLEL_DOWNLOADS; + ret->max_parallel_requests = DEFAULT_MAX_PARALLEL_REQUESTS; ret->avg_block_latency = GNUNET_TIME_UNIT_MINUTES; /* conservative starting point */ va_start (ap, flags); while (GNUNET_FS_OPTIONS_END != (opt = va_arg (ap, enum GNUNET_FS_OPTIONS))) |