receiver.js
1 'use strict'; 2 3 const { Writable } = require('stream'); 4 5 const PerMessageDeflate = require('./permessage-deflate'); 6 const { 7 BINARY_TYPES, 8 EMPTY_BUFFER, 9 kStatusCode, 10 kWebSocket 11 } = require('./constants'); 12 const { concat, toArrayBuffer, unmask } = require('./buffer-util'); 13 const { isValidStatusCode, isValidUTF8 } = require('./validation'); 14 15 const GET_INFO = 0; 16 const GET_PAYLOAD_LENGTH_16 = 1; 17 const GET_PAYLOAD_LENGTH_64 = 2; 18 const GET_MASK = 3; 19 const GET_DATA = 4; 20 const INFLATING = 5; 21 22 /** 23 * HyBi Receiver implementation. 24 * 25 * @extends Writable 26 */ 27 class Receiver extends Writable { 28 /** 29 * Creates a Receiver instance. 30 * 31 * @param {String} [binaryType=nodebuffer] The type for binary data 32 * @param {Object} [extensions] An object containing the negotiated extensions 33 * @param {Boolean} [isServer=false] Specifies whether to operate in client or 34 * server mode 35 * @param {Number} [maxPayload=0] The maximum allowed message length 36 */ 37 constructor(binaryType, extensions, isServer, maxPayload) { 38 super(); 39 40 this._binaryType = binaryType || BINARY_TYPES[0]; 41 this[kWebSocket] = undefined; 42 this._extensions = extensions || {}; 43 this._isServer = !!isServer; 44 this._maxPayload = maxPayload | 0; 45 46 this._bufferedBytes = 0; 47 this._buffers = []; 48 49 this._compressed = false; 50 this._payloadLength = 0; 51 this._mask = undefined; 52 this._fragmented = 0; 53 this._masked = false; 54 this._fin = false; 55 this._opcode = 0; 56 57 this._totalPayloadLength = 0; 58 this._messageLength = 0; 59 this._fragments = []; 60 61 this._state = GET_INFO; 62 this._loop = false; 63 } 64 65 /** 66 * Implements `Writable.prototype._write()`. 67 * 68 * @param {Buffer} chunk The chunk of data to write 69 * @param {String} encoding The character encoding of `chunk` 70 * @param {Function} cb Callback 71 * @private 72 */ 73 _write(chunk, encoding, cb) { 74 if (this._opcode === 0x08 && this._state == GET_INFO) return cb(); 75 76 this._bufferedBytes += chunk.length; 77 this._buffers.push(chunk); 78 this.startLoop(cb); 79 } 80 81 /** 82 * Consumes `n` bytes from the buffered data. 83 * 84 * @param {Number} n The number of bytes to consume 85 * @return {Buffer} The consumed bytes 86 * @private 87 */ 88 consume(n) { 89 this._bufferedBytes -= n; 90 91 if (n === this._buffers[0].length) return this._buffers.shift(); 92 93 if (n < this._buffers[0].length) { 94 const buf = this._buffers[0]; 95 this._buffers[0] = buf.slice(n); 96 return buf.slice(0, n); 97 } 98 99 const dst = Buffer.allocUnsafe(n); 100 101 do { 102 const buf = this._buffers[0]; 103 const offset = dst.length - n; 104 105 if (n >= buf.length) { 106 dst.set(this._buffers.shift(), offset); 107 } else { 108 dst.set(new Uint8Array(buf.buffer, buf.byteOffset, n), offset); 109 this._buffers[0] = buf.slice(n); 110 } 111 112 n -= buf.length; 113 } while (n > 0); 114 115 return dst; 116 } 117 118 /** 119 * Starts the parsing loop. 120 * 121 * @param {Function} cb Callback 122 * @private 123 */ 124 startLoop(cb) { 125 let err; 126 this._loop = true; 127 128 do { 129 switch (this._state) { 130 case GET_INFO: 131 err = this.getInfo(); 132 break; 133 case GET_PAYLOAD_LENGTH_16: 134 err = this.getPayloadLength16(); 135 break; 136 case GET_PAYLOAD_LENGTH_64: 137 err = this.getPayloadLength64(); 138 break; 139 case GET_MASK: 140 this.getMask(); 141 break; 142 case GET_DATA: 143 err = this.getData(cb); 144 break; 145 default: 146 // `INFLATING` 147 this._loop = false; 148 return; 149 } 150 } while (this._loop); 151 152 cb(err); 153 } 154 155 /** 156 * Reads the first two bytes of a frame. 157 * 158 * @return {(RangeError|undefined)} A possible error 159 * @private 160 */ 161 getInfo() { 162 if (this._bufferedBytes < 2) { 163 this._loop = false; 164 return; 165 } 166 167 const buf = this.consume(2); 168 169 if ((buf[0] & 0x30) !== 0x00) { 170 this._loop = false; 171 return error( 172 RangeError, 173 'RSV2 and RSV3 must be clear', 174 true, 175 1002, 176 'WS_ERR_UNEXPECTED_RSV_2_3' 177 ); 178 } 179 180 const compressed = (buf[0] & 0x40) === 0x40; 181 182 if (compressed && !this._extensions[PerMessageDeflate.extensionName]) { 183 this._loop = false; 184 return error( 185 RangeError, 186 'RSV1 must be clear', 187 true, 188 1002, 189 'WS_ERR_UNEXPECTED_RSV_1' 190 ); 191 } 192 193 this._fin = (buf[0] & 0x80) === 0x80; 194 this._opcode = buf[0] & 0x0f; 195 this._payloadLength = buf[1] & 0x7f; 196 197 if (this._opcode === 0x00) { 198 if (compressed) { 199 this._loop = false; 200 return error( 201 RangeError, 202 'RSV1 must be clear', 203 true, 204 1002, 205 'WS_ERR_UNEXPECTED_RSV_1' 206 ); 207 } 208 209 if (!this._fragmented) { 210 this._loop = false; 211 return error( 212 RangeError, 213 'invalid opcode 0', 214 true, 215 1002, 216 'WS_ERR_INVALID_OPCODE' 217 ); 218 } 219 220 this._opcode = this._fragmented; 221 } else if (this._opcode === 0x01 || this._opcode === 0x02) { 222 if (this._fragmented) { 223 this._loop = false; 224 return error( 225 RangeError, 226 `invalid opcode ${this._opcode}`, 227 true, 228 1002, 229 'WS_ERR_INVALID_OPCODE' 230 ); 231 } 232 233 this._compressed = compressed; 234 } else if (this._opcode > 0x07 && this._opcode < 0x0b) { 235 if (!this._fin) { 236 this._loop = false; 237 return error( 238 RangeError, 239 'FIN must be set', 240 true, 241 1002, 242 'WS_ERR_EXPECTED_FIN' 243 ); 244 } 245 246 if (compressed) { 247 this._loop = false; 248 return error( 249 RangeError, 250 'RSV1 must be clear', 251 true, 252 1002, 253 'WS_ERR_UNEXPECTED_RSV_1' 254 ); 255 } 256 257 if (this._payloadLength > 0x7d) { 258 this._loop = false; 259 return error( 260 RangeError, 261 `invalid payload length ${this._payloadLength}`, 262 true, 263 1002, 264 'WS_ERR_INVALID_CONTROL_PAYLOAD_LENGTH' 265 ); 266 } 267 } else { 268 this._loop = false; 269 return error( 270 RangeError, 271 `invalid opcode ${this._opcode}`, 272 true, 273 1002, 274 'WS_ERR_INVALID_OPCODE' 275 ); 276 } 277 278 if (!this._fin && !this._fragmented) this._fragmented = this._opcode; 279 this._masked = (buf[1] & 0x80) === 0x80; 280 281 if (this._isServer) { 282 if (!this._masked) { 283 this._loop = false; 284 return error( 285 RangeError, 286 'MASK must be set', 287 true, 288 1002, 289 'WS_ERR_EXPECTED_MASK' 290 ); 291 } 292 } else if (this._masked) { 293 this._loop = false; 294 return error( 295 RangeError, 296 'MASK must be clear', 297 true, 298 1002, 299 'WS_ERR_UNEXPECTED_MASK' 300 ); 301 } 302 303 if (this._payloadLength === 126) this._state = GET_PAYLOAD_LENGTH_16; 304 else if (this._payloadLength === 127) this._state = GET_PAYLOAD_LENGTH_64; 305 else return this.haveLength(); 306 } 307 308 /** 309 * Gets extended payload length (7+16). 310 * 311 * @return {(RangeError|undefined)} A possible error 312 * @private 313 */ 314 getPayloadLength16() { 315 if (this._bufferedBytes < 2) { 316 this._loop = false; 317 return; 318 } 319 320 this._payloadLength = this.consume(2).readUInt16BE(0); 321 return this.haveLength(); 322 } 323 324 /** 325 * Gets extended payload length (7+64). 326 * 327 * @return {(RangeError|undefined)} A possible error 328 * @private 329 */ 330 getPayloadLength64() { 331 if (this._bufferedBytes < 8) { 332 this._loop = false; 333 return; 334 } 335 336 const buf = this.consume(8); 337 const num = buf.readUInt32BE(0); 338 339 // 340 // The maximum safe integer in JavaScript is 2^53 - 1. An error is returned 341 // if payload length is greater than this number. 342 // 343 if (num > Math.pow(2, 53 - 32) - 1) { 344 this._loop = false; 345 return error( 346 RangeError, 347 'Unsupported WebSocket frame: payload length > 2^53 - 1', 348 false, 349 1009, 350 'WS_ERR_UNSUPPORTED_DATA_PAYLOAD_LENGTH' 351 ); 352 } 353 354 this._payloadLength = num * Math.pow(2, 32) + buf.readUInt32BE(4); 355 return this.haveLength(); 356 } 357 358 /** 359 * Payload length has been read. 360 * 361 * @return {(RangeError|undefined)} A possible error 362 * @private 363 */ 364 haveLength() { 365 if (this._payloadLength && this._opcode < 0x08) { 366 this._totalPayloadLength += this._payloadLength; 367 if (this._totalPayloadLength > this._maxPayload && this._maxPayload > 0) { 368 this._loop = false; 369 return error( 370 RangeError, 371 'Max payload size exceeded', 372 false, 373 1009, 374 'WS_ERR_UNSUPPORTED_MESSAGE_LENGTH' 375 ); 376 } 377 } 378 379 if (this._masked) this._state = GET_MASK; 380 else this._state = GET_DATA; 381 } 382 383 /** 384 * Reads mask bytes. 385 * 386 * @private 387 */ 388 getMask() { 389 if (this._bufferedBytes < 4) { 390 this._loop = false; 391 return; 392 } 393 394 this._mask = this.consume(4); 395 this._state = GET_DATA; 396 } 397 398 /** 399 * Reads data bytes. 400 * 401 * @param {Function} cb Callback 402 * @return {(Error|RangeError|undefined)} A possible error 403 * @private 404 */ 405 getData(cb) { 406 let data = EMPTY_BUFFER; 407 408 if (this._payloadLength) { 409 if (this._bufferedBytes < this._payloadLength) { 410 this._loop = false; 411 return; 412 } 413 414 data = this.consume(this._payloadLength); 415 if (this._masked) unmask(data, this._mask); 416 } 417 418 if (this._opcode > 0x07) return this.controlMessage(data); 419 420 if (this._compressed) { 421 this._state = INFLATING; 422 this.decompress(data, cb); 423 return; 424 } 425 426 if (data.length) { 427 // 428 // This message is not compressed so its lenght is the sum of the payload 429 // length of all fragments. 430 // 431 this._messageLength = this._totalPayloadLength; 432 this._fragments.push(data); 433 } 434 435 return this.dataMessage(); 436 } 437 438 /** 439 * Decompresses data. 440 * 441 * @param {Buffer} data Compressed data 442 * @param {Function} cb Callback 443 * @private 444 */ 445 decompress(data, cb) { 446 const perMessageDeflate = this._extensions[PerMessageDeflate.extensionName]; 447 448 perMessageDeflate.decompress(data, this._fin, (err, buf) => { 449 if (err) return cb(err); 450 451 if (buf.length) { 452 this._messageLength += buf.length; 453 if (this._messageLength > this._maxPayload && this._maxPayload > 0) { 454 return cb( 455 error( 456 RangeError, 457 'Max payload size exceeded', 458 false, 459 1009, 460 'WS_ERR_UNSUPPORTED_MESSAGE_LENGTH' 461 ) 462 ); 463 } 464 465 this._fragments.push(buf); 466 } 467 468 const er = this.dataMessage(); 469 if (er) return cb(er); 470 471 this.startLoop(cb); 472 }); 473 } 474 475 /** 476 * Handles a data message. 477 * 478 * @return {(Error|undefined)} A possible error 479 * @private 480 */ 481 dataMessage() { 482 if (this._fin) { 483 const messageLength = this._messageLength; 484 const fragments = this._fragments; 485 486 this._totalPayloadLength = 0; 487 this._messageLength = 0; 488 this._fragmented = 0; 489 this._fragments = []; 490 491 if (this._opcode === 2) { 492 let data; 493 494 if (this._binaryType === 'nodebuffer') { 495 data = concat(fragments, messageLength); 496 } else if (this._binaryType === 'arraybuffer') { 497 data = toArrayBuffer(concat(fragments, messageLength)); 498 } else { 499 data = fragments; 500 } 501 502 this.emit('message', data); 503 } else { 504 const buf = concat(fragments, messageLength); 505 506 if (!isValidUTF8(buf)) { 507 this._loop = false; 508 return error( 509 Error, 510 'invalid UTF-8 sequence', 511 true, 512 1007, 513 'WS_ERR_INVALID_UTF8' 514 ); 515 } 516 517 this.emit('message', buf.toString()); 518 } 519 } 520 521 this._state = GET_INFO; 522 } 523 524 /** 525 * Handles a control message. 526 * 527 * @param {Buffer} data Data to handle 528 * @return {(Error|RangeError|undefined)} A possible error 529 * @private 530 */ 531 controlMessage(data) { 532 if (this._opcode === 0x08) { 533 this._loop = false; 534 535 if (data.length === 0) { 536 this.emit('conclude', 1005, ''); 537 this.end(); 538 } else if (data.length === 1) { 539 return error( 540 RangeError, 541 'invalid payload length 1', 542 true, 543 1002, 544 'WS_ERR_INVALID_CONTROL_PAYLOAD_LENGTH' 545 ); 546 } else { 547 const code = data.readUInt16BE(0); 548 549 if (!isValidStatusCode(code)) { 550 return error( 551 RangeError, 552 `invalid status code ${code}`, 553 true, 554 1002, 555 'WS_ERR_INVALID_CLOSE_CODE' 556 ); 557 } 558 559 const buf = data.slice(2); 560 561 if (!isValidUTF8(buf)) { 562 return error( 563 Error, 564 'invalid UTF-8 sequence', 565 true, 566 1007, 567 'WS_ERR_INVALID_UTF8' 568 ); 569 } 570 571 this.emit('conclude', code, buf.toString()); 572 this.end(); 573 } 574 } else if (this._opcode === 0x09) { 575 this.emit('ping', data); 576 } else { 577 this.emit('pong', data); 578 } 579 580 this._state = GET_INFO; 581 } 582 } 583 584 module.exports = Receiver; 585 586 /** 587 * Builds an error object. 588 * 589 * @param {function(new:Error|RangeError)} ErrorCtor The error constructor 590 * @param {String} message The error message 591 * @param {Boolean} prefix Specifies whether or not to add a default prefix to 592 * `message` 593 * @param {Number} statusCode The status code 594 * @param {String} errorCode The exposed error code 595 * @return {(Error|RangeError)} The error 596 * @private 597 */ 598 function error(ErrorCtor, message, prefix, statusCode, errorCode) { 599 const err = new ErrorCtor( 600 prefix ? `Invalid WebSocket frame: ${message}` : message 601 ); 602 603 Error.captureStackTrace(err, error); 604 err.code = errorCode; 605 err[kStatusCode] = statusCode; 606 return err; 607 }