aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--tools/llc/SRPCStreamer.cpp60
-rw-r--r--tools/llc/SRPCStreamer.h32
2 files changed, 71 insertions, 21 deletions
diff --git a/tools/llc/SRPCStreamer.cpp b/tools/llc/SRPCStreamer.cpp
index 3eaa7c17c6..ae70a24822 100644
--- a/tools/llc/SRPCStreamer.cpp
+++ b/tools/llc/SRPCStreamer.cpp
@@ -20,22 +20,46 @@
using llvm::dbgs;
+const size_t QueueStreamer::queuesize_limit_;
+
size_t QueueStreamer::GetBytes(unsigned char *buf, size_t len) {
+ size_t total_copied = 0;
pthread_mutex_lock(&Mutex);
- while (!Done && queueSize() < len) {
+ while (!Done && queueSize() < len - total_copied) {
+ size_t size = queueSize();
DEBUG(dbgs() << "QueueStreamer::GetBytes len " << len << " size " <<
- queueSize() <<" << waiting\n");
+ size << " << waiting\n");
+ queueGet(buf + total_copied, size);
+ total_copied += size;
+ pthread_cond_signal(&Cond);
pthread_cond_wait(&Cond, &Mutex);
}
- if (Done && queueSize() < len) len = queueSize();
- queueGet(buf, len);
+ // If this is the last partial chunk, adjust len such that the amount we
+ // fetch will be just the remaining bytes.
+ if (Done && queueSize() < len - total_copied) {
+ len = queueSize() + total_copied;
+ }
+ queueGet(buf + total_copied, len - total_copied);
+ pthread_cond_signal(&Cond);
pthread_mutex_unlock(&Mutex);
return len;
}
size_t QueueStreamer::PutBytes(unsigned char *buf, size_t len) {
+ size_t total_copied = 0;
pthread_mutex_lock(&Mutex);
- queuePut(buf, len);
+ while (capacityRemaining() < len - total_copied) {
+ if (Bytes.size() * 2 > queuesize_limit_) {
+ size_t space = capacityRemaining();
+ queuePut(buf + total_copied, space);
+ total_copied += space;
+ pthread_cond_signal(&Cond);
+ pthread_cond_wait(&Cond, &Mutex);
+ } else {
+ queueResize();
+ }
+ }
+ queuePut(buf + total_copied, len - total_copied);
pthread_cond_signal(&Cond);
pthread_mutex_unlock(&Mutex);
return len;
@@ -50,20 +74,22 @@ void QueueStreamer::SetDone() {
pthread_mutex_unlock(&Mutex);
}
+// Double the size of the queue. Called with Mutex to protect Cons/Prod/Bytes.
+void QueueStreamer::queueResize() {
+ int leftover = Bytes.size() - Cons;
+ DEBUG(dbgs() << "resizing to " << Bytes.size() * 2 << " " << leftover << " "
+ << Prod << " " << Cons << "\n");
+ Bytes.resize(Bytes.size() * 2);
+ if (Cons > Prod) {
+ // There are unread bytes left between Cons and the previous end of the
+ // buffer. Move them to the new end of the buffer.
+ memmove(&Bytes[Bytes.size() - leftover], &Bytes[Cons], leftover);
+ Cons = Bytes.size() - leftover;
+ }
+}
+
// Called with Mutex held to protect Cons, Prod, and Bytes
void QueueStreamer::queuePut(unsigned char *buf, size_t len) {
- while (capacityRemaining() < len) {
- int leftover = Bytes.size() - Cons;
- DEBUG(dbgs() << "resizing " << leftover << " " << Prod << " " <<
- Cons << "\n");
- Bytes.resize(Bytes.size() * 2);
- if (Cons > Prod) {
- // There are unread bytes left between Cons and the previous end of the
- // buffer. Move them to the new end of the buffer.
- memmove(&Bytes[Bytes.size() - leftover], &Bytes[Cons], leftover);
- Cons = Bytes.size() - leftover;
- }
- }
size_t EndSpace = std::min(len, Bytes.size() - Prod);
DEBUG(dbgs() << "put, len " << len << " Endspace " << EndSpace << " p " <<
Prod << " c " << Cons << "\n");
diff --git a/tools/llc/SRPCStreamer.h b/tools/llc/SRPCStreamer.h
index a326d9276d..4c1c6737e6 100644
--- a/tools/llc/SRPCStreamer.h
+++ b/tools/llc/SRPCStreamer.h
@@ -24,6 +24,10 @@
// Implements LLVM's interface for fetching data from a stream source.
// Bitcode bytes from the RPC thread are placed here with PutBytes and buffered
// until the bitcode reader calls GetBytes to remove them.
+// The blocking behavior of GetBytes and PutBytes means that if the
+// compilation happens faster than the bytes come in from the browser, the
+// whole pipeline can block waiting for the RPC thread to put more bytes.
+
class QueueStreamer : public llvm::DataStreamer {
public:
QueueStreamer() : Done(false), Prod(0), Cons(0) {
@@ -31,14 +35,20 @@ class QueueStreamer : public llvm::DataStreamer {
pthread_cond_init(&Cond, NULL);
Bytes.resize(64 * 1024);
}
- // Called by the compilation thread. Wait for len bytes to become available,
- // and copy them into buf. If all bytes have been received and there are
+
+ // Called by the compilation thread. Copy len bytes from the queue into
+ // buf. If there are less than len bytes available, copy as many as
+ // there are, signal the RPC thread, and block to wait for the rest.
+ // If all bytes have been received from the browser and there are
// fewer than len bytes available, copy all remaining bytes.
// Return the number of bytes copied.
virtual size_t GetBytes(unsigned char *buf, size_t len);
- // Called by the RPC thread. Copy len bytes from buf and wake up the
- // compilation thread if it is waiting. Return the number of bytes copied.
+ // Called by the RPC thread. Copy len bytes from buf into the queue.
+ // If there is not enough space in the queue, copy as many bytes as
+ // will fit, signal the compilation thread, and block until there is
+ // enough space for the rest.
+ // Return the number of bytes copied.
size_t PutBytes(unsigned char *buf, size_t len);
// Called by the RPC thread. Signal that all bytes have been received,
@@ -50,6 +60,19 @@ class QueueStreamer : public llvm::DataStreamer {
bool Done;
pthread_mutex_t Mutex;
pthread_cond_t Cond;
+ // Maximum size of the queue. The limitation on the queue size means that
+ // if the compilation happens slower than bytes arrive from the network,
+ // the queue will fill up, the RPC thread will be blocked most of the time,
+ // the RPC thread on the browser side will be waiting for the SRPC to return,
+ // and the buffer on the browser side will grow unboundedly until the
+ // whole bitcode file arrives (which is better than having the queue on
+ // the untrusted side consume all that memory).
+ // The partial-copying behavior of GetBytes and PutBytes prevents deadlock
+ // even if the requested number of bytes is greater than the size limit
+ // (although it will of course be less efficient).
+ // The initial size of the queue is expected to be smaller than this, but
+ // if not, it will simply never be resized.
+ const static size_t queuesize_limit_ = 256 * 1024;
// Variables and functions to manage the circular queue
std::vector<unsigned char> Bytes;
@@ -61,6 +84,7 @@ class QueueStreamer : public llvm::DataStreamer {
size_t capacityRemaining() {
return (Prod >= Cons ? Bytes.size() - (Prod - Cons) : (Cons - Prod)) - 1;
}
+ void queueResize();
void queuePut(unsigned char *buf, size_t len);
void queueGet(unsigned char *buf, size_t len);
};