/ cli / transports / WorkerStateUploader.ts
WorkerStateUploader.ts
  1  import { sleep } from '../../utils/sleep.js'
  2  
  3  /**
  4   * Coalescing uploader for PUT /worker (session state + metadata).
  5   *
  6   * - 1 in-flight PUT + 1 pending patch
  7   * - New calls coalesce into pending (never grows beyond 1 slot)
  8   * - On success: send pending if exists
  9   * - On failure: exponential backoff (clamped), retries indefinitely
 10   *   until success or close(). Absorbs any pending patches before each retry.
 11   * - No backpressure needed — naturally bounded at 2 slots
 12   *
 13   * Coalescing rules:
 14   * - Top-level keys (worker_status, external_metadata) — last value wins
 15   * - Inside external_metadata / internal_metadata — RFC 7396 merge:
 16   *   keys are added/overwritten, null values preserved (server deletes)
 17   */
 18  
 19  type WorkerStateUploaderConfig = {
 20    send: (body: Record<string, unknown>) => Promise<boolean>
 21    /** Base delay for exponential backoff (ms) */
 22    baseDelayMs: number
 23    /** Max delay cap (ms) */
 24    maxDelayMs: number
 25    /** Random jitter range added to retry delay (ms) */
 26    jitterMs: number
 27  }
 28  
 29  export class WorkerStateUploader {
 30    private inflight: Promise<void> | null = null
 31    private pending: Record<string, unknown> | null = null
 32    private closed = false
 33    private readonly config: WorkerStateUploaderConfig
 34  
 35    constructor(config: WorkerStateUploaderConfig) {
 36      this.config = config
 37    }
 38  
 39    /**
 40     * Enqueue a patch to PUT /worker. Coalesces with any existing pending
 41     * patch. Fire-and-forget — callers don't need to await.
 42     */
 43    enqueue(patch: Record<string, unknown>): void {
 44      if (this.closed) return
 45      this.pending = this.pending ? coalescePatches(this.pending, patch) : patch
 46      void this.drain()
 47    }
 48  
 49    close(): void {
 50      this.closed = true
 51      this.pending = null
 52    }
 53  
 54    private async drain(): Promise<void> {
 55      if (this.inflight || this.closed) return
 56      if (!this.pending) return
 57  
 58      const payload = this.pending
 59      this.pending = null
 60  
 61      this.inflight = this.sendWithRetry(payload).then(() => {
 62        this.inflight = null
 63        if (this.pending && !this.closed) {
 64          void this.drain()
 65        }
 66      })
 67    }
 68  
 69    /** Retries indefinitely with exponential backoff until success or close(). */
 70    private async sendWithRetry(payload: Record<string, unknown>): Promise<void> {
 71      let current = payload
 72      let failures = 0
 73      while (!this.closed) {
 74        const ok = await this.config.send(current)
 75        if (ok) return
 76  
 77        failures++
 78        await sleep(this.retryDelay(failures))
 79  
 80        // Absorb any patches that arrived during the retry
 81        if (this.pending && !this.closed) {
 82          current = coalescePatches(current, this.pending)
 83          this.pending = null
 84        }
 85      }
 86    }
 87  
 88    private retryDelay(failures: number): number {
 89      const exponential = Math.min(
 90        this.config.baseDelayMs * 2 ** (failures - 1),
 91        this.config.maxDelayMs,
 92      )
 93      const jitter = Math.random() * this.config.jitterMs
 94      return exponential + jitter
 95    }
 96  }
 97  
 98  /**
 99   * Coalesce two patches for PUT /worker.
100   *
101   * Top-level keys: overlay replaces base (last value wins).
102   * Metadata keys (external_metadata, internal_metadata): RFC 7396 merge
103   * one level deep — overlay keys are added/overwritten, null values
104   * preserved for server-side delete.
105   */
106  function coalescePatches(
107    base: Record<string, unknown>,
108    overlay: Record<string, unknown>,
109  ): Record<string, unknown> {
110    const merged = { ...base }
111  
112    for (const [key, value] of Object.entries(overlay)) {
113      if (
114        (key === 'external_metadata' || key === 'internal_metadata') &&
115        merged[key] &&
116        typeof merged[key] === 'object' &&
117        typeof value === 'object' &&
118        value !== null
119      ) {
120        // RFC 7396 merge — overlay keys win, nulls preserved for server
121        merged[key] = {
122          ...(merged[key] as Record<string, unknown>),
123          ...(value as Record<string, unknown>),
124        }
125      } else {
126        merged[key] = value
127      }
128    }
129  
130    return merged
131  }