diff options
author | Jeff Garzik <jeff@garzik.org> | 2011-03-14 23:17:34 -0400 |
---|---|---|
committer | Jeff Garzik <jgarzik@pobox.com> | 2011-03-14 23:17:34 -0400 |
commit | 4f7a51e9ed4accbd1be6fad38a9bbc70531cb37f (patch) | |
tree | d95aa0dfdb56f6f266537c2f76b7abc6914196b8 | |
parent | cdb4cd9c8be6c828908622efdca732eff8495787 (diff) |
Move all RPC I/O to separate thread.
-rw-r--r-- | Makefile.am | 2 | ||||
-rw-r--r-- | cpu-miner.c | 287 | ||||
-rw-r--r-- | elist.h | 251 | ||||
-rw-r--r-- | miner.h | 11 | ||||
-rw-r--r-- | util.c | 140 |
5 files changed, 640 insertions, 51 deletions
diff --git a/Makefile.am b/Makefile.am index 7d25dd8..73d3b8b 100644 --- a/Makefile.am +++ b/Makefile.am @@ -13,7 +13,7 @@ INCLUDES = $(PTHREAD_FLAGS) -fno-strict-aliasing $(JANSSON_INCLUDES) bin_PROGRAMS = minerd -minerd_SOURCES = miner.h compat.h \ +minerd_SOURCES = elist.h miner.h compat.h \ cpu-miner.c util.c \ sha256_generic.c sha256_4way.c sha256_via.c \ sha256_cryptopp.c sha256_sse2_amd64.c diff --git a/cpu-miner.c b/cpu-miner.c index fd64a0c..a78be29 100644 --- a/cpu-miner.c +++ b/cpu-miner.c @@ -4,7 +4,7 @@ * * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License as published by the Free - * Software Foundation; either version 2 of the License, or (at your option) + * Software Foundation; either version 2 of the License, or (at your option) * any later version. See COPYING for more details. */ @@ -32,6 +32,25 @@ #define DEF_RPC_URL "http://127.0.0.1:8332/" #define DEF_RPC_USERPASS "rpcuser:rpcpass" +struct thr_info { + int id; + pthread_t pth; + struct thread_q *q; +}; + +enum workio_commands { + WC_GET_WORK, + WC_SUBMIT_WORK, +}; + +struct workio_cmd { + enum workio_commands cmd; + struct thr_info *thr; + union { + struct work *work; + } u; +}; + enum sha256_algos { ALGO_C, /* plain C */ ALGO_4WAY, /* parallel SSE2 */ @@ -70,6 +89,8 @@ static enum sha256_algos opt_algo = ALGO_C; static int opt_n_threads = 1; static char *rpc_url; static char *userpass; +static struct thr_info *thr_info; +static int work_thr_id; struct option_help { @@ -214,20 +235,21 @@ err_out: return false; } -static void submit_work(CURL *curl, struct work *work) +static bool submit_upstream_work(CURL *curl, const struct work *work) { char *hexstr = NULL; json_t *val, *res; char s[345], timestr[64]; time_t now; struct tm *tm; + bool rc = false; now = time(NULL); /* build hex string */ hexstr = bin2hex(work->data, sizeof(work->data)); if (!hexstr) { - fprintf(stderr, "submit_work OOM\n"); + fprintf(stderr, "submit_upstream_work OOM\n"); goto out; } @@ -242,7 +264,7 @@ static void submit_work(CURL *curl, struct work *work) /* issue JSON-RPC request */ val = json_rpc_call(curl, rpc_url, userpass, s); if (!val) { - fprintf(stderr, "submit_work json_rpc_call failed\n"); + fprintf(stderr, "submit_upstream_work json_rpc_call failed\n"); goto out; } @@ -256,11 +278,14 @@ static void submit_work(CURL *curl, struct work *work) json_decref(val); + rc = true; + out: free(hexstr); + return rc; } -static bool get_work(CURL *curl, struct work *work) +static bool get_upstream_work(CURL *curl, struct work *work) { static const char *rpc_req = "{\"method\": \"getwork\", \"params\": [], \"id\":0}\r\n"; @@ -278,6 +303,120 @@ static bool get_work(CURL *curl, struct work *work) return rc; } +static void workio_cmd_free(struct workio_cmd *wc) +{ + if (!wc) + return; + + switch (wc->cmd) { + case WC_SUBMIT_WORK: + free(wc->u.work); + break; + default: /* do nothing */ + break; + } + + memset(wc, 0, sizeof(*wc)); /* poison */ + free(wc); +} + +static bool workio_get_work(struct workio_cmd *wc, CURL *curl) +{ + struct work *ret_work; + int failures = 0; + + ret_work = calloc(1, sizeof(*ret_work)); + if (!ret_work) + return false; + + /* obtain new work from bitcoin via JSON-RPC */ + while (!get_upstream_work(curl, ret_work)) { + fprintf(stderr, "json_rpc_call failed, "); + + if ((opt_retries >= 0) && (++failures > opt_retries)) { + fprintf(stderr, "terminating workio thread\n"); + free(ret_work); + return false; + } + + /* pause, then restart work-request loop */ + fprintf(stderr, "retry after %d seconds\n", + opt_fail_pause); + sleep(opt_fail_pause); + } + + /* send work to requesting thread */ + if (!tq_push(wc->thr->q, ret_work)) + free(ret_work); + + return true; +} + +static bool workio_submit_work(struct workio_cmd *wc, CURL *curl) +{ + int failures = 0; + + /* submit solution to bitcoin via JSON-RPC */ + while (!submit_upstream_work(curl, wc->u.work)) { + if ((opt_retries >= 0) && (++failures > opt_retries)) { + fprintf(stderr, "...terminating workio thread\n"); + return false; + } + + /* pause, then restart work-request loop */ + fprintf(stderr, "...retry after %d seconds\n", + opt_fail_pause); + sleep(opt_fail_pause); + } + + return true; +} + +static void *workio_thread(void *userdata) +{ + struct thr_info *mythr = userdata; + CURL *curl; + bool ok = true; + + curl = curl_easy_init(); + if (!curl) { + fprintf(stderr, "CURL initialization failed\n"); + return NULL; + } + + while (ok) { + struct workio_cmd *wc; + + /* wait for workio_cmd sent to us, on our queue */ + wc = tq_pop(mythr->q, NULL); + if (!wc) { + ok = false; + break; + } + + /* process workio_cmd */ + switch (wc->cmd) { + case WC_GET_WORK: + ok = workio_get_work(wc, curl); + break; + case WC_SUBMIT_WORK: + ok = workio_submit_work(wc, curl); + break; + + default: /* should never happen */ + ok = false; + break; + } + + workio_cmd_free(wc); + } + + tq_freeze(mythr->q); + curl_easy_cleanup(curl); + + return NULL; +} + static void hashmeter(int thr_id, const struct timeval *diff, unsigned long hashes_done) { @@ -292,39 +431,82 @@ static void hashmeter(int thr_id, const struct timeval *diff, khashes / secs); } -static void *miner_thread(void *thr_id_int) +static bool get_work(struct thr_info *thr, struct work *work) { - int thr_id = (unsigned long) thr_id_int; - int failures = 0; - uint32_t max_nonce = 0xffffff; - CURL *curl; + struct workio_cmd *wc; + struct work *work_heap; - curl = curl_easy_init(); - if (!curl) { - fprintf(stderr, "CURL initialization failed\n"); - return NULL; + /* fill out work request message */ + wc = calloc(1, sizeof(*wc)); + if (!wc) + return false; + + wc->cmd = WC_GET_WORK; + wc->thr = thr; + + /* send work request to workio thread */ + if (!tq_push(thr_info[work_thr_id].q, wc)) { + workio_cmd_free(wc); + return false; } + /* wait for response, a unit of work */ + work_heap = tq_pop(thr->q, NULL); + if (!work_heap) + return false; + + /* copy returned work into storage provided by caller */ + memcpy(work, work_heap, sizeof(*work)); + free(work_heap); + + return true; +} + +static bool submit_work(struct thr_info *thr, const struct work *work_in) +{ + struct workio_cmd *wc; + + /* fill out work request message */ + wc = calloc(1, sizeof(*wc)); + if (!wc) + return false; + + wc->u.work = malloc(sizeof(*work_in)); + if (!wc->u.work) + goto err_out; + + wc->cmd = WC_SUBMIT_WORK; + wc->thr = thr; + memcpy(wc->u.work, work_in, sizeof(*work_in)); + + /* send solution to workio thread */ + if (!tq_push(thr_info[work_thr_id].q, wc)) + goto err_out; + + return true; + +err_out: + workio_cmd_free(wc); + return false; +} + +static void *miner_thread(void *userdata) +{ + struct thr_info *mythr = userdata; + int thr_id = mythr->id; + uint32_t max_nonce = 0xffffff; + while (1) { struct work work __attribute__((aligned(128))); unsigned long hashes_done; struct timeval tv_start, tv_end, diff; bool rc; - /* obtain new work from bitcoin */ - if (!get_work(curl, &work)) { - fprintf(stderr, "json_rpc_call failed, "); - - if ((opt_retries >= 0) && (++failures > opt_retries)) { - fprintf(stderr, "terminating thread\n"); - return NULL; /* exit thread */ - } - - /* pause, then restart work loop */ - fprintf(stderr, "retry after %d seconds\n", - opt_fail_pause); - sleep(opt_fail_pause); - continue; + /* obtain new work from internal workio thread */ + if (!get_work(mythr, &work)) { + fprintf(stderr, "work retrieval failed, exiting " + "mining thread %d\n", mythr->id); + goto out; } hashes_done = 0; @@ -347,7 +529,7 @@ static void *miner_thread(void *thr_id_int) max_nonce, &hashes_done); rc = (rc5 == -1) ? false : true; } - break; + break; #endif #ifdef WANT_SSE2_4WAY @@ -384,7 +566,7 @@ static void *miner_thread(void *thr_id_int) default: /* should never happen */ - return NULL; + goto out; } /* record scanhash elapsed time */ @@ -404,13 +586,12 @@ static void *miner_thread(void *thr_id_int) max_nonce += 100000; /* small increase */ /* if nonce found, submit work */ - if (rc) - submit_work(curl, &work); - - failures = 0; + if (rc && !submit_work(mythr, &work)) + break; } - curl_easy_cleanup(curl); +out: + tq_freeze(mythr->q); return NULL; } @@ -564,8 +745,8 @@ static void parse_cmdline(int argc, char *argv[]) int main (int argc, char *argv[]) { + struct thr_info *thr; int i; - pthread_t *t_all; rpc_url = strdup(DEF_RPC_URL); userpass = strdup(DEF_RPC_USERPASS); @@ -577,14 +758,33 @@ int main (int argc, char *argv[]) if (setpriority(PRIO_PROCESS, 0, 19)) perror("setpriority"); - t_all = calloc(opt_n_threads, sizeof(pthread_t)); - if (!t_all) + thr_info = calloc(opt_n_threads + 1, sizeof(*thr)); + if (!thr_info) + return 1; + + work_thr_id = opt_n_threads; + thr = &thr_info[work_thr_id]; + thr->id = opt_n_threads; + thr->q = tq_new(); + if (!thr->q) + return 1; + + /* start work I/O thread */ + if (pthread_create(&thr->pth, NULL, workio_thread, thr)) { + fprintf(stderr, "workio thread create failed\n"); return 1; + } /* start mining threads */ for (i = 0; i < opt_n_threads; i++) { - if (pthread_create(&t_all[i], NULL, miner_thread, - (void *)(unsigned long) i)) { + thr = &thr_info[i]; + + thr->id = i; + thr->q = tq_new(); + if (!thr->q) + return 1; + + if (pthread_create(&thr->pth, NULL, miner_thread, thr)) { fprintf(stderr, "thread %d create failed\n", i); return 1; } @@ -597,11 +797,10 @@ int main (int argc, char *argv[]) opt_n_threads, algo_names[opt_algo]); - /* main loop - simply wait for all threads to exit */ - for (i = 0; i < opt_n_threads; i++) - pthread_join(t_all[i], NULL); + /* main loop - simply wait for workio thread to exit */ + pthread_join(thr_info[work_thr_id].pth, NULL); - fprintf(stderr, "all threads dead, fred. exiting.\n"); + fprintf(stderr, "workio thread dead, exiting.\n"); return 0; } @@ -0,0 +1,251 @@ +#ifndef _LINUX_LIST_H +#define _LINUX_LIST_H + +/* + * Simple doubly linked list implementation. + * + * Some of the internal functions ("__xxx") are useful when + * manipulating whole lists rather than single entries, as + * sometimes we already know the next/prev entries and we can + * generate better code by using them directly rather than + * using the generic single-entry routines. + */ + +struct list_head { + struct list_head *next, *prev; +}; + +#define LIST_HEAD_INIT(name) { &(name), &(name) } + +#define LIST_HEAD(name) \ + struct list_head name = LIST_HEAD_INIT(name) + +#define INIT_LIST_HEAD(ptr) do { \ + (ptr)->next = (ptr); (ptr)->prev = (ptr); \ +} while (0) + +/* + * Insert a new entry between two known consecutive entries. + * + * This is only for internal list manipulation where we know + * the prev/next entries already! + */ +static inline void __list_add(struct list_head *new, + struct list_head *prev, + struct list_head *next) +{ + next->prev = new; + new->next = next; + new->prev = prev; + prev->next = new; +} + +/** + * list_add - add a new entry + * @new: new entry to be added + * @head: list head to add it after + * + * Insert a new entry after the specified head. + * This is good for implementing stacks. + */ +static inline void list_add(struct list_head *new, struct list_head *head) +{ + __list_add(new, head, head->next); +} + +/** + * list_add_tail - add a new entry + * @new: new entry to be added + * @head: list head to add it before + * + * Insert a new entry before the specified head. + * This is useful for implementing queues. + */ +static inline void list_add_tail(struct list_head *new, struct list_head *head) +{ + __list_add(new, head->prev, head); +} + +/* + * Delete a list entry by making the prev/next entries + * point to each other. + * + * This is only for internal list manipulation where we know + * the prev/next entries already! + */ +static inline void __list_del(struct list_head *prev, struct list_head *next) +{ + next->prev = prev; + prev->next = next; +} + +/** + * list_del - deletes entry from list. + * @entry: the element to delete from the list. + * Note: list_empty on entry does not return true after this, the entry is in an undefined state. + */ +static inline void list_del(struct list_head *entry) +{ + __list_del(entry->prev, entry->next); + entry->next = (void *) 0; + entry->prev = (void *) 0; +} + +/** + * list_del_init - deletes entry from list and reinitialize it. + * @entry: the element to delete from the list. + */ +static inline void list_del_init(struct list_head *entry) +{ + __list_del(entry->prev, entry->next); + INIT_LIST_HEAD(entry); +} + +/** + * list_move - delete from one list and add as another's head + * @list: the entry to move + * @head: the head that will precede our entry + */ +static inline void list_move(struct list_head *list, struct list_head *head) +{ + __list_del(list->prev, list->next); + list_add(list, head); +} + +/** + * list_move_tail - delete from one list and add as another's tail + * @list: the entry to move + * @head: the head that will follow our entry + */ +static inline void list_move_tail(struct list_head *list, + struct list_head *head) +{ + __list_del(list->prev, list->next); + list_add_tail(list, head); +} + +/** + * list_empty - tests whether a list is empty + * @head: the list to test. + */ +static inline int list_empty(struct list_head *head) +{ + return head->next == head; +} + +static inline void __list_splice(struct list_head *list, + struct list_head *head) +{ + struct list_head *first = list->next; + struct list_head *last = list->prev; + struct list_head *at = head->next; + + first->prev = head; + head->next = first; + + last->next = at; + at->prev = last; +} + +/** + * list_splice - join two lists + * @list: the new list to add. + * @head: the place to add it in the first list. + */ +static inline void list_splice(struct list_head *list, struct list_head *head) +{ + if (!list_empty(list)) + __list_splice(list, head); +} + +/** + * list_splice_init - join two lists and reinitialise the emptied list. + * @list: the new list to add. + * @head: the place to add it in the first list. + * + * The list at @list is reinitialised + */ +static inline void list_splice_init(struct list_head *list, + struct list_head *head) +{ + if (!list_empty(list)) { + __list_splice(list, head); + INIT_LIST_HEAD(list); + } +} + +/** + * list_entry - get the struct for this entry + * @ptr: the &struct list_head pointer. + * @type: the type of the struct this is embedded in. + * @member: the name of the list_struct within the struct. + */ +#define list_entry(ptr, type, member) \ + ((type *)((char *)(ptr)-(unsigned long)(&((type *)0)->member))) + +/** + * list_for_each - iterate over a list + * @pos: the &struct list_head to use as a loop counter. + * @head: the head for your list. + */ +#define list_for_each(pos, head) \ + for (pos = (head)->next; pos != (head); \ + pos = pos->next) +/** + * list_for_each_prev - iterate over a list backwards + * @pos: the &struct list_head to use as a loop counter. + * @head: the head for your list. + */ +#define list_for_each_prev(pos, head) \ + for (pos = (head)->prev; pos != (head); \ + pos = pos->prev) + +/** + * list_for_each_safe - iterate over a list safe against removal of list entry + * @pos: the &struct list_head to use as a loop counter. + * @n: another &struct list_head to use as temporary storage + * @head: the head for your list. + */ +#define list_for_each_safe(pos, n, head) \ + for (pos = (head)->next, n = pos->next; pos != (head); \ + pos = n, n = pos->next) + +/** + * list_for_each_entry - iterate over list of given type + * @pos: the type * to use as a loop counter. + * @head: the head for your list. + * @member: the name of the list_struct within the struct. + */ +#define list_for_each_entry(pos, head, member) \ + for (pos = list_entry((head)->next, typeof(*pos), member); \ + &pos->member != (head); \ + pos = list_entry(pos->member.next, typeof(*pos), member)) + +/** + * list_for_each_entry_safe - iterate over list of given type safe against removal of list entry + * @pos: the type * to use as a loop counter. + * @n: another type * to use as temporary storage + * @head: the head for your list. + * @member: the name of the list_struct within the struct. + */ +#define list_for_each_entry_safe(pos, n, head, member) \ + for (pos = list_entry((head)->next, typeof(*pos), member), \ + n = list_entry(pos->member.next, typeof(*pos), member); \ + &pos->member != (head); \ + pos = n, n = list_entry(n->member.next, typeof(*n), member)) + +/** + * list_for_each_entry_continue - iterate over list of given type + * continuing after existing point + * @pos: the type * to use as a loop counter. + * @head: the head for your list. + * @member: the name of the list_struct within the struct. + */ +#define list_for_each_entry_continue(pos, head, member) \ + for (pos = list_entry(pos->member.next, typeof(*pos), member), \ + prefetch(pos->member.next); \ + &pos->member != (head); \ + pos = list_entry(pos->member.next, typeof(*pos), member), \ + prefetch(pos->member.next)) + +#endif @@ -70,7 +70,7 @@ extern bool opt_protocol; extern const uint32_t sha256_init_state[]; extern json_t *json_rpc_call(CURL *curl, const char *url, const char *userpass, const char *rpc_req); -extern char *bin2hex(unsigned char *p, size_t len); +extern char *bin2hex(const unsigned char *p, size_t len); extern bool hex2bin(unsigned char *p, const char *hexstr, size_t len); extern unsigned int ScanHash_4WaySSE2(const unsigned char *pmidstate, @@ -109,4 +109,13 @@ timeval_subtract (struct timeval *result, struct timeval *x, struct timeval *y); extern bool fulltest(const unsigned char *hash, const unsigned char *target); +struct thread_q; + +extern struct thread_q *tq_new(void); +extern void tq_free(struct thread_q *tq); +extern bool tq_push(struct thread_q *tq, void *data); +extern void *tq_pop(struct thread_q *tq, const struct timespec *abstime); +extern void tq_freeze(struct thread_q *tq); +extern void tq_thaw(struct thread_q *tq); + #endif /* __MINER_H__ */ @@ -4,7 +4,7 @@ * * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License as published by the Free - * Software Foundation; either version 2 of the License, or (at your option) + * Software Foundation; either version 2 of the License, or (at your option) * any later version. See COPYING for more details. */ @@ -14,9 +14,11 @@ #include <stdio.h> #include <stdlib.h> #include <string.h> +#include <pthread.h> #include <jansson.h> #include <curl/curl.h> #include "miner.h" +#include "elist.h" struct data_buffer { void *buf; @@ -28,11 +30,25 @@ struct upload_buffer { size_t len; }; +struct tq_ent { + void *data; + struct list_head q_node; +}; + +struct thread_q { + struct list_head q; + + bool frozen; + + pthread_mutex_t mutex; + pthread_cond_t cond; +}; + static void databuf_free(struct data_buffer *db) { if (!db) return; - + free(db->buf); memset(db, 0, sizeof(*db)); @@ -163,7 +179,7 @@ json_t *json_rpc_call(CURL *curl, const char *url, fprintf(stderr, "JSON-RPC call failed: %s\n", s); free(s); - + goto err_out; } @@ -179,13 +195,13 @@ err_out: return NULL; } -char *bin2hex(unsigned char *p, size_t len) +char *bin2hex(const unsigned char *p, size_t len) { int i; char *s = malloc((len * 2) + 1); if (!s) return NULL; - + for (i = 0; i < len; i++) sprintf(s + (i * 2), "%02x", (unsigned int) p[i]); @@ -296,3 +312,117 @@ bool fulltest(const unsigned char *hash, const unsigned char *target) return true; /* FIXME: return rc; */ } + +struct thread_q *tq_new(void) +{ + struct thread_q *tq; + + tq = calloc(1, sizeof(*tq)); + if (!tq) + return NULL; + + INIT_LIST_HEAD(&tq->q); + pthread_mutex_init(&tq->mutex, NULL); + pthread_cond_init(&tq->cond, NULL); + + return tq; +} + +void tq_free(struct thread_q *tq) +{ + struct tq_ent *ent, *iter; + + if (!tq) + return; + + list_for_each_entry_safe(ent, iter, &tq->q, q_node) { + list_del(&ent->q_node); + free(ent); + } + + pthread_cond_destroy(&tq->cond); + pthread_mutex_destroy(&tq->mutex); + + memset(tq, 0, sizeof(*tq)); /* poison */ + free(tq); +} + +static void tq_freezethaw(struct thread_q *tq, bool frozen) +{ + pthread_mutex_lock(&tq->mutex); + + tq->frozen = frozen; + + pthread_cond_signal(&tq->cond); + pthread_mutex_unlock(&tq->mutex); +} + +void tq_freeze(struct thread_q *tq) +{ + tq_freezethaw(tq, true); +} + +void tq_thaw(struct thread_q *tq) +{ + tq_freezethaw(tq, false); +} + +bool tq_push(struct thread_q *tq, void *data) +{ + struct tq_ent *ent; + bool rc = true; + + ent = calloc(1, sizeof(*ent)); + if (!ent) + return false; + + ent->data = data; + INIT_LIST_HEAD(&ent->q_node); + + pthread_mutex_lock(&tq->mutex); + + if (!tq->frozen) { + list_add_tail(&ent->q_node, &tq->q); + } else { + free(ent); + rc = false; + } + + pthread_cond_signal(&tq->cond); + pthread_mutex_unlock(&tq->mutex); + + return rc; +} + +void *tq_pop(struct thread_q *tq, const struct timespec *abstime) +{ + struct tq_ent *ent; + void *rval = NULL; + int rc; + + pthread_mutex_lock(&tq->mutex); + + if (!list_empty(&tq->q)) + goto pop; + + if (abstime) + rc = pthread_cond_timedwait(&tq->cond, &tq->mutex, abstime); + else + rc = pthread_cond_wait(&tq->cond, &tq->mutex); + if (rc) + goto out; + if (list_empty(&tq->q)) + goto out; + +pop: + ent = list_entry(tq->q.next, struct tq_ent, q_node); + rval = ent->data; + + list_del(&ent->q_node); + free(ent); + +out: + pthread_mutex_unlock(&tq->mutex); + return rval; +} + |