diff options
-rw-r--r-- | src/library.js | 75 | ||||
-rw-r--r-- | src/settings.js | 10 | ||||
-rwxr-xr-x | tests/runner.py | 6 | ||||
-rw-r--r-- | tests/websockets_bi.c | 4 | ||||
-rw-r--r-- | tests/websockets_bi_side.c | 4 |
5 files changed, 67 insertions, 32 deletions
diff --git a/src/library.js b/src/library.js index 69ffdd2b..77d677da 100644 --- a/src/library.js +++ b/src/library.js @@ -6802,8 +6802,13 @@ LibraryManager.library = { socket: function(family, type, protocol) { var fd = Sockets.nextFd++; assert(fd < 64); // select() assumes socket fd values are in 0..63 + var stream = type == {{{ cDefine('SOCK_STREAM') }}}; + if (protocol) { + assert(stream == (protocol == {{{ cDefine('IPPROTO_TCP') }}})); // if stream, must be tcp + } Sockets.fds[fd] = { - connected: false + connected: false, + stream: stream }; return fd; }, @@ -6832,28 +6837,49 @@ LibraryManager.library = { var i8Temp = new Uint8Array(i32Temp.buffer); info.inQueue = []; + if (!info.stream) { + var partialBuffer = null; // inQueue contains full dgram messages; this buffers incomplete data. Must begin with the beginning of a message + } + info.socket.onmessage = function(event) { assert(typeof event.data !== 'string' && event.data.byteLength); // must get binary data! var data = new Uint8Array(event.data); // make a typed array view on the array buffer #if SOCKET_DEBUG Module.print(['onmessage', data.length, '|', Array.prototype.slice.call(data)]); #endif -#if SOCKET_FORCED_MESSAGING - var start = 0; - while (start+4 < data.length) { - i8Temp.set(data.subarray(start, start+4)); - var currLen = i32Temp[0]; - assert(currLen > 0); - start += 4; - assert(start + currLen <= data.length, [data.length, start, currLen]); // must not receive fractured messages! - info.inQueue.push(data.subarray(start, start+currLen)); + if (info.stream) { + info.inQueue.push(data); + } else { + // we added headers with message sizes, read those to find discrete messages + if (partialBuffer) { + // append to the partial buffer + var newBuffer = new Uint8Array(partialBuffer.length + data.length); + newBuffer.set(partialBuffer); + newBuffer.set(data, partialBuffer.length); + // forget the partial buffer and work on data + data = newBuffer; + partialBuffer = null; + } + var currPos = 0; + while (currPos+4 < data.length) { + i8Temp.set(data.subarray(currPos, currPos+4)); + var currLen = i32Temp[0]; + assert(currLen > 0); + if (currPos + 4 + currLen > data.length) { + break; // not enough data has arrived + } + currPos += 4; #if SOCKET_DEBUG - Module.print(['onmessage message', currLen, '|', Array.prototype.slice.call(data.subarray(start, start+currLen))]); + Module.print(['onmessage message', currLen, '|', Array.prototype.slice.call(data.subarray(currPos, currPos+currLen))]); #endif - start += currLen; + info.inQueue.push(data.subarray(currPos, currPos+currLen)); + currPos += currLen; + } + // If data remains, buffer it + if (currPos < data.length) { + partialBuffer = data.subarray(currPos); + } } -#else - info.inQueue.push(data); #endif } function send(data) { @@ -6885,15 +6911,14 @@ LibraryManager.library = { } } info.sender = function(data) { -#if SOCKET_FORCED_MESSAGING - var buffer = new Uint8Array(data.length+4); - i32Temp[0] = data.length; - buffer.set(i8Temp); - buffer.set(data, 4); - outQueue.push(buffer); -#else + if (!info.stream) { + // add a header with the message size + var header = new Uint8Array(4); + i32Temp[0] = data.length; + header.set(i8Temp); + outQueue.push(header); + } outQueue.push(new Uint8Array(data)); -#endif trySend(); }; return 0; @@ -7007,6 +7032,12 @@ LibraryManager.library = { HEAPU8.set(buffer.subarray(bufferPos, bufferPos + currNum), currBuf); bufferPos += currNum; } + if (info.stream) { + // This is tcp (reliable), so if not all was read, keep it + if (bufferPos < bytes) { + info.inQueue.unshift(buffer.subArray(bufferPos)); + } + } return ret; }, diff --git a/src/settings.js b/src/settings.js index ba37c715..42267428 100644 --- a/src/settings.js +++ b/src/settings.js @@ -147,12 +147,6 @@ var LIBRARY_DEBUG = 0; // Print out when we enter a library call (library*.js). var GL_DEBUG = 0; // Print out all calls into WebGL. As with LIBRARY_DEBUG, you can set a runtime // option, in this case GL.debug. var SOCKET_DEBUG = 0; // Log out socket/network data transfer. -var SOCKET_FORCED_MESSAGING = 0; // If 1, we make sure that each socket send ends up a single socket - // receive, that is, we force proper messaging (otherwise, sending - // [A] and [B] can show up on the other side as [A, B]). This will - // only work if both sides have it enabled, obviously, so it only - // makes sense for p2p or when connecting to a special server - we - // add some metadata (message size) to messages in this mode var PROFILE_MAIN_LOOP = 0; // Profile the function called in set_main_loop @@ -1197,6 +1191,8 @@ var C_DEFINES = {'SI_MESGQ': '5', '_SC_TTY_NAME_MAX': '41', 'AF_INET': '1', 'AF_INET6': '6', - 'FIONREAD': '1' + 'FIONREAD': '1', + 'SOCK_STREAM': '200', + 'IPPROTO_TCP': 1 }; diff --git a/tests/runner.py b/tests/runner.py index b26162c0..2d4146a9 100755 --- a/tests/runner.py +++ b/tests/runner.py @@ -10283,12 +10283,12 @@ elif 'browser' in str(sys.argv): return relay_server def test_zz_websockets_bi(self): - for fm in [0,1]: + for datagram in [0,1]: try: with self.WebsockHarness(8992, self.make_relay_server(8992, 8994)): with self.WebsockHarness(8994, no_server=True): - Popen([PYTHON, EMCC, path_from_root('tests', 'websockets_bi_side.c'), '-o', 'side.html', '-DSOCKK=8995', '-s', 'SOCKET_FORCED_MESSAGING=%d' % fm]).communicate() - self.btest('websockets_bi.c', expected='2499', args=['-s', 'SOCKET_FORCED_MESSAGING=%d' % fm]) + Popen([PYTHON, EMCC, path_from_root('tests', 'websockets_bi_side.c'), '-o', 'side.html', '-DSOCKK=8995', '-DTEST_DGRAM=%d' % datagram]).communicate() + self.btest('websockets_bi.c', expected='2499', args=['-DTEST_DGRAM=%d' % datagram]) finally: self.clean_pids() diff --git a/tests/websockets_bi.c b/tests/websockets_bi.c index 73d061d2..c2dbb7da 100644 --- a/tests/websockets_bi.c +++ b/tests/websockets_bi.c @@ -81,7 +81,11 @@ int main(void) struct sockaddr_in stSockAddr; int Res; +#if !TEST_DGRAM SocketFD = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP); +#else + SocketFD = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP); +#endif if (-1 == SocketFD) { diff --git a/tests/websockets_bi_side.c b/tests/websockets_bi_side.c index c9c50618..12b790fd 100644 --- a/tests/websockets_bi_side.c +++ b/tests/websockets_bi_side.c @@ -18,7 +18,11 @@ int main(void) { struct sockaddr_in stSockAddr; int Res; +#if !TEST_DGRAM int SocketFD = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP); +#else + int SocketFD = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP); +#endif if (-1 == SocketFD) { |