/ lib / queue.js
queue.js
  1  /**
  2   * @template T
  3   * @template R
  4   * @callback ProcessingFn
  5   * @param {T} item - Value of array element
  6   * @returns {Promise<R>}
  7   */
  8  
  9  export class FifoQueue {
 10      /**
 11       * @param {ProcessingFn<T, R>} processingFn - callback function to process queue items
 12       * @param {number=} highWaterMark - max number of unprocessed items upon which should other tasks await
 13       */
 14      constructor(processingFn, highWaterMark) {
 15          if (!processingFn) throw new Error('No processing function was provided as a callback')
 16  
 17          /**
 18           * @private
 19           * @type {ProcessingFn<T>}
 20           */
 21          this.processingFn = processingFn
 22  
 23          /**
 24           * @private
 25           * @type {number}
 26           */
 27          this.highWaterMark = highWaterMark || Number.POSITIVE_INFINITY
 28  
 29          /**
 30           * @type {boolean}
 31           */
 32          this.isProcessing = false
 33  
 34          /**
 35           * @type {boolean}
 36           */
 37          this.isWaterMarkExceeded = false
 38  
 39          /**
 40           * @private
 41           * @type {Set<() => void>}
 42           */
 43          this.finishedEmitterQueue = new Set()
 44  
 45          /**
 46           * @private
 47           * @type {Set<() => void>}
 48           */
 49          this.waterMarkQueue = new Set()
 50  
 51          /**
 52           * @private
 53           * @type {T[]}
 54           */
 55          this.items = []
 56      }
 57  
 58      /**
 59       * Appends new elements to the end of an array, and returns the new length of the array.
 60       * @param  {T} items - New elements to add to the array.
 61       * @returns {number}
 62       */
 63      push(...items) {
 64          const len = this.items.push(...items)
 65          this.onPush(len)
 66  
 67          return len
 68      }
 69  
 70      /**
 71       * @private
 72       * @param {number} len - current queue length
 73       * @returns {Promise<void>}
 74       */
 75      async onPush(len) {
 76          this.isWaterMarkExceeded = len >= this.highWaterMark
 77  
 78          if (this.isProcessing === false) {
 79              this.isProcessing = true
 80  
 81              while (this.items.length > 0) {
 82                  const item = this.items.shift()
 83  
 84                  await this.processingFn(item)
 85                  this.checkWaterMark()
 86              }
 87  
 88              this.onFinished()
 89          }
 90      }
 91  
 92      /**
 93       * @private
 94       */
 95      onFinished() {
 96          this.isProcessing = false
 97          for (const resolver of this.finishedEmitterQueue.values()) {
 98              resolver()
 99          }
100          this.finishedEmitterQueue.clear()
101      }
102  
103      /**
104       * @private
105       */
106      checkWaterMark() {
107          if (this.items.length < this.highWaterMark) {
108              this.isWaterMarkExceeded = false
109  
110              for (const resolver of this.waterMarkQueue.values()) {
111                  resolver()
112              }
113              this.waterMarkQueue.clear()
114          }
115      }
116  
117      /**
118       * Resolves when queue is done processing
119       * @returns {Promise<void>}
120       */
121      waitOnFinished() {
122          if (!this.isProcessing) return Promise.resolve()
123  
124          return new Promise((resolve) => {
125              this.finishedEmitterQueue.add(resolve)
126          })
127      }
128  
129      /**
130       * Resolves when process queue is lower than specified high watermark
131       * @returns {Promise<void>}
132       */
133      waitOnWaterMark() {
134          if (!this.isWaterMarkExceeded) return Promise.resolve()
135  
136          return new Promise((resolve) => {
137              this.waterMarkQueue.add(resolve)
138          })
139      }
140  }