diff options
author | Fraser Adams <fraser.adams@blueyonder.co.uk> | 2014-05-30 17:15:30 +0100 |
---|---|---|
committer | Fraser Adams <fraser.adams@blueyonder.co.uk> | 2014-05-30 17:15:30 +0100 |
commit | d7d2fcc94b918e4dc477561c925444bdceb3e6d4 (patch) | |
tree | 6225e689885d1f239a28862a57486a572d8740e7 | |
parent | 807e419d4a73a4b7b6f60bc9e2bf770924c3442d (diff) |
Feature to enable asynchronous event driven of network events
-rw-r--r-- | src/library.js | 23 | ||||
-rw-r--r-- | src/library_sockfs.js | 140 | ||||
-rw-r--r-- | src/struct_info.json | 4 | ||||
-rw-r--r-- | system/include/emscripten/emscripten.h | 28 | ||||
-rw-r--r-- | tests/sockets/test_sockets_echo_client.c | 34 | ||||
-rw-r--r-- | tests/sockets/test_sockets_echo_server.c | 18 | ||||
-rw-r--r-- | tests/test_sockets.py | 20 |
7 files changed, 258 insertions, 9 deletions
diff --git a/src/library.js b/src/library.js index fb1a8998..22616335 100644 --- a/src/library.js +++ b/src/library.js @@ -8307,6 +8307,29 @@ LibraryManager.library = { return 0; }, + getsockopt__deps: ['$SOCKFS', '__setErrNo', '$ERRNO_CODES'], + getsockopt: function(fd, level, optname, optval, optlen) { + // int getsockopt(int sockfd, int level, int optname, void *optval, socklen_t *optlen); + // http://pubs.opengroup.org/onlinepubs/000095399/functions/getsockopt.html + // Minimal getsockopt aimed at resolving https://github.com/kripken/emscripten/issues/2211 + // so only supports SOL_SOCKET with SO_ERROR. + var sock = SOCKFS.getSocket(fd); + if (!sock) { + ___setErrNo(ERRNO_CODES.EBADF); + return -1; + } + + if (level === {{{ cDefine('SOL_SOCKET') }}} && optname === {{{ cDefine('SO_ERROR') }}}) { + {{{ makeSetValue('optval', 0, 'sock.error', 'i32') }}}; + {{{ makeSetValue('optlen', 0, 4, 'i32') }}}; + sock.error = null; // Clear the error (The SO_ERROR option obtains and then clears this field). + return 0; + } else { + ___setErrNo(ERRNO_CODES.EINVAL); + return -1; + } + }, + mkport: function() { throw 'TODO' }, // ========================================================================== diff --git a/src/library_sockfs.js b/src/library_sockfs.js index 23641464..68da69d8 100644 --- a/src/library_sockfs.js +++ b/src/library_sockfs.js @@ -3,6 +3,37 @@ mergeInto(LibraryManager.library, { $SOCKFS__deps: ['$FS', 'mkport'], $SOCKFS: { mount: function(mount) { + // If Module['websocket'] has already been defined (e.g. for configuring + // the subprotocol/url) use that, if not initialise it to a new object. + Module['websocket'] = (Module['websocket'] && + ('object' === typeof Module['websocket'])) ? Module['websocket'] : {}; + + // Add the Event registration mechanism to the exported websocket configuration + // object so we can register network callbacks from native JavaScript too. + Module['websocket']._callbacks = {}; + Module['websocket']['on'] = function(event, callback) { + if ('function' === typeof callback) { + this._callbacks[event] = callback; + } + return this; + }; + + Module['websocket'].emit = function(event, param) { + if ('function' === typeof this._callbacks[event]) { + this._callbacks[event].call(this, param); + } + }; + + // If debug is enabled register simple default logging callbacks for each Event. +#if SOCKET_DEBUG + Module['websocket']['on']('error', function(error) {Module.print('Socket error ' + error);}); + Module['websocket']['on']('open', function(fd) {Module.print('Socket open fd = ' + fd);}); + Module['websocket']['on']('listen', function(fd) {Module.print('Socket listen fd = ' + fd);}); + Module['websocket']['on']('connection', function(fd) {Module.print('Socket connection fd = ' + fd);}); + Module['websocket']['on']('message', function(fd) {Module.print('Socket message fd = ' + fd);}); + Module['websocket']['on']('close', function(fd) {Module.print('Socket close fd = ' + fd);}); +#endif + return FS.createNode(null, '/', {{{ cDefine('S_IFDIR') }}} | 511 /* 0777 */, 0); }, createSocket: function(family, type, protocol) { @@ -17,6 +48,7 @@ mergeInto(LibraryManager.library, { type: type, protocol: protocol, server: null, + error: null, // Used in getsockopt for SOL_SOCKET/SO_ERROR test peers: {}, pending: [], recv_queue: [], @@ -136,8 +168,8 @@ mergeInto(LibraryManager.library, { // runtimeConfig gets set to true if WebSocket runtime configuration is available. var runtimeConfig = (Module['websocket'] && ('object' === typeof Module['websocket'])); - // The default value is 'ws://' the replace is needed because the compiler replaces "//" comments with '#' - // comments without checking context, so we'd end up with ws:#, the replace swaps the "#" for "//" again. + // The default value is 'ws://' the replace is needed because the compiler replaces '//' comments with '#' + // comments without checking context, so we'd end up with ws:#, the replace swaps the '#' for '//' again. var url = '{{{ WEBSOCKET_URL }}}'.replace('#', '//'); if (runtimeConfig) { @@ -224,6 +256,9 @@ mergeInto(LibraryManager.library, { #if SOCKET_DEBUG Module.print('websocket handle open'); #endif + + Module['websocket'].emit('open', sock.stream.fd); + try { var queued = peer.dgram_send_queue.shift(); while (queued) { @@ -264,6 +299,7 @@ mergeInto(LibraryManager.library, { } sock.recv_queue.push({ addr: peer.addr, port: peer.port, data: data }); + Module['websocket'].emit('message', sock.stream.fd); }; if (ENVIRONMENT_IS_NODE) { @@ -274,14 +310,32 @@ mergeInto(LibraryManager.library, { } handleMessage((new Uint8Array(data)).buffer); // copy from node Buffer -> ArrayBuffer }); - peer.socket.on('error', function() { + peer.socket.on('close', function() { + Module['websocket'].emit('close', sock.stream.fd); + }); + peer.socket.on('error', function(error) { + // Although the ws library may pass errors that may be more descriptive than + // ECONNREFUSED they are not necessarily the expected error code e.g. + // ENOTFOUND on getaddrinfo seems to be node.js specific, so using ECONNREFUSED + // is still probably the most useful thing to do. + sock.error = 111; // Used in getsockopt for SOL_SOCKET/SO_ERROR test. + Module['websocket'].emit('error', [sock.stream.fd, 111, 'ECONNREFUSED: Connection refused']); // don't throw }); } else { peer.socket.onopen = handleOpen; + peer.socket.onclose = function() { + Module['websocket'].emit('close', sock.stream.fd); + }; peer.socket.onmessage = function peer_socket_onmessage(event) { handleMessage(event.data); }; + peer.socket.onerror = function(error) { + // The WebSocket spec only allows a 'simple event' to be thrown on error, + // so we only really know as much as ECONNREFUSED. + sock.error = 111; // Used in getsockopt for SOL_SOCKET/SO_ERROR test. + Module['websocket'].emit('error', [sock.stream.fd, 111, 'ECONNREFUSED: Connection refused']); + }; } }, @@ -380,7 +434,7 @@ mergeInto(LibraryManager.library, { }, connect: function(sock, addr, port) { if (sock.server) { - throw new FS.ErrnoError(ERRNO_CODS.EOPNOTSUPP); + throw new FS.ErrnoError(ERRNO_CODES.EOPNOTSUPP); } // TODO autobind @@ -425,6 +479,7 @@ mergeInto(LibraryManager.library, { port: sock.sport // TODO support backlog }); + Module['websocket'].emit('listen', sock.stream.fd); // Send Event with listen fd. sock.server.on('connection', function(ws) { #if SOCKET_DEBUG @@ -440,17 +495,31 @@ mergeInto(LibraryManager.library, { // push to queue for accept to pick up sock.pending.push(newsock); + Module['websocket'].emit('connection', newsock.stream.fd); } else { // create a peer on the listen socket so calling sendto // with the listen socket and an address will resolve // to the correct client SOCKFS.websocket_sock_ops.createPeer(sock, ws); + Module['websocket'].emit('connection', sock.stream.fd); } }); sock.server.on('closed', function() { + Module['websocket'].emit('close', sock.stream.fd); sock.server = null; }); - sock.server.on('error', function() { + sock.server.on('error', function(error) { + // For server error messages we try to use the message passed by the ws library + // but ENOTFOUND is node.js specific so we map it to EHOSTUNREACH. That's + // not *really* the correct error as ENOTFOUND is a getaddrinfo error but + // the gai_errno value would look even weirder here. This error shouldn't + // occur in a well written app as errors should get trapped in the compiled + // app's own getaddrinfo call. + error.errno = (error.errno === 'ENOTFOUND') ? 'EHOSTUNREACH' : error.errno; + var errno = ERRNO_CODES[error.errno]; + var msg = error.errno + ': ' + ERRNO_MESSAGES[errno]; + sock.error = errno; // Used in getsockopt for SOL_SOCKET/SO_ERROR test. + Module['websocket'].emit('error', [sock.stream.fd, errno, msg]); // don't throw }); }, @@ -604,5 +673,66 @@ mergeInto(LibraryManager.library, { return res; } } + }, + + /* + * Mechanism to register handlers for the various Socket Events from C code. + * The registration functions are mostly variations on a theme, so we use this + * generic handler. Most of the callback functions take a file descriptor as a + * parameter, which will get passed to them by the emitting call. The error + * callback also takes an int representing the errno and a char* representing the + * error message, which we extract from the data passed to _callback and convert + * to a char* string before calling the registered C callback. + * Passing a NULL callback function to a emscripten_set_socket_*_callback call + * will deregister the callback registered for that Event. + */ + __set_network_callback: function(event, callback) { + function _callback(data) { + try { + if (event === 'error') { + var sp = Runtime.stackSave(); + var msg = allocate(intArrayFromString(data[2]), 'i8', ALLOC_STACK); + Runtime.dynCall('viii', callback, [data[0], data[1], msg]); + Runtime.stackRestore(sp); + } else { + Runtime.dynCall('vi', callback, [data]); + } + } catch (e) { + if (e instanceof ExitStatus) { + return; + } else { + if (e && typeof e === 'object' && e.stack) Module.printErr('exception thrown: ' + [e, e.stack]); + throw e; + } + } + }; + + Module['noExitRuntime'] = true; + Module['websocket']['on'](event, callback ? _callback : null); + }, + emscripten_set_socket_error_callback__deps: ['__set_network_callback'], + emscripten_set_socket_error_callback: function(callback) { + ___set_network_callback('error', callback); + }, + emscripten_set_socket_open_callback__deps: ['__set_network_callback'], + emscripten_set_socket_open_callback: function(callback) { + ___set_network_callback('open', callback); + }, + emscripten_set_socket_listen_callback__deps: ['__set_network_callback'], + emscripten_set_socket_listen_callback: function(callback) { + ___set_network_callback('listen', callback); + + }, + emscripten_set_socket_connection_callback__deps: ['__set_network_callback'], + emscripten_set_socket_connection_callback: function(callback) { + ___set_network_callback('connection', callback); + }, + emscripten_set_socket_message_callback__deps: ['__set_network_callback'], + emscripten_set_socket_message_callback: function(callback) { + ___set_network_callback('message', callback); + }, + emscripten_set_socket_close_callback__deps: ['__set_network_callback'], + emscripten_set_socket_close_callback: function(callback) { + ___set_network_callback('close', callback); } }); diff --git a/src/struct_info.json b/src/struct_info.json index 54c89fd7..2fb9962d 100644 --- a/src/struct_info.json +++ b/src/struct_info.json @@ -441,7 +441,9 @@ "SOCK_STREAM", "AF_INET", "AF_UNSPEC", - "AF_INET6" + "AF_INET6", + "SOL_SOCKET", + "SO_ERROR" ], "structs": {} }, diff --git a/system/include/emscripten/emscripten.h b/system/include/emscripten/emscripten.h index 66017a8d..d3a3a257 100644 --- a/system/include/emscripten/emscripten.h +++ b/system/include/emscripten/emscripten.h @@ -155,6 +155,34 @@ extern void emscripten_cancel_main_loop(void); #endif /* + * Registers callback functions for receiving socket events. + * These events are analogous to WebSocket events but are emitted + * *after* the internal emscripten socket processing has occurred + * so, for example, the message callback will be triggered after + * the data has been added to the recv_queue this means that an + * application receiving this callback can simply read/recv the data + * using the file descriptor passed as a parameter to the callback. + * All of the callbacks except the error callback are passed a file + * descriptor representing the fd that the notified activity took + * place on. The error callback takes an int representing errno and + * a char* representing the error message. Passing a NULL callback + * function to a emscripten_set_socket_*_callback call will deregister + * the callback registered for that Event. + */ +// Triggered by a WebSocket error. +extern void emscripten_set_socket_error_callback(void (*func)(int fd, int err, const char* msg)); +// Triggered when the WebSocket has actually opened. +extern void emscripten_set_socket_open_callback(void (*func)(int fd)); +// Triggered when listen has been called (synthetic event). +extern void emscripten_set_socket_listen_callback(void (*func)(int fd)); +// Triggered when the connection has actually been established. +extern void emscripten_set_socket_connection_callback(void (*func)(int fd)); +// Triggered when data is available to be read from the socket. +extern void emscripten_set_socket_message_callback(void (*func)(int fd)); +// Triggered when the WebSocket has actually closed. +extern void emscripten_set_socket_close_callback(void (*func)(int fd)); + +/* * Add a function to a queue of events that will execute * before the main loop will continue. The event is pushed * into the back of the queue. (Note that in the native version diff --git a/tests/sockets/test_sockets_echo_client.c b/tests/sockets/test_sockets_echo_client.c index 58d005c4..6f1c01c4 100644 --- a/tests/sockets/test_sockets_echo_client.c +++ b/tests/sockets/test_sockets_echo_client.c @@ -95,7 +95,9 @@ void main_loop() { assert(!strcmp(server.msg.buffer, MESSAGE)); finish(EXIT_SUCCESS); } - } else { + } + + if (server.state == MSG_WRITE) { if (!FD_ISSET(server.fd, &fdw)) { return; } @@ -117,6 +119,28 @@ void main_loop() { } } +// The callbacks for the async network events have a different signature than from +// emscripten_set_main_loop (they get passed the fd of the socket triggering the event). +// In this test application we want to try and keep as much in common as the timed loop +// version but in a real application the fd can be used instead of needing to select(). +void async_main_loop(int fd) { + main_loop(); +} + +void error_callback(int fd, int err, const char* msg) { + int error; + socklen_t len = sizeof(error); + + int ret = getsockopt(fd, SOL_SOCKET, SO_ERROR, &error, &len); + printf("error message: %s\n", msg); + + if (err == error) { + finish(EXIT_SUCCESS); + } else { + finish(EXIT_FAILURE); + } +} + int main() { struct sockaddr_in addr; int res; @@ -161,7 +185,13 @@ int main() { } #ifdef __EMSCRIPTEN__ - emscripten_set_main_loop(main_loop, 0, 0); +#if TEST_ASYNC + emscripten_set_socket_error_callback(error_callback); + emscripten_set_socket_open_callback(async_main_loop); + emscripten_set_socket_message_callback(async_main_loop); +#else + emscripten_set_main_loop(main_loop, 60, 0); +#endif #else while (1) main_loop(); #endif diff --git a/tests/sockets/test_sockets_echo_server.c b/tests/sockets/test_sockets_echo_server.c index 55898add..492fc6bb 100644 --- a/tests/sockets/test_sockets_echo_server.c +++ b/tests/sockets/test_sockets_echo_server.c @@ -114,7 +114,9 @@ void main_loop() { client.read = 0; client.state = MSG_WRITE; } - } else { + } + + if (client.state == MSG_WRITE) { if (!FD_ISSET(fd, &fdw)) { return; } @@ -142,6 +144,14 @@ void main_loop() { } } +// The callbacks for the async network events have a different signature than from +// emscripten_set_main_loop (they get passed the fd of the socket triggering the event). +// In this test application we want to try and keep as much in common as the timed loop +// version but in a real application the fd can be used instead of needing to select(). +void async_main_loop(int fd) { + main_loop(); +} + int main() { struct sockaddr_in addr; int res; @@ -187,7 +197,13 @@ int main() { #endif #ifdef __EMSCRIPTEN__ +#if TEST_ASYNC + emscripten_set_socket_connection_callback(async_main_loop); + emscripten_set_socket_message_callback(async_main_loop); + emscripten_set_socket_close_callback(async_main_loop); +#else emscripten_set_main_loop(main_loop, 60, 0); +#endif #else while (1) main_loop(); #endif diff --git a/tests/test_sockets.py b/tests/test_sockets.py index 8c2889df..4de8d145 100644 --- a/tests/test_sockets.py +++ b/tests/test_sockets.py @@ -248,6 +248,26 @@ class sockets(BrowserCore): with harness: self.btest(os.path.join('sockets', 'test_sockets_echo_client.c'), expected='0', args=['-DSOCKK=%d' % harness.listen_port, '-DTEST_DGRAM=%d' % datagram, sockets_include]) + def test_sockets_async_echo(self): + # Run with ./runner.py sockets.test_sockets_async_echo + sockets_include = '-I'+path_from_root('tests', 'sockets') + + # Websockify-proxied servers can't run dgram tests + harnesses = [ + (WebsockifyServerHarness(os.path.join('sockets', 'test_sockets_echo_server.c'), [sockets_include, '-DTEST_ASYNC=1'], 49165), 0), + (CompiledServerHarness(os.path.join('sockets', 'test_sockets_echo_server.c'), [sockets_include, '-DTEST_DGRAM=0', '-DTEST_ASYNC=1'], 49166), 0), + (CompiledServerHarness(os.path.join('sockets', 'test_sockets_echo_server.c'), [sockets_include, '-DTEST_DGRAM=1', '-DTEST_ASYNC=1'], 49167), 1), + # The following forces non-NULL addr and addlen parameters for the accept call + (CompiledServerHarness(os.path.join('sockets', 'test_sockets_echo_server.c'), [sockets_include, '-DTEST_DGRAM=0', '-DTEST_ACCEPT_ADDR=1', '-DTEST_ASYNC=1'], 49168), 0) + ] + + for harness, datagram in harnesses: + with harness: + self.btest(os.path.join('sockets', 'test_sockets_echo_client.c'), expected='0', args=['-DSOCKK=%d' % harness.listen_port, '-DTEST_DGRAM=%d' % datagram, '-DTEST_ASYNC=1', sockets_include]) + + # Deliberately attempt a connection on a port that will fail to test the error callback and getsockopt + self.btest(os.path.join('sockets', 'test_sockets_echo_client.c'), expected='0', args=['-DSOCKK=49169', '-DTEST_ASYNC=1', sockets_include]) + def test_sockets_echo_bigdata(self): sockets_include = '-I'+path_from_root('tests', 'sockets') |