/ src / lib / server / agents / subagent-runtime.ts
subagent-runtime.ts
  1  /**
  2   * Native Subagent Runtime
  3   *
  4   * Replaces CLI bridge delegation with direct, in-process subagent execution.
  5   * Uses lineage nodes for lifecycle state tracking (no separate state machine
  6   * registry). Provides a handle registry for promise-based waiting.
  7   */
  8  
  9  import { genId } from '@/lib/id'
 10  import { DEFAULT_DELEGATION_MAX_DEPTH } from '@/lib/runtime/runtime-loop'
 11  import { loadAgents } from '@/lib/server/agents/agent-repository'
 12  import { enqueueSessionRun, type EnqueueSessionRunResult } from '@/lib/server/runtime/session-run-manager'
 13  import { loadRuntimeSettings } from '@/lib/server/runtime/runtime-settings'
 14  import { applyResolvedRoute, resolvePrimaryAgentRoute } from '@/lib/server/agents/agent-runtime-config'
 15  import { resolveSubagentBrowserProfileId } from '@/lib/server/session-tools/subagent'
 16  import { runCapabilityHook, runCapabilitySubagentSpawning } from '@/lib/server/native-capabilities'
 17  import {
 18    appendDelegationCheckpoint,
 19    completeDelegationJob,
 20    createDelegationJob,
 21    failDelegationJob,
 22    getDelegationJob,
 23    registerDelegationRuntime,
 24    startDelegationJob,
 25  } from '@/lib/server/agents/delegation-jobs'
 26  import {
 27    createLineageNode,
 28    completeLineageNode,
 29    failLineageNode,
 30    cancelLineageNode,
 31    getLineageNode,
 32    getLineageNodeBySession,
 33    getAncestors,
 34    getChildren,
 35    getDescendants,
 36    buildLineageTree,
 37    cancelSubtree,
 38    setLineageStatus,
 39    isTerminalState,
 40    cleanupTerminalNodes,
 41    type LineageNode,
 42    type LineageTree,
 43    type SubagentState,
 44  } from '@/lib/server/agents/subagent-lineage'
 45  import { errorMessage, hmrSingleton } from '@/lib/shared-utils'
 46  import { log } from '@/lib/server/logger'
 47  import { debug } from '@/lib/server/debug'
 48  import { logExecution } from '@/lib/server/execution-log'
 49  import { enqueueSystemEvent } from '@/lib/server/runtime/system-events'
 50  import { getEnabledCapabilityIds, splitCapabilityIds } from '@/lib/capability-selection'
 51  import { getSession, loadSessions, saveSession } from '@/lib/server/sessions/session-repository'
 52  import { ensureRunContext } from '@/lib/server/run-context'
 53  import { buildExecutionBrief, serializeExecutionBriefForDelegation } from '@/lib/server/execution-brief'
 54  
 55  // ---------------------------------------------------------------------------
 56  // Types
 57  // ---------------------------------------------------------------------------
 58  
/** Options accepted by `spawnSubagent` describing which agent to run and how. */
export interface SpawnSubagentInput {
  /** ID of the agent to spawn. Spawning throws when no agent with this ID is loaded. */
  agentId: string
  /** Message/task for the subagent; also recorded as the delegation job and lineage task. */
  message: string
  /** Working directory override. Falls back to the caller context's `cwd` when not set. */
  cwd?: string
  /** Inherit parent's browser profile. Only a literal `true` is forwarded as "share". */
  shareBrowserProfile?: boolean
  /**
   * Inherit parent session's extensions/tools (default true).
   * Only a literal `false` disables inheritance; the child then gets the agent's own capabilities.
   */
  inheritExtensions?: boolean
  /** Caller-owned: controls whether the caller awaits `handle.promise`. Not read by the runtime. */
  waitForCompletion?: boolean
  /** Timeout in seconds (default 600). Set 0 to disable; any value that is not > 0 disables it. */
  timeoutSec?: number
  /** Optional shared execution lane key for serializing sibling runs. Passed through to the session run queue. */
  executionGroupKey?: string
  /** When true, skip the ancestor-agent cycle check (A → B → A). Default false. */
  allowCycle?: boolean
}
 79  
/** Live reference to a spawned subagent, returned by `spawnSubagent` and stored in the handle registry by job ID. */
export interface SubagentHandle {
  /** Delegation job ID (also the key under which this handle is registered) */
  jobId: string
  /** Child session ID */
  sessionId: string
  /** Lineage node ID */
  lineageId: string
  /** ID of the agent running in the child session */
  agentId: string
  /** Display name of that agent */
  agentName: string
  /** Session run handle (for abort) */
  run: EnqueueSessionRunResult
  /** Promise that resolves with the terminal result once the subagent completes, fails, is cancelled or times out */
  promise: Promise<SubagentResult>
}
 95  
/** Terminal outcome of a subagent run, as delivered through `SubagentHandle.promise`. */
export interface SubagentResult {
  /** Delegation job ID */
  jobId: string
  /** Child session ID */
  sessionId: string
  /** Lineage node ID */
  lineageId: string
  /** ID of the agent that ran */
  agentId: string
  /** Display name of the agent that ran */
  agentName: string
  /** How the run ended */
  status: 'completed' | 'failed' | 'cancelled' | 'timed_out'
  /** Response text (capped at 32,000 characters) when `status` is 'completed'; null otherwise */
  response: string | null
  /** Failure or timeout message when `status` is 'failed' or 'timed_out'; null otherwise */
  error: string | null
  /** Depth recorded on the subagent's lineage node */
  depth: number
  /** Parent session ID recorded on the lineage node; null when there is no parent */
  parentSessionId: string | null
  /** Number of lineage children the subagent had when this result was built */
  childCount: number
  /** Milliseconds between lineage node creation and the moment this result was built */
  durationMs: number
}
110  
/** Caller-side context for a spawn: who is spawning and where. */
export interface SubagentContext {
  /** Parent session ID. When absent the spawn has no parent and is treated as depth 0. */
  sessionId?: string
  /** Default working directory, used when the spawn input does not override `cwd` */
  cwd: string
  /**
   * Pre-loaded sessions map — avoids repeated SQLite reads in batch/swarm.
   * Note: spawning writes the newly created child session into this map.
   */
  _sessions?: Record<string, unknown>
}
117  
118  // ---------------------------------------------------------------------------
119  // Handle Registry (for promise-based waiting instead of polling)
120  // ---------------------------------------------------------------------------
121  
/**
 * Job ID → subagent handle map, obtained through `hmrSingleton` under a fixed key
 * (presumably so the same map is reused across hot module reloads — confirm in shared-utils).
 */
const handleRegistry = hmrSingleton('__swarmclaw_subagent_handles__', () => new Map<string, SubagentHandle>())

/**
 * Look up the handle registered for a delegation job.
 *
 * @param jobId - Delegation job ID the handle was registered under.
 * @returns The handle, or `null` when nothing is registered for that job.
 */
export function getHandle(jobId: string): SubagentHandle | null {
  const registered = handleRegistry.get(jobId)
  if (registered === undefined || registered === null) return null
  return registered
}
128  
129  // ---------------------------------------------------------------------------
130  // Extension Inheritance
131  // ---------------------------------------------------------------------------
132  
/**
 * Combine an agent's own capability IDs with those enabled on the parent session.
 *
 * Agent IDs come first, parent IDs are appended after them. When both lists are
 * non-empty, entries are trimmed and de-duplicated case-insensitively, keeping the
 * casing of the first occurrence. When either list is empty the other one is
 * returned without normalization (the agent list by reference, the parent list as a copy).
 *
 * @param agentCapabilities - Capability IDs defined on the agent.
 * @param parentSession - Parent session record, or null/undefined when there is none.
 * @returns The merged capability ID list.
 */
function mergeCapabilities(
  agentCapabilities: string[],
  parentSession: Record<string, unknown> | null | undefined,
): string[] {
  const inherited = getEnabledCapabilityIds(parentSession as { tools?: string[] | null, extensions?: string[] | null } | null)

  if (inherited.length === 0) return agentCapabilities
  if (agentCapabilities.length === 0) return [...inherited]

  // Insertion-ordered: key is the lowercase ID, value is the first-seen trimmed spelling.
  const firstSeenByKey = new Map<string, string>()
  for (const candidate of [...agentCapabilities, ...inherited]) {
    if (typeof candidate !== 'string') continue
    const spelling = candidate.trim()
    const key = spelling.toLowerCase()
    if (key === '' || firstSeenByKey.has(key)) continue
    firstSeenByKey.set(key, spelling)
  }
  return Array.from(firstSeenByKey.values())
}
159  
160  // ---------------------------------------------------------------------------
161  // Depth Guard
162  // ---------------------------------------------------------------------------
163  
/**
 * Determine how deep a session sits in its delegation chain.
 *
 * Prefers the `delegationDepth` stored on the session (any number >= 0). Otherwise
 * counts `parentSessionId` hops, stopping after at most `maxDepth + 1` hops so a
 * malformed (cyclic) chain cannot loop forever.
 *
 * @param sessionId - Session to measure; `undefined`/empty yields 0.
 * @param maxDepth - Configured maximum depth, used only to bound the fallback walk.
 * @param sessions - Optional pre-loaded sessions map; loaded via `loadSessions()` when omitted.
 * @returns The session's depth (0 for a root or unknown session).
 */
export function getSessionDepth(
  sessionId: string | undefined,
  maxDepth: number,
  sessions?: Record<string, unknown>,
): number {
  if (!sessionId) return 0

  const sessionMap = sessions ?? loadSessions()
  const recordFor = (id: string) => sessionMap[id] as unknown as Record<string, unknown> | undefined

  // Fast path: depth was stored on the session at spawn time.
  const storedDepth = recordFor(sessionId)?.delegationDepth
  if (typeof storedDepth === 'number' && storedDepth >= 0) {
    return storedDepth
  }

  // Slow path: count parent links, bounded by maxDepth + 1 hops.
  let hops = 0
  for (let cursor = sessionId; cursor && hops < maxDepth + 1; hops++) {
    const parentId = recordFor(cursor)?.parentSessionId
    if (!parentId) break
    cursor = parentId as string
  }
  return hops
}
187  
/**
 * Walk from `sessionId` up through `parentSessionId` links and gather the trimmed,
 * non-empty `agentId` of each session visited (the starting session included).
 * Used to detect delegation cycles (A → B → A) before spawning.
 *
 * The walk ends when a session has no string parent, when a session ID repeats,
 * or once `limit` agent IDs have been collected.
 *
 * @param sessionId - Session to start from; `undefined`/empty yields `[]`.
 * @param sessions - Sessions map keyed by session ID.
 * @param limit - Maximum number of agent IDs to collect (default 32).
 * @returns Agent IDs ordered from the starting session towards the root.
 */
export function collectAncestorAgentIds(
  sessionId: string | undefined,
  sessions: Record<string, unknown>,
  limit = 32,
): string[] {
  const chainAgentIds: string[] = []
  const seenSessionIds = new Set<string>()

  for (let cursor = sessionId; cursor; ) {
    if (!(chainAgentIds.length < limit) || seenSessionIds.has(cursor)) break
    seenSessionIds.add(cursor)

    const record = sessions[cursor] as Record<string, unknown> | undefined

    const rawAgentId = record?.agentId
    if (typeof rawAgentId === 'string') {
      const agentId = rawAgentId.trim()
      if (agentId) chainAgentIds.push(agentId)
    }

    // A missing, non-string or empty parent link ends the walk.
    const parentId = record?.parentSessionId
    cursor = typeof parentId === 'string' && parentId ? parentId : undefined
  }

  return chainAgentIds
}
212  
213  // ---------------------------------------------------------------------------
214  // Core: Spawn a Native Subagent
215  // ---------------------------------------------------------------------------
216  
/**
 * Public entry point for spawning a native subagent.
 * Forwards both arguments unchanged to `spawnSubagentImpl` and returns its promise.
 *
 * @param input - Agent, task and spawn options.
 * @param context - Parent session / working-directory context of the caller.
 * @returns Promise of the spawned subagent's handle.
 */
export function spawnSubagent(
  input: SpawnSubagentInput,
  context: SubagentContext,
): Promise<SubagentHandle> {
  const pendingHandle = spawnSubagentImpl(input, context)
  return pendingHandle
}
223  
/**
 * Spawn `input.agentId` as an in-process child session and return a handle to it.
 *
 * In order: validates the agent, the delegation depth and (unless `allowCycle`
 * is true) the ancestor-agent chain; runs the pre-spawn capability hook; creates
 * the delegation job, the child session and the lineage node; enqueues the child
 * run; registers a cancel callback; and builds `handle.promise`, which resolves
 * once the run completes, fails, is cancelled or times out.
 *
 * @param input - Agent, task and spawn options.
 * @param context - Parent session ID (if any), default cwd and optional pre-loaded sessions map.
 * @returns Handle for the spawned subagent; it is also stored in the handle registry by job ID.
 * @throws Error when the agent is unknown, the maximum depth is reached, a delegation
 *   cycle is detected, or the pre-spawn hook reports `status === 'error'`.
 */
async function spawnSubagentImpl(
  input: SpawnSubagentInput,
  context: SubagentContext,
): Promise<SubagentHandle> {
  const runtime = loadRuntimeSettings()
  const maxDepth = runtime.delegationMaxDepth || DEFAULT_DELEGATION_MAX_DEPTH
  const agents = loadAgents()
  const agent = agents[input.agentId]

  if (!agent) {
    log.warn('subagent', 'Spawn rejected: agent not found', { agentId: input.agentId })
    throw new Error(`Agent "${input.agentId}" not found.`)
  }

  // Use cached sessions if available (batch/swarm pass this to avoid N reads)
  const sessions = (context._sessions ?? loadSessions()) as unknown as Record<string, Record<string, unknown>>
  const depth = getSessionDepth(context.sessionId, maxDepth, sessions)
  if (depth >= maxDepth) {
    log.warn('subagent', 'Spawn rejected: max depth exceeded', { agentId: input.agentId, depth, maxDepth })
    throw new Error(`Max subagent depth (${maxDepth}) reached.`)
  }
  if (input.allowCycle !== true && context.sessionId) {
    const ancestorAgentIds = collectAncestorAgentIds(context.sessionId, sessions)
    if (ancestorAgentIds.includes(input.agentId)) {
      log.warn('subagent', 'Spawn rejected: delegation cycle', { agentId: input.agentId, chain: ancestorAgentIds })
      throw new Error(
        `Delegation cycle: agent "${input.agentId}" is already active higher in this chain. `
        + 'Pick a different sibling agent, or pass allowCycle=true to override.',
      )
    }
  }
  const parent = context.sessionId ? sessions[context.sessionId] : null
  const parentExtensions = getEnabledCapabilityIds(parent as { tools?: string[] | null, extensions?: string[] | null } | null)
  // Pre-spawn hook: the only hook that is allowed to block the spawn.
  const spawningResult = await runCapabilitySubagentSpawning(
    {
      parentSessionId: context.sessionId || null,
      agentId: input.agentId,
      agentName: agent.name,
      message: input.message,
      cwd: input.cwd || context.cwd,
      mode: 'run',
      threadRequested: false,
    },
    { enabledIds: parentExtensions },
  )
  if (spawningResult.status === 'error') {
    throw new Error(spawningResult.error || 'Subagent spawn blocked by extension hook')
  }

  // 1. Create delegation job
  const job = createDelegationJob({
    kind: 'subagent',
    parentSessionId: context.sessionId || null,
    agentId: input.agentId,
    task: input.message,
    cwd: input.cwd || context.cwd,
  })
  appendDelegationCheckpoint(job.id, `Initializing subagent ${agent.name}`, 'queued')

  // 2. Create child session
  const sid = genId()
  const now = Date.now()
  const browserProfileId = resolveSubagentBrowserProfileId(
    parent,
    sid,
    input.shareBrowserProfile === true,
  )

  const agentExtensions = getEnabledCapabilityIds(agent)
  const effectiveExtensions = input.inheritExtensions === false
    ? agentExtensions
    : mergeCapabilities(agentExtensions, parent)
  const effectiveSelection = splitCapabilityIds(effectiveExtensions)

  const nextSession = {
    id: sid,
    name: `subagent-${agent.name}`,
    cwd: input.cwd || context.cwd,
    user: 'agent',
    provider: agent.provider,
    model: agent.model,
    credentialId: agent.credentialId || null,
    messages: [],
    createdAt: now,
    lastActiveAt: now,
    sessionType: 'delegated',
    agentId: agent.id,
    parentSessionId: context.sessionId || null,
    delegationDepth: depth + 1,
    tools: effectiveSelection.tools,
    extensions: effectiveSelection.extensions,
    browserProfileId,
  }
  // Stored in the (possibly caller-provided) sessions map as well as persisted below.
  sessions[sid] = applyResolvedRoute(nextSession, resolvePrimaryAgentRoute(agent))

  // Enrich child session with parent's RunContext for delegation handoff
  const delegationContext = parent ? serializeExecutionBriefForDelegation(buildExecutionBrief({ sessionId: context.sessionId })) : null
  if (delegationContext) {
    const childCtx = ensureRunContext(null)
    childCtx.parentContext = delegationContext
    childCtx.objective = input.message.slice(0, 900)
    sessions[sid].runContext = childCtx
  }

  saveSession(sid, sessions[sid])

  log.info('subagent', 'Spawning', { agentId: agent.id, agentName: agent.name, depth: depth + 1, jobId: job.id, sessionId: sid })
  logExecution(sid, 'delegation_start', `Subagent spawning: ${agent.name}`, {
    detail: { agentId: agent.id, depth: depth + 1, jobId: job.id, parentSessionId: context.sessionId },
  })

  // 3. Create lineage node (starts in 'initializing')
  const lineageNode = createLineageNode({
    sessionId: sid,
    agentId: agent.id,
    agentName: agent.name,
    parentSessionId: context.sessionId || null,
    jobId: job.id,
    task: input.message,
    cwd: input.cwd || context.cwd,
  })

  // 4. Mark lineage node ready
  setLineageStatus(lineageNode.id, 'ready')

  // 5. Start delegation job
  startDelegationJob(job.id, {
    childSessionId: sid,
    agentId: agent.id,
    agentName: agent.name,
    cwd: input.cwd || context.cwd,
  })
  appendDelegationCheckpoint(job.id, `Created child session ${sid}`, 'running')

  // 6. Mark lineage node running
  setLineageStatus(lineageNode.id, 'running')

  /**
   * Run a best-effort notification step. A failure is logged and swallowed so it
   * can neither abort the spawn nor alter an outcome that is already recorded.
   */
  const notifySafely = async (step: string, action: () => unknown): Promise<void> => {
    try {
      await action()
    } catch (err: unknown) {
      log.warn('subagent', 'Notification step failed', { step, jobId: job.id, sessionId: sid, error: errorMessage(err) })
    }
  }

  // 7. Enqueue session run (native execution — no CLI)
  const run = enqueueSessionRun({
    sessionId: sid,
    message: input.message,
    internal: true,
    source: 'subagent',
    mode: 'followup',
    executionGroupKey: input.executionGroupKey,
  })
  // The run is already enqueued at this point; a throwing hook must not leave it
  // without a cancel callback, result promise and registered handle.
  await notifySafely('subagentSpawned', () => runCapabilityHook(
    'subagentSpawned',
    {
      parentSessionId: context.sessionId || null,
      childSessionId: sid,
      agentId: agent.id,
      agentName: agent.name,
      runId: run.runId,
      mode: 'run',
      threadRequested: false,
    },
    { enabledIds: parentExtensions },
  ))

  // 8. Register runtime handle for cancellation
  registerDelegationRuntime(job.id, {
    cancel: () => {
      run.abort()
      const node = getLineageNode(lineageNode.id)
      if (node && !isTerminalState(node.status)) {
        cancelLineageNode(lineageNode.id)
      }
    },
  })

  // 9. Build result promise (with optional timeout)
  const DEFAULT_TIMEOUT_SEC = 600 // 10 minutes
  const effectiveTimeoutSec = input.timeoutSec ?? DEFAULT_TIMEOUT_SEC
  let timeoutTimer: ReturnType<typeof setTimeout> | null = null
  const timeoutPromise = effectiveTimeoutSec > 0
    ? new Promise<never>((_, reject) => {
        timeoutTimer = setTimeout(() => reject(new Error('__subagent_timeout__')), effectiveTimeoutSec * 1000)
      })
    : null

  const racedPromise = timeoutPromise
    ? Promise.race([run.promise, timeoutPromise])
    : run.promise

  /** True when either the delegation job or the lineage node is marked cancelled. */
  const wasCancelled = (): boolean => {
    const latest = getDelegationJob(job.id)
    const node = getLineageNode(lineageNode.id)
    return latest?.status === 'cancelled' || node?.status === 'cancelled'
  }

  /**
   * Wait for the run (or the timeout) and record the terminal state on the job
   * and lineage node. An error thrown while recording a success is handled like
   * a run failure, matching the previous `.then().catch()` chain.
   */
  const settleRun = async (): Promise<SubagentResult> => {
    try {
      const result = await racedPromise
      let outcome: SubagentResult
      if (wasCancelled()) {
        outcome = buildResult(job.id, sid, lineageNode, agent, 'cancelled', null, null)
      } else {
        const responseText = (result.text || '').slice(0, 32_000)
        completeLineageNode(lineageNode.id, responseText.slice(0, 1000))
        appendDelegationCheckpoint(job.id, 'Child session completed', 'completed')
        completeDelegationJob(job.id, responseText, { childSessionId: sid })

        outcome = buildResult(job.id, sid, lineageNode, agent, 'completed', responseText, null)
      }

      log.info('subagent', 'Completed', { agentId: agent.id, agentName: agent.name, durationMs: outcome.durationMs, status: outcome.status })
      debug.verbose('subagent', 'Result', { jobId: job.id, response: outcome.response?.slice(0, 2000) })
      return outcome
    } catch (err: unknown) {
      const message = errorMessage(err)
      const isTimeout = message === '__subagent_timeout__'
      let outcome: SubagentResult
      if (wasCancelled()) {
        outcome = buildResult(job.id, sid, lineageNode, agent, 'cancelled', null, null)
      } else if (isTimeout) {
        // Abort the underlying run on timeout
        run.abort()
        const timeoutMsg = `Subagent timed out after ${effectiveTimeoutSec}s`
        failLineageNode(lineageNode.id, timeoutMsg)
        appendDelegationCheckpoint(job.id, timeoutMsg, 'failed')
        failDelegationJob(job.id, timeoutMsg, { childSessionId: sid })
        outcome = buildResult(job.id, sid, lineageNode, agent, 'timed_out', null, timeoutMsg)
      } else {
        failLineageNode(lineageNode.id, message)
        appendDelegationCheckpoint(job.id, `Child session failed: ${message}`, 'failed')
        failDelegationJob(job.id, message, { childSessionId: sid })

        outcome = buildResult(job.id, sid, lineageNode, agent, 'failed', null, message)
      }

      log.warn('subagent', 'Failed', { agentId: agent.id, agentName: agent.name, error: message })
      return outcome
    } finally {
      // Release the timeout timer as soon as the race is decided, so a run that
      // finished early does not keep a pending timer for the rest of the window.
      if (timeoutTimer !== null) clearTimeout(timeoutTimer)
    }
  }

  /**
   * Emit the end-of-run notifications exactly once: the `subagentEnded` hook, the
   * system event for the parent session, and the `sessionEnd` hook. Each step is
   * isolated so one failure does not suppress the others.
   */
  const announceEnded = async (subagentResult: SubagentResult): Promise<void> => {
    await notifySafely('subagentEnded', () => runCapabilityHook(
      'subagentEnded',
      {
        parentSessionId: context.sessionId || null,
        childSessionId: sid,
        agentId: agent.id,
        agentName: agent.name,
        status: subagentResult.status,
        response: subagentResult.response,
        error: subagentResult.error,
        durationMs: subagentResult.durationMs,
      },
      { enabledIds: parentExtensions },
    ))
    // Auto-announce the outcome to the parent session
    await notifySafely('parentAnnouncement', () => {
      if (!context.sessionId) return
      const preview = (subagentResult.response || subagentResult.error || '').slice(0, 200)
      enqueueSystemEvent(
        context.sessionId,
        `[subagent_completed] ${agent.name} (job ${job.id}): ${subagentResult.status}. ${preview}`,
        `subagent:${job.id}`,
      )
    })
    await notifySafely('sessionEnd', () => {
      const endedSession = getSession(sid)
      return runCapabilityHook(
        'sessionEnd',
        {
          sessionId: sid,
          session: endedSession,
          messageCount: Array.isArray(endedSession?.messages) ? endedSession.messages.length : 0,
          durationMs: subagentResult.durationMs,
          reason: subagentResult.status,
        },
        { enabledIds: parentExtensions },
      )
    })
  }

  // Started immediately; the first await inside is on racedPromise, so the race is
  // observed from this point on, just as with the previous `.then()` chain.
  const resultPromise = (async (): Promise<SubagentResult> => {
    const subagentResult = await settleRun()
    await announceEnded(subagentResult)
    return subagentResult
  })()

  const handle: SubagentHandle = {
    jobId: job.id,
    sessionId: sid,
    lineageId: lineageNode.id,
    agentId: agent.id,
    agentName: agent.name,
    run,
    promise: resultPromise,
  }

  // Register handle for promise-based waiting
  handleRegistry.set(job.id, handle)

  return handle
}
543  
544  // ---------------------------------------------------------------------------
545  // Result Builder
546  // ---------------------------------------------------------------------------
547  
/**
 * Assemble the terminal `SubagentResult` for a subagent run.
 *
 * Identity fields come from the arguments; `depth` and `parentSessionId` are copied
 * from the lineage node; `childCount` is the number of lineage children at call time;
 * `durationMs` is the time elapsed since the lineage node's `createdAt`.
 *
 * @param jobId - Delegation job ID.
 * @param sessionId - Child session ID.
 * @param lineageNode - Lineage node of the subagent.
 * @param agent - Agent record; missing `id`/`name` become empty strings.
 * @param status - Terminal status to report.
 * @param response - Response text, or null.
 * @param error - Error message, or null.
 * @returns The populated result object.
 */
function buildResult(
  jobId: string,
  sessionId: string,
  lineageNode: LineageNode,
  agent: { id?: string; name?: string },
  status: SubagentResult['status'],
  response: string | null,
  error: string | null,
): SubagentResult {
  const lineageId = lineageNode.id
  const childCount = getChildren(lineageId).length
  const agentId = String(agent.id ?? '')
  const agentName = String(agent.name ?? '')
  const { depth, parentSessionId } = lineageNode
  const durationMs = Date.now() - lineageNode.createdAt

  return {
    jobId,
    sessionId,
    lineageId,
    agentId,
    agentName,
    status,
    response,
    error,
    depth,
    parentSessionId,
    childCount,
    durationMs,
  }
}
573  
574  // ---------------------------------------------------------------------------
575  // Query helpers (re-exported for convenience)
576  // ---------------------------------------------------------------------------
577  
578  export {
579    getLineageNodeBySession,
580    getAncestors,
581    getChildren,
582    getDescendants,
583    buildLineageTree,
584    cancelSubtree,
585    mergeCapabilities as _mergeExtensions,
586  }
587  
588  export type {
589    LineageNode,
590    LineageTree,
591    SubagentState,
592  }
593  
594  // ---------------------------------------------------------------------------
595  // Cancel a running subagent by session ID
596  // ---------------------------------------------------------------------------
597  
/**
 * Cancel the subagent that owns `sessionId`, together with its lineage subtree.
 *
 * The node itself is cancelled only if it is not already in a terminal state;
 * `cancelSubtree` is invoked for it either way.
 *
 * @param sessionId - Child session ID of the subagent to cancel.
 * @returns `false` when no lineage node exists for the session, `true` otherwise.
 */
export function cancelSubagentBySession(sessionId: string): boolean {
  const target = getLineageNodeBySession(sessionId)
  if (target) {
    const alreadyFinished = isTerminalState(target.status)
    if (!alreadyFinished) cancelLineageNode(target.id)
    cancelSubtree(target.id)
    return true
  }
  return false
}
609  
610  // ---------------------------------------------------------------------------
611  // Cleanup finished subagents (call periodically)
612  // ---------------------------------------------------------------------------
613  
/**
 * Prune finished subagents (intended to be called periodically).
 *
 * Delegates lineage pruning to `cleanupTerminalNodes(maxAgeMs)`, then drops every
 * registered handle whose lineage node was just removed or can no longer be found.
 *
 * @param maxAgeMs - Age threshold forwarded to `cleanupTerminalNodes` (default 30 minutes).
 * @returns Number of lineage node IDs reported as removed.
 */
export function cleanupFinishedSubagents(maxAgeMs = 30 * 60_000): number {
  const prunedLineageIds = cleanupTerminalNodes(maxAgeMs)
  const prunedLookup = new Set(prunedLineageIds)

  handleRegistry.forEach((handle, jobId) => {
    // Either pruned in this pass, or already gone (orphaned handle safety net).
    const lineageGone = prunedLookup.has(handle.lineageId) || !getLineageNode(handle.lineageId)
    if (lineageGone) handleRegistry.delete(jobId)
  })

  return prunedLineageIds.length
}