// subagent-runtime.ts
/**
 * Native Subagent Runtime
 *
 * Replaces CLI bridge delegation with direct, in-process subagent execution.
 * Uses lineage nodes for lifecycle state tracking (no separate state machine
 * registry). Provides a handle registry for promise-based waiting.
 */

import { genId } from '@/lib/id'
import { DEFAULT_DELEGATION_MAX_DEPTH } from '@/lib/runtime/runtime-loop'
import { loadAgents } from '@/lib/server/agents/agent-repository'
import { enqueueSessionRun, type EnqueueSessionRunResult } from '@/lib/server/runtime/session-run-manager'
import { loadRuntimeSettings } from '@/lib/server/runtime/runtime-settings'
import { applyResolvedRoute, resolvePrimaryAgentRoute } from '@/lib/server/agents/agent-runtime-config'
import { resolveSubagentBrowserProfileId } from '@/lib/server/session-tools/subagent'
import { runCapabilityHook, runCapabilitySubagentSpawning } from '@/lib/server/native-capabilities'
import {
  appendDelegationCheckpoint,
  completeDelegationJob,
  createDelegationJob,
  failDelegationJob,
  getDelegationJob,
  registerDelegationRuntime,
  startDelegationJob,
} from '@/lib/server/agents/delegation-jobs'
import {
  createLineageNode,
  completeLineageNode,
  failLineageNode,
  cancelLineageNode,
  getLineageNode,
  getLineageNodeBySession,
  getAncestors,
  getChildren,
  getDescendants,
  buildLineageTree,
  cancelSubtree,
  setLineageStatus,
  isTerminalState,
  cleanupTerminalNodes,
  type LineageNode,
  type LineageTree,
  type SubagentState,
} from '@/lib/server/agents/subagent-lineage'
import { errorMessage, hmrSingleton } from '@/lib/shared-utils'
import { log } from '@/lib/server/logger'
import { debug } from '@/lib/server/debug'
import { logExecution } from '@/lib/server/execution-log'
import { enqueueSystemEvent } from '@/lib/server/runtime/system-events'
import { getEnabledCapabilityIds, splitCapabilityIds } from '@/lib/capability-selection'
import { getSession, loadSessions, saveSession } from '@/lib/server/sessions/session-repository'
import { ensureRunContext } from '@/lib/server/run-context'
import { buildExecutionBrief, serializeExecutionBriefForDelegation } from '@/lib/server/execution-brief'

// ---------------------------------------------------------------------------
// Types
// ---------------------------------------------------------------------------

export interface SpawnSubagentInput {
  /** Agent to spawn */
  agentId: string
  /** Message/task for the subagent */
  message: string
  /** Working directory override */
  cwd?: string
  /** Inherit parent's browser profile */
  shareBrowserProfile?: boolean
  /** Inherit parent session's extensions/tools (default true) */
  inheritExtensions?: boolean
  /** Caller-owned: controls whether the caller awaits `handle.promise`. Not read by the runtime. */
  waitForCompletion?: boolean
  /** Timeout in seconds (default 600). Set 0 to disable. */
  timeoutSec?: number
  /** Optional shared execution lane key for serializing sibling runs. */
  executionGroupKey?: string
  /** When true, skip the ancestor-agent cycle check (A → B → A). Default false. */
  allowCycle?: boolean
}

export interface SubagentHandle {
  /** Delegation job ID */
  jobId: string
  /** Child session ID */
  sessionId: string
  /** Lineage node ID */
  lineageId: string
  /** Agent info */
  agentId: string
  agentName: string
  /** Session run handle (for abort) */
  run: EnqueueSessionRunResult
  /** Promise that resolves when the subagent completes */
  promise: Promise<SubagentResult>
}

export interface SubagentResult {
  jobId: string
  sessionId: string
  lineageId: string
  agentId: string
  agentName: string
  status: 'completed' | 'failed' | 'cancelled' | 'timed_out'
  response: string | null
  error: string | null
  depth: number
  parentSessionId: string | null
  childCount: number
  durationMs: number
}

export interface SubagentContext {
  sessionId?: string
  cwd: string
  /** Pre-loaded sessions map — avoids repeated SQLite reads in batch/swarm */
  _sessions?: Record<string, unknown>
}

// ---------------------------------------------------------------------------
// Handle Registry (for promise-based waiting instead of polling)
// ---------------------------------------------------------------------------

const handleRegistry = hmrSingleton('__swarmclaw_subagent_handles__', () => new Map<string, SubagentHandle>())

/**
 * Retrieve a handle by job ID (for promise-based waiting).
 *
 * @param jobId Delegation job ID the handle was registered under.
 * @returns The registered handle, or `null` when none is registered for `jobId`.
 */
export function getHandle(jobId: string): SubagentHandle | null {
  return handleRegistry.get(jobId) ?? null
}

// ---------------------------------------------------------------------------
// Extension Inheritance
// ---------------------------------------------------------------------------

/**
 * Merge parent session extensions with agent-defined extensions.
 * Agent extensions take precedence (listed first), parent extensions fill in gaps.
 * Case-insensitive deduplication, original casing preserved.
 *
 * When either side is empty the other side is returned without the
 * trim/deduplication pass (the agent list by reference, the parent list as a copy).
 */
function mergeCapabilities(
  agentCapabilities: string[],
  parentSession: Record<string, unknown> | null | undefined,
): string[] {
  const parentCapabilities = getEnabledCapabilityIds(parentSession as { tools?: string[] | null, extensions?: string[] | null } | null)

  if (parentCapabilities.length === 0) return agentCapabilities
  if (agentCapabilities.length === 0) return [...parentCapabilities]

  const seen = new Set<string>()
  const merged: string[] = []
  for (const id of [...agentCapabilities, ...parentCapabilities]) {
    const trimmed = typeof id === 'string' ? id.trim() : ''
    const normalized = trimmed.toLowerCase()
    // Skip blanks/non-strings and anything already seen under a different casing.
    if (normalized && !seen.has(normalized)) {
      seen.add(normalized)
      merged.push(trimmed)
    }
  }
  return merged
}

// ---------------------------------------------------------------------------
// Depth Guard
// ---------------------------------------------------------------------------

/**
 * Compute how deep a session sits in its delegation chain.
 *
 * @param sessionId Session to measure; `undefined` yields 0.
 * @param maxDepth Upper bound used to cap the fallback parent-chain walk.
 * @param sessions Optional pre-loaded sessions map; loaded via `loadSessions()` when omitted.
 * @returns The stored `delegationDepth` when it is a non-negative number, otherwise
 *   the number of `parentSessionId` hops walked (at most `maxDepth + 1`).
 */
export function getSessionDepth(
  sessionId: string | undefined,
  maxDepth: number,
  sessions?: Record<string, unknown>,
): number {
  if (!sessionId) return 0
  const allSessions = sessions ?? loadSessions()
  const session = allSessions[sessionId] as unknown as Record<string, unknown> | undefined
  // Use stored delegationDepth if available (O(1) vs O(depth) chain walk)
  if (session && typeof session.delegationDepth === 'number' && session.delegationDepth >= 0) {
    return session.delegationDepth
  }
  // Fallback: walk the parent chain
  let depth = 0
  let current = sessionId
  while (current && depth < maxDepth + 1) {
    const s = allSessions[current] as unknown as Record<string, unknown> | undefined
    if (!s?.parentSessionId) break
    current = s.parentSessionId as string
    depth++
  }
  return depth
}

/**
 * Collect agentIds of every session in the parent chain including the given
 * session. Used to detect delegation cycles (A → B → A) before spawning.
 *
 * @param sessionId Session to start from; `undefined` yields an empty list.
 * @param sessions Sessions map used to follow `parentSessionId` links.
 * @param limit Maximum number of agent IDs to collect (default 32).
 * @returns Trimmed, non-empty agent IDs in chain order, starting with `sessionId`'s own.
 */
export function collectAncestorAgentIds(
  sessionId: string | undefined,
  sessions: Record<string, unknown>,
  limit = 32,
): string[] {
  if (!sessionId) return []
  const ids: string[] = []
  let current: string | undefined = sessionId
  // Guards against a parent chain that loops back on itself.
  const visited = new Set<string>()
  while (current && ids.length < limit && !visited.has(current)) {
    visited.add(current)
    const s = sessions[current] as Record<string, unknown> | undefined
    const agentId = typeof s?.agentId === 'string' ? s.agentId.trim() : ''
    if (agentId) ids.push(agentId)
    const parentId = typeof s?.parentSessionId === 'string' ? s.parentSessionId : null
    if (!parentId) break
    current = parentId
  }
  return ids
}

// ---------------------------------------------------------------------------
// Core: Spawn a Native Subagent
// ---------------------------------------------------------------------------

/** Timeout applied when `SpawnSubagentInput.timeoutSec` is not provided (10 minutes). */
const DEFAULT_TIMEOUT_SEC = 600

/**
 * Largest delay Node timers accept. A larger delay is coerced to 1 ms, which
 * would make a very long timeout fire almost immediately.
 */
const MAX_TIMER_DELAY_MS = 2_147_483_647

/**
 * Run a lifecycle notification without letting its failure escape.
 * Any error (sync throw or rejected promise) is logged as a warning.
 */
async function runBestEffort(step: string, jobId: string, fn: () => unknown): Promise<void> {
  try {
    await fn()
  } catch (err: unknown) {
    log.warn('subagent', 'Lifecycle notification failed', { step, jobId, error: errorMessage(err) })
  }
}

/**
 * Spawn a subagent as an in-process child session run.
 *
 * @param input What to spawn (agent, task message, optional overrides).
 * @param context Caller context: parent session ID, working directory, optional sessions cache.
 * @returns A handle whose `promise` settles with the subagent's result.
 * @throws Error when the agent does not exist, the delegation depth limit is
 *   reached, a delegation cycle is detected (unless `allowCycle` is true), or
 *   the spawning hook reports an error.
 */
export function spawnSubagent(
  input: SpawnSubagentInput,
  context: SubagentContext,
): Promise<SubagentHandle> {
  return spawnSubagentImpl(input, context)
}

async function spawnSubagentImpl(
  input: SpawnSubagentInput,
  context: SubagentContext,
): Promise<SubagentHandle> {
  const runtime = loadRuntimeSettings()
  const maxDepth = runtime.delegationMaxDepth || DEFAULT_DELEGATION_MAX_DEPTH
  const agents = loadAgents()
  const agent = agents[input.agentId]

  if (!agent) {
    log.warn('subagent', 'Spawn rejected: agent not found', { agentId: input.agentId })
    throw new Error(`Agent "${input.agentId}" not found.`)
  }

  // Use cached sessions if available (batch/swarm pass this to avoid N reads)
  const sessions = (context._sessions ?? loadSessions()) as unknown as Record<string, Record<string, unknown>>
  const depth = getSessionDepth(context.sessionId, maxDepth, sessions)
  if (depth >= maxDepth) {
    log.warn('subagent', 'Spawn rejected: max depth exceeded', { agentId: input.agentId, depth, maxDepth })
    throw new Error(`Max subagent depth (${maxDepth}) reached.`)
  }
  if (input.allowCycle !== true && context.sessionId) {
    const ancestorAgentIds = collectAncestorAgentIds(context.sessionId, sessions)
    if (ancestorAgentIds.includes(input.agentId)) {
      log.warn('subagent', 'Spawn rejected: delegation cycle', { agentId: input.agentId, chain: ancestorAgentIds })
      throw new Error(
        `Delegation cycle: agent "${input.agentId}" is already active higher in this chain. `
        + 'Pick a different sibling agent, or pass allowCycle=true to override.',
      )
    }
  }
  const parent = context.sessionId ? sessions[context.sessionId] : null
  const parentExtensions = getEnabledCapabilityIds(parent as { tools?: string[] | null, extensions?: string[] | null } | null)
  const spawningResult = await runCapabilitySubagentSpawning(
    {
      parentSessionId: context.sessionId || null,
      agentId: input.agentId,
      agentName: agent.name,
      message: input.message,
      cwd: input.cwd || context.cwd,
      mode: 'run',
      threadRequested: false,
    },
    { enabledIds: parentExtensions },
  )
  if (spawningResult.status === 'error') {
    throw new Error(spawningResult.error || 'Subagent spawn blocked by extension hook')
  }

  // 1. Create delegation job
  const job = createDelegationJob({
    kind: 'subagent',
    parentSessionId: context.sessionId || null,
    agentId: input.agentId,
    task: input.message,
    cwd: input.cwd || context.cwd,
  })
  appendDelegationCheckpoint(job.id, `Initializing subagent ${agent.name}`, 'queued')

  // 2. Create child session
  const sid = genId()
  const now = Date.now()
  const browserProfileId = resolveSubagentBrowserProfileId(
    parent,
    sid,
    input.shareBrowserProfile === true,
  )

  const agentExtensions = getEnabledCapabilityIds(agent)
  const effectiveExtensions = input.inheritExtensions === false
    ? agentExtensions
    : mergeCapabilities(agentExtensions, parent)
  const effectiveSelection = splitCapabilityIds(effectiveExtensions)

  const nextSession = {
    id: sid,
    name: `subagent-${agent.name}`,
    cwd: input.cwd || context.cwd,
    user: 'agent',
    provider: agent.provider,
    model: agent.model,
    credentialId: agent.credentialId || null,
    messages: [],
    createdAt: now,
    lastActiveAt: now,
    sessionType: 'delegated',
    agentId: agent.id,
    parentSessionId: context.sessionId || null,
    delegationDepth: depth + 1,
    tools: effectiveSelection.tools,
    extensions: effectiveSelection.extensions,
    browserProfileId,
  }
  sessions[sid] = applyResolvedRoute(nextSession, resolvePrimaryAgentRoute(agent))

  // Enrich child session with parent's RunContext for delegation handoff
  const delegationContext = parent ? serializeExecutionBriefForDelegation(buildExecutionBrief({ sessionId: context.sessionId })) : null
  if (delegationContext) {
    const childCtx = ensureRunContext(null)
    childCtx.parentContext = delegationContext
    childCtx.objective = input.message.slice(0, 900)
    sessions[sid].runContext = childCtx
  }

  saveSession(sid, sessions[sid])

  log.info('subagent', 'Spawning', { agentId: agent.id, agentName: agent.name, depth: depth + 1, jobId: job.id, sessionId: sid })
  logExecution(sid, 'delegation_start', `Subagent spawning: ${agent.name}`, {
    detail: { agentId: agent.id, depth: depth + 1, jobId: job.id, parentSessionId: context.sessionId },
  })

  // 3. Create lineage node (starts in 'initializing')
  const lineageNode = createLineageNode({
    sessionId: sid,
    agentId: agent.id,
    agentName: agent.name,
    parentSessionId: context.sessionId || null,
    jobId: job.id,
    task: input.message,
    cwd: input.cwd || context.cwd,
  })

  // 4. Mark lineage node ready
  setLineageStatus(lineageNode.id, 'ready')

  // 5. Start delegation job
  startDelegationJob(job.id, {
    childSessionId: sid,
    agentId: agent.id,
    agentName: agent.name,
    cwd: input.cwd || context.cwd,
  })
  appendDelegationCheckpoint(job.id, `Created child session ${sid}`, 'running')

  // 6. Mark lineage node running
  setLineageStatus(lineageNode.id, 'running')

  // 7. Enqueue session run (native execution — no CLI)
  const run = enqueueSessionRun({
    sessionId: sid,
    message: input.message,
    internal: true,
    source: 'subagent',
    mode: 'followup',
    executionGroupKey: input.executionGroupKey,
  })
  // The run is already enqueued at this point. If this notification were allowed
  // to throw, the spawn would reject while the child keeps running with no
  // cancel runtime, result promise or handle attached — so it is best-effort.
  await runBestEffort('subagentSpawned', job.id, () => runCapabilityHook(
    'subagentSpawned',
    {
      parentSessionId: context.sessionId || null,
      childSessionId: sid,
      agentId: agent.id,
      agentName: agent.name,
      runId: run.runId,
      mode: 'run',
      threadRequested: false,
    },
    { enabledIds: parentExtensions },
  ))

  // 8. Register runtime handle for cancellation
  registerDelegationRuntime(job.id, {
    cancel: () => {
      run.abort()
      const node = getLineageNode(lineageNode.id)
      if (node && !isTerminalState(node.status)) {
        cancelLineageNode(lineageNode.id)
      }
    },
  })

  // 9. Build result promise (with optional timeout)
  const effectiveTimeoutSec = input.timeoutSec ?? DEFAULT_TIMEOUT_SEC
  let timeoutTimer: ReturnType<typeof setTimeout> | undefined
  const timeoutPromise = effectiveTimeoutSec > 0
    ? new Promise<never>((_, reject) => {
      timeoutTimer = setTimeout(
        () => reject(new Error('__subagent_timeout__')),
        Math.min(effectiveTimeoutSec * 1000, MAX_TIMER_DELAY_MS),
      )
    })
    : null

  // Clear the timer as soon as the race settles so a finished run does not
  // leave a pending timer (and its closure) alive for the rest of the timeout.
  const racedPromise = timeoutPromise
    ? Promise.race([run.promise, timeoutPromise]).finally(() => clearTimeout(timeoutTimer))
    : run.promise

  // Shared end-of-life notifications for both the success and failure paths.
  // Each step is best-effort: a throwing hook must not bounce an already
  // completed job into the failure path, nor prevent the remaining steps.
  const announceEnd = async (outcome: SubagentResult): Promise<void> => {
    await runBestEffort('subagentEnded', job.id, () => runCapabilityHook(
      'subagentEnded',
      {
        parentSessionId: context.sessionId || null,
        childSessionId: sid,
        agentId: agent.id,
        agentName: agent.name,
        status: outcome.status,
        response: outcome.response,
        error: outcome.error,
        durationMs: outcome.durationMs,
      },
      { enabledIds: parentExtensions },
    ))
    // Auto-announce the outcome to the parent session
    const parentSessionId = context.sessionId
    if (parentSessionId) {
      const preview = (outcome.response || outcome.error || '').slice(0, 200)
      await runBestEffort('parentAnnouncement', job.id, () => {
        enqueueSystemEvent(
          parentSessionId,
          `[subagent_completed] ${agent.name} (job ${job.id}): ${outcome.status}. ${preview}`,
          `subagent:${job.id}`,
        )
      })
    }
    await runBestEffort('sessionEnd', job.id, () => {
      const endedSession = getSession(sid)
      return runCapabilityHook(
        'sessionEnd',
        {
          sessionId: sid,
          session: endedSession,
          messageCount: Array.isArray(endedSession?.messages) ? endedSession.messages.length : 0,
          durationMs: outcome.durationMs,
          reason: outcome.status,
        },
        { enabledIds: parentExtensions },
      )
    })
  }

  const resultPromise = racedPromise
    .then(async (result): Promise<SubagentResult> => {
      const latest = getDelegationJob(job.id)
      const node = getLineageNode(lineageNode.id)
      let subagentResult: SubagentResult
      if (latest?.status === 'cancelled' || node?.status === 'cancelled') {
        // Cancelled while running: report cancellation, leave job/lineage state untouched.
        subagentResult = buildResult(job.id, sid, lineageNode, agent, 'cancelled', null, null)
      } else {
        const responseText = (result.text || '').slice(0, 32_000)
        completeLineageNode(lineageNode.id, responseText.slice(0, 1000))
        appendDelegationCheckpoint(job.id, 'Child session completed', 'completed')
        completeDelegationJob(job.id, responseText, { childSessionId: sid })

        subagentResult = buildResult(job.id, sid, lineageNode, agent, 'completed', responseText, null)
      }

      log.info('subagent', 'Completed', { agentId: agent.id, agentName: agent.name, durationMs: subagentResult.durationMs, status: subagentResult.status })
      debug.verbose('subagent', 'Result', { jobId: job.id, response: subagentResult.response?.slice(0, 2000) })

      await announceEnd(subagentResult)
      return subagentResult
    })
    .catch(async (err: unknown): Promise<SubagentResult> => {
      const message = errorMessage(err)
      const isTimeout = message === '__subagent_timeout__'
      const latest = getDelegationJob(job.id)
      const node = getLineageNode(lineageNode.id)
      let subagentResult: SubagentResult
      if (latest?.status === 'cancelled' || node?.status === 'cancelled') {
        subagentResult = buildResult(job.id, sid, lineageNode, agent, 'cancelled', null, null)
      } else if (isTimeout) {
        // Abort the underlying run on timeout
        run.abort()
        const timeoutMsg = `Subagent timed out after ${effectiveTimeoutSec}s`
        failLineageNode(lineageNode.id, timeoutMsg)
        appendDelegationCheckpoint(job.id, timeoutMsg, 'failed')
        failDelegationJob(job.id, timeoutMsg, { childSessionId: sid })
        subagentResult = buildResult(job.id, sid, lineageNode, agent, 'timed_out', null, timeoutMsg)
      } else {
        failLineageNode(lineageNode.id, message)
        appendDelegationCheckpoint(job.id, `Child session failed: ${message}`, 'failed')
        failDelegationJob(job.id, message, { childSessionId: sid })

        subagentResult = buildResult(job.id, sid, lineageNode, agent, 'failed', null, message)
      }

      log.warn('subagent', 'Failed', { agentId: agent.id, agentName: agent.name, error: message })

      await announceEnd(subagentResult)
      return subagentResult
    })

  const handle: SubagentHandle = {
    jobId: job.id,
    sessionId: sid,
    lineageId: lineageNode.id,
    agentId: agent.id,
    agentName: agent.name,
    run,
    promise: resultPromise,
  }

  // Register handle for promise-based waiting
  handleRegistry.set(job.id, handle)

  return handle
}

// ---------------------------------------------------------------------------
// Result Builder
// ---------------------------------------------------------------------------

/**
 * Assemble a `SubagentResult` from the job, session and lineage node.
 * `childCount` is the number of lineage children at call time and `durationMs`
 * is measured from the lineage node's `createdAt`.
 */
function buildResult(
  jobId: string,
  sessionId: string,
  lineageNode: LineageNode,
  agent: { id?: string; name?: string },
  status: SubagentResult['status'],
  response: string | null,
  error: string | null,
): SubagentResult {
  const children = getChildren(lineageNode.id)
  return {
    jobId,
    sessionId,
    lineageId: lineageNode.id,
    agentId: String(agent.id ?? ''),
    agentName: String(agent.name ?? ''),
    status,
    response,
    error,
    depth: lineageNode.depth,
    parentSessionId: lineageNode.parentSessionId,
    childCount: children.length,
    durationMs: Date.now() - lineageNode.createdAt,
  }
}

// ---------------------------------------------------------------------------
// Query helpers (re-exported for convenience)
// ---------------------------------------------------------------------------

export {
  getLineageNodeBySession,
  getAncestors,
  getChildren,
  getDescendants,
  buildLineageTree,
  cancelSubtree,
  mergeCapabilities as _mergeExtensions,
}

export type {
  LineageNode,
  LineageTree,
  SubagentState,
}

// ---------------------------------------------------------------------------
// Cancel a running subagent by session ID
// ---------------------------------------------------------------------------

/**
 * Cancel the subagent attached to a session, together with its subtree.
 *
 * @param sessionId Child session ID of the subagent.
 * @returns `false` when no lineage node exists for the session, otherwise `true`.
 */
export function cancelSubagentBySession(sessionId: string): boolean {
  const node = getLineageNodeBySession(sessionId)
  if (!node) return false

  if (!isTerminalState(node.status)) {
    cancelLineageNode(node.id)
  }

  cancelSubtree(node.id)
  return true
}

// ---------------------------------------------------------------------------
// Cleanup finished subagents (call periodically)
// ---------------------------------------------------------------------------

/**
 * Remove terminal lineage nodes and drop the handles that belong to them.
 *
 * @param maxAgeMs Age threshold passed to `cleanupTerminalNodes` (default 30 minutes).
 * @returns The number of lineage nodes removed.
 */
export function cleanupFinishedSubagents(maxAgeMs = 30 * 60_000): number {
  const removedIds = cleanupTerminalNodes(maxAgeMs)
  const removedSet = new Set(removedIds)
  // Drop handles for the lineage nodes just removed, and also purge orphaned
  // handles whose lineage node no longer exists (TTL safety net).
  for (const [jobId, handle] of handleRegistry.entries()) {
    if (removedSet.has(handle.lineageId) || !getLineageNode(handle.lineageId)) {
      handleRegistry.delete(jobId)
    }
  }
  return removedIds.length
}