receiver.js
  1  'use strict';
  2  
  3  const { Writable } = require('stream');
  4  
  5  const PerMessageDeflate = require('./permessage-deflate');
  6  const {
  7    BINARY_TYPES,
  8    EMPTY_BUFFER,
  9    kStatusCode,
 10    kWebSocket
 11  } = require('./constants');
 12  const { concat, toArrayBuffer, unmask } = require('./buffer-util');
 13  const { isValidStatusCode, isValidUTF8 } = require('./validation');
 14  
 15  const GET_INFO = 0;
 16  const GET_PAYLOAD_LENGTH_16 = 1;
 17  const GET_PAYLOAD_LENGTH_64 = 2;
 18  const GET_MASK = 3;
 19  const GET_DATA = 4;
 20  const INFLATING = 5;
 21  
 22  /**
 23   * HyBi Receiver implementation.
 24   *
 25   * @extends Writable
 26   */
 27  class Receiver extends Writable {
 28    /**
 29     * Creates a Receiver instance.
 30     *
 31     * @param {String} [binaryType=nodebuffer] The type for binary data
 32     * @param {Object} [extensions] An object containing the negotiated extensions
 33     * @param {Boolean} [isServer=false] Specifies whether to operate in client or
 34     *     server mode
 35     * @param {Number} [maxPayload=0] The maximum allowed message length
 36     */
 37    constructor(binaryType, extensions, isServer, maxPayload) {
 38      super();
 39  
 40      this._binaryType = binaryType || BINARY_TYPES[0];
 41      this[kWebSocket] = undefined;
 42      this._extensions = extensions || {};
 43      this._isServer = !!isServer;
 44      this._maxPayload = maxPayload | 0;
 45  
 46      this._bufferedBytes = 0;
 47      this._buffers = [];
 48  
 49      this._compressed = false;
 50      this._payloadLength = 0;
 51      this._mask = undefined;
 52      this._fragmented = 0;
 53      this._masked = false;
 54      this._fin = false;
 55      this._opcode = 0;
 56  
 57      this._totalPayloadLength = 0;
 58      this._messageLength = 0;
 59      this._fragments = [];
 60  
 61      this._state = GET_INFO;
 62      this._loop = false;
 63    }
 64  
 65    /**
 66     * Implements `Writable.prototype._write()`.
 67     *
 68     * @param {Buffer} chunk The chunk of data to write
 69     * @param {String} encoding The character encoding of `chunk`
 70     * @param {Function} cb Callback
 71     * @private
 72     */
 73    _write(chunk, encoding, cb) {
 74      if (this._opcode === 0x08 && this._state == GET_INFO) return cb();
 75  
 76      this._bufferedBytes += chunk.length;
 77      this._buffers.push(chunk);
 78      this.startLoop(cb);
 79    }
 80  
 81    /**
 82     * Consumes `n` bytes from the buffered data.
 83     *
 84     * @param {Number} n The number of bytes to consume
 85     * @return {Buffer} The consumed bytes
 86     * @private
 87     */
 88    consume(n) {
 89      this._bufferedBytes -= n;
 90  
 91      if (n === this._buffers[0].length) return this._buffers.shift();
 92  
 93      if (n < this._buffers[0].length) {
 94        const buf = this._buffers[0];
 95        this._buffers[0] = buf.slice(n);
 96        return buf.slice(0, n);
 97      }
 98  
 99      const dst = Buffer.allocUnsafe(n);
100  
101      do {
102        const buf = this._buffers[0];
103        const offset = dst.length - n;
104  
105        if (n >= buf.length) {
106          dst.set(this._buffers.shift(), offset);
107        } else {
108          dst.set(new Uint8Array(buf.buffer, buf.byteOffset, n), offset);
109          this._buffers[0] = buf.slice(n);
110        }
111  
112        n -= buf.length;
113      } while (n > 0);
114  
115      return dst;
116    }
117  
118    /**
119     * Starts the parsing loop.
120     *
121     * @param {Function} cb Callback
122     * @private
123     */
124    startLoop(cb) {
125      let err;
126      this._loop = true;
127  
128      do {
129        switch (this._state) {
130          case GET_INFO:
131            err = this.getInfo();
132            break;
133          case GET_PAYLOAD_LENGTH_16:
134            err = this.getPayloadLength16();
135            break;
136          case GET_PAYLOAD_LENGTH_64:
137            err = this.getPayloadLength64();
138            break;
139          case GET_MASK:
140            this.getMask();
141            break;
142          case GET_DATA:
143            err = this.getData(cb);
144            break;
145          default:
146            // `INFLATING`
147            this._loop = false;
148            return;
149        }
150      } while (this._loop);
151  
152      cb(err);
153    }
154  
155    /**
156     * Reads the first two bytes of a frame.
157     *
158     * @return {(RangeError|undefined)} A possible error
159     * @private
160     */
161    getInfo() {
162      if (this._bufferedBytes < 2) {
163        this._loop = false;
164        return;
165      }
166  
167      const buf = this.consume(2);
168  
169      if ((buf[0] & 0x30) !== 0x00) {
170        this._loop = false;
171        return error(
172          RangeError,
173          'RSV2 and RSV3 must be clear',
174          true,
175          1002,
176          'WS_ERR_UNEXPECTED_RSV_2_3'
177        );
178      }
179  
180      const compressed = (buf[0] & 0x40) === 0x40;
181  
182      if (compressed && !this._extensions[PerMessageDeflate.extensionName]) {
183        this._loop = false;
184        return error(
185          RangeError,
186          'RSV1 must be clear',
187          true,
188          1002,
189          'WS_ERR_UNEXPECTED_RSV_1'
190        );
191      }
192  
193      this._fin = (buf[0] & 0x80) === 0x80;
194      this._opcode = buf[0] & 0x0f;
195      this._payloadLength = buf[1] & 0x7f;
196  
197      if (this._opcode === 0x00) {
198        if (compressed) {
199          this._loop = false;
200          return error(
201            RangeError,
202            'RSV1 must be clear',
203            true,
204            1002,
205            'WS_ERR_UNEXPECTED_RSV_1'
206          );
207        }
208  
209        if (!this._fragmented) {
210          this._loop = false;
211          return error(
212            RangeError,
213            'invalid opcode 0',
214            true,
215            1002,
216            'WS_ERR_INVALID_OPCODE'
217          );
218        }
219  
220        this._opcode = this._fragmented;
221      } else if (this._opcode === 0x01 || this._opcode === 0x02) {
222        if (this._fragmented) {
223          this._loop = false;
224          return error(
225            RangeError,
226            `invalid opcode ${this._opcode}`,
227            true,
228            1002,
229            'WS_ERR_INVALID_OPCODE'
230          );
231        }
232  
233        this._compressed = compressed;
234      } else if (this._opcode > 0x07 && this._opcode < 0x0b) {
235        if (!this._fin) {
236          this._loop = false;
237          return error(
238            RangeError,
239            'FIN must be set',
240            true,
241            1002,
242            'WS_ERR_EXPECTED_FIN'
243          );
244        }
245  
246        if (compressed) {
247          this._loop = false;
248          return error(
249            RangeError,
250            'RSV1 must be clear',
251            true,
252            1002,
253            'WS_ERR_UNEXPECTED_RSV_1'
254          );
255        }
256  
257        if (this._payloadLength > 0x7d) {
258          this._loop = false;
259          return error(
260            RangeError,
261            `invalid payload length ${this._payloadLength}`,
262            true,
263            1002,
264            'WS_ERR_INVALID_CONTROL_PAYLOAD_LENGTH'
265          );
266        }
267      } else {
268        this._loop = false;
269        return error(
270          RangeError,
271          `invalid opcode ${this._opcode}`,
272          true,
273          1002,
274          'WS_ERR_INVALID_OPCODE'
275        );
276      }
277  
278      if (!this._fin && !this._fragmented) this._fragmented = this._opcode;
279      this._masked = (buf[1] & 0x80) === 0x80;
280  
281      if (this._isServer) {
282        if (!this._masked) {
283          this._loop = false;
284          return error(
285            RangeError,
286            'MASK must be set',
287            true,
288            1002,
289            'WS_ERR_EXPECTED_MASK'
290          );
291        }
292      } else if (this._masked) {
293        this._loop = false;
294        return error(
295          RangeError,
296          'MASK must be clear',
297          true,
298          1002,
299          'WS_ERR_UNEXPECTED_MASK'
300        );
301      }
302  
303      if (this._payloadLength === 126) this._state = GET_PAYLOAD_LENGTH_16;
304      else if (this._payloadLength === 127) this._state = GET_PAYLOAD_LENGTH_64;
305      else return this.haveLength();
306    }
307  
308    /**
309     * Gets extended payload length (7+16).
310     *
311     * @return {(RangeError|undefined)} A possible error
312     * @private
313     */
314    getPayloadLength16() {
315      if (this._bufferedBytes < 2) {
316        this._loop = false;
317        return;
318      }
319  
320      this._payloadLength = this.consume(2).readUInt16BE(0);
321      return this.haveLength();
322    }
323  
324    /**
325     * Gets extended payload length (7+64).
326     *
327     * @return {(RangeError|undefined)} A possible error
328     * @private
329     */
330    getPayloadLength64() {
331      if (this._bufferedBytes < 8) {
332        this._loop = false;
333        return;
334      }
335  
336      const buf = this.consume(8);
337      const num = buf.readUInt32BE(0);
338  
339      //
340      // The maximum safe integer in JavaScript is 2^53 - 1. An error is returned
341      // if payload length is greater than this number.
342      //
343      if (num > Math.pow(2, 53 - 32) - 1) {
344        this._loop = false;
345        return error(
346          RangeError,
347          'Unsupported WebSocket frame: payload length > 2^53 - 1',
348          false,
349          1009,
350          'WS_ERR_UNSUPPORTED_DATA_PAYLOAD_LENGTH'
351        );
352      }
353  
354      this._payloadLength = num * Math.pow(2, 32) + buf.readUInt32BE(4);
355      return this.haveLength();
356    }
357  
358    /**
359     * Payload length has been read.
360     *
361     * @return {(RangeError|undefined)} A possible error
362     * @private
363     */
364    haveLength() {
365      if (this._payloadLength && this._opcode < 0x08) {
366        this._totalPayloadLength += this._payloadLength;
367        if (this._totalPayloadLength > this._maxPayload && this._maxPayload > 0) {
368          this._loop = false;
369          return error(
370            RangeError,
371            'Max payload size exceeded',
372            false,
373            1009,
374            'WS_ERR_UNSUPPORTED_MESSAGE_LENGTH'
375          );
376        }
377      }
378  
379      if (this._masked) this._state = GET_MASK;
380      else this._state = GET_DATA;
381    }
382  
383    /**
384     * Reads mask bytes.
385     *
386     * @private
387     */
388    getMask() {
389      if (this._bufferedBytes < 4) {
390        this._loop = false;
391        return;
392      }
393  
394      this._mask = this.consume(4);
395      this._state = GET_DATA;
396    }
397  
398    /**
399     * Reads data bytes.
400     *
401     * @param {Function} cb Callback
402     * @return {(Error|RangeError|undefined)} A possible error
403     * @private
404     */
405    getData(cb) {
406      let data = EMPTY_BUFFER;
407  
408      if (this._payloadLength) {
409        if (this._bufferedBytes < this._payloadLength) {
410          this._loop = false;
411          return;
412        }
413  
414        data = this.consume(this._payloadLength);
415        if (this._masked) unmask(data, this._mask);
416      }
417  
418      if (this._opcode > 0x07) return this.controlMessage(data);
419  
420      if (this._compressed) {
421        this._state = INFLATING;
422        this.decompress(data, cb);
423        return;
424      }
425  
426      if (data.length) {
427        //
428        // This message is not compressed so its lenght is the sum of the payload
429        // length of all fragments.
430        //
431        this._messageLength = this._totalPayloadLength;
432        this._fragments.push(data);
433      }
434  
435      return this.dataMessage();
436    }
437  
438    /**
439     * Decompresses data.
440     *
441     * @param {Buffer} data Compressed data
442     * @param {Function} cb Callback
443     * @private
444     */
445    decompress(data, cb) {
446      const perMessageDeflate = this._extensions[PerMessageDeflate.extensionName];
447  
448      perMessageDeflate.decompress(data, this._fin, (err, buf) => {
449        if (err) return cb(err);
450  
451        if (buf.length) {
452          this._messageLength += buf.length;
453          if (this._messageLength > this._maxPayload && this._maxPayload > 0) {
454            return cb(
455              error(
456                RangeError,
457                'Max payload size exceeded',
458                false,
459                1009,
460                'WS_ERR_UNSUPPORTED_MESSAGE_LENGTH'
461              )
462            );
463          }
464  
465          this._fragments.push(buf);
466        }
467  
468        const er = this.dataMessage();
469        if (er) return cb(er);
470  
471        this.startLoop(cb);
472      });
473    }
474  
475    /**
476     * Handles a data message.
477     *
478     * @return {(Error|undefined)} A possible error
479     * @private
480     */
481    dataMessage() {
482      if (this._fin) {
483        const messageLength = this._messageLength;
484        const fragments = this._fragments;
485  
486        this._totalPayloadLength = 0;
487        this._messageLength = 0;
488        this._fragmented = 0;
489        this._fragments = [];
490  
491        if (this._opcode === 2) {
492          let data;
493  
494          if (this._binaryType === 'nodebuffer') {
495            data = concat(fragments, messageLength);
496          } else if (this._binaryType === 'arraybuffer') {
497            data = toArrayBuffer(concat(fragments, messageLength));
498          } else {
499            data = fragments;
500          }
501  
502          this.emit('message', data);
503        } else {
504          const buf = concat(fragments, messageLength);
505  
506          if (!isValidUTF8(buf)) {
507            this._loop = false;
508            return error(
509              Error,
510              'invalid UTF-8 sequence',
511              true,
512              1007,
513              'WS_ERR_INVALID_UTF8'
514            );
515          }
516  
517          this.emit('message', buf.toString());
518        }
519      }
520  
521      this._state = GET_INFO;
522    }
523  
524    /**
525     * Handles a control message.
526     *
527     * @param {Buffer} data Data to handle
528     * @return {(Error|RangeError|undefined)} A possible error
529     * @private
530     */
531    controlMessage(data) {
532      if (this._opcode === 0x08) {
533        this._loop = false;
534  
535        if (data.length === 0) {
536          this.emit('conclude', 1005, '');
537          this.end();
538        } else if (data.length === 1) {
539          return error(
540            RangeError,
541            'invalid payload length 1',
542            true,
543            1002,
544            'WS_ERR_INVALID_CONTROL_PAYLOAD_LENGTH'
545          );
546        } else {
547          const code = data.readUInt16BE(0);
548  
549          if (!isValidStatusCode(code)) {
550            return error(
551              RangeError,
552              `invalid status code ${code}`,
553              true,
554              1002,
555              'WS_ERR_INVALID_CLOSE_CODE'
556            );
557          }
558  
559          const buf = data.slice(2);
560  
561          if (!isValidUTF8(buf)) {
562            return error(
563              Error,
564              'invalid UTF-8 sequence',
565              true,
566              1007,
567              'WS_ERR_INVALID_UTF8'
568            );
569          }
570  
571          this.emit('conclude', code, buf.toString());
572          this.end();
573        }
574      } else if (this._opcode === 0x09) {
575        this.emit('ping', data);
576      } else {
577        this.emit('pong', data);
578      }
579  
580      this._state = GET_INFO;
581    }
582  }
583  
584  module.exports = Receiver;
585  
586  /**
587   * Builds an error object.
588   *
589   * @param {function(new:Error|RangeError)} ErrorCtor The error constructor
590   * @param {String} message The error message
591   * @param {Boolean} prefix Specifies whether or not to add a default prefix to
592   *     `message`
593   * @param {Number} statusCode The status code
594   * @param {String} errorCode The exposed error code
595   * @return {(Error|RangeError)} The error
596   * @private
597   */
598  function error(ErrorCtor, message, prefix, statusCode, errorCode) {
599    const err = new ErrorCtor(
600      prefix ? `Invalid WebSocket frame: ${message}` : message
601    );
602  
603    Error.captureStackTrace(err, error);
604    err.code = errorCode;
605    err[kStatusCode] = statusCode;
606    return err;
607  }