_stream_writable.js
  1  // Copyright Joyent, Inc. and other Node contributors.
  2  //
  3  // Permission is hereby granted, free of charge, to any person obtaining a
  4  // copy of this software and associated documentation files (the
  5  // "Software"), to deal in the Software without restriction, including
  6  // without limitation the rights to use, copy, modify, merge, publish,
  7  // distribute, sublicense, and/or sell copies of the Software, and to permit
  8  // persons to whom the Software is furnished to do so, subject to the
  9  // following conditions:
 10  //
 11  // The above copyright notice and this permission notice shall be included
 12  // in all copies or substantial portions of the Software.
 13  //
 14  // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
 15  // OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 16  // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
 17  // NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
 18  // DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
 19  // OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
 20  // USE OR OTHER DEALINGS IN THE SOFTWARE.
 21  // A bit simpler than readable streams.
 22  // Implement an async ._write(chunk, encoding, cb), and it'll handle all
 23  // the drain event emission and buffering.
 24  'use strict';
 25  
 26  module.exports = Writable;
 27  /* <replacement> */
 28  
 29  function WriteReq(chunk, encoding, cb) {
 30    this.chunk = chunk;
 31    this.encoding = encoding;
 32    this.callback = cb;
 33    this.next = null;
 34  } // It seems a linked list but it is not
 35  // there will be only 2 of these for each stream
 36  
 37  
 38  function CorkedRequest(state) {
 39    var _this = this;
 40  
 41    this.next = null;
 42    this.entry = null;
 43  
 44    this.finish = function () {
 45      onCorkedFinish(_this, state);
 46    };
 47  }
 48  /* </replacement> */
 49  
 50  /*<replacement>*/
 51  
 52  
 53  var Duplex;
 54  /*</replacement>*/
 55  
 56  Writable.WritableState = WritableState;
 57  /*<replacement>*/
 58  
 59  var internalUtil = {
 60    deprecate: require('util-deprecate')
 61  };
 62  /*</replacement>*/
 63  
 64  /*<replacement>*/
 65  
 66  var Stream = require('./internal/streams/stream');
 67  /*</replacement>*/
 68  
 69  
 70  var Buffer = require('buffer').Buffer;
 71  
 72  var OurUint8Array = global.Uint8Array || function () {};
 73  
 74  function _uint8ArrayToBuffer(chunk) {
 75    return Buffer.from(chunk);
 76  }
 77  
 78  function _isUint8Array(obj) {
 79    return Buffer.isBuffer(obj) || obj instanceof OurUint8Array;
 80  }
 81  
 82  var destroyImpl = require('./internal/streams/destroy');
 83  
 84  var _require = require('./internal/streams/state'),
 85      getHighWaterMark = _require.getHighWaterMark;
 86  
 87  var _require$codes = require('../errors').codes,
 88      ERR_INVALID_ARG_TYPE = _require$codes.ERR_INVALID_ARG_TYPE,
 89      ERR_METHOD_NOT_IMPLEMENTED = _require$codes.ERR_METHOD_NOT_IMPLEMENTED,
 90      ERR_MULTIPLE_CALLBACK = _require$codes.ERR_MULTIPLE_CALLBACK,
 91      ERR_STREAM_CANNOT_PIPE = _require$codes.ERR_STREAM_CANNOT_PIPE,
 92      ERR_STREAM_DESTROYED = _require$codes.ERR_STREAM_DESTROYED,
 93      ERR_STREAM_NULL_VALUES = _require$codes.ERR_STREAM_NULL_VALUES,
 94      ERR_STREAM_WRITE_AFTER_END = _require$codes.ERR_STREAM_WRITE_AFTER_END,
 95      ERR_UNKNOWN_ENCODING = _require$codes.ERR_UNKNOWN_ENCODING;
 96  
 97  var errorOrDestroy = destroyImpl.errorOrDestroy;
 98  
 99  require('inherits')(Writable, Stream);
100  
101  function nop() {}
102  
103  function WritableState(options, stream, isDuplex) {
104    Duplex = Duplex || require('./_stream_duplex');
105    options = options || {}; // Duplex streams are both readable and writable, but share
106    // the same options object.
107    // However, some cases require setting options to different
108    // values for the readable and the writable sides of the duplex stream,
109    // e.g. options.readableObjectMode vs. options.writableObjectMode, etc.
110  
111    if (typeof isDuplex !== 'boolean') isDuplex = stream instanceof Duplex; // object stream flag to indicate whether or not this stream
112    // contains buffers or objects.
113  
114    this.objectMode = !!options.objectMode;
115    if (isDuplex) this.objectMode = this.objectMode || !!options.writableObjectMode; // the point at which write() starts returning false
116    // Note: 0 is a valid value, means that we always return false if
117    // the entire buffer is not flushed immediately on write()
118  
119    this.highWaterMark = getHighWaterMark(this, options, 'writableHighWaterMark', isDuplex); // if _final has been called
120  
121    this.finalCalled = false; // drain event flag.
122  
123    this.needDrain = false; // at the start of calling end()
124  
125    this.ending = false; // when end() has been called, and returned
126  
127    this.ended = false; // when 'finish' is emitted
128  
129    this.finished = false; // has it been destroyed
130  
131    this.destroyed = false; // should we decode strings into buffers before passing to _write?
132    // this is here so that some node-core streams can optimize string
133    // handling at a lower level.
134  
135    var noDecode = options.decodeStrings === false;
136    this.decodeStrings = !noDecode; // Crypto is kind of old and crusty.  Historically, its default string
137    // encoding is 'binary' so we have to make this configurable.
138    // Everything else in the universe uses 'utf8', though.
139  
140    this.defaultEncoding = options.defaultEncoding || 'utf8'; // not an actual buffer we keep track of, but a measurement
141    // of how much we're waiting to get pushed to some underlying
142    // socket or file.
143  
144    this.length = 0; // a flag to see when we're in the middle of a write.
145  
146    this.writing = false; // when true all writes will be buffered until .uncork() call
147  
148    this.corked = 0; // a flag to be able to tell if the onwrite cb is called immediately,
149    // or on a later tick.  We set this to true at first, because any
150    // actions that shouldn't happen until "later" should generally also
151    // not happen before the first write call.
152  
153    this.sync = true; // a flag to know if we're processing previously buffered items, which
154    // may call the _write() callback in the same tick, so that we don't
155    // end up in an overlapped onwrite situation.
156  
157    this.bufferProcessing = false; // the callback that's passed to _write(chunk,cb)
158  
159    this.onwrite = function (er) {
160      onwrite(stream, er);
161    }; // the callback that the user supplies to write(chunk,encoding,cb)
162  
163  
164    this.writecb = null; // the amount that is being written when _write is called.
165  
166    this.writelen = 0;
167    this.bufferedRequest = null;
168    this.lastBufferedRequest = null; // number of pending user-supplied write callbacks
169    // this must be 0 before 'finish' can be emitted
170  
171    this.pendingcb = 0; // emit prefinish if the only thing we're waiting for is _write cbs
172    // This is relevant for synchronous Transform streams
173  
174    this.prefinished = false; // True if the error was already emitted and should not be thrown again
175  
176    this.errorEmitted = false; // Should close be emitted on destroy. Defaults to true.
177  
178    this.emitClose = options.emitClose !== false; // Should .destroy() be called after 'finish' (and potentially 'end')
179  
180    this.autoDestroy = !!options.autoDestroy; // count buffered requests
181  
182    this.bufferedRequestCount = 0; // allocate the first CorkedRequest, there is always
183    // one allocated and free to use, and we maintain at most two
184  
185    this.corkedRequestsFree = new CorkedRequest(this);
186  }
187  
188  WritableState.prototype.getBuffer = function getBuffer() {
189    var current = this.bufferedRequest;
190    var out = [];
191  
192    while (current) {
193      out.push(current);
194      current = current.next;
195    }
196  
197    return out;
198  };
199  
200  (function () {
201    try {
202      Object.defineProperty(WritableState.prototype, 'buffer', {
203        get: internalUtil.deprecate(function writableStateBufferGetter() {
204          return this.getBuffer();
205        }, '_writableState.buffer is deprecated. Use _writableState.getBuffer ' + 'instead.', 'DEP0003')
206      });
207    } catch (_) {}
208  })(); // Test _writableState for inheritance to account for Duplex streams,
209  // whose prototype chain only points to Readable.
210  
211  
212  var realHasInstance;
213  
214  if (typeof Symbol === 'function' && Symbol.hasInstance && typeof Function.prototype[Symbol.hasInstance] === 'function') {
215    realHasInstance = Function.prototype[Symbol.hasInstance];
216    Object.defineProperty(Writable, Symbol.hasInstance, {
217      value: function value(object) {
218        if (realHasInstance.call(this, object)) return true;
219        if (this !== Writable) return false;
220        return object && object._writableState instanceof WritableState;
221      }
222    });
223  } else {
224    realHasInstance = function realHasInstance(object) {
225      return object instanceof this;
226    };
227  }
228  
229  function Writable(options) {
230    Duplex = Duplex || require('./_stream_duplex'); // Writable ctor is applied to Duplexes, too.
231    // `realHasInstance` is necessary because using plain `instanceof`
232    // would return false, as no `_writableState` property is attached.
233    // Trying to use the custom `instanceof` for Writable here will also break the
234    // Node.js LazyTransform implementation, which has a non-trivial getter for
235    // `_writableState` that would lead to infinite recursion.
236    // Checking for a Stream.Duplex instance is faster here instead of inside
237    // the WritableState constructor, at least with V8 6.5
238  
239    var isDuplex = this instanceof Duplex;
240    if (!isDuplex && !realHasInstance.call(Writable, this)) return new Writable(options);
241    this._writableState = new WritableState(options, this, isDuplex); // legacy.
242  
243    this.writable = true;
244  
245    if (options) {
246      if (typeof options.write === 'function') this._write = options.write;
247      if (typeof options.writev === 'function') this._writev = options.writev;
248      if (typeof options.destroy === 'function') this._destroy = options.destroy;
249      if (typeof options.final === 'function') this._final = options.final;
250    }
251  
252    Stream.call(this);
253  } // Otherwise people can pipe Writable streams, which is just wrong.
254  
255  
256  Writable.prototype.pipe = function () {
257    errorOrDestroy(this, new ERR_STREAM_CANNOT_PIPE());
258  };
259  
260  function writeAfterEnd(stream, cb) {
261    var er = new ERR_STREAM_WRITE_AFTER_END(); // TODO: defer error events consistently everywhere, not just the cb
262  
263    errorOrDestroy(stream, er);
264    process.nextTick(cb, er);
265  } // Checks that a user-supplied chunk is valid, especially for the particular
266  // mode the stream is in. Currently this means that `null` is never accepted
267  // and undefined/non-string values are only allowed in object mode.
268  
269  
270  function validChunk(stream, state, chunk, cb) {
271    var er;
272  
273    if (chunk === null) {
274      er = new ERR_STREAM_NULL_VALUES();
275    } else if (typeof chunk !== 'string' && !state.objectMode) {
276      er = new ERR_INVALID_ARG_TYPE('chunk', ['string', 'Buffer'], chunk);
277    }
278  
279    if (er) {
280      errorOrDestroy(stream, er);
281      process.nextTick(cb, er);
282      return false;
283    }
284  
285    return true;
286  }
287  
288  Writable.prototype.write = function (chunk, encoding, cb) {
289    var state = this._writableState;
290    var ret = false;
291  
292    var isBuf = !state.objectMode && _isUint8Array(chunk);
293  
294    if (isBuf && !Buffer.isBuffer(chunk)) {
295      chunk = _uint8ArrayToBuffer(chunk);
296    }
297  
298    if (typeof encoding === 'function') {
299      cb = encoding;
300      encoding = null;
301    }
302  
303    if (isBuf) encoding = 'buffer';else if (!encoding) encoding = state.defaultEncoding;
304    if (typeof cb !== 'function') cb = nop;
305    if (state.ending) writeAfterEnd(this, cb);else if (isBuf || validChunk(this, state, chunk, cb)) {
306      state.pendingcb++;
307      ret = writeOrBuffer(this, state, isBuf, chunk, encoding, cb);
308    }
309    return ret;
310  };
311  
312  Writable.prototype.cork = function () {
313    this._writableState.corked++;
314  };
315  
316  Writable.prototype.uncork = function () {
317    var state = this._writableState;
318  
319    if (state.corked) {
320      state.corked--;
321      if (!state.writing && !state.corked && !state.bufferProcessing && state.bufferedRequest) clearBuffer(this, state);
322    }
323  };
324  
325  Writable.prototype.setDefaultEncoding = function setDefaultEncoding(encoding) {
326    // node::ParseEncoding() requires lower case.
327    if (typeof encoding === 'string') encoding = encoding.toLowerCase();
328    if (!(['hex', 'utf8', 'utf-8', 'ascii', 'binary', 'base64', 'ucs2', 'ucs-2', 'utf16le', 'utf-16le', 'raw'].indexOf((encoding + '').toLowerCase()) > -1)) throw new ERR_UNKNOWN_ENCODING(encoding);
329    this._writableState.defaultEncoding = encoding;
330    return this;
331  };
332  
333  Object.defineProperty(Writable.prototype, 'writableBuffer', {
334    // making it explicit this property is not enumerable
335    // because otherwise some prototype manipulation in
336    // userland will fail
337    enumerable: false,
338    get: function get() {
339      return this._writableState && this._writableState.getBuffer();
340    }
341  });
342  
343  function decodeChunk(state, chunk, encoding) {
344    if (!state.objectMode && state.decodeStrings !== false && typeof chunk === 'string') {
345      chunk = Buffer.from(chunk, encoding);
346    }
347  
348    return chunk;
349  }
350  
351  Object.defineProperty(Writable.prototype, 'writableHighWaterMark', {
352    // making it explicit this property is not enumerable
353    // because otherwise some prototype manipulation in
354    // userland will fail
355    enumerable: false,
356    get: function get() {
357      return this._writableState.highWaterMark;
358    }
359  }); // if we're already writing something, then just put this
360  // in the queue, and wait our turn.  Otherwise, call _write
361  // If we return false, then we need a drain event, so set that flag.
362  
363  function writeOrBuffer(stream, state, isBuf, chunk, encoding, cb) {
364    if (!isBuf) {
365      var newChunk = decodeChunk(state, chunk, encoding);
366  
367      if (chunk !== newChunk) {
368        isBuf = true;
369        encoding = 'buffer';
370        chunk = newChunk;
371      }
372    }
373  
374    var len = state.objectMode ? 1 : chunk.length;
375    state.length += len;
376    var ret = state.length < state.highWaterMark; // we must ensure that previous needDrain will not be reset to false.
377  
378    if (!ret) state.needDrain = true;
379  
380    if (state.writing || state.corked) {
381      var last = state.lastBufferedRequest;
382      state.lastBufferedRequest = {
383        chunk: chunk,
384        encoding: encoding,
385        isBuf: isBuf,
386        callback: cb,
387        next: null
388      };
389  
390      if (last) {
391        last.next = state.lastBufferedRequest;
392      } else {
393        state.bufferedRequest = state.lastBufferedRequest;
394      }
395  
396      state.bufferedRequestCount += 1;
397    } else {
398      doWrite(stream, state, false, len, chunk, encoding, cb);
399    }
400  
401    return ret;
402  }
403  
404  function doWrite(stream, state, writev, len, chunk, encoding, cb) {
405    state.writelen = len;
406    state.writecb = cb;
407    state.writing = true;
408    state.sync = true;
409    if (state.destroyed) state.onwrite(new ERR_STREAM_DESTROYED('write'));else if (writev) stream._writev(chunk, state.onwrite);else stream._write(chunk, encoding, state.onwrite);
410    state.sync = false;
411  }
412  
413  function onwriteError(stream, state, sync, er, cb) {
414    --state.pendingcb;
415  
416    if (sync) {
417      // defer the callback if we are being called synchronously
418      // to avoid piling up things on the stack
419      process.nextTick(cb, er); // this can emit finish, and it will always happen
420      // after error
421  
422      process.nextTick(finishMaybe, stream, state);
423      stream._writableState.errorEmitted = true;
424      errorOrDestroy(stream, er);
425    } else {
426      // the caller expect this to happen before if
427      // it is async
428      cb(er);
429      stream._writableState.errorEmitted = true;
430      errorOrDestroy(stream, er); // this can emit finish, but finish must
431      // always follow error
432  
433      finishMaybe(stream, state);
434    }
435  }
436  
437  function onwriteStateUpdate(state) {
438    state.writing = false;
439    state.writecb = null;
440    state.length -= state.writelen;
441    state.writelen = 0;
442  }
443  
444  function onwrite(stream, er) {
445    var state = stream._writableState;
446    var sync = state.sync;
447    var cb = state.writecb;
448    if (typeof cb !== 'function') throw new ERR_MULTIPLE_CALLBACK();
449    onwriteStateUpdate(state);
450    if (er) onwriteError(stream, state, sync, er, cb);else {
451      // Check if we're actually ready to finish, but don't emit yet
452      var finished = needFinish(state) || stream.destroyed;
453  
454      if (!finished && !state.corked && !state.bufferProcessing && state.bufferedRequest) {
455        clearBuffer(stream, state);
456      }
457  
458      if (sync) {
459        process.nextTick(afterWrite, stream, state, finished, cb);
460      } else {
461        afterWrite(stream, state, finished, cb);
462      }
463    }
464  }
465  
466  function afterWrite(stream, state, finished, cb) {
467    if (!finished) onwriteDrain(stream, state);
468    state.pendingcb--;
469    cb();
470    finishMaybe(stream, state);
471  } // Must force callback to be called on nextTick, so that we don't
472  // emit 'drain' before the write() consumer gets the 'false' return
473  // value, and has a chance to attach a 'drain' listener.
474  
475  
476  function onwriteDrain(stream, state) {
477    if (state.length === 0 && state.needDrain) {
478      state.needDrain = false;
479      stream.emit('drain');
480    }
481  } // if there's something in the buffer waiting, then process it
482  
483  
484  function clearBuffer(stream, state) {
485    state.bufferProcessing = true;
486    var entry = state.bufferedRequest;
487  
488    if (stream._writev && entry && entry.next) {
489      // Fast case, write everything using _writev()
490      var l = state.bufferedRequestCount;
491      var buffer = new Array(l);
492      var holder = state.corkedRequestsFree;
493      holder.entry = entry;
494      var count = 0;
495      var allBuffers = true;
496  
497      while (entry) {
498        buffer[count] = entry;
499        if (!entry.isBuf) allBuffers = false;
500        entry = entry.next;
501        count += 1;
502      }
503  
504      buffer.allBuffers = allBuffers;
505      doWrite(stream, state, true, state.length, buffer, '', holder.finish); // doWrite is almost always async, defer these to save a bit of time
506      // as the hot path ends with doWrite
507  
508      state.pendingcb++;
509      state.lastBufferedRequest = null;
510  
511      if (holder.next) {
512        state.corkedRequestsFree = holder.next;
513        holder.next = null;
514      } else {
515        state.corkedRequestsFree = new CorkedRequest(state);
516      }
517  
518      state.bufferedRequestCount = 0;
519    } else {
520      // Slow case, write chunks one-by-one
521      while (entry) {
522        var chunk = entry.chunk;
523        var encoding = entry.encoding;
524        var cb = entry.callback;
525        var len = state.objectMode ? 1 : chunk.length;
526        doWrite(stream, state, false, len, chunk, encoding, cb);
527        entry = entry.next;
528        state.bufferedRequestCount--; // if we didn't call the onwrite immediately, then
529        // it means that we need to wait until it does.
530        // also, that means that the chunk and cb are currently
531        // being processed, so move the buffer counter past them.
532  
533        if (state.writing) {
534          break;
535        }
536      }
537  
538      if (entry === null) state.lastBufferedRequest = null;
539    }
540  
541    state.bufferedRequest = entry;
542    state.bufferProcessing = false;
543  }
544  
545  Writable.prototype._write = function (chunk, encoding, cb) {
546    cb(new ERR_METHOD_NOT_IMPLEMENTED('_write()'));
547  };
548  
549  Writable.prototype._writev = null;
550  
551  Writable.prototype.end = function (chunk, encoding, cb) {
552    var state = this._writableState;
553  
554    if (typeof chunk === 'function') {
555      cb = chunk;
556      chunk = null;
557      encoding = null;
558    } else if (typeof encoding === 'function') {
559      cb = encoding;
560      encoding = null;
561    }
562  
563    if (chunk !== null && chunk !== undefined) this.write(chunk, encoding); // .end() fully uncorks
564  
565    if (state.corked) {
566      state.corked = 1;
567      this.uncork();
568    } // ignore unnecessary end() calls.
569  
570  
571    if (!state.ending) endWritable(this, state, cb);
572    return this;
573  };
574  
575  Object.defineProperty(Writable.prototype, 'writableLength', {
576    // making it explicit this property is not enumerable
577    // because otherwise some prototype manipulation in
578    // userland will fail
579    enumerable: false,
580    get: function get() {
581      return this._writableState.length;
582    }
583  });
584  
585  function needFinish(state) {
586    return state.ending && state.length === 0 && state.bufferedRequest === null && !state.finished && !state.writing;
587  }
588  
589  function callFinal(stream, state) {
590    stream._final(function (err) {
591      state.pendingcb--;
592  
593      if (err) {
594        errorOrDestroy(stream, err);
595      }
596  
597      state.prefinished = true;
598      stream.emit('prefinish');
599      finishMaybe(stream, state);
600    });
601  }
602  
603  function prefinish(stream, state) {
604    if (!state.prefinished && !state.finalCalled) {
605      if (typeof stream._final === 'function' && !state.destroyed) {
606        state.pendingcb++;
607        state.finalCalled = true;
608        process.nextTick(callFinal, stream, state);
609      } else {
610        state.prefinished = true;
611        stream.emit('prefinish');
612      }
613    }
614  }
615  
616  function finishMaybe(stream, state) {
617    var need = needFinish(state);
618  
619    if (need) {
620      prefinish(stream, state);
621  
622      if (state.pendingcb === 0) {
623        state.finished = true;
624        stream.emit('finish');
625  
626        if (state.autoDestroy) {
627          // In case of duplex streams we need a way to detect
628          // if the readable side is ready for autoDestroy as well
629          var rState = stream._readableState;
630  
631          if (!rState || rState.autoDestroy && rState.endEmitted) {
632            stream.destroy();
633          }
634        }
635      }
636    }
637  
638    return need;
639  }
640  
641  function endWritable(stream, state, cb) {
642    state.ending = true;
643    finishMaybe(stream, state);
644  
645    if (cb) {
646      if (state.finished) process.nextTick(cb);else stream.once('finish', cb);
647    }
648  
649    state.ended = true;
650    stream.writable = false;
651  }
652  
653  function onCorkedFinish(corkReq, state, err) {
654    var entry = corkReq.entry;
655    corkReq.entry = null;
656  
657    while (entry) {
658      var cb = entry.callback;
659      state.pendingcb--;
660      cb(err);
661      entry = entry.next;
662    } // reuse the free corkReq.
663  
664  
665    state.corkedRequestsFree.next = corkReq;
666  }
667  
668  Object.defineProperty(Writable.prototype, 'destroyed', {
669    // making it explicit this property is not enumerable
670    // because otherwise some prototype manipulation in
671    // userland will fail
672    enumerable: false,
673    get: function get() {
674      if (this._writableState === undefined) {
675        return false;
676      }
677  
678      return this._writableState.destroyed;
679    },
680    set: function set(value) {
681      // we ignore the value if the stream
682      // has not been initialized yet
683      if (!this._writableState) {
684        return;
685      } // backward compatibility, the user is explicitly
686      // managing destroyed
687  
688  
689      this._writableState.destroyed = value;
690    }
691  });
692  Writable.prototype.destroy = destroyImpl.destroy;
693  Writable.prototype._undestroy = destroyImpl.undestroy;
694  
695  Writable.prototype._destroy = function (err, cb) {
696    cb(err);
697  };