stream.js
1 'use strict'; 2 3 const { Duplex } = require('stream'); 4 5 /** 6 * Emits the `'close'` event on a stream. 7 * 8 * @param {Duplex} stream The stream. 9 * @private 10 */ 11 function emitClose(stream) { 12 stream.emit('close'); 13 } 14 15 /** 16 * The listener of the `'end'` event. 17 * 18 * @private 19 */ 20 function duplexOnEnd() { 21 if (!this.destroyed && this._writableState.finished) { 22 this.destroy(); 23 } 24 } 25 26 /** 27 * The listener of the `'error'` event. 28 * 29 * @param {Error} err The error 30 * @private 31 */ 32 function duplexOnError(err) { 33 this.removeListener('error', duplexOnError); 34 this.destroy(); 35 if (this.listenerCount('error') === 0) { 36 // Do not suppress the throwing behavior. 37 this.emit('error', err); 38 } 39 } 40 41 /** 42 * Wraps a `WebSocket` in a duplex stream. 43 * 44 * @param {WebSocket} ws The `WebSocket` to wrap 45 * @param {Object} [options] The options for the `Duplex` constructor 46 * @return {Duplex} The duplex stream 47 * @public 48 */ 49 function createWebSocketStream(ws, options) { 50 let resumeOnReceiverDrain = true; 51 let terminateOnDestroy = true; 52 53 function receiverOnDrain() { 54 if (resumeOnReceiverDrain) ws._socket.resume(); 55 } 56 57 if (ws.readyState === ws.CONNECTING) { 58 ws.once('open', function open() { 59 ws._receiver.removeAllListeners('drain'); 60 ws._receiver.on('drain', receiverOnDrain); 61 }); 62 } else { 63 ws._receiver.removeAllListeners('drain'); 64 ws._receiver.on('drain', receiverOnDrain); 65 } 66 67 const duplex = new Duplex({ 68 ...options, 69 autoDestroy: false, 70 emitClose: false, 71 objectMode: false, 72 writableObjectMode: false 73 }); 74 75 ws.on('message', function message(msg) { 76 if (!duplex.push(msg)) { 77 resumeOnReceiverDrain = false; 78 ws._socket.pause(); 79 } 80 }); 81 82 ws.once('error', function error(err) { 83 if (duplex.destroyed) return; 84 85 // Prevent `ws.terminate()` from being called by `duplex._destroy()`. 86 // 87 // - If the `'error'` event is emitted before the `'open'` event, then 88 // `ws.terminate()` is a noop as no socket is assigned. 89 // - Otherwise, the error is re-emitted by the listener of the `'error'` 90 // event of the `Receiver` object. The listener already closes the 91 // connection by calling `ws.close()`. This allows a close frame to be 92 // sent to the other peer. If `ws.terminate()` is called right after this, 93 // then the close frame might not be sent. 94 terminateOnDestroy = false; 95 duplex.destroy(err); 96 }); 97 98 ws.once('close', function close() { 99 if (duplex.destroyed) return; 100 101 duplex.push(null); 102 }); 103 104 duplex._destroy = function (err, callback) { 105 if (ws.readyState === ws.CLOSED) { 106 callback(err); 107 process.nextTick(emitClose, duplex); 108 return; 109 } 110 111 let called = false; 112 113 ws.once('error', function error(err) { 114 called = true; 115 callback(err); 116 }); 117 118 ws.once('close', function close() { 119 if (!called) callback(err); 120 process.nextTick(emitClose, duplex); 121 }); 122 123 if (terminateOnDestroy) ws.terminate(); 124 }; 125 126 duplex._final = function (callback) { 127 if (ws.readyState === ws.CONNECTING) { 128 ws.once('open', function open() { 129 duplex._final(callback); 130 }); 131 return; 132 } 133 134 // If the value of the `_socket` property is `null` it means that `ws` is a 135 // client websocket and the handshake failed. In fact, when this happens, a 136 // socket is never assigned to the websocket. Wait for the `'error'` event 137 // that will be emitted by the websocket. 138 if (ws._socket === null) return; 139 140 if (ws._socket._writableState.finished) { 141 callback(); 142 if (duplex._readableState.endEmitted) duplex.destroy(); 143 } else { 144 ws._socket.once('finish', function finish() { 145 // `duplex` is not destroyed here because the `'end'` event will be 146 // emitted on `duplex` after this `'finish'` event. The EOF signaling 147 // `null` chunk is, in fact, pushed when the websocket emits `'close'`. 148 callback(); 149 }); 150 ws.close(); 151 } 152 }; 153 154 duplex._read = function () { 155 if ( 156 (ws.readyState === ws.OPEN || ws.readyState === ws.CLOSING) && 157 !resumeOnReceiverDrain 158 ) { 159 resumeOnReceiverDrain = true; 160 if (!ws._receiver._writableState.needDrain) ws._socket.resume(); 161 } 162 }; 163 164 duplex._write = function (chunk, encoding, callback) { 165 if (ws.readyState === ws.CONNECTING) { 166 ws.once('open', function open() { 167 duplex._write(chunk, encoding, callback); 168 }); 169 return; 170 } 171 172 ws.send(chunk, callback); 173 }; 174 175 duplex.on('end', duplexOnEnd); 176 duplex.on('error', duplexOnError); 177 return duplex; 178 } 179 180 module.exports = createWebSocketStream;