// services/api/claude.ts
   1  import type {
   2    BetaContentBlock,
   3    BetaContentBlockParam,
   4    BetaImageBlockParam,
   5    BetaJSONOutputFormat,
   6    BetaMessage,
   7    BetaMessageDeltaUsage,
   8    BetaMessageStreamParams,
   9    BetaOutputConfig,
  10    BetaRawMessageStreamEvent,
  11    BetaRequestDocumentBlock,
  12    BetaStopReason,
  13    BetaToolChoiceAuto,
  14    BetaToolChoiceTool,
  15    BetaToolResultBlockParam,
  16    BetaToolUnion,
  17    BetaUsage,
  18    BetaMessageParam as MessageParam,
  19  } from '@anthropic-ai/sdk/resources/beta/messages/messages.mjs'
  20  import type { TextBlockParam } from '@anthropic-ai/sdk/resources/index.mjs'
  21  import type { Stream } from '@anthropic-ai/sdk/streaming.mjs'
  22  import { randomUUID } from 'crypto'
  23  import {
  24    getAPIProvider,
  25    isFirstPartyAnthropicBaseUrl,
  26  } from 'src/utils/model/providers.js'
  27  import {
  28    getAttributionHeader,
  29    getCLISyspromptPrefix,
  30  } from '../../constants/system.js'
  31  import {
  32    getEmptyToolPermissionContext,
  33    type QueryChainTracking,
  34    type Tool,
  35    type ToolPermissionContext,
  36    type Tools,
  37    toolMatchesName,
  38  } from '../../Tool.js'
  39  import type { AgentDefinition } from '../../tools/AgentTool/loadAgentsDir.js'
  40  import {
  41    type ConnectorTextBlock,
  42    type ConnectorTextDelta,
  43    isConnectorTextBlock,
  44  } from '../../types/connectorText.js'
  45  import type {
  46    AssistantMessage,
  47    Message,
  48    StreamEvent,
  49    SystemAPIErrorMessage,
  50    UserMessage,
  51  } from '../../types/message.js'
  52  import {
  53    type CacheScope,
  54    logAPIPrefix,
  55    splitSysPromptPrefix,
  56    toolToAPISchema,
  57  } from '../../utils/api.js'
  58  import { getOauthAccountInfo } from '../../utils/auth.js'
  59  import {
  60    getBedrockExtraBodyParamsBetas,
  61    getMergedBetas,
  62    getModelBetas,
  63  } from '../../utils/betas.js'
  64  import { getOrCreateUserID } from '../../utils/config.js'
  65  import {
  66    CAPPED_DEFAULT_MAX_TOKENS,
  67    getModelMaxOutputTokens,
  68    getSonnet1mExpTreatmentEnabled,
  69  } from '../../utils/context.js'
  70  import { resolveAppliedEffort } from '../../utils/effort.js'
  71  import { isEnvTruthy } from '../../utils/envUtils.js'
  72  import { errorMessage } from '../../utils/errors.js'
  73  import { computeFingerprintFromMessages } from '../../utils/fingerprint.js'
  74  import { captureAPIRequest, logError } from '../../utils/log.js'
  75  import {
  76    createAssistantAPIErrorMessage,
  77    createUserMessage,
  78    ensureToolResultPairing,
  79    normalizeContentFromAPI,
  80    normalizeMessagesForAPI,
  81    stripAdvisorBlocks,
  82    stripCallerFieldFromAssistantMessage,
  83    stripToolReferenceBlocksFromUserMessage,
  84  } from '../../utils/messages.js'
  85  import {
  86    getDefaultOpusModel,
  87    getDefaultSonnetModel,
  88    getSmallFastModel,
  89    isNonCustomOpusModel,
  90  } from '../../utils/model/model.js'
  91  import {
  92    asSystemPrompt,
  93    type SystemPrompt,
  94  } from '../../utils/systemPromptType.js'
  95  import { tokenCountFromLastAPIResponse } from '../../utils/tokens.js'
  96  import { getDynamicConfig_BLOCKS_ON_INIT } from '../analytics/growthbook.js'
  97  import {
  98    currentLimits,
  99    extractQuotaStatusFromError,
 100    extractQuotaStatusFromHeaders,
 101  } from '../claudeAiLimits.js'
 102  import { getAPIContextManagement } from '../compact/apiMicrocompact.js'
 103  
/* eslint-disable @typescript-eslint/no-require-imports */
// Conditionally require (not statically import) the auto-mode state module so
// it is only pulled in when the TRANSCRIPT_CLASSIFIER feature is compiled in.
// Note: `feature` is imported from 'bun:bundle' a few lines below — ESM
// imports are hoisted, so it is initialized before this statement executes.
const autoModeStateModule = feature('TRANSCRIPT_CLASSIFIER')
  ? (require('../../utils/permissions/autoModeState.js') as typeof import('../../utils/permissions/autoModeState.js'))
  : null
 108  
 109  import { feature } from 'bun:bundle'
 110  import type { ClientOptions } from '@anthropic-ai/sdk'
 111  import {
 112    APIConnectionTimeoutError,
 113    APIError,
 114    APIUserAbortError,
 115  } from '@anthropic-ai/sdk/error'
 116  import {
 117    getAfkModeHeaderLatched,
 118    getCacheEditingHeaderLatched,
 119    getFastModeHeaderLatched,
 120    getLastApiCompletionTimestamp,
 121    getPromptCache1hAllowlist,
 122    getPromptCache1hEligible,
 123    getSessionId,
 124    getThinkingClearLatched,
 125    setAfkModeHeaderLatched,
 126    setCacheEditingHeaderLatched,
 127    setFastModeHeaderLatched,
 128    setLastMainRequestId,
 129    setPromptCache1hAllowlist,
 130    setPromptCache1hEligible,
 131    setThinkingClearLatched,
 132  } from 'src/bootstrap/state.js'
 133  import {
 134    AFK_MODE_BETA_HEADER,
 135    CONTEXT_1M_BETA_HEADER,
 136    CONTEXT_MANAGEMENT_BETA_HEADER,
 137    EFFORT_BETA_HEADER,
 138    FAST_MODE_BETA_HEADER,
 139    PROMPT_CACHING_SCOPE_BETA_HEADER,
 140    REDACT_THINKING_BETA_HEADER,
 141    STRUCTURED_OUTPUTS_BETA_HEADER,
 142    TASK_BUDGETS_BETA_HEADER,
 143  } from 'src/constants/betas.js'
 144  import type { QuerySource } from 'src/constants/querySource.js'
 145  import type { Notification } from 'src/context/notifications.js'
 146  import { addToTotalSessionCost } from 'src/cost-tracker.js'
 147  import { getFeatureValue_CACHED_MAY_BE_STALE } from 'src/services/analytics/growthbook.js'
 148  import type { AgentId } from 'src/types/ids.js'
 149  import {
 150    ADVISOR_TOOL_INSTRUCTIONS,
 151    getExperimentAdvisorModels,
 152    isAdvisorEnabled,
 153    isValidAdvisorModel,
 154    modelSupportsAdvisor,
 155  } from 'src/utils/advisor.js'
 156  import { getAgentContext } from 'src/utils/agentContext.js'
 157  import { isClaudeAISubscriber } from 'src/utils/auth.js'
 158  import {
 159    getToolSearchBetaHeader,
 160    modelSupportsStructuredOutputs,
 161    shouldIncludeFirstPartyOnlyBetas,
 162    shouldUseGlobalCacheScope,
 163  } from 'src/utils/betas.js'
 164  import { CLAUDE_IN_CHROME_MCP_SERVER_NAME } from 'src/utils/claudeInChrome/common.js'
 165  import { CHROME_TOOL_SEARCH_INSTRUCTIONS } from 'src/utils/claudeInChrome/prompt.js'
 166  import { getMaxThinkingTokensForModel } from 'src/utils/context.js'
 167  import { logForDebugging } from 'src/utils/debug.js'
 168  import { logForDiagnosticsNoPII } from 'src/utils/diagLogs.js'
 169  import { type EffortValue, modelSupportsEffort } from 'src/utils/effort.js'
 170  import {
 171    isFastModeAvailable,
 172    isFastModeCooldown,
 173    isFastModeEnabled,
 174    isFastModeSupportedByModel,
 175  } from 'src/utils/fastMode.js'
 176  import { returnValue } from 'src/utils/generators.js'
 177  import { headlessProfilerCheckpoint } from 'src/utils/headlessProfiler.js'
 178  import { isMcpInstructionsDeltaEnabled } from 'src/utils/mcpInstructionsDelta.js'
 179  import { calculateUSDCost } from 'src/utils/modelCost.js'
 180  import { endQueryProfile, queryCheckpoint } from 'src/utils/queryProfiler.js'
 181  import {
 182    modelSupportsAdaptiveThinking,
 183    modelSupportsThinking,
 184    type ThinkingConfig,
 185  } from 'src/utils/thinking.js'
 186  import {
 187    extractDiscoveredToolNames,
 188    isDeferredToolsDeltaEnabled,
 189    isToolSearchEnabled,
 190  } from 'src/utils/toolSearch.js'
 191  import { API_MAX_MEDIA_PER_REQUEST } from '../../constants/apiLimits.js'
 192  import { ADVISOR_BETA_HEADER } from '../../constants/betas.js'
 193  import {
 194    formatDeferredToolLine,
 195    isDeferredTool,
 196    TOOL_SEARCH_TOOL_NAME,
 197  } from '../../tools/ToolSearchTool/prompt.js'
 198  import { count } from '../../utils/array.js'
 199  import { insertBlockAfterToolResults } from '../../utils/contentArray.js'
 200  import { validateBoundedIntEnvVar } from '../../utils/envValidation.js'
 201  import { safeParseJSON } from '../../utils/json.js'
 202  import { getInferenceProfileBackingModel } from '../../utils/model/bedrock.js'
 203  import {
 204    normalizeModelStringForAPI,
 205    parseUserSpecifiedModel,
 206  } from '../../utils/model/model.js'
 207  import {
 208    startSessionActivity,
 209    stopSessionActivity,
 210  } from '../../utils/sessionActivity.js'
 211  import { jsonStringify } from '../../utils/slowOperations.js'
 212  import {
 213    isBetaTracingEnabled,
 214    type LLMRequestNewContext,
 215    startLLMRequestSpan,
 216  } from '../../utils/telemetry/sessionTracing.js'
 217  /* eslint-enable @typescript-eslint/no-require-imports */
 218  import {
 219    type AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
 220    logEvent,
 221  } from '../analytics/index.js'
 222  import {
 223    consumePendingCacheEdits,
 224    getPinnedCacheEdits,
 225    markToolsSentToAPIState,
 226    pinCacheEdits,
 227  } from '../compact/microCompact.js'
 228  import { getInitializationStatus } from '../lsp/manager.js'
 229  import { isToolFromMcpServer } from '../mcp/utils.js'
 230  import { withStreamingVCR, withVCR } from '../vcr.js'
 231  import { CLIENT_REQUEST_ID_HEADER, getAnthropicClient } from './client.js'
 232  import {
 233    API_ERROR_MESSAGE_PREFIX,
 234    CUSTOM_OFF_SWITCH_MESSAGE,
 235    getAssistantMessageFromError,
 236    getErrorMessageIfRefusal,
 237  } from './errors.js'
 238  import {
 239    EMPTY_USAGE,
 240    type GlobalCacheStrategy,
 241    logAPIError,
 242    logAPIQuery,
 243    logAPISuccessAndDuration,
 244    type NonNullableUsage,
 245  } from './logging.js'
 246  import {
 247    CACHE_TTL_1HOUR_MS,
 248    checkResponseForCacheBreak,
 249    recordPromptState,
 250  } from './promptCacheBreakDetection.js'
 251  import {
 252    CannotRetryError,
 253    FallbackTriggeredError,
 254    is529Error,
 255    type RetryContext,
 256    withRetry,
 257  } from './withRetry.js'
 258  
// JSON value model for env-var-supplied request bodies/metadata: exactly the
// values JSON.parse can produce, mutually recursive with the aliases below.
type JsonValue = string | number | boolean | null | JsonObject | JsonArray
// A JSON object: string keys mapping to arbitrary JSON values.
type JsonObject = { [key: string]: JsonValue }
// A JSON array of arbitrary JSON values.
type JsonArray = JsonValue[]
 263  
 264  /**
 265   * Assemble the extra body parameters for the API request, based on the
 266   * CLAUDE_CODE_EXTRA_BODY environment variable if present and on any beta
 267   * headers (primarily for Bedrock requests).
 268   *
 269   * @param betaHeaders - An array of beta headers to include in the request.
 270   * @returns A JSON object representing the extra body parameters.
 271   */
 272  export function getExtraBodyParams(betaHeaders?: string[]): JsonObject {
 273    // Parse user's extra body parameters first
 274    const extraBodyStr = process.env.CLAUDE_CODE_EXTRA_BODY
 275    let result: JsonObject = {}
 276  
 277    if (extraBodyStr) {
 278      try {
 279        // Parse as JSON, which can be null, boolean, number, string, array or object
 280        const parsed = safeParseJSON(extraBodyStr)
 281        // We expect an object with key-value pairs to spread into API parameters
 282        if (parsed && typeof parsed === 'object' && !Array.isArray(parsed)) {
 283          // Shallow clone — safeParseJSON is LRU-cached and returns the same
 284          // object reference for the same string. Mutating `result` below
 285          // would poison the cache, causing stale values to persist.
 286          result = { ...(parsed as JsonObject) }
 287        } else {
 288          logForDebugging(
 289            `CLAUDE_CODE_EXTRA_BODY env var must be a JSON object, but was given ${extraBodyStr}`,
 290            { level: 'error' },
 291          )
 292        }
 293      } catch (error) {
 294        logForDebugging(
 295          `Error parsing CLAUDE_CODE_EXTRA_BODY: ${errorMessage(error)}`,
 296          { level: 'error' },
 297        )
 298      }
 299    }
 300  
 301    // Anti-distillation: send fake_tools opt-in for 1P CLI only
 302    if (
 303      feature('ANTI_DISTILLATION_CC')
 304        ? process.env.CLAUDE_CODE_ENTRYPOINT === 'cli' &&
 305          shouldIncludeFirstPartyOnlyBetas() &&
 306          getFeatureValue_CACHED_MAY_BE_STALE(
 307            'tengu_anti_distill_fake_tool_injection',
 308            false,
 309          )
 310        : false
 311    ) {
 312      result.anti_distillation = ['fake_tools']
 313    }
 314  
 315    // Handle beta headers if provided
 316    if (betaHeaders && betaHeaders.length > 0) {
 317      if (result.anthropic_beta && Array.isArray(result.anthropic_beta)) {
 318        // Add to existing array, avoiding duplicates
 319        const existingHeaders = result.anthropic_beta as string[]
 320        const newHeaders = betaHeaders.filter(
 321          header => !existingHeaders.includes(header),
 322        )
 323        result.anthropic_beta = [...existingHeaders, ...newHeaders]
 324      } else {
 325        // Create new array with the beta headers
 326        result.anthropic_beta = betaHeaders
 327      }
 328    }
 329  
 330    return result
 331  }
 332  
 333  export function getPromptCachingEnabled(model: string): boolean {
 334    // Global disable takes precedence
 335    if (isEnvTruthy(process.env.DISABLE_PROMPT_CACHING)) return false
 336  
 337    // Check if we should disable for small/fast model
 338    if (isEnvTruthy(process.env.DISABLE_PROMPT_CACHING_HAIKU)) {
 339      const smallFastModel = getSmallFastModel()
 340      if (model === smallFastModel) return false
 341    }
 342  
 343    // Check if we should disable for default Sonnet
 344    if (isEnvTruthy(process.env.DISABLE_PROMPT_CACHING_SONNET)) {
 345      const defaultSonnet = getDefaultSonnetModel()
 346      if (model === defaultSonnet) return false
 347    }
 348  
 349    // Check if we should disable for default Opus
 350    if (isEnvTruthy(process.env.DISABLE_PROMPT_CACHING_OPUS)) {
 351      const defaultOpus = getDefaultOpusModel()
 352      if (model === defaultOpus) return false
 353    }
 354  
 355    return true
 356  }
 357  
 358  export function getCacheControl({
 359    scope,
 360    querySource,
 361  }: {
 362    scope?: CacheScope
 363    querySource?: QuerySource
 364  } = {}): {
 365    type: 'ephemeral'
 366    ttl?: '1h'
 367    scope?: CacheScope
 368  } {
 369    return {
 370      type: 'ephemeral',
 371      ...(should1hCacheTTL(querySource) && { ttl: '1h' }),
 372      ...(scope === 'global' && { scope }),
 373    }
 374  }
 375  
 376  /**
 377   * Determines if 1h TTL should be used for prompt caching.
 378   *
 379   * Only applied when:
 380   * 1. User is eligible (ant or subscriber within rate limits)
 381   * 2. The query source matches a pattern in the GrowthBook allowlist
 382   *
 383   * GrowthBook config shape: { allowlist: string[] }
 384   * Patterns support trailing '*' for prefix matching.
 385   * Examples:
 386   * - { allowlist: ["repl_main_thread*", "sdk"] } — main thread + SDK only
 387   * - { allowlist: ["repl_main_thread*", "sdk", "agent:*"] } — also subagents
 388   * - { allowlist: ["*"] } — all sources
 389   *
 390   * The allowlist is cached in STATE for session stability — prevents mixed
 391   * TTLs when GrowthBook's disk cache updates mid-request.
 392   */
 393  function should1hCacheTTL(querySource?: QuerySource): boolean {
 394    // 3P Bedrock users get 1h TTL when opted in via env var — they manage their own billing
 395    // No GrowthBook gating needed since 3P users don't have GrowthBook configured
 396    if (
 397      getAPIProvider() === 'bedrock' &&
 398      isEnvTruthy(process.env.ENABLE_PROMPT_CACHING_1H_BEDROCK)
 399    ) {
 400      return true
 401    }
 402  
 403    // Latch eligibility in bootstrap state for session stability — prevents
 404    // mid-session overage flips from changing the cache_control TTL, which
 405    // would bust the server-side prompt cache (~20K tokens per flip).
 406    let userEligible = getPromptCache1hEligible()
 407    if (userEligible === null) {
 408      userEligible =
 409        process.env.USER_TYPE === 'ant' ||
 410        (isClaudeAISubscriber() && !currentLimits.isUsingOverage)
 411      setPromptCache1hEligible(userEligible)
 412    }
 413    if (!userEligible) return false
 414  
 415    // Cache allowlist in bootstrap state for session stability — prevents mixed
 416    // TTLs when GrowthBook's disk cache updates mid-request
 417    let allowlist = getPromptCache1hAllowlist()
 418    if (allowlist === null) {
 419      const config = getFeatureValue_CACHED_MAY_BE_STALE<{
 420        allowlist?: string[]
 421      }>('tengu_prompt_cache_1h_config', {})
 422      allowlist = config.allowlist ?? []
 423      setPromptCache1hAllowlist(allowlist)
 424    }
 425  
 426    return (
 427      querySource !== undefined &&
 428      allowlist.some(pattern =>
 429        pattern.endsWith('*')
 430          ? querySource.startsWith(pattern.slice(0, -1))
 431          : querySource === pattern,
 432      )
 433    )
 434  }
 435  
 436  /**
 437   * Configure effort parameters for API request.
 438   *
 439   */
 440  function configureEffortParams(
 441    effortValue: EffortValue | undefined,
 442    outputConfig: BetaOutputConfig,
 443    extraBodyParams: Record<string, unknown>,
 444    betas: string[],
 445    model: string,
 446  ): void {
 447    if (!modelSupportsEffort(model) || 'effort' in outputConfig) {
 448      return
 449    }
 450  
 451    if (effortValue === undefined) {
 452      betas.push(EFFORT_BETA_HEADER)
 453    } else if (typeof effortValue === 'string') {
 454      // Send string effort level as is
 455      outputConfig.effort = effortValue
 456      betas.push(EFFORT_BETA_HEADER)
 457    } else if (process.env.USER_TYPE === 'ant') {
 458      // Numeric effort override - ant-only (uses anthropic_internal)
 459      const existingInternal =
 460        (extraBodyParams.anthropic_internal as Record<string, unknown>) || {}
 461      extraBodyParams.anthropic_internal = {
 462        ...existingInternal,
 463        effort_override: effortValue,
 464      }
 465    }
 466  }
 467  
// output_config.task_budget — API-side token budget awareness for the model.
// Stainless SDK types don't yet include task_budget on BetaOutputConfig, so we
// define the wire shape locally and cast. The API validates on receipt; see
// api/api/schemas/messages/request/output_config.py:12-39 in the monorepo.
// Beta: task-budgets-2026-03-13 (EAP, claude-strudel-eap only as of Mar 2026).
type TaskBudgetParam = {
  // Only token-denominated budgets exist on the wire today.
  type: 'tokens'
  // Total token budget for the task.
  total: number
  // Tokens remaining; omitted when the caller has not computed it.
  remaining?: number
}
 478  
 479  export function configureTaskBudgetParams(
 480    taskBudget: Options['taskBudget'],
 481    outputConfig: BetaOutputConfig & { task_budget?: TaskBudgetParam },
 482    betas: string[],
 483  ): void {
 484    if (
 485      !taskBudget ||
 486      'task_budget' in outputConfig ||
 487      !shouldIncludeFirstPartyOnlyBetas()
 488    ) {
 489      return
 490    }
 491    outputConfig.task_budget = {
 492      type: 'tokens',
 493      total: taskBudget.total,
 494      ...(taskBudget.remaining !== undefined && {
 495        remaining: taskBudget.remaining,
 496      }),
 497    }
 498    if (!betas.includes(TASK_BUDGETS_BETA_HEADER)) {
 499      betas.push(TASK_BUDGETS_BETA_HEADER)
 500    }
 501  }
 502  
 503  export function getAPIMetadata() {
 504    // https://docs.google.com/document/d/1dURO9ycXXQCBS0V4Vhl4poDBRgkelFc5t2BNPoEgH5Q/edit?tab=t.0#heading=h.5g7nec5b09w5
 505    let extra: JsonObject = {}
 506    const extraStr = process.env.CLAUDE_CODE_EXTRA_METADATA
 507    if (extraStr) {
 508      const parsed = safeParseJSON(extraStr, false)
 509      if (parsed && typeof parsed === 'object' && !Array.isArray(parsed)) {
 510        extra = parsed as JsonObject
 511      } else {
 512        logForDebugging(
 513          `CLAUDE_CODE_EXTRA_METADATA env var must be a JSON object, but was given ${extraStr}`,
 514          { level: 'error' },
 515        )
 516      }
 517    }
 518  
 519    return {
 520      user_id: jsonStringify({
 521        ...extra,
 522        device_id: getOrCreateUserID(),
 523        // Only include OAuth account UUID when actively using OAuth authentication
 524        account_uuid: getOauthAccountInfo()?.accountUuid ?? '',
 525        session_id: getSessionId(),
 526      }),
 527    }
 528  }
 529  
 530  export async function verifyApiKey(
 531    apiKey: string,
 532    isNonInteractiveSession: boolean,
 533  ): Promise<boolean> {
 534    // Skip API verification if running in print mode (isNonInteractiveSession)
 535    if (isNonInteractiveSession) {
 536      return true
 537    }
 538  
 539    try {
 540      // WARNING: if you change this to use a non-Haiku model, this request will fail in 1P unless it uses getCLISyspromptPrefix.
 541      const model = getSmallFastModel()
 542      const betas = getModelBetas(model)
 543      return await returnValue(
 544        withRetry(
 545          () =>
 546            getAnthropicClient({
 547              apiKey,
 548              maxRetries: 3,
 549              model,
 550              source: 'verify_api_key',
 551            }),
 552          async anthropic => {
 553            const messages: MessageParam[] = [{ role: 'user', content: 'test' }]
 554            // biome-ignore lint/plugin: API key verification is intentionally a minimal direct call
 555            await anthropic.beta.messages.create({
 556              model,
 557              max_tokens: 1,
 558              messages,
 559              temperature: 1,
 560              ...(betas.length > 0 && { betas }),
 561              metadata: getAPIMetadata(),
 562              ...getExtraBodyParams(),
 563            })
 564            return true
 565          },
 566          { maxRetries: 2, model, thinkingConfig: { type: 'disabled' } }, // Use fewer retries for API key verification
 567        ),
 568      )
 569    } catch (errorFromRetry) {
 570      let error = errorFromRetry
 571      if (errorFromRetry instanceof CannotRetryError) {
 572        error = errorFromRetry.originalError
 573      }
 574      logError(error)
 575      // Check for authentication error
 576      if (
 577        error instanceof Error &&
 578        error.message.includes(
 579          '{"type":"error","error":{"type":"authentication_error","message":"invalid x-api-key"}}',
 580        )
 581      ) {
 582        return false
 583      }
 584      throw error
 585    }
 586  }
 587  
 588  export function userMessageToMessageParam(
 589    message: UserMessage,
 590    addCache = false,
 591    enablePromptCaching: boolean,
 592    querySource?: QuerySource,
 593  ): MessageParam {
 594    if (addCache) {
 595      if (typeof message.message.content === 'string') {
 596        return {
 597          role: 'user',
 598          content: [
 599            {
 600              type: 'text',
 601              text: message.message.content,
 602              ...(enablePromptCaching && {
 603                cache_control: getCacheControl({ querySource }),
 604              }),
 605            },
 606          ],
 607        }
 608      } else {
 609        return {
 610          role: 'user',
 611          content: message.message.content.map((_, i) => ({
 612            ..._,
 613            ...(i === message.message.content.length - 1
 614              ? enablePromptCaching
 615                ? { cache_control: getCacheControl({ querySource }) }
 616                : {}
 617              : {}),
 618          })),
 619        }
 620      }
 621    }
 622    // Clone array content to prevent in-place mutations (e.g., insertCacheEditsBlock's
 623    // splice) from contaminating the original message. Without cloning, multiple calls
 624    // to addCacheBreakpoints share the same array and each splices in duplicate cache_edits.
 625    return {
 626      role: 'user',
 627      content: Array.isArray(message.message.content)
 628        ? [...message.message.content]
 629        : message.message.content,
 630    }
 631  }
 632  
 633  export function assistantMessageToMessageParam(
 634    message: AssistantMessage,
 635    addCache = false,
 636    enablePromptCaching: boolean,
 637    querySource?: QuerySource,
 638  ): MessageParam {
 639    if (addCache) {
 640      if (typeof message.message.content === 'string') {
 641        return {
 642          role: 'assistant',
 643          content: [
 644            {
 645              type: 'text',
 646              text: message.message.content,
 647              ...(enablePromptCaching && {
 648                cache_control: getCacheControl({ querySource }),
 649              }),
 650            },
 651          ],
 652        }
 653      } else {
 654        return {
 655          role: 'assistant',
 656          content: message.message.content.map((_, i) => ({
 657            ..._,
 658            ...(i === message.message.content.length - 1 &&
 659            _.type !== 'thinking' &&
 660            _.type !== 'redacted_thinking' &&
 661            (feature('CONNECTOR_TEXT') ? !isConnectorTextBlock(_) : true)
 662              ? enablePromptCaching
 663                ? { cache_control: getCacheControl({ querySource }) }
 664                : {}
 665              : {}),
 666          })),
 667        }
 668      }
 669    }
 670    return {
 671      role: 'assistant',
 672      content: message.message.content,
 673    }
 674  }
 675  
/**
 * Per-query options threaded from callers into queryModel and its
 * streaming/non-streaming wrappers.
 */
export type Options = {
  // Resolves the tool-permission context for this query.
  getToolPermissionContext: () => Promise<ToolPermissionContext>
  // Primary model identifier to query.
  model: string
  toolChoice?: BetaToolChoiceTool | BetaToolChoiceAuto | undefined
  isNonInteractiveSession: boolean
  // Extra raw tool schemas to send alongside the derived tool list.
  extraToolSchemas?: BetaToolUnion[]
  maxOutputTokensOverride?: number
  // Model to fall back to when the primary model's request fails.
  fallbackModel?: string
  // NOTE(review): presumably invoked when streaming falls back to the
  // non-streaming path — confirm at the call site.
  onStreamingFallback?: () => void
  querySource: QuerySource
  agents: AgentDefinition[]
  allowedAgentTypes?: string[]
  hasAppendSystemPrompt: boolean
  // Custom fetch implementation forwarded to the Anthropic client.
  fetchOverride?: ClientOptions['fetch']
  enablePromptCaching?: boolean
  skipCacheWrite?: boolean
  temperatureOverride?: number
  effortValue?: EffortValue
  mcpTools: Tools
  hasPendingMcpServers?: boolean
  queryTracking?: QueryChainTracking
  agentId?: AgentId // Only set for subagents
  outputFormat?: BetaJSONOutputFormat
  fastMode?: boolean
  advisorModel?: string
  addNotification?: (notif: Notification) => void
  // API-side task budget (output_config.task_budget). Distinct from the
  // tokenBudget.ts +500k auto-continue feature — this one is sent to the API
  // so the model can pace itself. `remaining` is computed by the caller
  // (query.ts decrements across the agentic loop).
  taskBudget?: { total: number; remaining?: number }
}
 708  
 709  export async function queryModelWithoutStreaming({
 710    messages,
 711    systemPrompt,
 712    thinkingConfig,
 713    tools,
 714    signal,
 715    options,
 716  }: {
 717    messages: Message[]
 718    systemPrompt: SystemPrompt
 719    thinkingConfig: ThinkingConfig
 720    tools: Tools
 721    signal: AbortSignal
 722    options: Options
 723  }): Promise<AssistantMessage> {
 724    // Store the assistant message but continue consuming the generator to ensure
 725    // logAPISuccessAndDuration gets called (which happens after all yields)
 726    let assistantMessage: AssistantMessage | undefined
 727    for await (const message of withStreamingVCR(messages, async function* () {
 728      yield* queryModel(
 729        messages,
 730        systemPrompt,
 731        thinkingConfig,
 732        tools,
 733        signal,
 734        options,
 735      )
 736    })) {
 737      if (message.type === 'assistant') {
 738        assistantMessage = message
 739      }
 740    }
 741    if (!assistantMessage) {
 742      // If the signal was aborted, throw APIUserAbortError instead of a generic error
 743      // This allows callers to handle abort scenarios gracefully
 744      if (signal.aborted) {
 745        throw new APIUserAbortError()
 746      }
 747      throw new Error('No assistant message found')
 748    }
 749    return assistantMessage
 750  }
 751  
 752  export async function* queryModelWithStreaming({
 753    messages,
 754    systemPrompt,
 755    thinkingConfig,
 756    tools,
 757    signal,
 758    options,
 759  }: {
 760    messages: Message[]
 761    systemPrompt: SystemPrompt
 762    thinkingConfig: ThinkingConfig
 763    tools: Tools
 764    signal: AbortSignal
 765    options: Options
 766  }): AsyncGenerator<
 767    StreamEvent | AssistantMessage | SystemAPIErrorMessage,
 768    void
 769  > {
 770    return yield* withStreamingVCR(messages, async function* () {
 771      yield* queryModel(
 772        messages,
 773        systemPrompt,
 774        thinkingConfig,
 775        tools,
 776        signal,
 777        options,
 778      )
 779    })
 780  }
 781  
 782  /**
 783   * Determines if an LSP tool should be deferred (tool appears with defer_loading: true)
 784   * because LSP initialization is not yet complete.
 785   */
 786  function shouldDeferLspTool(tool: Tool): boolean {
 787    if (!('isLsp' in tool) || !tool.isLsp) {
 788      return false
 789    }
 790    const status = getInitializationStatus()
 791    // Defer when pending or not started
 792    return status.status === 'pending' || status.status === 'not-started'
 793  }
 794  
 795  /**
 796   * Per-attempt timeout for non-streaming fallback requests, in milliseconds.
 797   * Reads API_TIMEOUT_MS when set so slow backends and the streaming path
 798   * share the same ceiling.
 799   *
 800   * Remote sessions default to 120s to stay under CCR's container idle-kill
 801   * (~5min) so a hung fallback to a wedged backend surfaces a clean
 802   * APIConnectionTimeoutError instead of stalling past SIGKILL.
 803   *
 804   * Otherwise defaults to 300s — long enough for slow backends without
 805   * approaching the API's 10-minute non-streaming boundary.
 806   */
 807  function getNonstreamingFallbackTimeoutMs(): number {
 808    const override = parseInt(process.env.API_TIMEOUT_MS || '', 10)
 809    if (override) return override
 810    return isEnvTruthy(process.env.CLAUDE_CODE_REMOTE) ? 120_000 : 300_000
 811  }
 812  
 813  /**
 814   * Helper generator for non-streaming API requests.
 815   * Encapsulates the common pattern of creating a withRetry generator,
 816   * iterating to yield system messages, and returning the final BetaMessage.
 817   */
 818  export async function* executeNonStreamingRequest(
 819    clientOptions: {
 820      model: string
 821      fetchOverride?: Options['fetchOverride']
 822      source: string
 823    },
 824    retryOptions: {
 825      model: string
 826      fallbackModel?: string
 827      thinkingConfig: ThinkingConfig
 828      fastMode?: boolean
 829      signal: AbortSignal
 830      initialConsecutive529Errors?: number
 831      querySource?: QuerySource
 832    },
 833    paramsFromContext: (context: RetryContext) => BetaMessageStreamParams,
 834    onAttempt: (attempt: number, start: number, maxOutputTokens: number) => void,
 835    captureRequest: (params: BetaMessageStreamParams) => void,
 836    /**
 837     * Request ID of the failed streaming attempt this fallback is recovering
 838     * from. Emitted in tengu_nonstreaming_fallback_error for funnel correlation.
 839     */
 840    originatingRequestId?: string | null,
 841  ): AsyncGenerator<SystemAPIErrorMessage, BetaMessage> {
 842    const fallbackTimeoutMs = getNonstreamingFallbackTimeoutMs()
 843    const generator = withRetry(
 844      () =>
 845        getAnthropicClient({
 846          maxRetries: 0,
 847          model: clientOptions.model,
 848          fetchOverride: clientOptions.fetchOverride,
 849          source: clientOptions.source,
 850        }),
 851      async (anthropic, attempt, context) => {
 852        const start = Date.now()
 853        const retryParams = paramsFromContext(context)
 854        captureRequest(retryParams)
 855        onAttempt(attempt, start, retryParams.max_tokens)
 856  
 857        const adjustedParams = adjustParamsForNonStreaming(
 858          retryParams,
 859          MAX_NON_STREAMING_TOKENS,
 860        )
 861  
 862        try {
 863          // biome-ignore lint/plugin: non-streaming API call
 864          return await anthropic.beta.messages.create(
 865            {
 866              ...adjustedParams,
 867              model: normalizeModelStringForAPI(adjustedParams.model),
 868            },
 869            {
 870              signal: retryOptions.signal,
 871              timeout: fallbackTimeoutMs,
 872            },
 873          )
 874        } catch (err) {
 875          // User aborts are not errors — re-throw immediately without logging
 876          if (err instanceof APIUserAbortError) throw err
 877  
 878          // Instrumentation: record when the non-streaming request errors (including
 879          // timeouts). Lets us distinguish "fallback hung past container kill"
 880          // (no event) from "fallback hit the bounded timeout" (this event).
 881          logForDiagnosticsNoPII('error', 'cli_nonstreaming_fallback_error')
 882          logEvent('tengu_nonstreaming_fallback_error', {
 883            model:
 884              clientOptions.model as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
 885            error:
 886              err instanceof Error
 887                ? (err.name as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS)
 888                : ('unknown' as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS),
 889            attempt,
 890            timeout_ms: fallbackTimeoutMs,
 891            request_id: (originatingRequestId ??
 892              'unknown') as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
 893          })
 894          throw err
 895        }
 896      },
 897      {
 898        model: retryOptions.model,
 899        fallbackModel: retryOptions.fallbackModel,
 900        thinkingConfig: retryOptions.thinkingConfig,
 901        ...(isFastModeEnabled() && { fastMode: retryOptions.fastMode }),
 902        signal: retryOptions.signal,
 903        initialConsecutive529Errors: retryOptions.initialConsecutive529Errors,
 904        querySource: retryOptions.querySource,
 905      },
 906    )
 907  
 908    let e
 909    do {
 910      e = await generator.next()
 911      if (!e.done && e.value.type === 'system') {
 912        yield e.value
 913      }
 914    } while (!e.done)
 915  
 916    return e.value as BetaMessage
 917  }
 918  
 919  /**
 920   * Extracts the request ID from the most recent assistant message in the
 921   * conversation. Used to link consecutive API requests in analytics so we can
 922   * join them for cache-hit-rate analysis and incremental token tracking.
 923   *
 924   * Deriving this from the message array (rather than global state) ensures each
 925   * query chain (main thread, subagent, teammate) tracks its own request chain
 926   * independently, and rollback/undo naturally updates the value.
 927   */
 928  function getPreviousRequestIdFromMessages(
 929    messages: Message[],
 930  ): string | undefined {
 931    for (let i = messages.length - 1; i >= 0; i--) {
 932      const msg = messages[i]!
 933      if (msg.type === 'assistant' && msg.requestId) {
 934        return msg.requestId
 935      }
 936    }
 937    return undefined
 938  }
 939  
 940  function isMedia(
 941    block: BetaContentBlockParam,
 942  ): block is BetaImageBlockParam | BetaRequestDocumentBlock {
 943    return block.type === 'image' || block.type === 'document'
 944  }
 945  
 946  function isToolResult(
 947    block: BetaContentBlockParam,
 948  ): block is BetaToolResultBlockParam {
 949    return block.type === 'tool_result'
 950  }
 951  
 952  /**
 953   * Ensures messages contain at most `limit` media items (images + documents).
 954   * Strips oldest media first to preserve the most recent.
 955   */
 956  export function stripExcessMediaItems(
 957    messages: (UserMessage | AssistantMessage)[],
 958    limit: number,
 959  ): (UserMessage | AssistantMessage)[] {
 960    let toRemove = 0
 961    for (const msg of messages) {
 962      if (!Array.isArray(msg.message.content)) continue
 963      for (const block of msg.message.content) {
 964        if (isMedia(block)) toRemove++
 965        if (isToolResult(block) && Array.isArray(block.content)) {
 966          for (const nested of block.content) {
 967            if (isMedia(nested)) toRemove++
 968          }
 969        }
 970      }
 971    }
 972    toRemove -= limit
 973    if (toRemove <= 0) return messages
 974  
 975    return messages.map(msg => {
 976      if (toRemove <= 0) return msg
 977      const content = msg.message.content
 978      if (!Array.isArray(content)) return msg
 979  
 980      const before = toRemove
 981      const stripped = content
 982        .map(block => {
 983          if (
 984            toRemove <= 0 ||
 985            !isToolResult(block) ||
 986            !Array.isArray(block.content)
 987          )
 988            return block
 989          const filtered = block.content.filter(n => {
 990            if (toRemove > 0 && isMedia(n)) {
 991              toRemove--
 992              return false
 993            }
 994            return true
 995          })
 996          return filtered.length === block.content.length
 997            ? block
 998            : { ...block, content: filtered }
 999        })
1000        .filter(block => {
1001          if (toRemove > 0 && isMedia(block)) {
1002            toRemove--
1003            return false
1004          }
1005          return true
1006        })
1007  
1008      return before === toRemove
1009        ? msg
1010        : {
1011            ...msg,
1012            message: { ...msg.message, content: stripped },
1013          }
1014    }) as (UserMessage | AssistantMessage)[]
1015  }
1016  
1017  async function* queryModel(
1018    messages: Message[],
1019    systemPrompt: SystemPrompt,
1020    thinkingConfig: ThinkingConfig,
1021    tools: Tools,
1022    signal: AbortSignal,
1023    options: Options,
1024  ): AsyncGenerator<
1025    StreamEvent | AssistantMessage | SystemAPIErrorMessage,
1026    void
1027  > {
1028    // Check cheap conditions first — the off-switch await blocks on GrowthBook
1029    // init (~10ms). For non-Opus models (haiku, sonnet) this skips the await
1030    // entirely. Subscribers don't hit this path at all.
1031    if (
1032      !isClaudeAISubscriber() &&
1033      isNonCustomOpusModel(options.model) &&
1034      (
1035        await getDynamicConfig_BLOCKS_ON_INIT<{ activated: boolean }>(
1036          'tengu-off-switch',
1037          {
1038            activated: false,
1039          },
1040        )
1041      ).activated
1042    ) {
1043      logEvent('tengu_off_switch_query', {})
1044      yield getAssistantMessageFromError(
1045        new Error(CUSTOM_OFF_SWITCH_MESSAGE),
1046        options.model,
1047      )
1048      return
1049    }
1050  
1051    // Derive previous request ID from the last assistant message in this query chain.
1052    // This is scoped per message array (main thread, subagent, teammate each have their own),
1053    // so concurrent agents don't clobber each other's request chain tracking.
1054    // Also naturally handles rollback/undo since removed messages won't be in the array.
1055    const previousRequestId = getPreviousRequestIdFromMessages(messages)
1056  
1057    const resolvedModel =
1058      getAPIProvider() === 'bedrock' &&
1059      options.model.includes('application-inference-profile')
1060        ? ((await getInferenceProfileBackingModel(options.model)) ??
1061          options.model)
1062        : options.model
1063  
1064    queryCheckpoint('query_tool_schema_build_start')
1065    const isAgenticQuery =
1066      options.querySource.startsWith('repl_main_thread') ||
1067      options.querySource.startsWith('agent:') ||
1068      options.querySource === 'sdk' ||
1069      options.querySource === 'hook_agent' ||
1070      options.querySource === 'verification_agent'
1071    const betas = getMergedBetas(options.model, { isAgenticQuery })
1072  
1073    // Always send the advisor beta header when advisor is enabled, so
1074    // non-agentic queries (compact, side_question, extract_memories, etc.)
1075    // can parse advisor server_tool_use blocks already in the conversation history.
1076    if (isAdvisorEnabled()) {
1077      betas.push(ADVISOR_BETA_HEADER)
1078    }
1079  
1080    let advisorModel: string | undefined
1081    if (isAgenticQuery && isAdvisorEnabled()) {
1082      let advisorOption = options.advisorModel
1083  
1084      const advisorExperiment = getExperimentAdvisorModels()
1085      if (advisorExperiment !== undefined) {
1086        if (
1087          normalizeModelStringForAPI(advisorExperiment.baseModel) ===
1088          normalizeModelStringForAPI(options.model)
1089        ) {
1090          // Override the advisor model if the base model matches. We
1091          // should only have experiment models if the user cannot
1092          // configure it themselves.
1093          advisorOption = advisorExperiment.advisorModel
1094        }
1095      }
1096  
1097      if (advisorOption) {
1098        const normalizedAdvisorModel = normalizeModelStringForAPI(
1099          parseUserSpecifiedModel(advisorOption),
1100        )
1101        if (!modelSupportsAdvisor(options.model)) {
1102          logForDebugging(
1103            `[AdvisorTool] Skipping advisor - base model ${options.model} does not support advisor`,
1104          )
1105        } else if (!isValidAdvisorModel(normalizedAdvisorModel)) {
1106          logForDebugging(
1107            `[AdvisorTool] Skipping advisor - ${normalizedAdvisorModel} is not a valid advisor model`,
1108          )
1109        } else {
1110          advisorModel = normalizedAdvisorModel
1111          logForDebugging(
1112            `[AdvisorTool] Server-side tool enabled with ${advisorModel} as the advisor model`,
1113          )
1114        }
1115      }
1116    }
1117  
1118    // Check if tool search is enabled (checks mode, model support, and threshold for auto mode)
1119    // This is async because it may need to calculate MCP tool description sizes for TstAuto mode
1120    let useToolSearch = await isToolSearchEnabled(
1121      options.model,
1122      tools,
1123      options.getToolPermissionContext,
1124      options.agents,
1125      'query',
1126    )
1127  
1128    // Precompute once — isDeferredTool does 2 GrowthBook lookups per call
1129    const deferredToolNames = new Set<string>()
1130    if (useToolSearch) {
1131      for (const t of tools) {
1132        if (isDeferredTool(t)) deferredToolNames.add(t.name)
1133      }
1134    }
1135  
1136    // Even if tool search mode is enabled, skip if there are no deferred tools
1137    // AND no MCP servers are still connecting. When servers are pending, keep
1138    // ToolSearch available so the model can discover tools after they connect.
1139    if (
1140      useToolSearch &&
1141      deferredToolNames.size === 0 &&
1142      !options.hasPendingMcpServers
1143    ) {
1144      logForDebugging(
1145        'Tool search disabled: no deferred tools available to search',
1146      )
1147      useToolSearch = false
1148    }
1149  
1150    // Filter out ToolSearchTool if tool search is not enabled for this model
1151    // ToolSearchTool returns tool_reference blocks which unsupported models can't handle
1152    let filteredTools: Tools
1153  
1154    if (useToolSearch) {
1155      // Dynamic tool loading: Only include deferred tools that have been discovered
1156      // via tool_reference blocks in the message history. This eliminates the need
1157      // to predeclare all deferred tools upfront and removes limits on tool quantity.
1158      const discoveredToolNames = extractDiscoveredToolNames(messages)
1159  
1160      filteredTools = tools.filter(tool => {
1161        // Always include non-deferred tools
1162        if (!deferredToolNames.has(tool.name)) return true
1163        // Always include ToolSearchTool (so it can discover more tools)
1164        if (toolMatchesName(tool, TOOL_SEARCH_TOOL_NAME)) return true
1165        // Only include deferred tools that have been discovered
1166        return discoveredToolNames.has(tool.name)
1167      })
1168    } else {
1169      filteredTools = tools.filter(
1170        t => !toolMatchesName(t, TOOL_SEARCH_TOOL_NAME),
1171      )
1172    }
1173  
1174    // Add tool search beta header if enabled - required for defer_loading to be accepted
1175    // Header differs by provider: 1P/Foundry use advanced-tool-use, Vertex/Bedrock use tool-search-tool
1176    // For Bedrock, this header must go in extraBodyParams, not the betas array
1177    const toolSearchHeader = useToolSearch ? getToolSearchBetaHeader() : null
1178    if (toolSearchHeader && getAPIProvider() !== 'bedrock') {
1179      if (!betas.includes(toolSearchHeader)) {
1180        betas.push(toolSearchHeader)
1181      }
1182    }
1183  
1184    // Determine if cached microcompact is enabled for this model.
1185    // Computed once here (in async context) and captured by paramsFromContext.
1186    // The beta header is also captured here to avoid a top-level import of the
1187    // ant-only CACHE_EDITING_BETA_HEADER constant.
1188    let cachedMCEnabled = false
1189    let cacheEditingBetaHeader = ''
1190    if (feature('CACHED_MICROCOMPACT')) {
1191      const {
1192        isCachedMicrocompactEnabled,
1193        isModelSupportedForCacheEditing,
1194        getCachedMCConfig,
1195      } = await import('../compact/cachedMicrocompact.js')
1196      const betas = await import('src/constants/betas.js')
1197      cacheEditingBetaHeader = betas.CACHE_EDITING_BETA_HEADER
1198      const featureEnabled = isCachedMicrocompactEnabled()
1199      const modelSupported = isModelSupportedForCacheEditing(options.model)
1200      cachedMCEnabled = featureEnabled && modelSupported
1201      const config = getCachedMCConfig()
1202      logForDebugging(
1203        `Cached MC gate: enabled=${featureEnabled} modelSupported=${modelSupported} model=${options.model} supportedModels=${jsonStringify(config.supportedModels)}`,
1204      )
1205    }
1206  
1207    const useGlobalCacheFeature = shouldUseGlobalCacheScope()
1208    const willDefer = (t: Tool) =>
1209      useToolSearch && (deferredToolNames.has(t.name) || shouldDeferLspTool(t))
1210    // MCP tools are per-user → dynamic tool section → can't globally cache.
1211    // Only gate when an MCP tool will actually render (not defer_loading).
1212    const needsToolBasedCacheMarker =
1213      useGlobalCacheFeature &&
1214      filteredTools.some(t => t.isMcp === true && !willDefer(t))
1215  
1216    // Ensure prompt_caching_scope beta header is present when global cache is enabled.
1217    if (
1218      useGlobalCacheFeature &&
1219      !betas.includes(PROMPT_CACHING_SCOPE_BETA_HEADER)
1220    ) {
1221      betas.push(PROMPT_CACHING_SCOPE_BETA_HEADER)
1222    }
1223  
1224    // Determine global cache strategy for logging
1225    const globalCacheStrategy: GlobalCacheStrategy = useGlobalCacheFeature
1226      ? needsToolBasedCacheMarker
1227        ? 'none'
1228        : 'system_prompt'
1229      : 'none'
1230  
1231    // Build tool schemas, adding defer_loading for MCP tools when tool search is enabled
1232    // Note: We pass the full `tools` list (not filteredTools) to toolToAPISchema so that
1233    // ToolSearchTool's prompt can list ALL available MCP tools. The filtering only affects
1234    // which tools are actually sent to the API, not what the model sees in tool descriptions.
1235    const toolSchemas = await Promise.all(
1236      filteredTools.map(tool =>
1237        toolToAPISchema(tool, {
1238          getToolPermissionContext: options.getToolPermissionContext,
1239          tools,
1240          agents: options.agents,
1241          allowedAgentTypes: options.allowedAgentTypes,
1242          model: options.model,
1243          deferLoading: willDefer(tool),
1244        }),
1245      ),
1246    )
1247  
1248    if (useToolSearch) {
1249      const includedDeferredTools = count(filteredTools, t =>
1250        deferredToolNames.has(t.name),
1251      )
1252      logForDebugging(
1253        `Dynamic tool loading: ${includedDeferredTools}/${deferredToolNames.size} deferred tools included`,
1254      )
1255    }
1256  
1257    queryCheckpoint('query_tool_schema_build_end')
1258  
1259    // Normalize messages before building system prompt (needed for fingerprinting)
1260    // Instrumentation: Track message count before normalization
1261    logEvent('tengu_api_before_normalize', {
1262      preNormalizedMessageCount: messages.length,
1263    })
1264  
1265    queryCheckpoint('query_message_normalization_start')
1266    let messagesForAPI = normalizeMessagesForAPI(messages, filteredTools)
1267    queryCheckpoint('query_message_normalization_end')
1268  
1269    // Model-specific post-processing: strip tool-search-specific fields if the
1270    // selected model doesn't support tool search.
1271    //
1272    // Why is this needed in addition to normalizeMessagesForAPI?
1273    // - normalizeMessagesForAPI uses isToolSearchEnabledNoModelCheck() because it's
1274    //   called from ~20 places (analytics, feedback, sharing, etc.), many of which
1275    //   don't have model context. Adding model to its signature would be a large refactor.
1276    // - This post-processing uses the model-aware isToolSearchEnabled() check
1277    // - This handles mid-conversation model switching (e.g., Sonnet → Haiku) where
1278    //   stale tool-search fields from the previous model would cause 400 errors
1279    //
1280    // Note: For assistant messages, normalizeMessagesForAPI already normalized the
1281    // tool inputs, so stripCallerFieldFromAssistantMessage only needs to remove the
1282    // 'caller' field (not re-normalize inputs).
1283    if (!useToolSearch) {
1284      messagesForAPI = messagesForAPI.map(msg => {
1285        switch (msg.type) {
1286          case 'user':
1287            // Strip tool_reference blocks from tool_result content
1288            return stripToolReferenceBlocksFromUserMessage(msg)
1289          case 'assistant':
1290            // Strip 'caller' field from tool_use blocks
1291            return stripCallerFieldFromAssistantMessage(msg)
1292          default:
1293            return msg
1294        }
1295      })
1296    }
1297  
1298    // Repair tool_use/tool_result pairing mismatches that can occur when resuming
1299    // remote/teleport sessions. Inserts synthetic error tool_results for orphaned
1300    // tool_uses and strips orphaned tool_results referencing non-existent tool_uses.
1301    messagesForAPI = ensureToolResultPairing(messagesForAPI)
1302  
1303    // Strip advisor blocks — the API rejects them without the beta header.
1304    if (!betas.includes(ADVISOR_BETA_HEADER)) {
1305      messagesForAPI = stripAdvisorBlocks(messagesForAPI)
1306    }
1307  
1308    // Strip excess media items before making the API call.
1309    // The API rejects requests with >100 media items but returns a confusing error.
1310    // Rather than erroring (which is hard to recover from in Cowork/CCD), we
1311    // silently drop the oldest media items to stay within the limit.
1312    messagesForAPI = stripExcessMediaItems(
1313      messagesForAPI,
1314      API_MAX_MEDIA_PER_REQUEST,
1315    )
1316  
1317    // Instrumentation: Track message count after normalization
1318    logEvent('tengu_api_after_normalize', {
1319      postNormalizedMessageCount: messagesForAPI.length,
1320    })
1321  
1322    // Compute fingerprint from first user message for attribution.
1323    // Must run BEFORE injecting synthetic messages (e.g. deferred tool names)
1324    // so the fingerprint reflects the actual user input.
1325    const fingerprint = computeFingerprintFromMessages(messagesForAPI)
1326  
1327    // When the delta attachment is enabled, deferred tools are announced
1328    // via persisted deferred_tools_delta attachments instead of this
1329    // ephemeral prepend (which busts cache whenever the pool changes).
1330    if (useToolSearch && !isDeferredToolsDeltaEnabled()) {
1331      const deferredToolList = tools
1332        .filter(t => deferredToolNames.has(t.name))
1333        .map(formatDeferredToolLine)
1334        .sort()
1335        .join('\n')
1336      if (deferredToolList) {
1337        messagesForAPI = [
1338          createUserMessage({
1339            content: `<available-deferred-tools>\n${deferredToolList}\n</available-deferred-tools>`,
1340            isMeta: true,
1341          }),
1342          ...messagesForAPI,
1343        ]
1344      }
1345    }
1346  
1347    // Chrome tool-search instructions: when the delta attachment is enabled,
1348    // these are carried as a client-side block in mcp_instructions_delta
1349    // (attachments.ts) instead of here. This per-request sys-prompt append
1350    // busts the prompt cache when chrome connects late.
1351    const hasChromeTools = filteredTools.some(t =>
1352      isToolFromMcpServer(t.name, CLAUDE_IN_CHROME_MCP_SERVER_NAME),
1353    )
1354    const injectChromeHere =
1355      useToolSearch && hasChromeTools && !isMcpInstructionsDeltaEnabled()
1356  
1357    // filter(Boolean) works by converting each element to a boolean - empty strings become false and are filtered out.
1358    systemPrompt = asSystemPrompt(
1359      [
1360        getAttributionHeader(fingerprint),
1361        getCLISyspromptPrefix({
1362          isNonInteractive: options.isNonInteractiveSession,
1363          hasAppendSystemPrompt: options.hasAppendSystemPrompt,
1364        }),
1365        ...systemPrompt,
1366        ...(advisorModel ? [ADVISOR_TOOL_INSTRUCTIONS] : []),
1367        ...(injectChromeHere ? [CHROME_TOOL_SEARCH_INSTRUCTIONS] : []),
1368      ].filter(Boolean),
1369    )
1370  
1371    // Prepend system prompt block for easy API identification
1372    logAPIPrefix(systemPrompt)
1373  
1374    const enablePromptCaching =
1375      options.enablePromptCaching ?? getPromptCachingEnabled(options.model)
1376    const system = buildSystemPromptBlocks(systemPrompt, enablePromptCaching, {
1377      skipGlobalCacheForSystemPrompt: needsToolBasedCacheMarker,
1378      querySource: options.querySource,
1379    })
1380    const useBetas = betas.length > 0
1381  
1382    // Build minimal context for detailed tracing (when beta tracing is enabled)
1383    // Note: The actual new_context message extraction is done in sessionTracing.ts using
1384    // hash-based tracking per querySource (agent) from the messagesForAPI array
1385    const extraToolSchemas = [...(options.extraToolSchemas ?? [])]
1386    if (advisorModel) {
1387      // Server tools must be in the tools array by API contract. Appended after
1388      // toolSchemas (which carries the cache_control marker) so toggling /advisor
1389      // only churns the small suffix, not the cached prefix.
1390      extraToolSchemas.push({
1391        type: 'advisor_20260301',
1392        name: 'advisor',
1393        model: advisorModel,
1394      } as unknown as BetaToolUnion)
1395    }
1396    const allTools = [...toolSchemas, ...extraToolSchemas]
1397  
1398    const isFastMode =
1399      isFastModeEnabled() &&
1400      isFastModeAvailable() &&
1401      !isFastModeCooldown() &&
1402      isFastModeSupportedByModel(options.model) &&
1403      !!options.fastMode
1404  
1405    // Sticky-on latches for dynamic beta headers. Each header, once first
1406    // sent, keeps being sent for the rest of the session so mid-session
1407    // toggles don't change the server-side cache key and bust ~50-70K tokens.
1408    // Latches are cleared on /clear and /compact via clearBetaHeaderLatches().
1409    // Per-call gates (isAgenticQuery, querySource===repl_main_thread) stay
1410    // per-call so non-agentic queries keep their own stable header set.
1411  
1412    let afkHeaderLatched = getAfkModeHeaderLatched() === true
1413    if (feature('TRANSCRIPT_CLASSIFIER')) {
1414      if (
1415        !afkHeaderLatched &&
1416        isAgenticQuery &&
1417        shouldIncludeFirstPartyOnlyBetas() &&
1418        (autoModeStateModule?.isAutoModeActive() ?? false)
1419      ) {
1420        afkHeaderLatched = true
1421        setAfkModeHeaderLatched(true)
1422      }
1423    }
1424  
1425    let fastModeHeaderLatched = getFastModeHeaderLatched() === true
1426    if (!fastModeHeaderLatched && isFastMode) {
1427      fastModeHeaderLatched = true
1428      setFastModeHeaderLatched(true)
1429    }
1430  
1431    let cacheEditingHeaderLatched = getCacheEditingHeaderLatched() === true
1432    if (feature('CACHED_MICROCOMPACT')) {
1433      if (
1434        !cacheEditingHeaderLatched &&
1435        cachedMCEnabled &&
1436        getAPIProvider() === 'firstParty' &&
1437        options.querySource === 'repl_main_thread'
1438      ) {
1439        cacheEditingHeaderLatched = true
1440        setCacheEditingHeaderLatched(true)
1441      }
1442    }
1443  
1444    // Only latch from agentic queries so a classifier call doesn't flip the
1445    // main thread's context_management mid-turn.
1446    let thinkingClearLatched = getThinkingClearLatched() === true
1447    if (!thinkingClearLatched && isAgenticQuery) {
1448      const lastCompletion = getLastApiCompletionTimestamp()
1449      if (
1450        lastCompletion !== null &&
1451        Date.now() - lastCompletion > CACHE_TTL_1HOUR_MS
1452      ) {
1453        thinkingClearLatched = true
1454        setThinkingClearLatched(true)
1455      }
1456    }
1457  
1458    const effort = resolveAppliedEffort(options.model, options.effortValue)
1459  
1460    if (feature('PROMPT_CACHE_BREAK_DETECTION')) {
1461      // Exclude defer_loading tools from the hash -- the API strips them from the
1462      // prompt, so they never affect the actual cache key. Including them creates
1463      // false-positive "tool schemas changed" breaks when tools are discovered or
1464      // MCP servers reconnect.
1465      const toolsForCacheDetection = allTools.filter(
1466        t => !('defer_loading' in t && t.defer_loading),
1467      )
1468      // Capture everything that could affect the server-side cache key.
1469      // Pass latched header values (not live state) so break detection
1470      // reflects what we actually send, not what the user toggled.
1471      recordPromptState({
1472        system,
1473        toolSchemas: toolsForCacheDetection,
1474        querySource: options.querySource,
1475        model: options.model,
1476        agentId: options.agentId,
1477        fastMode: fastModeHeaderLatched,
1478        globalCacheStrategy,
1479        betas,
1480        autoModeActive: afkHeaderLatched,
1481        isUsingOverage: currentLimits.isUsingOverage ?? false,
1482        cachedMCEnabled: cacheEditingHeaderLatched,
1483        effortValue: effort,
1484        extraBodyParams: getExtraBodyParams(),
1485      })
1486    }
1487  
1488    const newContext: LLMRequestNewContext | undefined = isBetaTracingEnabled()
1489      ? {
1490          systemPrompt: systemPrompt.join('\n\n'),
1491          querySource: options.querySource,
1492          tools: jsonStringify(allTools),
1493        }
1494      : undefined
1495  
1496    // Capture the span so we can pass it to endLLMRequestSpan later
1497    // This ensures responses are matched to the correct request when multiple requests run in parallel
1498    const llmSpan = startLLMRequestSpan(
1499      options.model,
1500      newContext,
1501      messagesForAPI,
1502      isFastMode,
1503    )
1504  
1505    const startIncludingRetries = Date.now()
1506    let start = Date.now()
1507    let attemptNumber = 0
1508    const attemptStartTimes: number[] = []
1509    let stream: Stream<BetaRawMessageStreamEvent> | undefined = undefined
1510    let streamRequestId: string | null | undefined = undefined
1511    let clientRequestId: string | undefined = undefined
1512    // eslint-disable-next-line eslint-plugin-n/no-unsupported-features/node-builtins -- Response is available in Node 18+ and is used by the SDK
1513    let streamResponse: Response | undefined = undefined
1514  
1515    // Release all stream resources to prevent native memory leaks.
1516    // The Response object holds native TLS/socket buffers that live outside the
1517    // V8 heap (observed on the Node.js/npm path; see GH #32920), so we must
1518    // explicitly cancel and release it regardless of how the generator exits.
1519    function releaseStreamResources(): void {
1520      cleanupStream(stream)
1521      stream = undefined
1522      if (streamResponse) {
1523        streamResponse.body?.cancel().catch(() => {})
1524        streamResponse = undefined
1525      }
1526    }
1527  
1528    // Consume pending cache edits ONCE before paramsFromContext is defined.
1529    // paramsFromContext is called multiple times (logging, retries), so consuming
1530    // inside it would cause the first call to steal edits from subsequent calls.
1531    const consumedCacheEdits = cachedMCEnabled ? consumePendingCacheEdits() : null
1532    const consumedPinnedEdits = cachedMCEnabled ? getPinnedCacheEdits() : []
1533  
1534    // Capture the betas sent in the last API request, including the ones that
1535    // were dynamically added, so we can log and send it to telemetry.
1536    let lastRequestBetas: string[] | undefined
1537  
1538    const paramsFromContext = (retryContext: RetryContext) => {
1539      const betasParams = [...betas]
1540  
1541      // Append 1M beta dynamically for the Sonnet 1M experiment.
1542      if (
1543        !betasParams.includes(CONTEXT_1M_BETA_HEADER) &&
1544        getSonnet1mExpTreatmentEnabled(retryContext.model)
1545      ) {
1546        betasParams.push(CONTEXT_1M_BETA_HEADER)
1547      }
1548  
1549      // For Bedrock, include both model-based betas and dynamically-added tool search header
1550      const bedrockBetas =
1551        getAPIProvider() === 'bedrock'
1552          ? [
1553              ...getBedrockExtraBodyParamsBetas(retryContext.model),
1554              ...(toolSearchHeader ? [toolSearchHeader] : []),
1555            ]
1556          : []
1557      const extraBodyParams = getExtraBodyParams(bedrockBetas)
1558  
1559      const outputConfig: BetaOutputConfig = {
1560        ...((extraBodyParams.output_config as BetaOutputConfig) ?? {}),
1561      }
1562  
1563      configureEffortParams(
1564        effort,
1565        outputConfig,
1566        extraBodyParams,
1567        betasParams,
1568        options.model,
1569      )
1570  
1571      configureTaskBudgetParams(
1572        options.taskBudget,
1573        outputConfig as BetaOutputConfig & { task_budget?: TaskBudgetParam },
1574        betasParams,
1575      )
1576  
1577      // Merge outputFormat into extraBodyParams.output_config alongside effort
1578      // Requires structured-outputs beta header per SDK (see parse() in messages.mjs)
1579      if (options.outputFormat && !('format' in outputConfig)) {
1580        outputConfig.format = options.outputFormat as BetaJSONOutputFormat
1581        // Add beta header if not already present and provider supports it
1582        if (
1583          modelSupportsStructuredOutputs(options.model) &&
1584          !betasParams.includes(STRUCTURED_OUTPUTS_BETA_HEADER)
1585        ) {
1586          betasParams.push(STRUCTURED_OUTPUTS_BETA_HEADER)
1587        }
1588      }
1589  
1590      // Retry context gets preference because it tries to course correct if we exceed the context window limit
1591      const maxOutputTokens =
1592        retryContext?.maxTokensOverride ||
1593        options.maxOutputTokensOverride ||
1594        getMaxOutputTokensForModel(options.model)
1595  
1596      const hasThinking =
1597        thinkingConfig.type !== 'disabled' &&
1598        !isEnvTruthy(process.env.CLAUDE_CODE_DISABLE_THINKING)
1599      let thinking: BetaMessageStreamParams['thinking'] | undefined = undefined
1600  
1601      // IMPORTANT: Do not change the adaptive-vs-budget thinking selection below
1602      // without notifying the model launch DRI and research. This is a sensitive
1603      // setting that can greatly affect model quality and bashing.
1604      if (hasThinking && modelSupportsThinking(options.model)) {
1605        if (
1606          !isEnvTruthy(process.env.CLAUDE_CODE_DISABLE_ADAPTIVE_THINKING) &&
1607          modelSupportsAdaptiveThinking(options.model)
1608        ) {
1609          // For models that support adaptive thinking, always use adaptive
1610          // thinking without a budget.
1611          thinking = {
1612            type: 'adaptive',
1613          } satisfies BetaMessageStreamParams['thinking']
1614        } else {
1615          // For models that do not support adaptive thinking, use the default
1616          // thinking budget unless explicitly specified.
1617          let thinkingBudget = getMaxThinkingTokensForModel(options.model)
1618          if (
1619            thinkingConfig.type === 'enabled' &&
1620            thinkingConfig.budgetTokens !== undefined
1621          ) {
1622            thinkingBudget = thinkingConfig.budgetTokens
1623          }
1624          thinkingBudget = Math.min(maxOutputTokens - 1, thinkingBudget)
1625          thinking = {
1626            budget_tokens: thinkingBudget,
1627            type: 'enabled',
1628          } satisfies BetaMessageStreamParams['thinking']
1629        }
1630      }
1631  
1632      // Get API context management strategies if enabled
1633      const contextManagement = getAPIContextManagement({
1634        hasThinking,
1635        isRedactThinkingActive: betasParams.includes(REDACT_THINKING_BETA_HEADER),
1636        clearAllThinking: thinkingClearLatched,
1637      })
1638  
1639      const enablePromptCaching =
1640        options.enablePromptCaching ?? getPromptCachingEnabled(retryContext.model)
1641  
1642      // Fast mode: header is latched session-stable (cache-safe), but
1643      // `speed='fast'` stays dynamic so cooldown still suppresses the actual
1644      // fast-mode request without changing the cache key.
1645      let speed: BetaMessageStreamParams['speed']
1646      const isFastModeForRetry =
1647        isFastModeEnabled() &&
1648        isFastModeAvailable() &&
1649        !isFastModeCooldown() &&
1650        isFastModeSupportedByModel(options.model) &&
1651        !!retryContext.fastMode
1652      if (isFastModeForRetry) {
1653        speed = 'fast'
1654      }
1655      if (fastModeHeaderLatched && !betasParams.includes(FAST_MODE_BETA_HEADER)) {
1656        betasParams.push(FAST_MODE_BETA_HEADER)
1657      }
1658  
1659      // AFK mode beta: latched once auto mode is first activated. Still gated
1660      // by isAgenticQuery per-call so classifiers/compaction don't get it.
1661      if (feature('TRANSCRIPT_CLASSIFIER')) {
1662        if (
1663          afkHeaderLatched &&
1664          shouldIncludeFirstPartyOnlyBetas() &&
1665          isAgenticQuery &&
1666          !betasParams.includes(AFK_MODE_BETA_HEADER)
1667        ) {
1668          betasParams.push(AFK_MODE_BETA_HEADER)
1669        }
1670      }
1671  
1672      // Cache editing beta: header is latched session-stable; useCachedMC
1673      // (controls cache_edits body behavior) stays live so edits stop when
1674      // the feature disables but the header doesn't flip.
1675      const useCachedMC =
1676        cachedMCEnabled &&
1677        getAPIProvider() === 'firstParty' &&
1678        options.querySource === 'repl_main_thread'
1679      if (
1680        cacheEditingHeaderLatched &&
1681        getAPIProvider() === 'firstParty' &&
1682        options.querySource === 'repl_main_thread' &&
1683        !betasParams.includes(cacheEditingBetaHeader)
1684      ) {
1685        betasParams.push(cacheEditingBetaHeader)
1686        logForDebugging(
1687          'Cache editing beta header enabled for cached microcompact',
1688        )
1689      }
1690  
1691      // Only send temperature when thinking is disabled — the API requires
1692      // temperature: 1 when thinking is enabled, which is already the default.
1693      const temperature = !hasThinking
1694        ? (options.temperatureOverride ?? 1)
1695        : undefined
1696  
1697      lastRequestBetas = betasParams
1698  
1699      return {
1700        model: normalizeModelStringForAPI(options.model),
1701        messages: addCacheBreakpoints(
1702          messagesForAPI,
1703          enablePromptCaching,
1704          options.querySource,
1705          useCachedMC,
1706          consumedCacheEdits,
1707          consumedPinnedEdits,
1708          options.skipCacheWrite,
1709        ),
1710        system,
1711        tools: allTools,
1712        tool_choice: options.toolChoice,
1713        ...(useBetas && { betas: betasParams }),
1714        metadata: getAPIMetadata(),
1715        max_tokens: maxOutputTokens,
1716        thinking,
1717        ...(temperature !== undefined && { temperature }),
1718        ...(contextManagement &&
1719          useBetas &&
1720          betasParams.includes(CONTEXT_MANAGEMENT_BETA_HEADER) && {
1721            context_management: contextManagement,
1722          }),
1723        ...extraBodyParams,
1724        ...(Object.keys(outputConfig).length > 0 && {
1725          output_config: outputConfig,
1726        }),
1727        ...(speed !== undefined && { speed }),
1728      }
1729    }
1730  
1731    // Compute log scalars synchronously so the fire-and-forget .then() closure
1732    // captures only primitives instead of paramsFromContext's full closure scope
1733    // (messagesForAPI, system, allTools, betas — the entire request-building
1734    // context), which would otherwise be pinned until the promise resolves.
1735    {
1736      const queryParams = paramsFromContext({
1737        model: options.model,
1738        thinkingConfig,
1739      })
1740      const logMessagesLength = queryParams.messages.length
1741      const logBetas = useBetas ? (queryParams.betas ?? []) : []
1742      const logThinkingType = queryParams.thinking?.type ?? 'disabled'
1743      const logEffortValue = queryParams.output_config?.effort
1744      void options.getToolPermissionContext().then(permissionContext => {
1745        logAPIQuery({
1746          model: options.model,
1747          messagesLength: logMessagesLength,
1748          temperature: options.temperatureOverride ?? 1,
1749          betas: logBetas,
1750          permissionMode: permissionContext.mode,
1751          querySource: options.querySource,
1752          queryTracking: options.queryTracking,
1753          thinkingType: logThinkingType,
1754          effortValue: logEffortValue,
1755          fastMode: isFastMode,
1756          previousRequestId,
1757        })
1758      })
1759    }
1760  
1761    const newMessages: AssistantMessage[] = []
1762    let ttftMs = 0
1763    let partialMessage: BetaMessage | undefined = undefined
1764    const contentBlocks: (BetaContentBlock | ConnectorTextBlock)[] = []
1765    let usage: NonNullableUsage = EMPTY_USAGE
1766    let costUSD = 0
1767    let stopReason: BetaStopReason | null = null
1768    let didFallBackToNonStreaming = false
1769    let fallbackMessage: AssistantMessage | undefined
1770    let maxOutputTokens = 0
1771    let responseHeaders: globalThis.Headers | undefined = undefined
1772    let research: unknown = undefined
1773    let isFastModeRequest = isFastMode // Keep separate state as it may change if falling back
1774    let isAdvisorInProgress = false
1775  
1776    try {
1777      queryCheckpoint('query_client_creation_start')
1778      const generator = withRetry(
1779        () =>
1780          getAnthropicClient({
1781            maxRetries: 0, // Disabled auto-retry in favor of manual implementation
1782            model: options.model,
1783            fetchOverride: options.fetchOverride,
1784            source: options.querySource,
1785          }),
1786        async (anthropic, attempt, context) => {
1787          attemptNumber = attempt
1788          isFastModeRequest = context.fastMode ?? false
1789          start = Date.now()
1790          attemptStartTimes.push(start)
1791          // Client has been created by withRetry's getClient() call. This fires
1792          // once per attempt; on retries the client is usually cached (withRetry
1793          // only calls getClient() again after auth errors), so the delta from
1794          // client_creation_start is meaningful on attempt 1.
1795          queryCheckpoint('query_client_creation_end')
1796  
1797          const params = paramsFromContext(context)
1798          captureAPIRequest(params, options.querySource) // Capture for bug reports
1799  
1800          maxOutputTokens = params.max_tokens
1801  
1802          // Fire immediately before the fetch is dispatched. .withResponse() below
1803          // awaits until response headers arrive, so this MUST be before the await
1804          // or the "Network TTFB" phase measurement is wrong.
1805          queryCheckpoint('query_api_request_sent')
1806          if (!options.agentId) {
1807            headlessProfilerCheckpoint('api_request_sent')
1808          }
1809  
1810          // Generate and track client request ID so timeouts (which return no
1811          // server request ID) can still be correlated with server logs.
1812          // First-party only — 3P providers don't log it (inc-4029 class).
1813          clientRequestId =
1814            getAPIProvider() === 'firstParty' && isFirstPartyAnthropicBaseUrl()
1815              ? randomUUID()
1816              : undefined
1817  
1818          // Use raw stream instead of BetaMessageStream to avoid O(n²) partial JSON parsing
1819          // BetaMessageStream calls partialParse() on every input_json_delta, which we don't need
1820          // since we handle tool input accumulation ourselves
1821          // biome-ignore lint/plugin: main conversation loop handles attribution separately
1822          const result = await anthropic.beta.messages
1823            .create(
1824              { ...params, stream: true },
1825              {
1826                signal,
1827                ...(clientRequestId && {
1828                  headers: { [CLIENT_REQUEST_ID_HEADER]: clientRequestId },
1829                }),
1830              },
1831            )
1832            .withResponse()
1833          queryCheckpoint('query_response_headers_received')
1834          streamRequestId = result.request_id
1835          streamResponse = result.response
1836          return result.data
1837        },
1838        {
1839          model: options.model,
1840          fallbackModel: options.fallbackModel,
1841          thinkingConfig,
1842          ...(isFastModeEnabled() ? { fastMode: isFastMode } : false),
1843          signal,
1844          querySource: options.querySource,
1845        },
1846      )
1847  
1848      let e
1849      do {
1850        e = await generator.next()
1851  
1852        // yield API error messages (the stream has a 'controller' property, error messages don't)
1853        if (!('controller' in e.value)) {
1854          yield e.value
1855        }
1856      } while (!e.done)
1857      stream = e.value as Stream<BetaRawMessageStreamEvent>
1858  
1859      // reset state
1860      newMessages.length = 0
1861      ttftMs = 0
1862      partialMessage = undefined
1863      contentBlocks.length = 0
1864      usage = EMPTY_USAGE
1865      stopReason = null
1866      isAdvisorInProgress = false
1867  
1868      // Streaming idle timeout watchdog: abort the stream if no chunks arrive
1869      // for STREAM_IDLE_TIMEOUT_MS. Unlike the stall detection below (which only
1870      // fires when the *next* chunk arrives), this uses setTimeout to actively
1871      // kill hung streams. Without this, a silently dropped connection can hang
1872      // the session indefinitely since the SDK's request timeout only covers the
1873      // initial fetch(), not the streaming body.
1874      const streamWatchdogEnabled = isEnvTruthy(
1875        process.env.CLAUDE_ENABLE_STREAM_WATCHDOG,
1876      )
1877      const STREAM_IDLE_TIMEOUT_MS =
1878        parseInt(process.env.CLAUDE_STREAM_IDLE_TIMEOUT_MS || '', 10) || 90_000
1879      const STREAM_IDLE_WARNING_MS = STREAM_IDLE_TIMEOUT_MS / 2
1880      let streamIdleAborted = false
1881      // performance.now() snapshot when watchdog fires, for measuring abort propagation delay
1882      let streamWatchdogFiredAt: number | null = null
1883      let streamIdleWarningTimer: ReturnType<typeof setTimeout> | null = null
1884      let streamIdleTimer: ReturnType<typeof setTimeout> | null = null
1885      function clearStreamIdleTimers(): void {
1886        if (streamIdleWarningTimer !== null) {
1887          clearTimeout(streamIdleWarningTimer)
1888          streamIdleWarningTimer = null
1889        }
1890        if (streamIdleTimer !== null) {
1891          clearTimeout(streamIdleTimer)
1892          streamIdleTimer = null
1893        }
1894      }
1895      function resetStreamIdleTimer(): void {
1896        clearStreamIdleTimers()
1897        if (!streamWatchdogEnabled) {
1898          return
1899        }
1900        streamIdleWarningTimer = setTimeout(
1901          warnMs => {
1902            logForDebugging(
1903              `Streaming idle warning: no chunks received for ${warnMs / 1000}s`,
1904              { level: 'warn' },
1905            )
1906            logForDiagnosticsNoPII('warn', 'cli_streaming_idle_warning')
1907          },
1908          STREAM_IDLE_WARNING_MS,
1909          STREAM_IDLE_WARNING_MS,
1910        )
1911        streamIdleTimer = setTimeout(() => {
1912          streamIdleAborted = true
1913          streamWatchdogFiredAt = performance.now()
1914          logForDebugging(
1915            `Streaming idle timeout: no chunks received for ${STREAM_IDLE_TIMEOUT_MS / 1000}s, aborting stream`,
1916            { level: 'error' },
1917          )
1918          logForDiagnosticsNoPII('error', 'cli_streaming_idle_timeout')
1919          logEvent('tengu_streaming_idle_timeout', {
1920            model:
1921              options.model as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
1922            request_id: (streamRequestId ??
1923              'unknown') as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
1924            timeout_ms: STREAM_IDLE_TIMEOUT_MS,
1925          })
1926          releaseStreamResources()
1927        }, STREAM_IDLE_TIMEOUT_MS)
1928      }
1929      resetStreamIdleTimer()
1930  
1931      startSessionActivity('api_call')
1932      try {
1933        // stream in and accumulate state
1934        let isFirstChunk = true
1935        let lastEventTime: number | null = null // Set after first chunk to avoid measuring TTFB as a stall
1936        const STALL_THRESHOLD_MS = 30_000 // 30 seconds
1937        let totalStallTime = 0
1938        let stallCount = 0
1939  
1940        for await (const part of stream) {
1941          resetStreamIdleTimer()
1942          const now = Date.now()
1943  
1944          // Detect and log streaming stalls (only after first event to avoid counting TTFB)
1945          if (lastEventTime !== null) {
1946            const timeSinceLastEvent = now - lastEventTime
1947            if (timeSinceLastEvent > STALL_THRESHOLD_MS) {
1948              stallCount++
1949              totalStallTime += timeSinceLastEvent
1950              logForDebugging(
1951                `Streaming stall detected: ${(timeSinceLastEvent / 1000).toFixed(1)}s gap between events (stall #${stallCount})`,
1952                { level: 'warn' },
1953              )
1954              logEvent('tengu_streaming_stall', {
1955                stall_duration_ms: timeSinceLastEvent,
1956                stall_count: stallCount,
1957                total_stall_time_ms: totalStallTime,
1958                event_type:
1959                  part.type as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
1960                model:
1961                  options.model as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
1962                request_id: (streamRequestId ??
1963                  'unknown') as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
1964              })
1965            }
1966          }
1967          lastEventTime = now
1968  
1969          if (isFirstChunk) {
1970            logForDebugging('Stream started - received first chunk')
1971            queryCheckpoint('query_first_chunk_received')
1972            if (!options.agentId) {
1973              headlessProfilerCheckpoint('first_chunk')
1974            }
1975            endQueryProfile()
1976            isFirstChunk = false
1977          }
1978  
1979          switch (part.type) {
1980            case 'message_start': {
1981              partialMessage = part.message
1982              ttftMs = Date.now() - start
1983              usage = updateUsage(usage, part.message?.usage)
1984              // Capture research from message_start if available (internal only).
1985              // Always overwrite with the latest value.
1986              if (
1987                process.env.USER_TYPE === 'ant' &&
1988                'research' in (part.message as unknown as Record<string, unknown>)
1989              ) {
1990                research = (part.message as unknown as Record<string, unknown>)
1991                  .research
1992              }
1993              break
1994            }
1995            case 'content_block_start':
1996              switch (part.content_block.type) {
1997                case 'tool_use':
1998                  contentBlocks[part.index] = {
1999                    ...part.content_block,
2000                    input: '',
2001                  }
2002                  break
2003                case 'server_tool_use':
2004                  contentBlocks[part.index] = {
2005                    ...part.content_block,
2006                    input: '' as unknown as { [key: string]: unknown },
2007                  }
2008                  if ((part.content_block.name as string) === 'advisor') {
2009                    isAdvisorInProgress = true
2010                    logForDebugging(`[AdvisorTool] Advisor tool called`)
2011                    logEvent('tengu_advisor_tool_call', {
2012                      model:
2013                        options.model as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
2014                      advisor_model: (advisorModel ??
2015                        'unknown') as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
2016                    })
2017                  }
2018                  break
2019                case 'text':
2020                  contentBlocks[part.index] = {
2021                    ...part.content_block,
2022                    // awkwardly, the sdk sometimes returns text as part of a
2023                    // content_block_start message, then returns the same text
2024                    // again in a content_block_delta message. we ignore it here
2025                    // since there doesn't seem to be a way to detect when a
2026                    // content_block_delta message duplicates the text.
2027                    text: '',
2028                  }
2029                  break
2030                case 'thinking':
2031                  contentBlocks[part.index] = {
2032                    ...part.content_block,
2033                    // also awkward
2034                    thinking: '',
2035                    // initialize signature to ensure field exists even if signature_delta never arrives
2036                    signature: '',
2037                  }
2038                  break
2039                default:
2040                  // even more awkwardly, the sdk mutates the contents of text blocks
2041                  // as it works. we want the blocks to be immutable, so that we can
2042                  // accumulate state ourselves.
2043                  contentBlocks[part.index] = { ...part.content_block }
2044                  if (
2045                    (part.content_block.type as string) === 'advisor_tool_result'
2046                  ) {
2047                    isAdvisorInProgress = false
2048                    logForDebugging(`[AdvisorTool] Advisor tool result received`)
2049                  }
2050                  break
2051              }
2052              break
2053            case 'content_block_delta': {
2054              const contentBlock = contentBlocks[part.index]
2055              const delta = part.delta as typeof part.delta | ConnectorTextDelta
2056              if (!contentBlock) {
2057                logEvent('tengu_streaming_error', {
2058                  error_type:
2059                    'content_block_not_found_delta' as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
2060                  part_type:
2061                    part.type as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
2062                  part_index: part.index,
2063                })
2064                throw new RangeError('Content block not found')
2065              }
2066              if (
2067                feature('CONNECTOR_TEXT') &&
2068                delta.type === 'connector_text_delta'
2069              ) {
2070                if (contentBlock.type !== 'connector_text') {
2071                  logEvent('tengu_streaming_error', {
2072                    error_type:
2073                      'content_block_type_mismatch_connector_text' as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
2074                    expected_type:
2075                      'connector_text' as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
2076                    actual_type:
2077                      contentBlock.type as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
2078                  })
2079                  throw new Error('Content block is not a connector_text block')
2080                }
2081                contentBlock.connector_text += delta.connector_text
2082              } else {
2083                switch (delta.type) {
2084                  case 'citations_delta':
2085                    // TODO: handle citations
2086                    break
2087                  case 'input_json_delta':
2088                    if (
2089                      contentBlock.type !== 'tool_use' &&
2090                      contentBlock.type !== 'server_tool_use'
2091                    ) {
2092                      logEvent('tengu_streaming_error', {
2093                        error_type:
2094                          'content_block_type_mismatch_input_json' as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
2095                        expected_type:
2096                          'tool_use' as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
2097                        actual_type:
2098                          contentBlock.type as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
2099                      })
2100                      throw new Error('Content block is not a input_json block')
2101                    }
2102                    if (typeof contentBlock.input !== 'string') {
2103                      logEvent('tengu_streaming_error', {
2104                        error_type:
2105                          'content_block_input_not_string' as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
2106                        input_type:
2107                          typeof contentBlock.input as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
2108                      })
2109                      throw new Error('Content block input is not a string')
2110                    }
2111                    contentBlock.input += delta.partial_json
2112                    break
2113                  case 'text_delta':
2114                    if (contentBlock.type !== 'text') {
2115                      logEvent('tengu_streaming_error', {
2116                        error_type:
2117                          'content_block_type_mismatch_text' as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
2118                        expected_type:
2119                          'text' as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
2120                        actual_type:
2121                          contentBlock.type as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
2122                      })
2123                      throw new Error('Content block is not a text block')
2124                    }
2125                    contentBlock.text += delta.text
2126                    break
2127                  case 'signature_delta':
2128                    if (
2129                      feature('CONNECTOR_TEXT') &&
2130                      contentBlock.type === 'connector_text'
2131                    ) {
2132                      contentBlock.signature = delta.signature
2133                      break
2134                    }
2135                    if (contentBlock.type !== 'thinking') {
2136                      logEvent('tengu_streaming_error', {
2137                        error_type:
2138                          'content_block_type_mismatch_thinking_signature' as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
2139                        expected_type:
2140                          'thinking' as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
2141                        actual_type:
2142                          contentBlock.type as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
2143                      })
2144                      throw new Error('Content block is not a thinking block')
2145                    }
2146                    contentBlock.signature = delta.signature
2147                    break
2148                  case 'thinking_delta':
2149                    if (contentBlock.type !== 'thinking') {
2150                      logEvent('tengu_streaming_error', {
2151                        error_type:
2152                          'content_block_type_mismatch_thinking_delta' as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
2153                        expected_type:
2154                          'thinking' as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
2155                        actual_type:
2156                          contentBlock.type as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
2157                      })
2158                      throw new Error('Content block is not a thinking block')
2159                    }
2160                    contentBlock.thinking += delta.thinking
2161                    break
2162                }
2163              }
2164              // Capture research from content_block_delta if available (internal only).
2165              // Always overwrite with the latest value.
2166              if (process.env.USER_TYPE === 'ant' && 'research' in part) {
2167                research = (part as { research: unknown }).research
2168              }
2169              break
2170            }
2171            case 'content_block_stop': {
2172              const contentBlock = contentBlocks[part.index]
2173              if (!contentBlock) {
2174                logEvent('tengu_streaming_error', {
2175                  error_type:
2176                    'content_block_not_found_stop' as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
2177                  part_type:
2178                    part.type as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
2179                  part_index: part.index,
2180                })
2181                throw new RangeError('Content block not found')
2182              }
2183              if (!partialMessage) {
2184                logEvent('tengu_streaming_error', {
2185                  error_type:
2186                    'partial_message_not_found' as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
2187                  part_type:
2188                    part.type as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
2189                })
2190                throw new Error('Message not found')
2191              }
2192              const m: AssistantMessage = {
2193                message: {
2194                  ...partialMessage,
2195                  content: normalizeContentFromAPI(
2196                    [contentBlock] as BetaContentBlock[],
2197                    tools,
2198                    options.agentId,
2199                  ),
2200                },
2201                requestId: streamRequestId ?? undefined,
2202                type: 'assistant',
2203                uuid: randomUUID(),
2204                timestamp: new Date().toISOString(),
2205                ...(process.env.USER_TYPE === 'ant' &&
2206                  research !== undefined && { research }),
2207                ...(advisorModel && { advisorModel }),
2208              }
2209              newMessages.push(m)
2210              yield m
2211              break
2212            }
2213            case 'message_delta': {
2214              usage = updateUsage(usage, part.usage)
2215              // Capture research from message_delta if available (internal only).
2216              // Always overwrite with the latest value. Also write back to
2217              // already-yielded messages since message_delta arrives after
2218              // content_block_stop.
2219              if (
2220                process.env.USER_TYPE === 'ant' &&
2221                'research' in (part as unknown as Record<string, unknown>)
2222              ) {
2223                research = (part as unknown as Record<string, unknown>).research
2224                for (const msg of newMessages) {
2225                  msg.research = research
2226                }
2227              }
2228  
2229              // Write final usage and stop_reason back to the last yielded
2230              // message. Messages are created at content_block_stop from
2231              // partialMessage, which was set at message_start before any tokens
2232              // were generated (output_tokens: 0, stop_reason: null).
2233              // message_delta arrives after content_block_stop with the real
2234              // values.
2235              //
2236              // IMPORTANT: Use direct property mutation, not object replacement.
2237              // The transcript write queue holds a reference to message.message
2238              // and serializes it lazily (100ms flush interval). Object
2239              // replacement ({ ...lastMsg.message, usage }) would disconnect
2240              // the queued reference; direct mutation ensures the transcript
2241              // captures the final values.
2242              stopReason = part.delta.stop_reason
2243  
2244              const lastMsg = newMessages.at(-1)
2245              if (lastMsg) {
2246                lastMsg.message.usage = usage
2247                lastMsg.message.stop_reason = stopReason
2248              }
2249  
2250              // Update cost
2251              const costUSDForPart = calculateUSDCost(resolvedModel, usage)
2252              costUSD += addToTotalSessionCost(
2253                costUSDForPart,
2254                usage,
2255                options.model,
2256              )
2257  
2258              const refusalMessage = getErrorMessageIfRefusal(
2259                part.delta.stop_reason,
2260                options.model,
2261              )
2262              if (refusalMessage) {
2263                yield refusalMessage
2264              }
2265  
2266              if (stopReason === 'max_tokens') {
2267                logEvent('tengu_max_tokens_reached', {
2268                  max_tokens: maxOutputTokens,
2269                })
2270                yield createAssistantAPIErrorMessage({
2271                  content: `${API_ERROR_MESSAGE_PREFIX}: Claude's response exceeded the ${
2272                    maxOutputTokens
2273                  } output token maximum. To configure this behavior, set the CLAUDE_CODE_MAX_OUTPUT_TOKENS environment variable.`,
2274                  apiError: 'max_output_tokens',
2275                  error: 'max_output_tokens',
2276                })
2277              }
2278  
2279              if (stopReason === 'model_context_window_exceeded') {
2280                logEvent('tengu_context_window_exceeded', {
2281                  max_tokens: maxOutputTokens,
2282                  output_tokens: usage.output_tokens,
2283                })
2284                // Reuse the max_output_tokens recovery path — from the model's
2285                // perspective, both mean "response was cut off, continue from
2286                // where you left off."
2287                yield createAssistantAPIErrorMessage({
2288                  content: `${API_ERROR_MESSAGE_PREFIX}: The model has reached its context window limit.`,
2289                  apiError: 'max_output_tokens',
2290                  error: 'max_output_tokens',
2291                })
2292              }
2293              break
2294            }
2295            case 'message_stop':
2296              break
2297          }
2298  
2299          yield {
2300            type: 'stream_event',
2301            event: part,
2302            ...(part.type === 'message_start' ? { ttftMs } : undefined),
2303          }
2304        }
2305        // Clear the idle timeout watchdog now that the stream loop has exited
2306        clearStreamIdleTimers()
2307  
2308        // If the stream was aborted by our idle timeout watchdog, fall back to
2309        // non-streaming retry rather than treating it as a completed stream.
2310        if (streamIdleAborted) {
2311          // Instrumentation: proves the for-await exited after the watchdog fired
2312          // (vs. hung forever). exit_delay_ms measures abort propagation latency:
2313          // 0-10ms = abort worked; >>1000ms = something else woke the loop.
2314          const exitDelayMs =
2315            streamWatchdogFiredAt !== null
2316              ? Math.round(performance.now() - streamWatchdogFiredAt)
2317              : -1
2318          logForDiagnosticsNoPII(
2319            'info',
2320            'cli_stream_loop_exited_after_watchdog_clean',
2321          )
2322          logEvent('tengu_stream_loop_exited_after_watchdog', {
2323            request_id: (streamRequestId ??
2324              'unknown') as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
2325            exit_delay_ms: exitDelayMs,
2326            exit_path:
2327              'clean' as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
2328            model:
2329              options.model as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
2330          })
2331          // Prevent double-emit: this throw lands in the catch block below,
2332          // whose exit_path='error' probe guards on streamWatchdogFiredAt.
2333          streamWatchdogFiredAt = null
2334          throw new Error('Stream idle timeout - no chunks received')
2335        }
2336  
2337        // Detect when the stream completed without producing any assistant messages.
2338        // This covers two proxy failure modes:
2339        // 1. No events at all (!partialMessage): proxy returned 200 with non-SSE body
2340        // 2. Partial events (partialMessage set but no content blocks completed AND
2341        //    no stop_reason received): proxy returned message_start but stream ended
2342        //    before content_block_stop and before message_delta with stop_reason
2343        // BetaMessageStream had the first check in _endRequest() but the raw Stream
2344        // does not - without it the generator silently returns no assistant messages,
2345        // causing "Execution error" in -p mode.
2346        // Note: We must check stopReason to avoid false positives. For example, with
2347        // structured output (--json-schema), the model calls a StructuredOutput tool
2348        // on turn 1, then on turn 2 responds with end_turn and no content blocks.
2349        // That's a legitimate empty response, not an incomplete stream.
2350        if (!partialMessage || (newMessages.length === 0 && !stopReason)) {
2351          logForDebugging(
2352            !partialMessage
2353              ? 'Stream completed without receiving message_start event - triggering non-streaming fallback'
2354              : 'Stream completed with message_start but no content blocks completed - triggering non-streaming fallback',
2355            { level: 'error' },
2356          )
2357          logEvent('tengu_stream_no_events', {
2358            model:
2359              options.model as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
2360            request_id: (streamRequestId ??
2361              'unknown') as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
2362          })
2363          throw new Error('Stream ended without receiving any events')
2364        }
2365  
2366        // Log summary if any stalls occurred during streaming
2367        if (stallCount > 0) {
2368          logForDebugging(
2369            `Streaming completed with ${stallCount} stall(s), total stall time: ${(totalStallTime / 1000).toFixed(1)}s`,
2370            { level: 'warn' },
2371          )
2372          logEvent('tengu_streaming_stall_summary', {
2373            stall_count: stallCount,
2374            total_stall_time_ms: totalStallTime,
2375            model:
2376              options.model as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
2377            request_id: (streamRequestId ??
2378              'unknown') as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
2379          })
2380        }
2381  
2382        // Check if the cache actually broke based on response tokens
2383        if (feature('PROMPT_CACHE_BREAK_DETECTION')) {
2384          void checkResponseForCacheBreak(
2385            options.querySource,
2386            usage.cache_read_input_tokens,
2387            usage.cache_creation_input_tokens,
2388            messages,
2389            options.agentId,
2390            streamRequestId,
2391          )
2392        }
2393  
2394        // Process fallback percentage header and quota status if available
2395        // streamResponse is set when the stream is created in the withRetry callback above
2396        // TypeScript's control flow analysis can't track that streamResponse is set in the callback
2397        // eslint-disable-next-line eslint-plugin-n/no-unsupported-features/node-builtins
2398        const resp = streamResponse as unknown as Response | undefined
2399        if (resp) {
2400          extractQuotaStatusFromHeaders(resp.headers)
2401          // Store headers for gateway detection
2402          responseHeaders = resp.headers
2403        }
2404      } catch (streamingError) {
2405        // Clear the idle timeout watchdog on error path too
2406        clearStreamIdleTimers()
2407  
2408        // Instrumentation: if the watchdog had already fired and the for-await
2409        // threw (rather than exiting cleanly), record that the loop DID exit and
2410        // how long after the watchdog. Distinguishes true hangs from error exits.
2411        if (streamIdleAborted && streamWatchdogFiredAt !== null) {
2412          const exitDelayMs = Math.round(
2413            performance.now() - streamWatchdogFiredAt,
2414          )
2415          logForDiagnosticsNoPII(
2416            'info',
2417            'cli_stream_loop_exited_after_watchdog_error',
2418          )
2419          logEvent('tengu_stream_loop_exited_after_watchdog', {
2420            request_id: (streamRequestId ??
2421              'unknown') as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
2422            exit_delay_ms: exitDelayMs,
2423            exit_path:
2424              'error' as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
2425            error_name:
2426              streamingError instanceof Error
2427                ? (streamingError.name as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS)
2428                : ('unknown' as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS),
2429            model:
2430              options.model as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
2431          })
2432        }
2433  
2434        if (streamingError instanceof APIUserAbortError) {
2435          // Check if the abort signal was triggered by the user (ESC key)
2436          // If the signal is aborted, it's a user-initiated abort
2437          // If not, it's likely a timeout from the SDK
2438          if (signal.aborted) {
2439            // This is a real user abort (ESC key was pressed)
2440            logForDebugging(
2441              `Streaming aborted by user: ${errorMessage(streamingError)}`,
2442            )
2443            if (isAdvisorInProgress) {
2444              logEvent('tengu_advisor_tool_interrupted', {
2445                model:
2446                  options.model as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
2447                advisor_model: (advisorModel ??
2448                  'unknown') as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
2449              })
2450            }
2451            throw streamingError
2452          } else {
2453            // The SDK threw APIUserAbortError but our signal wasn't aborted
2454            // This means it's a timeout from the SDK's internal timeout
2455            logForDebugging(
2456              `Streaming timeout (SDK abort): ${streamingError.message}`,
2457              { level: 'error' },
2458            )
2459            // Throw a more specific error for timeout
2460            throw new APIConnectionTimeoutError({ message: 'Request timed out' })
2461          }
2462        }
2463  
2464        // When the flag is enabled, skip the non-streaming fallback and let the
2465        // error propagate to withRetry. The mid-stream fallback causes double tool
2466        // execution when streaming tool execution is active: the partial stream
2467        // starts a tool, then the non-streaming retry produces the same tool_use
2468        // and runs it again. See inc-4258.
2469        const disableFallback =
2470          isEnvTruthy(process.env.CLAUDE_CODE_DISABLE_NONSTREAMING_FALLBACK) ||
2471          getFeatureValue_CACHED_MAY_BE_STALE(
2472            'tengu_disable_streaming_to_non_streaming_fallback',
2473            false,
2474          )
2475  
2476        if (disableFallback) {
2477          logForDebugging(
2478            `Error streaming (non-streaming fallback disabled): ${errorMessage(streamingError)}`,
2479            { level: 'error' },
2480          )
2481          logEvent('tengu_streaming_fallback_to_non_streaming', {
2482            model:
2483              options.model as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
2484            error:
2485              streamingError instanceof Error
2486                ? (streamingError.name as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS)
2487                : (String(
2488                    streamingError,
2489                  ) as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS),
2490            attemptNumber,
2491            maxOutputTokens,
2492            thinkingType:
2493              thinkingConfig.type as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
2494            fallback_disabled: true,
2495            request_id: (streamRequestId ??
2496              'unknown') as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
2497            fallback_cause: (streamIdleAborted
2498              ? 'watchdog'
2499              : 'other') as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
2500          })
2501          throw streamingError
2502        }
2503  
2504        logForDebugging(
2505          `Error streaming, falling back to non-streaming mode: ${errorMessage(streamingError)}`,
2506          { level: 'error' },
2507        )
2508        didFallBackToNonStreaming = true
2509        if (options.onStreamingFallback) {
2510          options.onStreamingFallback()
2511        }
2512  
2513        logEvent('tengu_streaming_fallback_to_non_streaming', {
2514          model:
2515            options.model as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
2516          error:
2517            streamingError instanceof Error
2518              ? (streamingError.name as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS)
2519              : (String(
2520                  streamingError,
2521                ) as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS),
2522          attemptNumber,
2523          maxOutputTokens,
2524          thinkingType:
2525            thinkingConfig.type as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
2526          fallback_disabled: false,
2527          request_id: (streamRequestId ??
2528            'unknown') as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
2529          fallback_cause: (streamIdleAborted
2530            ? 'watchdog'
2531            : 'other') as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
2532        })
2533  
2534        // Fall back to non-streaming mode with retries.
2535        // If the streaming failure was itself a 529, count it toward the
2536        // consecutive-529 budget so total 529s-before-model-fallback is the
2537        // same whether the overload was hit in streaming or non-streaming mode.
2538        // This is a speculative fix for https://github.com/anthropics/claude-code/issues/1513
2539        // Instrumentation: proves executeNonStreamingRequest was entered (vs. the
2540        // fallback event firing but the call itself hanging at dispatch).
2541        logForDiagnosticsNoPII('info', 'cli_nonstreaming_fallback_started')
2542        logEvent('tengu_nonstreaming_fallback_started', {
2543          request_id: (streamRequestId ??
2544            'unknown') as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
2545          model:
2546            options.model as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
2547          fallback_cause: (streamIdleAborted
2548            ? 'watchdog'
2549            : 'other') as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
2550        })
2551        const result = yield* executeNonStreamingRequest(
2552          { model: options.model, source: options.querySource },
2553          {
2554            model: options.model,
2555            fallbackModel: options.fallbackModel,
2556            thinkingConfig,
2557            ...(isFastModeEnabled() && { fastMode: isFastMode }),
2558            signal,
2559            initialConsecutive529Errors: is529Error(streamingError) ? 1 : 0,
2560            querySource: options.querySource,
2561          },
2562          paramsFromContext,
2563          (attempt, _startTime, tokens) => {
2564            attemptNumber = attempt
2565            maxOutputTokens = tokens
2566          },
2567          params => captureAPIRequest(params, options.querySource),
2568          streamRequestId,
2569        )
2570  
2571        const m: AssistantMessage = {
2572          message: {
2573            ...result,
2574            content: normalizeContentFromAPI(
2575              result.content,
2576              tools,
2577              options.agentId,
2578            ),
2579          },
2580          requestId: streamRequestId ?? undefined,
2581          type: 'assistant',
2582          uuid: randomUUID(),
2583          timestamp: new Date().toISOString(),
2584          ...(process.env.USER_TYPE === 'ant' &&
2585            research !== undefined && {
2586              research,
2587            }),
2588          ...(advisorModel && {
2589            advisorModel,
2590          }),
2591        }
2592        newMessages.push(m)
2593        fallbackMessage = m
2594        yield m
2595      } finally {
2596        clearStreamIdleTimers()
2597      }
2598    } catch (errorFromRetry) {
2599      // FallbackTriggeredError must propagate to query.ts, which performs the
2600      // actual model switch. Swallowing it here would turn the fallback into a
2601      // no-op — the user would just see "Model fallback triggered: X -> Y" as
2602      // an error message with no actual retry on the fallback model.
2603      if (errorFromRetry instanceof FallbackTriggeredError) {
2604        throw errorFromRetry
2605      }
2606  
2607      // Check if this is a 404 error during stream creation that should trigger
2608      // non-streaming fallback. This handles gateways that return 404 for streaming
2609      // endpoints but work fine with non-streaming. Before v2.1.8, BetaMessageStream
2610      // threw 404s during iteration (caught by inner catch with fallback), but now
2611      // with raw streams, 404s are thrown during creation (caught here).
2612      const is404StreamCreationError =
2613        !didFallBackToNonStreaming &&
2614        errorFromRetry instanceof CannotRetryError &&
2615        errorFromRetry.originalError instanceof APIError &&
2616        errorFromRetry.originalError.status === 404
2617  
2618      if (is404StreamCreationError) {
2619        // 404 is thrown at .withResponse() before streamRequestId is assigned,
2620        // and CannotRetryError means every retry failed — so grab the failed
2621        // request's ID from the error header instead.
2622        const failedRequestId =
2623          (errorFromRetry.originalError as APIError).requestID ?? 'unknown'
2624        logForDebugging(
2625          'Streaming endpoint returned 404, falling back to non-streaming mode',
2626          { level: 'warn' },
2627        )
2628        didFallBackToNonStreaming = true
2629        if (options.onStreamingFallback) {
2630          options.onStreamingFallback()
2631        }
2632  
2633        logEvent('tengu_streaming_fallback_to_non_streaming', {
2634          model:
2635            options.model as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
2636          error:
2637            '404_stream_creation' as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
2638          attemptNumber,
2639          maxOutputTokens,
2640          thinkingType:
2641            thinkingConfig.type as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
2642          request_id:
2643            failedRequestId as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
2644          fallback_cause:
2645            '404_stream_creation' as AnalyticsMetadata_I_VERIFIED_THIS_IS_NOT_CODE_OR_FILEPATHS,
2646        })
2647  
2648        try {
2649          // Fall back to non-streaming mode
2650          const result = yield* executeNonStreamingRequest(
2651            { model: options.model, source: options.querySource },
2652            {
2653              model: options.model,
2654              fallbackModel: options.fallbackModel,
2655              thinkingConfig,
2656              ...(isFastModeEnabled() && { fastMode: isFastMode }),
2657              signal,
2658            },
2659            paramsFromContext,
2660            (attempt, _startTime, tokens) => {
2661              attemptNumber = attempt
2662              maxOutputTokens = tokens
2663            },
2664            params => captureAPIRequest(params, options.querySource),
2665            failedRequestId,
2666          )
2667  
2668          const m: AssistantMessage = {
2669            message: {
2670              ...result,
2671              content: normalizeContentFromAPI(
2672                result.content,
2673                tools,
2674                options.agentId,
2675              ),
2676            },
2677            requestId: streamRequestId ?? undefined,
2678            type: 'assistant',
2679            uuid: randomUUID(),
2680            timestamp: new Date().toISOString(),
2681            ...(process.env.USER_TYPE === 'ant' &&
2682              research !== undefined && { research }),
2683            ...(advisorModel && { advisorModel }),
2684          }
2685          newMessages.push(m)
2686          fallbackMessage = m
2687          yield m
2688  
2689          // Continue to success logging below
2690        } catch (fallbackError) {
2691          // Propagate model-fallback signal to query.ts (see comment above).
2692          if (fallbackError instanceof FallbackTriggeredError) {
2693            throw fallbackError
2694          }
2695  
2696          // Fallback also failed, handle as normal error
2697          logForDebugging(
2698            `Non-streaming fallback also failed: ${errorMessage(fallbackError)}`,
2699            { level: 'error' },
2700          )
2701  
2702          let error = fallbackError
2703          let errorModel = options.model
2704          if (fallbackError instanceof CannotRetryError) {
2705            error = fallbackError.originalError
2706            errorModel = fallbackError.retryContext.model
2707          }
2708  
2709          if (error instanceof APIError) {
2710            extractQuotaStatusFromError(error)
2711          }
2712  
2713          const requestId =
2714            streamRequestId ||
2715            (error instanceof APIError ? error.requestID : undefined) ||
2716            (error instanceof APIError
2717              ? (error.error as { request_id?: string })?.request_id
2718              : undefined)
2719  
2720          logAPIError({
2721            error,
2722            model: errorModel,
2723            messageCount: messagesForAPI.length,
2724            messageTokens: tokenCountFromLastAPIResponse(messagesForAPI),
2725            durationMs: Date.now() - start,
2726            durationMsIncludingRetries: Date.now() - startIncludingRetries,
2727            attempt: attemptNumber,
2728            requestId,
2729            clientRequestId,
2730            didFallBackToNonStreaming,
2731            queryTracking: options.queryTracking,
2732            querySource: options.querySource,
2733            llmSpan,
2734            fastMode: isFastModeRequest,
2735            previousRequestId,
2736          })
2737  
2738          if (error instanceof APIUserAbortError) {
2739            releaseStreamResources()
2740            return
2741          }
2742  
2743          yield getAssistantMessageFromError(error, errorModel, {
2744            messages,
2745            messagesForAPI,
2746          })
2747          releaseStreamResources()
2748          return
2749        }
2750      } else {
2751        // Original error handling for non-404 errors
2752        logForDebugging(`Error in API request: ${errorMessage(errorFromRetry)}`, {
2753          level: 'error',
2754        })
2755  
2756        let error = errorFromRetry
2757        let errorModel = options.model
2758        if (errorFromRetry instanceof CannotRetryError) {
2759          error = errorFromRetry.originalError
2760          errorModel = errorFromRetry.retryContext.model
2761        }
2762  
2763        // Extract quota status from error headers if it's a rate limit error
2764        if (error instanceof APIError) {
2765          extractQuotaStatusFromError(error)
2766        }
2767  
2768        // Extract requestId from stream, error header, or error body
2769        const requestId =
2770          streamRequestId ||
2771          (error instanceof APIError ? error.requestID : undefined) ||
2772          (error instanceof APIError
2773            ? (error.error as { request_id?: string })?.request_id
2774            : undefined)
2775  
2776        logAPIError({
2777          error,
2778          model: errorModel,
2779          messageCount: messagesForAPI.length,
2780          messageTokens: tokenCountFromLastAPIResponse(messagesForAPI),
2781          durationMs: Date.now() - start,
2782          durationMsIncludingRetries: Date.now() - startIncludingRetries,
2783          attempt: attemptNumber,
2784          requestId,
2785          clientRequestId,
2786          didFallBackToNonStreaming,
2787          queryTracking: options.queryTracking,
2788          querySource: options.querySource,
2789          llmSpan,
2790          fastMode: isFastModeRequest,
2791          previousRequestId,
2792        })
2793  
2794        // Don't yield an assistant error message for user aborts
2795        // The interruption message is handled in query.ts
2796        if (error instanceof APIUserAbortError) {
2797          releaseStreamResources()
2798          return
2799        }
2800  
2801        yield getAssistantMessageFromError(error, errorModel, {
2802          messages,
2803          messagesForAPI,
2804        })
2805        releaseStreamResources()
2806        return
2807      }
2808    } finally {
2809      stopSessionActivity('api_call')
2810      // Must be in the finally block: if the generator is terminated early
2811      // via .return() (e.g. consumer breaks out of for-await-of, or query.ts
2812      // encounters an abort), code after the try/finally never executes.
2813      // Without this, the Response object's native TLS/socket buffers leak
2814      // until the generator itself is GC'd (see GH #32920).
2815      releaseStreamResources()
2816  
2817      // Non-streaming fallback cost: the streaming path tracks cost in the
2818      // message_delta handler before any yield. Fallback pushes to newMessages
2819      // then yields, so tracking must be here to survive .return() at the yield.
2820      if (fallbackMessage) {
2821        const fallbackUsage = fallbackMessage.message.usage
2822        usage = updateUsage(EMPTY_USAGE, fallbackUsage)
2823        stopReason = fallbackMessage.message.stop_reason
2824        const fallbackCost = calculateUSDCost(resolvedModel, fallbackUsage)
2825        costUSD += addToTotalSessionCost(
2826          fallbackCost,
2827          fallbackUsage,
2828          options.model,
2829        )
2830      }
2831    }
2832  
2833    // Mark all registered tools as sent to API so they become eligible for deletion
2834    if (feature('CACHED_MICROCOMPACT') && cachedMCEnabled) {
2835      markToolsSentToAPIState()
2836    }
2837  
2838    // Track the last requestId for the main conversation chain so shutdown
2839    // can send a cache eviction hint to inference. Exclude backgrounded
2840    // sessions (Ctrl+B) which share the repl_main_thread querySource but
2841    // run inside an agent context — they are independent conversation chains
2842    // whose cache should not be evicted when the foreground session clears.
2843    if (
2844      streamRequestId &&
2845      !getAgentContext() &&
2846      (options.querySource.startsWith('repl_main_thread') ||
2847        options.querySource === 'sdk')
2848    ) {
2849      setLastMainRequestId(streamRequestId)
2850    }
2851  
2852    // Precompute scalars so the fire-and-forget .then() closure doesn't pin the
2853    // full messagesForAPI array (the entire conversation up to the context window
2854    // limit) until getToolPermissionContext() resolves.
2855    const logMessageCount = messagesForAPI.length
2856    const logMessageTokens = tokenCountFromLastAPIResponse(messagesForAPI)
2857    void options.getToolPermissionContext().then(permissionContext => {
2858      logAPISuccessAndDuration({
2859        model:
2860          newMessages[0]?.message.model ?? partialMessage?.model ?? options.model,
2861        preNormalizedModel: options.model,
2862        usage,
2863        start,
2864        startIncludingRetries,
2865        attempt: attemptNumber,
2866        messageCount: logMessageCount,
2867        messageTokens: logMessageTokens,
2868        requestId: streamRequestId ?? null,
2869        stopReason,
2870        ttftMs,
2871        didFallBackToNonStreaming,
2872        querySource: options.querySource,
2873        headers: responseHeaders,
2874        costUSD,
2875        queryTracking: options.queryTracking,
2876        permissionMode: permissionContext.mode,
2877        // Pass newMessages for beta tracing - extraction happens in logging.ts
2878        // only when beta tracing is enabled
2879        newMessages,
2880        llmSpan,
2881        globalCacheStrategy,
2882        requestSetupMs: start - startIncludingRetries,
2883        attemptStartTimes,
2884        fastMode: isFastModeRequest,
2885        previousRequestId,
2886        betas: lastRequestBetas,
2887      })
2888    })
2889  
2890    // Defensive: also release on normal completion (no-op if finally already ran).
2891    releaseStreamResources()
2892  }
2893  
2894  /**
2895   * Cleans up stream resources to prevent memory leaks.
2896   * @internal Exported for testing
2897   */
2898  export function cleanupStream(
2899    stream: Stream<BetaRawMessageStreamEvent> | undefined,
2900  ): void {
2901    if (!stream) {
2902      return
2903    }
2904    try {
2905      // Abort the stream via its controller if not already aborted
2906      if (!stream.controller.signal.aborted) {
2907        stream.controller.abort()
2908      }
2909    } catch {
2910      // Ignore - stream may already be closed
2911    }
2912  }
2913  
2914  /**
2915   * Updates usage statistics with new values from streaming API events.
2916   * Note: Anthropic's streaming API provides cumulative usage totals, not incremental deltas.
2917   * Each event contains the complete usage up to that point in the stream.
2918   *
2919   * Input-related tokens (input_tokens, cache_creation_input_tokens, cache_read_input_tokens)
2920   * are typically set in message_start and remain constant. message_delta events may send
2921   * explicit 0 values for these fields, which should not overwrite the values from message_start.
2922   * We only update these fields if they have a non-null, non-zero value.
2923   */
2924  export function updateUsage(
2925    usage: Readonly<NonNullableUsage>,
2926    partUsage: BetaMessageDeltaUsage | undefined,
2927  ): NonNullableUsage {
2928    if (!partUsage) {
2929      return { ...usage }
2930    }
2931    return {
2932      input_tokens:
2933        partUsage.input_tokens !== null && partUsage.input_tokens > 0
2934          ? partUsage.input_tokens
2935          : usage.input_tokens,
2936      cache_creation_input_tokens:
2937        partUsage.cache_creation_input_tokens !== null &&
2938        partUsage.cache_creation_input_tokens > 0
2939          ? partUsage.cache_creation_input_tokens
2940          : usage.cache_creation_input_tokens,
2941      cache_read_input_tokens:
2942        partUsage.cache_read_input_tokens !== null &&
2943        partUsage.cache_read_input_tokens > 0
2944          ? partUsage.cache_read_input_tokens
2945          : usage.cache_read_input_tokens,
2946      output_tokens: partUsage.output_tokens ?? usage.output_tokens,
2947      server_tool_use: {
2948        web_search_requests:
2949          partUsage.server_tool_use?.web_search_requests ??
2950          usage.server_tool_use.web_search_requests,
2951        web_fetch_requests:
2952          partUsage.server_tool_use?.web_fetch_requests ??
2953          usage.server_tool_use.web_fetch_requests,
2954      },
2955      service_tier: usage.service_tier,
2956      cache_creation: {
2957        // SDK type BetaMessageDeltaUsage is missing cache_creation, but it's real!
2958        ephemeral_1h_input_tokens:
2959          (partUsage as BetaUsage).cache_creation?.ephemeral_1h_input_tokens ??
2960          usage.cache_creation.ephemeral_1h_input_tokens,
2961        ephemeral_5m_input_tokens:
2962          (partUsage as BetaUsage).cache_creation?.ephemeral_5m_input_tokens ??
2963          usage.cache_creation.ephemeral_5m_input_tokens,
2964      },
2965      // cache_deleted_input_tokens: returned by the API when cache editing
2966      // deletes KV cache content, but not in SDK types. Kept off NonNullableUsage
2967      // so the string is eliminated from external builds by dead code elimination.
2968      // Uses the same > 0 guard as other token fields to prevent message_delta
2969      // from overwriting the real value with 0.
2970      ...(feature('CACHED_MICROCOMPACT')
2971        ? {
2972            cache_deleted_input_tokens:
2973              (partUsage as unknown as { cache_deleted_input_tokens?: number })
2974                .cache_deleted_input_tokens != null &&
2975              (partUsage as unknown as { cache_deleted_input_tokens: number })
2976                .cache_deleted_input_tokens > 0
2977                ? (partUsage as unknown as { cache_deleted_input_tokens: number })
2978                    .cache_deleted_input_tokens
2979                : ((usage as unknown as { cache_deleted_input_tokens?: number })
2980                    .cache_deleted_input_tokens ?? 0),
2981          }
2982        : {}),
2983      inference_geo: usage.inference_geo,
2984      iterations: partUsage.iterations ?? usage.iterations,
2985      speed: (partUsage as BetaUsage).speed ?? usage.speed,
2986    }
2987  }
2988  
2989  /**
2990   * Accumulates usage from one message into a total usage object.
2991   * Used to track cumulative usage across multiple assistant turns.
2992   */
2993  export function accumulateUsage(
2994    totalUsage: Readonly<NonNullableUsage>,
2995    messageUsage: Readonly<NonNullableUsage>,
2996  ): NonNullableUsage {
2997    return {
2998      input_tokens: totalUsage.input_tokens + messageUsage.input_tokens,
2999      cache_creation_input_tokens:
3000        totalUsage.cache_creation_input_tokens +
3001        messageUsage.cache_creation_input_tokens,
3002      cache_read_input_tokens:
3003        totalUsage.cache_read_input_tokens + messageUsage.cache_read_input_tokens,
3004      output_tokens: totalUsage.output_tokens + messageUsage.output_tokens,
3005      server_tool_use: {
3006        web_search_requests:
3007          totalUsage.server_tool_use.web_search_requests +
3008          messageUsage.server_tool_use.web_search_requests,
3009        web_fetch_requests:
3010          totalUsage.server_tool_use.web_fetch_requests +
3011          messageUsage.server_tool_use.web_fetch_requests,
3012      },
3013      service_tier: messageUsage.service_tier, // Use the most recent service tier
3014      cache_creation: {
3015        ephemeral_1h_input_tokens:
3016          totalUsage.cache_creation.ephemeral_1h_input_tokens +
3017          messageUsage.cache_creation.ephemeral_1h_input_tokens,
3018        ephemeral_5m_input_tokens:
3019          totalUsage.cache_creation.ephemeral_5m_input_tokens +
3020          messageUsage.cache_creation.ephemeral_5m_input_tokens,
3021      },
3022      // See comment in updateUsage — field is not on NonNullableUsage to keep
3023      // the string out of external builds.
3024      ...(feature('CACHED_MICROCOMPACT')
3025        ? {
3026            cache_deleted_input_tokens:
3027              ((totalUsage as unknown as { cache_deleted_input_tokens?: number })
3028                .cache_deleted_input_tokens ?? 0) +
3029              ((
3030                messageUsage as unknown as { cache_deleted_input_tokens?: number }
3031              ).cache_deleted_input_tokens ?? 0),
3032          }
3033        : {}),
3034      inference_geo: messageUsage.inference_geo, // Use the most recent
3035      iterations: messageUsage.iterations, // Use the most recent
3036      speed: messageUsage.speed, // Use the most recent
3037    }
3038  }
3039  
3040  function isToolResultBlock(
3041    block: unknown,
3042  ): block is { type: 'tool_result'; tool_use_id: string } {
3043    return (
3044      block !== null &&
3045      typeof block === 'object' &&
3046      'type' in block &&
3047      (block as { type: string }).type === 'tool_result' &&
3048      'tool_use_id' in block
3049    )
3050  }
3051  
// A content block asking the API to delete previously cached KV entries
// (cached microcompact). Spliced into user-message content by
// addCacheBreakpoints via insertBlockAfterToolResults.
type CachedMCEditsBlock = {
  type: 'cache_edits'
  edits: { type: 'delete'; cache_reference: string }[]
}

// A cache_edits block pinned to the user-message index where it was first
// inserted, so it can be re-sent at the same position on later requests.
type CachedMCPinnedEdits = {
  userMessageIndex: number
  block: CachedMCEditsBlock
}
3061  
/**
 * Converts history messages to API MessageParams and applies prompt-cache
 * controls: one message-level cache_control marker, re-inserted/pinned
 * cache_edits blocks (cached microcompact), and cache_reference tags on
 * tool_result blocks inside the cached prefix.
 *
 * Exported for testing cache_reference placement constraints.
 */
export function addCacheBreakpoints(
  messages: (UserMessage | AssistantMessage)[],
  enablePromptCaching: boolean,
  querySource?: QuerySource,
  useCachedMC = false,
  newCacheEdits?: CachedMCEditsBlock | null,
  pinnedEdits?: CachedMCPinnedEdits[],
  skipCacheWrite = false,
): MessageParam[] {
  logEvent('tengu_api_cache_breakpoints', {
    totalMessageCount: messages.length,
    cachingEnabled: enablePromptCaching,
    skipCacheWrite,
  })

  // Exactly one message-level cache_control marker per request. Mycro's
  // turn-to-turn eviction (page_manager/index.rs: Index::insert) frees
  // local-attention KV pages at any cached prefix position NOT in
  // cache_store_int_token_boundaries. With two markers the second-to-last
  // position is protected and its locals survive an extra turn even though
  // nothing will ever resume from there — with one marker they're freed
  // immediately. For fire-and-forget forks (skipCacheWrite) we shift the
  // marker to the second-to-last message: that's the last shared-prefix
  // point, so the write is a no-op merge on mycro (entry already exists)
  // and the fork doesn't leave its own tail in the KVCC. Dense pages are
  // refcounted and survive via the new hash either way.
  const markerIndex = skipCacheWrite ? messages.length - 2 : messages.length - 1
  const result = messages.map((msg, index) => {
    const addCache = index === markerIndex
    if (msg.type === 'user') {
      return userMessageToMessageParam(
        msg,
        addCache,
        enablePromptCaching,
        querySource,
      )
    }
    return assistantMessageToMessageParam(
      msg,
      addCache,
      enablePromptCaching,
      querySource,
    )
  })

  // Everything below is cached-microcompact bookkeeping.
  if (!useCachedMC) {
    return result
  }

  // Track all cache_references being deleted to prevent duplicates across blocks.
  const seenDeleteRefs = new Set<string>()

  // Helper to deduplicate a cache_edits block against already-seen deletions
  const deduplicateEdits = (block: CachedMCEditsBlock): CachedMCEditsBlock => {
    const uniqueEdits = block.edits.filter(edit => {
      if (seenDeleteRefs.has(edit.cache_reference)) {
        return false
      }
      seenDeleteRefs.add(edit.cache_reference)
      return true
    })
    return { ...block, edits: uniqueEdits }
  }

  // Re-insert all previously-pinned cache_edits at their original positions.
  // NOTE(review): assumes the index recorded at pin time still addresses the
  // same user message — confirm the message list is append-only between calls.
  for (const pinned of pinnedEdits ?? []) {
    const msg = result[pinned.userMessageIndex]
    if (msg && msg.role === 'user') {
      // Content may be a bare string; normalize to a block array before splicing.
      if (!Array.isArray(msg.content)) {
        msg.content = [{ type: 'text', text: msg.content as string }]
      }
      const dedupedBlock = deduplicateEdits(pinned.block)
      if (dedupedBlock.edits.length > 0) {
        insertBlockAfterToolResults(msg.content, dedupedBlock)
      }
    }
  }

  // Insert new cache_edits into the last user message and pin them
  if (newCacheEdits && result.length > 0) {
    const dedupedNewEdits = deduplicateEdits(newCacheEdits)
    if (dedupedNewEdits.edits.length > 0) {
      // Walk backwards to find the most recent user message.
      for (let i = result.length - 1; i >= 0; i--) {
        const msg = result[i]
        if (msg && msg.role === 'user') {
          if (!Array.isArray(msg.content)) {
            msg.content = [{ type: 'text', text: msg.content as string }]
          }
          insertBlockAfterToolResults(msg.content, dedupedNewEdits)
          // Pin so this block is re-sent at the same position in future calls.
          // Note the pre-dedup block is pinned; dedup is re-applied per request.
          pinCacheEdits(i, newCacheEdits)

          logForDebugging(
            `Added cache_edits block with ${dedupedNewEdits.edits.length} deletion(s) to message[${i}]: ${dedupedNewEdits.edits.map(e => e.cache_reference).join(', ')}`,
          )
          break
        }
      }
    }
  }

  // Add cache_reference to tool_result blocks that are within the cached prefix.
  // Must be done AFTER cache_edits insertion since that modifies content arrays.
  if (enablePromptCaching) {
    // Find the last message containing a cache_control marker
    let lastCCMsg = -1
    for (let i = 0; i < result.length; i++) {
      const msg = result[i]!
      if (Array.isArray(msg.content)) {
        for (const block of msg.content) {
          if (block && typeof block === 'object' && 'cache_control' in block) {
            lastCCMsg = i
          }
        }
      }
    }

    // Add cache_reference to tool_result blocks that are strictly before
    // the last cache_control marker. The API requires cache_reference to
    // appear "before or on" the last cache_control — we use strict "before"
    // to avoid edge cases where cache_edits splicing shifts block indices.
    //
    // Create new objects instead of mutating in-place to avoid contaminating
    // blocks reused by secondary queries that use models without cache_editing support.
    if (lastCCMsg >= 0) {
      for (let i = 0; i < lastCCMsg; i++) {
        const msg = result[i]!
        if (msg.role !== 'user' || !Array.isArray(msg.content)) {
          continue
        }
        // Copy-on-write: clone the content array at most once, and only if
        // at least one tool_result block actually gets tagged.
        let cloned = false
        for (let j = 0; j < msg.content.length; j++) {
          const block = msg.content[j]
          if (block && isToolResultBlock(block)) {
            if (!cloned) {
              msg.content = [...msg.content]
              cloned = true
            }
            msg.content[j] = Object.assign({}, block, {
              cache_reference: block.tool_use_id,
            })
          }
        }
      }
    }
  }

  return result
}
3212  
3213  export function buildSystemPromptBlocks(
3214    systemPrompt: SystemPrompt,
3215    enablePromptCaching: boolean,
3216    options?: {
3217      skipGlobalCacheForSystemPrompt?: boolean
3218      querySource?: QuerySource
3219    },
3220  ): TextBlockParam[] {
3221    // IMPORTANT: Do not add any more blocks for caching or you will get a 400
3222    return splitSysPromptPrefix(systemPrompt, {
3223      skipGlobalCacheForSystemPrompt: options?.skipGlobalCacheForSystemPrompt,
3224    }).map(block => {
3225      return {
3226        type: 'text' as const,
3227        text: block.text,
3228        ...(enablePromptCaching &&
3229          block.cacheScope !== null && {
3230            cache_control: getCacheControl({
3231              scope: block.cacheScope,
3232              querySource: options?.querySource,
3233            }),
3234          }),
3235      }
3236    })
3237  }
3238  
3239  type HaikuOptions = Omit<Options, 'model' | 'getToolPermissionContext'>
3240  
3241  export async function queryHaiku({
3242    systemPrompt = asSystemPrompt([]),
3243    userPrompt,
3244    outputFormat,
3245    signal,
3246    options,
3247  }: {
3248    systemPrompt: SystemPrompt
3249    userPrompt: string
3250    outputFormat?: BetaJSONOutputFormat
3251    signal: AbortSignal
3252    options: HaikuOptions
3253  }): Promise<AssistantMessage> {
3254    const result = await withVCR(
3255      [
3256        createUserMessage({
3257          content: systemPrompt.map(text => ({ type: 'text', text })),
3258        }),
3259        createUserMessage({
3260          content: userPrompt,
3261        }),
3262      ],
3263      async () => {
3264        const messages = [
3265          createUserMessage({
3266            content: userPrompt,
3267          }),
3268        ]
3269  
3270        const result = await queryModelWithoutStreaming({
3271          messages,
3272          systemPrompt,
3273          thinkingConfig: { type: 'disabled' },
3274          tools: [],
3275          signal,
3276          options: {
3277            ...options,
3278            model: getSmallFastModel(),
3279            enablePromptCaching: options.enablePromptCaching ?? false,
3280            outputFormat,
3281            async getToolPermissionContext() {
3282              return getEmptyToolPermissionContext()
3283            },
3284          },
3285        })
3286        return [result]
3287      },
3288    )
3289    // We don't use streaming for Haiku so this is safe
3290    return result[0]! as AssistantMessage
3291  }
3292  
3293  type QueryWithModelOptions = Omit<Options, 'getToolPermissionContext'>
3294  
3295  /**
3296   * Query a specific model through the Claude Code infrastructure.
3297   * This goes through the full query pipeline including proper authentication,
3298   * betas, and headers - unlike direct API calls.
3299   */
3300  export async function queryWithModel({
3301    systemPrompt = asSystemPrompt([]),
3302    userPrompt,
3303    outputFormat,
3304    signal,
3305    options,
3306  }: {
3307    systemPrompt: SystemPrompt
3308    userPrompt: string
3309    outputFormat?: BetaJSONOutputFormat
3310    signal: AbortSignal
3311    options: QueryWithModelOptions
3312  }): Promise<AssistantMessage> {
3313    const result = await withVCR(
3314      [
3315        createUserMessage({
3316          content: systemPrompt.map(text => ({ type: 'text', text })),
3317        }),
3318        createUserMessage({
3319          content: userPrompt,
3320        }),
3321      ],
3322      async () => {
3323        const messages = [
3324          createUserMessage({
3325            content: userPrompt,
3326          }),
3327        ]
3328  
3329        const result = await queryModelWithoutStreaming({
3330          messages,
3331          systemPrompt,
3332          thinkingConfig: { type: 'disabled' },
3333          tools: [],
3334          signal,
3335          options: {
3336            ...options,
3337            enablePromptCaching: options.enablePromptCaching ?? false,
3338            outputFormat,
3339            async getToolPermissionContext() {
3340              return getEmptyToolPermissionContext()
3341            },
3342          },
3343        })
3344        return [result]
3345      },
3346    )
3347    return result[0]! as AssistantMessage
3348  }
3349  
// Non-streaming requests have a 10min max per the docs:
// https://platform.claude.com/docs/en/api/errors#long-requests
// The SDK's 21333-token cap is derived from 10min × 128k tokens/hour, but we
// bypass it by setting a client-level timeout, so we can cap higher.
// Used as the max_tokens cap for the non-streaming fallback path (see
// adjustParamsForNonStreaming).
export const MAX_NON_STREAMING_TOKENS = 64_000
3355  
3356  /**
3357   * Adjusts thinking budget when max_tokens is capped for non-streaming fallback.
3358   * Ensures the API constraint: max_tokens > thinking.budget_tokens
3359   *
3360   * @param params - The parameters that will be sent to the API
3361   * @param maxTokensCap - The maximum allowed tokens (MAX_NON_STREAMING_TOKENS)
3362   * @returns Adjusted parameters with thinking budget capped if needed
3363   */
3364  export function adjustParamsForNonStreaming<
3365    T extends {
3366      max_tokens: number
3367      thinking?: BetaMessageStreamParams['thinking']
3368    },
3369  >(params: T, maxTokensCap: number): T {
3370    const cappedMaxTokens = Math.min(params.max_tokens, maxTokensCap)
3371  
3372    // Adjust thinking budget if it would exceed capped max_tokens
3373    // to maintain the constraint: max_tokens > thinking.budget_tokens
3374    const adjustedParams = { ...params }
3375    if (
3376      adjustedParams.thinking?.type === 'enabled' &&
3377      adjustedParams.thinking.budget_tokens
3378    ) {
3379      adjustedParams.thinking = {
3380        ...adjustedParams.thinking,
3381        budget_tokens: Math.min(
3382          adjustedParams.thinking.budget_tokens,
3383          cappedMaxTokens - 1, // Must be at least 1 less than max_tokens
3384        ),
3385      }
3386    }
3387  
3388    return {
3389      ...adjustedParams,
3390      max_tokens: cappedMaxTokens,
3391    }
3392  }
3393  
3394  function isMaxTokensCapEnabled(): boolean {
3395    // 3P default: false (not validated on Bedrock/Vertex)
3396    return getFeatureValue_CACHED_MAY_BE_STALE('tengu_otk_slot_v1', false)
3397  }
3398  
3399  export function getMaxOutputTokensForModel(model: string): number {
3400    const maxOutputTokens = getModelMaxOutputTokens(model)
3401  
3402    // Slot-reservation cap: drop default to 8k for all models. BQ p99 output
3403    // = 4,911 tokens; 32k/64k defaults over-reserve 8-16× slot capacity.
3404    // Requests hitting the cap get one clean retry at 64k (query.ts
3405    // max_output_tokens_escalate). Math.min keeps models with lower native
3406    // defaults (e.g. claude-3-opus at 4k) at their native value. Applied
3407    // before the env-var override so CLAUDE_CODE_MAX_OUTPUT_TOKENS still wins.
3408    const defaultTokens = isMaxTokensCapEnabled()
3409      ? Math.min(maxOutputTokens.default, CAPPED_DEFAULT_MAX_TOKENS)
3410      : maxOutputTokens.default
3411  
3412    const result = validateBoundedIntEnvVar(
3413      'CLAUDE_CODE_MAX_OUTPUT_TOKENS',
3414      process.env.CLAUDE_CODE_MAX_OUTPUT_TOKENS,
3415      defaultTokens,
3416      maxOutputTokens.upperLimit,
3417    )
3418    return result.effective
3419  }