// ccrSession.ts
// CCR session polling for /ultraplan: waits for an approved ExitPlanMode
// tool_result, then extracts the plan text. Pagination and typed
// SDKMessage[] come from pollRemoteSessionEvents (shared with
// RemoteAgentTask). Plan mode itself is set via a set_permission_mode
// control_request in teleportToRemote's CreateSession events array.

import type {
  ToolResultBlockParam,
  ToolUseBlock,
} from '@anthropic-ai/sdk/resources'
import type { SDKMessage } from '../../entrypoints/agentSdkTypes.js'
import { EXIT_PLAN_MODE_V2_TOOL_NAME } from '../../tools/ExitPlanModeTool/constants.js'
import { logForDebugging } from '../debug.js'
import { sleep } from '../sleep.js'
import { isTransientNetworkError } from '../teleport/api.js'
import {
  type PollRemoteSessionResponse,
  pollRemoteSessionEvents,
} from '../teleport.js'

/** Milliseconds to wait between consecutive session-event polls. */
const POLL_INTERVAL_MS = 3000

// pollRemoteSessionEvents has no retry of its own. A 30-minute poll makes
// roughly 600 calls, so at any nonzero 5xx rate a single blip would end the
// run; this many transient failures in a row are tolerated before giving up.
const MAX_CONSECUTIVE_FAILURES = 5

/** Classification carried by {@link UltraplanPollError#reason}. */
export type PollFailReason =
  | 'terminated'
  | 'timeout_pending'
  | 'timeout_no_plan'
  | 'extract_marker_missing'
  | 'network_or_unknown'
  | 'stopped'

/**
 * Error raised when polling for an approved plan fails.
 *
 * `reason` says why polling stopped; `rejectCount` is the number of plan
 * rejections recorded at the moment the error was created. `options` is
 * forwarded to the `Error` constructor (e.g. to attach a `cause`).
 */
export class UltraplanPollError extends Error {
  readonly reason: PollFailReason
  readonly rejectCount: number

  constructor(
    message: string,
    reason: PollFailReason,
    rejectCount: number,
    options?: ErrorOptions,
  ) {
    super(message, options)
    this.reason = reason
    this.rejectCount = rejectCount
    this.name = 'UltraplanPollError'
  }
}

// Sentinel string the browser PlanModal includes in the feedback when the
// user clicks "teleport back to terminal". The plan text follows on the next
// line.
export const ULTRAPLAN_TELEPORT_SENTINEL = '__ULTRAPLAN_TELEPORT_LOCAL__'

/**
 * Verdict produced by {@link ExitPlanModeScanner.ingest} for one batch of
 * events. `unchanged` means no verdict this batch (nothing new to scan, or
 * no ExitPlanMode call to report on).
 */
export type ScanResult =
  | { kind: 'approved'; plan: string }
  | { kind: 'teleport'; plan: string }
  | { kind: 'rejected'; id: string }
  | { kind: 'pending' }
  | { kind: 'terminated'; subtype: string }
  | { kind: 'unchanged' }

/**
 * Pill/detail-view state derived from the event stream. Transitions:
 *   running → (turn ends, no ExitPlanMode) → needs_input
 *   needs_input → (user replies in browser) → running
 *   running → (ExitPlanMode emitted, no result yet) → plan_ready
 *   plan_ready → (rejected) → running
 *   plan_ready → (approved) → poll resolves, pill removed
 */
export type UltraplanPhase = 'running' | 'needs_input' | 'plan_ready'

/**
 * Pure stateful classifier for the CCR event stream. Ingests SDKMessage[]
 * batches (as delivered by pollRemoteSessionEvents) and returns the current
 * ExitPlanMode verdict. No I/O, no timers — feed it synthetic or recorded
 * events for unit tests and offline replay.
 *
 * Precedence (approved > terminated > rejected > pending > unchanged):
 * pollRemoteSessionEvents paginates up to 50 pages per call, so one ingest
 * can span seconds of session activity. A batch may contain both an approved
 * tool_result AND a subsequent {type:'result'} (user approved, then remote
 * crashed). The approved plan is real and in threadstore — don't drop it.
 *
 * For the same reason a batch may contain a rejection AND the newer
 * ExitPlanMode call that followed it (or, in offline replay, the whole
 * session). Such superseded rejections are still counted in rejectCount.
 */
export class ExitPlanModeScanner {
  private exitPlanCalls: string[] = []
  private results = new Map<string, ToolResultBlockParam>()
  private rejectedIds = new Set<string>()
  private terminated: { subtype: string } | null = null
  private rescanAfterRejection = false
  everSeenPending = false

  /** Number of distinct ExitPlanMode calls recorded as rejected so far. */
  get rejectCount(): number {
    return this.rejectedIds.size
  }

  /**
   * True when an ExitPlanMode tool_use exists with no tool_result yet —
   * the remote is showing the approval dialog in the browser.
   */
  get hasPendingPlan(): boolean {
    const id = this.exitPlanCalls.findLast(c => !this.rejectedIds.has(c))
    return id !== undefined && !this.results.has(id)
  }

  /**
   * Folds a batch of events into the scanner state and returns the verdict
   * for the newest non-rejected ExitPlanMode call.
   *
   * @throws Error (from extractApprovedPlan) when an approved tool_result
   *   lacks the "## Approved Plan:" marker.
   */
  ingest(newEvents: SDKMessage[]): ScanResult {
    for (const m of newEvents) {
      if (m.type === 'assistant') {
        for (const block of m.message.content) {
          if (block.type !== 'tool_use') continue
          const tu = block as ToolUseBlock
          if (tu.name === EXIT_PLAN_MODE_V2_TOOL_NAME) {
            this.exitPlanCalls.push(tu.id)
          }
        }
      } else if (m.type === 'user') {
        const content = m.message.content
        if (!Array.isArray(content)) continue
        for (const block of content) {
          if (block.type === 'tool_result') {
            this.results.set(block.tool_use_id, block)
          }
        }
      } else if (m.type === 'result' && m.subtype !== 'success') {
        // result(success) fires after EVERY CCR turn
        // If the remote asks a clarifying question (turn ends without
        // ExitPlanMode), we must keep polling — the user can reply in
        // the browser and reach ExitPlanMode in a later turn.
        // Only error subtypes (error_during_execution, error_max_turns,
        // etc.) mean the session is actually dead.
        this.terminated = { subtype: m.subtype }
      }
    }

    // Skip-scan when nothing could have moved the target: no new events, no
    // rejection last tick. A rejection moves the newest-non-rejected target.
    const shouldScan = newEvents.length > 0 || this.rescanAfterRejection
    this.rescanAfterRejection = false

    let found:
      | { kind: 'approved'; plan: string }
      | { kind: 'teleport'; plan: string }
      | { kind: 'rejected'; id: string }
      | { kind: 'pending' }
      | null = null
    if (shouldScan) {
      const targetIdx = this.exitPlanCalls.findLastIndex(
        c => !this.rejectedIds.has(c),
      )
      // Count rejections hidden behind the target before classifying it, so
      // rejectCount is correct even if extractApprovedPlan throws below.
      this.recordSupersededRejections(targetIdx)
      if (targetIdx !== -1) {
        const id = this.exitPlanCalls[targetIdx]!
        const tr = this.results.get(id)
        if (!tr) {
          found = { kind: 'pending' }
        } else if (tr.is_error === true) {
          const teleportPlan = extractTeleportPlan(tr.content)
          found =
            teleportPlan !== null
              ? { kind: 'teleport', plan: teleportPlan }
              : { kind: 'rejected', id }
        } else {
          found = { kind: 'approved', plan: extractApprovedPlan(tr.content) }
        }
      }
      if (found?.kind === 'approved' || found?.kind === 'teleport') return found
    }

    // Bookkeeping before the terminated check — a batch can contain BOTH a
    // rejected tool_result and a {type:'result'}; rejectCount must reflect
    // the rejection even though terminated takes return precedence.
    if (found?.kind === 'rejected') {
      this.rejectedIds.add(found.id)
      this.rescanAfterRejection = true
    }
    if (this.terminated) {
      return { kind: 'terminated', subtype: this.terminated.subtype }
    }
    if (found?.kind === 'rejected') {
      return found
    }
    if (found?.kind === 'pending') {
      this.everSeenPending = true
      return found
    }
    return { kind: 'unchanged' }
  }

  /**
   * Marks plain rejections (is_error without the teleport sentinel) on
   * ExitPlanMode calls older than `targetIdx` as rejected. The newest-first
   * scan stops at the newest non-rejected call, so a rejection whose result
   * landed in the same batch as a newer ExitPlanMode call would otherwise
   * never be counted. `targetIdx` of -1 means there is no target: no-op.
   */
  private recordSupersededRejections(targetIdx: number): void {
    for (let i = 0; i < targetIdx; i++) {
      const id = this.exitPlanCalls[i]!
      if (this.rejectedIds.has(id)) continue
      const tr = this.results.get(id)
      if (tr?.is_error === true && extractTeleportPlan(tr.content) === null) {
        this.rejectedIds.add(id)
      }
    }
  }
}

export type PollResult = {
  plan: string
  rejectCount: number
  /** 'local' = user clicked teleport (execute here, archive remote). 'remote' = user approved in-CCR execution (don't archive). */
  executionTarget: 'local' | 'remote'
}

/**
 * Polls a CCR session until an ExitPlanMode plan is approved or teleported,
 * and returns the plan text plus where the user wants it executed.
 *
 * 'approved' scrapes from the "## Approved Plan:" marker (ExitPlanModeV2Tool
 * default branch) — the model writes the plan to a file inside CCR and calls
 * ExitPlanMode({allowedPrompts}), so input.plan is never in threadstore.
 * 'teleport' scrapes from the ULTRAPLAN_TELEPORT_SENTINEL in a deny
 * tool_result — the browser sends a rejection so the remote stays in plan
 * mode, with the plan text embedded in the feedback. Normal rejections
 * (is_error === true, no sentinel) are tracked and skipped so the user can
 * iterate in the browser.
 *
 * @param sessionId - CCR session whose events are polled.
 * @param timeoutMs - Overall polling budget in milliseconds.
 * @param onPhaseChange - Invoked whenever the derived UltraplanPhase changes.
 * @param shouldStop - Checked at the top of each iteration; true aborts.
 * @returns The plan, the rejection count, and the execution target.
 * @throws UltraplanPollError with reason 'stopped', 'network_or_unknown',
 *   'extract_marker_missing', 'terminated', 'timeout_pending' or
 *   'timeout_no_plan'.
 */
export async function pollForApprovedExitPlanMode(
  sessionId: string,
  timeoutMs: number,
  onPhaseChange?: (phase: UltraplanPhase) => void,
  shouldStop?: () => boolean,
): Promise<PollResult> {
  const deadline = Date.now() + timeoutMs
  const scanner = new ExitPlanModeScanner()
  let cursor: string | null = null
  let failures = 0
  let lastPhase: UltraplanPhase = 'running'

  while (Date.now() < deadline) {
    if (shouldStop?.()) {
      throw new UltraplanPollError(
        'poll stopped by caller',
        'stopped',
        scanner.rejectCount,
      )
    }
    let newEvents: SDKMessage[]
    let sessionStatus: PollRemoteSessionResponse['sessionStatus']
    try {
      // Metadata fetch (session_status) is the needs_input signal —
      // threadstore doesn't persist result(success) turn-end events, so
      // idle status is the only authoritative "remote is waiting" marker.
      const resp = await pollRemoteSessionEvents(sessionId, cursor)
      newEvents = resp.newEvents
      cursor = resp.lastEventId
      sessionStatus = resp.sessionStatus
      failures = 0
    } catch (e) {
      const transient = isTransientNetworkError(e)
      if (!transient || ++failures >= MAX_CONSECUTIVE_FAILURES) {
        throw new UltraplanPollError(
          e instanceof Error ? e.message : String(e),
          'network_or_unknown',
          scanner.rejectCount,
          { cause: e },
        )
      }
      await sleep(POLL_INTERVAL_MS)
      continue
    }

    let result: ScanResult
    try {
      result = scanner.ingest(newEvents)
    } catch (e) {
      // Keep the original error attached, matching the network branch.
      throw new UltraplanPollError(
        e instanceof Error ? e.message : String(e),
        'extract_marker_missing',
        scanner.rejectCount,
        { cause: e },
      )
    }
    if (result.kind === 'approved') {
      return {
        plan: result.plan,
        rejectCount: scanner.rejectCount,
        executionTarget: 'remote',
      }
    }
    if (result.kind === 'teleport') {
      return {
        plan: result.plan,
        rejectCount: scanner.rejectCount,
        executionTarget: 'local',
      }
    }
    if (result.kind === 'terminated') {
      throw new UltraplanPollError(
        `remote session ended (${result.subtype}) before plan approval`,
        'terminated',
        scanner.rejectCount,
      )
    }
    // plan_ready from the event stream wins; otherwise idle session status
    // means the remote asked a question and is waiting for a browser reply.
    // requires_action with no pending plan is also needs_input — the remote
    // may be blocked on a non-ExitPlanMode permission prompt.
    // CCR briefly flips to 'idle' between tool turns (see STABLE_IDLE_POLLS
    // in RemoteAgentTask). Only trust idle when no new events arrived —
    // events flowing means the session is working regardless of the status
    // snapshot. This also makes needs_input → running snap back on the first
    // poll that sees the user's reply event, even if session_status lags.
    const quietIdle =
      (sessionStatus === 'idle' || sessionStatus === 'requires_action') &&
      newEvents.length === 0
    const phase: UltraplanPhase = scanner.hasPendingPlan
      ? 'plan_ready'
      : quietIdle
        ? 'needs_input'
        : 'running'
    if (phase !== lastPhase) {
      logForDebugging(`[ultraplan] phase ${lastPhase} → ${phase}`)
      lastPhase = phase
      onPhaseChange?.(phase)
    }
    await sleep(POLL_INTERVAL_MS)
  }

  throw new UltraplanPollError(
    scanner.everSeenPending
      ? `no approval after ${timeoutMs / 1000}s`
      : `ExitPlanMode never reached after ${timeoutMs / 1000}s (the remote container failed to start, or session ID mismatch?)`,
    scanner.everSeenPending ? 'timeout_pending' : 'timeout_no_plan',
    scanner.rejectCount,
  )
}

// tool_result content may be string or [{type:'text',text}] depending on
// threadstore encoding. Non-text array blocks contribute nothing.
function contentToText(content: ToolResultBlockParam['content']): string {
  return typeof content === 'string'
    ? content
    : Array.isArray(content)
      ? content.map(b => ('text' in b ? b.text : '')).join('')
      : ''
}

// Extracts the plan text after the ULTRAPLAN_TELEPORT_SENTINEL marker.
// Returns null when the sentinel is absent — callers treat null as a normal
// user rejection (scanner falls through to { kind: 'rejected' }).
function extractTeleportPlan(
  content: ToolResultBlockParam['content'],
): string | null {
  const text = contentToText(content)
  const marker = `${ULTRAPLAN_TELEPORT_SENTINEL}\n`
  const idx = text.indexOf(marker)
  if (idx === -1) return null
  return text.slice(idx + marker.length).trimEnd()
}

// Plan is echoed in tool_result content as "## Approved Plan:\n<text>" or
// "## Approved Plan (edited by user):\n<text>" (ExitPlanModeV2Tool).
// Throws when neither marker is present.
function extractApprovedPlan(content: ToolResultBlockParam['content']): string {
  const text = contentToText(content)
  // Try both markers — edited plans use a different label.
  const markers = [
    '## Approved Plan (edited by user):\n',
    '## Approved Plan:\n',
  ]
  for (const marker of markers) {
    const idx = text.indexOf(marker)
    if (idx !== -1) {
      return text.slice(idx + marker.length).trimEnd()
    }
  }
  throw new Error(
    `ExitPlanMode approved but tool_result has no "## Approved Plan:" marker — remote may have hit the empty-plan or isAgent branch. Content preview: ${text.slice(0, 200)}`,
  )
}