stream.js
  1  'use strict';
  2  
  3  const { Duplex } = require('stream');
  4  
  5  /**
  6   * Emits the `'close'` event on a stream.
  7   *
  8   * @param {Duplex} stream The stream.
  9   * @private
 10   */
 11  function emitClose(stream) {
 12    stream.emit('close');
 13  }
 14  
 15  /**
 16   * The listener of the `'end'` event.
 17   *
 18   * @private
 19   */
 20  function duplexOnEnd() {
 21    if (!this.destroyed && this._writableState.finished) {
 22      this.destroy();
 23    }
 24  }
 25  
 26  /**
 27   * The listener of the `'error'` event.
 28   *
 29   * @param {Error} err The error
 30   * @private
 31   */
 32  function duplexOnError(err) {
 33    this.removeListener('error', duplexOnError);
 34    this.destroy();
 35    if (this.listenerCount('error') === 0) {
 36      // Do not suppress the throwing behavior.
 37      this.emit('error', err);
 38    }
 39  }
 40  
 41  /**
 42   * Wraps a `WebSocket` in a duplex stream.
 43   *
 44   * @param {WebSocket} ws The `WebSocket` to wrap
 45   * @param {Object} [options] The options for the `Duplex` constructor
 46   * @return {Duplex} The duplex stream
 47   * @public
 48   */
 49  function createWebSocketStream(ws, options) {
 50    let resumeOnReceiverDrain = true;
 51    let terminateOnDestroy = true;
 52  
 53    function receiverOnDrain() {
 54      if (resumeOnReceiverDrain) ws._socket.resume();
 55    }
 56  
 57    if (ws.readyState === ws.CONNECTING) {
 58      ws.once('open', function open() {
 59        ws._receiver.removeAllListeners('drain');
 60        ws._receiver.on('drain', receiverOnDrain);
 61      });
 62    } else {
 63      ws._receiver.removeAllListeners('drain');
 64      ws._receiver.on('drain', receiverOnDrain);
 65    }
 66  
 67    const duplex = new Duplex({
 68      ...options,
 69      autoDestroy: false,
 70      emitClose: false,
 71      objectMode: false,
 72      writableObjectMode: false
 73    });
 74  
 75    ws.on('message', function message(msg) {
 76      if (!duplex.push(msg)) {
 77        resumeOnReceiverDrain = false;
 78        ws._socket.pause();
 79      }
 80    });
 81  
 82    ws.once('error', function error(err) {
 83      if (duplex.destroyed) return;
 84  
 85      // Prevent `ws.terminate()` from being called by `duplex._destroy()`.
 86      //
 87      // - If the `'error'` event is emitted before the `'open'` event, then
 88      //   `ws.terminate()` is a noop as no socket is assigned.
 89      // - Otherwise, the error is re-emitted by the listener of the `'error'`
 90      //   event of the `Receiver` object. The listener already closes the
 91      //   connection by calling `ws.close()`. This allows a close frame to be
 92      //   sent to the other peer. If `ws.terminate()` is called right after this,
 93      //   then the close frame might not be sent.
 94      terminateOnDestroy = false;
 95      duplex.destroy(err);
 96    });
 97  
 98    ws.once('close', function close() {
 99      if (duplex.destroyed) return;
100  
101      duplex.push(null);
102    });
103  
104    duplex._destroy = function (err, callback) {
105      if (ws.readyState === ws.CLOSED) {
106        callback(err);
107        process.nextTick(emitClose, duplex);
108        return;
109      }
110  
111      let called = false;
112  
113      ws.once('error', function error(err) {
114        called = true;
115        callback(err);
116      });
117  
118      ws.once('close', function close() {
119        if (!called) callback(err);
120        process.nextTick(emitClose, duplex);
121      });
122  
123      if (terminateOnDestroy) ws.terminate();
124    };
125  
126    duplex._final = function (callback) {
127      if (ws.readyState === ws.CONNECTING) {
128        ws.once('open', function open() {
129          duplex._final(callback);
130        });
131        return;
132      }
133  
134      // If the value of the `_socket` property is `null` it means that `ws` is a
135      // client websocket and the handshake failed. In fact, when this happens, a
136      // socket is never assigned to the websocket. Wait for the `'error'` event
137      // that will be emitted by the websocket.
138      if (ws._socket === null) return;
139  
140      if (ws._socket._writableState.finished) {
141        callback();
142        if (duplex._readableState.endEmitted) duplex.destroy();
143      } else {
144        ws._socket.once('finish', function finish() {
145          // `duplex` is not destroyed here because the `'end'` event will be
146          // emitted on `duplex` after this `'finish'` event. The EOF signaling
147          // `null` chunk is, in fact, pushed when the websocket emits `'close'`.
148          callback();
149        });
150        ws.close();
151      }
152    };
153  
154    duplex._read = function () {
155      if (
156        (ws.readyState === ws.OPEN || ws.readyState === ws.CLOSING) &&
157        !resumeOnReceiverDrain
158      ) {
159        resumeOnReceiverDrain = true;
160        if (!ws._receiver._writableState.needDrain) ws._socket.resume();
161      }
162    };
163  
164    duplex._write = function (chunk, encoding, callback) {
165      if (ws.readyState === ws.CONNECTING) {
166        ws.once('open', function open() {
167          duplex._write(chunk, encoding, callback);
168        });
169        return;
170      }
171  
172      ws.send(chunk, callback);
173    };
174  
175    duplex.on('end', duplexOnEnd);
176    duplex.on('error', duplexOnError);
177    return duplex;
178  }
179  
180  module.exports = createWebSocketStream;