aboutsummaryrefslogtreecommitdiff
path: root/src/library_sockfs.js
diff options
context:
space:
mode:
authorAnthony Pesch <inolen@gmail.com>2013-08-24 09:31:29 -0700
committerAnthony Pesch <inolen@gmail.com>2013-08-29 01:45:38 -0700
commit462875aa0e5b2820868b539e2a185db7783653f8 (patch)
tree38ad416ee28ac0a8c5db34157608d532929e6b0d /src/library_sockfs.js
parentf7744fcb18ab5d12c30cf1340c7bacabfc13d1ab (diff)
- created SOCKFS
- added support for node-based listen servers - updated tests to also test against compiled listen servers
Diffstat (limited to 'src/library_sockfs.js')
-rw-r--r--src/library_sockfs.js554
1 files changed, 548 insertions, 6 deletions
diff --git a/src/library_sockfs.js b/src/library_sockfs.js
index 13118b71..0736224d 100644
--- a/src/library_sockfs.js
+++ b/src/library_sockfs.js
@@ -1,18 +1,560 @@
mergeInto(LibraryManager.library, {
- $SOCKFS__postset: '__ATINIT__.push({ func: function() { SOCKFS.root = FS.mount(SOCKFS, {}, null); } });',
+ $SOCKFS__postset: '__ATINIT__.push({ func: function() { if (ENVIRONMENT_IS_NODE) { WebSocket = require("ws"); } } });\n' +
+ '__ATINIT__.push({ func: function() { SOCKFS.root = FS.mount(SOCKFS, {}, null); } });',
$SOCKFS__deps: ['$FS'],
$SOCKFS: {
mount: function(mount) {
- var node = FS.createNode(null, '/', {{{ cDefine('S_IFDIR') }}} | 0777, 0);
- node.node_ops = SOCKFS.node_ops;
- node.stream_ops = SOCKFS.stream_ops;
- return node;
+ return FS.createNode(null, '/', {{{ cDefine('S_IFDIR') }}} | 0777, 0);
},
- node_ops: {
+ nextname: function() {
+ if (!SOCKFS.nextname.current) {
+ SOCKFS.nextname.current = 0;
+ }
+ return 'socket[' + (SOCKFS.nextname.current++) + ']';
},
+ createSocket: function(family, type, protocol) {
+ var streaming = type == {{{ cDefine('SOCK_STREAM') }}};
+ if (protocol) {
+ assert(streaming == (protocol == {{{ cDefine('IPPROTO_TCP') }}})); // if SOCK_STREAM, must be tcp
+ }
+
+ // create our internal socket structure
+ var sock = {
+ family: family,
+ type: type,
+ protocol: protocol,
+ server: null,
+ peers: {},
+ pending: [],
+ recv_queue: [],
+#if SOCKET_WEBRTC
+#else
+ sock_ops: SOCKFS.websocket_sock_ops
+#endif
+ };
+
+ // create the filesystem node to store the socket structure
+ var name = SOCKFS.nextname();
+ var node = FS.createNode(SOCKFS.root, name, {{{ cDefine('S_IFSOCK') }}}, 0);
+ node.sock = sock;
+
+ // and the wrapping stream that enables library functions such
+ // as read and write to indirectly interact with the socket
+ var stream = FS.createStream({
+ path: name,
+ node: node,
+ flags: FS.modeStringToFlags('r+'),
+ seekable: false,
+ stream_ops: SOCKFS.stream_ops
+ });
+
+ // map the new stream to the socket structure (sockets have a 1:1
+ // relationship with a stream)
+ sock.stream = stream;
+
+ return sock;
+ },
+ getSocket: function(fd) {
+ var stream = FS.getStream(fd);
+ if (!stream || !FS.isSocket(stream.node.mode)) {
+ return null;
+ }
+ return stream.node.sock;
+ },
+ // node and stream ops are backend agnostic
stream_ops: {
+ poll: function(stream) {
+ var sock = stream.node.sock;
+ return sock.sock_ops.poll(sock);
+ },
+ ioctl: function(stream, request, varargs) {
+ var sock = stream.node.sock;
+ return sock.sock_ops.ioctl(sock, request, varargs);
+ },
+ read: function(stream, buffer, offset, length, position /* ignored */) {
+ var sock = stream.node.sock;
+ var msg = sock.sock_ops.recvmsg(sock, length);
+ if (!msg) {
+ // socket is closed
+ return 0;
+ }
+#if USE_TYPED_ARRAYS == 2
+ buffer.set(msg.buffer, offset);
+#else
+ for (var i = 0; i < size; i++) {
+ buffer[offset + i] = msg.buffer[i];
+ }
+#endif
+ return msg.buffer.length;
+ },
+ write: function(stream, buffer, offset, length, position /* ignored */) {
+ var sock = stream.node.sock;
+ return sock.sock_ops.sendmsg(sock, buffer, offset, length);
+ },
+ close: function(stream) {
+ var sock = stream.node.sock;
+ sock.sock_ops.close(sock);
+ }
},
+ // backend-specific stream ops
websocket_sock_ops: {
+ //
+ // peers are a small wrapper around a WebSocket to help in
+ // emulating dgram sockets
+ //
+ // these functions aren't actually sock_ops members, but we're
+ // abusing the namespace to organize them
+ //
+ createPeer: function(sock, addr, port) {
+ var ws;
+
+ if (typeof addr === 'object') {
+ ws = addr;
+ addr = null;
+ port = null;
+ }
+
+ if (ws) {
+ // for sockets that've already connected (e.g. we're the server)
+ // we can inspect the _socket property for the address
+ if (ws._socket) {
+ addr = ws._socket.remoteAddress;
+ port = ws._socket.remotePort;
+ }
+ // if we're just now initializing a connection to the remote,
+ // inspect the url property
+ else {
+ var result = /ws[s]?:\/\/([^:]+):(\d+)/.exec(ws.url);
+ if (!result) {
+ throw new Error('WebSocket URL must be in the format ws(s)://address:port');
+ }
+ addr = result[1];
+ port = parseInt(result[2], 10);
+ }
+ } else {
+ // create the actual websocket object and connect
+ try {
+ var url = 'ws://' + addr + ':' + port;
+#if SOCKET_DEBUG
+ console.log('connect: ' + url);
+#endif
+ // the node ws library API is slightly different than the browser's
+ var opts = ENVIRONMENT_IS_NODE ? {} : ['binary'];
+ ws = new WebSocket(url, opts);
+ ws.binaryType = 'arraybuffer';
+ } catch (e) {
+ throw new FS.ErrnoError(ERRNO_CODES.EHOSTUNREACH);
+ }
+ }
+
+ var peer = {
+ addr: addr,
+ port: port,
+ socket: ws,
+ send_queue: []
+ };
+
+ SOCKFS.websocket_sock_ops.addPeer(sock, peer);
+ SOCKFS.websocket_sock_ops.handlePeerEvents(sock, peer);
+
+ // if this is a bound dgram socket, send the port number first to allow
+ // us to override the ephemeral port reported to us by remotePort on the
+ // remote end.
+ if (sock.type === {{{ cDefine('SOCK_DGRAM') }}} && typeof sock.sport !== 'undefined') {
+ peer.send_queue.push(new Uint8Array([
+ 255, 255, 255, 255,
+ 'p'.charCodeAt(0), 'o'.charCodeAt(0), 'r'.charCodeAt(0), 't'.charCodeAt(0),
+ ((sock.sport & 0xff00) >> 8) , (sock.sport & 0xff)
+ ]));
+ }
+
+ return peer;
+ },
+ getPeer: function(sock, addr, port) {
+ return sock.peers[addr + ':' + port];
+ },
+ addPeer: function(sock, peer) {
+ sock.peers[peer.addr + ':' + peer.port] = peer;
+ },
+ removePeer: function(sock, peer) {
+ delete sock.peers[peer.addr + ':' + peer.port];
+ },
+ handlePeerEvents: function(sock, peer) {
+ var first = true;
+
+ var handleOpen = function () {
+ try {
+ var queued = peer.send_queue.shift();
+ while (queued) {
+#if SOCKET_DEBUG
+ Module.print('websocket sending queued data (' + queued.byteLength + ' bytes): ' + [Array.prototype.slice.call(new Uint8Array(queued))]);
+#endif
+ peer.socket.send(queued);
+ queued = peer.send_queue.shift();
+ }
+ } catch (e) {
+ peer.socket.close();
+ }
+ };
+
+ var handleMessage = function(data) {
+ assert(typeof data !== 'string' && data.byteLength !== undefined); // must receive an ArrayBuffer
+ data = new Uint8Array(data); // make a typed array view on the array buffer
+
+ // if this is the port message, override the peer's port with it
+ var wasfirst = first;
+ first = false;
+ if (wasfirst &&
+ data.length === 10 &&
+ data[0] === 255 && data[1] === 255 && data[2] === 255 && data[3] === 255 &&
+ data[4] === 'p'.charCodeAt(0) && data[5] === 'o'.charCodeAt(0) && data[6] === 'r'.charCodeAt(0) && data[7] === 't'.charCodeAt(0)) {
+ // update the peer's port and it's key in the peer map
+ var newport = ((data[8] << 8) | data[9]);
+ SOCKFS.websocket_sock_ops.removePeer(sock, peer);
+ peer.port = newport;
+ SOCKFS.websocket_sock_ops.addPeer(sock, peer);
+ return;
+ }
+
+ sock.recv_queue.push({ addr: peer.addr, port: peer.port, data: data });
+ };
+
+ if (ENVIRONMENT_IS_NODE) {
+ peer.socket.on('open', handleOpen);
+ peer.socket.on('message', function(data, flags) {
+ if (!flags.binary) {
+ return;
+ }
+ handleMessage((new Uint8Array(data)).buffer); // copy from node Buffer -> ArrayBuffer
+ });
+ peer.socket.on('error', function() {
+ // don't throw
+ });
+ } else {
+ peer.socket.onopen = handleOpen;
+ peer.socket.onmessage = function(event) {
+ handleMessage(event.data);
+ };
+ }
+ },
+
+ //
+ // actual sock ops
+ //
+ poll: function(sock) {
+ if (sock.type === {{{ cDefine('SOCK_STREAM') }}} && sock.server) {
+ // listen sockets should only say they're available for reading
+ // if there are pending clients.
+ return sock.pending.length ? ({{{ cDefine('POLLRDNORM') }}} | {{{ cDefine('POLLIN') }}}) : 0;
+ }
+
+ var mask = 0;
+ var dest = sock.type === {{{ cDefine('SOCK_STREAM') }}} ? // we only care about the socket state for connection-based sockets
+ SOCKFS.websocket_sock_ops.getPeer(sock, sock.daddr, sock.dport) :
+ null;
+
+ if (sock.recv_queue.length ||
+ !dest || // connection-less sockets are always ready to read
+ (dest && dest.socket.readyState === WebSocket.CLOSING) ||
+ (dest && dest.socket.readyState === WebSocket.CLOSED)) { // let recv return 0 once closed
+ mask |= ({{{ cDefine('POLLRDNORM') }}} | {{{ cDefine('POLLIN') }}});
+ }
+
+ if (!dest || // connection-less sockets are always ready to write
+ (dest && dest.socket.readyState === WebSocket.OPEN)) {
+ mask |= {{{ cDefine('POLLOUT') }}};
+ }
+
+ if ((dest && dest.socket.readyState === WebSocket.CLOSING) ||
+ (dest && dest.socket.readyState === WebSocket.CLOSED)) {
+ mask |= {{{ cDefine('POLLHUP') }}};
+ }
+
+ return mask;
+ },
+ ioctl: function(sock, request, arg) {
+ switch (request) {
+ case {{{ cDefine('FIONREAD') }}}:
+ var bytes = 0;
+ if (sock.recv_queue.length) {
+ bytes = sock.recv_queue[0].data.length;
+ }
+ {{{ makeSetValue('arg', '0', 'bytes', 'i32') }}};
+ return 0;
+ default:
+ return ERRNO_CODES.EINVAL;
+ }
+ },
+ close: function(sock) {
+ // if we've spawned a listen server, close it
+ if (sock.server) {
+ try {
+ sock.server.close();
+ } catch (e) {
+ }
+ sock.server = null;
+ }
+ // close any peer connections
+ var peers = Object.keys(sock.peers);
+ for (var i = 0; i < peers.length; i++) {
+ var peer = sock.peers[peers[i]];
+ try {
+ peer.socket.close();
+ } catch (e) {
+ }
+ SOCKFS.websocket_sock_ops.removePeer(sock, peer);
+ }
+ return 0;
+ },
+ bind: function(sock, addr, port) {
+ if (typeof sock.saddr !== 'undefined' || typeof sock.sport !== 'undefined') {
+ throw new FS.ErrnoError(ERRNO_CODES.EINVAL); // already bound
+ }
+ sock.saddr = addr;
+ sock.sport = port || _mkport();
+ // in order to emulate dgram sockets, we need to launch a listen server when
+ // binding on a connection-less socket
+ // note: this is only required on the server side
+ if (sock.type === {{{ cDefine('SOCK_DGRAM') }}}) {
+ // close the existing server if it exists
+ if (sock.server) {
+ sock.server.close();
+ sock.server = null;
+ }
+ // swallow error operation not supported error that occurs when binding in the
+ // browser where this isn't supported
+ try {
+ sock.sock_ops.listen(sock, 0);
+ } catch (e) {
+ if (!(e instanceof FS.ErrnoError)) throw e;
+ if (e.errno !== ERRNO_CODES.EOPNOTSUPP) throw e;
+ }
+ }
+ },
+ connect: function(sock, addr, port) {
+ if (sock.server) {
+ throw new FS.ErrnoError(ERRNO_CODS.EOPNOTSUPP);
+ }
+
+ // TODO autobind
+ // if (!sock.addr && sock.type == {{{ cDefine('SOCK_DGRAM') }}}) {
+ // }
+
+ // early out if we're already connected / in the middle of connecting
+ if (typeof sock.daddr !== 'undefined' && typeof sock.dport !== 'undefined') {
+ var dest = SOCKFS.websocket_sock_ops.getPeer(sock, sock.daddr, sock.dport);
+ if (dest) {
+ if (dest.socket.readyState === WebSocket.CONNECTING) {
+ throw new FS.ErrnoError(ERRNO_CODES.EALREADY);
+ } else {
+ throw new FS.ErrnoError(ERRNO_CODES.EISCONN);
+ }
+ }
+ }
+
+ // add the socket to our peer list and set our
+ // defination address / port to match
+ var peer = SOCKFS.websocket_sock_ops.createPeer(sock, addr, port);
+ sock.daddr = peer.addr;
+ sock.dport = peer.port;
+
+ // always "fail" in non-blocking mode
+ throw new FS.ErrnoError(ERRNO_CODES.EINPROGRESS);
+ },
+ listen: function(sock, backlog) {
+ if (!ENVIRONMENT_IS_NODE) {
+ throw new FS.ErrnoError(ERRNO_CODES.EOPNOTSUPP);
+ }
+ if (sock.server) {
+ throw new FS.ErrnoError(ERRNO_CODES.EINVAL); // already listening
+ }
+ var WebSocketServer = WebSocket.Server;
+ var host = sock.saddr;
+#if SOCKET_DEBUG
+ console.log('listen: ' + host + ':' + sock.sport);
+#endif
+ sock.server = new WebSocketServer({
+ host: host,
+ port: sock.sport
+ // TODO support backlog
+ });
+
+ sock.server.on('connection', function(ws) {
+#if SOCKET_DEBUG
+ console.log('received connection from: ' + ws._socket.remoteAddress + ':' + ws._socket.remotePort);
+#endif
+ if (sock.type === {{{ cDefine('SOCK_STREAM') }}}) {
+ // push to queue for accept to pick up
+ sock.pending.push(ws);
+ } else {
+ // auto-accept, adding the peer to the listen socket so
+ // calling sendto with the listen socket and an address
+ // will resolve to the correct client
+ SOCKFS.websocket_sock_ops.createPeer(sock, ws);
+ }
+ });
+ sock.server.on('closed', function() {
+ sock.server = null;
+ });
+ sock.server.on('error', function() {
+ // don't throw
+ });
+ },
+ accept: function(listensock, newsock, flags) {
+ if (!listensock.server) {
+ ___setErrNo(ERRNO_CODES.EINVAL);
+ return -1;
+ }
+
+ // create a peer on the server's client socket
+ var peer = SOCKFS.websocket_sock_ops.createPeer(newsock, listensock.pending.shift());
+ newsock.daddr = peer.addr;
+ newsock.dport = peer.port;
+ newsock.stream.flags = flags;
+
+ return 0;
+ },
+ getname: function(sock, peer) {
+ var addr, port;
+ if (peer) {
+ if (sock.daddr === undefined || sock.dport === undefined) {
+ throw new FS.ErrnoError(ERRNO_CODES.ENOTCONN);
+ }
+ addr = sock.daddr;
+ port = sock.dport;
+ } else {
+ // TODO saddr and sport will be set for bind()'d UDP sockets, but what
+ // should we be returning for TCP sockets that've been connect()'d?
+ addr = sock.saddr || 0;
+ port = sock.sport || 0;
+ }
+ return { addr: addr, port: port };
+ },
+ sendmsg: function(sock, buffer, offset, length, addr, port) {
+ if (sock.type === {{{ cDefine('SOCK_DGRAM') }}}) {
+ // connection-less sockets will honor the message address,
+ // and otherwise fall back to the bound destination address
+ if (addr === undefined || port === undefined) {
+ addr = sock.daddr;
+ port = sock.dport;
+ }
+ // if there was no address to fall back to, error out
+ if (addr === undefined || port === undefined) {
+ throw new FS.ErrnoError(ERRNO_CODES.EDESTADDRREQ);
+ }
+ } else {
+ // connection-based sockets will only use the bound
+ addr = sock.daddr;
+ port = sock.dport;
+ }
+
+ // find the peer for the destination address
+ var dest = SOCKFS.websocket_sock_ops.getPeer(sock, addr, port);
+
+ // early out if not connected with a connection-based socket
+ if (sock.type === {{{ cDefine('SOCK_STREAM') }}}) {
+ if (!dest || dest.socket.readyState === WebSocket.CLOSING || dest.socket.readyState === WebSocket.CLOSED) {
+ throw new FS.ErrnoError(ERRNO_CODES.ENOTCONN);
+ } else if (dest.socket.readyState === WebSocket.CONNECTING) {
+ throw new FS.ErrnoError(ERRNO_CODES.EAGAIN);
+ }
+ }
+
+ // create a copy of the incoming data to send, as the WebSocket API
+ // doesn't work entirely with an ArrayBufferView, it'll just send
+ // the entire underlying buffer
+ var data;
+ if (buffer instanceof Array || buffer instanceof ArrayBuffer) {
+ data = buffer.slice(offset, offset + length);
+ } else { // ArrayBufferView
+ data = buffer.buffer.slice(buffer.byteOffset + offset, buffer.byteOffset + offset + length);
+ }
+
+ // if we're emulating a connection-less dgram socket and don't have
+ // a cached connection, queue the buffer to send upon connect and
+ // lie, saying the data was sent now.
+ if (sock.type === {{{ cDefine('SOCK_DGRAM') }}}) {
+ if (!dest || dest.socket.readyState !== dest.socket.OPEN) {
+ // if we're not connected, open a new connection
+ if (!dest || dest.socket.readyState === dest.socket.CLOSING || dest.socket.readyState === dest.socket.CLOSED) {
+ dest = SOCKFS.websocket_sock_ops.createPeer(sock, addr, port);
+ }
+#if SOCKET_DEBUG
+ Module.print('websocket queuing (' + length + ' bytes): ' + [Array.prototype.slice.call(new Uint8Array(data))]);
+#endif
+ dest.send_queue.push(data);
+ return length;
+ }
+ }
+
+ try {
+#if SOCKET_DEBUG
+ Module.print('websocket send (' + length + ' bytes): ' + [Array.prototype.slice.call(new Uint8Array(data))]);
+#endif
+ // send the actual data
+ dest.socket.send(data);
+ return length;
+ } catch (e) {
+ throw new FS.ErrnoError(ERRNO_CODES.EINVAL);
+ }
+ },
+ recvmsg: function(sock, length) {
+ // http://pubs.opengroup.org/onlinepubs/7908799/xns/recvmsg.html
+ if (sock.type === {{{ cDefine('SOCK_STREAM') }}} && sock.server) {
+ // tcp servers should not be recv()'ing on the listen socket
+ throw new FS.ErrnoError(ERRNO_CODES.ENOTCONN);
+ }
+
+ var queued = sock.recv_queue.shift();
+ if (!queued) {
+ if (sock.type === {{{ cDefine('SOCK_STREAM') }}}) {
+ var dest = SOCKFS.websocket_sock_ops.getPeer(sock, sock.daddr, sock.dport);
+
+ if (!dest) {
+ // if we have a destination address but are not connected, error out
+ throw new FS.ErrnoError(ERRNO_CODES.ENOTCONN);
+ }
+ else if (dest.socket.readyState === WebSocket.CLOSING || dest.socket.readyState === WebSocket.CLOSED) {
+ // return null if the socket has closed
+ return null;
+ }
+ else {
+ // else, our socket is in a valid state but truly has nothing available
+ throw new FS.ErrnoError(ERRNO_CODES.EAGAIN);
+ }
+ } else {
+ throw new FS.ErrnoError(ERRNO_CODES.EAGAIN);
+ }
+ }
+
+ // queued.data will be an ArrayBuffer if it's unadulterated, but if it's
+ // requeued TCP data it'll be an ArrayBufferView
+ var queuedLength = queued.data.byteLength || queued.data.length;
+ var queuedOffset = queued.data.byteOffset || 0;
+ var queuedBuffer = queued.data.buffer || queued.data;
+ var bytesRead = Math.min(length, queuedLength);
+ var res = {
+ buffer: new Uint8Array(queuedBuffer, queuedOffset, bytesRead),
+ addr: queued.addr,
+ port: queued.port
+ };
+
+#if SOCKET_DEBUG
+ Module.print('websocket read (' + bytesRead + ' bytes): ' + [Array.prototype.slice.call(res.buffer)]);
+#endif
+
+ // push back any unread data for TCP connections
+ if (sock.type === {{{ cDefine('SOCK_STREAM') }}} && bytesRead < queuedLength) {
+ var bytesRemaining = queuedLength - bytesRead;
+#if SOCKET_DEBUG
+ Module.print('websocket read: put back ' + bytesRemaining + ' bytes');
+#endif
+ queued.data = new Uint8Array(queuedBuffer, queuedOffset + bytesRead, bytesRemaining);
+ sock.recv_queue.unshift(queued);
+ }
+
+ return res;
+ }
}
}
}); \ No newline at end of file