diff options
author | Anthony Pesch <inolen@gmail.com> | 2013-08-24 09:31:29 -0700 |
---|---|---|
committer | Anthony Pesch <inolen@gmail.com> | 2013-08-29 01:45:38 -0700 |
commit | 462875aa0e5b2820868b539e2a185db7783653f8 (patch) | |
tree | 38ad416ee28ac0a8c5db34157608d532929e6b0d /src/library_sockfs.js | |
parent | f7744fcb18ab5d12c30cf1340c7bacabfc13d1ab (diff) |
- created SOCKFS
- added support for node-based listen servers
- updated tests to also test against compiled listen servers
Diffstat (limited to 'src/library_sockfs.js')
-rw-r--r-- | src/library_sockfs.js | 554 |
1 files changed, 548 insertions, 6 deletions
diff --git a/src/library_sockfs.js b/src/library_sockfs.js index 13118b71..0736224d 100644 --- a/src/library_sockfs.js +++ b/src/library_sockfs.js @@ -1,18 +1,560 @@ mergeInto(LibraryManager.library, { - $SOCKFS__postset: '__ATINIT__.push({ func: function() { SOCKFS.root = FS.mount(SOCKFS, {}, null); } });', + $SOCKFS__postset: '__ATINIT__.push({ func: function() { if (ENVIRONMENT_IS_NODE) { WebSocket = require("ws"); } } });\n' + + '__ATINIT__.push({ func: function() { SOCKFS.root = FS.mount(SOCKFS, {}, null); } });', $SOCKFS__deps: ['$FS'], $SOCKFS: { mount: function(mount) { - var node = FS.createNode(null, '/', {{{ cDefine('S_IFDIR') }}} | 0777, 0); - node.node_ops = SOCKFS.node_ops; - node.stream_ops = SOCKFS.stream_ops; - return node; + return FS.createNode(null, '/', {{{ cDefine('S_IFDIR') }}} | 0777, 0); }, - node_ops: { + nextname: function() { + if (!SOCKFS.nextname.current) { + SOCKFS.nextname.current = 0; + } + return 'socket[' + (SOCKFS.nextname.current++) + ']'; }, + createSocket: function(family, type, protocol) { + var streaming = type == {{{ cDefine('SOCK_STREAM') }}}; + if (protocol) { + assert(streaming == (protocol == {{{ cDefine('IPPROTO_TCP') }}})); // if SOCK_STREAM, must be tcp + } + + // create our internal socket structure + var sock = { + family: family, + type: type, + protocol: protocol, + server: null, + peers: {}, + pending: [], + recv_queue: [], +#if SOCKET_WEBRTC +#else + sock_ops: SOCKFS.websocket_sock_ops +#endif + }; + + // create the filesystem node to store the socket structure + var name = SOCKFS.nextname(); + var node = FS.createNode(SOCKFS.root, name, {{{ cDefine('S_IFSOCK') }}}, 0); + node.sock = sock; + + // and the wrapping stream that enables library functions such + // as read and write to indirectly interact with the socket + var stream = FS.createStream({ + path: name, + node: node, + flags: FS.modeStringToFlags('r+'), + seekable: false, + stream_ops: SOCKFS.stream_ops + }); + + // map the new stream to the socket structure (sockets have a 1:1 + // relationship with a stream) + sock.stream = stream; + + return sock; + }, + getSocket: function(fd) { + var stream = FS.getStream(fd); + if (!stream || !FS.isSocket(stream.node.mode)) { + return null; + } + return stream.node.sock; + }, + // node and stream ops are backend agnostic stream_ops: { + poll: function(stream) { + var sock = stream.node.sock; + return sock.sock_ops.poll(sock); + }, + ioctl: function(stream, request, varargs) { + var sock = stream.node.sock; + return sock.sock_ops.ioctl(sock, request, varargs); + }, + read: function(stream, buffer, offset, length, position /* ignored */) { + var sock = stream.node.sock; + var msg = sock.sock_ops.recvmsg(sock, length); + if (!msg) { + // socket is closed + return 0; + } +#if USE_TYPED_ARRAYS == 2 + buffer.set(msg.buffer, offset); +#else + for (var i = 0; i < size; i++) { + buffer[offset + i] = msg.buffer[i]; + } +#endif + return msg.buffer.length; + }, + write: function(stream, buffer, offset, length, position /* ignored */) { + var sock = stream.node.sock; + return sock.sock_ops.sendmsg(sock, buffer, offset, length); + }, + close: function(stream) { + var sock = stream.node.sock; + sock.sock_ops.close(sock); + } }, + // backend-specific stream ops websocket_sock_ops: { + // + // peers are a small wrapper around a WebSocket to help in + // emulating dgram sockets + // + // these functions aren't actually sock_ops members, but we're + // abusing the namespace to organize them + // + createPeer: function(sock, addr, port) { + var ws; + + if (typeof addr === 'object') { + ws = addr; + addr = null; + port = null; + } + + if (ws) { + // for sockets that've already connected (e.g. we're the server) + // we can inspect the _socket property for the address + if (ws._socket) { + addr = ws._socket.remoteAddress; + port = ws._socket.remotePort; + } + // if we're just now initializing a connection to the remote, + // inspect the url property + else { + var result = /ws[s]?:\/\/([^:]+):(\d+)/.exec(ws.url); + if (!result) { + throw new Error('WebSocket URL must be in the format ws(s)://address:port'); + } + addr = result[1]; + port = parseInt(result[2], 10); + } + } else { + // create the actual websocket object and connect + try { + var url = 'ws://' + addr + ':' + port; +#if SOCKET_DEBUG + console.log('connect: ' + url); +#endif + // the node ws library API is slightly different than the browser's + var opts = ENVIRONMENT_IS_NODE ? {} : ['binary']; + ws = new WebSocket(url, opts); + ws.binaryType = 'arraybuffer'; + } catch (e) { + throw new FS.ErrnoError(ERRNO_CODES.EHOSTUNREACH); + } + } + + var peer = { + addr: addr, + port: port, + socket: ws, + send_queue: [] + }; + + SOCKFS.websocket_sock_ops.addPeer(sock, peer); + SOCKFS.websocket_sock_ops.handlePeerEvents(sock, peer); + + // if this is a bound dgram socket, send the port number first to allow + // us to override the ephemeral port reported to us by remotePort on the + // remote end. + if (sock.type === {{{ cDefine('SOCK_DGRAM') }}} && typeof sock.sport !== 'undefined') { + peer.send_queue.push(new Uint8Array([ + 255, 255, 255, 255, + 'p'.charCodeAt(0), 'o'.charCodeAt(0), 'r'.charCodeAt(0), 't'.charCodeAt(0), + ((sock.sport & 0xff00) >> 8) , (sock.sport & 0xff) + ])); + } + + return peer; + }, + getPeer: function(sock, addr, port) { + return sock.peers[addr + ':' + port]; + }, + addPeer: function(sock, peer) { + sock.peers[peer.addr + ':' + peer.port] = peer; + }, + removePeer: function(sock, peer) { + delete sock.peers[peer.addr + ':' + peer.port]; + }, + handlePeerEvents: function(sock, peer) { + var first = true; + + var handleOpen = function () { + try { + var queued = peer.send_queue.shift(); + while (queued) { +#if SOCKET_DEBUG + Module.print('websocket sending queued data (' + queued.byteLength + ' bytes): ' + [Array.prototype.slice.call(new Uint8Array(queued))]); +#endif + peer.socket.send(queued); + queued = peer.send_queue.shift(); + } + } catch (e) { + peer.socket.close(); + } + }; + + var handleMessage = function(data) { + assert(typeof data !== 'string' && data.byteLength !== undefined); // must receive an ArrayBuffer + data = new Uint8Array(data); // make a typed array view on the array buffer + + // if this is the port message, override the peer's port with it + var wasfirst = first; + first = false; + if (wasfirst && + data.length === 10 && + data[0] === 255 && data[1] === 255 && data[2] === 255 && data[3] === 255 && + data[4] === 'p'.charCodeAt(0) && data[5] === 'o'.charCodeAt(0) && data[6] === 'r'.charCodeAt(0) && data[7] === 't'.charCodeAt(0)) { + // update the peer's port and it's key in the peer map + var newport = ((data[8] << 8) | data[9]); + SOCKFS.websocket_sock_ops.removePeer(sock, peer); + peer.port = newport; + SOCKFS.websocket_sock_ops.addPeer(sock, peer); + return; + } + + sock.recv_queue.push({ addr: peer.addr, port: peer.port, data: data }); + }; + + if (ENVIRONMENT_IS_NODE) { + peer.socket.on('open', handleOpen); + peer.socket.on('message', function(data, flags) { + if (!flags.binary) { + return; + } + handleMessage((new Uint8Array(data)).buffer); // copy from node Buffer -> ArrayBuffer + }); + peer.socket.on('error', function() { + // don't throw + }); + } else { + peer.socket.onopen = handleOpen; + peer.socket.onmessage = function(event) { + handleMessage(event.data); + }; + } + }, + + // + // actual sock ops + // + poll: function(sock) { + if (sock.type === {{{ cDefine('SOCK_STREAM') }}} && sock.server) { + // listen sockets should only say they're available for reading + // if there are pending clients. + return sock.pending.length ? ({{{ cDefine('POLLRDNORM') }}} | {{{ cDefine('POLLIN') }}}) : 0; + } + + var mask = 0; + var dest = sock.type === {{{ cDefine('SOCK_STREAM') }}} ? // we only care about the socket state for connection-based sockets + SOCKFS.websocket_sock_ops.getPeer(sock, sock.daddr, sock.dport) : + null; + + if (sock.recv_queue.length || + !dest || // connection-less sockets are always ready to read + (dest && dest.socket.readyState === WebSocket.CLOSING) || + (dest && dest.socket.readyState === WebSocket.CLOSED)) { // let recv return 0 once closed + mask |= ({{{ cDefine('POLLRDNORM') }}} | {{{ cDefine('POLLIN') }}}); + } + + if (!dest || // connection-less sockets are always ready to write + (dest && dest.socket.readyState === WebSocket.OPEN)) { + mask |= {{{ cDefine('POLLOUT') }}}; + } + + if ((dest && dest.socket.readyState === WebSocket.CLOSING) || + (dest && dest.socket.readyState === WebSocket.CLOSED)) { + mask |= {{{ cDefine('POLLHUP') }}}; + } + + return mask; + }, + ioctl: function(sock, request, arg) { + switch (request) { + case {{{ cDefine('FIONREAD') }}}: + var bytes = 0; + if (sock.recv_queue.length) { + bytes = sock.recv_queue[0].data.length; + } + {{{ makeSetValue('arg', '0', 'bytes', 'i32') }}}; + return 0; + default: + return ERRNO_CODES.EINVAL; + } + }, + close: function(sock) { + // if we've spawned a listen server, close it + if (sock.server) { + try { + sock.server.close(); + } catch (e) { + } + sock.server = null; + } + // close any peer connections + var peers = Object.keys(sock.peers); + for (var i = 0; i < peers.length; i++) { + var peer = sock.peers[peers[i]]; + try { + peer.socket.close(); + } catch (e) { + } + SOCKFS.websocket_sock_ops.removePeer(sock, peer); + } + return 0; + }, + bind: function(sock, addr, port) { + if (typeof sock.saddr !== 'undefined' || typeof sock.sport !== 'undefined') { + throw new FS.ErrnoError(ERRNO_CODES.EINVAL); // already bound + } + sock.saddr = addr; + sock.sport = port || _mkport(); + // in order to emulate dgram sockets, we need to launch a listen server when + // binding on a connection-less socket + // note: this is only required on the server side + if (sock.type === {{{ cDefine('SOCK_DGRAM') }}}) { + // close the existing server if it exists + if (sock.server) { + sock.server.close(); + sock.server = null; + } + // swallow error operation not supported error that occurs when binding in the + // browser where this isn't supported + try { + sock.sock_ops.listen(sock, 0); + } catch (e) { + if (!(e instanceof FS.ErrnoError)) throw e; + if (e.errno !== ERRNO_CODES.EOPNOTSUPP) throw e; + } + } + }, + connect: function(sock, addr, port) { + if (sock.server) { + throw new FS.ErrnoError(ERRNO_CODS.EOPNOTSUPP); + } + + // TODO autobind + // if (!sock.addr && sock.type == {{{ cDefine('SOCK_DGRAM') }}}) { + // } + + // early out if we're already connected / in the middle of connecting + if (typeof sock.daddr !== 'undefined' && typeof sock.dport !== 'undefined') { + var dest = SOCKFS.websocket_sock_ops.getPeer(sock, sock.daddr, sock.dport); + if (dest) { + if (dest.socket.readyState === WebSocket.CONNECTING) { + throw new FS.ErrnoError(ERRNO_CODES.EALREADY); + } else { + throw new FS.ErrnoError(ERRNO_CODES.EISCONN); + } + } + } + + // add the socket to our peer list and set our + // defination address / port to match + var peer = SOCKFS.websocket_sock_ops.createPeer(sock, addr, port); + sock.daddr = peer.addr; + sock.dport = peer.port; + + // always "fail" in non-blocking mode + throw new FS.ErrnoError(ERRNO_CODES.EINPROGRESS); + }, + listen: function(sock, backlog) { + if (!ENVIRONMENT_IS_NODE) { + throw new FS.ErrnoError(ERRNO_CODES.EOPNOTSUPP); + } + if (sock.server) { + throw new FS.ErrnoError(ERRNO_CODES.EINVAL); // already listening + } + var WebSocketServer = WebSocket.Server; + var host = sock.saddr; +#if SOCKET_DEBUG + console.log('listen: ' + host + ':' + sock.sport); +#endif + sock.server = new WebSocketServer({ + host: host, + port: sock.sport + // TODO support backlog + }); + + sock.server.on('connection', function(ws) { +#if SOCKET_DEBUG + console.log('received connection from: ' + ws._socket.remoteAddress + ':' + ws._socket.remotePort); +#endif + if (sock.type === {{{ cDefine('SOCK_STREAM') }}}) { + // push to queue for accept to pick up + sock.pending.push(ws); + } else { + // auto-accept, adding the peer to the listen socket so + // calling sendto with the listen socket and an address + // will resolve to the correct client + SOCKFS.websocket_sock_ops.createPeer(sock, ws); + } + }); + sock.server.on('closed', function() { + sock.server = null; + }); + sock.server.on('error', function() { + // don't throw + }); + }, + accept: function(listensock, newsock, flags) { + if (!listensock.server) { + ___setErrNo(ERRNO_CODES.EINVAL); + return -1; + } + + // create a peer on the server's client socket + var peer = SOCKFS.websocket_sock_ops.createPeer(newsock, listensock.pending.shift()); + newsock.daddr = peer.addr; + newsock.dport = peer.port; + newsock.stream.flags = flags; + + return 0; + }, + getname: function(sock, peer) { + var addr, port; + if (peer) { + if (sock.daddr === undefined || sock.dport === undefined) { + throw new FS.ErrnoError(ERRNO_CODES.ENOTCONN); + } + addr = sock.daddr; + port = sock.dport; + } else { + // TODO saddr and sport will be set for bind()'d UDP sockets, but what + // should we be returning for TCP sockets that've been connect()'d? + addr = sock.saddr || 0; + port = sock.sport || 0; + } + return { addr: addr, port: port }; + }, + sendmsg: function(sock, buffer, offset, length, addr, port) { + if (sock.type === {{{ cDefine('SOCK_DGRAM') }}}) { + // connection-less sockets will honor the message address, + // and otherwise fall back to the bound destination address + if (addr === undefined || port === undefined) { + addr = sock.daddr; + port = sock.dport; + } + // if there was no address to fall back to, error out + if (addr === undefined || port === undefined) { + throw new FS.ErrnoError(ERRNO_CODES.EDESTADDRREQ); + } + } else { + // connection-based sockets will only use the bound + addr = sock.daddr; + port = sock.dport; + } + + // find the peer for the destination address + var dest = SOCKFS.websocket_sock_ops.getPeer(sock, addr, port); + + // early out if not connected with a connection-based socket + if (sock.type === {{{ cDefine('SOCK_STREAM') }}}) { + if (!dest || dest.socket.readyState === WebSocket.CLOSING || dest.socket.readyState === WebSocket.CLOSED) { + throw new FS.ErrnoError(ERRNO_CODES.ENOTCONN); + } else if (dest.socket.readyState === WebSocket.CONNECTING) { + throw new FS.ErrnoError(ERRNO_CODES.EAGAIN); + } + } + + // create a copy of the incoming data to send, as the WebSocket API + // doesn't work entirely with an ArrayBufferView, it'll just send + // the entire underlying buffer + var data; + if (buffer instanceof Array || buffer instanceof ArrayBuffer) { + data = buffer.slice(offset, offset + length); + } else { // ArrayBufferView + data = buffer.buffer.slice(buffer.byteOffset + offset, buffer.byteOffset + offset + length); + } + + // if we're emulating a connection-less dgram socket and don't have + // a cached connection, queue the buffer to send upon connect and + // lie, saying the data was sent now. + if (sock.type === {{{ cDefine('SOCK_DGRAM') }}}) { + if (!dest || dest.socket.readyState !== dest.socket.OPEN) { + // if we're not connected, open a new connection + if (!dest || dest.socket.readyState === dest.socket.CLOSING || dest.socket.readyState === dest.socket.CLOSED) { + dest = SOCKFS.websocket_sock_ops.createPeer(sock, addr, port); + } +#if SOCKET_DEBUG + Module.print('websocket queuing (' + length + ' bytes): ' + [Array.prototype.slice.call(new Uint8Array(data))]); +#endif + dest.send_queue.push(data); + return length; + } + } + + try { +#if SOCKET_DEBUG + Module.print('websocket send (' + length + ' bytes): ' + [Array.prototype.slice.call(new Uint8Array(data))]); +#endif + // send the actual data + dest.socket.send(data); + return length; + } catch (e) { + throw new FS.ErrnoError(ERRNO_CODES.EINVAL); + } + }, + recvmsg: function(sock, length) { + // http://pubs.opengroup.org/onlinepubs/7908799/xns/recvmsg.html + if (sock.type === {{{ cDefine('SOCK_STREAM') }}} && sock.server) { + // tcp servers should not be recv()'ing on the listen socket + throw new FS.ErrnoError(ERRNO_CODES.ENOTCONN); + } + + var queued = sock.recv_queue.shift(); + if (!queued) { + if (sock.type === {{{ cDefine('SOCK_STREAM') }}}) { + var dest = SOCKFS.websocket_sock_ops.getPeer(sock, sock.daddr, sock.dport); + + if (!dest) { + // if we have a destination address but are not connected, error out + throw new FS.ErrnoError(ERRNO_CODES.ENOTCONN); + } + else if (dest.socket.readyState === WebSocket.CLOSING || dest.socket.readyState === WebSocket.CLOSED) { + // return null if the socket has closed + return null; + } + else { + // else, our socket is in a valid state but truly has nothing available + throw new FS.ErrnoError(ERRNO_CODES.EAGAIN); + } + } else { + throw new FS.ErrnoError(ERRNO_CODES.EAGAIN); + } + } + + // queued.data will be an ArrayBuffer if it's unadulterated, but if it's + // requeued TCP data it'll be an ArrayBufferView + var queuedLength = queued.data.byteLength || queued.data.length; + var queuedOffset = queued.data.byteOffset || 0; + var queuedBuffer = queued.data.buffer || queued.data; + var bytesRead = Math.min(length, queuedLength); + var res = { + buffer: new Uint8Array(queuedBuffer, queuedOffset, bytesRead), + addr: queued.addr, + port: queued.port + }; + +#if SOCKET_DEBUG + Module.print('websocket read (' + bytesRead + ' bytes): ' + [Array.prototype.slice.call(res.buffer)]); +#endif + + // push back any unread data for TCP connections + if (sock.type === {{{ cDefine('SOCK_STREAM') }}} && bytesRead < queuedLength) { + var bytesRemaining = queuedLength - bytesRead; +#if SOCKET_DEBUG + Module.print('websocket read: put back ' + bytesRemaining + ' bytes'); +#endif + queued.data = new Uint8Array(queuedBuffer, queuedOffset + bytesRead, bytesRemaining); + sock.recv_queue.unshift(queued); + } + + return res; + } } } });
\ No newline at end of file |