aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlon Zakai <alonzakai@gmail.com>2013-01-05 12:15:05 -0800
committerAlon Zakai <alonzakai@gmail.com>2013-01-05 12:15:05 -0800
commitd15a3b366c7ec8b0f2dfc44e7802d7cc3ebd8290 (patch)
tree9a63ae6f04db60ca6d06d4c9603bcc7c7c821fdd
parent814cd72af956fd215938e1d1bc8a9266339f63a4 (diff)
fix networking for both stream and dgram
-rw-r--r--src/library.js75
-rw-r--r--src/settings.js10
-rwxr-xr-xtests/runner.py6
-rw-r--r--tests/websockets_bi.c4
-rw-r--r--tests/websockets_bi_side.c4
5 files changed, 67 insertions, 32 deletions
diff --git a/src/library.js b/src/library.js
index 69ffdd2b..77d677da 100644
--- a/src/library.js
+++ b/src/library.js
@@ -6802,8 +6802,13 @@ LibraryManager.library = {
socket: function(family, type, protocol) {
var fd = Sockets.nextFd++;
assert(fd < 64); // select() assumes socket fd values are in 0..63
+ var stream = type == {{{ cDefine('SOCK_STREAM') }}};
+ if (protocol) {
+ assert(stream == (protocol == {{{ cDefine('IPPROTO_TCP') }}})); // if stream, must be tcp
+ }
Sockets.fds[fd] = {
- connected: false
+ connected: false,
+ stream: stream
};
return fd;
},
@@ -6832,28 +6837,49 @@ LibraryManager.library = {
var i8Temp = new Uint8Array(i32Temp.buffer);
info.inQueue = [];
+ if (!info.stream) {
+ var partialBuffer = null; // inQueue contains full dgram messages; this buffers incomplete data. Must begin with the beginning of a message
+ }
+
info.socket.onmessage = function(event) {
assert(typeof event.data !== 'string' && event.data.byteLength); // must get binary data!
var data = new Uint8Array(event.data); // make a typed array view on the array buffer
#if SOCKET_DEBUG
Module.print(['onmessage', data.length, '|', Array.prototype.slice.call(data)]);
#endif
-#if SOCKET_FORCED_MESSAGING
- var start = 0;
- while (start+4 < data.length) {
- i8Temp.set(data.subarray(start, start+4));
- var currLen = i32Temp[0];
- assert(currLen > 0);
- start += 4;
- assert(start + currLen <= data.length, [data.length, start, currLen]); // must not receive fractured messages!
- info.inQueue.push(data.subarray(start, start+currLen));
+ if (info.stream) {
+ info.inQueue.push(data);
+ } else {
+ // we added headers with message sizes, read those to find discrete messages
+ if (partialBuffer) {
+ // append to the partial buffer
+ var newBuffer = new Uint8Array(partialBuffer.length + data.length);
+ newBuffer.set(partialBuffer);
+ newBuffer.set(data, partialBuffer.length);
+ // forget the partial buffer and work on data
+ data = newBuffer;
+ partialBuffer = null;
+ }
+ var currPos = 0;
+ while (currPos+4 < data.length) {
+ i8Temp.set(data.subarray(currPos, currPos+4));
+ var currLen = i32Temp[0];
+ assert(currLen > 0);
+ if (currPos + 4 + currLen > data.length) {
+ break; // not enough data has arrived
+ }
+ currPos += 4;
#if SOCKET_DEBUG
- Module.print(['onmessage message', currLen, '|', Array.prototype.slice.call(data.subarray(start, start+currLen))]);
+ Module.print(['onmessage message', currLen, '|', Array.prototype.slice.call(data.subarray(currPos, currPos+currLen))]);
#endif
- start += currLen;
+ info.inQueue.push(data.subarray(currPos, currPos+currLen));
+ currPos += currLen;
+ }
+ // If data remains, buffer it
+ if (currPos < data.length) {
+ partialBuffer = data.subarray(currPos);
+ }
}
-#else
- info.inQueue.push(data);
#endif
}
function send(data) {
@@ -6885,15 +6911,14 @@ LibraryManager.library = {
}
}
info.sender = function(data) {
-#if SOCKET_FORCED_MESSAGING
- var buffer = new Uint8Array(data.length+4);
- i32Temp[0] = data.length;
- buffer.set(i8Temp);
- buffer.set(data, 4);
- outQueue.push(buffer);
-#else
+ if (!info.stream) {
+ // add a header with the message size
+ var header = new Uint8Array(4);
+ i32Temp[0] = data.length;
+ header.set(i8Temp);
+ outQueue.push(header);
+ }
outQueue.push(new Uint8Array(data));
-#endif
trySend();
};
return 0;
@@ -7007,6 +7032,12 @@ LibraryManager.library = {
HEAPU8.set(buffer.subarray(bufferPos, bufferPos + currNum), currBuf);
bufferPos += currNum;
}
+ if (info.stream) {
+ // This is tcp (reliable), so if not all was read, keep it
+ if (bufferPos < bytes) {
+ info.inQueue.unshift(buffer.subArray(bufferPos));
+ }
+ }
return ret;
},
diff --git a/src/settings.js b/src/settings.js
index ba37c715..42267428 100644
--- a/src/settings.js
+++ b/src/settings.js
@@ -147,12 +147,6 @@ var LIBRARY_DEBUG = 0; // Print out when we enter a library call (library*.js).
var GL_DEBUG = 0; // Print out all calls into WebGL. As with LIBRARY_DEBUG, you can set a runtime
// option, in this case GL.debug.
var SOCKET_DEBUG = 0; // Log out socket/network data transfer.
-var SOCKET_FORCED_MESSAGING = 0; // If 1, we make sure that each socket send ends up a single socket
- // receive, that is, we force proper messaging (otherwise, sending
- // [A] and [B] can show up on the other side as [A, B]). This will
- // only work if both sides have it enabled, obviously, so it only
- // makes sense for p2p or when connecting to a special server - we
- // add some metadata (message size) to messages in this mode
var PROFILE_MAIN_LOOP = 0; // Profile the function called in set_main_loop
@@ -1197,6 +1191,8 @@ var C_DEFINES = {'SI_MESGQ': '5',
'_SC_TTY_NAME_MAX': '41',
'AF_INET': '1',
'AF_INET6': '6',
- 'FIONREAD': '1'
+ 'FIONREAD': '1',
+ 'SOCK_STREAM': '200',
+ 'IPPROTO_TCP': 1
};
diff --git a/tests/runner.py b/tests/runner.py
index b26162c0..2d4146a9 100755
--- a/tests/runner.py
+++ b/tests/runner.py
@@ -10283,12 +10283,12 @@ elif 'browser' in str(sys.argv):
return relay_server
def test_zz_websockets_bi(self):
- for fm in [0,1]:
+ for datagram in [0,1]:
try:
with self.WebsockHarness(8992, self.make_relay_server(8992, 8994)):
with self.WebsockHarness(8994, no_server=True):
- Popen([PYTHON, EMCC, path_from_root('tests', 'websockets_bi_side.c'), '-o', 'side.html', '-DSOCKK=8995', '-s', 'SOCKET_FORCED_MESSAGING=%d' % fm]).communicate()
- self.btest('websockets_bi.c', expected='2499', args=['-s', 'SOCKET_FORCED_MESSAGING=%d' % fm])
+ Popen([PYTHON, EMCC, path_from_root('tests', 'websockets_bi_side.c'), '-o', 'side.html', '-DSOCKK=8995', '-DTEST_DGRAM=%d' % datagram]).communicate()
+ self.btest('websockets_bi.c', expected='2499', args=['-DTEST_DGRAM=%d' % datagram])
finally:
self.clean_pids()
diff --git a/tests/websockets_bi.c b/tests/websockets_bi.c
index 73d061d2..c2dbb7da 100644
--- a/tests/websockets_bi.c
+++ b/tests/websockets_bi.c
@@ -81,7 +81,11 @@ int main(void)
struct sockaddr_in stSockAddr;
int Res;
+#if !TEST_DGRAM
SocketFD = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
+#else
+ SocketFD = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP);
+#endif
if (-1 == SocketFD)
{
diff --git a/tests/websockets_bi_side.c b/tests/websockets_bi_side.c
index c9c50618..12b790fd 100644
--- a/tests/websockets_bi_side.c
+++ b/tests/websockets_bi_side.c
@@ -18,7 +18,11 @@ int main(void)
{
struct sockaddr_in stSockAddr;
int Res;
+#if !TEST_DGRAM
int SocketFD = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
+#else
+ int SocketFD = socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP);
+#endif
if (-1 == SocketFD)
{