// auto.js
'use strict';

Object.defineProperty(exports, "__esModule", {
    value: true
});
exports.default = auto;

var _once = require('./internal/once');

var _once2 = _interopRequireDefault(_once);

var _onlyOnce = require('./internal/onlyOnce');

var _onlyOnce2 = _interopRequireDefault(_onlyOnce);

var _wrapAsync = require('./internal/wrapAsync');

var _wrapAsync2 = _interopRequireDefault(_wrapAsync);

var _promiseCallback = require('./internal/promiseCallback');

// Interop helper: returns `obj` itself when it is flagged `__esModule`,
// otherwise wraps it so the value is reachable as `.default`.
function _interopRequireDefault(obj) { return obj && obj.__esModule ? obj : { default: obj }; }

/**
 * Determines the best order for running the {@link AsyncFunction}s in `tasks`, based on
 * their requirements. Each function can optionally depend on other functions
 * being completed first, and each function is run as soon as its requirements
 * are satisfied.
 *
 * If any of the {@link AsyncFunction}s pass an error to their callback, the `auto` sequence
 * will stop. Further tasks will not execute (so any other functions depending
 * on it will not run), and the main `callback` is immediately called with the
 * error.
 *
 * {@link AsyncFunction}s also receive an object containing the results of functions which
 * have completed so far as the first argument, if they have dependencies. If a
 * task function has no dependencies, it will only be passed a callback.
 *
 * @name auto
 * @static
 * @memberOf module:ControlFlow
 * @method
 * @category Control Flow
 * @param {Object} tasks - An object. Each of its properties is either a
 * function or an array of requirements, with the {@link AsyncFunction} itself the last item
 * in the array. The object's key of a property serves as the name of the task
 * defined by that property, i.e. can be used when specifying requirements for
 * other tasks. The function receives one or two arguments:
 * * a `results` object, containing the results of the previously executed
 *   functions, only passed if the task has any dependencies,
 * * a `callback(err, result)` function, which must be called when finished,
 *   passing an `error` (which can be `null`) and the result of the function's
 *   execution.
 * @param {number} [concurrency=Infinity] - An optional `integer` for
 * determining the maximum number of tasks that can be run in parallel. By
 * default, as many as possible.
 * @param {Function} [callback] - An optional callback which is called when all
 * the tasks have been completed. It receives the `err` argument if any `tasks`
 * pass an error to their callback. Results are always returned; however, if an
 * error occurs, no further `tasks` will be performed, and the results object
 * will only contain partial results. Invoked with (err, results).
 * @returns {Promise} a promise, if a callback is not passed
 * @throws {Error} synchronously, if a task names a dependency that is not an
 * own, truthy property of `tasks`, or if the dependency graph cannot be fully
 * ordered (a recursive dependency).
 * @example
 *
 * async.auto({
 *     // this function will just be passed a callback
 *     readData: async.apply(fs.readFile, 'data.txt', 'utf-8'),
 *     showData: ['readData', function(results, cb) {
 *         // results.readData is the file's contents
 *         // ...
 *     }]
 * }, callback);
 *
 * async.auto({
 *     get_data: function(callback) {
 *         console.log('in get_data');
 *         // async code to get some data
 *         callback(null, 'data', 'converted to array');
 *     },
 *     make_folder: function(callback) {
 *         console.log('in make_folder');
 *         // async code to create a directory to store a file in
 *         // this is run at the same time as getting the data
 *         callback(null, 'folder');
 *     },
 *     write_file: ['get_data', 'make_folder', function(results, callback) {
 *         console.log('in write_file', JSON.stringify(results));
 *         // once there is some data and the directory exists,
 *         // write the data to a file in the directory
 *         callback(null, 'filename');
 *     }],
 *     email_link: ['write_file', function(results, callback) {
 *         console.log('in email_link', JSON.stringify(results));
 *         // once the file is written let's email a link to it...
 *         // results.write_file contains the filename returned by write_file.
 *         callback(null, {'file':results.write_file, 'email':'user@example.com'});
 *     }]
 * }, function(err, results) {
 *     console.log('err = ', err);
 *     console.log('results = ', results);
 * });
 */
function auto(tasks, concurrency, callback) {
    if (typeof concurrency !== 'number') {
        // concurrency is optional, shift the args.
        callback = concurrency;
        concurrency = null;
    }
    callback = (0, _once2.default)(callback || (0, _promiseCallback.promiseCallback)());
    const taskNames = Object.keys(tasks);
    const numTasks = taskNames.length;
    if (!numTasks) {
        // Nothing to schedule: finish right away, but return the same value
        // as the normal path rather than the callback's own return value,
        // so the empty case agrees with the documented @returns.
        callback(null);
        return callback[_promiseCallback.PROMISE_SYMBOL];
    }
    if (!concurrency) {
        concurrency = numTasks;
    }

    const results = {};
    let runningTasks = 0;
    let canceled = false;
    let hasError = false;

    // taskName -> callbacks to fire once that task has completed
    let listeners = Object.create(null);

    // thunks for tasks whose dependencies are all satisfied
    const readyTasks = [];

    // for cycle detection:
    const readyToCheck = []; // tasks that have been identified as reachable
    // without the possibility of returning to an ancestor task
    const uncheckedDependencies = {};

    taskNames.forEach(key => {
        const task = tasks[key];
        if (!Array.isArray(task)) {
            // no dependencies
            enqueueTask(key, [task]);
            readyToCheck.push(key);
            return;
        }

        const dependencies = task.slice(0, task.length - 1);
        let remainingDependencies = dependencies.length;
        if (remainingDependencies === 0) {
            enqueueTask(key, task);
            readyToCheck.push(key);
            return;
        }
        // getDependents() reports this task once per dependency *name*, so the
        // cycle check must count distinct names; counting a repeated name
        // twice would leave the counter above zero and falsely report a cycle.
        uncheckedDependencies[key] = new Set(dependencies).size;

        dependencies.forEach(dependencyName => {
            // Own-property check so inherited names such as `toString` are
            // reported as missing instead of slipping through to the cycle check.
            const exists = Object.prototype.hasOwnProperty.call(tasks, dependencyName) &&
                tasks[dependencyName];
            if (!exists) {
                throw new Error('async.auto task `' + key + '` has a non-existent dependency `' + dependencyName + '` in ' + dependencies.join(', '));
            }
            // One listener per listed entry (duplicates included), matching
            // the initial value of remainingDependencies.
            addListener(dependencyName, () => {
                remainingDependencies--;
                if (remainingDependencies === 0) {
                    enqueueTask(key, task);
                }
            });
        });
    });

    checkForDeadlocks();
    processQueue();

    // Queues a thunk that will start `task` under the name `key`.
    function enqueueTask(key, task) {
        readyTasks.push(() => runTask(key, task));
    }

    // Starts ready tasks up to the concurrency limit, or finishes the whole
    // run when nothing is ready and nothing is running.
    function processQueue() {
        if (canceled) return;
        if (readyTasks.length === 0 && runningTasks === 0) {
            return callback(null, results);
        }
        while (readyTasks.length && runningTasks < concurrency) {
            const run = readyTasks.shift();
            run();
        }
    }

    // Registers `fn` to be called when `taskName` completes.
    function addListener(taskName, fn) {
        let taskListeners = listeners[taskName];
        if (!taskListeners) {
            taskListeners = listeners[taskName] = [];
        }

        taskListeners.push(fn);
    }

    // Notifies everything waiting on `taskName`, then schedules more work.
    function taskComplete(taskName) {
        const taskListeners = listeners[taskName] || [];
        taskListeners.forEach(fn => fn());
        processQueue();
    }

    // Invokes the task function (last element of `task`) and records its outcome.
    function runTask(key, task) {
        if (hasError) return;

        const taskCallback = (0, _onlyOnce2.default)((err, ...result) => {
            runningTasks--;
            if (err === false) {
                // `false` as the error marks the run canceled: the final
                // callback is never invoked after this.
                canceled = true;
                return;
            }
            if (result.length < 2) {
                // zero or one value: store it directly instead of as an array
                [result] = result;
            }
            if (err) {
                // Hand the caller a copy so `results` itself is not exposed.
                const safeResults = {};
                Object.keys(results).forEach(rkey => {
                    safeResults[rkey] = results[rkey];
                });
                safeResults[key] = result;
                hasError = true;
                // Drop all listeners so no dependent task gets enqueued.
                listeners = Object.create(null);
                if (canceled) return;
                callback(err, safeResults);
            } else {
                results[key] = result;
                taskComplete(key);
            }
        });

        runningTasks++;
        const taskFn = (0, _wrapAsync2.default)(task[task.length - 1]);
        if (task.length > 1) {
            // has dependencies: pass the results gathered so far
            taskFn(results, taskCallback);
        } else {
            taskFn(taskCallback);
        }
    }

    // Throws if the dependency graph cannot be fully ordered.
    function checkForDeadlocks() {
        // Kahn's algorithm
        // https://en.wikipedia.org/wiki/Topological_sorting#Kahn.27s_algorithm
        // http://connalle.blogspot.com/2013/10/topological-sortingkahn-algorithm.html
        let currentTask;
        let counter = 0;
        while (readyToCheck.length) {
            currentTask = readyToCheck.pop();
            counter++;
            getDependents(currentTask).forEach(dependent => {
                if (--uncheckedDependencies[dependent] === 0) {
                    readyToCheck.push(dependent);
                }
            });
        }

        if (counter !== numTasks) {
            throw new Error('async.auto cannot execute tasks due to a recursive dependency');
        }
    }

    // Returns the names of all array-form tasks that list `taskName`.
    function getDependents(taskName) {
        const result = [];
        taskNames.forEach(key => {
            const task = tasks[key];
            if (Array.isArray(task) && task.indexOf(taskName) >= 0) {
                result.push(key);
            }
        });
        return result;
    }

    return callback[_promiseCallback.PROMISE_SYMBOL];
}
module.exports = exports['default'];