/ services / api / sessionIngress.ts
sessionIngress.ts
  1  import axios, { type AxiosError } from 'axios'
  2  import type { UUID } from 'crypto'
  3  import { getOauthConfig } from '../../constants/oauth.js'
  4  import type { Entry, TranscriptMessage } from '../../types/logs.js'
  5  import { logForDebugging } from '../../utils/debug.js'
  6  import { logForDiagnosticsNoPII } from '../../utils/diagLogs.js'
  7  import { isEnvTruthy } from '../../utils/envUtils.js'
  8  import { logError } from '../../utils/log.js'
  9  import { sequential } from '../../utils/sequential.js'
 10  import { getSessionIngressAuthToken } from '../../utils/sessionIngressAuth.js'
 11  import { sleep } from '../../utils/sleep.js'
 12  import { jsonStringify } from '../../utils/slowOperations.js'
 13  import { getOAuthHeaders } from '../../utils/teleport/api.js'
 14  
 15  interface SessionIngressError {
 16    error?: {
 17      message?: string
 18      type?: string
 19    }
 20  }
 21  
 22  // Module-level state
 23  const lastUuidMap: Map<string, UUID> = new Map()
 24  
 25  const MAX_RETRIES = 10
 26  const BASE_DELAY_MS = 500
 27  
 28  // Per-session sequential wrappers to prevent concurrent log writes
 29  const sequentialAppendBySession: Map<
 30    string,
 31    (
 32      entry: TranscriptMessage,
 33      url: string,
 34      headers: Record<string, string>,
 35    ) => Promise<boolean>
 36  > = new Map()
 37  
 38  /**
 39   * Gets or creates a sequential wrapper for a session
 40   * This ensures that log appends for a session are processed one at a time
 41   */
 42  function getOrCreateSequentialAppend(sessionId: string) {
 43    let sequentialAppend = sequentialAppendBySession.get(sessionId)
 44    if (!sequentialAppend) {
 45      sequentialAppend = sequential(
 46        async (
 47          entry: TranscriptMessage,
 48          url: string,
 49          headers: Record<string, string>,
 50        ) => await appendSessionLogImpl(sessionId, entry, url, headers),
 51      )
 52      sequentialAppendBySession.set(sessionId, sequentialAppend)
 53    }
 54    return sequentialAppend
 55  }
 56  
 57  /**
 58   * Internal implementation of appendSessionLog with retry logic
 59   * Retries on transient errors (network, 5xx, 429). On 409, adopts the server's
 60   * last UUID and retries (handles stale state from killed process's in-flight
 61   * requests). Fails immediately on 401.
 62   */
 63  async function appendSessionLogImpl(
 64    sessionId: string,
 65    entry: TranscriptMessage,
 66    url: string,
 67    headers: Record<string, string>,
 68  ): Promise<boolean> {
 69    for (let attempt = 1; attempt <= MAX_RETRIES; attempt++) {
 70      try {
 71        const lastUuid = lastUuidMap.get(sessionId)
 72        const requestHeaders = { ...headers }
 73        if (lastUuid) {
 74          requestHeaders['Last-Uuid'] = lastUuid
 75        }
 76  
 77        const response = await axios.put(url, entry, {
 78          headers: requestHeaders,
 79          validateStatus: status => status < 500,
 80        })
 81  
 82        if (response.status === 200 || response.status === 201) {
 83          lastUuidMap.set(sessionId, entry.uuid)
 84          logForDebugging(
 85            `Successfully persisted session log entry for session ${sessionId}`,
 86          )
 87          return true
 88        }
 89  
 90        if (response.status === 409) {
 91          // Check if our entry was actually stored (server returned 409 but entry exists)
 92          // This handles the scenario where entry was stored but client received an error
 93          // response, causing lastUuidMap to be stale
 94          const serverLastUuid = response.headers['x-last-uuid']
 95          if (serverLastUuid === entry.uuid) {
 96            // Our entry IS the last entry on server - it was stored successfully previously
 97            lastUuidMap.set(sessionId, entry.uuid)
 98            logForDebugging(
 99              `Session entry ${entry.uuid} already present on server, recovering from stale state`,
100            )
101            logForDiagnosticsNoPII('info', 'session_persist_recovered_from_409')
102            return true
103          }
104  
105          // Another writer (e.g. in-flight request from a killed process)
106          // advanced the server's chain. Try to adopt the server's last UUID
107          // from the response header, or re-fetch the session to discover it.
108          if (serverLastUuid) {
109            lastUuidMap.set(sessionId, serverLastUuid as UUID)
110            logForDebugging(
111              `Session 409: adopting server lastUuid=${serverLastUuid} from header, retrying entry ${entry.uuid}`,
112            )
113          } else {
114            // Server didn't return x-last-uuid (e.g. v1 endpoint). Re-fetch
115            // the session to discover the current head of the append chain.
116            const logs = await fetchSessionLogsFromUrl(sessionId, url, headers)
117            const adoptedUuid = findLastUuid(logs)
118            if (adoptedUuid) {
119              lastUuidMap.set(sessionId, adoptedUuid)
120              logForDebugging(
121                `Session 409: re-fetched ${logs!.length} entries, adopting lastUuid=${adoptedUuid}, retrying entry ${entry.uuid}`,
122              )
123            } else {
124              // Can't determine server state — give up
125              const errorData = response.data as SessionIngressError
126              const errorMessage =
127                errorData.error?.message || 'Concurrent modification detected'
128              logError(
129                new Error(
130                  `Session persistence conflict: UUID mismatch for session ${sessionId}, entry ${entry.uuid}. ${errorMessage}`,
131                ),
132              )
133              logForDiagnosticsNoPII(
134                'error',
135                'session_persist_fail_concurrent_modification',
136              )
137              return false
138            }
139          }
140          logForDiagnosticsNoPII('info', 'session_persist_409_adopt_server_uuid')
141          continue // retry with updated lastUuid
142        }
143  
144        if (response.status === 401) {
145          logForDebugging('Session token expired or invalid')
146          logForDiagnosticsNoPII('error', 'session_persist_fail_bad_token')
147          return false // Non-retryable
148        }
149  
150        // Other 4xx (429, etc.) - retryable
151        logForDebugging(
152          `Failed to persist session log: ${response.status} ${response.statusText}`,
153        )
154        logForDiagnosticsNoPII('error', 'session_persist_fail_status', {
155          status: response.status,
156          attempt,
157        })
158      } catch (error) {
159        // Network errors, 5xx - retryable
160        const axiosError = error as AxiosError<SessionIngressError>
161        logError(new Error(`Error persisting session log: ${axiosError.message}`))
162        logForDiagnosticsNoPII('error', 'session_persist_fail_status', {
163          status: axiosError.status,
164          attempt,
165        })
166      }
167  
168      if (attempt === MAX_RETRIES) {
169        logForDebugging(`Remote persistence failed after ${MAX_RETRIES} attempts`)
170        logForDiagnosticsNoPII(
171          'error',
172          'session_persist_error_retries_exhausted',
173          { attempt },
174        )
175        return false
176      }
177  
178      const delayMs = Math.min(BASE_DELAY_MS * Math.pow(2, attempt - 1), 8000)
179      logForDebugging(
180        `Remote persistence attempt ${attempt}/${MAX_RETRIES} failed, retrying in ${delayMs}ms…`,
181      )
182      await sleep(delayMs)
183    }
184  
185    return false
186  }
187  
/**
 * Appends one transcript entry to a session's remote log, authenticating with
 * the session-ingress token as a Bearer credential.
 *
 * The write goes through the session's sequential wrapper, so appends for the
 * same session run one at a time; optimistic concurrency control is done with
 * the Last-Uuid header. Resolves `false` without sending anything when no
 * session token is available.
 */
export async function appendSessionLog(
  sessionId: string,
  entry: TranscriptMessage,
  url: string,
): Promise<boolean> {
  const token = getSessionIngressAuthToken()
  if (token) {
    const enqueueAppend = getOrCreateSequentialAppend(sessionId)
    return enqueueAppend(entry, url, {
      Authorization: `Bearer ${token}`,
      'Content-Type': 'application/json',
    })
  }

  logForDebugging('No session token available for session persistence')
  logForDiagnosticsNoPII('error', 'session_persist_fail_jwt_no_token')
  return false
}
213  
/**
 * Get all session logs for hydration, and refresh the cached last UUID that
 * later appends send as `Last-Uuid`.
 *
 * The cached value is taken from the last entry that actually carries a uuid
 * (via findLastUuid), not simply the final entry: some entry types have no
 * uuid, and looking only at the final entry would leave the cache unset or
 * stale whenever the log ends with one of those.
 *
 * @param sessionId Session whose logs are fetched and whose cache is updated.
 * @param url Endpoint the logs are fetched from.
 * @returns Whatever fetchSessionLogsFromUrl resolves to, or `null` when no
 *   session token is available.
 */
export async function getSessionLogs(
  sessionId: string,
  url: string,
): Promise<Entry[] | null> {
  const sessionToken = getSessionIngressAuthToken()
  if (!sessionToken) {
    logForDebugging('No session token available for fetching session logs')
    logForDiagnosticsNoPII('error', 'session_get_fail_no_token')
    return null
  }

  const headers = { Authorization: `Bearer ${sessionToken}` }
  const logs = await fetchSessionLogsFromUrl(sessionId, url, headers)

  // findLastUuid returns undefined for null/empty logs and for logs in which
  // no entry has a uuid; the cache is left untouched in those cases.
  const lastUuid = findLastUuid(logs)
  if (lastUuid) {
    lastUuidMap.set(sessionId, lastUuid)
  }

  return logs
}
241  
/**
 * Fetches all log entries for a session from the session-ingress endpoint
 * using OAuth credentials rather than the session token.
 * Used for teleporting sessions from the Sessions API.
 *
 * The request carries the headers produced by getOAuthHeaders plus the
 * organization UUID in `x-organization-uuid`.
 */
export async function getSessionLogsViaOAuth(
  sessionId: string,
  accessToken: string,
  orgUUID: string,
): Promise<Entry[] | null> {
  const endpoint = `${getOauthConfig().BASE_API_URL}/v1/session_ingress/session/${sessionId}`
  logForDebugging(`[session-ingress] Fetching session logs from: ${endpoint}`)

  const oauthHeaders = {
    ...getOAuthHeaders(accessToken),
    'x-organization-uuid': orgUUID,
  }
  return await fetchSessionLogsFromUrl(sessionId, endpoint, oauthHeaders)
}
260  
/**
 * One element of the `data` array in a teleport-events response.
 * `payload` IS the Entry (TranscriptMessage struct) — the CLI writes it via
 * AddWorkerEvent, the server stores it opaque, and it is read back here.
 */
type TeleportEventRecord = {
  event_id: string
  event_type: string
  is_compaction: boolean
  payload: Entry | null
  created_at: string
}

/**
 * Response shape from GET /v1/code/sessions/{id}/teleport-events.
 */
type TeleportEventsResponse = {
  data: TeleportEventRecord[]
  // Unset when there are no more pages — this IS the end-of-stream
  // signal (there is no separate has_more field).
  next_cursor?: string
}
279  
/**
 * Get worker events (transcript) via the CCR v2 Sessions API. Replaces
 * getSessionLogsViaOAuth once session-ingress is retired.
 *
 * The server dispatches per-session: Spanner for v2-native sessions,
 * threadstore for pre-backfill session_* IDs. The cursor is opaque to us —
 * echo it back until next_cursor is unset.
 *
 * Paginated (500/page default, server max 1000). session-ingress's one-shot
 * 50k is gone; we loop, requesting 1000 events per page and reading at most
 * 100 pages. If the server still reports more pages after that, the
 * truncated transcript is returned and the cap is logged.
 *
 * @param sessionId Session whose events are fetched.
 * @param accessToken OAuth access token passed to getOAuthHeaders.
 * @param orgUUID Organization UUID sent as `x-organization-uuid`.
 * @returns The non-empty payloads in page order; `null` when a request fails,
 *   the first page is a 404, a page has a non-200 status, or a page body is
 *   malformed. A 404 after the first page returns what was collected so far.
 * @throws Error when the server answers 401 (session expired).
 */
export async function getTeleportEvents(
  sessionId: string,
  accessToken: string,
  orgUUID: string,
): Promise<Entry[] | null> {
  const baseUrl = `${getOauthConfig().BASE_API_URL}/v1/code/sessions/${sessionId}/teleport-events`
  const headers = {
    ...getOAuthHeaders(accessToken),
    'x-organization-uuid': orgUUID,
  }

  logForDebugging(`[teleport] Fetching events from: ${baseUrl}`)

  const all: Entry[] = []
  let cursor: string | undefined
  let pages = 0
  // Set only when the server signals end-of-stream. Distinguishes "finished
  // on the last allowed page" from "stopped because of the page cap".
  let reachedEnd = false

  // Infinite-loop guard: 1000/page × 100 pages = 100k events. Larger than
  // session-ingress's 50k one-shot. If we hit this, something's wrong
  // (server not advancing cursor) — bail rather than hang.
  const maxPages = 100

  while (pages < maxPages) {
    const params: Record<string, string | number> = { limit: 1000 }
    if (cursor !== undefined) {
      params.cursor = cursor
    }

    let response
    try {
      response = await axios.get<TeleportEventsResponse>(baseUrl, {
        headers,
        params,
        timeout: 20000,
        validateStatus: status => status < 500,
      })
    } catch (e) {
      const err = e as AxiosError
      logError(new Error(`Teleport events fetch failed: ${err.message}`))
      logForDiagnosticsNoPII('error', 'teleport_events_fetch_fail')
      return null
    }

    if (response.status === 404) {
      // 404 on page 0 is ambiguous during the migration window:
      //   (a) Session genuinely not found (not in Spanner AND not in
      //       threadstore) — nothing to fetch.
      //   (b) Route-level 404: endpoint not deployed yet, or session is
      //       a threadstore session not yet backfilled into Spanner.
      // We can't tell them apart from the response alone. Returning null
      // lets the caller fall back to session-ingress, which will correctly
      // return empty for case (a) and data for case (b). Once the backfill
      // is complete and session-ingress is gone, the fallback also returns
      // null → same "Failed to fetch session logs" error as today.
      //
      // 404 mid-pagination (pages > 0) means session was deleted between
      // pages — return what we have.
      logForDebugging(
        `[teleport] Session ${sessionId} not found (page ${pages})`,
      )
      logForDiagnosticsNoPII('warn', 'teleport_events_not_found')
      return pages === 0 ? null : all
    }

    if (response.status === 401) {
      logForDiagnosticsNoPII('error', 'teleport_events_bad_token')
      throw new Error(
        'Your session has expired. Please run /login to sign in again.',
      )
    }

    if (response.status !== 200) {
      logError(
        new Error(
          `Teleport events returned ${response.status}: ${jsonStringify(response.data)}`,
        ),
      )
      logForDiagnosticsNoPII('error', 'teleport_events_bad_status')
      return null
    }

    const { data, next_cursor } = response.data
    if (!Array.isArray(data)) {
      logError(
        new Error(
          `Teleport events invalid response shape: ${jsonStringify(response.data)}`,
        ),
      )
      logForDiagnosticsNoPII('error', 'teleport_events_invalid_shape')
      return null
    }

    // payload IS the Entry. null payload happens for threadstore non-generic
    // events (server skips them) or encryption failures — skip here too.
    // `!= null` also skips an omitted (undefined) payload, so the result
    // never contains holes.
    for (const ev of data) {
      if (ev.payload != null) {
        all.push(ev.payload)
      }
    }

    pages++
    // == null covers both `null` and `undefined` — the proto omits the
    // field at end-of-stream, but some serializers emit `null`. Strict
    // `=== undefined` would loop forever on `null` (cursor=null in query
    // params stringifies to "null", which the server rejects or echoes).
    if (next_cursor == null) {
      reachedEnd = true
      break
    }
    cursor = next_cursor
  }

  if (!reachedEnd) {
    // The loop stopped because of the page cap while the server still had a
    // next_cursor. Don't fail — return what we have. Better to teleport with
    // a truncated transcript than not at all.
    logError(
      new Error(`Teleport events hit page cap (${maxPages}) for ${sessionId}`),
    )
    logForDiagnosticsNoPII('warn', 'teleport_events_page_cap')
  }

  logForDebugging(
    `[teleport] Fetched ${all.length} events over ${pages} page(s) for ${sessionId}`,
  )
  return all
}
416  
/**
 * Shared implementation for fetching session logs from a URL.
 *
 * Issues a GET with a 20s timeout; when the CLAUDE_AFTER_LAST_COMPACT
 * environment variable is truthy, `after_last_compact=true` is sent as a
 * query parameter.
 *
 * Only the HTTP call itself is wrapped in try/catch. The 401 error below is
 * thrown outside of it so that it actually reaches the caller instead of
 * being swallowed by this function's own catch and turned into `null`.
 *
 * @param sessionId Used for log messages only.
 * @param url Endpoint to fetch.
 * @param headers Request headers (authentication is the caller's job).
 * @returns The `loglines` array on 200, `[]` on 404, and `null` when the
 *   request fails, the body is malformed, or any other status is returned.
 * @throws Error when the server answers 401 (session expired).
 */
async function fetchSessionLogsFromUrl(
  sessionId: string,
  url: string,
  headers: Record<string, string>,
): Promise<Entry[] | null> {
  let response
  try {
    response = await axios.get(url, {
      headers,
      timeout: 20000,
      // Only 5xx rejects; statuses below 500 are handled inline below.
      validateStatus: status => status < 500,
      params: isEnvTruthy(process.env.CLAUDE_AFTER_LAST_COMPACT)
        ? { after_last_compact: true }
        : undefined,
    })
  } catch (error) {
    // Network errors, timeouts, 5xx
    const axiosError = error as AxiosError<SessionIngressError>
    logError(new Error(`Error fetching session logs: ${axiosError.message}`))
    logForDiagnosticsNoPII('error', 'session_get_fail_status', {
      // Prefer the response's status; fall back to the error's own field.
      status: axiosError.response?.status ?? axiosError.status,
    })
    return null
  }

  if (response.status === 200) {
    const data = response.data

    // Validate the response structure
    if (!data || typeof data !== 'object' || !Array.isArray(data.loglines)) {
      logError(
        new Error(
          `Invalid session logs response format: ${jsonStringify(data)}`,
        ),
      )
      logForDiagnosticsNoPII('error', 'session_get_fail_invalid_response')
      return null
    }

    const logs = data.loglines as Entry[]
    logForDebugging(
      `Fetched ${logs.length} session logs for session ${sessionId}`,
    )
    return logs
  }

  if (response.status === 404) {
    logForDebugging(`No existing logs for session ${sessionId}`)
    logForDiagnosticsNoPII('warn', 'session_get_no_logs_for_session')
    return []
  }

  if (response.status === 401) {
    logForDebugging('Auth token expired or invalid')
    logForDiagnosticsNoPII('error', 'session_get_fail_bad_token')
    throw new Error(
      'Your session has expired. Please run /login to sign in again.',
    )
  }

  logForDebugging(
    `Failed to fetch session logs: ${response.status} ${response.statusText}`,
  )
  logForDiagnosticsNoPII('error', 'session_get_fail_status', {
    status: response.status,
  })
  return null
}
486  
/**
 * Scans `logs` from the end toward the start and returns the uuid of the
 * first entry found that has a truthy `uuid` property.
 * Some entry types (SummaryMessage, TagMessage) don't have one, so the final
 * entry is not necessarily the one that is returned.
 *
 * @returns The uuid, or `undefined` when `logs` is null/empty or no entry
 *   carries a uuid.
 */
function findLastUuid(logs: Entry[] | null): UUID | undefined {
  if (!logs) {
    return undefined
  }

  for (let idx = logs.length - 1; idx >= 0; idx--) {
    // idx is always within bounds here, so the element is present.
    const candidate = logs[idx]!
    if ('uuid' in candidate && candidate.uuid) {
      return candidate.uuid as UUID
    }
  }
  return undefined
}
498  
/**
 * Forgets everything this module has cached for one session: its sequential
 * append wrapper and its last known UUID.
 */
export function clearSession(sessionId: string): void {
  sequentialAppendBySession.delete(sessionId)
  lastUuidMap.delete(sessionId)
}
506  
/**
 * Drops the cached state of every session at once (sequential append wrappers
 * and last known UUIDs).
 * Use this on /clear to free sub-agent session entries.
 */
export function clearAllSessions(): void {
  sequentialAppendBySession.clear()
  lastUuidMap.clear()
}