keyed-queue.ts
1 /** 2 * KeyedAsyncQueue — serialize async operations per key, parallel across keys. 3 * Each key gets a serial queue; different keys run concurrently. 4 */ 5 export class KeyedAsyncQueue { 6 private chains = new Map<string, Promise<void>>() 7 8 enqueue<T>(key: string, fn: () => Promise<T>, options?: { timeoutMs?: number }): Promise<T> { 9 return new Promise<T>((resolve, reject) => { 10 const prev = this.chains.get(key) ?? Promise.resolve() 11 const next = prev.then(async () => { 12 try { 13 let result: T 14 if (options?.timeoutMs != null && options.timeoutMs > 0) { 15 result = await withTimeout(fn(), options.timeoutMs) 16 } else { 17 result = await fn() 18 } 19 resolve(result) 20 } catch (err) { 21 reject(err) 22 } 23 }) 24 // Store a settled version so errors don't break the chain 25 const settled = next.then(() => {}, () => {}) 26 this.chains.set(key, settled) 27 // Cleanup when this is still the tail of the chain 28 settled.then(() => { 29 if (this.chains.get(key) === settled) { 30 this.chains.delete(key) 31 } 32 }) 33 }) 34 } 35 36 get activeKeys(): number { 37 return this.chains.size 38 } 39 } 40 41 function withTimeout<T>(promise: Promise<T>, ms: number): Promise<T> { 42 return new Promise<T>((resolve, reject) => { 43 const timer = setTimeout(() => reject(new Error(`KeyedAsyncQueue: operation timed out after ${ms}ms`)), ms) 44 promise.then( 45 (val) => { clearTimeout(timer); resolve(val) }, 46 (err) => { clearTimeout(timer); reject(err) }, 47 ) 48 }) 49 }