diff options
Diffstat (limited to 'src/library.js')
-rw-r--r-- | src/library.js | 75 |
1 files changed, 53 insertions, 22 deletions
diff --git a/src/library.js b/src/library.js index 69ffdd2b..77d677da 100644 --- a/src/library.js +++ b/src/library.js @@ -6802,8 +6802,13 @@ LibraryManager.library = { socket: function(family, type, protocol) { var fd = Sockets.nextFd++; assert(fd < 64); // select() assumes socket fd values are in 0..63 + var stream = type == {{{ cDefine('SOCK_STREAM') }}}; + if (protocol) { + assert(stream == (protocol == {{{ cDefine('IPPROTO_TCP') }}})); // if stream, must be tcp + } Sockets.fds[fd] = { - connected: false + connected: false, + stream: stream }; return fd; }, @@ -6832,28 +6837,49 @@ LibraryManager.library = { var i8Temp = new Uint8Array(i32Temp.buffer); info.inQueue = []; + if (!info.stream) { + var partialBuffer = null; // inQueue contains full dgram messages; this buffers incomplete data. Must begin with the beginning of a message + } + info.socket.onmessage = function(event) { assert(typeof event.data !== 'string' && event.data.byteLength); // must get binary data! var data = new Uint8Array(event.data); // make a typed array view on the array buffer #if SOCKET_DEBUG Module.print(['onmessage', data.length, '|', Array.prototype.slice.call(data)]); #endif -#if SOCKET_FORCED_MESSAGING - var start = 0; - while (start+4 < data.length) { - i8Temp.set(data.subarray(start, start+4)); - var currLen = i32Temp[0]; - assert(currLen > 0); - start += 4; - assert(start + currLen <= data.length, [data.length, start, currLen]); // must not receive fractured messages! - info.inQueue.push(data.subarray(start, start+currLen)); + if (info.stream) { + info.inQueue.push(data); + } else { + // we added headers with message sizes, read those to find discrete messages + if (partialBuffer) { + // append to the partial buffer + var newBuffer = new Uint8Array(partialBuffer.length + data.length); + newBuffer.set(partialBuffer); + newBuffer.set(data, partialBuffer.length); + // forget the partial buffer and work on data + data = newBuffer; + partialBuffer = null; + } + var currPos = 0; + while (currPos+4 < data.length) { + i8Temp.set(data.subarray(currPos, currPos+4)); + var currLen = i32Temp[0]; + assert(currLen > 0); + if (currPos + 4 + currLen > data.length) { + break; // not enough data has arrived + } + currPos += 4; #if SOCKET_DEBUG - Module.print(['onmessage message', currLen, '|', Array.prototype.slice.call(data.subarray(start, start+currLen))]); + Module.print(['onmessage message', currLen, '|', Array.prototype.slice.call(data.subarray(currPos, currPos+currLen))]); #endif - start += currLen; + info.inQueue.push(data.subarray(currPos, currPos+currLen)); + currPos += currLen; + } + // If data remains, buffer it + if (currPos < data.length) { + partialBuffer = data.subarray(currPos); + } } -#else - info.inQueue.push(data); #endif } function send(data) { @@ -6885,15 +6911,14 @@ LibraryManager.library = { } } info.sender = function(data) { -#if SOCKET_FORCED_MESSAGING - var buffer = new Uint8Array(data.length+4); - i32Temp[0] = data.length; - buffer.set(i8Temp); - buffer.set(data, 4); - outQueue.push(buffer); -#else + if (!info.stream) { + // add a header with the message size + var header = new Uint8Array(4); + i32Temp[0] = data.length; + header.set(i8Temp); + outQueue.push(header); + } outQueue.push(new Uint8Array(data)); -#endif trySend(); }; return 0; @@ -7007,6 +7032,12 @@ LibraryManager.library = { HEAPU8.set(buffer.subarray(bufferPos, bufferPos + currNum), currBuf); bufferPos += currNum; } + if (info.stream) { + // This is tcp (reliable), so if not all was read, keep it + if (bufferPos < bytes) { + info.inQueue.unshift(buffer.subArray(bufferPos)); + } + } return ret; }, |