auto.js
  1  'use strict';
  2  
  3  Object.defineProperty(exports, "__esModule", {
  4      value: true
  5  });
  6  exports.default = auto;
  7  
  8  var _once = require('./internal/once');
  9  
 10  var _once2 = _interopRequireDefault(_once);
 11  
 12  var _onlyOnce = require('./internal/onlyOnce');
 13  
 14  var _onlyOnce2 = _interopRequireDefault(_onlyOnce);
 15  
 16  var _wrapAsync = require('./internal/wrapAsync');
 17  
 18  var _wrapAsync2 = _interopRequireDefault(_wrapAsync);
 19  
 20  var _promiseCallback = require('./internal/promiseCallback');
 21  
 22  function _interopRequireDefault(obj) { return obj && obj.__esModule ? obj : { default: obj }; }
 23  
 24  /**
 25   * Determines the best order for running the {@link AsyncFunction}s in `tasks`, based on
 26   * their requirements. Each function can optionally depend on other functions
 27   * being completed first, and each function is run as soon as its requirements
 28   * are satisfied.
 29   *
 30   * If any of the {@link AsyncFunction}s pass an error to their callback, the `auto` sequence
 31   * will stop. Further tasks will not execute (so any other functions depending
 32   * on it will not run), and the main `callback` is immediately called with the
 33   * error.
 34   *
 35   * {@link AsyncFunction}s also receive an object containing the results of functions which
 36   * have completed so far as the first argument, if they have dependencies. If a
 37   * task function has no dependencies, it will only be passed a callback.
 38   *
 39   * @name auto
 40   * @static
 41   * @memberOf module:ControlFlow
 42   * @method
 43   * @category Control Flow
 44   * @param {Object} tasks - An object. Each of its properties is either a
 45   * function or an array of requirements, with the {@link AsyncFunction} itself the last item
 46   * in the array. The object's key of a property serves as the name of the task
 47   * defined by that property, i.e. can be used when specifying requirements for
 48   * other tasks. The function receives one or two arguments:
 49   * * a `results` object, containing the results of the previously executed
 50   *   functions, only passed if the task has any dependencies,
 51   * * a `callback(err, result)` function, which must be called when finished,
 52   *   passing an `error` (which can be `null`) and the result of the function's
 53   *   execution.
 54   * @param {number} [concurrency=Infinity] - An optional `integer` for
 55   * determining the maximum number of tasks that can be run in parallel. By
 56   * default, as many as possible.
 57   * @param {Function} [callback] - An optional callback which is called when all
 58   * the tasks have been completed. It receives the `err` argument if any `tasks`
 59   * pass an error to their callback. Results are always returned; however, if an
 60   * error occurs, no further `tasks` will be performed, and the results object
 61   * will only contain partial results. Invoked with (err, results).
 62   * @returns {Promise} a promise, if a callback is not passed
 63   * @example
 64   *
 65   * async.auto({
 66   *     // this function will just be passed a callback
 67   *     readData: async.apply(fs.readFile, 'data.txt', 'utf-8'),
 68   *     showData: ['readData', function(results, cb) {
 69   *         // results.readData is the file's contents
 70   *         // ...
 71   *     }]
 72   * }, callback);
 73   *
 74   * async.auto({
 75   *     get_data: function(callback) {
 76   *         console.log('in get_data');
 77   *         // async code to get some data
 78   *         callback(null, 'data', 'converted to array');
 79   *     },
 80   *     make_folder: function(callback) {
 81   *         console.log('in make_folder');
 82   *         // async code to create a directory to store a file in
 83   *         // this is run at the same time as getting the data
 84   *         callback(null, 'folder');
 85   *     },
 86   *     write_file: ['get_data', 'make_folder', function(results, callback) {
 87   *         console.log('in write_file', JSON.stringify(results));
 88   *         // once there is some data and the directory exists,
 89   *         // write the data to a file in the directory
 90   *         callback(null, 'filename');
 91   *     }],
 92   *     email_link: ['write_file', function(results, callback) {
 93   *         console.log('in email_link', JSON.stringify(results));
 94   *         // once the file is written let's email a link to it...
 95   *         // results.write_file contains the filename returned by write_file.
 96   *         callback(null, {'file':results.write_file, 'email':'user@example.com'});
 97   *     }]
 98   * }, function(err, results) {
 99   *     console.log('err = ', err);
100   *     console.log('results = ', results);
101   * });
102   */
103  function auto(tasks, concurrency, callback) {
104      if (typeof concurrency !== 'number') {
105          // concurrency is optional, shift the args.
106          callback = concurrency;
107          concurrency = null;
108      }
109      callback = (0, _once2.default)(callback || (0, _promiseCallback.promiseCallback)());
110      var numTasks = Object.keys(tasks).length;
111      if (!numTasks) {
112          return callback(null);
113      }
114      if (!concurrency) {
115          concurrency = numTasks;
116      }
117  
118      var results = {};
119      var runningTasks = 0;
120      var canceled = false;
121      var hasError = false;
122  
123      var listeners = Object.create(null);
124  
125      var readyTasks = [];
126  
127      // for cycle detection:
128      var readyToCheck = []; // tasks that have been identified as reachable
129      // without the possibility of returning to an ancestor task
130      var uncheckedDependencies = {};
131  
132      Object.keys(tasks).forEach(key => {
133          var task = tasks[key];
134          if (!Array.isArray(task)) {
135              // no dependencies
136              enqueueTask(key, [task]);
137              readyToCheck.push(key);
138              return;
139          }
140  
141          var dependencies = task.slice(0, task.length - 1);
142          var remainingDependencies = dependencies.length;
143          if (remainingDependencies === 0) {
144              enqueueTask(key, task);
145              readyToCheck.push(key);
146              return;
147          }
148          uncheckedDependencies[key] = remainingDependencies;
149  
150          dependencies.forEach(dependencyName => {
151              if (!tasks[dependencyName]) {
152                  throw new Error('async.auto task `' + key + '` has a non-existent dependency `' + dependencyName + '` in ' + dependencies.join(', '));
153              }
154              addListener(dependencyName, () => {
155                  remainingDependencies--;
156                  if (remainingDependencies === 0) {
157                      enqueueTask(key, task);
158                  }
159              });
160          });
161      });
162  
163      checkForDeadlocks();
164      processQueue();
165  
166      function enqueueTask(key, task) {
167          readyTasks.push(() => runTask(key, task));
168      }
169  
170      function processQueue() {
171          if (canceled) return;
172          if (readyTasks.length === 0 && runningTasks === 0) {
173              return callback(null, results);
174          }
175          while (readyTasks.length && runningTasks < concurrency) {
176              var run = readyTasks.shift();
177              run();
178          }
179      }
180  
181      function addListener(taskName, fn) {
182          var taskListeners = listeners[taskName];
183          if (!taskListeners) {
184              taskListeners = listeners[taskName] = [];
185          }
186  
187          taskListeners.push(fn);
188      }
189  
190      function taskComplete(taskName) {
191          var taskListeners = listeners[taskName] || [];
192          taskListeners.forEach(fn => fn());
193          processQueue();
194      }
195  
196      function runTask(key, task) {
197          if (hasError) return;
198  
199          var taskCallback = (0, _onlyOnce2.default)((err, ...result) => {
200              runningTasks--;
201              if (err === false) {
202                  canceled = true;
203                  return;
204              }
205              if (result.length < 2) {
206                  [result] = result;
207              }
208              if (err) {
209                  var safeResults = {};
210                  Object.keys(results).forEach(rkey => {
211                      safeResults[rkey] = results[rkey];
212                  });
213                  safeResults[key] = result;
214                  hasError = true;
215                  listeners = Object.create(null);
216                  if (canceled) return;
217                  callback(err, safeResults);
218              } else {
219                  results[key] = result;
220                  taskComplete(key);
221              }
222          });
223  
224          runningTasks++;
225          var taskFn = (0, _wrapAsync2.default)(task[task.length - 1]);
226          if (task.length > 1) {
227              taskFn(results, taskCallback);
228          } else {
229              taskFn(taskCallback);
230          }
231      }
232  
233      function checkForDeadlocks() {
234          // Kahn's algorithm
235          // https://en.wikipedia.org/wiki/Topological_sorting#Kahn.27s_algorithm
236          // http://connalle.blogspot.com/2013/10/topological-sortingkahn-algorithm.html
237          var currentTask;
238          var counter = 0;
239          while (readyToCheck.length) {
240              currentTask = readyToCheck.pop();
241              counter++;
242              getDependents(currentTask).forEach(dependent => {
243                  if (--uncheckedDependencies[dependent] === 0) {
244                      readyToCheck.push(dependent);
245                  }
246              });
247          }
248  
249          if (counter !== numTasks) {
250              throw new Error('async.auto cannot execute tasks due to a recursive dependency');
251          }
252      }
253  
254      function getDependents(taskName) {
255          var result = [];
256          Object.keys(tasks).forEach(key => {
257              const task = tasks[key];
258              if (Array.isArray(task) && task.indexOf(taskName) >= 0) {
259                  result.push(key);
260              }
261          });
262          return result;
263      }
264  
265      return callback[_promiseCallback.PROMISE_SYMBOL];
266  }
267  module.exports = exports['default'];