diff options
author | juj <jujjyl@gmail.com> | 2014-07-15 09:59:29 +0200 |
---|---|---|
committer | juj <jujjyl@gmail.com> | 2014-07-15 09:59:29 +0200 |
commit | e316fe35ad5c241012b4a59503e4ffa90c5d77f7 (patch) | |
tree | 834fca8a014702443c7f1f474e61fc575435f9af | |
parent | eca09b5500d7c5dcc61ff368d7438616d298c2a7 (diff) | |
parent | 53e5ce6ee01e01131a1fd8894b4baab9c629f4d3 (diff) |
Merge pull request #2390 from fadams/add-network-callbacks
Feature to enable asynchronous event driven network event handling
-rw-r--r-- | src/library.js | 34 | ||||
-rw-r--r-- | src/library_sockfs.js | 137 | ||||
-rw-r--r-- | src/struct_info.json | 4 | ||||
-rw-r--r-- | system/include/emscripten/emscripten.h | 60 | ||||
-rw-r--r-- | tests/sockets/test_sockets_echo_client.c | 38 | ||||
-rw-r--r-- | tests/sockets/test_sockets_echo_server.c | 21 | ||||
-rw-r--r-- | tests/test_sockets.py | 20 |
7 files changed, 305 insertions, 9 deletions
diff --git a/src/library.js b/src/library.js index 7bc3f494..3ec16c16 100644 --- a/src/library.js +++ b/src/library.js @@ -8242,6 +8242,40 @@ LibraryManager.library = { return 0; }, + getsockopt__deps: ['$SOCKFS', '__setErrNo', '$ERRNO_CODES'], + getsockopt: function(fd, level, optname, optval, optlen) { + // int getsockopt(int sockfd, int level, int optname, void *optval, socklen_t *optlen); + // http://pubs.opengroup.org/onlinepubs/000095399/functions/getsockopt.html + // Minimal getsockopt aimed at resolving https://github.com/kripken/emscripten/issues/2211 + // so only supports SOL_SOCKET with SO_ERROR. + var sock = SOCKFS.getSocket(fd); + if (!sock) { + ___setErrNo(ERRNO_CODES.EBADF); + return -1; + } + + if (level === {{{ cDefine('SOL_SOCKET') }}}) { + if (optname === {{{ cDefine('SO_ERROR') }}}) { + {{{ makeSetValue('optval', 0, 'sock.error', 'i32') }}}; + {{{ makeSetValue('optlen', 0, 4, 'i32') }}}; + sock.error = null; // Clear the error (The SO_ERROR option obtains and then clears this field). + return 0; + } else { + ___setErrNo(ERRNO_CODES.ENOPROTOOPT); // The option is unknown at the level indicated. +#if ASSERTIONS + Runtime.warnOnce('getsockopt() returning an error as we currently only support optname SO_ERROR'); +#endif + return -1; + } + } else { + ___setErrNo(ERRNO_CODES.ENOPROTOOPT); //The option is unknown at the level indicated. +#if ASSERTIONS + Runtime.warnOnce('getsockopt() returning an error as we only support level SOL_SOCKET'); +#endif + return -1; + } + }, + mkport: function() { throw 'TODO' }, // ========================================================================== diff --git a/src/library_sockfs.js b/src/library_sockfs.js index 23641464..6bd68b65 100644 --- a/src/library_sockfs.js +++ b/src/library_sockfs.js @@ -3,6 +3,38 @@ mergeInto(LibraryManager.library, { $SOCKFS__deps: ['$FS', 'mkport'], $SOCKFS: { mount: function(mount) { + // If Module['websocket'] has already been defined (e.g. for configuring + // the subprotocol/url) use that, if not initialise it to a new object. + Module['websocket'] = (Module['websocket'] && + ('object' === typeof Module['websocket'])) ? Module['websocket'] : {}; + + // Add the Event registration mechanism to the exported websocket configuration + // object so we can register network callbacks from native JavaScript too. + // For more documentation see system/include/emscripten/emscripten.h + Module['websocket']._callbacks = {}; + Module['websocket']['on'] = function(event, callback) { + if ('function' === typeof callback) { + this._callbacks[event] = callback; + } + return this; + }; + + Module['websocket'].emit = function(event, param) { + if ('function' === typeof this._callbacks[event]) { + this._callbacks[event].call(this, param); + } + }; + + // If debug is enabled register simple default logging callbacks for each Event. +#if SOCKET_DEBUG + Module['websocket']['on']('error', function(error) {Module.printErr('Socket error ' + error);}); + Module['websocket']['on']('open', function(fd) {Module.print('Socket open fd = ' + fd);}); + Module['websocket']['on']('listen', function(fd) {Module.print('Socket listen fd = ' + fd);}); + Module['websocket']['on']('connection', function(fd) {Module.print('Socket connection fd = ' + fd);}); + Module['websocket']['on']('message', function(fd) {Module.print('Socket message fd = ' + fd);}); + Module['websocket']['on']('close', function(fd) {Module.print('Socket close fd = ' + fd);}); +#endif + return FS.createNode(null, '/', {{{ cDefine('S_IFDIR') }}} | 511 /* 0777 */, 0); }, createSocket: function(family, type, protocol) { @@ -17,6 +49,7 @@ mergeInto(LibraryManager.library, { type: type, protocol: protocol, server: null, + error: null, // Used in getsockopt for SOL_SOCKET/SO_ERROR test peers: {}, pending: [], recv_queue: [], @@ -136,8 +169,8 @@ mergeInto(LibraryManager.library, { // runtimeConfig gets set to true if WebSocket runtime configuration is available. var runtimeConfig = (Module['websocket'] && ('object' === typeof Module['websocket'])); - // The default value is 'ws://' the replace is needed because the compiler replaces "//" comments with '#' - // comments without checking context, so we'd end up with ws:#, the replace swaps the "#" for "//" again. + // The default value is 'ws://' the replace is needed because the compiler replaces '//' comments with '#' + // comments without checking context, so we'd end up with ws:#, the replace swaps the '#' for '//' again. var url = '{{{ WEBSOCKET_URL }}}'.replace('#', '//'); if (runtimeConfig) { @@ -224,6 +257,9 @@ mergeInto(LibraryManager.library, { #if SOCKET_DEBUG Module.print('websocket handle open'); #endif + + Module['websocket'].emit('open', sock.stream.fd); + try { var queued = peer.dgram_send_queue.shift(); while (queued) { @@ -264,6 +300,7 @@ mergeInto(LibraryManager.library, { } sock.recv_queue.push({ addr: peer.addr, port: peer.port, data: data }); + Module['websocket'].emit('message', sock.stream.fd); }; if (ENVIRONMENT_IS_NODE) { @@ -274,14 +311,32 @@ mergeInto(LibraryManager.library, { } handleMessage((new Uint8Array(data)).buffer); // copy from node Buffer -> ArrayBuffer }); - peer.socket.on('error', function() { + peer.socket.on('close', function() { + Module['websocket'].emit('close', sock.stream.fd); + }); + peer.socket.on('error', function(error) { + // Although the ws library may pass errors that may be more descriptive than + // ECONNREFUSED they are not necessarily the expected error code e.g. + // ENOTFOUND on getaddrinfo seems to be node.js specific, so using ECONNREFUSED + // is still probably the most useful thing to do. + sock.error = ERRNO_CODES.ECONNREFUSED; // Used in getsockopt for SOL_SOCKET/SO_ERROR test. + Module['websocket'].emit('error', [sock.stream.fd, sock.error, 'ECONNREFUSED: Connection refused']); // don't throw }); } else { peer.socket.onopen = handleOpen; + peer.socket.onclose = function() { + Module['websocket'].emit('close', sock.stream.fd); + }; peer.socket.onmessage = function peer_socket_onmessage(event) { handleMessage(event.data); }; + peer.socket.onerror = function(error) { + // The WebSocket spec only allows a 'simple event' to be thrown on error, + // so we only really know as much as ECONNREFUSED. + sock.error = ERRNO_CODES.ECONNREFUSED; // Used in getsockopt for SOL_SOCKET/SO_ERROR test. + Module['websocket'].emit('error', [sock.stream.fd, sock.error, 'ECONNREFUSED: Connection refused']); + }; } }, @@ -380,7 +435,7 @@ mergeInto(LibraryManager.library, { }, connect: function(sock, addr, port) { if (sock.server) { - throw new FS.ErrnoError(ERRNO_CODS.EOPNOTSUPP); + throw new FS.ErrnoError(ERRNO_CODES.EOPNOTSUPP); } // TODO autobind @@ -425,6 +480,7 @@ mergeInto(LibraryManager.library, { port: sock.sport // TODO support backlog }); + Module['websocket'].emit('listen', sock.stream.fd); // Send Event with listen fd. sock.server.on('connection', function(ws) { #if SOCKET_DEBUG @@ -440,17 +496,28 @@ mergeInto(LibraryManager.library, { // push to queue for accept to pick up sock.pending.push(newsock); + Module['websocket'].emit('connection', newsock.stream.fd); } else { // create a peer on the listen socket so calling sendto // with the listen socket and an address will resolve // to the correct client SOCKFS.websocket_sock_ops.createPeer(sock, ws); + Module['websocket'].emit('connection', sock.stream.fd); } }); sock.server.on('closed', function() { + Module['websocket'].emit('close', sock.stream.fd); sock.server = null; }); - sock.server.on('error', function() { + sock.server.on('error', function(error) { + // Although the ws library may pass errors that may be more descriptive than + // ECONNREFUSED they are not necessarily the expected error code e.g. + // ENOTFOUND on getaddrinfo seems to be node.js specific, so using EHOSTUNREACH + // is still probably the most useful thing to do. This error shouldn't + // occur in a well written app as errors should get trapped in the compiled + // app's own getaddrinfo call. + sock.error = ERRNO_CODES.EHOSTUNREACH; // Used in getsockopt for SOL_SOCKET/SO_ERROR test. + Module['websocket'].emit('error', [sock.stream.fd, sock.error, 'EHOSTUNREACH: Host is unreachable']); // don't throw }); }, @@ -604,5 +671,65 @@ mergeInto(LibraryManager.library, { return res; } } + }, + + /* + * Mechanism to register handlers for the various Socket Events from C code. + * The registration functions are mostly variations on a theme, so we use this + * generic handler. Most of the callback functions take a file descriptor as a + * parameter, which will get passed to them by the emitting call. The error + * callback also takes an int representing the errno and a char* representing the + * error message, which we extract from the data passed to _callback and convert + * to a char* string before calling the registered C callback. + * Passing a NULL callback function to a emscripten_set_socket_*_callback call + * will deregister the callback registered for that Event. + */ + __set_network_callback: function(event, userData, callback) { + function _callback(data) { + try { + if (event === 'error') { + var sp = Runtime.stackSave(); + var msg = allocate(intArrayFromString(data[2]), 'i8', ALLOC_STACK); + Runtime.dynCall('viiii', callback, [data[0], data[1], msg, userData]); + Runtime.stackRestore(sp); + } else { + Runtime.dynCall('vii', callback, [data, userData]); + } + } catch (e) { + if (e instanceof ExitStatus) { + return; + } else { + if (e && typeof e === 'object' && e.stack) Module.printErr('exception thrown: ' + [e, e.stack]); + throw e; + } + } + }; + + Module['noExitRuntime'] = true; + Module['websocket']['on'](event, callback ? _callback : null); + }, + emscripten_set_socket_error_callback__deps: ['__set_network_callback'], + emscripten_set_socket_error_callback: function(userData, callback) { + ___set_network_callback('error', userData, callback); + }, + emscripten_set_socket_open_callback__deps: ['__set_network_callback'], + emscripten_set_socket_open_callback: function(userData, callback) { + ___set_network_callback('open', userData, callback); + }, + emscripten_set_socket_listen_callback__deps: ['__set_network_callback'], + emscripten_set_socket_listen_callback: function(userData, callback) { + ___set_network_callback('listen', userData, callback); + }, + emscripten_set_socket_connection_callback__deps: ['__set_network_callback'], + emscripten_set_socket_connection_callback: function(userData, callback) { + ___set_network_callback('connection', userData, callback); + }, + emscripten_set_socket_message_callback__deps: ['__set_network_callback'], + emscripten_set_socket_message_callback: function(userData, callback) { + ___set_network_callback('message', userData, callback); + }, + emscripten_set_socket_close_callback__deps: ['__set_network_callback'], + emscripten_set_socket_close_callback: function(userData, callback) { + ___set_network_callback('close', userData, callback); } }); diff --git a/src/struct_info.json b/src/struct_info.json index 655b6bc9..94a64845 100644 --- a/src/struct_info.json +++ b/src/struct_info.json @@ -441,7 +441,9 @@ "SOCK_STREAM", "AF_INET", "AF_UNSPEC", - "AF_INET6" + "AF_INET6", + "SOL_SOCKET", + "SO_ERROR" ], "structs": {} }, diff --git a/system/include/emscripten/emscripten.h b/system/include/emscripten/emscripten.h index 0500b4bb..7533be8c 100644 --- a/system/include/emscripten/emscripten.h +++ b/system/include/emscripten/emscripten.h @@ -190,6 +190,66 @@ extern void emscripten_cancel_main_loop(void); #endif /* + * Registers callback functions for receiving socket events. + * These events are analogous to WebSocket events but are emitted + * *after* the internal emscripten socket processing has occurred + * so, for example, the message callback will be triggered after + * the data has been added to the recv_queue. This means that an + * application receiving this callback can simply read/recv the data + * using the file descriptor passed as a parameter to the callback. + * All of the callbacks are passed a file descriptor representing + * the fd that the notified activity took place on. The error + * callback also takes an int representing errno and a char* that + * represents the error message. + * + * Only a single callback function may be registered to handle any + * given Event, so calling a given registration function more than + * once will cause the first callback to be replaced by the second. + * Similarly passing a NULL callback function to any + * emscripten_set_socket_*_callback call will deregister the callback + * registered for that Event. + * + * The userData pointer allows arbitrary data specified during Event + * registration to be passed to the callback, this is particularly + * useful for passing "this" pointers around in Object Oriented code. + * + * In addition to being able to register network callbacks from C + * it is also possible for native JavaScript code to directly use the + * underlying mechanism used to implement the callback registration. + * For example, the following are the simple logging callbacks that + * are registered by default when SOCKET_DEBUG is enabled + * Module['websocket']['on']('error', function(error) {console.log('Socket error ' + error);}); + * Module['websocket']['on']('open', function(fd) {console.log('Socket open fd = ' + fd);}); + * Module['websocket']['on']('listen', function(fd) {console.log('Socket listen fd = ' + fd);}); + * Module['websocket']['on']('connection', function(fd) {console.log('Socket connection fd = ' + fd);}); + * Module['websocket']['on']('message', function(fd) {console.log('Socket message fd = ' + fd);}); + * Module['websocket']['on']('close', function(fd) {console.log('Socket close fd = ' + fd);}); + * + * Most of the JavaScript callback functions above get passed the + * file descriptor of the socket that triggered the callback, the + * on error callback however gets passed an *array* that contains + * the file descriptor, the error code and an error message. + * + * Note that the underlying JavaScript implementation doesn't pass + * userData, this is actually mostly of use to C/C++ code and the + * emscripten_set_socket_*_callback calls actually create a closure + * containing the userData and pass that as the callback to the + * underlying JavaScript Event registration mechanism. + */ +// Triggered by a WebSocket error. +extern void emscripten_set_socket_error_callback(void *userData, void (*func)(int fd, int err, const char* msg, void *userData)); +// Triggered when the WebSocket has actually opened. +extern void emscripten_set_socket_open_callback(void *userData, void (*func)(int fd, void *userData)); +// Triggered when listen has been called (synthetic event). +extern void emscripten_set_socket_listen_callback(void *userData, void (*func)(int fd, void *userData)); +// Triggered when the connection has actually been established. +extern void emscripten_set_socket_connection_callback(void *userData, void (*func)(int fd, void *userData)); +// Triggered when data is available to be read from the socket. +extern void emscripten_set_socket_message_callback(void *userData, void (*func)(int fd, void *userData)); +// Triggered when the WebSocket has actually closed. +extern void emscripten_set_socket_close_callback(void *userData, void (*func)(int fd, void *userData)); + +/* * Add a function to a queue of events that will execute * before the main loop will continue. The event is pushed * into the back of the queue. (Note that in the native version diff --git a/tests/sockets/test_sockets_echo_client.c b/tests/sockets/test_sockets_echo_client.c index 58d005c4..48c031a4 100644 --- a/tests/sockets/test_sockets_echo_client.c +++ b/tests/sockets/test_sockets_echo_client.c @@ -95,7 +95,9 @@ void main_loop() { assert(!strcmp(server.msg.buffer, MESSAGE)); finish(EXIT_SUCCESS); } - } else { + } + + if (server.state == MSG_WRITE) { if (!FD_ISSET(server.fd, &fdw)) { return; } @@ -117,6 +119,30 @@ void main_loop() { } } +// The callbacks for the async network events have a different signature than from +// emscripten_set_main_loop (they get passed the fd of the socket triggering the event). +// In this test application we want to try and keep as much in common as the timed loop +// version but in a real application the fd can be used instead of needing to select(). +void async_main_loop(int fd, void* userData) { + printf("%s callback\n", userData); + main_loop(); +} + +void error_callback(int fd, int err, const char* msg, void* userData) { + int error; + socklen_t len = sizeof(error); + + int ret = getsockopt(fd, SOL_SOCKET, SO_ERROR, &error, &len); + printf("%s callback\n", userData); + printf("error message: %s\n", msg); + + if (err == error) { + finish(EXIT_SUCCESS); + } else { + finish(EXIT_FAILURE); + } +} + int main() { struct sockaddr_in addr; int res; @@ -161,7 +187,15 @@ int main() { } #ifdef __EMSCRIPTEN__ - emscripten_set_main_loop(main_loop, 0, 0); +#if TEST_ASYNC + // The first parameter being passed is actually an arbitrary userData pointer + // for simplicity this test just passes a basic char* + emscripten_set_socket_error_callback("error", error_callback); + emscripten_set_socket_open_callback("open", async_main_loop); + emscripten_set_socket_message_callback("message", async_main_loop); +#else + emscripten_set_main_loop(main_loop, 60, 0); +#endif #else while (1) main_loop(); #endif diff --git a/tests/sockets/test_sockets_echo_server.c b/tests/sockets/test_sockets_echo_server.c index 55898add..4b5b75c6 100644 --- a/tests/sockets/test_sockets_echo_server.c +++ b/tests/sockets/test_sockets_echo_server.c @@ -114,7 +114,9 @@ void main_loop() { client.read = 0; client.state = MSG_WRITE; } - } else { + } + + if (client.state == MSG_WRITE) { if (!FD_ISSET(fd, &fdw)) { return; } @@ -142,6 +144,15 @@ void main_loop() { } } +// The callbacks for the async network events have a different signature than from +// emscripten_set_main_loop (they get passed the fd of the socket triggering the event). +// In this test application we want to try and keep as much in common as the timed loop +// version but in a real application the fd can be used instead of needing to select(). +void async_main_loop(int fd, void* userData) { + printf("%s callback\n", userData); + main_loop(); +} + int main() { struct sockaddr_in addr; int res; @@ -187,7 +198,15 @@ int main() { #endif #ifdef __EMSCRIPTEN__ +#if TEST_ASYNC + // The first parameter being passed is actually an arbitrary userData pointer + // for simplicity this test just passes a basic char* + emscripten_set_socket_connection_callback("connection", async_main_loop); + emscripten_set_socket_message_callback("message", async_main_loop); + emscripten_set_socket_close_callback("close", async_main_loop); +#else emscripten_set_main_loop(main_loop, 60, 0); +#endif #else while (1) main_loop(); #endif diff --git a/tests/test_sockets.py b/tests/test_sockets.py index 8c2889df..4de8d145 100644 --- a/tests/test_sockets.py +++ b/tests/test_sockets.py @@ -248,6 +248,26 @@ class sockets(BrowserCore): with harness: self.btest(os.path.join('sockets', 'test_sockets_echo_client.c'), expected='0', args=['-DSOCKK=%d' % harness.listen_port, '-DTEST_DGRAM=%d' % datagram, sockets_include]) + def test_sockets_async_echo(self): + # Run with ./runner.py sockets.test_sockets_async_echo + sockets_include = '-I'+path_from_root('tests', 'sockets') + + # Websockify-proxied servers can't run dgram tests + harnesses = [ + (WebsockifyServerHarness(os.path.join('sockets', 'test_sockets_echo_server.c'), [sockets_include, '-DTEST_ASYNC=1'], 49165), 0), + (CompiledServerHarness(os.path.join('sockets', 'test_sockets_echo_server.c'), [sockets_include, '-DTEST_DGRAM=0', '-DTEST_ASYNC=1'], 49166), 0), + (CompiledServerHarness(os.path.join('sockets', 'test_sockets_echo_server.c'), [sockets_include, '-DTEST_DGRAM=1', '-DTEST_ASYNC=1'], 49167), 1), + # The following forces non-NULL addr and addlen parameters for the accept call + (CompiledServerHarness(os.path.join('sockets', 'test_sockets_echo_server.c'), [sockets_include, '-DTEST_DGRAM=0', '-DTEST_ACCEPT_ADDR=1', '-DTEST_ASYNC=1'], 49168), 0) + ] + + for harness, datagram in harnesses: + with harness: + self.btest(os.path.join('sockets', 'test_sockets_echo_client.c'), expected='0', args=['-DSOCKK=%d' % harness.listen_port, '-DTEST_DGRAM=%d' % datagram, '-DTEST_ASYNC=1', sockets_include]) + + # Deliberately attempt a connection on a port that will fail to test the error callback and getsockopt + self.btest(os.path.join('sockets', 'test_sockets_echo_client.c'), expected='0', args=['-DSOCKK=49169', '-DTEST_ASYNC=1', sockets_include]) + def test_sockets_echo_bigdata(self): sockets_include = '-I'+path_from_root('tests', 'sockets') |