aboutsummaryrefslogtreecommitdiff
path: root/src/library.js
diff options
context:
space:
mode:
Diffstat (limited to 'src/library.js')
-rw-r--r--src/library.js75
1 files changed, 53 insertions, 22 deletions
diff --git a/src/library.js b/src/library.js
index 69ffdd2b..77d677da 100644
--- a/src/library.js
+++ b/src/library.js
@@ -6802,8 +6802,13 @@ LibraryManager.library = {
socket: function(family, type, protocol) {
var fd = Sockets.nextFd++;
assert(fd < 64); // select() assumes socket fd values are in 0..63
+ var stream = type == {{{ cDefine('SOCK_STREAM') }}};
+ if (protocol) {
+ assert(stream == (protocol == {{{ cDefine('IPPROTO_TCP') }}})); // if stream, must be tcp
+ }
Sockets.fds[fd] = {
- connected: false
+ connected: false,
+ stream: stream
};
return fd;
},
@@ -6832,28 +6837,49 @@ LibraryManager.library = {
var i8Temp = new Uint8Array(i32Temp.buffer);
info.inQueue = [];
+ if (!info.stream) {
+ var partialBuffer = null; // inQueue contains full dgram messages; this buffers incomplete data. Must begin with the beginning of a message
+ }
+
info.socket.onmessage = function(event) {
assert(typeof event.data !== 'string' && event.data.byteLength); // must get binary data!
var data = new Uint8Array(event.data); // make a typed array view on the array buffer
#if SOCKET_DEBUG
Module.print(['onmessage', data.length, '|', Array.prototype.slice.call(data)]);
#endif
-#if SOCKET_FORCED_MESSAGING
- var start = 0;
- while (start+4 < data.length) {
- i8Temp.set(data.subarray(start, start+4));
- var currLen = i32Temp[0];
- assert(currLen > 0);
- start += 4;
- assert(start + currLen <= data.length, [data.length, start, currLen]); // must not receive fractured messages!
- info.inQueue.push(data.subarray(start, start+currLen));
+ if (info.stream) {
+ info.inQueue.push(data);
+ } else {
+ // we added headers with message sizes, read those to find discrete messages
+ if (partialBuffer) {
+ // append to the partial buffer
+ var newBuffer = new Uint8Array(partialBuffer.length + data.length);
+ newBuffer.set(partialBuffer);
+ newBuffer.set(data, partialBuffer.length);
+ // forget the partial buffer and work on data
+ data = newBuffer;
+ partialBuffer = null;
+ }
+ var currPos = 0;
+ while (currPos+4 < data.length) {
+ i8Temp.set(data.subarray(currPos, currPos+4));
+ var currLen = i32Temp[0];
+ assert(currLen > 0);
+ if (currPos + 4 + currLen > data.length) {
+ break; // not enough data has arrived
+ }
+ currPos += 4;
#if SOCKET_DEBUG
- Module.print(['onmessage message', currLen, '|', Array.prototype.slice.call(data.subarray(start, start+currLen))]);
+ Module.print(['onmessage message', currLen, '|', Array.prototype.slice.call(data.subarray(currPos, currPos+currLen))]);
#endif
- start += currLen;
+ info.inQueue.push(data.subarray(currPos, currPos+currLen));
+ currPos += currLen;
+ }
+ // If data remains, buffer it
+ if (currPos < data.length) {
+ partialBuffer = data.subarray(currPos);
+ }
}
-#else
- info.inQueue.push(data);
#endif
}
function send(data) {
@@ -6885,15 +6911,14 @@ LibraryManager.library = {
}
}
info.sender = function(data) {
-#if SOCKET_FORCED_MESSAGING
- var buffer = new Uint8Array(data.length+4);
- i32Temp[0] = data.length;
- buffer.set(i8Temp);
- buffer.set(data, 4);
- outQueue.push(buffer);
-#else
+ if (!info.stream) {
+ // add a header with the message size
+ var header = new Uint8Array(4);
+ i32Temp[0] = data.length;
+ header.set(i8Temp);
+ outQueue.push(header);
+ }
outQueue.push(new Uint8Array(data));
-#endif
trySend();
};
return 0;
@@ -7007,6 +7032,12 @@ LibraryManager.library = {
HEAPU8.set(buffer.subarray(bufferPos, bufferPos + currNum), currBuf);
bufferPos += currNum;
}
+ if (info.stream) {
+ // This is tcp (reliable), so if not all was read, keep it
+ if (bufferPos < bytes) {
+ info.inQueue.unshift(buffer.subArray(bufferPos));
+ }
+ }
return ret;
},