diff options
author | Jeff Garzik <jeff@garzik.org> | 2011-03-14 23:17:34 -0400 |
---|---|---|
committer | Jeff Garzik <jgarzik@pobox.com> | 2011-03-14 23:17:34 -0400 |
commit | 4f7a51e9ed4accbd1be6fad38a9bbc70531cb37f (patch) | |
tree | d95aa0dfdb56f6f266537c2f76b7abc6914196b8 /util.c | |
parent | cdb4cd9c8be6c828908622efdca732eff8495787 (diff) |
Move all RPC I/O to separate thread.
Diffstat (limited to 'util.c')
-rw-r--r-- | util.c | 140 |
1 files changed, 135 insertions, 5 deletions
@@ -4,7 +4,7 @@ * * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License as published by the Free - * Software Foundation; either version 2 of the License, or (at your option) + * Software Foundation; either version 2 of the License, or (at your option) * any later version. See COPYING for more details. */ @@ -14,9 +14,11 @@ #include <stdio.h> #include <stdlib.h> #include <string.h> +#include <pthread.h> #include <jansson.h> #include <curl/curl.h> #include "miner.h" +#include "elist.h" struct data_buffer { void *buf; @@ -28,11 +30,25 @@ struct upload_buffer { size_t len; }; +struct tq_ent { + void *data; + struct list_head q_node; +}; + +struct thread_q { + struct list_head q; + + bool frozen; + + pthread_mutex_t mutex; + pthread_cond_t cond; +}; + static void databuf_free(struct data_buffer *db) { if (!db) return; - + free(db->buf); memset(db, 0, sizeof(*db)); @@ -163,7 +179,7 @@ json_t *json_rpc_call(CURL *curl, const char *url, fprintf(stderr, "JSON-RPC call failed: %s\n", s); free(s); - + goto err_out; } @@ -179,13 +195,13 @@ err_out: return NULL; } -char *bin2hex(unsigned char *p, size_t len) +char *bin2hex(const unsigned char *p, size_t len) { int i; char *s = malloc((len * 2) + 1); if (!s) return NULL; - + for (i = 0; i < len; i++) sprintf(s + (i * 2), "%02x", (unsigned int) p[i]); @@ -296,3 +312,117 @@ bool fulltest(const unsigned char *hash, const unsigned char *target) return true; /* FIXME: return rc; */ } + +struct thread_q *tq_new(void) +{ + struct thread_q *tq; + + tq = calloc(1, sizeof(*tq)); + if (!tq) + return NULL; + + INIT_LIST_HEAD(&tq->q); + pthread_mutex_init(&tq->mutex, NULL); + pthread_cond_init(&tq->cond, NULL); + + return tq; +} + +void tq_free(struct thread_q *tq) +{ + struct tq_ent *ent, *iter; + + if (!tq) + return; + + list_for_each_entry_safe(ent, iter, &tq->q, q_node) { + list_del(&ent->q_node); + free(ent); + } + + pthread_cond_destroy(&tq->cond); + pthread_mutex_destroy(&tq->mutex); + + memset(tq, 0, sizeof(*tq)); /* poison */ + free(tq); +} + +static void tq_freezethaw(struct thread_q *tq, bool frozen) +{ + pthread_mutex_lock(&tq->mutex); + + tq->frozen = frozen; + + pthread_cond_signal(&tq->cond); + pthread_mutex_unlock(&tq->mutex); +} + +void tq_freeze(struct thread_q *tq) +{ + tq_freezethaw(tq, true); +} + +void tq_thaw(struct thread_q *tq) +{ + tq_freezethaw(tq, false); +} + +bool tq_push(struct thread_q *tq, void *data) +{ + struct tq_ent *ent; + bool rc = true; + + ent = calloc(1, sizeof(*ent)); + if (!ent) + return false; + + ent->data = data; + INIT_LIST_HEAD(&ent->q_node); + + pthread_mutex_lock(&tq->mutex); + + if (!tq->frozen) { + list_add_tail(&ent->q_node, &tq->q); + } else { + free(ent); + rc = false; + } + + pthread_cond_signal(&tq->cond); + pthread_mutex_unlock(&tq->mutex); + + return rc; +} + +void *tq_pop(struct thread_q *tq, const struct timespec *abstime) +{ + struct tq_ent *ent; + void *rval = NULL; + int rc; + + pthread_mutex_lock(&tq->mutex); + + if (!list_empty(&tq->q)) + goto pop; + + if (abstime) + rc = pthread_cond_timedwait(&tq->cond, &tq->mutex, abstime); + else + rc = pthread_cond_wait(&tq->cond, &tq->mutex); + if (rc) + goto out; + if (list_empty(&tq->q)) + goto out; + +pop: + ent = list_entry(tq->q.next, struct tq_ent, q_node); + rval = ent->data; + + list_del(&ent->q_node); + free(ent); + +out: + pthread_mutex_unlock(&tq->mutex); + return rval; +} + |