diff options
-rw-r--r-- | tools/llc/SRPCStreamer.cpp | 60 | ||||
-rw-r--r-- | tools/llc/SRPCStreamer.h | 32 |
2 files changed, 71 insertions, 21 deletions
diff --git a/tools/llc/SRPCStreamer.cpp b/tools/llc/SRPCStreamer.cpp index 3eaa7c17c6..ae70a24822 100644 --- a/tools/llc/SRPCStreamer.cpp +++ b/tools/llc/SRPCStreamer.cpp @@ -20,22 +20,46 @@ using llvm::dbgs; +const size_t QueueStreamer::queuesize_limit_; + size_t QueueStreamer::GetBytes(unsigned char *buf, size_t len) { + size_t total_copied = 0; pthread_mutex_lock(&Mutex); - while (!Done && queueSize() < len) { + while (!Done && queueSize() < len - total_copied) { + size_t size = queueSize(); DEBUG(dbgs() << "QueueStreamer::GetBytes len " << len << " size " << - queueSize() <<" << waiting\n"); + size << " << waiting\n"); + queueGet(buf + total_copied, size); + total_copied += size; + pthread_cond_signal(&Cond); pthread_cond_wait(&Cond, &Mutex); } - if (Done && queueSize() < len) len = queueSize(); - queueGet(buf, len); + // If this is the last partial chunk, adjust len such that the amount we + // fetch will be just the remaining bytes. + if (Done && queueSize() < len - total_copied) { + len = queueSize() + total_copied; + } + queueGet(buf + total_copied, len - total_copied); + pthread_cond_signal(&Cond); pthread_mutex_unlock(&Mutex); return len; } size_t QueueStreamer::PutBytes(unsigned char *buf, size_t len) { + size_t total_copied = 0; pthread_mutex_lock(&Mutex); - queuePut(buf, len); + while (capacityRemaining() < len - total_copied) { + if (Bytes.size() * 2 > queuesize_limit_) { + size_t space = capacityRemaining(); + queuePut(buf + total_copied, space); + total_copied += space; + pthread_cond_signal(&Cond); + pthread_cond_wait(&Cond, &Mutex); + } else { + queueResize(); + } + } + queuePut(buf + total_copied, len - total_copied); pthread_cond_signal(&Cond); pthread_mutex_unlock(&Mutex); return len; @@ -50,20 +74,22 @@ void QueueStreamer::SetDone() { pthread_mutex_unlock(&Mutex); } +// Double the size of the queue. Called with Mutex to protect Cons/Prod/Bytes. +void QueueStreamer::queueResize() { + int leftover = Bytes.size() - Cons; + DEBUG(dbgs() << "resizing to " << Bytes.size() * 2 << " " << leftover << " " + << Prod << " " << Cons << "\n"); + Bytes.resize(Bytes.size() * 2); + if (Cons > Prod) { + // There are unread bytes left between Cons and the previous end of the + // buffer. Move them to the new end of the buffer. + memmove(&Bytes[Bytes.size() - leftover], &Bytes[Cons], leftover); + Cons = Bytes.size() - leftover; + } +} + // Called with Mutex held to protect Cons, Prod, and Bytes void QueueStreamer::queuePut(unsigned char *buf, size_t len) { - while (capacityRemaining() < len) { - int leftover = Bytes.size() - Cons; - DEBUG(dbgs() << "resizing " << leftover << " " << Prod << " " << - Cons << "\n"); - Bytes.resize(Bytes.size() * 2); - if (Cons > Prod) { - // There are unread bytes left between Cons and the previous end of the - // buffer. Move them to the new end of the buffer. - memmove(&Bytes[Bytes.size() - leftover], &Bytes[Cons], leftover); - Cons = Bytes.size() - leftover; - } - } size_t EndSpace = std::min(len, Bytes.size() - Prod); DEBUG(dbgs() << "put, len " << len << " Endspace " << EndSpace << " p " << Prod << " c " << Cons << "\n"); diff --git a/tools/llc/SRPCStreamer.h b/tools/llc/SRPCStreamer.h index a326d9276d..4c1c6737e6 100644 --- a/tools/llc/SRPCStreamer.h +++ b/tools/llc/SRPCStreamer.h @@ -24,6 +24,10 @@ // Implements LLVM's interface for fetching data from a stream source. // Bitcode bytes from the RPC thread are placed here with PutBytes and buffered // until the bitcode reader calls GetBytes to remove them. +// The blocking behavior of GetBytes and PutBytes means that if the +// compilation happens faster than the bytes come in from the browser, the +// whole pipeline can block waiting for the RPC thread to put more bytes. + class QueueStreamer : public llvm::DataStreamer { public: QueueStreamer() : Done(false), Prod(0), Cons(0) { @@ -31,14 +35,20 @@ class QueueStreamer : public llvm::DataStreamer { pthread_cond_init(&Cond, NULL); Bytes.resize(64 * 1024); } - // Called by the compilation thread. Wait for len bytes to become available, - // and copy them into buf. If all bytes have been received and there are + + // Called by the compilation thread. Copy len bytes from the queue into + // buf. If there are less than len bytes available, copy as many as + // there are, signal the RPC thread, and block to wait for the rest. + // If all bytes have been received from the browser and there are // fewer than len bytes available, copy all remaining bytes. // Return the number of bytes copied. virtual size_t GetBytes(unsigned char *buf, size_t len); - // Called by the RPC thread. Copy len bytes from buf and wake up the - // compilation thread if it is waiting. Return the number of bytes copied. + // Called by the RPC thread. Copy len bytes from buf into the queue. + // If there is not enough space in the queue, copy as many bytes as + // will fit, signal the compilation thread, and block until there is + // enough space for the rest. + // Return the number of bytes copied. size_t PutBytes(unsigned char *buf, size_t len); // Called by the RPC thread. Signal that all bytes have been received, @@ -50,6 +60,19 @@ class QueueStreamer : public llvm::DataStreamer { bool Done; pthread_mutex_t Mutex; pthread_cond_t Cond; + // Maximum size of the queue. The limitation on the queue size means that + // if the compilation happens slower than bytes arrive from the network, + // the queue will fill up, the RPC thread will be blocked most of the time, + // the RPC thread on the browser side will be waiting for the SRPC to return, + // and the buffer on the browser side will grow unboundedly until the + // whole bitcode file arrives (which is better than having the queue on + // the untrusted side consume all that memory). + // The partial-copying behavior of GetBytes and PutBytes prevents deadlock + // even if the requested number of bytes is greater than the size limit + // (although it will of course be less efficient). + // The initial size of the queue is expected to be smaller than this, but + // if not, it will simply never be resized. + const static size_t queuesize_limit_ = 256 * 1024; // Variables and functions to manage the circular queue std::vector<unsigned char> Bytes; @@ -61,6 +84,7 @@ class QueueStreamer : public llvm::DataStreamer { size_t capacityRemaining() { return (Prod >= Cons ? Bytes.size() - (Prod - Cons) : (Cons - Prod)) - 1; } + void queueResize(); void queuePut(unsigned char *buf, size_t len); void queueGet(unsigned char *buf, size_t len); }; |