diff options
author | Anthony Pesch <inolen@gmail.com> | 2013-08-24 09:31:29 -0700 |
---|---|---|
committer | Anthony Pesch <inolen@gmail.com> | 2013-08-29 01:45:38 -0700 |
commit | 462875aa0e5b2820868b539e2a185db7783653f8 (patch) | |
tree | 38ad416ee28ac0a8c5db34157608d532929e6b0d | |
parent | f7744fcb18ab5d12c30cf1340c7bacabfc13d1ab (diff) |
- created SOCKFS
- added support for node-based listen servers
- updated tests to also test against compiled listen servers
-rw-r--r-- | .gitignore | 2 | ||||
-rw-r--r-- | package.json | 5 | ||||
-rw-r--r-- | src/library.js | 764 | ||||
-rw-r--r-- | src/library_fs.js | 12 | ||||
-rw-r--r-- | src/library_sockfs.js | 554 | ||||
-rw-r--r-- | src/settings.js | 1 | ||||
-rwxr-xr-x | tests/runner.py | 4 | ||||
-rw-r--r-- | tests/sockets/test_sockets_msg.h | 1 | ||||
-rw-r--r-- | tests/sockets/test_sockets_select_server_down_client.c | 2 | ||||
-rw-r--r-- | tests/test_sockets.py | 110 |
10 files changed, 1043 insertions, 412 deletions
@@ -4,6 +4,8 @@ *.bc src/relooper*.js +node_modules/ + # Ignore generated files src/relooper.js src/relooper.js.raw.js diff --git a/package.json b/package.json new file mode 100644 index 00000000..157885fb --- /dev/null +++ b/package.json @@ -0,0 +1,5 @@ +{ + "dependencies": { + "ws": "~0.4.28" + } +} diff --git a/src/library.js b/src/library.js index b5542609..307df9c6 100644 --- a/src/library.js +++ b/src/library.js @@ -600,31 +600,32 @@ LibraryManager.library = { // poll.h // ========================================================================== + __DEFAULT_POLLMASK: {{{ cDefine('POLLIN') }}} | {{{ cDefine('POLLOUT') }}}, __pollfd_struct_layout: Runtime.generateStructInfo([ ['i32', 'fd'], ['i16', 'events'], ['i16', 'revents']]), - poll__deps: ['$FS', '__pollfd_struct_layout'], + poll__deps: ['$FS', '__DEFAULT_POLLMASK', '__pollfd_struct_layout'], poll: function(fds, nfds, timeout) { // int poll(struct pollfd fds[], nfds_t nfds, int timeout); // http://pubs.opengroup.org/onlinepubs/009695399/functions/poll.html - // NOTE: This is pretty much a no-op mimicking glibc. var offsets = ___pollfd_struct_layout; var nonzero = 0; for (var i = 0; i < nfds; i++) { var pollfd = fds + ___pollfd_struct_layout.__size__ * i; var fd = {{{ makeGetValue('pollfd', 'offsets.fd', 'i32') }}}; var events = {{{ makeGetValue('pollfd', 'offsets.events', 'i16') }}}; - var revents = 0; + var mask = {{{ cDefine('POLLNVAL') }}}; var stream = FS.getStream(fd); if (stream) { - if (events & {{{ cDefine('POLLIN') }}}) revents |= {{{ cDefine('POLLIN') }}}; - if (events & {{{ cDefine('POLLOUT') }}}) revents |= {{{ cDefine('POLLOUT') }}}; - } else { - if (events & {{{ cDefine('POLLNVAL') }}}) revents |= {{{ cDefine('POLLNVAL') }}}; + mask = ___DEFAULT_POLLMASK; + if (stream.stream_ops.poll) { + mask = stream.stream_ops.poll(stream); + } } - if (revents) nonzero++; - {{{ makeSetValue('pollfd', 'offsets.revents', 'revents', 'i16') }}} + mask &= events | {{{ cDefine('POLLERR') }}} | {{{ cDefine('POLLHUP') }}}; + if (mask) nonzero++; + {{{ makeSetValue('pollfd', 'offsets.revents', 'mask', 'i16') }}} } return nonzero; }, @@ -723,7 +724,7 @@ LibraryManager.library = { FS.close(stream); return 0; } catch (e) { - FS.handleFSError(e);; + FS.handleFSError(e); return -1; } }, @@ -1006,9 +1007,11 @@ LibraryManager.library = { return -1; } +#if SOCKET_WEBRTC if (stream && ('socket' in stream)) { return _recv(fildes, buf, nbyte, 0); } +#endif try { var slab = {{{ makeGetSlabs('buf', 'i8', true) }}}; @@ -1139,9 +1142,11 @@ LibraryManager.library = { return -1; } +#if SOCKET_WEBRTC if (stream && ('socket' in stream)) { return _send(fildes, buf, nbyte, 0); } +#endif try { var slab = {{{ makeGetSlabs('buf', 'i8', true) }}}; @@ -7537,7 +7542,7 @@ LibraryManager.library = { ['i32', 'sin6_family'], ['i16', 'sin6_port'], ['i32', 'sin6_flowinfo'], - ['b128', 'sin6_addr'], + ['b16', 'sin6_addr'], ['i32', 'sin6_scope_id'] ]), msghdr_layout: Runtime.generateStructInfo([ @@ -7549,6 +7554,10 @@ LibraryManager.library = { ['i32', 'msg_controllen'], ['i32', 'msg_flags'], ]), + iovec_layout: Runtime.generateStructInfo([ + ['i8*', 'iov_base'], + ['i32', 'iov_len'] + ]) }, #if SOCKET_WEBRTC @@ -7994,424 +8003,471 @@ LibraryManager.library = { return {}; }, - socket__deps: ['$FS', '$Sockets'], + socket__deps: ['$FS', '$SOCKFS'], socket: function(family, type, protocol) { - var stream = type == {{{ cDefine('SOCK_STREAM') }}}; - if (protocol) { - assert(stream == (protocol == {{{ cDefine('IPPROTO_TCP') }}})); // if SOCK_STREAM, must be tcp + var sock = SOCKFS.createSocket(family, type, protocol); + assert(sock.stream.fd < 64); // select() assumes socket fd values are in 0..63 + return sock.stream.fd; + }, + + socketpair__deps: ['$ERRNO_CODES', '__setErrNo'], + socketpair: function(domain, type, protocol, sv) { + // int socketpair(int domain, int type, int protocol, int sv[2]); + // http://pubs.opengroup.org/onlinepubs/009695399/functions/socketpair.html + ___setErrNo(ERRNO_CODES.EOPNOTSUPP); + return -1; + }, + + shutdown__deps: ['$SOCKFS', '$ERRNO_CODES', '__setErrNo'], + shutdown: function(fd, how) { + var sock = SOCKFS.getSocket(fd); + if (!sock) { + ___setErrNo(ERRNO_CODES.EBADF); + return -1; } - var stream = FS.createStream({ - connected: false, - stream: stream, - socket: true, - stream_ops: {} - }); - assert(stream.fd < 64); // select() assumes socket fd values are in 0..63 - return stream.fd; + _close(fd); }, - connect__deps: ['$FS', '$Sockets', '_inet_ntop4_raw', 'ntohs', '_lookup_addr'], - connect: function(fd, addr, addrlen) { - var info = FS.getStream(fd); - if (!info) return -1; - // TODO support ipv6 - info.connected = true; - info.addr = getValue(addr + Sockets.sockaddr_in_layout.sin_addr, 'i32'); - info.port = _htons(getValue(addr + Sockets.sockaddr_in_layout.sin_port, 'i16')); - info.host = __inet_ntop4_raw(info.addr); - // The incoming address could perhaps be a fake address generated by gethostbyname, - // look it up to be sure. - var lookup = __lookup_addr(info.host); - if (lookup) { - info.host = lookup; + bind__deps: ['$FS', '$SOCKFS', '$ERRNO_CODES', '__setErrNo', '_lookup_addr', '_read_sockaddr'], + bind: function(fd, addrp, addrlen) { + var sock = SOCKFS.getSocket(fd); + if (!sock) { + ___setErrNo(ERRNO_CODES.EBADF); + return -1; } - try { - console.log('opening ws://' + info.host + ':' + info.port); - info.socket = new WebSocket('ws://' + info.host + ':' + info.port, ['binary']); - info.socket.binaryType = 'arraybuffer'; - var i32Temp = new Uint32Array(1); - var i8Temp = new Uint8Array(i32Temp.buffer); + var info = __read_sockaddr(addrp, addrlen); + if (info.errno) { + ___setErrNo(info.errno); + return -1; + } + var port = info.port; + var addr = __lookup_addr(info.addr) || info.addr; - info.inQueue = []; - info.hasData = function() { return info.inQueue.length > 0 } - if (!info.stream) { - var partialBuffer = null; // in datagram mode, inQueue contains full dgram messages; this buffers incomplete data. Must begin with the beginning of a message - } + try { + sock.sock_ops.bind(sock, addr, port); + return 0; + } catch (e) { + FS.handleFSError(e); + return -1; + } + }, - info.socket.onmessage = function(event) { - assert(typeof event.data !== 'string' && event.data.byteLength); // must get binary data! - var data = new Uint8Array(event.data); // make a typed array view on the array buffer -#if SOCKET_DEBUG - Module.print(['onmessage', data.length, '|', Array.prototype.slice.call(data)]); -#endif - if (info.stream) { - info.inQueue.push(data); - } else { - // we added headers with message sizes, read those to find discrete messages - if (partialBuffer) { - // append to the partial buffer - var newBuffer = new Uint8Array(partialBuffer.length + data.length); - newBuffer.set(partialBuffer); - newBuffer.set(data, partialBuffer.length); - // forget the partial buffer and work on data - data = newBuffer; - partialBuffer = null; - } - var currPos = 0; - while (currPos+4 < data.length) { - i8Temp.set(data.subarray(currPos, currPos+4)); - var currLen = i32Temp[0]; - assert(currLen > 0); - if (currPos + 4 + currLen > data.length) { - break; // not enough data has arrived - } - currPos += 4; -#if SOCKET_DEBUG - Module.print(['onmessage message', currLen, '|', Array.prototype.slice.call(data.subarray(currPos, currPos+currLen))]); -#endif - info.inQueue.push(data.subarray(currPos, currPos+currLen)); - currPos += currLen; - } - // If data remains, buffer it - if (currPos < data.length) { - partialBuffer = data.subarray(currPos); - } - } - } - function send(data) { - // TODO: if browser accepts views, can optimize this -#if SOCKET_DEBUG - Module.print('sender actually sending ' + Array.prototype.slice.call(data)); -#endif - // ok to use the underlying buffer, we created data and know that the buffer starts at the beginning - info.socket.send(data.buffer); - } - var outQueue = []; - var intervalling = false, interval; - function trySend() { - if (info.socket.readyState != info.socket.OPEN) { - if (!intervalling) { - intervalling = true; - console.log('waiting for socket in order to send'); - interval = setInterval(trySend, 100); - } - return; - } - for (var i = 0; i < outQueue.length; i++) { - send(outQueue[i]); - } - outQueue.length = 0; - if (intervalling) { - intervalling = false; - clearInterval(interval); - } - } - info.sender = function(data) { - if (!info.stream) { - // add a header with the message size - var header = new Uint8Array(4); - i32Temp[0] = data.length; - header.set(i8Temp); - outQueue.push(header); - } - outQueue.push(new Uint8Array(data)); - trySend(); - }; - } catch(e) { - Module.printErr('Error in connect(): ' + e); - ___setErrNo(ERRNO_CODES.EACCES); + connect__deps: ['$FS', '$SOCKFS', '$ERRNO_CODES', '__setErrNo', '_lookup_addr', '_read_sockaddr'], + connect: function(fd, addrp, addrlen) { + var sock = SOCKFS.getSocket(fd); + if (!sock) { + ___setErrNo(ERRNO_CODES.EBADF); return -1; } - // always "fail" in non-blocking mode - ___setErrNo(ERRNO_CODES.EINPROGRESS); - return -1; + var info = __read_sockaddr(addrp, addrlen); + if (info.errno) { + ___setErrNo(info.errno); + return -1; + } + var port = info.port; + var addr = __lookup_addr(info.addr) || info.addr; + + try { + sock.sock_ops.connect(sock, addr, port); + return 0; + } catch (e) { + FS.handleFSError(e); + return -1; + } }, - recv__deps: ['$FS'], - recv: function(fd, buf, len, flags) { - var info = FS.getStream(fd); - if (!info) { + listen__deps: ['$FS', '$SOCKFS', '$ERRNO_CODES', '__setErrNo'], + listen: function(fd, backlog) { + var sock = SOCKFS.getSocket(fd); + if (!sock) { ___setErrNo(ERRNO_CODES.EBADF); return -1; } -#if SOCKET_WEBRTC == 0 - if (!info.hasData()) { - if (info.socket.readyState === WebSocket.CLOSING || info.socket.readyState === WebSocket.CLOSED) { - // socket has closed - return 0; - } else { - // else, our socket is in a valid state but truly has nothing available - ___setErrNo(ERRNO_CODES.EAGAIN); - return -1; - } + try { + sock.sock_ops.listen(sock, backlog); + return 0; + } catch (e) { + FS.handleFSError(e); + return -1; } -#endif - var buffer = info.inQueue.shift(); -#if SOCKET_DEBUG - Module.print('recv: ' + [Array.prototype.slice.call(buffer)]); -#endif - if (len < buffer.length) { - if (info.stream) { - // This is tcp (reliable), so if not all was read, keep it - info.inQueue.unshift(buffer.subarray(len)); -#if SOCKET_DEBUG - Module.print('recv: put back: ' + (len - buffer.length)); -#endif + }, + + accept__deps: ['$FS', '$SOCKFS', '$ERRNO_CODES', '__setErrNo', '_write_sockaddr', 'socket'], + accept: function(fd, addrp, addrlen) { + var sock = SOCKFS.getSocket(fd); + if (!sock) { + ___setErrNo(ERRNO_CODES.EBADF); + return -1; + } + var newfd = _socket(sock.family, sock.type, sock.protocol); + var newsock = SOCKFS.getSocket(newfd); + assert(newsock); + try { + sock.sock_ops.accept(sock, newsock, sock.stream.flags); + if (addrp) { + var res = __write_sockaddr(addr, newsock.family, __lookup_name(newsock.daddr), newsock.dport); + assert(!res.errno); } - buffer = buffer.subarray(0, len); + return newfd; + } catch (e) { + FS.handleFSError(e); + return -1; } - HEAPU8.set(buffer, buf); - return buffer.length; }, - send__deps: ['$FS'], - send: function(fd, buf, len, flags) { - var info = FS.getStream(fd); - if (!info) { + getsockname__deps: ['$FS', '$SOCKFS', '$ERRNO_CODES', '__setErrNo', '_write_sockaddr', '_lookup_name', '_inet_pton_raw'], + getsockname: function (fd, addr, addrlen) { + var sock = SOCKFS.getSocket(fd); + if (!sock) { ___setErrNo(ERRNO_CODES.EBADF); return -1; } -#if SOCKET_WEBRTC == 0 - if (info.socket.readyState === WebSocket.CLOSING || info.socket.readyState === WebSocket.CLOSED) { - ___setErrNo(ERRNO_CODES.ENOTCONN); - return -1; - } else if (info.socket.readyState === WebSocket.CONNECTING) { - ___setErrNo(ERRNO_CODES.EAGAIN); + try { + var info = sock.sock_ops.getname(sock); + var res = __write_sockaddr(addr, sock.family, __lookup_name(info.addr), info.port); + assert(!res.errno); + return 0; + } catch (e) { + FS.handleFSError(e); return -1; } -#endif - info.sender(HEAPU8.subarray(buf, buf+len)); - return len; }, - sendmsg__deps: ['$FS', '$Sockets', 'connect'], - sendmsg: function(fd, msg, flags) { - var info = FS.getStream(fd); - if (!info) return -1; - // if we are not connected, use the address info in the message - if (!info.connected) { - var name = {{{ makeGetValue('msg', 'Sockets.msghdr_layout.msg_name', '*') }}}; - assert(name, 'sendmsg on non-connected socket, and no name/address in the message'); - _connect(fd, name, {{{ makeGetValue('msg', 'Sockets.msghdr_layout.msg_namelen', 'i32') }}}); + getpeername__deps: ['$FS', '$SOCKFS', '$ERRNO_CODES', '__setErrNo', '_write_sockaddr', '_lookup_name', '_inet_pton_raw'], + getpeername: function (fd, addr, addrlen) { + var sock = SOCKFS.getSocket(fd); + if (!sock) { + ___setErrNo(ERRNO_CODES.EBADF); + return -1; } - var iov = {{{ makeGetValue('msg', 'Sockets.msghdr_layout.msg_iov', 'i8*') }}}; - var num = {{{ makeGetValue('msg', 'Sockets.msghdr_layout.msg_iovlen', 'i32') }}}; -#if SOCKET_DEBUG - Module.print('sendmsg vecs: ' + num); -#endif - var totalSize = 0; - for (var i = 0; i < num; i++) { - totalSize += {{{ makeGetValue('iov', '8*i + 4', 'i32') }}}; + try { + var info = sock.sock_ops.getname(sock, true); + var res = __write_sockaddr(addr, sock.family, __lookup_name(info.addr), info.port); + assert(!res.errno); + return 0; + } catch (e) { + FS.handleFSError(e); + return -1; } - var buffer = new Uint8Array(totalSize); - var ret = 0; - for (var i = 0; i < num; i++) { - var currNum = {{{ makeGetValue('iov', '8*i + 4', 'i32') }}}; -#if SOCKET_DEBUG - Module.print('sendmsg curr size: ' + currNum); -#endif - if (!currNum) continue; - var currBuf = {{{ makeGetValue('iov', '8*i', 'i8*') }}}; - buffer.set(HEAPU8.subarray(currBuf, currBuf+currNum), ret); - ret += currNum; + }, + + send__deps: ['$SOCKFS', '$ERRNO_CODES', '__setErrNo', 'write'], + send: function(fd, buf, len, flags) { + var sock = SOCKFS.getSocket(fd); + if (!sock) { + ___setErrNo(ERRNO_CODES.EBADF); + return -1; } - info.sender(buffer); // send all the iovs as a single message - return ret; + // TODO honor flags + return _write(fd, buf, len); }, - recvmsg__deps: ['$FS', '$Sockets', 'connect', 'recv', '__setErrNo', '$ERRNO_CODES', 'htons'], - recvmsg: function(fd, msg, flags) { - var info = FS.getStream(fd); - if (!info) return -1; - // if we are not connected, use the address info in the message - if (!info.connected) { -#if SOCKET_DEBUG - Module.print('recvmsg connecting'); -#endif - var name = {{{ makeGetValue('msg', 'Sockets.msghdr_layout.msg_name', '*') }}}; - assert(name, 'sendmsg on non-connected socket, and no name/address in the message'); - _connect(fd, name, {{{ makeGetValue('msg', 'Sockets.msghdr_layout.msg_namelen', 'i32') }}}); + recv__deps: ['$SOCKFS', '$ERRNO_CODES', '__setErrNo', 'read'], + recv: function(fd, buf, len, flags) { + var sock = SOCKFS.getSocket(fd); + if (!sock) { + ___setErrNo(ERRNO_CODES.EBADF); + return -1; } - if (!info.hasData()) { - ___setErrNo(ERRNO_CODES.EWOULDBLOCK); + // TODO honor flags + return _read(fd, buf, len); + }, + + sendto__deps: ['$FS', '$SOCKFS', '$ERRNO_CODES', '__setErrNo', '_lookup_addr', '_read_sockaddr'], + sendto: function(fd, message, length, flags, dest_addr, dest_len) { + var sock = SOCKFS.getSocket(fd); + if (!sock) { + ___setErrNo(ERRNO_CODES.EBADF); return -1; } - var buffer = info.inQueue.shift(); - var bytes = buffer.length; -#if SOCKET_DEBUG - Module.print('recvmsg bytes: ' + bytes); -#endif - // write source - var name = {{{ makeGetValue('msg', 'Sockets.msghdr_layout.msg_name', '*') }}}; - {{{ makeSetValue('name', 'Sockets.sockaddr_in_layout.sin_addr', 'info.addr', 'i32') }}}; - {{{ makeSetValue('name', 'Sockets.sockaddr_in_layout.sin_port', '_htons(info.port)', 'i16') }}}; - // write data - var ret = bytes; - var iov = {{{ makeGetValue('msg', 'Sockets.msghdr_layout.msg_iov', 'i8*') }}}; - var num = {{{ makeGetValue('msg', 'Sockets.msghdr_layout.msg_iovlen', 'i32') }}}; - var bufferPos = 0; - for (var i = 0; i < num && bytes > 0; i++) { - var currNum = {{{ makeGetValue('iov', '8*i + 4', 'i32') }}}; -#if SOCKET_DEBUG - Module.print('recvmsg loop ' + [i, num, bytes, currNum]); -#endif - if (!currNum) continue; - currNum = Math.min(currNum, bytes); // XXX what should happen when we partially fill a buffer..? - bytes -= currNum; - var currBuf = {{{ makeGetValue('iov', '8*i', 'i8*') }}}; -#if SOCKET_DEBUG - Module.print('recvmsg call recv ' + currNum); -#endif - HEAPU8.set(buffer.subarray(bufferPos, bufferPos + currNum), currBuf); - bufferPos += currNum; + + // read the address and port to send to + var info = __read_sockaddr(dest_addr, dest_len); + if (info.errno) { + ___setErrNo(info.errno); + return -1; } - if (info.stream) { - // This is tcp (reliable), so if not all was read, keep it - if (bufferPos < bytes) { - info.inQueue.unshift(buffer.subarray(bufferPos)); -#if SOCKET_DEBUG - Module.print('recvmsg: put back: ' + (bytes - bufferPos)); -#endif - } + var port = info.port; + var addr = __lookup_addr(info.addr) || info.addr; + + // send the message + try { + var slab = {{{ makeGetSlabs('message', 'i8', true) }}}; + return sock.sock_ops.sendmsg(sock, slab, message, length, addr, port); + } catch (e) { + FS.handleFSError(e); + return -1; } - return ret; }, - recvfrom__deps: ['$FS', 'connect', 'recv'], + recvfrom__deps: ['$FS', '$SOCKFS', '$ERRNO_CODES', '__setErrNo', '_lookup_name', '_write_sockaddr'], recvfrom: function(fd, buf, len, flags, addr, addrlen) { - var info = FS.getStream(fd); - if (!info) return -1; - // if we are not connected, use the address info in the message - if (!info.connected) { - //var name = {{{ makeGetValue('addr', '0', '*') }}}; - _connect(fd, addr, addrlen); + var sock = SOCKFS.getSocket(fd); + if (!sock) { + ___setErrNo(ERRNO_CODES.EBADF); + return -1; } - return _recv(fd, buf, len, flags); - }, - shutdown__deps: ['$FS'], - shutdown: function(fd, how) { - var stream = FS.getStream(fd); - if (!stream) return -1; - stream.socket.close(); - FS.closeStream(stream); - }, + // read from the socket + var msg; + try { + msg = sock.sock_ops.recvmsg(sock, len); + } catch (e) { + FS.handleFSError(e); + return -1; + } - ioctl__deps: ['$FS'], - ioctl: function(fd, request, varargs) { - var info = FS.getStream(fd); - if (!info) return -1; - var bytes = 0; - if (info.hasData()) { - bytes = info.inQueue[0].length; + if (!msg) { + // socket is closed + return 0; } - var dest = {{{ makeGetValue('varargs', '0', 'i32') }}}; - {{{ makeSetValue('dest', '0', 'bytes', 'i32') }}}; - return 0; + + // write the source address out + if (addr) { + var res = __write_sockaddr(addr, sock.family, __lookup_name(msg.addr), msg.port); + assert(!res.errno); + } + // write the buffer out + HEAPU8.set(msg.buffer, buf); + + return msg.buffer.byteLength; }, - setsockopt: function(d, level, optname, optval, optlen) { - console.log('ignoring setsockopt command'); - return 0; + sendmsg__deps: ['$FS', '$SOCKFS', '$ERRNO_CODES', '__setErrNo', '_lookup_addr', '_read_sockaddr'], + sendmsg: function(fd, message, flags) { + var sock = SOCKFS.getSocket(fd); + if (!sock) { + ___setErrNo(ERRNO_CODES.EBADF); + return -1; + } + + var iov = {{{ makeGetValue('message', 'Sockets.msghdr_layout.msg_iov', '*') }}}; + var num = {{{ makeGetValue('message', 'Sockets.msghdr_layout.msg_iovlen', 'i32') }}}; + + // read the address and port to send to + var addr; + var port; + var name = {{{ makeGetValue('message', 'Sockets.msghdr_layout.msg_name', '*') }}}; + var namelen = {{{ makeGetValue('message', 'Sockets.msghdr_layout.msg_namelen', 'i32') }}}; + if (name) { + var info = __read_sockaddr(name, namelen); + if (info.errno) { + ___setErrNo(info.errno); + return -1; + } + port = info.port; + addr = __lookup_addr(info.addr) || info.addr; + } + + // concatenate scatter-gather arrays into one message buffer + var total = 0; + for (var i = 0; i < num; i++) { + total += {{{ makeGetValue('iov', '(Sockets.iovec_layout.__size__ * i) + Sockets.iovec_layout.iov_len', 'i32') }}}; + } + var view = new Uint8Array(total); + var offset = 0; + for (var i = 0; i < num; i++) { + var iovbase = {{{ makeGetValue('iov', '(Sockets.iovec_layout.__size__ * i) + Sockets.iovec_layout.iov_base', 'i8*') }}}; + var iovlen = {{{ makeGetValue('iov', '(Sockets.iovec_layout.__size__ * i) + Sockets.iovec_layout.iov_len', 'i32') }}}; + for (var j = 0; j < iovlen; j++) { + view[offset++] = {{{ makeGetValue('iovbase', 'j', 'i8') }}}; + } + } + + // write the buffer + try { + return sock.sock_ops.sendmsg(sock, view, 0, total, addr, port); + } catch (e) { + FS.handleFSError(e); + return -1; + } }, - bind__deps: ['connect'], - bind: function(fd, addr, addrlen) { - _connect(fd, addr, addrlen); - return 0; + recvmsg__deps: ['$FS', '$SOCKFS', '$ERRNO_CODES', '__setErrNo', '_lookup_name', '_inet_pton_raw', '_write_sockaddr'], + recvmsg: function(fd, message, flags) { + var sock = SOCKFS.getSocket(fd); + if (!sock) { + ___setErrNo(ERRNO_CODES.EBADF); + return -1; + } + + var iov = {{{ makeGetValue('message', 'Sockets.msghdr_layout.msg_iov', 'i8*') }}}; + var num = {{{ makeGetValue('message', 'Sockets.msghdr_layout.msg_iovlen', 'i32') }}}; + + // get the total amount of data we can read across all arrays + var total = 0; + for (var i = 0; i < num; i++) { + total += {{{ makeGetValue('iov', '(Sockets.iovec_layout.__size__ * i) + Sockets.iovec_layout.iov_len', 'i32') }}}; + } + + // try to read total data + var msg; + try { + msg = sock.sock_ops.recvmsg(sock, total); + } catch (e) { + FS.handleFSError(e); + return -1; + } + + if (!msg) { + // socket is closed + return 0; + } + + // TODO honor flags: + // MSG_OOB + // Requests out-of-band data. The significance and semantics of out-of-band data are protocol-specific. + // MSG_PEEK + // Peeks at the incoming message. + // MSG_WAITALL + // Requests that the function block until the full amount of data requested can be returned. The function may return a smaller amount of data if a signal is caught, if the connection is terminated, if MSG_PEEK was specified, or if an error is pending for the socket. + + // write the source address out + var name = {{{ makeGetValue('message', 'Sockets.msghdr_layout.msg_name', '*') }}}; + if (name) { + var res = __write_sockaddr(name, sock.family, __lookup_name(msg.addr), msg.port); + assert(!res.errno); + } + // write the buffer out to the scatter-gather arrays + var bytesRead = 0; + var bytesRemaining = msg.buffer.byteLength; + + for (var i = 0; bytesRemaining > 0 && i < num; i++) { + var iovbase = {{{ makeGetValue('iov', '(Sockets.iovec_layout.__size__ * i) + Sockets.iovec_layout.iov_base', 'i8*') }}}; + var iovlen = {{{ makeGetValue('iov', '(Sockets.iovec_layout.__size__ * i) + Sockets.iovec_layout.iov_len', 'i32') }}}; + if (!iovlen) { + continue; + } + var length = Math.min(iovlen, bytesRemaining); + var buf = msg.buffer.subarray(bytesRead, bytesRead + length); + HEAPU8.set(buf, iovbase + bytesRead); + bytesRead += length; + bytesRemaining -= length; + } + + // TODO set msghdr.msg_flags + // MSG_EOR + // End of record was received (if supported by the protocol). + // MSG_OOB + // Out-of-band data was received. + // MSG_TRUNC + // Normal data was truncated. + // MSG_CTRUNC + + return bytesRead; }, - listen: function(fd, backlog) { + setsockopt: function(fd, level, optname, optval, optlen) { + console.log('ignoring setsockopt command'); return 0; }, - accept__deps: ['$FS', '$Sockets'], - accept: function(fd, addr, addrlen) { - // TODO: webrtc queued incoming connections, etc. - // For now, the model is that bind does a connect, and we "accept" that one connection, - // which has host:port the same as ours. We also return the same socket fd. - var info = FS.getStream(fd); - if (!info) return -1; - if (addr) { - setValue(addr + Sockets.sockaddr_in_layout.sin_addr, info.addr, 'i32'); - setValue(addr + Sockets.sockaddr_in_layout.sin_port, info.port, 'i32'); - setValue(addrlen, Sockets.sockaddr_in_layout.__size__, 'i32'); - } - return fd; - }, + // ========================================================================== + // select.h + // ========================================================================== - select__deps: ['$FS'], + select__deps: ['$FS', '__DEFAULT_POLLMASK'], select: function(nfds, readfds, writefds, exceptfds, timeout) { // readfds are supported, // writefds checks socket open status // exceptfds not supported // timeout is always 0 - fully async - assert(!exceptfds); - - var errorCondition = 0; + assert(nfds <= 64, 'nfds must be less than or equal to 64'); // fd sets have 64 bits + assert(!exceptfds, 'exceptfds not supported'); - function canRead(info) { - return (info.hasData && info.hasData()) || - info.socket.readyState == WebSocket.CLOSING || // let recv return 0 once closed - info.socket.readyState == WebSocket.CLOSED; - } + var total = 0; + + var srcReadLow = (readfds ? {{{ makeGetValue('readfds', 0, 'i32') }}} : 0), + srcReadHigh = (readfds ? {{{ makeGetValue('readfds', 4, 'i32') }}} : 0); + var srcWriteLow = (writefds ? {{{ makeGetValue('writefds', 0, 'i32') }}} : 0), + srcWriteHigh = (writefds ? {{{ makeGetValue('writefds', 4, 'i32') }}} : 0); + var srcExceptLow = (exceptfds ? {{{ makeGetValue('exceptfds', 0, 'i32') }}} : 0), + srcExceptHigh = (exceptfds ? {{{ makeGetValue('exceptfds', 4, 'i32') }}} : 0); + + var dstReadLow = 0, + dstReadHigh = 0; + var dstWriteLow = 0, + dstWriteHigh = 0; + var dstExceptLow = 0, + dstExceptHigh = 0; + + var allLow = (readfds ? {{{ makeGetValue('readfds', 0, 'i32') }}} : 0) | + (writefds ? {{{ makeGetValue('writefds', 0, 'i32') }}} : 0) | + (exceptfds ? {{{ makeGetValue('exceptfds', 0, 'i32') }}} : 0); + var allHigh = (readfds ? {{{ makeGetValue('readfds', 4, 'i32') }}} : 0) | + (writefds ? {{{ makeGetValue('writefds', 4, 'i32') }}} : 0) | + (exceptfds ? {{{ makeGetValue('exceptfds', 4, 'i32') }}} : 0); + + function get(fd, low, high, val) { + return (fd < 32 ? (low & val) : (high & val)); + } + + for (var fd = 0; fd < nfds; fd++) { + var mask = 1 << (fd % 32); + if (!(get(fd, allLow, allHigh, mask))) { + continue; // index isn't in the set + } - function canWrite(info) { - return info.socket && (info.socket.readyState == info.socket.OPEN); - } + var stream = FS.getStream(fd); + if (!stream) { + ___setErrNo(ERRNO_CODES.EBADF); + return -1; + } - function checkfds(nfds, fds, can) { - if (!fds) return 0; + var flags = ___DEFAULT_POLLMASK; - var bitsSet = 0; - var dstLow = 0; - var dstHigh = 0; - var srcLow = {{{ makeGetValue('fds', 0, 'i32') }}}; - var srcHigh = {{{ makeGetValue('fds', 4, 'i32') }}}; - nfds = Math.min(64, nfds); // fd sets have 64 bits + if (stream.stream_ops.poll) { + flags = stream.stream_ops.poll(stream); + } - for (var fd = 0; fd < nfds; fd++) { - var mask = 1 << (fd % 32), int_ = fd < 32 ? srcLow : srcHigh; - if (int_ & mask) { - // index is in the set, check if it is ready for read - var info = FS.getStream(fd); - if (!info) { - ___setErrNo(ERRNO_CODES.EBADF); - return -1; - } - if (can(info)) { - // set bit - fd < 32 ? (dstLow = dstLow | mask) : (dstHigh = dstHigh | mask); - bitsSet++; - } - } + if ((flags & {{{ cDefine('POLLIN') }}}) && get(fd, srcReadLow, srcReadHigh, mask)) { + fd < 32 ? (dstReadLow = dstReadLow | mask) : (dstReadHigh = dstReadHigh | mask); + total++; + } + if ((flags & {{{ cDefine('POLLOUT') }}}) && get(fd, srcWriteLow, srcWriteHigh, mask)) { + fd < 32 ? (dstWriteLow = dstWriteLow | mask) : (dstWriteHigh = dstWriteHigh | mask); + total++; } + if ((flags & {{{ cDefine('POLLPRI') }}}) && get(fd, srcExceptLow, srcExceptHigh, mask)) { + fd < 32 ? (dstExceptLow = dstExceptLow | mask) : (dstExceptHigh = dstExceptHigh | mask); + total++; + } + } - {{{ makeSetValue('fds', 0, 'dstLow', 'i32') }}}; - {{{ makeSetValue('fds', 4, 'dstHigh', 'i32') }}}; - return bitsSet; + if (readfds) { + {{{ makeSetValue('readfds', '0', 'dstReadLow', 'i32') }}}; + {{{ makeSetValue('readfds', '4', 'dstReadHigh', 'i32') }}}; } + if (writefds) { + {{{ makeSetValue('writefds', '0', 'dstWriteLow', 'i32') }}}; + {{{ makeSetValue('writefds', '4', 'dstWriteHigh', 'i32') }}}; + } + if (exceptfds) { + {{{ makeSetValue('exceptfds', '0', 'dstExceptLow', 'i32') }}}; + {{{ makeSetValue('exceptfds', '4', 'dstExceptHigh', 'i32') }}}; + } + + return total; + }, - var totalHandles = checkfds(nfds, readfds, canRead) + checkfds(nfds, writefds, canWrite); - if (errorCondition) { + // ========================================================================== + // sys/ioctl.h + // ========================================================================== + + ioctl__deps: ['$FS'], + ioctl: function(fd, request, varargs) { + var stream = FS.getStream(fd); + if (!stream) { ___setErrNo(ERRNO_CODES.EBADF); return -1; - } else { - return totalHandles; } + var arg = {{{ makeGetValue('varargs', '0', 'i32') }}}; + return FS.ioctl(stream, request, arg); }, #endif - socketpair__deps: ['__setErrNo', '$ERRNO_CODES'], - socketpair: function(domain, type, protocol, sv) { - // int socketpair(int domain, int type, int protocol, int sv[2]); - // http://pubs.opengroup.org/onlinepubs/009695399/functions/socketpair.html - ___setErrNo(ERRNO_CODES.EOPNOTSUPP); - return -1; - }, - // pty.h openpty: function() { throw 'openpty: TODO' }, diff --git a/src/library_fs.js b/src/library_fs.js index 63ad7c8d..77066059 100644 --- a/src/library_fs.js +++ b/src/library_fs.js @@ -159,6 +159,9 @@ mergeInto(LibraryManager.library, { isFIFO: function(mode) { return (mode & {{{ cDefine('S_IFMT') }}}) === {{{ cDefine('S_IFIFO') }}}; }, + isSocket: function(mode) { + return (mode & {{{ cDefine('S_IFSOCK') }}}) === {{{ cDefine('S_IFSOCK') }}}; + }, // // paths @@ -400,6 +403,9 @@ mergeInto(LibraryManager.library, { getStream: function(fd) { return FS.streams[fd]; }, + // TODO parameterize this function such that a stream + // object isn't directly passed in. not possible until + // SOCKFS is completed. createStream: function(stream, fd_start, fd_end) { var fd = FS.nextfd(fd_start, fd_end); stream.fd = fd; @@ -1459,6 +1465,12 @@ mergeInto(LibraryManager.library, { throw new FS.errnoError(ERRNO_CODES.ENODEV); } return stream.stream_ops.mmap(stream, buffer, offset, length, position, prot, flags); + }, + ioctl: function(stream, cmd, arg) { + if (!stream.stream_ops.ioctl) { + throw new FS.ErrnoError(ERRNO_CODES.ENOTTY); + } + return stream.stream_ops.ioctl(stream, cmd, arg); } } }); diff --git a/src/library_sockfs.js b/src/library_sockfs.js index 13118b71..0736224d 100644 --- a/src/library_sockfs.js +++ b/src/library_sockfs.js @@ -1,18 +1,560 @@ mergeInto(LibraryManager.library, { - $SOCKFS__postset: '__ATINIT__.push({ func: function() { SOCKFS.root = FS.mount(SOCKFS, {}, null); } });', + $SOCKFS__postset: '__ATINIT__.push({ func: function() { if (ENVIRONMENT_IS_NODE) { WebSocket = require("ws"); } } });\n' + + '__ATINIT__.push({ func: function() { SOCKFS.root = FS.mount(SOCKFS, {}, null); } });', $SOCKFS__deps: ['$FS'], $SOCKFS: { mount: function(mount) { - var node = FS.createNode(null, '/', {{{ cDefine('S_IFDIR') }}} | 0777, 0); - node.node_ops = SOCKFS.node_ops; - node.stream_ops = SOCKFS.stream_ops; - return node; + return FS.createNode(null, '/', {{{ cDefine('S_IFDIR') }}} | 0777, 0); }, - node_ops: { + nextname: function() { + if (!SOCKFS.nextname.current) { + SOCKFS.nextname.current = 0; + } + return 'socket[' + (SOCKFS.nextname.current++) + ']'; }, + createSocket: function(family, type, protocol) { + var streaming = type == {{{ cDefine('SOCK_STREAM') }}}; + if (protocol) { + assert(streaming == (protocol == {{{ cDefine('IPPROTO_TCP') }}})); // if SOCK_STREAM, must be tcp + } + + // create our internal socket structure + var sock = { + family: family, + type: type, + protocol: protocol, + server: null, + peers: {}, + pending: [], + recv_queue: [], +#if SOCKET_WEBRTC +#else + sock_ops: SOCKFS.websocket_sock_ops +#endif + }; + + // create the filesystem node to store the socket structure + var name = SOCKFS.nextname(); + var node = FS.createNode(SOCKFS.root, name, {{{ cDefine('S_IFSOCK') }}}, 0); + node.sock = sock; + + // and the wrapping stream that enables library functions such + // as read and write to indirectly interact with the socket + var stream = FS.createStream({ + path: name, + node: node, + flags: FS.modeStringToFlags('r+'), + seekable: false, + stream_ops: SOCKFS.stream_ops + }); + + // map the new stream to the socket structure (sockets have a 1:1 + // relationship with a stream) + sock.stream = stream; + + return sock; + }, + getSocket: function(fd) { + var stream = FS.getStream(fd); + if (!stream || !FS.isSocket(stream.node.mode)) { + return null; + } + return stream.node.sock; + }, + // node and stream ops are backend agnostic stream_ops: { + poll: function(stream) { + var sock = stream.node.sock; + return sock.sock_ops.poll(sock); + }, + ioctl: function(stream, request, varargs) { + var sock = stream.node.sock; + return sock.sock_ops.ioctl(sock, request, varargs); + }, + read: function(stream, buffer, offset, length, position /* ignored */) { + var sock = stream.node.sock; + var msg = sock.sock_ops.recvmsg(sock, length); + if (!msg) { + // socket is closed + return 0; + } +#if USE_TYPED_ARRAYS == 2 + buffer.set(msg.buffer, offset); +#else + for (var i = 0; i < size; i++) { + buffer[offset + i] = msg.buffer[i]; + } +#endif + return msg.buffer.length; + }, + write: function(stream, buffer, offset, length, position /* ignored */) { + var sock = stream.node.sock; + return sock.sock_ops.sendmsg(sock, buffer, offset, length); + }, + close: function(stream) { + var sock = stream.node.sock; + sock.sock_ops.close(sock); + } }, + // backend-specific stream ops websocket_sock_ops: { + // + // peers are a small wrapper around a WebSocket to help in + // emulating dgram sockets + // + // these functions aren't actually sock_ops members, but we're + // abusing the namespace to organize them + // + createPeer: function(sock, addr, port) { + var ws; + + if (typeof addr === 'object') { + ws = addr; + addr = null; + port = null; + } + + if (ws) { + // for sockets that've already connected (e.g. we're the server) + // we can inspect the _socket property for the address + if (ws._socket) { + addr = ws._socket.remoteAddress; + port = ws._socket.remotePort; + } + // if we're just now initializing a connection to the remote, + // inspect the url property + else { + var result = /ws[s]?:\/\/([^:]+):(\d+)/.exec(ws.url); + if (!result) { + throw new Error('WebSocket URL must be in the format ws(s)://address:port'); + } + addr = result[1]; + port = parseInt(result[2], 10); + } + } else { + // create the actual websocket object and connect + try { + var url = 'ws://' + addr + ':' + port; +#if SOCKET_DEBUG + console.log('connect: ' + url); +#endif + // the node ws library API is slightly different than the browser's + var opts = ENVIRONMENT_IS_NODE ? {} : ['binary']; + ws = new WebSocket(url, opts); + ws.binaryType = 'arraybuffer'; + } catch (e) { + throw new FS.ErrnoError(ERRNO_CODES.EHOSTUNREACH); + } + } + + var peer = { + addr: addr, + port: port, + socket: ws, + send_queue: [] + }; + + SOCKFS.websocket_sock_ops.addPeer(sock, peer); + SOCKFS.websocket_sock_ops.handlePeerEvents(sock, peer); + + // if this is a bound dgram socket, send the port number first to allow + // us to override the ephemeral port reported to us by remotePort on the + // remote end. + if (sock.type === {{{ cDefine('SOCK_DGRAM') }}} && typeof sock.sport !== 'undefined') { + peer.send_queue.push(new Uint8Array([ + 255, 255, 255, 255, + 'p'.charCodeAt(0), 'o'.charCodeAt(0), 'r'.charCodeAt(0), 't'.charCodeAt(0), + ((sock.sport & 0xff00) >> 8) , (sock.sport & 0xff) + ])); + } + + return peer; + }, + getPeer: function(sock, addr, port) { + return sock.peers[addr + ':' + port]; + }, + addPeer: function(sock, peer) { + sock.peers[peer.addr + ':' + peer.port] = peer; + }, + removePeer: function(sock, peer) { + delete sock.peers[peer.addr + ':' + peer.port]; + }, + handlePeerEvents: function(sock, peer) { + var first = true; + + var handleOpen = function () { + try { + var queued = peer.send_queue.shift(); + while (queued) { +#if SOCKET_DEBUG + Module.print('websocket sending queued data (' + queued.byteLength + ' bytes): ' + [Array.prototype.slice.call(new Uint8Array(queued))]); +#endif + peer.socket.send(queued); + queued = peer.send_queue.shift(); + } + } catch (e) { + peer.socket.close(); + } + }; + + var handleMessage = function(data) { + assert(typeof data !== 'string' && data.byteLength !== undefined); // must receive an ArrayBuffer + data = new Uint8Array(data); // make a typed array view on the array buffer + + // if this is the port message, override the peer's port with it + var wasfirst = first; + first = false; + if (wasfirst && + data.length === 10 && + data[0] === 255 && data[1] === 255 && data[2] === 255 && data[3] === 255 && + data[4] === 'p'.charCodeAt(0) && data[5] === 'o'.charCodeAt(0) && data[6] === 'r'.charCodeAt(0) && data[7] === 't'.charCodeAt(0)) { + // update the peer's port and it's key in the peer map + var newport = ((data[8] << 8) | data[9]); + SOCKFS.websocket_sock_ops.removePeer(sock, peer); + peer.port = newport; + SOCKFS.websocket_sock_ops.addPeer(sock, peer); + return; + } + + sock.recv_queue.push({ addr: peer.addr, port: peer.port, data: data }); + }; + + if (ENVIRONMENT_IS_NODE) { + peer.socket.on('open', handleOpen); + peer.socket.on('message', function(data, flags) { + if (!flags.binary) { + return; + } + handleMessage((new Uint8Array(data)).buffer); // copy from node Buffer -> ArrayBuffer + }); + peer.socket.on('error', function() { + // don't throw + }); + } else { + peer.socket.onopen = handleOpen; + peer.socket.onmessage = function(event) { + handleMessage(event.data); + }; + } + }, + + // + // actual sock ops + // + poll: function(sock) { + if (sock.type === {{{ cDefine('SOCK_STREAM') }}} && sock.server) { + // listen sockets should only say they're available for reading + // if there are pending clients. + return sock.pending.length ? ({{{ cDefine('POLLRDNORM') }}} | {{{ cDefine('POLLIN') }}}) : 0; + } + + var mask = 0; + var dest = sock.type === {{{ cDefine('SOCK_STREAM') }}} ? // we only care about the socket state for connection-based sockets + SOCKFS.websocket_sock_ops.getPeer(sock, sock.daddr, sock.dport) : + null; + + if (sock.recv_queue.length || + !dest || // connection-less sockets are always ready to read + (dest && dest.socket.readyState === WebSocket.CLOSING) || + (dest && dest.socket.readyState === WebSocket.CLOSED)) { // let recv return 0 once closed + mask |= ({{{ cDefine('POLLRDNORM') }}} | {{{ cDefine('POLLIN') }}}); + } + + if (!dest || // connection-less sockets are always ready to write + (dest && dest.socket.readyState === WebSocket.OPEN)) { + mask |= {{{ cDefine('POLLOUT') }}}; + } + + if ((dest && dest.socket.readyState === WebSocket.CLOSING) || + (dest && dest.socket.readyState === WebSocket.CLOSED)) { + mask |= {{{ cDefine('POLLHUP') }}}; + } + + return mask; + }, + ioctl: function(sock, request, arg) { + switch (request) { + case {{{ cDefine('FIONREAD') }}}: + var bytes = 0; + if (sock.recv_queue.length) { + bytes = sock.recv_queue[0].data.length; + } + {{{ makeSetValue('arg', '0', 'bytes', 'i32') }}}; + return 0; + default: + return ERRNO_CODES.EINVAL; + } + }, + close: function(sock) { + // if we've spawned a listen server, close it + if (sock.server) { + try { + sock.server.close(); + } catch (e) { + } + sock.server = null; + } + // close any peer connections + var peers = Object.keys(sock.peers); + for (var i = 0; i < peers.length; i++) { + var peer = sock.peers[peers[i]]; + try { + peer.socket.close(); + } catch (e) { + } + SOCKFS.websocket_sock_ops.removePeer(sock, peer); + } + return 0; + }, + bind: function(sock, addr, port) { + if (typeof sock.saddr !== 'undefined' || typeof sock.sport !== 'undefined') { + throw new FS.ErrnoError(ERRNO_CODES.EINVAL); // already bound + } + sock.saddr = addr; + sock.sport = port || _mkport(); + // in order to emulate dgram sockets, we need to launch a listen server when + // binding on a connection-less socket + // note: this is only required on the server side + if (sock.type === {{{ cDefine('SOCK_DGRAM') }}}) { + // close the existing server if it exists + if (sock.server) { + sock.server.close(); + sock.server = null; + } + // swallow error operation not supported error that occurs when binding in the + // browser where this isn't supported + try { + sock.sock_ops.listen(sock, 0); + } catch (e) { + if (!(e instanceof FS.ErrnoError)) throw e; + if (e.errno !== ERRNO_CODES.EOPNOTSUPP) throw e; + } + } + }, + connect: function(sock, addr, port) { + if (sock.server) { + throw new FS.ErrnoError(ERRNO_CODS.EOPNOTSUPP); + } + + // TODO autobind + // if (!sock.addr && sock.type == {{{ cDefine('SOCK_DGRAM') }}}) { + // } + + // early out if we're already connected / in the middle of connecting + if (typeof sock.daddr !== 'undefined' && typeof sock.dport !== 'undefined') { + var dest = SOCKFS.websocket_sock_ops.getPeer(sock, sock.daddr, sock.dport); + if (dest) { + if (dest.socket.readyState === WebSocket.CONNECTING) { + throw new FS.ErrnoError(ERRNO_CODES.EALREADY); + } else { + throw new FS.ErrnoError(ERRNO_CODES.EISCONN); + } + } + } + + // add the socket to our peer list and set our + // defination address / port to match + var peer = SOCKFS.websocket_sock_ops.createPeer(sock, addr, port); + sock.daddr = peer.addr; + sock.dport = peer.port; + + // always "fail" in non-blocking mode + throw new FS.ErrnoError(ERRNO_CODES.EINPROGRESS); + }, + listen: function(sock, backlog) { + if (!ENVIRONMENT_IS_NODE) { + throw new FS.ErrnoError(ERRNO_CODES.EOPNOTSUPP); + } + if (sock.server) { + throw new FS.ErrnoError(ERRNO_CODES.EINVAL); // already listening + } + var WebSocketServer = WebSocket.Server; + var host = sock.saddr; +#if SOCKET_DEBUG + console.log('listen: ' + host + ':' + sock.sport); +#endif + sock.server = new WebSocketServer({ + host: host, + port: sock.sport + // TODO support backlog + }); + + sock.server.on('connection', function(ws) { +#if SOCKET_DEBUG + console.log('received connection from: ' + ws._socket.remoteAddress + ':' + ws._socket.remotePort); +#endif + if (sock.type === {{{ cDefine('SOCK_STREAM') }}}) { + // push to queue for accept to pick up + sock.pending.push(ws); + } else { + // auto-accept, adding the peer to the listen socket so + // calling sendto with the listen socket and an address + // will resolve to the correct client + SOCKFS.websocket_sock_ops.createPeer(sock, ws); + } + }); + sock.server.on('closed', function() { + sock.server = null; + }); + sock.server.on('error', function() { + // don't throw + }); + }, + accept: function(listensock, newsock, flags) { + if (!listensock.server) { + ___setErrNo(ERRNO_CODES.EINVAL); + return -1; + } + + // create a peer on the server's client socket + var peer = SOCKFS.websocket_sock_ops.createPeer(newsock, listensock.pending.shift()); + newsock.daddr = peer.addr; + newsock.dport = peer.port; + newsock.stream.flags = flags; + + return 0; + }, + getname: function(sock, peer) { + var addr, port; + if (peer) { + if (sock.daddr === undefined || sock.dport === undefined) { + throw new FS.ErrnoError(ERRNO_CODES.ENOTCONN); + } + addr = sock.daddr; + port = sock.dport; + } else { + // TODO saddr and sport will be set for bind()'d UDP sockets, but what + // should we be returning for TCP sockets that've been connect()'d? + addr = sock.saddr || 0; + port = sock.sport || 0; + } + return { addr: addr, port: port }; + }, + sendmsg: function(sock, buffer, offset, length, addr, port) { + if (sock.type === {{{ cDefine('SOCK_DGRAM') }}}) { + // connection-less sockets will honor the message address, + // and otherwise fall back to the bound destination address + if (addr === undefined || port === undefined) { + addr = sock.daddr; + port = sock.dport; + } + // if there was no address to fall back to, error out + if (addr === undefined || port === undefined) { + throw new FS.ErrnoError(ERRNO_CODES.EDESTADDRREQ); + } + } else { + // connection-based sockets will only use the bound + addr = sock.daddr; + port = sock.dport; + } + + // find the peer for the destination address + var dest = SOCKFS.websocket_sock_ops.getPeer(sock, addr, port); + + // early out if not connected with a connection-based socket + if (sock.type === {{{ cDefine('SOCK_STREAM') }}}) { + if (!dest || dest.socket.readyState === WebSocket.CLOSING || dest.socket.readyState === WebSocket.CLOSED) { + throw new FS.ErrnoError(ERRNO_CODES.ENOTCONN); + } else if (dest.socket.readyState === WebSocket.CONNECTING) { + throw new FS.ErrnoError(ERRNO_CODES.EAGAIN); + } + } + + // create a copy of the incoming data to send, as the WebSocket API + // doesn't work entirely with an ArrayBufferView, it'll just send + // the entire underlying buffer + var data; + if (buffer instanceof Array || buffer instanceof ArrayBuffer) { + data = buffer.slice(offset, offset + length); + } else { // ArrayBufferView + data = buffer.buffer.slice(buffer.byteOffset + offset, buffer.byteOffset + offset + length); + } + + // if we're emulating a connection-less dgram socket and don't have + // a cached connection, queue the buffer to send upon connect and + // lie, saying the data was sent now. + if (sock.type === {{{ cDefine('SOCK_DGRAM') }}}) { + if (!dest || dest.socket.readyState !== dest.socket.OPEN) { + // if we're not connected, open a new connection + if (!dest || dest.socket.readyState === dest.socket.CLOSING || dest.socket.readyState === dest.socket.CLOSED) { + dest = SOCKFS.websocket_sock_ops.createPeer(sock, addr, port); + } +#if SOCKET_DEBUG + Module.print('websocket queuing (' + length + ' bytes): ' + [Array.prototype.slice.call(new Uint8Array(data))]); +#endif + dest.send_queue.push(data); + return length; + } + } + + try { +#if SOCKET_DEBUG + Module.print('websocket send (' + length + ' bytes): ' + [Array.prototype.slice.call(new Uint8Array(data))]); +#endif + // send the actual data + dest.socket.send(data); + return length; + } catch (e) { + throw new FS.ErrnoError(ERRNO_CODES.EINVAL); + } + }, + recvmsg: function(sock, length) { + // http://pubs.opengroup.org/onlinepubs/7908799/xns/recvmsg.html + if (sock.type === {{{ cDefine('SOCK_STREAM') }}} && sock.server) { + // tcp servers should not be recv()'ing on the listen socket + throw new FS.ErrnoError(ERRNO_CODES.ENOTCONN); + } + + var queued = sock.recv_queue.shift(); + if (!queued) { + if (sock.type === {{{ cDefine('SOCK_STREAM') }}}) { + var dest = SOCKFS.websocket_sock_ops.getPeer(sock, sock.daddr, sock.dport); + + if (!dest) { + // if we have a destination address but are not connected, error out + throw new FS.ErrnoError(ERRNO_CODES.ENOTCONN); + } + else if (dest.socket.readyState === WebSocket.CLOSING || dest.socket.readyState === WebSocket.CLOSED) { + // return null if the socket has closed + return null; + } + else { + // else, our socket is in a valid state but truly has nothing available + throw new FS.ErrnoError(ERRNO_CODES.EAGAIN); + } + } else { + throw new FS.ErrnoError(ERRNO_CODES.EAGAIN); + } + } + + // queued.data will be an ArrayBuffer if it's unadulterated, but if it's + // requeued TCP data it'll be an ArrayBufferView + var queuedLength = queued.data.byteLength || queued.data.length; + var queuedOffset = queued.data.byteOffset || 0; + var queuedBuffer = queued.data.buffer || queued.data; + var bytesRead = Math.min(length, queuedLength); + var res = { + buffer: new Uint8Array(queuedBuffer, queuedOffset, bytesRead), + addr: queued.addr, + port: queued.port + }; + +#if SOCKET_DEBUG + Module.print('websocket read (' + bytesRead + ' bytes): ' + [Array.prototype.slice.call(res.buffer)]); +#endif + + // push back any unread data for TCP connections + if (sock.type === {{{ cDefine('SOCK_STREAM') }}} && bytesRead < queuedLength) { + var bytesRemaining = queuedLength - bytesRead; +#if SOCKET_DEBUG + Module.print('websocket read: put back ' + bytesRemaining + ' bytes'); +#endif + queued.data = new Uint8Array(queuedBuffer, queuedOffset + bytesRead, bytesRemaining); + sock.recv_queue.unshift(queued); + } + + return res; + } } } });
\ No newline at end of file diff --git a/src/settings.js b/src/settings.js index bf2736b6..258d4960 100644 --- a/src/settings.js +++ b/src/settings.js @@ -719,6 +719,7 @@ var C_DEFINES = { 'PM_STR': '6', 'POLLERR': '8', 'POLLHUP': '16', + 'POLLPRI': '32', 'POLLIN': '1', 'POLLNVAL': '4', 'POLLOUT': '2', diff --git a/tests/runner.py b/tests/runner.py index bbbc23e5..f0e61c4e 100755 --- a/tests/runner.py +++ b/tests/runner.py @@ -63,6 +63,10 @@ class RunnerCore(unittest.TestCase): self.working_dir = dirname os.chdir(dirname) + # Use emscripten root for node module lookup + scriptdir = os.path.dirname(os.path.abspath(__file__)) + os.environ['NODE_PATH'] = os.path.join(scriptdir, '..', 'node_modules') + if not self.save_dir: self.has_prev_ll = False for temp_file in os.listdir(TEMP_DIR): diff --git a/tests/sockets/test_sockets_msg.h b/tests/sockets/test_sockets_msg.h index 30094d65..b60b056a 100644 --- a/tests/sockets/test_sockets_msg.h +++ b/tests/sockets/test_sockets_msg.h @@ -52,6 +52,7 @@ int do_msg_write(int sockfd, msg_t *msg, int offset, int length, struct sockaddr assert(errno == EAGAIN); return res; } + printf("do_msg_write: sending message header for %d bytes\n", msg->length); assert(res == sizeof(int)); } diff --git a/tests/sockets/test_sockets_select_server_down_client.c b/tests/sockets/test_sockets_select_server_down_client.c index e05bd4c8..27e200e0 100644 --- a/tests/sockets/test_sockets_select_server_down_client.c +++ b/tests/sockets/test_sockets_select_server_down_client.c @@ -48,7 +48,7 @@ void iter(void *arg) { char buffer[1024]; int n = recv(sockfd, buffer, sizeof(buffer), 0); if (n == -1 && retries++ > 10) { - perror("revv failed"); + perror("recv failed"); finish(EXIT_FAILURE); } else if (!n) { perror("Connection to websocket server failed as expected."); diff --git a/tests/test_sockets.py b/tests/test_sockets.py index 85813447..3bf1d310 100644 --- a/tests/test_sockets.py +++ b/tests/test_sockets.py @@ -77,7 +77,11 @@ class CompiledServerHarness: self.args = args or [] def __enter__(self): - import socket, websockify + # assuming this is only used for WebSocket tests at the moment, validate that + # the ws module is installed + child = Popen([NODE_JS, '-e', 'require("ws");']) + child.communicate() + assert child.returncode == 0, 'ws module for Node.js not installed. Please run \'npm install\' from %s' % EMSCRIPTEN_ROOT # compile the server Popen([PYTHON, EMCC, path_from_root('tests', self.filename), '-o', 'server.js'] + self.args).communicate() @@ -226,48 +230,52 @@ class sockets(BrowserCore): def test_sockets_echo(self): sockets_include = '-I'+path_from_root('tests', 'sockets') - for datagram in [0]: - dgram_define = '-DTEST_DGRAM=%d' % datagram + # Websockify-proxied servers can't run dgram tests + harnesses = [ + (WebsockifyServerHarness(os.path.join('sockets', 'test_sockets_echo_server.c'), ['-DSOCKK=8990', sockets_include], 8991, 8990), 0) + ] + for datagram in [0, 1]: + harnesses.append((CompiledServerHarness(os.path.join('sockets', 'test_sockets_echo_server.c'), ['-DSOCKK=8991', '-DTEST_DGRAM=%d' % datagram, sockets_include]), datagram)) - for harness in [ - WebsockifyServerHarness(os.path.join('sockets', 'test_sockets_echo_server.c'), ['-DSOCKK=8990', dgram_define, sockets_include], 8991, 8990) - # CompiledServerHarness(os.path.join('sockets', 'test_sockets_echo_server.c'), ['-DSOCKK=8990', dgram_define, sockets_include]) - ]: - with harness: - self.btest(os.path.join('sockets', 'test_sockets_echo_client.c'), expected='0', args=['-DSOCKK=8991', dgram_define, sockets_include]) + for harness, datagram in harnesses: + with harness: + self.btest(os.path.join('sockets', 'test_sockets_echo_client.c'), expected='0', args=['-DSOCKK=8991', '-DTEST_DGRAM=%d' % datagram, sockets_include]) def test_sockets_echo_bigdata(self): sockets_include = '-I'+path_from_root('tests', 'sockets') - for datagram in [0]: - dgram_define = '-DTEST_DGRAM=%d' % datagram + # generate a large string literal to use as our message + message = '' + for i in range(256*256*2): + message += str(unichr(ord('a') + (i % 26))) - # generate a large string literal to use as our message - message = '' - for i in range(256*256*2): - message += str(unichr(ord('a') + (i % 26))) + # re-write the client test with this literal (it's too big to pass via command line) + input_filename = path_from_root('tests', 'sockets', 'test_sockets_echo_client.c') + input = open(input_filename).read() + output = input.replace('#define MESSAGE "pingtothepong"', '#define MESSAGE "%s"' % message) - # re-write the client test with this literal (it's too big to pass via command line) - input_filename = path_from_root('tests', 'sockets', 'test_sockets_echo_client.c') - input = open(input_filename).read() - output = input.replace('#define MESSAGE "pingtothepong"', '#define MESSAGE "%s"' % message) + harnesses = [ + (WebsockifyServerHarness(os.path.join('sockets', 'test_sockets_echo_server.c'), ['-DSOCKK=8992', sockets_include], 8993, 8992), 0) + ] + for datagram in [0, 1]: + harnesses.append((CompiledServerHarness(os.path.join('sockets', 'test_sockets_echo_server.c'), ['-DSOCKK=8993', '-DTEST_DGRAM=%d' % datagram, sockets_include]), datagram)) - for harness in [ - WebsockifyServerHarness(os.path.join('sockets', 'test_sockets_echo_server.c'), ['-DSOCKK=8992', dgram_define, sockets_include], 8993, 8992) - ]: - with harness: - self.btest(output, expected='0', args=['-DSOCKK=8993', dgram_define, sockets_include], force_c=True) + for harness, datagram in harnesses: + with harness: + self.btest(output, expected='0', args=['-DSOCKK=8993', '-DTEST_DGRAM=%d' % datagram, sockets_include], force_c=True) def test_sockets_partial(self): for harness in [ - WebsockifyServerHarness(os.path.join('sockets', 'test_sockets_partial_server.c'), ['-DSOCKK=8994'], 8995, 8994) + WebsockifyServerHarness(os.path.join('sockets', 'test_sockets_partial_server.c'), ['-DSOCKK=8994'], 8995, 8994), + CompiledServerHarness(os.path.join('sockets', 'test_sockets_partial_server.c'), ['-DSOCKK=8995']) ]: with harness: self.btest(os.path.join('sockets', 'test_sockets_partial_client.c'), expected='165', args=['-DSOCKK=8995']) def test_sockets_select_server_down(self): for harness in [ - WebsockifyServerHarness(os.path.join('sockets', 'test_sockets_select_server_down_server.c'), ['-DSOCKK=9002'], 9003, 9002) + WebsockifyServerHarness(os.path.join('sockets', 'test_sockets_select_server_down_server.c'), ['-DSOCKK=9002'], 9003, 9002), + CompiledServerHarness(os.path.join('sockets', 'test_sockets_select_server_down_server.c'), ['-DSOCKK=9003']) ]: with harness: self.btest(os.path.join('sockets', 'test_sockets_select_server_down_client.c'), expected='266', args=['-DSOCKK=9003']) @@ -276,15 +284,12 @@ class sockets(BrowserCore): sockets_include = '-I'+path_from_root('tests', 'sockets') for harness in [ - WebsockifyServerHarness(os.path.join('sockets', 'test_sockets_echo_server.c'), ['-DSOCKK=9004', sockets_include], 9005, 9004) + WebsockifyServerHarness(os.path.join('sockets', 'test_sockets_echo_server.c'), ['-DSOCKK=9004', sockets_include], 9005, 9004), + CompiledServerHarness(os.path.join('sockets', 'test_sockets_echo_server.c'), ['-DSOCKK=9005']) ]: with harness: self.btest(os.path.join('sockets', 'test_sockets_select_server_closes_connection_client_rw.c'), expected='266', args=['-DSOCKK=9005', sockets_include]) - # TODO remove this once we have proper listen server support built into emscripten. - # being that enet uses datagram sockets, we can't proxy to a native server with - # websockify, so we're emulating the listen server in the browser and relaying - # between two TCP servers. def test_enet(self): try_delete(self.in_dir('enet')) shutil.copytree(path_from_root('tests', 'enet'), self.in_dir('enet')) @@ -294,20 +299,18 @@ class sockets(BrowserCore): Popen([PYTHON, path_from_root('emmake'), 'make']).communicate() enet = [self.in_dir('enet', '.libs', 'libenet.a'), '-I'+path_from_root('tests', 'enet', 'include')] os.chdir(pwd) - Popen([PYTHON, EMCC, path_from_root('tests', 'sockets', 'test_enet_server.c'), '-o', 'server.html', '-DSOCKK=2235'] + enet).communicate() - - with WebsockifyServerHarness('', [], 2235, 2234): - with WebsockifyServerHarness('', [], 2237, 2236): - pids = [] - try: - proc = make_relay_server(2234, 2236) - pids.append(proc.pid) - self.btest(os.path.join('sockets', 'test_enet_client.c'), expected='0', args=['-DSOCKK=2237', '-DUSE_IFRAME'] + enet) - finally: - clean_pids(pids); - - # TODO use this once we have listen server support - # def test_enet(self): + + for harness in [ + CompiledServerHarness(os.path.join('sockets', 'test_enet_server.c'), ['-DSOCKK=9010'] + enet) + ]: + with harness: + self.btest(os.path.join('sockets', 'test_enet_client.c'), expected='0', args=['-DSOCKK=9010'] + enet) + + # This test is no longer in use for WebSockets as we can't truly emulate + # a server in the browser (in the past, there were some hacks to make it + # somewhat work, but those have been removed). However, with WebRTC it + # should be able to resurect this test. + # def test_enet_in_browser(self): # try_delete(self.in_dir('enet')) # shutil.copytree(path_from_root('tests', 'enet'), self.in_dir('enet')) # pwd = os.getcwd() @@ -316,12 +319,17 @@ class sockets(BrowserCore): # Popen([PYTHON, path_from_root('emmake'), 'make']).communicate() # enet = [self.in_dir('enet', '.libs', 'libenet.a'), '-I'+path_from_root('tests', 'enet', 'include')] # os.chdir(pwd) - - # for harness in [ - # self.CompiledServerHarness(os.path.join('sockets', 'test_enet_server.c'), ['-DSOCKK=9010'] + enet, 9011, 9010) - # ]: - # with harness: - # self.btest(os.path.join('sockets', 'test_enet_client.c'), expected='0', args=['-DSOCKK=9011'] + enet) + # Popen([PYTHON, EMCC, path_from_root('tests', 'sockets', 'test_enet_server.c'), '-o', 'server.html', '-DSOCKK=2235'] + enet).communicate() + + # with WebsockifyServerHarness('', [], 2235, 2234): + # with WebsockifyServerHarness('', [], 2237, 2236): + # pids = [] + # try: + # proc = make_relay_server(2234, 2236) + # pids.append(proc.pid) + # self.btest(os.path.join('sockets', 'test_enet_client.c'), expected='0', args=['-DSOCKK=2237', '-DUSE_IFRAME=1'] + enet) + # finally: + # clean_pids(pids); def test_webrtc(self): host_src = 'webrtc_host.c' |