stream.js
 1  'use strict';
 2  const isStream = require('is-stream');
 3  const getStream = require('get-stream');
 4  const mergeStream = require('merge-stream');
 5  
 6  // `input` option
 7  const handleInput = (spawned, input) => {
 8  	// Checking for stdin is workaround for https://github.com/nodejs/node/issues/26852
 9  	// TODO: Remove `|| spawned.stdin === undefined` once we drop support for Node.js <=12.2.0
10  	if (input === undefined || spawned.stdin === undefined) {
11  		return;
12  	}
13  
14  	if (isStream(input)) {
15  		input.pipe(spawned.stdin);
16  	} else {
17  		spawned.stdin.end(input);
18  	}
19  };
20  
// Builds the `all` stream, which interleaves `stdout` and `stderr`.
// Returns `undefined` when the `all` option is falsy or when the process has neither stream.
const makeAllStream = (spawned, {all}) => {
	if (!all) {
		return;
	}

	// `stdout` is registered before `stderr`; absent streams are skipped
	const sources = [spawned.stdout, spawned.stderr].filter(Boolean);

	if (sources.length === 0) {
		return;
	}

	const mixed = mergeStream();

	for (const source of sources) {
		mixed.add(source);
	}

	return mixed;
};
39  
// On failure, `result.stdout|stderr|all` should contain the currently buffered stream.
// Destroys `stream`, then resolves to whatever `streamPromise` yields; if that promise
// rejects, resolves to the `bufferedData` property of the rejection instead.
// Resolves to `undefined` when there is no `stream`.
const getBufferedData = async (stream, streamPromise) => {
	if (stream) {
		stream.destroy();

		let buffered;

		try {
			buffered = await streamPromise;
		} catch (streamError) {
			buffered = streamError.bufferedData;
		}

		return buffered;
	}
};
54  
// Starts collecting the contents of `stream` through `get-stream`.
// Returns `undefined` when there is no `stream` or when the `buffer` option is falsy.
// A truthy `encoding` selects `getStream()`; otherwise `getStream.buffer()` is used.
const getStreamPromise = (stream, {encoding, buffer, maxBuffer}) => {
	const shouldBuffer = Boolean(stream) && Boolean(buffer);

	if (!shouldBuffer) {
		return;
	}

	return encoding ?
		getStream(stream, {encoding, maxBuffer}) :
		getStream.buffer(stream, {maxBuffer});
};
66  
// Retrieve result of child process: exit code, signal, error, streams (stdout/stderr/all).
// Resolves to `[processResult, stdout, stderr, all]`. If `processDone` or any stream promise
// rejects, the first entry becomes `{error, signal, timedOut}` and the remaining entries
// come from `getBufferedData()` for each stream.
const getSpawnedResult = async ({stdout, stderr, all}, {encoding, buffer, maxBuffer}, processDone) => {
	const streamOptions = {encoding, buffer, maxBuffer};
	const stdoutPromise = getStreamPromise(stdout, streamOptions);
	const stderrPromise = getStreamPromise(stderr, streamOptions);
	// `all` combines both streams, so it is given twice the `maxBuffer`
	const allPromise = getStreamPromise(all, {encoding, buffer, maxBuffer: maxBuffer * 2});

	try {
		const results = await Promise.all([processDone, stdoutPromise, stderrPromise, allPromise]);
		return results;
	} catch (error) {
		const failure = {error, signal: error.signal, timedOut: error.timedOut};
		const pendingOutputs = [
			[stdout, stdoutPromise],
			[stderr, stderrPromise],
			[all, allPromise]
		].map(([stream, streamPromise]) => getBufferedData(stream, streamPromise));

		return Promise.all([failure, ...pendingOutputs]);
	}
};
84  
// Rejects a stream passed as the `input` option in sync mode.
// Throws a `TypeError` when `input` is a stream; otherwise does nothing.
const validateInputSync = ({input}) => {
	const inputIsStream = isStream(input);

	if (!inputIsStream) {
		return;
	}

	throw new TypeError('The `input` option cannot be a stream in sync mode');
};
90  
// Public API of this module. `getBufferedData` and `getStreamPromise` are not
// exported; within this file they are only called from `getSpawnedResult`.
module.exports = {
	handleInput,
	makeAllStream,
	getSpawnedResult,
	validateInputSync
};
97