// cli/transports/HybridTransport.ts
import axios, { type AxiosError } from 'axios'
import type { StdoutMessage } from 'src/entrypoints/sdk/controlTypes.js'
import { logForDebugging } from '../../utils/debug.js'
import { logForDiagnosticsNoPII } from '../../utils/diagLogs.js'
import { getSessionIngressAuthToken } from '../../utils/sessionIngressAuth.js'
import { SerialBatchEventUploader } from './SerialBatchEventUploader.js'
import {
  WebSocketTransport,
  type WebSocketTransportOptions,
} from './WebSocketTransport.js'

// How long stream_events accumulate in the delay buffer before enqueue.
const BATCH_FLUSH_INTERVAL_MS = 100
// Per-attempt POST timeout. Bounds how long a single stuck POST can block
// the serialized queue. Without this, a hung connection stalls all writes.
const POST_TIMEOUT_MS = 15_000
// Grace period for queued writes on close(). Covers a healthy POST (~100ms)
// plus headroom; best-effort, not a delivery guarantee under degraded network.
// Void-ed (nothing awaits it) so this is a last resort — replBridge teardown
// now closes AFTER archive, so archive latency is the primary drain window.
// NOTE: gracefulShutdown's cleanup budget is 2s (not the 5s outer failsafe);
// 3s here exceeds it, but the process lives ~2s longer for hooks+analytics.
const CLOSE_GRACE_MS = 3000
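
// Worked example of the budgets above (a sketch, assuming flush() resolves
// only once the queue drains, as close() below relies on): with one hung
// POST in flight, flush() cannot resolve, so the 3s CLOSE_GRACE_MS timer
// wins the race in close() and uploader.close() runs at ~3s, well before
// the 15s per-attempt POST_TIMEOUT_MS would surface the failure.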

/**
 * Hybrid transport: WebSocket for reads, HTTP POST for writes.
 *
 * Write flow:
 *
 *   write(stream_event) ─┐
 *                        │ (100ms timer)
 *                        │
 *                        ▼
 *   write(other) ────► uploader.enqueue()  (SerialBatchEventUploader)
 *                        ▲    │
 *   writeBatch() ────────┘    │ serial, batched, retries indefinitely,
 *                             │ backpressure at maxQueueSize
 *                             ▼
 *                        postOnce()  (single HTTP POST, throws on retryable)
 *
 * stream_event messages accumulate in streamEventBuffer for up to 100ms
 * before enqueue (reduces POST count for high-volume content deltas). A
 * non-stream write flushes any buffered stream_events first to preserve order.
 *
 * Serialization + retry + backpressure are delegated to SerialBatchEventUploader
 * (the same primitive CCR uses). At most one POST is in flight at a time;
 * events that arrive during a POST are batched into the next one. On failure,
 * the uploader re-queues and retries with exponential backoff + jitter. If the
 * queue fills past maxQueueSize, enqueue() blocks — giving awaiting callers
 * backpressure.
 *
 * Why serialize? Bridge mode fires writes via `void transport.write()`
 * (fire-and-forget). Without this, concurrent POSTs → concurrent Firestore
 * writes to the same document → collisions → retry storms → pages oncall.
 */
export class HybridTransport extends WebSocketTransport {
  private postUrl: string
  private uploader: SerialBatchEventUploader<StdoutMessage>

  // stream_event delay buffer — accumulates content deltas for up to
  // BATCH_FLUSH_INTERVAL_MS before enqueueing (reduces POST count)
  private streamEventBuffer: StdoutMessage[] = []
  private streamEventTimer: ReturnType<typeof setTimeout> | null = null

  constructor(
    url: URL,
    headers: Record<string, string> = {},
    sessionId?: string,
    refreshHeaders?: () => Record<string, string>,
    options?: WebSocketTransportOptions & {
      maxConsecutiveFailures?: number
      onBatchDropped?: (batchSize: number, failures: number) => void
    },
  ) {
    super(url, headers, sessionId, refreshHeaders, options)
    const { maxConsecutiveFailures, onBatchDropped } = options ?? {}
    this.postUrl = convertWsUrlToPostUrl(url)
    this.uploader = new SerialBatchEventUploader<StdoutMessage>({
      // Large cap — session-ingress accepts arbitrary batch sizes. Events
      // naturally batch during in-flight POSTs; this just bounds the payload.
      maxBatchSize: 500,
      // Bridge callers use `void transport.write()` — backpressure doesn't
      // apply (they don't await). A batch >maxQueueSize deadlocks (see
      // SerialBatchEventUploader backpressure check). So set it high enough
      // to be a memory bound only. Wire real backpressure in a follow-up
      // once callers await.
      maxQueueSize: 100_000,
      baseDelayMs: 500,
      maxDelayMs: 8000,
      jitterMs: 1000,
      // Optional cap so a persistently-failing server can't pin the drain
      // loop for the lifetime of the process. Undefined = indefinite retry.
      // replBridge sets this; the 1P transportUtils path does not.
      maxConsecutiveFailures,
      onBatchDropped: (batchSize, failures) => {
        logForDiagnosticsNoPII(
          'error',
          'cli_hybrid_batch_dropped_max_failures',
          {
            batchSize,
            failures,
          },
        )
        onBatchDropped?.(batchSize, failures)
      },
      send: batch => this.postOnce(batch),
    })
    logForDebugging(`HybridTransport: POST URL = ${this.postUrl}`)
    logForDiagnosticsNoPII('info', 'cli_hybrid_transport_initialized')
  }

  /**
   * Enqueue a message and wait for the queue to drain. Returning flush()
   * preserves the contract that `await write()` resolves after the event is
   * POSTed (relied on by tests and replBridge's initial flush). Fire-and-forget
   * callers (`void transport.write()`) are unaffected — they don't await,
   * so the later resolution doesn't add latency.
   */
  override async write(message: StdoutMessage): Promise<void> {
    if (message.type === 'stream_event') {
      // Delay: accumulate stream_events briefly before enqueueing.
      // Promise resolves immediately — callers don't await stream_events.
      this.streamEventBuffer.push(message)
      if (!this.streamEventTimer) {
        this.streamEventTimer = setTimeout(
          () => this.flushStreamEvents(),
          BATCH_FLUSH_INTERVAL_MS,
        )
      }
      return
    }
    // Immediate: flush any buffered stream_events (ordering), then this event.
    await this.uploader.enqueue([...this.takeStreamEvents(), message])
    return this.uploader.flush()
  }
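
  // Ordering sketch (illustrative; the message shapes are assumptions, not
  // real StdoutMessage payloads): a buffered stream_event and a following
  // non-stream write land in a single POST, stream_event first.
  //
  //   void transport.write({ type: 'stream_event', ... })  // buffered ≤100ms
  //   await transport.write(resultMessage)  // flushes buffer, then this event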

  async writeBatch(messages: StdoutMessage[]): Promise<void> {
    await this.uploader.enqueue([...this.takeStreamEvents(), ...messages])
    return this.uploader.flush()
  }

  /** Snapshot before/after writeBatch() to detect silent drops. */
  get droppedBatchCount(): number {
    return this.uploader.droppedBatchCount
  }

  /**
   * Block until all pending events are POSTed. Used by bridge's initial
   * history flush so onStateChange('connected') fires after persistence.
   */
  flush(): Promise<void> {
    void this.uploader.enqueue(this.takeStreamEvents())
    return this.uploader.flush()
  }

  /** Take ownership of buffered stream_events and clear the delay timer. */
  private takeStreamEvents(): StdoutMessage[] {
    if (this.streamEventTimer) {
      clearTimeout(this.streamEventTimer)
      this.streamEventTimer = null
    }
    const buffered = this.streamEventBuffer
    this.streamEventBuffer = []
    return buffered
  }

  /** Delay timer fired — enqueue accumulated stream_events. */
  private flushStreamEvents(): void {
    this.streamEventTimer = null
    void this.uploader.enqueue(this.takeStreamEvents())
  }

  override close(): void {
    if (this.streamEventTimer) {
      clearTimeout(this.streamEventTimer)
      this.streamEventTimer = null
    }
    this.streamEventBuffer = []
    // Grace period for queued writes — fallback. replBridge teardown now
    // awaits archive between write and close (see CLOSE_GRACE_MS), so
    // archive latency is the primary drain window and this is a last
    // resort. Keep close() sync (returns immediately) but defer
    // uploader.close() so any remaining queue gets a chance to finish.
    const uploader = this.uploader
    let graceTimer: ReturnType<typeof setTimeout> | undefined
    void Promise.race([
      uploader.flush(),
      new Promise<void>(r => {
        // eslint-disable-next-line no-restricted-syntax -- need timer ref for clearTimeout
        graceTimer = setTimeout(r, CLOSE_GRACE_MS)
      }),
    ]).finally(() => {
      clearTimeout(graceTimer)
      uploader.close()
    })
    super.close()
  }
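
  // Teardown sketch (illustrative; mirrors the replBridge ordering described
  // in the comment above; archiveSession() is an assumed stand-in, not a real
  // call site):
  //
  //   void transport.write(finalMessage)  // fire-and-forget, queued
  //   await archiveSession()              // primary drain window for the queue
  //   transport.close()                   // sync; stragglers get CLOSE_GRACE_MS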

  /**
   * Single-attempt POST. Throws on retryable failures (429, 5xx, network)
   * so SerialBatchEventUploader re-queues and retries. Returns on success
   * and on permanent failures (4xx non-429, no token) so the uploader moves on.
   */
  private async postOnce(events: StdoutMessage[]): Promise<void> {
    const sessionToken = getSessionIngressAuthToken()
    if (!sessionToken) {
      logForDebugging('HybridTransport: No session token available for POST')
      logForDiagnosticsNoPII('warn', 'cli_hybrid_post_no_token')
      return
    }

    const headers: Record<string, string> = {
      Authorization: `Bearer ${sessionToken}`,
      'Content-Type': 'application/json',
    }

    let response
    try {
      response = await axios.post(
        this.postUrl,
        { events },
        {
          headers,
          validateStatus: () => true,
          timeout: POST_TIMEOUT_MS,
        },
      )
    } catch (error) {
      const axiosError = error as AxiosError
      logForDebugging(`HybridTransport: POST error: ${axiosError.message}`)
      logForDiagnosticsNoPII('warn', 'cli_hybrid_post_network_error')
      throw error
    }

    if (response.status >= 200 && response.status < 300) {
      logForDebugging(`HybridTransport: POST success count=${events.length}`)
      return
    }

    // 4xx (except 429) are permanent — drop, don't retry.
    if (
      response.status >= 400 &&
      response.status < 500 &&
      response.status !== 429
    ) {
      logForDebugging(
        `HybridTransport: POST returned ${response.status} (permanent), dropping`,
      )
      logForDiagnosticsNoPII('warn', 'cli_hybrid_post_client_error', {
        status: response.status,
      })
      return
    }

    // 429 / 5xx — retryable. Throw so uploader re-queues and backs off.
    logForDebugging(
      `HybridTransport: POST returned ${response.status} (retryable)`,
    )
    logForDiagnosticsNoPII('warn', 'cli_hybrid_post_retryable_error', {
      status: response.status,
    })
    throw new Error(`POST failed with ${response.status}`)
  }
}
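
// Example construction (a sketch; the URL, session id, and option values are
// assumptions for illustration, not taken from real call sites):
//
//   const transport = new HybridTransport(
//     new URL('wss://api.example.com/v2/session_ingress/ws/abc123'),
//     {},
//     'abc123',
//     undefined,
//     {
//       maxConsecutiveFailures: 20,
//       onBatchDropped: (size, failures) =>
//         logForDebugging(`dropped batch of ${size} after ${failures} failures`),
//     },
//   )
//   void transport.write(message)  // fire-and-forget; uploader serializes
//   await transport.flush()        // block until everything is POSTed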

/**
 * Convert a WebSocket URL to the HTTP POST endpoint URL.
 * From: wss://api.example.com/v2/session_ingress/ws/<session_id>
 * To:   https://api.example.com/v2/session_ingress/session/<session_id>/events
 */
function convertWsUrlToPostUrl(wsUrl: URL): string {
  const protocol = wsUrl.protocol === 'wss:' ? 'https:' : 'http:'

  // Replace /ws/ with /session/ and append /events
  let pathname = wsUrl.pathname
  pathname = pathname.replace('/ws/', '/session/')
  if (!pathname.endsWith('/events')) {
    pathname = pathname.endsWith('/')
      ? pathname + 'events'
      : pathname + '/events'
  }

  return `${protocol}//${wsUrl.host}${pathname}${wsUrl.search}`
}
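
// Quick check (illustrative values; the query string passes through unchanged):
//
//   convertWsUrlToPostUrl(
//     new URL('wss://api.example.com/v2/session_ingress/ws/abc123?token=x'),
//   )
//   // → 'https://api.example.com/v2/session_ingress/session/abc123/events?token=x'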