/ src / lib / keyed-queue.ts
keyed-queue.ts
 1  /**
 2   * KeyedAsyncQueue — serialize async operations per key, parallel across keys.
 3   * Each key gets a serial queue; different keys run concurrently.
 4   */
 5  export class KeyedAsyncQueue {
 6    private chains = new Map<string, Promise<void>>()
 7  
 8    enqueue<T>(key: string, fn: () => Promise<T>, options?: { timeoutMs?: number }): Promise<T> {
 9      return new Promise<T>((resolve, reject) => {
10        const prev = this.chains.get(key) ?? Promise.resolve()
11        const next = prev.then(async () => {
12          try {
13            let result: T
14            if (options?.timeoutMs != null && options.timeoutMs > 0) {
15              result = await withTimeout(fn(), options.timeoutMs)
16            } else {
17              result = await fn()
18            }
19            resolve(result)
20          } catch (err) {
21            reject(err)
22          }
23        })
24        // Store a settled version so errors don't break the chain
25        const settled = next.then(() => {}, () => {})
26        this.chains.set(key, settled)
27        // Cleanup when this is still the tail of the chain
28        settled.then(() => {
29          if (this.chains.get(key) === settled) {
30            this.chains.delete(key)
31          }
32        })
33      })
34    }
35  
36    get activeKeys(): number {
37      return this.chains.size
38    }
39  }
40  
/**
 * Race `promise` against a deadline of `ms` milliseconds.
 *
 * Settles with the outcome of `promise` if that arrives first; otherwise
 * rejects with a timeout `Error`. The timer is cleared as soon as `promise`
 * settles. The underlying operation is not cancelled when the deadline passes.
 *
 * Timer delays are limited to a signed 32-bit range: Node.js replaces any
 * delay above 2^31 - 1 ms with 1 ms. To keep a very long deadline from firing
 * almost immediately, `Infinity` is treated as "no deadline" and larger finite
 * values are clamped to the maximum supported delay (about 24.8 days).
 *
 * @param promise - The in-flight operation to guard.
 * @param ms - Deadline in milliseconds.
 * @returns A promise mirroring `promise`, or rejecting once the deadline passes.
 */
function withTimeout<T>(promise: Promise<T>, ms: number): Promise<T> {
  // Largest delay setTimeout honours (2^31 - 1 ms).
  const maxDelayMs = 2147483647

  // An infinite deadline can never expire, so no timer is needed.
  if (ms === Infinity) return promise

  const delay = Math.min(ms, maxDelayMs)
  return new Promise<T>((resolve, reject) => {
    const timer = setTimeout(
      () => reject(new Error(`KeyedAsyncQueue: operation timed out after ${ms}ms`)),
      delay,
    )
    promise.then(
      (val) => { clearTimeout(timer); resolve(val) },
      (err) => { clearTimeout(timer); reject(err) },
    )
  })
}