queue.js
'use strict';

Object.defineProperty(exports, "__esModule", {
    value: true
});
exports.default = queue;

var _onlyOnce = require('./onlyOnce');

var _onlyOnce2 = _interopRequireDefault(_onlyOnce);

var _setImmediate = require('./setImmediate');

var _setImmediate2 = _interopRequireDefault(_setImmediate);

var _DoublyLinkedList = require('./DoublyLinkedList');

var _DoublyLinkedList2 = _interopRequireDefault(_DoublyLinkedList);

var _wrapAsync = require('./wrapAsync');

var _wrapAsync2 = _interopRequireDefault(_wrapAsync);

function _interopRequireDefault(obj) { return obj && obj.__esModule ? obj : { default: obj }; }

/**
 * Creates a queue object that hands batches of queued task data to `worker`,
 * keeping at most `concurrency` worker invocations in flight at a time.
 *
 * @param {Function} worker - Passed through `wrapAsync`; the result is invoked
 *   as `(dataArray, callback)` for every batch.
 * @param {number} [concurrency=1] - Upper bound on simultaneously running
 *   worker invocations. `null`/`undefined` fall back to 1.
 * @param {number} [payload] - When truthy, caps how many queued tasks are
 *   handed to a single worker invocation; otherwise every pending task is
 *   taken in one batch.
 * @returns {Object} The queue object (`push`, `unshift`, `pause`, `resume`,
 *   `kill`, event registration methods, ...).
 * @throws {RangeError} If `concurrency` is exactly 0.
 */
function queue(worker, concurrency, payload) {
    if (concurrency == null) {
        concurrency = 1;
    } else if (concurrency === 0) {
        throw new RangeError('Concurrency must not be zero');
    }

    const _worker = (0, _wrapAsync2.default)(worker);
    // Number of worker invocations that have started but not yet called back.
    let numRunning = 0;
    // Task items currently handed to a worker; exposed through q.workersList().
    const inFlight = [];
    // Registered handlers, keyed by event name.
    const events = {
        error: [],
        drain: [],
        saturated: [],
        unsaturated: [],
        empty: []
    };
    // True while a deferred q.process() call is already pending.
    let processingScheduled = false;
    // Re-entrancy guard for q.process().
    let isProcessing = false;

    // Appends a handler for `event`.
    function on(event, handler) {
        events[event].push(handler);
    }

    // Registers a handler that unregisters itself right before its first run.
    function once(event, handler) {
        const handleAndRemove = (...args) => {
            off(event, handleAndRemove);
            handler(...args);
        };
        events[event].push(handleAndRemove);
    }

    // Removes one handler, every handler of one event, or (with no event
    // given) every handler of every event.
    function off(event, handler) {
        if (!event) {
            for (const name of Object.keys(events)) {
                events[name] = [];
            }
            return;
        }
        events[event] = handler
            ? events[event].filter(registered => registered !== handler)
            : [];
    }

    // Calls every handler registered for `event` with the given arguments.
    // forEach is kept on purpose: handlers appended while the event is being
    // dispatched are not visited during that same dispatch.
    function trigger(event, ...args) {
        events[event].forEach(handler => handler(...args));
    }

    // Queues one task and schedules processing. Returns a promise when the
    // caller asked for rejection on error or supplied no callback; otherwise
    // returns undefined.
    function _insert(data, insertAtFront, rejectOnError, callback) {
        if (callback != null && typeof callback !== 'function') {
            throw new Error('task callback must be a function');
        }
        q.started = true;

        const usesPromise = rejectOnError || !callback;
        let settleOk;
        let settleFail;

        function promiseCallback(err, ...results) {
            if (err) {
                // Unless rejection was requested, the error is left to the
                // 'error' event and the promise resolves with no value.
                if (rejectOnError) {
                    settleFail(err);
                } else {
                    settleOk();
                }
                return;
            }
            // A single result (or none) is unwrapped; several stay an array.
            settleOk(results.length <= 1 ? results[0] : results);
        }

        const item = {
            data,
            callback: usesPromise ? promiseCallback : callback
        };

        if (insertAtFront) {
            q._tasks.unshift(item);
        } else {
            q._tasks.push(item);
        }

        // Coalesce processing: only one deferred process() call is pending at
        // any time, however many tasks get inserted before it runs.
        if (!processingScheduled) {
            processingScheduled = true;
            (0, _setImmediate2.default)(() => {
                processingScheduled = false;
                q.process();
            });
        }

        if (usesPromise) {
            return new Promise((resolve, reject) => {
                settleOk = resolve;
                settleFail = reject;
            });
        }
    }

    // Shared body of push / pushAsync / unshift / unshiftAsync: accepts either
    // a single datum or an array of them.
    function _insertAll(data, insertAtFront, rejectOnError, callback) {
        if (!Array.isArray(data)) {
            return _insert(data, insertAtFront, rejectOnError, callback);
        }
        if (_maybeDrain(data)) return;
        return data.map(datum => _insert(datum, insertAtFront, rejectOnError, callback));
    }

    // Builds the completion callback for one batch of task items.
    function _createCB(tasks) {
        return (err, ...args) => {
            numRunning -= 1;

            for (const task of tasks) {
                const position = inFlight.indexOf(task);
                if (position >= 0) {
                    inFlight.splice(position, 1);
                }

                task.callback(err, ...args);

                if (err != null) {
                    trigger('error', err, task.data);
                }
            }

            if (numRunning <= q.concurrency - q.buffer) {
                trigger('unsaturated');
            }

            if (q.idle()) {
                trigger('drain');
            }
            q.process();
        };
    }

    // When an empty array is pushed onto an idle queue, emits a deferred
    // 'drain' and reports true so the caller can stop early.
    function _maybeDrain(data) {
        const nothingToDo = data.length === 0 && q.idle();
        if (!nothingToDo) {
            return false;
        }
        (0, _setImmediate2.default)(() => trigger('drain'));
        return true;
    }

    // Produces the public registration method for one event. Given a handler,
    // it replaces every handler previously registered for that event. Given
    // none, it returns a promise tied to the next occurrence of the event,
    // rejecting when the first event argument is truthy.
    const eventMethod = name => handler => {
        if (handler) {
            off(name);
            on(name, handler);
            return;
        }
        return new Promise((resolve, reject) => {
            once(name, (err, data) => {
                if (err) return reject(err);
                resolve(data);
            });
        });
    };

    const q = {
        _tasks: new _DoublyLinkedList2.default(),
        *[Symbol.iterator]() {
            yield* q._tasks[Symbol.iterator]();
        },
        concurrency,
        payload,
        buffer: concurrency / 4,
        started: false,
        paused: false,
        push(data, callback) {
            return _insertAll(data, false, false, callback);
        },
        pushAsync(data, callback) {
            return _insertAll(data, false, true, callback);
        },
        kill() {
            off();
            q._tasks.empty();
        },
        unshift(data, callback) {
            return _insertAll(data, true, false, callback);
        },
        unshiftAsync(data, callback) {
            return _insertAll(data, true, true, callback);
        },
        remove(testFn) {
            q._tasks.remove(testFn);
        },
        process() {
            // Guard against starting too many processing operations, which
            // can happen when callbacks resolve synchronously (#1267).
            if (isProcessing) {
                return;
            }
            isProcessing = true;
            while (!q.paused && numRunning < q.concurrency && q._tasks.length) {
                const batch = [];
                const batchData = [];
                let batchSize = q._tasks.length;
                if (q.payload) batchSize = Math.min(batchSize, q.payload);

                for (let taken = 0; taken < batchSize; taken++) {
                    const node = q._tasks.shift();
                    batch.push(node);
                    inFlight.push(node);
                    batchData.push(node.data);
                }

                numRunning += 1;

                if (q._tasks.length === 0) {
                    trigger('empty');
                }

                if (numRunning === q.concurrency) {
                    trigger('saturated');
                }

                const done = (0, _onlyOnce2.default)(_createCB(batch));
                _worker(batchData, done);
            }
            isProcessing = false;
        },
        length() {
            return q._tasks.length;
        },
        running() {
            return numRunning;
        },
        workersList() {
            return inFlight;
        },
        idle() {
            return q._tasks.length + numRunning === 0;
        },
        pause() {
            q.paused = true;
        },
        resume() {
            if (q.paused === false) {
                return;
            }
            q.paused = false;
            (0, _setImmediate2.default)(q.process);
        }
    };

    // define these as fixed properties, so people get useful errors when updating
    for (const name of ['saturated', 'unsaturated', 'empty', 'drain', 'error']) {
        Object.defineProperty(q, name, {
            writable: false,
            value: eventMethod(name)
        });
    }
    return q;
}
module.exports = exports['default'];