/ utils / ultraplan / ccrSession.ts
ccrSession.ts
  1  // CCR session polling for /ultraplan. Waits for an approved ExitPlanMode
  2  // tool_result, then extracts the plan text. Uses pollRemoteSessionEvents
  3  // (shared with RemoteAgentTask) for pagination + typed SDKMessage[].
  4  // Plan mode is set via set_permission_mode control_request in
  5  // teleportToRemote's CreateSession events array.
  6  
  7  import type {
  8    ToolResultBlockParam,
  9    ToolUseBlock,
 10  } from '@anthropic-ai/sdk/resources'
 11  import type { SDKMessage } from '../../entrypoints/agentSdkTypes.js'
 12  import { EXIT_PLAN_MODE_V2_TOOL_NAME } from '../../tools/ExitPlanModeTool/constants.js'
 13  import { logForDebugging } from '../debug.js'
 14  import { sleep } from '../sleep.js'
 15  import { isTransientNetworkError } from '../teleport/api.js'
 16  import {
 17    type PollRemoteSessionResponse,
 18    pollRemoteSessionEvents,
 19  } from '../teleport.js'
 20  
 21  const POLL_INTERVAL_MS = 3000
 22  // pollRemoteSessionEvents doesn't retry. A 30min poll makes ~600 calls;
 23  // at any nonzero 5xx rate one blip would kill the run.
 24  const MAX_CONSECUTIVE_FAILURES = 5
 25  
 26  export type PollFailReason =
 27    | 'terminated'
 28    | 'timeout_pending'
 29    | 'timeout_no_plan'
 30    | 'extract_marker_missing'
 31    | 'network_or_unknown'
 32    | 'stopped'
 33  
 34  export class UltraplanPollError extends Error {
 35    constructor(
 36      message: string,
 37      readonly reason: PollFailReason,
 38      readonly rejectCount: number,
 39      options?: ErrorOptions,
 40    ) {
 41      super(message, options)
 42      this.name = 'UltraplanPollError'
 43    }
 44  }
 45  
 46  // Sentinel string the browser PlanModal includes in the feedback when the user
 47  // clicks "teleport back to terminal". Plan text follows on the next line.
 48  export const ULTRAPLAN_TELEPORT_SENTINEL = '__ULTRAPLAN_TELEPORT_LOCAL__'
 49  
 50  export type ScanResult =
 51    | { kind: 'approved'; plan: string }
 52    | { kind: 'teleport'; plan: string }
 53    | { kind: 'rejected'; id: string }
 54    | { kind: 'pending' }
 55    | { kind: 'terminated'; subtype: string }
 56    | { kind: 'unchanged' }
 57  
 58  /**
 59   * Pill/detail-view state derived from the event stream. Transitions:
 60   *   running → (turn ends, no ExitPlanMode) → needs_input
 61   *   needs_input → (user replies in browser) → running
 62   *   running → (ExitPlanMode emitted, no result yet) → plan_ready
 63   *   plan_ready → (rejected) → running
 64   *   plan_ready → (approved) → poll resolves, pill removed
 65   */
 66  export type UltraplanPhase = 'running' | 'needs_input' | 'plan_ready'
 67  
 68  /**
 69   * Pure stateful classifier for the CCR event stream. Ingests SDKMessage[]
 70   * batches (as delivered by pollRemoteSessionEvents) and returns the current
 71   * ExitPlanMode verdict. No I/O, no timers — feed it synthetic or recorded
 72   * events for unit tests and offline replay.
 73   *
 74   * Precedence (approved > terminated > rejected > pending > unchanged):
 75   * pollRemoteSessionEvents paginates up to 50 pages per call, so one ingest
 76   * can span seconds of session activity. A batch may contain both an approved
 77   * tool_result AND a subsequent {type:'result'} (user approved, then remote
 78   * crashed). The approved plan is real and in threadstore — don't drop it.
 79   */
 80  export class ExitPlanModeScanner {
 81    private exitPlanCalls: string[] = []
 82    private results = new Map<string, ToolResultBlockParam>()
 83    private rejectedIds = new Set<string>()
 84    private terminated: { subtype: string } | null = null
 85    private rescanAfterRejection = false
 86    everSeenPending = false
 87  
 88    get rejectCount(): number {
 89      return this.rejectedIds.size
 90    }
 91  
 92    /**
 93     * True when an ExitPlanMode tool_use exists with no tool_result yet —
 94     * the remote is showing the approval dialog in the browser.
 95     */
 96    get hasPendingPlan(): boolean {
 97      const id = this.exitPlanCalls.findLast(c => !this.rejectedIds.has(c))
 98      return id !== undefined && !this.results.has(id)
 99    }
100  
101    ingest(newEvents: SDKMessage[]): ScanResult {
102      for (const m of newEvents) {
103        if (m.type === 'assistant') {
104          for (const block of m.message.content) {
105            if (block.type !== 'tool_use') continue
106            const tu = block as ToolUseBlock
107            if (tu.name === EXIT_PLAN_MODE_V2_TOOL_NAME) {
108              this.exitPlanCalls.push(tu.id)
109            }
110          }
111        } else if (m.type === 'user') {
112          const content = m.message.content
113          if (!Array.isArray(content)) continue
114          for (const block of content) {
115            if (block.type === 'tool_result') {
116              this.results.set(block.tool_use_id, block)
117            }
118          }
119        } else if (m.type === 'result' && m.subtype !== 'success') {
120          // result(success) fires after EVERY CCR turn
121          // If the remote asks a clarifying question (turn ends without
122          // ExitPlanMode), we must keep polling — the user can reply in
123          // the browser and reach ExitPlanMode in a later turn.
124          // Only error subtypes (error_during_execution, error_max_turns,
125          // etc.) mean the session is actually dead.
126          this.terminated = { subtype: m.subtype }
127        }
128      }
129  
130      // Skip-scan when nothing could have moved the target: no new events, no
131      // rejection last tick. A rejection moves the newest-non-rejected target.
132      const shouldScan = newEvents.length > 0 || this.rescanAfterRejection
133      this.rescanAfterRejection = false
134  
135      let found:
136        | { kind: 'approved'; plan: string }
137        | { kind: 'teleport'; plan: string }
138        | { kind: 'rejected'; id: string }
139        | { kind: 'pending' }
140        | null = null
141      if (shouldScan) {
142        for (let i = this.exitPlanCalls.length - 1; i >= 0; i--) {
143          const id = this.exitPlanCalls[i]!
144          if (this.rejectedIds.has(id)) continue
145          const tr = this.results.get(id)
146          if (!tr) {
147            found = { kind: 'pending' }
148          } else if (tr.is_error === true) {
149            const teleportPlan = extractTeleportPlan(tr.content)
150            found =
151              teleportPlan !== null
152                ? { kind: 'teleport', plan: teleportPlan }
153                : { kind: 'rejected', id }
154          } else {
155            found = { kind: 'approved', plan: extractApprovedPlan(tr.content) }
156          }
157          break
158        }
159        if (found?.kind === 'approved' || found?.kind === 'teleport') return found
160      }
161  
162      // Bookkeeping before the terminated check — a batch can contain BOTH a
163      // rejected tool_result and a {type:'result'}; rejectCount must reflect
164      // the rejection even though terminated takes return precedence.
165      if (found?.kind === 'rejected') {
166        this.rejectedIds.add(found.id)
167        this.rescanAfterRejection = true
168      }
169      if (this.terminated) {
170        return { kind: 'terminated', subtype: this.terminated.subtype }
171      }
172      if (found?.kind === 'rejected') {
173        return found
174      }
175      if (found?.kind === 'pending') {
176        this.everSeenPending = true
177        return found
178      }
179      return { kind: 'unchanged' }
180    }
181  }
182  
/** Successful outcome of pollForApprovedExitPlanMode. */
export type PollResult = {
  /** Plan text extracted from the deciding tool_result. */
  plan: string
  /** Number of rejections recorded before the plan was accepted. */
  rejectCount: number
  /**
   * Where the user wants the plan executed:
   * - 'local'  — user clicked teleport (execute here, archive remote).
   * - 'remote' — user approved in-CCR execution (don't archive).
   */
  executionTarget: 'local' | 'remote'
}
189  
/**
 * Polls a remote CCR session until the user accepts an ExitPlanMode plan and
 * returns the plan text together with where the user wants it executed.
 *
 * Plan extraction:
 * - 'approved' scrapes from the "## Approved Plan:" marker (ExitPlanModeV2Tool
 *   default branch) — the model writes the plan to a file inside CCR and calls
 *   ExitPlanMode({allowedPrompts}), so input.plan is never in threadstore.
 * - 'teleport' scrapes from the ULTRAPLAN_TELEPORT_SENTINEL in a deny
 *   tool_result — the browser sends a rejection so the remote stays in plan
 *   mode, with the plan text embedded in the feedback.
 * - Normal rejections (is_error === true, no sentinel) are tracked and
 *   skipped so the user can iterate in the browser.
 *
 * @param sessionId Remote session id, passed to pollRemoteSessionEvents.
 * @param timeoutMs Overall budget in milliseconds, measured from this call.
 * @param onPhaseChange Called whenever the derived phase differs from the
 *   previous one. The starting phase is 'running', so the first call happens
 *   only once the phase leaves 'running'.
 * @param shouldStop Checked at the top of every loop iteration; when it
 *   returns true the poll aborts with reason 'stopped'.
 * @returns The plan, the number of rejections seen, and the execution target
 *   ('remote' for an approval, 'local' for a teleport).
 * @throws {UltraplanPollError} with reason
 *   'stopped' (caller abort),
 *   'network_or_unknown' (non-transient poll error, or
 *   MAX_CONSECUTIVE_FAILURES transient errors in a row),
 *   'extract_marker_missing' (scanning the events threw),
 *   'terminated' (remote session ended before approval),
 *   'timeout_pending' / 'timeout_no_plan' (deadline reached).
 */
export async function pollForApprovedExitPlanMode(
  sessionId: string,
  timeoutMs: number,
  onPhaseChange?: (phase: UltraplanPhase) => void,
  shouldStop?: () => boolean,
): Promise<PollResult> {
  const deadline = Date.now() + timeoutMs
  const scanner = new ExitPlanModeScanner()
  let cursor: string | null = null
  let failures = 0
  let lastPhase: UltraplanPhase = 'running'

  while (Date.now() < deadline) {
    if (shouldStop?.()) {
      throw new UltraplanPollError(
        'poll stopped by caller',
        'stopped',
        scanner.rejectCount,
      )
    }
    let newEvents: SDKMessage[]
    let sessionStatus: PollRemoteSessionResponse['sessionStatus']
    try {
      // Metadata fetch (session_status) is the needs_input signal —
      // threadstore doesn't persist result(success) turn-end events, so
      // idle status is the only authoritative "remote is waiting" marker.
      const resp = await pollRemoteSessionEvents(sessionId, cursor)
      newEvents = resp.newEvents
      cursor = resp.lastEventId
      sessionStatus = resp.sessionStatus
      failures = 0
    } catch (e) {
      // Transient errors are retried after the normal interval until the
      // consecutive-failure budget is spent; anything else aborts at once.
      const transient = isTransientNetworkError(e)
      if (!transient || ++failures >= MAX_CONSECUTIVE_FAILURES) {
        throw new UltraplanPollError(
          e instanceof Error ? e.message : String(e),
          'network_or_unknown',
          scanner.rejectCount,
          { cause: e },
        )
      }
      await sleep(POLL_INTERVAL_MS)
      continue
    }

    let result: ScanResult
    try {
      result = scanner.ingest(newEvents)
    } catch (e) {
      // Keep the original error as `cause` (same as the network path above)
      // so its stack trace survives the wrapping.
      throw new UltraplanPollError(
        e instanceof Error ? e.message : String(e),
        'extract_marker_missing',
        scanner.rejectCount,
        { cause: e },
      )
    }
    if (result.kind === 'approved') {
      return {
        plan: result.plan,
        rejectCount: scanner.rejectCount,
        executionTarget: 'remote',
      }
    }
    if (result.kind === 'teleport') {
      return {
        plan: result.plan,
        rejectCount: scanner.rejectCount,
        executionTarget: 'local',
      }
    }
    if (result.kind === 'terminated') {
      throw new UltraplanPollError(
        `remote session ended (${result.subtype}) before plan approval`,
        'terminated',
        scanner.rejectCount,
      )
    }
    // plan_ready from the event stream wins; otherwise idle session status
    // means the remote asked a question and is waiting for a browser reply.
    // requires_action with no pending plan is also needs_input — the remote
    // may be blocked on a non-ExitPlanMode permission prompt.
    // CCR briefly flips to 'idle' between tool turns (see STABLE_IDLE_POLLS
    // in RemoteAgentTask). Only trust idle when no new events arrived —
    // events flowing means the session is working regardless of the status
    // snapshot. This also makes needs_input → running snap back on the first
    // poll that sees the user's reply event, even if session_status lags.
    const quietIdle =
      (sessionStatus === 'idle' || sessionStatus === 'requires_action') &&
      newEvents.length === 0
    const phase: UltraplanPhase = scanner.hasPendingPlan
      ? 'plan_ready'
      : quietIdle
        ? 'needs_input'
        : 'running'
    if (phase !== lastPhase) {
      logForDebugging(`[ultraplan] phase ${lastPhase} → ${phase}`)
      lastPhase = phase
      onPhaseChange?.(phase)
    }
    await sleep(POLL_INTERVAL_MS)
  }

  // Deadline reached: tell "plan shown but never approved" apart from
  // "ExitPlanMode was never observed as pending".
  throw new UltraplanPollError(
    scanner.everSeenPending
      ? `no approval after ${timeoutMs / 1000}s`
      : `ExitPlanMode never reached after ${timeoutMs / 1000}s (the remote container failed to start, or session ID mismatch?)`,
    scanner.everSeenPending ? 'timeout_pending' : 'timeout_no_plan',
    scanner.rejectCount,
  )
}
307  
/**
 * Flattens tool_result content to a plain string. Depending on threadstore
 * encoding the content is either a string or an array of blocks such as
 * [{type:'text', text}]; blocks without a `text` field contribute nothing.
 * Any other shape (e.g. undefined) yields the empty string.
 */
function contentToText(content: ToolResultBlockParam['content']): string {
  if (typeof content === 'string') {
    return content
  }
  if (!Array.isArray(content)) {
    return ''
  }
  const pieces = content.map(block => {
    if ('text' in block) {
      return block.text
    }
    return ''
  })
  return pieces.join('')
}
317  
/**
 * Pulls the plan text that follows the ULTRAPLAN_TELEPORT_SENTINEL line.
 *
 * @param content tool_result content to search.
 * @returns Everything after the first "<sentinel>\n" with trailing whitespace
 *   trimmed, or null when the sentinel line is absent — the scanner treats
 *   null as an ordinary user rejection ({ kind: 'rejected' }).
 */
function extractTeleportPlan(
  content: ToolResultBlockParam['content'],
): string | null {
  const body = contentToText(content)
  const sentinelLine = ULTRAPLAN_TELEPORT_SENTINEL + '\n'
  const at = body.indexOf(sentinelLine)
  return at < 0 ? null : body.substring(at + sentinelLine.length).trimEnd()
}
330  
/**
 * Extracts the plan text from an approved ExitPlanMode tool_result.
 *
 * ExitPlanModeV2Tool echoes the plan as "## Approved Plan:\n<text>" or
 * "## Approved Plan (edited by user):\n<text>". Both headers are looked up
 * and the one that occurs EARLIEST in the text is taken as the real header,
 * so a plan body that quotes the other header verbatim is not truncated at
 * that inner occurrence.
 *
 * @param content tool_result content of the approved ExitPlanMode call.
 * @returns The text after the header, with trailing whitespace trimmed.
 * @throws Error when neither header is present.
 */
function extractApprovedPlan(content: ToolResultBlockParam['content']): string {
  const text = contentToText(content)
  // Edited plans use a different label, so both must be considered.
  const markers = [
    '## Approved Plan (edited by user):\n',
    '## Approved Plan:\n',
  ]
  let headerAt = -1
  let planStart = -1
  for (const marker of markers) {
    const idx = text.indexOf(marker)
    if (idx === -1) continue
    // The two markers diverge right after "Plan", so they can never match at
    // the same index; a strict comparison is enough to pick the earliest.
    if (headerAt === -1 || idx < headerAt) {
      headerAt = idx
      planStart = idx + marker.length
    }
  }
  if (planStart !== -1) {
    return text.slice(planStart).trimEnd()
  }
  throw new Error(
    `ExitPlanMode approved but tool_result has no "## Approved Plan:" marker — remote may have hit the empty-plan or isAgent branch. Content preview: ${text.slice(0, 200)}`,
  )
}