stream.js
1 'use strict'; 2 const isStream = require('is-stream'); 3 const getStream = require('get-stream'); 4 const mergeStream = require('merge-stream'); 5 6 // `input` option 7 const handleInput = (spawned, input) => { 8 // Checking for stdin is workaround for https://github.com/nodejs/node/issues/26852 9 // TODO: Remove `|| spawned.stdin === undefined` once we drop support for Node.js <=12.2.0 10 if (input === undefined || spawned.stdin === undefined) { 11 return; 12 } 13 14 if (isStream(input)) { 15 input.pipe(spawned.stdin); 16 } else { 17 spawned.stdin.end(input); 18 } 19 }; 20 21 // `all` interleaves `stdout` and `stderr` 22 const makeAllStream = (spawned, {all}) => { 23 if (!all || (!spawned.stdout && !spawned.stderr)) { 24 return; 25 } 26 27 const mixed = mergeStream(); 28 29 if (spawned.stdout) { 30 mixed.add(spawned.stdout); 31 } 32 33 if (spawned.stderr) { 34 mixed.add(spawned.stderr); 35 } 36 37 return mixed; 38 }; 39 40 // On failure, `result.stdout|stderr|all` should contain the currently buffered stream 41 const getBufferedData = async (stream, streamPromise) => { 42 if (!stream) { 43 return; 44 } 45 46 stream.destroy(); 47 48 try { 49 return await streamPromise; 50 } catch (error) { 51 return error.bufferedData; 52 } 53 }; 54 55 const getStreamPromise = (stream, {encoding, buffer, maxBuffer}) => { 56 if (!stream || !buffer) { 57 return; 58 } 59 60 if (encoding) { 61 return getStream(stream, {encoding, maxBuffer}); 62 } 63 64 return getStream.buffer(stream, {maxBuffer}); 65 }; 66 67 // Retrieve result of child process: exit code, signal, error, streams (stdout/stderr/all) 68 const getSpawnedResult = async ({stdout, stderr, all}, {encoding, buffer, maxBuffer}, processDone) => { 69 const stdoutPromise = getStreamPromise(stdout, {encoding, buffer, maxBuffer}); 70 const stderrPromise = getStreamPromise(stderr, {encoding, buffer, maxBuffer}); 71 const allPromise = getStreamPromise(all, {encoding, buffer, maxBuffer: maxBuffer * 2}); 72 73 try { 74 return await Promise.all([processDone, stdoutPromise, stderrPromise, allPromise]); 75 } catch (error) { 76 return Promise.all([ 77 {error, signal: error.signal, timedOut: error.timedOut}, 78 getBufferedData(stdout, stdoutPromise), 79 getBufferedData(stderr, stderrPromise), 80 getBufferedData(all, allPromise) 81 ]); 82 } 83 }; 84 85 const validateInputSync = ({input}) => { 86 if (isStream(input)) { 87 throw new TypeError('The `input` option cannot be a stream in sync mode'); 88 } 89 }; 90 91 module.exports = { 92 handleInput, 93 makeAllStream, 94 getSpawnedResult, 95 validateInputSync 96 }; 97