queue.js
  1  'use strict';
  2  
  3  Object.defineProperty(exports, "__esModule", {
  4      value: true
  5  });
  6  exports.default = queue;
  7  
  8  var _onlyOnce = require('./onlyOnce');
  9  
 10  var _onlyOnce2 = _interopRequireDefault(_onlyOnce);
 11  
 12  var _setImmediate = require('./setImmediate');
 13  
 14  var _setImmediate2 = _interopRequireDefault(_setImmediate);
 15  
 16  var _DoublyLinkedList = require('./DoublyLinkedList');
 17  
 18  var _DoublyLinkedList2 = _interopRequireDefault(_DoublyLinkedList);
 19  
 20  var _wrapAsync = require('./wrapAsync');
 21  
 22  var _wrapAsync2 = _interopRequireDefault(_wrapAsync);
 23  
 24  function _interopRequireDefault(obj) { return obj && obj.__esModule ? obj : { default: obj }; }
 25  
 26  function queue(worker, concurrency, payload) {
 27      if (concurrency == null) {
 28          concurrency = 1;
 29      } else if (concurrency === 0) {
 30          throw new RangeError('Concurrency must not be zero');
 31      }
 32  
 33      var _worker = (0, _wrapAsync2.default)(worker);
 34      var numRunning = 0;
 35      var workersList = [];
 36      const events = {
 37          error: [],
 38          drain: [],
 39          saturated: [],
 40          unsaturated: [],
 41          empty: []
 42      };
 43  
 44      function on(event, handler) {
 45          events[event].push(handler);
 46      }
 47  
 48      function once(event, handler) {
 49          const handleAndRemove = (...args) => {
 50              off(event, handleAndRemove);
 51              handler(...args);
 52          };
 53          events[event].push(handleAndRemove);
 54      }
 55  
 56      function off(event, handler) {
 57          if (!event) return Object.keys(events).forEach(ev => events[ev] = []);
 58          if (!handler) return events[event] = [];
 59          events[event] = events[event].filter(ev => ev !== handler);
 60      }
 61  
 62      function trigger(event, ...args) {
 63          events[event].forEach(handler => handler(...args));
 64      }
 65  
 66      var processingScheduled = false;
 67      function _insert(data, insertAtFront, rejectOnError, callback) {
 68          if (callback != null && typeof callback !== 'function') {
 69              throw new Error('task callback must be a function');
 70          }
 71          q.started = true;
 72  
 73          var res, rej;
 74          function promiseCallback(err, ...args) {
 75              // we don't care about the error, let the global error handler
 76              // deal with it
 77              if (err) return rejectOnError ? rej(err) : res();
 78              if (args.length <= 1) return res(args[0]);
 79              res(args);
 80          }
 81  
 82          var item = {
 83              data,
 84              callback: rejectOnError ? promiseCallback : callback || promiseCallback
 85          };
 86  
 87          if (insertAtFront) {
 88              q._tasks.unshift(item);
 89          } else {
 90              q._tasks.push(item);
 91          }
 92  
 93          if (!processingScheduled) {
 94              processingScheduled = true;
 95              (0, _setImmediate2.default)(() => {
 96                  processingScheduled = false;
 97                  q.process();
 98              });
 99          }
100  
101          if (rejectOnError || !callback) {
102              return new Promise((resolve, reject) => {
103                  res = resolve;
104                  rej = reject;
105              });
106          }
107      }
108  
109      function _createCB(tasks) {
110          return function (err, ...args) {
111              numRunning -= 1;
112  
113              for (var i = 0, l = tasks.length; i < l; i++) {
114                  var task = tasks[i];
115  
116                  var index = workersList.indexOf(task);
117                  if (index === 0) {
118                      workersList.shift();
119                  } else if (index > 0) {
120                      workersList.splice(index, 1);
121                  }
122  
123                  task.callback(err, ...args);
124  
125                  if (err != null) {
126                      trigger('error', err, task.data);
127                  }
128              }
129  
130              if (numRunning <= q.concurrency - q.buffer) {
131                  trigger('unsaturated');
132              }
133  
134              if (q.idle()) {
135                  trigger('drain');
136              }
137              q.process();
138          };
139      }
140  
141      function _maybeDrain(data) {
142          if (data.length === 0 && q.idle()) {
143              // call drain immediately if there are no tasks
144              (0, _setImmediate2.default)(() => trigger('drain'));
145              return true;
146          }
147          return false;
148      }
149  
150      const eventMethod = name => handler => {
151          if (!handler) {
152              return new Promise((resolve, reject) => {
153                  once(name, (err, data) => {
154                      if (err) return reject(err);
155                      resolve(data);
156                  });
157              });
158          }
159          off(name);
160          on(name, handler);
161      };
162  
163      var isProcessing = false;
164      var q = {
165          _tasks: new _DoublyLinkedList2.default(),
166          *[Symbol.iterator]() {
167              yield* q._tasks[Symbol.iterator]();
168          },
169          concurrency,
170          payload,
171          buffer: concurrency / 4,
172          started: false,
173          paused: false,
174          push(data, callback) {
175              if (Array.isArray(data)) {
176                  if (_maybeDrain(data)) return;
177                  return data.map(datum => _insert(datum, false, false, callback));
178              }
179              return _insert(data, false, false, callback);
180          },
181          pushAsync(data, callback) {
182              if (Array.isArray(data)) {
183                  if (_maybeDrain(data)) return;
184                  return data.map(datum => _insert(datum, false, true, callback));
185              }
186              return _insert(data, false, true, callback);
187          },
188          kill() {
189              off();
190              q._tasks.empty();
191          },
192          unshift(data, callback) {
193              if (Array.isArray(data)) {
194                  if (_maybeDrain(data)) return;
195                  return data.map(datum => _insert(datum, true, false, callback));
196              }
197              return _insert(data, true, false, callback);
198          },
199          unshiftAsync(data, callback) {
200              if (Array.isArray(data)) {
201                  if (_maybeDrain(data)) return;
202                  return data.map(datum => _insert(datum, true, true, callback));
203              }
204              return _insert(data, true, true, callback);
205          },
206          remove(testFn) {
207              q._tasks.remove(testFn);
208          },
209          process() {
210              // Avoid trying to start too many processing operations. This can occur
211              // when callbacks resolve synchronously (#1267).
212              if (isProcessing) {
213                  return;
214              }
215              isProcessing = true;
216              while (!q.paused && numRunning < q.concurrency && q._tasks.length) {
217                  var tasks = [],
218                      data = [];
219                  var l = q._tasks.length;
220                  if (q.payload) l = Math.min(l, q.payload);
221                  for (var i = 0; i < l; i++) {
222                      var node = q._tasks.shift();
223                      tasks.push(node);
224                      workersList.push(node);
225                      data.push(node.data);
226                  }
227  
228                  numRunning += 1;
229  
230                  if (q._tasks.length === 0) {
231                      trigger('empty');
232                  }
233  
234                  if (numRunning === q.concurrency) {
235                      trigger('saturated');
236                  }
237  
238                  var cb = (0, _onlyOnce2.default)(_createCB(tasks));
239                  _worker(data, cb);
240              }
241              isProcessing = false;
242          },
243          length() {
244              return q._tasks.length;
245          },
246          running() {
247              return numRunning;
248          },
249          workersList() {
250              return workersList;
251          },
252          idle() {
253              return q._tasks.length + numRunning === 0;
254          },
255          pause() {
256              q.paused = true;
257          },
258          resume() {
259              if (q.paused === false) {
260                  return;
261              }
262              q.paused = false;
263              (0, _setImmediate2.default)(q.process);
264          }
265      };
266      // define these as fixed properties, so people get useful errors when updating
267      Object.defineProperties(q, {
268          saturated: {
269              writable: false,
270              value: eventMethod('saturated')
271          },
272          unsaturated: {
273              writable: false,
274              value: eventMethod('unsaturated')
275          },
276          empty: {
277              writable: false,
278              value: eventMethod('empty')
279          },
280          drain: {
281              writable: false,
282              value: eventMethod('drain')
283          },
284          error: {
285              writable: false,
286              value: eventMethod('error')
287          }
288      });
289      return q;
290  }
291  module.exports = exports['default'];