aboutsummaryrefslogtreecommitdiff
path: root/util.c
diff options
context:
space:
mode:
authorJeff Garzik <jeff@garzik.org>2011-03-14 23:17:34 -0400
committerJeff Garzik <jgarzik@pobox.com>2011-03-14 23:17:34 -0400
commit4f7a51e9ed4accbd1be6fad38a9bbc70531cb37f (patch)
treed95aa0dfdb56f6f266537c2f76b7abc6914196b8 /util.c
parentcdb4cd9c8be6c828908622efdca732eff8495787 (diff)
Move all RPC I/O to separate thread.
Diffstat (limited to 'util.c')
-rw-r--r--util.c140
1 files changed, 135 insertions, 5 deletions
diff --git a/util.c b/util.c
index af851ea..5d97403 100644
--- a/util.c
+++ b/util.c
@@ -4,7 +4,7 @@
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by the Free
- * Software Foundation; either version 2 of the License, or (at your option)
+ * Software Foundation; either version 2 of the License, or (at your option)
* any later version. See COPYING for more details.
*/
@@ -14,9 +14,11 @@
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
+#include <pthread.h>
#include <jansson.h>
#include <curl/curl.h>
#include "miner.h"
+#include "elist.h"
struct data_buffer {
void *buf;
@@ -28,11 +30,25 @@ struct upload_buffer {
size_t len;
};
+struct tq_ent {
+ void *data;
+ struct list_head q_node;
+};
+
+struct thread_q {
+ struct list_head q;
+
+ bool frozen;
+
+ pthread_mutex_t mutex;
+ pthread_cond_t cond;
+};
+
static void databuf_free(struct data_buffer *db)
{
if (!db)
return;
-
+
free(db->buf);
memset(db, 0, sizeof(*db));
@@ -163,7 +179,7 @@ json_t *json_rpc_call(CURL *curl, const char *url,
fprintf(stderr, "JSON-RPC call failed: %s\n", s);
free(s);
-
+
goto err_out;
}
@@ -179,13 +195,13 @@ err_out:
return NULL;
}
-char *bin2hex(unsigned char *p, size_t len)
+char *bin2hex(const unsigned char *p, size_t len)
{
int i;
char *s = malloc((len * 2) + 1);
if (!s)
return NULL;
-
+
for (i = 0; i < len; i++)
sprintf(s + (i * 2), "%02x", (unsigned int) p[i]);
@@ -296,3 +312,117 @@ bool fulltest(const unsigned char *hash, const unsigned char *target)
return true; /* FIXME: return rc; */
}
+
+struct thread_q *tq_new(void)
+{
+ struct thread_q *tq;
+
+ tq = calloc(1, sizeof(*tq));
+ if (!tq)
+ return NULL;
+
+ INIT_LIST_HEAD(&tq->q);
+ pthread_mutex_init(&tq->mutex, NULL);
+ pthread_cond_init(&tq->cond, NULL);
+
+ return tq;
+}
+
+void tq_free(struct thread_q *tq)
+{
+ struct tq_ent *ent, *iter;
+
+ if (!tq)
+ return;
+
+ list_for_each_entry_safe(ent, iter, &tq->q, q_node) {
+ list_del(&ent->q_node);
+ free(ent);
+ }
+
+ pthread_cond_destroy(&tq->cond);
+ pthread_mutex_destroy(&tq->mutex);
+
+ memset(tq, 0, sizeof(*tq)); /* poison */
+ free(tq);
+}
+
+static void tq_freezethaw(struct thread_q *tq, bool frozen)
+{
+ pthread_mutex_lock(&tq->mutex);
+
+ tq->frozen = frozen;
+
+ pthread_cond_signal(&tq->cond);
+ pthread_mutex_unlock(&tq->mutex);
+}
+
+void tq_freeze(struct thread_q *tq)
+{
+ tq_freezethaw(tq, true);
+}
+
+void tq_thaw(struct thread_q *tq)
+{
+ tq_freezethaw(tq, false);
+}
+
+bool tq_push(struct thread_q *tq, void *data)
+{
+ struct tq_ent *ent;
+ bool rc = true;
+
+ ent = calloc(1, sizeof(*ent));
+ if (!ent)
+ return false;
+
+ ent->data = data;
+ INIT_LIST_HEAD(&ent->q_node);
+
+ pthread_mutex_lock(&tq->mutex);
+
+ if (!tq->frozen) {
+ list_add_tail(&ent->q_node, &tq->q);
+ } else {
+ free(ent);
+ rc = false;
+ }
+
+ pthread_cond_signal(&tq->cond);
+ pthread_mutex_unlock(&tq->mutex);
+
+ return rc;
+}
+
+void *tq_pop(struct thread_q *tq, const struct timespec *abstime)
+{
+ struct tq_ent *ent;
+ void *rval = NULL;
+ int rc;
+
+ pthread_mutex_lock(&tq->mutex);
+
+ if (!list_empty(&tq->q))
+ goto pop;
+
+ if (abstime)
+ rc = pthread_cond_timedwait(&tq->cond, &tq->mutex, abstime);
+ else
+ rc = pthread_cond_wait(&tq->cond, &tq->mutex);
+ if (rc)
+ goto out;
+ if (list_empty(&tq->q))
+ goto out;
+
+pop:
+ ent = list_entry(tq->q.next, struct tq_ent, q_node);
+ rval = ent->data;
+
+ list_del(&ent->q_node);
+ free(ent);
+
+out:
+ pthread_mutex_unlock(&tq->mutex);
+ return rval;
+}
+