queue.js
1 /** 2 * @template T 3 * @template R 4 * @callback ProcessingFn 5 * @param {T} item - Value of array element 6 * @returns {Promise<R>} 7 */ 8 9 export class FifoQueue { 10 /** 11 * @param {ProcessingFn<T, R>} processingFn - callback function to process queue items 12 * @param {number=} highWaterMark - max number of unprocessed items upon which should other tasks await 13 */ 14 constructor(processingFn, highWaterMark) { 15 if (!processingFn) throw new Error('No processing function was provided as a callback') 16 17 /** 18 * @private 19 * @type {ProcessingFn<T>} 20 */ 21 this.processingFn = processingFn 22 23 /** 24 * @private 25 * @type {number} 26 */ 27 this.highWaterMark = highWaterMark || Number.POSITIVE_INFINITY 28 29 /** 30 * @type {boolean} 31 */ 32 this.isProcessing = false 33 34 /** 35 * @type {boolean} 36 */ 37 this.isWaterMarkExceeded = false 38 39 /** 40 * @private 41 * @type {Set<() => void>} 42 */ 43 this.finishedEmitterQueue = new Set() 44 45 /** 46 * @private 47 * @type {Set<() => void>} 48 */ 49 this.waterMarkQueue = new Set() 50 51 /** 52 * @private 53 * @type {T[]} 54 */ 55 this.items = [] 56 } 57 58 /** 59 * Appends new elements to the end of an array, and returns the new length of the array. 60 * @param {T} items - New elements to add to the array. 61 * @returns {number} 62 */ 63 push(...items) { 64 const len = this.items.push(...items) 65 this.onPush(len) 66 67 return len 68 } 69 70 /** 71 * @private 72 * @param {number} len - current queue length 73 * @returns {Promise<void>} 74 */ 75 async onPush(len) { 76 this.isWaterMarkExceeded = len >= this.highWaterMark 77 78 if (this.isProcessing === false) { 79 this.isProcessing = true 80 81 while (this.items.length > 0) { 82 const item = this.items.shift() 83 84 await this.processingFn(item) 85 this.checkWaterMark() 86 } 87 88 this.onFinished() 89 } 90 } 91 92 /** 93 * @private 94 */ 95 onFinished() { 96 this.isProcessing = false 97 for (const resolver of this.finishedEmitterQueue.values()) { 98 resolver() 99 } 100 this.finishedEmitterQueue.clear() 101 } 102 103 /** 104 * @private 105 */ 106 checkWaterMark() { 107 if (this.items.length < this.highWaterMark) { 108 this.isWaterMarkExceeded = false 109 110 for (const resolver of this.waterMarkQueue.values()) { 111 resolver() 112 } 113 this.waterMarkQueue.clear() 114 } 115 } 116 117 /** 118 * Resolves when queue is done processing 119 * @returns {Promise<void>} 120 */ 121 waitOnFinished() { 122 if (!this.isProcessing) return Promise.resolve() 123 124 return new Promise((resolve) => { 125 this.finishedEmitterQueue.add(resolve) 126 }) 127 } 128 129 /** 130 * Resolves when process queue is lower than specified high watermark 131 * @returns {Promise<void>} 132 */ 133 waitOnWaterMark() { 134 if (!this.isWaterMarkExceeded) return Promise.resolve() 135 136 return new Promise((resolve) => { 137 this.waterMarkQueue.add(resolve) 138 }) 139 } 140 }