// cli/transports/SerialBatchEventUploader.ts
  1  import { jsonStringify } from '../../utils/slowOperations.js'
  2  
  3  /**
  4   * Serial ordered event uploader with batching, retry, and backpressure.
  5   *
  6   * - enqueue() adds events to a pending buffer
  7   * - At most 1 POST in-flight at a time
  8   * - Drains up to maxBatchSize items per POST
  9   * - New events accumulate while in-flight
 10   * - On failure: exponential backoff (clamped), retries indefinitely
 11   *   until success or close() — unless maxConsecutiveFailures is set,
 12   *   in which case the failing batch is dropped and drain advances
 13   * - flush() blocks until pending is empty and kicks drain if needed
 14   * - Backpressure: enqueue() blocks when maxQueueSize is reached
 15   */
 16  
 17  /**
 18   * Throw from config.send() to make the uploader wait a server-supplied
 19   * duration before retrying (e.g. 429 with Retry-After). When retryAfterMs
 20   * is set, it overrides exponential backoff for that attempt — clamped to
 21   * [baseDelayMs, maxDelayMs] and jittered so a misbehaving server can
 22   * neither hot-loop nor stall the client, and many sessions sharing a rate
 23   * limit don't all pounce at the same instant. Without retryAfterMs, behaves
 24   * like any other thrown error (exponential backoff).
 25   */
 26  export class RetryableError extends Error {
 27    constructor(
 28      message: string,
 29      readonly retryAfterMs?: number,
 30    ) {
 31      super(message)
 32    }
 33  }
 34  
type SerialBatchEventUploaderConfig<T> = {
  /** Max items per POST (1 = no batching). */
  maxBatchSize: number
  /**
   * Max serialized bytes per POST. First item always goes in regardless of
   * size; subsequent items only if cumulative JSON bytes stay under this.
   * Undefined = no byte limit (count-only batching).
   */
  maxBatchBytes?: number
  /** Max pending items before enqueue() blocks */
  maxQueueSize: number
  /**
   * The actual HTTP call — caller controls payload format. A thrown error
   * triggers retry with exponential backoff; throw RetryableError with
   * retryAfterMs set to honor a server-supplied wait hint instead.
   */
  send: (batch: T[]) => Promise<void>
  /**
   * Base delay for exponential backoff (ms). Also the floor applied when
   * clamping a server-supplied retryAfterMs.
   */
  baseDelayMs: number
  /**
   * Max delay cap (ms), applied to both exponential backoff and a
   * server-supplied retryAfterMs (before jitter is added).
   */
  maxDelayMs: number
  /**
   * Upper bound of the uniform random jitter added to every retry delay
   * (ms). Spreads retries across sessions so they don't all fire at once.
   */
  jitterMs: number
  /**
   * After this many consecutive send() failures, drop the failing batch
   * and move on to the next pending item with a fresh failure budget.
   * Undefined = retry indefinitely (default).
   */
  maxConsecutiveFailures?: number
  /** Called when a batch is dropped for hitting maxConsecutiveFailures. */
  onBatchDropped?: (batchSize: number, failures: number) => void
}
 63  
 64  export class SerialBatchEventUploader<T> {
 65    private pending: T[] = []
 66    private pendingAtClose = 0
 67    private draining = false
 68    private closed = false
 69    private backpressureResolvers: Array<() => void> = []
 70    private sleepResolve: (() => void) | null = null
 71    private flushResolvers: Array<() => void> = []
 72    private droppedBatches = 0
 73    private readonly config: SerialBatchEventUploaderConfig<T>
 74  
 75    constructor(config: SerialBatchEventUploaderConfig<T>) {
 76      this.config = config
 77    }
 78  
 79    /**
 80     * Monotonic count of batches dropped via maxConsecutiveFailures. Callers
 81     * can snapshot before flush() and compare after to detect silent drops
 82     * (flush() resolves normally even when batches were dropped).
 83     */
 84    get droppedBatchCount(): number {
 85      return this.droppedBatches
 86    }
 87  
 88    /**
 89     * Pending queue depth. After close(), returns the count at close time —
 90     * close() clears the queue but shutdown diagnostics may read this after.
 91     */
 92    get pendingCount(): number {
 93      return this.closed ? this.pendingAtClose : this.pending.length
 94    }
 95  
 96    /**
 97     * Add events to the pending buffer. Returns immediately if space is
 98     * available. Blocks (awaits) if the buffer is full — caller pauses
 99     * until drain frees space.
100     */
101    async enqueue(events: T | T[]): Promise<void> {
102      if (this.closed) return
103      const items = Array.isArray(events) ? events : [events]
104      if (items.length === 0) return
105  
106      // Backpressure: wait until there's space
107      while (
108        this.pending.length + items.length > this.config.maxQueueSize &&
109        !this.closed
110      ) {
111        await new Promise<void>(resolve => {
112          this.backpressureResolvers.push(resolve)
113        })
114      }
115  
116      if (this.closed) return
117      this.pending.push(...items)
118      void this.drain()
119    }
120  
121    /**
122     * Block until all pending events have been sent.
123     * Used at turn boundaries and graceful shutdown.
124     */
125    flush(): Promise<void> {
126      if (this.pending.length === 0 && !this.draining) {
127        return Promise.resolve()
128      }
129      void this.drain()
130      return new Promise<void>(resolve => {
131        this.flushResolvers.push(resolve)
132      })
133    }
134  
135    /**
136     * Drop pending events and stop processing.
137     * Resolves any blocked enqueue() and flush() callers.
138     */
139    close(): void {
140      if (this.closed) return
141      this.closed = true
142      this.pendingAtClose = this.pending.length
143      this.pending = []
144      this.sleepResolve?.()
145      this.sleepResolve = null
146      for (const resolve of this.backpressureResolvers) resolve()
147      this.backpressureResolvers = []
148      for (const resolve of this.flushResolvers) resolve()
149      this.flushResolvers = []
150    }
151  
152    /**
153     * Drain loop. At most one instance runs at a time (guarded by this.draining).
154     * Sends batches serially. On failure, backs off and retries indefinitely.
155     */
156    private async drain(): Promise<void> {
157      if (this.draining || this.closed) return
158      this.draining = true
159      let failures = 0
160  
161      try {
162        while (this.pending.length > 0 && !this.closed) {
163          const batch = this.takeBatch()
164          if (batch.length === 0) continue
165  
166          try {
167            await this.config.send(batch)
168            failures = 0
169          } catch (err) {
170            failures++
171            if (
172              this.config.maxConsecutiveFailures !== undefined &&
173              failures >= this.config.maxConsecutiveFailures
174            ) {
175              this.droppedBatches++
176              this.config.onBatchDropped?.(batch.length, failures)
177              failures = 0
178              this.releaseBackpressure()
179              continue
180            }
181            // Re-queue the failed batch at the front. Use concat (single
182            // allocation) instead of unshift(...batch) which shifts every
183            // pending item batch.length times. Only hit on failure path.
184            this.pending = batch.concat(this.pending)
185            const retryAfterMs =
186              err instanceof RetryableError ? err.retryAfterMs : undefined
187            await this.sleep(this.retryDelay(failures, retryAfterMs))
188            continue
189          }
190  
191          // Release backpressure waiters if space opened up
192          this.releaseBackpressure()
193        }
194      } finally {
195        this.draining = false
196        // Notify flush waiters if queue is empty
197        if (this.pending.length === 0) {
198          for (const resolve of this.flushResolvers) resolve()
199          this.flushResolvers = []
200        }
201      }
202    }
203  
204    /**
205     * Pull the next batch from pending. Respects both maxBatchSize and
206     * maxBatchBytes. The first item is always taken; subsequent items only
207     * if adding them keeps the cumulative JSON size under maxBatchBytes.
208     *
209     * Un-serializable items (BigInt, circular refs, throwing toJSON) are
210     * dropped in place — they can never be sent and leaving them at
211     * pending[0] would poison the queue and hang flush() forever.
212     */
213    private takeBatch(): T[] {
214      const { maxBatchSize, maxBatchBytes } = this.config
215      if (maxBatchBytes === undefined) {
216        return this.pending.splice(0, maxBatchSize)
217      }
218      let bytes = 0
219      let count = 0
220      while (count < this.pending.length && count < maxBatchSize) {
221        let itemBytes: number
222        try {
223          itemBytes = Buffer.byteLength(jsonStringify(this.pending[count]))
224        } catch {
225          this.pending.splice(count, 1)
226          continue
227        }
228        if (count > 0 && bytes + itemBytes > maxBatchBytes) break
229        bytes += itemBytes
230        count++
231      }
232      return this.pending.splice(0, count)
233    }
234  
235    private retryDelay(failures: number, retryAfterMs?: number): number {
236      const jitter = Math.random() * this.config.jitterMs
237      if (retryAfterMs !== undefined) {
238        // Jitter on top of the server's hint prevents thundering herd when
239        // many sessions share a rate limit and all receive the same
240        // Retry-After. Clamp first, then spread — same shape as the
241        // exponential path (effective ceiling is maxDelayMs + jitterMs).
242        const clamped = Math.max(
243          this.config.baseDelayMs,
244          Math.min(retryAfterMs, this.config.maxDelayMs),
245        )
246        return clamped + jitter
247      }
248      const exponential = Math.min(
249        this.config.baseDelayMs * 2 ** (failures - 1),
250        this.config.maxDelayMs,
251      )
252      return exponential + jitter
253    }
254  
255    private releaseBackpressure(): void {
256      const resolvers = this.backpressureResolvers
257      this.backpressureResolvers = []
258      for (const resolve of resolvers) resolve()
259    }
260  
261    private sleep(ms: number): Promise<void> {
262      return new Promise(resolve => {
263        this.sleepResolve = resolve
264        setTimeout(
265          (self, resolve) => {
266            self.sleepResolve = null
267            resolve()
268          },
269          ms,
270          this,
271          resolve,
272        )
273      })
274    }
275  }