/ tools / delegate_tool.py
delegate_tool.py
   1  #!/usr/bin/env python3
   2  """
   3  Delegate Tool -- Subagent Architecture
   4  
   5  Spawns child AIAgent instances with isolated context, restricted toolsets,
   6  and their own terminal sessions. Supports single-task and batch (parallel)
   7  modes. The parent blocks until all children complete.
   8  
   9  Each child gets:
  10    - A fresh conversation (no parent history)
  11    - Its own task_id (own terminal session, file ops cache)
  12    - A restricted toolset (configurable, with blocked tools always stripped)
  13    - A focused system prompt built from the delegated goal + context
  14  
  15  The parent's context only sees the delegation call and the summary result,
  16  never the child's intermediate tool calls or reasoning.
  17  """
  18  
  19  import enum
  20  import json
  21  import logging
  22  
  23  logger = logging.getLogger(__name__)
  24  import os
  25  import threading
  26  import time
  27  from concurrent.futures import (
  28      ThreadPoolExecutor,
  29      TimeoutError as FuturesTimeoutError,
  30  )
  31  from typing import Any, Dict, List, Optional
  32  
  33  from toolsets import TOOLSETS
  34  from tools import file_state
  35  from tools.terminal_tool import set_approval_callback as _set_subagent_approval_cb
  36  from utils import base_url_hostname, is_truthy_value
  37  
  38  
  39  # Tools that children must never have access to
  40  DELEGATE_BLOCKED_TOOLS = frozenset(
  41      [
  42          "delegate_task",  # no recursive delegation
  43          "clarify",  # no user interaction
  44          "memory",  # no writes to shared MEMORY.md
  45          "send_message",  # no cross-platform side effects
  46          "execute_code",  # children should reason step-by-step, not write scripts
  47      ]
  48  )
  49  
  50  
  51  # ---------------------------------------------------------------------------
  52  # Subagent approval callbacks
  53  # ---------------------------------------------------------------------------
  54  # Subagents run inside a ThreadPoolExecutor worker. The CLI's interactive
  55  # approval callback is stored in tools/terminal_tool.py's threading.local(),
  56  # so worker threads do NOT inherit it. Without a callback,
  57  # prompt_dangerous_approval() falls back to input() from the worker thread,
  58  # which deadlocks against the parent's prompt_toolkit TUI that owns stdin.
  59  #
  60  # Fix: install a non-interactive callback into every subagent worker thread
  61  # via ThreadPoolExecutor(initializer=_set_subagent_approval_cb, initargs=(cb,)).
  62  # The callback is chosen by the `delegation.subagent_auto_approve` config:
  63  #   false (default) → _subagent_auto_deny (safe; matches leaf tool blocklist)
  64  #   true            → _subagent_auto_approve (opt-in YOLO for cron/batch)
  65  # Both emit a logger.warning for audit; gateway sessions are unaffected
  66  # because they resolve approvals via tools/approval.py's per-session queue,
  67  # not through these TLS callbacks.
  68  def _subagent_auto_deny(command: str, description: str, **kwargs) -> str:
  69      """Auto-deny dangerous commands in subagent threads (safe default).
  70  
  71      Returns 'deny' so the subagent sees a refusal it can recover from, and
  72      never calls input() (which would deadlock the parent TUI).
  73      """
  74      logger.warning(
  75          "Subagent auto-denied dangerous command: %s (%s). "
  76          "Set delegation.subagent_auto_approve: true to allow.",
  77          command, description,
  78      )
  79      return "deny"
  80  
  81  
  82  def _subagent_auto_approve(command: str, description: str, **kwargs) -> str:
  83      """Auto-approve dangerous commands in subagent threads (opt-in YOLO).
  84  
  85      Only installed when delegation.subagent_auto_approve=true. Returns 'once'
  86      so the subagent proceeds without blocking the parent UI.
  87      """
  88      logger.warning(
  89          "Subagent auto-approved dangerous command: %s (%s)",
  90          command, description,
  91      )
  92      return "once"
  93  
  94  
  95  def _get_subagent_approval_callback():
  96      """Return the callback to install into subagent worker threads.
  97  
  98      Config key: delegation.subagent_auto_approve (bool, default False).
  99      Reads via the same _load_config() path as the rest of delegate_task so
 100      priority is config.yaml > (no env override for this knob) > default.
 101      """
 102      cfg = _load_config()
 103      val = cfg.get("subagent_auto_approve", False)
 104      if is_truthy_value(val):
 105          return _subagent_auto_approve
 106      return _subagent_auto_deny
 107  
 108  # Build a description fragment listing toolsets available for subagents.
 109  # Excludes toolsets where ALL tools are blocked, composite/platform toolsets
 110  # (hermes-* prefixed), and scenario toolsets.
 111  #
 112  # NOTE: "delegation" is in this exclusion set so the subagent-facing
 113  # capability hint string (_TOOLSET_LIST_STR) doesn't advertise it as a
 114  # toolset to request explicitly — the correct mechanism for nested
 115  # delegation is role='orchestrator', which re-adds "delegation" in
 116  # _build_child_agent regardless of this exclusion.
 117  _EXCLUDED_TOOLSET_NAMES = frozenset({"debugging", "safe", "delegation", "moa", "rl"})
 118  _SUBAGENT_TOOLSETS = sorted(
 119      name
 120      for name, defn in TOOLSETS.items()
 121      if name not in _EXCLUDED_TOOLSET_NAMES
 122      and not name.startswith("hermes-")
 123      and not all(t in DELEGATE_BLOCKED_TOOLS for t in defn.get("tools", []))
 124  )
 125  _TOOLSET_LIST_STR = ", ".join(f"'{n}'" for n in _SUBAGENT_TOOLSETS)
 126  
 127  _DEFAULT_MAX_CONCURRENT_CHILDREN = 3
 128  MAX_DEPTH = 1  # flat by default: parent (0) -> child (1); grandchild rejected unless max_spawn_depth raised.
 129  # Configurable depth cap consulted by _get_max_spawn_depth; MAX_DEPTH
 130  # stays as the default fallback and is still the symbol tests import.
 131  _MIN_SPAWN_DEPTH = 1
 132  _MAX_SPAWN_DEPTH_CAP = 3
 133  
 134  
 135  # ---------------------------------------------------------------------------
 136  # Runtime state: pause flag + active subagent registry
 137  #
 138  # Consumed by the TUI observability layer (overlay/control surface) and the
 139  # gateway RPCs `delegation.pause`, `delegation.status`, `subagent.interrupt`.
 140  # Kept module-level so they span every delegate_task invocation in the
 141  # process, including nested orchestrator -> worker chains.
 142  # ---------------------------------------------------------------------------
 143  
 144  _spawn_pause_lock = threading.Lock()
 145  _spawn_paused: bool = False
 146  
 147  _active_subagents_lock = threading.Lock()
 148  # subagent_id -> mutable record tracking the live child agent.  Stays only
 149  # for the lifetime of the run; _run_single_child is the owner.
 150  _active_subagents: Dict[str, Dict[str, Any]] = {}
 151  
 152  
 153  def set_spawn_paused(paused: bool) -> bool:
 154      """Globally block/unblock new delegate_task spawns.
 155  
 156      Active children keep running; only NEW calls to delegate_task fail fast
 157      with a "spawning paused" error until unblocked.  Returns the new state.
 158      """
 159      global _spawn_paused
 160      with _spawn_pause_lock:
 161          _spawn_paused = bool(paused)
 162          return _spawn_paused
 163  
 164  
 165  def is_spawn_paused() -> bool:
 166      with _spawn_pause_lock:
 167          return _spawn_paused
 168  
 169  
 170  def _register_subagent(record: Dict[str, Any]) -> None:
 171      sid = record.get("subagent_id")
 172      if not sid:
 173          return
 174      with _active_subagents_lock:
 175          _active_subagents[sid] = record
 176  
 177  
 178  def _unregister_subagent(subagent_id: str) -> None:
 179      with _active_subagents_lock:
 180          _active_subagents.pop(subagent_id, None)
 181  
 182  
 183  def interrupt_subagent(subagent_id: str) -> bool:
 184      """Request that a single running subagent stop at its next iteration boundary.
 185  
 186      Does not hard-kill the worker thread (Python can't); sets the child's
 187      interrupt flag which propagates to in-flight tools and recurses into
 188      grandchildren via AIAgent.interrupt().  Returns True if a matching
 189      subagent was found.
 190      """
 191      with _active_subagents_lock:
 192          record = _active_subagents.get(subagent_id)
 193      if not record:
 194          return False
 195      agent = record.get("agent")
 196      if agent is None:
 197          return False
 198      try:
 199          agent.interrupt(f"Interrupted via TUI ({subagent_id})")
 200      except Exception as exc:
 201          logger.debug("interrupt_subagent(%s) failed: %s", subagent_id, exc)
 202          return False
 203      return True
 204  
 205  
 206  def list_active_subagents() -> List[Dict[str, Any]]:
 207      """Snapshot of the currently running subagent tree.
 208  
 209      Each record: {subagent_id, parent_id, depth, goal, model, started_at,
 210      tool_count, status}.  Safe to call from any thread — returns a copy.
 211      """
 212      with _active_subagents_lock:
 213          return [
 214              {k: v for k, v in r.items() if k != "agent"}
 215              for r in _active_subagents.values()
 216          ]
 217  
 218  
 219  def _extract_output_tail(
 220      result: Dict[str, Any],
 221      *,
 222      max_entries: int = 12,
 223      max_chars: int = 8000,
 224  ) -> List[Dict[str, Any]]:
 225      """Pull the last N tool-call results from a child's conversation.
 226  
 227      Powers the overlay's "Output" section — the cc-swarm-parity feature.
 228      We reuse the same messages list the trajectory saver walks, taking
 229      only the tail to keep event payloads small.  Each entry is
 230      ``{tool, preview, is_error}``.
 231      """
 232      messages = result.get("messages") if isinstance(result, dict) else None
 233      if not isinstance(messages, list):
 234          return []
 235  
 236      # Walk in reverse to build a tail; stop when we have enough.
 237      tail: List[Dict[str, Any]] = []
 238      pending_call_by_id: Dict[str, str] = {}
 239  
 240      # First pass (forward): build tool_call_id -> tool_name map
 241      for msg in messages:
 242          if not isinstance(msg, dict):
 243              continue
 244          if msg.get("role") == "assistant":
 245              for tc in msg.get("tool_calls") or []:
 246                  tc_id = tc.get("id")
 247                  fn = tc.get("function") or {}
 248                  if tc_id:
 249                      pending_call_by_id[tc_id] = str(fn.get("name") or "tool")
 250  
 251      # Second pass (reverse): pick tool results, newest first
 252      for msg in reversed(messages):
 253          if len(tail) >= max_entries:
 254              break
 255          if not isinstance(msg, dict) or msg.get("role") != "tool":
 256              continue
 257          content = msg.get("content") or ""
 258          if not isinstance(content, str):
 259              content = str(content)
 260          is_error = _looks_like_error_output(content)
 261          tool_name = pending_call_by_id.get(msg.get("tool_call_id") or "", "tool")
 262          # Preserve line structure so the overlay's wrapped scroll region can
 263          # show real output rather than a whitespace-collapsed blob. We still
 264          # cap the payload size to keep events bounded.
 265          preview = content[:max_chars]
 266          tail.append({"tool": tool_name, "preview": preview, "is_error": is_error})
 267  
 268      tail.reverse()  # restore chronological order for display
 269      return tail
 270  
 271  
 272  def _looks_like_error_output(content: str) -> bool:
 273      """Conservative stderr/error detector for tool-result previews.
 274  
 275      The old heuristic flagged any preview containing the substring "error",
 276      which painted perfectly normal terminal/json output red.  We now only
 277      mark output as an error when there is stronger evidence:
 278        - structured JSON with an ``error`` key
 279        - structured JSON with ``status`` of error/failed
 280        - first line starts with a classic error marker
 281      """
 282      if not content:
 283          return False
 284  
 285      head = content.lstrip()
 286      if head.startswith("{") or head.startswith("["):
 287          try:
 288              parsed = json.loads(content)
 289              if isinstance(parsed, dict):
 290                  if parsed.get("error"):
 291                      return True
 292                  status = str(parsed.get("status") or "").strip().lower()
 293                  if status in {"error", "failed", "failure", "timeout"}:
 294                      return True
 295          except Exception:
 296              pass
 297  
 298      first = content.splitlines()[0].strip().lower() if content.splitlines() else ""
 299      return (
 300          first.startswith("error:")
 301          or first.startswith("failed:")
 302          or first.startswith("traceback ")
 303          or first.startswith("exception:")
 304      )
 305  
 306  
 307  def _normalize_role(r: Optional[str]) -> str:
 308      """Normalise a caller-provided role to 'leaf' or 'orchestrator'.
 309  
 310      None/empty -> 'leaf'.  Unknown strings coerce to 'leaf' with a
 311      warning log (matches the silent-degrade pattern of
 312      _get_orchestrator_enabled).  _build_child_agent adds a second
 313      degrade layer for depth/kill-switch bounds.
 314      """
 315      if r is None or not r:
 316          return "leaf"
 317      r_norm = str(r).strip().lower()
 318      if r_norm in ("leaf", "orchestrator"):
 319          return r_norm
 320      logger.warning("Unknown delegate_task role=%r, coercing to 'leaf'", r)
 321      return "leaf"
 322  
 323  
 324  def _get_max_concurrent_children() -> int:
 325      """Read delegation.max_concurrent_children from config, falling back to
 326      DELEGATION_MAX_CONCURRENT_CHILDREN env var, then the default (3).
 327  
 328      Users can raise this as high as they want; only the floor (1) is enforced.
 329  
 330      Uses the same ``_load_config()`` path that the rest of ``delegate_task``
 331      uses, keeping config priority consistent (config.yaml > env > default).
 332      """
 333      cfg = _load_config()
 334      val = cfg.get("max_concurrent_children")
 335      if val is not None:
 336          try:
 337              result = max(1, int(val))
 338              if result > 10:
 339                  logger.warning(
 340                      "delegation.max_concurrent_children=%d: each child consumes API tokens "
 341                      "independently. High values multiply cost linearly.",
 342                      result,
 343                  )
 344              return result
 345          except (TypeError, ValueError):
 346              logger.warning(
 347                  "delegation.max_concurrent_children=%r is not a valid integer; "
 348                  "using default %d",
 349                  val,
 350                  _DEFAULT_MAX_CONCURRENT_CHILDREN,
 351              )
 352              return _DEFAULT_MAX_CONCURRENT_CHILDREN
 353      env_val = os.getenv("DELEGATION_MAX_CONCURRENT_CHILDREN")
 354      if env_val:
 355          try:
 356              return max(1, int(env_val))
 357          except (TypeError, ValueError):
 358              return _DEFAULT_MAX_CONCURRENT_CHILDREN
 359      return _DEFAULT_MAX_CONCURRENT_CHILDREN
 360  
 361  
 362  def _get_child_timeout() -> float:
 363      """Read delegation.child_timeout_seconds from config.
 364  
 365      Returns the number of seconds a single child agent is allowed to run
 366      before being considered stuck.  Default: 600 s (10 minutes).
 367      """
 368      cfg = _load_config()
 369      val = cfg.get("child_timeout_seconds")
 370      if val is not None:
 371          try:
 372              return max(30.0, float(val))
 373          except (TypeError, ValueError):
 374              logger.warning(
 375                  "delegation.child_timeout_seconds=%r is not a valid number; "
 376                  "using default %d",
 377                  val,
 378                  DEFAULT_CHILD_TIMEOUT,
 379              )
 380      env_val = os.getenv("DELEGATION_CHILD_TIMEOUT_SECONDS")
 381      if env_val:
 382          try:
 383              return max(30.0, float(env_val))
 384          except (TypeError, ValueError):
 385              pass
 386      return float(DEFAULT_CHILD_TIMEOUT)
 387  
 388  
 389  def _get_max_spawn_depth() -> int:
 390      """Read delegation.max_spawn_depth from config, clamped to [1, 3].
 391  
 392      depth 0 = parent agent.  max_spawn_depth = N means agents at depths
 393      0..N-1 can spawn; depth N is the leaf floor.  Default 1 is flat:
 394      parent spawns children (depth 1), depth-1 children cannot spawn
 395      (blocked by this guard AND, for leaf children, by the delegation
 396      toolset strip in _strip_blocked_tools).
 397  
 398      Raise to 2 or 3 to unlock nested orchestration. role="orchestrator"
 399      removes the toolset strip for depth-1 children when
 400      max_spawn_depth >= 2, enabling them to spawn their own workers.
 401      """
 402      cfg = _load_config()
 403      val = cfg.get("max_spawn_depth")
 404      if val is None:
 405          return MAX_DEPTH
 406      try:
 407          ival = int(val)
 408      except (TypeError, ValueError):
 409          logger.warning(
 410              "delegation.max_spawn_depth=%r is not a valid integer; " "using default %d",
 411              val,
 412              MAX_DEPTH,
 413          )
 414          return MAX_DEPTH
 415      clamped = max(_MIN_SPAWN_DEPTH, min(_MAX_SPAWN_DEPTH_CAP, ival))
 416      if clamped != ival:
 417          logger.warning(
 418              "delegation.max_spawn_depth=%d out of range [%d, %d]; " "clamping to %d",
 419              ival,
 420              _MIN_SPAWN_DEPTH,
 421              _MAX_SPAWN_DEPTH_CAP,
 422              clamped,
 423          )
 424      return clamped
 425  
 426  
 427  def _get_orchestrator_enabled() -> bool:
 428      """Global kill switch for the orchestrator role.
 429  
 430      When False, role="orchestrator" is silently forced to "leaf" in
 431      _build_child_agent and the delegation toolset is stripped as before.
 432      Lets an operator disable the feature without a code revert.
 433      """
 434      cfg = _load_config()
 435      val = cfg.get("orchestrator_enabled", True)
 436      if isinstance(val, bool):
 437          return val
 438      # Accept "true"/"false" strings from YAML that doesn't auto-coerce.
 439      if isinstance(val, str):
 440          return val.strip().lower() in ("true", "1", "yes", "on")
 441      return True
 442  
 443  
 444  def _get_inherit_mcp_toolsets() -> bool:
 445      """Whether narrowed child toolsets should keep the parent's MCP toolsets."""
 446      cfg = _load_config()
 447      return is_truthy_value(cfg.get("inherit_mcp_toolsets"), default=True)
 448  
 449  
 450  def _is_mcp_toolset_name(name: str) -> bool:
 451      """Return True for canonical MCP toolsets and their registered aliases."""
 452      if not name:
 453          return False
 454      if str(name).startswith("mcp-"):
 455          return True
 456      try:
 457          from tools.registry import registry
 458  
 459          target = registry.get_toolset_alias_target(str(name))
 460      except Exception:
 461          target = None
 462      return bool(target and str(target).startswith("mcp-"))
 463  
 464  
 465  def _preserve_parent_mcp_toolsets(
 466      child_toolsets: List[str], parent_toolsets: set[str]
 467  ) -> List[str]:
 468      """Append any parent MCP toolsets that are missing from a narrowed child."""
 469      preserved = list(child_toolsets)
 470      for toolset_name in sorted(parent_toolsets):
 471          if _is_mcp_toolset_name(toolset_name) and toolset_name not in preserved:
 472              preserved.append(toolset_name)
 473      return preserved
 474  
 475  
 476  DEFAULT_MAX_ITERATIONS = 50
 477  DEFAULT_CHILD_TIMEOUT = 600  # seconds before a child agent is considered stuck
 478  _HEARTBEAT_INTERVAL = 30  # seconds between parent activity heartbeats during delegation
 479  # Stale-heartbeat thresholds. A child with no API-call progress is either:
 480  #   - idle between turns (no current_tool) — probably stuck on a slow API call
 481  #   - inside a tool (current_tool set) — probably running a legitimately long
 482  #     operation (terminal command, web fetch, large file read)
 483  # The idle ceiling stays tight so genuinely stuck children don't mask the gateway
 484  # timeout. The in-tool ceiling is much higher so legit long-running tools get
 485  # time to finish; child_timeout_seconds (default 600s) is still the hard cap.
 486  _HEARTBEAT_STALE_CYCLES_IDLE = 15  # 15 * 30s = 450s idle between turns → stale
 487  _HEARTBEAT_STALE_CYCLES_IN_TOOL = 40  # 40 * 30s = 1200s stuck on same tool → stale
 488  DEFAULT_TOOLSETS = ["terminal", "file", "web"]
 489  
 490  
 491  # ---------------------------------------------------------------------------
 492  # Delegation progress event types
 493  # ---------------------------------------------------------------------------
 494  
 495  
 496  class DelegateEvent(str, enum.Enum):
 497      """Formal event types emitted during delegation progress.
 498  
 499      _build_child_progress_callback normalises incoming legacy strings
 500      (``tool.started``, ``_thinking``, …) to these enum values via
 501      ``_LEGACY_EVENT_MAP``.  External consumers (gateway SSE, ACP adapter,
 502      CLI) still receive the legacy strings during the deprecation window.
 503  
 504      TASK_SPAWNED / TASK_COMPLETED / TASK_FAILED are reserved for
 505      future orchestrator lifecycle events and are not currently emitted.
 506      """
 507  
 508      TASK_SPAWNED = "delegate.task_spawned"
 509      TASK_PROGRESS = "delegate.task_progress"
 510      TASK_COMPLETED = "delegate.task_completed"
 511      TASK_FAILED = "delegate.task_failed"
 512      TASK_THINKING = "delegate.task_thinking"
 513      TASK_TOOL_STARTED = "delegate.tool_started"
 514      TASK_TOOL_COMPLETED = "delegate.tool_completed"
 515  
 516  
 517  # Legacy event strings → DelegateEvent mapping.
 518  # Incoming child-agent events use the old names; the callback normalises them.
 519  _LEGACY_EVENT_MAP: Dict[str, DelegateEvent] = {
 520      "_thinking": DelegateEvent.TASK_THINKING,
 521      "reasoning.available": DelegateEvent.TASK_THINKING,
 522      "tool.started": DelegateEvent.TASK_TOOL_STARTED,
 523      "tool.completed": DelegateEvent.TASK_TOOL_COMPLETED,
 524      "subagent_progress": DelegateEvent.TASK_PROGRESS,
 525  }
 526  
 527  
 528  def check_delegate_requirements() -> bool:
 529      """Delegation has no external requirements -- always available."""
 530      return True
 531  
 532  
 533  def _build_child_system_prompt(
 534      goal: str,
 535      context: Optional[str] = None,
 536      *,
 537      workspace_path: Optional[str] = None,
 538      role: str = "leaf",
 539      max_spawn_depth: int = 2,
 540      child_depth: int = 1,
 541  ) -> str:
 542      """Build a focused system prompt for a child agent.
 543  
 544      When role='orchestrator', appends a delegation-capability block
 545      modeled on OpenClaw's buildSubagentSystemPrompt (canSpawn branch at
 546      inspiration/openclaw/src/agents/subagent-system-prompt.ts:63-95).
 547      The depth note is literal truth (grounded in the passed config) so
 548      the LLM doesn't confabulate nesting capabilities that don't exist.
 549      """
 550      parts = [
 551          "You are a focused subagent working on a specific delegated task.",
 552          "",
 553          f"YOUR TASK:\n{goal}",
 554      ]
 555      if context and context.strip():
 556          parts.append(f"\nCONTEXT:\n{context}")
 557      if workspace_path and str(workspace_path).strip():
 558          parts.append(
 559              "\nWORKSPACE PATH:\n"
 560              f"{workspace_path}\n"
 561              "Use this exact path for local repository/workdir operations unless the task explicitly says otherwise."
 562          )
 563      parts.append(
 564          "\nComplete this task using the tools available to you. "
 565          "When finished, provide a clear, concise summary of:\n"
 566          "- What you did\n"
 567          "- What you found or accomplished\n"
 568          "- Any files you created or modified\n"
 569          "- Any issues encountered\n\n"
 570          "Important workspace rule: Never assume a repository lives at /workspace/... or any other container-style path unless the task/context explicitly gives that path. "
 571          "If no exact local path is provided, discover it first before issuing git/workdir-specific commands.\n\n"
 572          "Be thorough but concise -- your response is returned to the "
 573          "parent agent as a summary."
 574      )
 575      if role == "orchestrator":
 576          child_note = (
 577              "Your own children MUST be leaves (cannot delegate further) "
 578              "because they would be at the depth floor — you cannot pass "
 579              "role='orchestrator' to your own delegate_task calls."
 580              if child_depth + 1 >= max_spawn_depth
 581              else "Your own children can themselves be orchestrators or leaves, "
 582              "depending on the `role` you pass to delegate_task. Default is "
 583              "'leaf'; pass role='orchestrator' explicitly when a child "
 584              "needs to further decompose its work."
 585          )
 586          parts.append(
 587              "\n## Subagent Spawning (Orchestrator Role)\n"
 588              "You have access to the `delegate_task` tool and CAN spawn "
 589              "your own subagents to parallelize independent work.\n\n"
 590              "WHEN to delegate:\n"
 591              "- The goal decomposes into 2+ independent subtasks that can "
 592              "run in parallel (e.g. research A and B simultaneously).\n"
 593              "- A subtask is reasoning-heavy and would flood your context "
 594              "with intermediate data.\n\n"
 595              "WHEN NOT to delegate:\n"
 596              "- Single-step mechanical work — do it directly.\n"
 597              "- Trivial tasks you can execute in one or two tool calls.\n"
 598              "- Re-delegating your entire assigned goal to one worker "
 599              "(that's just pass-through with no value added).\n\n"
 600              "Coordinate your workers' results and synthesize them before "
 601              "reporting back to your parent. You are responsible for the "
 602              "final summary, not your workers.\n\n"
 603              f"NOTE: You are at depth {child_depth}. The delegation tree "
 604              f"is capped at max_spawn_depth={max_spawn_depth}. {child_note}"
 605          )
 606      return "\n".join(parts)
 607  
 608  
 609  def _resolve_workspace_hint(parent_agent) -> Optional[str]:
 610      """Best-effort local workspace hint for child prompts.
 611  
 612      We only inject a path when we have a concrete absolute directory. This avoids
 613      teaching subagents a fake container path while still helping them avoid
 614      guessing `/workspace/...` for local repo tasks.
 615      """
 616      candidates = [
 617          os.getenv("TERMINAL_CWD"),
 618          getattr(
 619              getattr(parent_agent, "_subdirectory_hints", None), "working_dir", None
 620          ),
 621          getattr(parent_agent, "terminal_cwd", None),
 622          getattr(parent_agent, "cwd", None),
 623      ]
 624      for candidate in candidates:
 625          if not candidate:
 626              continue
 627          try:
 628              text = os.path.abspath(os.path.expanduser(str(candidate)))
 629          except Exception:
 630              continue
 631          if os.path.isabs(text) and os.path.isdir(text):
 632              return text
 633      return None
 634  
 635  
 636  def _strip_blocked_tools(toolsets: List[str]) -> List[str]:
 637      """Remove toolsets that contain only blocked tools."""
 638      blocked_toolset_names = {
 639          "delegation",
 640          "clarify",
 641          "memory",
 642          "code_execution",
 643      }
 644      return [t for t in toolsets if t not in blocked_toolset_names]
 645  
 646  
 647  def _build_child_progress_callback(
 648      task_index: int,
 649      goal: str,
 650      parent_agent,
 651      task_count: int = 1,
 652      *,
 653      subagent_id: Optional[str] = None,
 654      parent_id: Optional[str] = None,
 655      depth: Optional[int] = None,
 656      model: Optional[str] = None,
 657      toolsets: Optional[List[str]] = None,
 658  ) -> Optional[callable]:
 659      """Build a callback that relays child agent tool calls to the parent display.
 660  
 661      Two display paths:
 662        CLI:     prints tree-view lines above the parent's delegation spinner
 663        Gateway: batches tool names and relays to parent's progress callback
 664  
 665      The identity kwargs (``subagent_id``, ``parent_id``, ``depth``, ``model``,
 666      ``toolsets``) are threaded into every relayed event so the TUI can
 667      reconstruct the live spawn tree and route per-branch controls (kill,
 668      pause) back by ``subagent_id``.  All are optional for backward compat —
 669      older callers that ignore them still produce a flat list on the TUI.
 670  
 671      Returns None if no display mechanism is available, in which case the
 672      child agent runs with no progress callback (identical to current behavior).
 673      """
 674      spinner = getattr(parent_agent, "_delegate_spinner", None)
 675      parent_cb = getattr(parent_agent, "tool_progress_callback", None)
 676  
 677      if not spinner and not parent_cb:
 678          return None  # No display → no callback → zero behavior change
 679  
 680      # Show 1-indexed prefix only in batch mode (multiple tasks)
 681      prefix = f"[{task_index + 1}] " if task_count > 1 else ""
 682      goal_label = (goal or "").strip()
 683  
 684      # Gateway: batch tool names, flush periodically
 685      _BATCH_SIZE = 5
 686      _batch: List[str] = []
 687      _tool_count = [0]  # per-subagent running counter (list for closure mutation)
 688  
 689      def _identity_kwargs() -> Dict[str, Any]:
 690          kw: Dict[str, Any] = {
 691              "task_index": task_index,
 692              "task_count": task_count,
 693              "goal": goal_label,
 694          }
 695          if subagent_id is not None:
 696              kw["subagent_id"] = subagent_id
 697          if parent_id is not None:
 698              kw["parent_id"] = parent_id
 699          if depth is not None:
 700              kw["depth"] = depth
 701          if model is not None:
 702              kw["model"] = model
 703          if toolsets is not None:
 704              kw["toolsets"] = list(toolsets)
 705          kw["tool_count"] = _tool_count[0]
 706          return kw
 707  
 708      def _relay(
 709          event_type: str, tool_name: str = None, preview: str = None, args=None, **kwargs
 710      ):
 711          if not parent_cb:
 712              return
 713          payload = _identity_kwargs()
 714          payload.update(kwargs)  # caller overrides (e.g. status, duration_seconds)
 715          try:
 716              parent_cb(event_type, tool_name, preview, args, **payload)
 717          except Exception as e:
 718              logger.debug("Parent callback failed: %s", e)
 719  
 720      def _callback(
 721          event_type, tool_name: str = None, preview: str = None, args=None, **kwargs
 722      ):
 723          # Lifecycle events emitted by the orchestrator itself — handled
 724          # before enum normalisation since they are not part of DelegateEvent.
 725          if event_type == "subagent.start":
 726              if spinner and goal_label:
 727                  short = (
 728                      (goal_label[:55] + "...") if len(goal_label) > 55 else goal_label
 729                  )
 730                  try:
 731                      spinner.print_above(f" {prefix}├─ 🔀 {short}")
 732                  except Exception as e:
 733                      logger.debug("Spinner print_above failed: %s", e)
 734              _relay("subagent.start", preview=preview or goal_label or "", **kwargs)
 735              return
 736  
 737          if event_type == "subagent.complete":
 738              _relay("subagent.complete", preview=preview, **kwargs)
 739              return
 740  
 741          # Normalise legacy strings, new-style "delegate.*" strings, and
 742          # DelegateEvent enum values all to a single DelegateEvent.  The
 743          # original implementation only accepted the five legacy strings;
 744          # enum-typed callers were silently dropped.
 745          if isinstance(event_type, DelegateEvent):
 746              event = event_type
 747          else:
 748              event = _LEGACY_EVENT_MAP.get(event_type)
 749              if event is None:
 750                  try:
 751                      event = DelegateEvent(event_type)
 752                  except (ValueError, TypeError):
 753                      return  # Unknown event — ignore
 754  
 755          if event == DelegateEvent.TASK_THINKING:
 756              text = preview or tool_name or ""
 757              if spinner:
 758                  short = (text[:55] + "...") if len(text) > 55 else text
 759                  try:
 760                      spinner.print_above(f' {prefix}├─ 💭 "{short}"')
 761                  except Exception as e:
 762                      logger.debug("Spinner print_above failed: %s", e)
 763              _relay("subagent.thinking", preview=text)
 764              return
 765  
 766          if event == DelegateEvent.TASK_TOOL_COMPLETED:
 767              return
 768  
 769          if event == DelegateEvent.TASK_PROGRESS:
 770              # Pre-batched progress summary relayed from a nested
 771              # orchestrator's grandchild (upstream emits as
 772              # parent_cb("subagent_progress", summary_string) where the
 773              # summary lands in the tool_name positional slot).  Treat as
 774              # a pass-through: render distinctly (not via the tool-start
 775              # emoji lookup, which would mistake the summary string for a
 776              # tool name) and relay upward without re-batching.
 777              summary_text = tool_name or preview or ""
 778              if spinner and summary_text:
 779                  try:
 780                      spinner.print_above(f" {prefix}├─ 🔀 {summary_text}")
 781                  except Exception as e:
 782                      logger.debug("Spinner print_above failed: %s", e)
 783              if parent_cb:
 784                  try:
 785                      parent_cb("subagent_progress", f"{prefix}{summary_text}")
 786                  except Exception as e:
 787                      logger.debug("Parent callback relay failed: %s", e)
 788              return
 789  
 790          # TASK_TOOL_STARTED — display and batch for parent relay
 791          _tool_count[0] += 1
 792          if subagent_id is not None:
 793              with _active_subagents_lock:
 794                  rec = _active_subagents.get(subagent_id)
 795                  if rec is not None:
 796                      rec["tool_count"] = _tool_count[0]
 797                      rec["last_tool"] = tool_name or ""
 798          if spinner:
 799              short = (
 800                  (preview[:35] + "...")
 801                  if preview and len(preview) > 35
 802                  else (preview or "")
 803              )
 804              from agent.display import get_tool_emoji
 805  
 806              emoji = get_tool_emoji(tool_name or "")
 807              line = f" {prefix}├─ {emoji} {tool_name}"
 808              if short:
 809                  line += f'  "{short}"'
 810              try:
 811                  spinner.print_above(line)
 812              except Exception as e:
 813                  logger.debug("Spinner print_above failed: %s", e)
 814  
 815          if parent_cb:
 816              _relay("subagent.tool", tool_name, preview, args)
 817              _batch.append(tool_name or "")
 818              if len(_batch) >= _BATCH_SIZE:
 819                  summary = ", ".join(_batch)
 820                  _relay("subagent.progress", preview=f"🔀 {prefix}{summary}")
 821                  _batch.clear()
 822  
 823      def _flush():
 824          """Flush remaining batched tool names to gateway on completion."""
 825          if parent_cb and _batch:
 826              summary = ", ".join(_batch)
 827              _relay("subagent.progress", preview=f"🔀 {prefix}{summary}")
 828              _batch.clear()
 829  
 830      _callback._flush = _flush
 831      return _callback
 832  
 833  
 834  def _build_child_agent(
 835      task_index: int,
 836      goal: str,
 837      context: Optional[str],
 838      toolsets: Optional[List[str]],
 839      model: Optional[str],
 840      max_iterations: int,
 841      task_count: int,
 842      parent_agent,
 843      # Credential overrides from delegation config (provider:model resolution)
 844      override_provider: Optional[str] = None,
 845      override_base_url: Optional[str] = None,
 846      override_api_key: Optional[str] = None,
 847      override_api_mode: Optional[str] = None,
 848      # ACP transport overrides — lets a non-ACP parent spawn ACP child agents
 849      override_acp_command: Optional[str] = None,
 850      override_acp_args: Optional[List[str]] = None,
 851      # Per-call role controlling whether the child can further delegate.
 852      # 'leaf' (default) cannot; 'orchestrator' retains the delegation
 853      # toolset subject to depth/kill-switch bounds applied below.
 854      role: str = "leaf",
 855  ):
 856      """
 857      Build a child AIAgent on the main thread (thread-safe construction).
 858      Returns the constructed child agent without running it.
 859  
 860      When override_* params are set (from delegation config), the child uses
 861      those credentials instead of inheriting from the parent.  This enables
 862      routing subagents to a different provider:model pair (e.g. cheap/fast
 863      model on OpenRouter while the parent runs on Nous Portal).
 864      """
 865      from run_agent import AIAgent
 866      import uuid as _uuid
 867  
 868      # ── Role resolution ─────────────────────────────────────────────────
 869      # Honor the caller's role only when BOTH the kill switch and the
 870      # child's depth allow it.  This is the single point where role
 871      # degrades to 'leaf' — keeps the rule predictable.  Callers pass
 872      # the normalised role (_normalize_role ran in delegate_task) so
 873      # we only deal with 'leaf' or 'orchestrator' here.
 874      child_depth = getattr(parent_agent, "_delegate_depth", 0) + 1
 875      max_spawn = _get_max_spawn_depth()
 876      orchestrator_ok = _get_orchestrator_enabled() and child_depth < max_spawn
 877      effective_role = role if (role == "orchestrator" and orchestrator_ok) else "leaf"
 878  
 879      # ── Subagent identity (stable across events, 0-indexed for TUI) ─────
 880      # subagent_id is generated here so the progress callback, the
 881      # spawn_requested event, and the _active_subagents registry all share
 882      # one key.  parent_id is non-None when THIS parent is itself a subagent
 883      # (nested orchestrator -> worker chain).
 884      subagent_id = f"sa-{task_index}-{_uuid.uuid4().hex[:8]}"
 885      parent_subagent_id = getattr(parent_agent, "_subagent_id", None)
 886      tui_depth = max(0, child_depth - 1)  # 0 = first-level child for the UI
 887  
 888      delegation_cfg = _load_config()
 889  
 890      # When no explicit toolsets given, inherit from parent's enabled toolsets
 891      # so disabled tools (e.g. web) don't leak to subagents.
 892      # Note: enabled_toolsets=None means "all tools enabled" (the default),
 893      # so we must derive effective toolsets from the parent's loaded tools.
 894      parent_enabled = getattr(parent_agent, "enabled_toolsets", None)
 895      if parent_enabled is not None:
 896          parent_toolsets = set(parent_enabled)
 897      elif parent_agent and hasattr(parent_agent, "valid_tool_names"):
 898          # enabled_toolsets is None (all tools) — derive from loaded tool names
 899          import model_tools
 900  
 901          parent_toolsets = {
 902              ts
 903              for name in parent_agent.valid_tool_names
 904              if (ts := model_tools.get_toolset_for_tool(name)) is not None
 905          }
 906      else:
 907          parent_toolsets = set(DEFAULT_TOOLSETS)
 908  
 909      if toolsets:
 910          # Intersect with parent — subagent must not gain tools the parent lacks
 911          child_toolsets = [t for t in toolsets if t in parent_toolsets]
 912          if _get_inherit_mcp_toolsets():
 913              child_toolsets = _preserve_parent_mcp_toolsets(
 914                  child_toolsets, parent_toolsets
 915              )
 916          child_toolsets = _strip_blocked_tools(child_toolsets)
 917      elif parent_agent and parent_enabled is not None:
 918          child_toolsets = _strip_blocked_tools(parent_enabled)
 919      elif parent_toolsets:
 920          child_toolsets = _strip_blocked_tools(sorted(parent_toolsets))
 921      else:
 922          child_toolsets = _strip_blocked_tools(DEFAULT_TOOLSETS)
 923  
 924      # Orchestrators retain the 'delegation' toolset that _strip_blocked_tools
 925      # removed.  The re-add is unconditional on parent-toolset membership because
 926      # orchestrator capability is granted by role, not inherited — see the
 927      # test_intersection_preserves_delegation_bound test for the design rationale.
 928      if effective_role == "orchestrator" and "delegation" not in child_toolsets:
 929          child_toolsets.append("delegation")
 930  
 931      workspace_hint = _resolve_workspace_hint(parent_agent)
 932      child_prompt = _build_child_system_prompt(
 933          goal,
 934          context,
 935          workspace_path=workspace_hint,
 936          role=effective_role,
 937          max_spawn_depth=max_spawn,
 938          child_depth=child_depth,
 939      )
 940      # Extract parent's API key so subagents inherit auth (e.g. Nous Portal).
 941      parent_api_key = getattr(parent_agent, "api_key", None)
 942      if (not parent_api_key) and hasattr(parent_agent, "_client_kwargs"):
 943          parent_api_key = parent_agent._client_kwargs.get("api_key")
 944  
 945      # Resolve the child's effective model early so it can ride on every event.
 946      effective_model_for_cb = model or getattr(parent_agent, "model", None)
 947  
 948      # Build progress callback to relay tool calls to parent display.
 949      # Identity kwargs thread the subagent_id through every emitted event so the
 950      # TUI can reconstruct the spawn tree and route per-branch controls.
 951      child_progress_cb = _build_child_progress_callback(
 952          task_index,
 953          goal,
 954          parent_agent,
 955          task_count,
 956          subagent_id=subagent_id,
 957          parent_id=parent_subagent_id,
 958          depth=tui_depth,
 959          model=effective_model_for_cb,
 960          toolsets=child_toolsets,
 961      )
 962  
 963      # Each subagent gets its own iteration budget capped at max_iterations
 964      # (configurable via delegation.max_iterations, default 50).  This means
 965      # total iterations across parent + subagents can exceed the parent's
 966      # max_iterations.  The user controls the per-subagent cap in config.yaml.
 967  
 968      child_thinking_cb = None
 969      if child_progress_cb:
 970  
 971          def _child_thinking(text: str) -> None:
 972              if not text:
 973                  return
 974              try:
 975                  child_progress_cb("_thinking", text)
 976              except Exception as e:
 977                  logger.debug("Child thinking callback relay failed: %s", e)
 978  
 979          child_thinking_cb = _child_thinking
 980  
 981      # Resolve effective credentials: config override > parent inherit
 982      effective_model = model or parent_agent.model
 983      effective_provider = override_provider or getattr(parent_agent, "provider", None)
 984      effective_base_url = override_base_url or parent_agent.base_url
 985      effective_api_key = override_api_key or parent_api_key
 986      effective_api_mode = override_api_mode or getattr(parent_agent, "api_mode", None)
 987      effective_acp_command = override_acp_command or getattr(
 988          parent_agent, "acp_command", None
 989      )
 990      effective_acp_args = list(
 991          override_acp_args
 992          if override_acp_args is not None
 993          else (getattr(parent_agent, "acp_args", []) or [])
 994      )
 995  
 996      # When override_provider is set (e.g. delegation.provider: minimax-cn),
 997      # the subagent must use direct API calls — not the parent's ACP transport.
 998      # Inheriting acp_command unconditionally causes run_agent.py to initialize
 999      # CopilotACPClient, bypassing override credentials entirely (issue #16816).
1000      if override_provider and not override_acp_command:
1001          effective_acp_command = None
1002          effective_acp_args = []
1003  
1004      if override_acp_command:
1005          # If explicitly forcing an ACP transport override, the provider MUST be copilot-acp
1006          # so run_agent.py initializes the CopilotACPClient.
1007          effective_provider = "copilot-acp"
1008          effective_api_mode = "chat_completions"
1009  
1010      # Resolve reasoning config: delegation override > parent inherit
1011      parent_reasoning = getattr(parent_agent, "reasoning_config", None)
1012      child_reasoning = parent_reasoning
1013      try:
1014          delegation_effort = str(delegation_cfg.get("reasoning_effort") or "").strip()
1015          if delegation_effort:
1016              from hermes_constants import parse_reasoning_effort
1017  
1018              parsed = parse_reasoning_effort(delegation_effort)
1019              if parsed is not None:
1020                  child_reasoning = parsed
1021              else:
1022                  logger.warning(
1023                      "Unknown delegation.reasoning_effort '%s', inheriting parent level",
1024                      delegation_effort,
1025                  )
1026      except Exception as exc:
1027          logger.debug("Could not load delegation reasoning_effort: %s", exc)
1028  
1029      # Inherit the parent's fallback provider chain so subagents can recover
1030      # from rate-limits and credential exhaustion exactly like the top-level
1031      # agent does.  _fallback_chain is a list accepted by AIAgent's
1032      # fallback_model parameter (which handles both list and dict forms).
1033      parent_fallback = getattr(parent_agent, "_fallback_chain", None) or None
1034  
1035      # Inherit the parent's OpenRouter provider-preference filters by default
1036      # (so subagents routed to the same provider honour the same routing
1037      # constraints).  BUT: when `delegation.provider` is set the user is
1038      # explicitly asking the child to run on a different provider, and
1039      # parent-level OpenRouter filters (e.g. `only=["Anthropic"]`) would
1040      # silently force the child back onto the parent's provider. Clear the
1041      # filters in that case so the delegated provider is honoured.
1042      child_providers_allowed = getattr(parent_agent, "providers_allowed", None)
1043      child_providers_ignored = getattr(parent_agent, "providers_ignored", None)
1044      child_providers_order = getattr(parent_agent, "providers_order", None)
1045      child_provider_sort = getattr(parent_agent, "provider_sort", None)
1046      if override_provider:
1047          child_providers_allowed = None
1048          child_providers_ignored = None
1049          child_providers_order = None
1050          child_provider_sort = None
1051  
1052      child = AIAgent(
1053          base_url=effective_base_url,
1054          api_key=effective_api_key,
1055          model=effective_model,
1056          provider=effective_provider,
1057          api_mode=effective_api_mode,
1058          acp_command=effective_acp_command,
1059          acp_args=effective_acp_args,
1060          max_iterations=max_iterations,
1061          max_tokens=getattr(parent_agent, "max_tokens", None),
1062          reasoning_config=child_reasoning,
1063          prefill_messages=getattr(parent_agent, "prefill_messages", None),
1064          fallback_model=parent_fallback,
1065          enabled_toolsets=child_toolsets,
1066          quiet_mode=True,
1067          ephemeral_system_prompt=child_prompt,
1068          log_prefix=f"[subagent-{task_index}]",
1069          platform=parent_agent.platform,
1070          skip_context_files=True,
1071          skip_memory=True,
1072          clarify_callback=None,
1073          thinking_callback=child_thinking_cb,
1074          session_db=getattr(parent_agent, "_session_db", None),
1075          parent_session_id=getattr(parent_agent, "session_id", None),
1076          providers_allowed=child_providers_allowed,
1077          providers_ignored=child_providers_ignored,
1078          providers_order=child_providers_order,
1079          provider_sort=child_provider_sort,
1080          tool_progress_callback=child_progress_cb,
1081          iteration_budget=None,  # fresh budget per subagent
1082      )
1083      child._print_fn = getattr(parent_agent, "_print_fn", None)
1084      # Set delegation depth so children can't spawn grandchildren
1085      child._delegate_depth = child_depth
1086      # Stash the post-degrade role for introspection (leaf if the
1087      # kill switch or depth bounded the caller's requested role).
1088      child._delegate_role = effective_role
1089      # Stash subagent identity for nested-delegation event propagation and
1090      # for _run_single_child / interrupt_subagent to look up by id.
1091      child._subagent_id = subagent_id
1092      child._parent_subagent_id = parent_subagent_id
1093      child._subagent_goal = goal
1094  
1095      # Share a credential pool with the child when possible so subagents can
1096      # rotate credentials on rate limits instead of getting pinned to one key.
1097      child_pool = _resolve_child_credential_pool(effective_provider, parent_agent)
1098      if child_pool is not None:
1099          child._credential_pool = child_pool
1100  
1101      # Register child for interrupt propagation
1102      if hasattr(parent_agent, "_active_children"):
1103          lock = getattr(parent_agent, "_active_children_lock", None)
1104          if lock:
1105              with lock:
1106                  parent_agent._active_children.append(child)
1107          else:
1108              parent_agent._active_children.append(child)
1109  
1110      # Announce the spawn immediately — the child may sit in a queue
1111      # for seconds if max_concurrent_children is saturated, so the TUI
1112      # wants a node in the tree before run starts.
1113      if child_progress_cb:
1114          try:
1115              child_progress_cb("subagent.spawn_requested", preview=goal)
1116          except Exception as exc:
1117              logger.debug("spawn_requested relay failed: %s", exc)
1118  
1119      return child
1120  
1121  
1122  def _dump_subagent_timeout_diagnostic(
1123      *,
1124      child: Any,
1125      task_index: int,
1126      timeout_seconds: float,
1127      duration_seconds: float,
1128      worker_thread: Optional[threading.Thread],
1129      goal: str,
1130  ) -> Optional[str]:
1131      """Write a structured diagnostic dump for a subagent that timed out
1132      before making any API call.
1133  
1134      See issue #14726: users hit "subagent timed out after 300s with no response"
1135      with zero API calls and no way to inspect what happened. This helper
1136      writes a dedicated log under ``~/.hermes/logs/subagent-<sid>-<ts>.log``
1137      capturing the child's config, system-prompt / tool-schema sizes, activity
1138      tracker snapshot, and the worker thread's Python stack at timeout.
1139  
1140      Returns the absolute path to the diagnostic file, or None on failure.
1141      """
1142      try:
1143          from hermes_constants import get_hermes_home
1144          import datetime as _dt
1145          import sys as _sys
1146          import traceback as _traceback
1147  
1148          hermes_home = get_hermes_home()
1149          logs_dir = hermes_home / "logs"
1150          try:
1151              logs_dir.mkdir(parents=True, exist_ok=True)
1152          except Exception:
1153              return None
1154  
1155          subagent_id = getattr(child, "_subagent_id", None) or f"idx{task_index}"
1156          ts = _dt.datetime.now().strftime("%Y%m%d_%H%M%S")
1157          dump_path = logs_dir / f"subagent-timeout-{subagent_id}-{ts}.log"
1158  
1159          lines: List[str] = []
1160          def _w(line: str = "") -> None:
1161              lines.append(line)
1162  
1163          _w(f"# Subagent timeout diagnostic — issue #14726")
1164          _w(f"# Generated: {_dt.datetime.now().isoformat()}")
1165          _w("")
1166          _w("## Timeout")
1167          _w(f"  task_index:        {task_index}")
1168          _w(f"  subagent_id:       {subagent_id}")
1169          _w(f"  configured_timeout: {timeout_seconds}s")
1170          _w(f"  actual_duration:   {duration_seconds:.2f}s")
1171          _w("")
1172  
1173          _w("## Goal")
1174          _goal_preview = (goal or "").strip()
1175          if len(_goal_preview) > 1000:
1176              _goal_preview = _goal_preview[:1000] + " ...[truncated]"
1177          _w(_goal_preview or "(empty)")
1178          _w("")
1179  
1180          _w("## Child config")
1181          for attr in (
1182              "model", "provider", "api_mode", "base_url", "max_iterations",
1183              "quiet_mode", "skip_memory", "skip_context_files", "platform",
1184              "_delegate_role", "_delegate_depth",
1185          ):
1186              try:
1187                  val = getattr(child, attr, None)
1188                  # Redact api_key-shaped values defensively
1189                  if isinstance(val, str) and attr == "base_url":
1190                      pass
1191                  _w(f"  {attr}: {val!r}")
1192              except Exception:
1193                  _w(f"  {attr}: <unreadable>")
1194          _w("")
1195  
1196          _w("## Toolsets")
1197          enabled = getattr(child, "enabled_toolsets", None)
1198          _w(f"  enabled_toolsets:  {enabled!r}")
1199          tool_names = getattr(child, "valid_tool_names", None)
1200          if tool_names:
1201              _w(f"  loaded tool count: {len(tool_names)}")
1202              try:
1203                  _w(f"  loaded tools:      {sorted(list(tool_names))}")
1204              except Exception:
1205                  pass
1206          _w("")
1207  
1208          _w("## Prompt / schema sizes")
1209          try:
1210              sys_prompt = getattr(child, "ephemeral_system_prompt", None) \
1211                  or getattr(child, "system_prompt", None) \
1212                  or ""
1213              _w(f"  system_prompt_bytes: {len(sys_prompt.encode('utf-8')) if isinstance(sys_prompt, str) else 'n/a'}")
1214              _w(f"  system_prompt_chars: {len(sys_prompt) if isinstance(sys_prompt, str) else 'n/a'}")
1215          except Exception as exc:
1216              _w(f"  system_prompt: <error: {exc}>")
1217          try:
1218              tools_schema = getattr(child, "tools", None)
1219              if tools_schema is not None:
1220                  _schema_json = json.dumps(tools_schema, default=str)
1221                  _w(f"  tool_schema_count: {len(tools_schema)}")
1222                  _w(f"  tool_schema_bytes: {len(_schema_json.encode('utf-8'))}")
1223          except Exception as exc:
1224              _w(f"  tool_schema: <error: {exc}>")
1225          _w("")
1226  
1227          _w("## Activity summary")
1228          try:
1229              summary = child.get_activity_summary()
1230              for k, v in summary.items():
1231                  _w(f"  {k}: {v!r}")
1232          except Exception as exc:
1233              _w(f"  <get_activity_summary failed: {exc}>")
1234          _w("")
1235  
1236          _w("## Worker thread stack at timeout")
1237          if worker_thread is not None and worker_thread.is_alive():
1238              frames = _sys._current_frames()
1239              worker_frame = frames.get(worker_thread.ident)
1240              if worker_frame is not None:
1241                  stack = _traceback.format_stack(worker_frame)
1242                  for frame_line in stack:
1243                      for sub in frame_line.rstrip().split("\n"):
1244                          _w(f"  {sub}")
1245              else:
1246                  _w("  <worker frame not available>")
1247          elif worker_thread is None:
1248              _w("  <no worker thread handle>")
1249          else:
1250              _w("  <worker thread already exited>")
1251          _w("")
1252  
1253          _w("## Notes")
1254          _w("  This file is written ONLY when a subagent times out with 0 API calls.")
1255          _w("  0-API-call timeouts mean the child never reached its first LLM request.")
1256          _w("  Common causes: oversized prompt rejected by provider, transport hang,")
1257          _w("  credential resolution stuck. See issue #14726 for context.")
1258  
1259          dump_path.write_text("\n".join(lines), encoding="utf-8")
1260          return str(dump_path)
1261      except Exception as exc:
1262          logger.warning("Subagent timeout diagnostic dump failed: %s", exc)
1263          return None
1264  
1265  
1266  def _run_single_child(
1267      task_index: int,
1268      goal: str,
1269      child=None,
1270      parent_agent=None,
1271      **_kwargs,
1272  ) -> Dict[str, Any]:
1273      """
1274      Run a pre-built child agent. Called from within a thread.
1275      Returns a structured result dict.
1276      """
1277      child_start = time.monotonic()
1278  
1279      # Get the progress callback from the child agent
1280      child_progress_cb = getattr(child, "tool_progress_callback", None)
1281  
1282      # Restore parent tool names using the value saved before child construction
1283      # mutated the global. This is the correct parent toolset, not the child's.
1284      import model_tools
1285  
1286      _saved_tool_names = getattr(
1287          child, "_delegate_saved_tool_names", list(model_tools._last_resolved_tool_names)
1288      )
1289  
1290      child_pool = getattr(child, "_credential_pool", None)
1291      leased_cred_id = None
1292      if child_pool is not None:
1293          leased_cred_id = child_pool.acquire_lease()
1294          if leased_cred_id is not None:
1295              try:
1296                  leased_entry = child_pool.current()
1297                  if leased_entry is not None and hasattr(child, "_swap_credential"):
1298                      child._swap_credential(leased_entry)
1299              except Exception as exc:
1300                  logger.debug("Failed to bind child to leased credential: %s", exc)
1301  
1302      # Heartbeat: periodically propagate child activity to the parent so the
1303      # gateway inactivity timeout doesn't fire while the subagent is working.
1304      # Without this, the parent's _last_activity_ts freezes when delegate_task
1305      # starts and the gateway eventually kills the agent for "no activity".
1306      _heartbeat_stop = threading.Event()
1307      # Stale detection: track the child's (tool, iteration) pair across
1308      # heartbeat cycles. If neither advances, count the cycle as stale.
1309      # Different thresholds for idle vs in-tool (see _HEARTBEAT_STALE_CYCLES_*).
1310      _last_seen_iter = [0]
1311      _last_seen_tool = [None]  # type: list
1312      _stale_count = [0]
1313  
1314      def _heartbeat_loop():
1315          while not _heartbeat_stop.wait(_HEARTBEAT_INTERVAL):
1316              if parent_agent is None:
1317                  continue
1318              touch = getattr(parent_agent, "_touch_activity", None)
1319              if not touch:
1320                  continue
1321              # Pull detail from the child's own activity tracker
1322              desc = f"delegate_task: subagent {task_index} working"
1323              try:
1324                  child_summary = child.get_activity_summary()
1325                  child_tool = child_summary.get("current_tool")
1326                  child_iter = child_summary.get("api_call_count", 0)
1327                  child_max = child_summary.get("max_iterations", 0)
1328  
1329                  # Stale detection: count cycles where neither the iteration
1330                  # count nor the current_tool advances. A child running a
1331                  # legitimately long-running tool (terminal command, web
1332                  # fetch) keeps current_tool set but doesn't advance
1333                  # api_call_count — we don't want that to look stale at the
1334                  # idle threshold.
1335                  iter_advanced = child_iter > _last_seen_iter[0]
1336                  tool_changed = child_tool != _last_seen_tool[0]
1337                  if iter_advanced or tool_changed:
1338                      _last_seen_iter[0] = child_iter
1339                      _last_seen_tool[0] = child_tool
1340                      _stale_count[0] = 0
1341                  else:
1342                      _stale_count[0] += 1
1343  
1344                  # Pick threshold based on whether the child is currently
1345                  # inside a tool call. In-tool threshold is high enough to
1346                  # cover legitimately slow tools; idle threshold stays
1347                  # tight so the gateway timeout can fire on a truly wedged
1348                  # child.
1349                  stale_limit = (
1350                      _HEARTBEAT_STALE_CYCLES_IN_TOOL
1351                      if child_tool
1352                      else _HEARTBEAT_STALE_CYCLES_IDLE
1353                  )
1354                  if _stale_count[0] >= stale_limit:
1355                      logger.warning(
1356                          "Subagent %d appears stale (no progress for %d "
1357                          "heartbeat cycles, tool=%s) — stopping heartbeat",
1358                          task_index,
1359                          _stale_count[0],
1360                          child_tool or "<none>",
1361                      )
1362                      break  # stop touching parent, let gateway timeout fire
1363  
1364                  if child_tool:
1365                      desc = (
1366                          f"delegate_task: subagent running {child_tool} "
1367                          f"(iteration {child_iter}/{child_max})"
1368                      )
1369                  else:
1370                      child_desc = child_summary.get("last_activity_desc", "")
1371                      if child_desc:
1372                          desc = (
1373                              f"delegate_task: subagent {child_desc} "
1374                              f"(iteration {child_iter}/{child_max})"
1375                          )
1376              except Exception:
1377                  pass
1378              try:
1379                  touch(desc)
1380              except Exception:
1381                  pass
1382  
1383      _heartbeat_thread = threading.Thread(target=_heartbeat_loop, daemon=True)
1384      _heartbeat_thread.start()
1385  
1386      # Register the live agent in the module-level registry so the TUI can
1387      # target it by subagent_id (kill, pause, status queries).  Unregistered
1388      # in the finally block, even when the child raises.  Test doubles that
1389      # hand us a MagicMock don't carry stable ids; skip registration then.
1390      _raw_sid = getattr(child, "_subagent_id", None)
1391      _subagent_id = _raw_sid if isinstance(_raw_sid, str) else None
1392      if _subagent_id:
1393          _raw_depth = getattr(child, "_delegate_depth", 1)
1394          _tui_depth = max(0, _raw_depth - 1) if isinstance(_raw_depth, int) else 0
1395          _parent_sid = getattr(child, "_parent_subagent_id", None)
1396          _register_subagent(
1397              {
1398                  "subagent_id": _subagent_id,
1399                  "parent_id": _parent_sid if isinstance(_parent_sid, str) else None,
1400                  "depth": _tui_depth,
1401                  "goal": goal,
1402                  "model": (
1403                      getattr(child, "model", None)
1404                      if isinstance(getattr(child, "model", None), str)
1405                      else None
1406                  ),
1407                  "started_at": time.time(),
1408                  "status": "running",
1409                  "tool_count": 0,
1410                  "agent": child,
1411              }
1412          )
1413  
1414      try:
1415          if child_progress_cb:
1416              try:
1417                  child_progress_cb("subagent.start", preview=goal)
1418              except Exception as e:
1419                  logger.debug("Progress callback start failed: %s", e)
1420  
1421          # File-state coordination: reuse the stable subagent_id as the child's
1422          # task_id so file_state writes, active-subagents registry, and TUI
1423          # events all share one key.  Falls back to a fresh uuid only if the
1424          # pre-built id is somehow missing.
1425          import uuid as _uuid
1426  
1427          child_task_id = _subagent_id or f"subagent-{task_index}-{_uuid.uuid4().hex[:8]}"
1428          parent_task_id = getattr(parent_agent, "_current_task_id", None)
1429          wall_start = time.time()
1430          parent_reads_snapshot = (
1431              list(file_state.known_reads(parent_task_id)) if parent_task_id else []
1432          )
1433  
1434          # Run child with a hard timeout to prevent indefinite blocking
1435          # when the child's API call or tool-level HTTP request hangs.
1436          child_timeout = _get_child_timeout()
1437          _timeout_executor = ThreadPoolExecutor(
1438              max_workers=1,
1439              # Install a non-interactive approval callback in the worker thread
1440              # so dangerous-command prompts from the subagent don't fall back to
1441              # input() and deadlock the parent's prompt_toolkit TUI.
1442              # Callback (deny vs approve) is governed by delegation.subagent_auto_approve.
1443              initializer=_set_subagent_approval_cb,
1444              initargs=(_get_subagent_approval_callback(),),
1445          )
1446          # Capture the worker thread so the timeout diagnostic can dump its
1447          # Python stack (see #14726 — 0-API-call hangs are opaque without it).
1448          _worker_thread_holder: Dict[str, Optional[threading.Thread]] = {"t": None}
1449  
1450          def _run_with_thread_capture():
1451              _worker_thread_holder["t"] = threading.current_thread()
1452              return child.run_conversation(
1453                  user_message=goal,
1454                  task_id=child_task_id,
1455              )
1456  
1457          _child_future = _timeout_executor.submit(_run_with_thread_capture)
1458          try:
1459              result = _child_future.result(timeout=child_timeout)
1460          except Exception as _timeout_exc:
1461              # Signal the child to stop so its thread can exit cleanly.
1462              try:
1463                  if hasattr(child, "interrupt"):
1464                      child.interrupt()
1465                  elif hasattr(child, "_interrupt_requested"):
1466                      child._interrupt_requested = True
1467              except Exception:
1468                  pass
1469  
1470              is_timeout = isinstance(_timeout_exc, (FuturesTimeoutError, TimeoutError))
1471              duration = round(time.monotonic() - child_start, 2)
1472              logger.warning(
1473                  "Subagent %d %s after %.1fs",
1474                  task_index,
1475                  "timed out" if is_timeout else f"raised {type(_timeout_exc).__name__}",
1476                  duration,
1477              )
1478  
1479              # When a subagent times out BEFORE making any API call, dump a
1480              # diagnostic to help users (and us) see what the child was doing.
1481              # See #14726 — without this, 0-API-call hangs are black boxes.
1482              diagnostic_path: Optional[str] = None
1483              child_api_calls = 0
1484              try:
1485                  _summary = child.get_activity_summary()
1486                  child_api_calls = int(_summary.get("api_call_count", 0) or 0)
1487              except Exception:
1488                  pass
1489              if is_timeout and child_api_calls == 0:
1490                  diagnostic_path = _dump_subagent_timeout_diagnostic(
1491                      child=child,
1492                      task_index=task_index,
1493                      timeout_seconds=float(child_timeout),
1494                      duration_seconds=float(duration),
1495                      worker_thread=_worker_thread_holder.get("t"),
1496                      goal=goal,
1497                  )
1498                  if diagnostic_path:
1499                      logger.warning(
1500                          "Subagent %d 0-API-call timeout — diagnostic written to %s",
1501                          task_index,
1502                          diagnostic_path,
1503                      )
1504  
1505              if child_progress_cb:
1506                  try:
1507                      child_progress_cb(
1508                          "subagent.complete",
1509                          preview=(
1510                              f"Timed out after {duration}s"
1511                              if is_timeout
1512                              else str(_timeout_exc)
1513                          ),
1514                          status="timeout" if is_timeout else "error",
1515                          duration_seconds=duration,
1516                          summary="",
1517                      )
1518                  except Exception:
1519                      pass
1520  
1521              if is_timeout:
1522                  if child_api_calls == 0:
1523                      _err = (
1524                          f"Subagent timed out after {child_timeout}s without "
1525                          f"making any API call — the child never reached its "
1526                          f"first LLM request (prompt construction, credential "
1527                          f"resolution, or transport may be stuck)."
1528                      )
1529                      if diagnostic_path:
1530                          _err += f" Diagnostic: {diagnostic_path}"
1531                  else:
1532                      _err = (
1533                          f"Subagent timed out after {child_timeout}s with "
1534                          f"{child_api_calls} API call(s) completed — likely "
1535                          f"stuck on a slow API call or unresponsive network request."
1536                      )
1537              else:
1538                  _err = str(_timeout_exc)
1539  
1540              return {
1541                  "task_index": task_index,
1542                  "status": "timeout" if is_timeout else "error",
1543                  "summary": None,
1544                  "error": _err,
1545                  "exit_reason": "timeout" if is_timeout else "error",
1546                  "api_calls": child_api_calls,
1547                  "duration_seconds": duration,
1548                  "_child_role": getattr(child, "_delegate_role", None),
1549                  "diagnostic_path": diagnostic_path,
1550              }
1551          finally:
1552              # Shut down executor without waiting — if the child thread
1553              # is stuck on blocking I/O, wait=True would hang forever.
1554              _timeout_executor.shutdown(wait=False)
1555  
1556          # Flush any remaining batched progress to gateway
1557          if child_progress_cb and hasattr(child_progress_cb, "_flush"):
1558              try:
1559                  child_progress_cb._flush()
1560              except Exception as e:
1561                  logger.debug("Progress callback flush failed: %s", e)
1562  
1563          duration = round(time.monotonic() - child_start, 2)
1564  
1565          summary = result.get("final_response") or ""
1566          completed = result.get("completed", False)
1567          interrupted = result.get("interrupted", False)
1568          api_calls = result.get("api_calls", 0)
1569  
1570          if interrupted:
1571              status = "interrupted"
1572          elif summary:
1573              # A summary means the subagent produced usable output.
1574              # exit_reason ("completed" vs "max_iterations") already
1575              # tells the parent *how* the task ended.
1576              status = "completed"
1577          else:
1578              status = "failed"
1579  
1580          # Build tool trace from conversation messages (already in memory).
1581          # Uses tool_call_id to correctly pair parallel tool calls with results.
1582          tool_trace: list[Dict[str, Any]] = []
1583          trace_by_id: Dict[str, Dict[str, Any]] = {}
1584          messages = result.get("messages") or []
1585          if isinstance(messages, list):
1586              for msg in messages:
1587                  if not isinstance(msg, dict):
1588                      continue
1589                  if msg.get("role") == "assistant":
1590                      for tc in msg.get("tool_calls") or []:
1591                          fn = tc.get("function", {})
1592                          entry_t = {
1593                              "tool": fn.get("name", "unknown"),
1594                              "args_bytes": len(fn.get("arguments", "")),
1595                          }
1596                          tool_trace.append(entry_t)
1597                          tc_id = tc.get("id")
1598                          if tc_id:
1599                              trace_by_id[tc_id] = entry_t
1600                  elif msg.get("role") == "tool":
1601                      content = msg.get("content", "")
1602                      is_error = bool(content and "error" in content[:80].lower())
1603                      result_meta = {
1604                          "result_bytes": len(content),
1605                          "status": "error" if is_error else "ok",
1606                      }
1607                      # Match by tool_call_id for parallel calls
1608                      tc_id = msg.get("tool_call_id")
1609                      target = trace_by_id.get(tc_id) if tc_id else None
1610                      if target is not None:
1611                          target.update(result_meta)
1612                      elif tool_trace:
1613                          # Fallback for messages without tool_call_id
1614                          tool_trace[-1].update(result_meta)
1615  
1616          # Determine exit reason
1617          if interrupted:
1618              exit_reason = "interrupted"
1619          elif completed:
1620              exit_reason = "completed"
1621          else:
1622              exit_reason = "max_iterations"
1623  
1624          # Extract token counts (safe for mock objects)
1625          _input_tokens = getattr(child, "session_prompt_tokens", 0)
1626          _output_tokens = getattr(child, "session_completion_tokens", 0)
1627          _model = getattr(child, "model", None)
1628  
1629          entry: Dict[str, Any] = {
1630              "task_index": task_index,
1631              "status": status,
1632              "summary": summary,
1633              "api_calls": api_calls,
1634              "duration_seconds": duration,
1635              "model": _model if isinstance(_model, str) else None,
1636              "exit_reason": exit_reason,
1637              "tokens": {
1638                  "input": (
1639                      _input_tokens if isinstance(_input_tokens, (int, float)) else 0
1640                  ),
1641                  "output": (
1642                      _output_tokens if isinstance(_output_tokens, (int, float)) else 0
1643                  ),
1644              },
1645              "tool_trace": tool_trace,
1646              # Captured before the finally block calls child.close() so the
1647              # parent thread can fire subagent_stop with the correct role.
1648              # Stripped before the dict is serialised back to the model.
1649              "_child_role": getattr(child, "_delegate_role", None),
1650              # Captured before child.close() so the parent aggregator can fold
1651              # the child's total spend into the parent's session cost.  Port of
1652              # Kilo-Org/kilocode#9448 — previously the footer only reflected the
1653              # parent's direct API calls and under-counted subagent-heavy runs.
1654              # Stripped before the dict is serialised back to the model.
1655              "_child_cost_usd": (
1656                  float(getattr(child, "session_estimated_cost_usd", 0.0) or 0.0)
1657                  if isinstance(
1658                      getattr(child, "session_estimated_cost_usd", 0.0),
1659                      (int, float),
1660                  )
1661                  else 0.0
1662              ),
1663          }
1664          if status == "failed":
1665              entry["error"] = result.get("error", "Subagent did not produce a response.")
1666  
1667          # Cross-agent file-state reminder.  If this subagent wrote any
1668          # files the parent had already read, surface it so the parent
1669          # knows to re-read before editing — the scenario that motivated
1670          # the registry.  We check writes by ANY non-parent task_id (not
1671          # just this child's), which also covers transitive writes from
1672          # nested orchestrator→worker chains.
1673          try:
1674              if parent_task_id and parent_reads_snapshot:
1675                  sibling_writes = file_state.writes_since(
1676                      parent_task_id, wall_start, parent_reads_snapshot
1677                  )
1678                  if sibling_writes:
1679                      mod_paths = sorted(
1680                          {p for paths in sibling_writes.values() for p in paths}
1681                      )
1682                      if mod_paths:
1683                          reminder = (
1684                              "\n\n[NOTE: subagent modified files the parent "
1685                              "previously read — re-read before editing: "
1686                              + ", ".join(mod_paths[:8])
1687                              + (
1688                                  f" (+{len(mod_paths) - 8} more)"
1689                                  if len(mod_paths) > 8
1690                                  else ""
1691                              )
1692                              + "]"
1693                          )
1694                          if entry.get("summary"):
1695                              entry["summary"] = entry["summary"] + reminder
1696                          else:
1697                              entry["stale_paths"] = mod_paths
1698          except Exception:
1699              logger.debug("file_state sibling-write check failed", exc_info=True)
1700  
1701          # Per-branch observability payload: tokens, cost, files touched, and
1702          # a tail of tool-call results.  Fed into the TUI's overlay detail
1703          # pane + accordion rollups (features 1, 2, 4).  All fields are
1704          # optional — missing data degrades gracefully on the client.
1705          _cost_usd = getattr(child, "session_estimated_cost_usd", None)
1706          _reasoning_tokens = getattr(child, "session_reasoning_tokens", 0)
1707          try:
1708              _files_read = list(file_state.known_reads(child_task_id))[:40]
1709          except Exception:
1710              _files_read = []
1711          try:
1712              _files_written_map = file_state.writes_since(
1713                  "", wall_start, []
1714              )  # all writes since wall_start
1715          except Exception:
1716              _files_written_map = {}
1717          _files_written = sorted(
1718              {
1719                  p
1720                  for tid, paths in _files_written_map.items()
1721                  if tid == child_task_id
1722                  for p in paths
1723              }
1724          )[:40]
1725  
1726          _output_tail = _extract_output_tail(result, max_entries=8, max_chars=600)
1727  
1728          complete_kwargs: Dict[str, Any] = {
1729              "preview": summary[:160] if summary else entry.get("error", ""),
1730              "status": status,
1731              "duration_seconds": duration,
1732              "summary": summary[:500] if summary else entry.get("error", ""),
1733              "input_tokens": (
1734                  int(_input_tokens) if isinstance(_input_tokens, (int, float)) else 0
1735              ),
1736              "output_tokens": (
1737                  int(_output_tokens) if isinstance(_output_tokens, (int, float)) else 0
1738              ),
1739              "reasoning_tokens": (
1740                  int(_reasoning_tokens)
1741                  if isinstance(_reasoning_tokens, (int, float))
1742                  else 0
1743              ),
1744              "api_calls": int(api_calls) if isinstance(api_calls, (int, float)) else 0,
1745              "files_read": _files_read,
1746              "files_written": _files_written,
1747              "output_tail": _output_tail,
1748          }
1749          if _cost_usd is not None:
1750              try:
1751                  complete_kwargs["cost_usd"] = float(_cost_usd)
1752              except (TypeError, ValueError):
1753                  pass
1754  
1755          if child_progress_cb:
1756              try:
1757                  child_progress_cb("subagent.complete", **complete_kwargs)
1758              except Exception as e:
1759                  logger.debug("Progress callback completion failed: %s", e)
1760  
1761          return entry
1762  
1763      except Exception as exc:
1764          duration = round(time.monotonic() - child_start, 2)
1765          logging.exception(f"[subagent-{task_index}] failed")
1766          if child_progress_cb:
1767              try:
1768                  child_progress_cb(
1769                      "subagent.complete",
1770                      preview=str(exc),
1771                      status="failed",
1772                      duration_seconds=duration,
1773                      summary=str(exc),
1774                  )
1775              except Exception as e:
1776                  logger.debug("Progress callback failure relay failed: %s", e)
1777          return {
1778              "task_index": task_index,
1779              "status": "error",
1780              "summary": None,
1781              "error": str(exc),
1782              "api_calls": 0,
1783              "duration_seconds": duration,
1784              "_child_role": getattr(child, "_delegate_role", None),
1785          }
1786  
1787      finally:
1788          # Stop the heartbeat thread so it doesn't keep touching parent activity
1789          # after the child has finished (or failed).
1790          _heartbeat_stop.set()
1791          _heartbeat_thread.join(timeout=5)
1792  
1793          # Drop the TUI-facing registry entry.  Safe to call even if the
1794          # child was never registered (e.g. ID missing on test doubles).
1795          if _subagent_id:
1796              _unregister_subagent(_subagent_id)
1797  
1798          if child_pool is not None and leased_cred_id is not None:
1799              try:
1800                  child_pool.release_lease(leased_cred_id)
1801              except Exception as exc:
1802                  logger.debug("Failed to release credential lease: %s", exc)
1803  
1804          # Restore the parent's tool names so the process-global is correct
1805          # for any subsequent execute_code calls or other consumers.
1806          import model_tools
1807  
1808          saved_tool_names = getattr(child, "_delegate_saved_tool_names", None)
1809          if isinstance(saved_tool_names, list):
1810              model_tools._last_resolved_tool_names = list(saved_tool_names)
1811  
1812          # Remove child from active tracking
1813  
1814          # Unregister child from interrupt propagation
1815          if hasattr(parent_agent, "_active_children"):
1816              try:
1817                  lock = getattr(parent_agent, "_active_children_lock", None)
1818                  if lock:
1819                      with lock:
1820                          parent_agent._active_children.remove(child)
1821                  else:
1822                      parent_agent._active_children.remove(child)
1823              except (ValueError, UnboundLocalError) as e:
1824                  logger.debug("Could not remove child from active_children: %s", e)
1825  
1826          # Close tool resources (terminal sandboxes, browser daemons,
1827          # background processes, httpx clients) so subagent subprocesses
1828          # don't outlive the delegation.
1829          try:
1830              if hasattr(child, "close"):
1831                  child.close()
1832          except Exception:
1833              logger.debug("Failed to close child agent after delegation")
1834  
1835  
1836  def delegate_task(
1837      goal: Optional[str] = None,
1838      context: Optional[str] = None,
1839      toolsets: Optional[List[str]] = None,
1840      tasks: Optional[List[Dict[str, Any]]] = None,
1841      max_iterations: Optional[int] = None,
1842      acp_command: Optional[str] = None,
1843      acp_args: Optional[List[str]] = None,
1844      role: Optional[str] = None,
1845      parent_agent=None,
1846  ) -> str:
1847      """
1848      Spawn one or more child agents to handle delegated tasks.
1849  
1850      Supports two modes:
1851        - Single: provide goal (+ optional context, toolsets, role)
1852        - Batch:  provide tasks array [{goal, context, toolsets, role}, ...]
1853  
1854      The 'role' parameter controls whether a child can further delegate:
1855      'leaf' (default) cannot; 'orchestrator' retains the delegation
1856      toolset and can spawn its own workers, bounded by
1857      delegation.max_spawn_depth.  Per-task role beats the top-level one.
1858  
1859      Returns JSON with results array, one entry per task.
1860      """
1861      if parent_agent is None:
1862          return tool_error("delegate_task requires a parent agent context.")
1863  
1864      # Operator-controlled kill switch — lets the TUI freeze new fan-out
1865      # when a runaway tree is detected, without interrupting already-running
1866      # children.  Cleared via the matching `delegation.pause` RPC.
1867      if is_spawn_paused():
1868          return tool_error(
1869              "Delegation spawning is paused. Clear the pause via the TUI "
1870              "(`p` in /agents) or the `delegation.pause` RPC before retrying."
1871          )
1872  
1873      # Normalise the top-level role once; per-task overrides re-normalise.
1874      top_role = _normalize_role(role)
1875  
1876      # Depth limit — configurable via delegation.max_spawn_depth,
1877      # default 2 for parity with the original MAX_DEPTH constant.
1878      depth = getattr(parent_agent, "_delegate_depth", 0)
1879      max_spawn = _get_max_spawn_depth()
1880      if depth >= max_spawn:
1881          return json.dumps(
1882              {
1883                  "error": (
1884                      f"Delegation depth limit reached (depth={depth}, "
1885                      f"max_spawn_depth={max_spawn}). Raise "
1886                      f"delegation.max_spawn_depth in config.yaml if deeper "
1887                      f"nesting is required (cap: {_MAX_SPAWN_DEPTH_CAP})."
1888                  )
1889              }
1890          )
1891  
1892      # Load config
1893      cfg = _load_config()
1894      default_max_iter = cfg.get("max_iterations", DEFAULT_MAX_ITERATIONS)
1895      # Model-supplied max_iterations is ignored — the config value is authoritative
1896      # so users get predictable budgets. The kwarg is retained for internal callers
1897      # and tests; a model-emitted value here would only shrink the budget and
1898      # surprise the user mid-run. Log and drop it if one slips through from a
1899      # cached tool schema or a stale provider.
1900      if max_iterations is not None and max_iterations != default_max_iter:
1901          logger.debug(
1902              "delegate_task: ignoring caller-supplied max_iterations=%s; "
1903              "using delegation.max_iterations=%s from config",
1904              max_iterations, default_max_iter,
1905          )
1906      effective_max_iter = default_max_iter
1907  
1908      # Resolve delegation credentials (provider:model pair).
1909      # When delegation.provider is configured, this resolves the full credential
1910      # bundle (base_url, api_key, api_mode) via the same runtime provider system
1911      # used by CLI/gateway startup.  When unconfigured, returns None values so
1912      # children inherit from the parent.
1913      try:
1914          creds = _resolve_delegation_credentials(cfg, parent_agent)
1915      except ValueError as exc:
1916          return tool_error(str(exc))
1917  
1918      # Normalize to task list
1919      max_children = _get_max_concurrent_children()
1920      if tasks and isinstance(tasks, list):
1921          if len(tasks) > max_children:
1922              return tool_error(
1923                  f"Too many tasks: {len(tasks)} provided, but "
1924                  f"max_concurrent_children is {max_children}. "
1925                  f"Either reduce the task count, split into multiple "
1926                  f"delegate_task calls, or increase "
1927                  f"delegation.max_concurrent_children in config.yaml."
1928              )
1929          task_list = tasks
1930      elif goal and isinstance(goal, str) and goal.strip():
1931          task_list = [
1932              {"goal": goal, "context": context, "toolsets": toolsets, "role": top_role}
1933          ]
1934      else:
1935          return tool_error("Provide either 'goal' (single task) or 'tasks' (batch).")
1936  
1937      if not task_list:
1938          return tool_error("No tasks provided.")
1939  
1940      # Validate each task has a goal
1941      for i, task in enumerate(task_list):
1942          if not task.get("goal", "").strip():
1943              return tool_error(f"Task {i} is missing a 'goal'.")
1944  
1945      overall_start = time.monotonic()
1946      results = []
1947  
1948      n_tasks = len(task_list)
1949      # Track goal labels for progress display (truncated for readability)
1950      task_labels = [t["goal"][:40] for t in task_list]
1951  
1952      # Save parent tool names BEFORE any child construction mutates the global.
1953      # _build_child_agent() calls AIAgent() which calls get_tool_definitions(),
1954      # which overwrites model_tools._last_resolved_tool_names with child's toolset.
1955      import model_tools as _model_tools
1956  
1957      _parent_tool_names = list(_model_tools._last_resolved_tool_names)
1958  
1959      # Build all child agents on the main thread (thread-safe construction)
1960      # Wrapped in try/finally so the global is always restored even if a
1961      # child build raises (otherwise _last_resolved_tool_names stays corrupted).
1962      children = []
1963      try:
1964          for i, t in enumerate(task_list):
1965              task_acp_args = t.get("acp_args") if "acp_args" in t else None
1966              # Per-task role beats top-level; normalise again so unknown
1967              # per-task values warn and degrade to leaf uniformly.
1968              effective_role = _normalize_role(t.get("role") or top_role)
1969              child = _build_child_agent(
1970                  task_index=i,
1971                  goal=t["goal"],
1972                  context=t.get("context"),
1973                  toolsets=t.get("toolsets") or toolsets,
1974                  model=creds["model"],
1975                  max_iterations=effective_max_iter,
1976                  task_count=n_tasks,
1977                  parent_agent=parent_agent,
1978                  override_provider=creds["provider"],
1979                  override_base_url=creds["base_url"],
1980                  override_api_key=creds["api_key"],
1981                  override_api_mode=creds["api_mode"],
1982                  override_acp_command=t.get("acp_command")
1983                  or acp_command
1984                  or creds.get("command"),
1985                  override_acp_args=(
1986                      task_acp_args
1987                      if task_acp_args is not None
1988                      else (acp_args if acp_args is not None else creds.get("args"))
1989                  ),
1990                  role=effective_role,
1991              )
1992              # Override with correct parent tool names (before child construction mutated global)
1993              child._delegate_saved_tool_names = _parent_tool_names
1994              children.append((i, t, child))
1995      finally:
1996          # Authoritative restore: reset global to parent's tool names after all children built
1997          _model_tools._last_resolved_tool_names = _parent_tool_names
1998  
1999      if n_tasks == 1:
2000          # Single task -- run directly (no thread pool overhead)
2001          _i, _t, child = children[0]
2002          result = _run_single_child(0, _t["goal"], child, parent_agent)
2003          results.append(result)
2004      else:
2005          # Batch -- run in parallel with per-task progress lines
2006          completed_count = 0
2007          spinner_ref = getattr(parent_agent, "_delegate_spinner", None)
2008  
2009          with ThreadPoolExecutor(max_workers=max_children) as executor:
2010              futures = {}
2011              for i, t, child in children:
2012                  future = executor.submit(
2013                      _run_single_child,
2014                      task_index=i,
2015                      goal=t["goal"],
2016                      child=child,
2017                      parent_agent=parent_agent,
2018                  )
2019                  futures[future] = i
2020  
2021              # Poll futures with interrupt checking.  as_completed() blocks
2022              # until ALL futures finish — if a child agent gets stuck,
2023              # the parent blocks forever even after interrupt propagation.
2024              # Instead, use wait() with a short timeout so we can bail
2025              # when the parent is interrupted.
2026              # Map task_index -> child agent, so fabricated entries for
2027              # still-pending futures can carry the correct _delegate_role.
2028              _child_by_index = {i: child for (i, _, child) in children}
2029  
2030              pending = set(futures.keys())
2031              while pending:
2032                  if getattr(parent_agent, "_interrupt_requested", False) is True:
2033                      # Parent interrupted — collect whatever finished and
2034                      # abandon the rest.  Children already received the
2035                      # interrupt signal; we just can't wait forever.
2036                      for f in pending:
2037                          idx = futures[f]
2038                          if f.done():
2039                              try:
2040                                  entry = f.result()
2041                              except Exception as exc:
2042                                  entry = {
2043                                      "task_index": idx,
2044                                      "status": "error",
2045                                      "summary": None,
2046                                      "error": str(exc),
2047                                      "api_calls": 0,
2048                                      "duration_seconds": 0,
2049                                      "_child_role": getattr(
2050                                          _child_by_index.get(idx), "_delegate_role", None
2051                                      ),
2052                                  }
2053                          else:
2054                              entry = {
2055                                  "task_index": idx,
2056                                  "status": "interrupted",
2057                                  "summary": None,
2058                                  "error": "Parent agent interrupted — child did not finish in time",
2059                                  "api_calls": 0,
2060                                  "duration_seconds": 0,
2061                                  "_child_role": getattr(
2062                                      _child_by_index.get(idx), "_delegate_role", None
2063                                  ),
2064                              }
2065                          results.append(entry)
2066                          completed_count += 1
2067                      break
2068  
2069                  from concurrent.futures import wait as _cf_wait, FIRST_COMPLETED
2070  
2071                  done, pending = _cf_wait(
2072                      pending, timeout=0.5, return_when=FIRST_COMPLETED
2073                  )
2074                  for future in done:
2075                      try:
2076                          entry = future.result()
2077                      except Exception as exc:
2078                          idx = futures[future]
2079                          entry = {
2080                              "task_index": idx,
2081                              "status": "error",
2082                              "summary": None,
2083                              "error": str(exc),
2084                              "api_calls": 0,
2085                              "duration_seconds": 0,
2086                              "_child_role": getattr(
2087                                  _child_by_index.get(idx), "_delegate_role", None
2088                              ),
2089                          }
2090                      results.append(entry)
2091                      completed_count += 1
2092  
2093                      # Print per-task completion line above the spinner
2094                      idx = entry["task_index"]
2095                      label = (
2096                          task_labels[idx] if idx < len(task_labels) else f"Task {idx}"
2097                      )
2098                      dur = entry.get("duration_seconds", 0)
2099                      status = entry.get("status", "?")
2100                      icon = "✓" if status == "completed" else "✗"
2101                      remaining = n_tasks - completed_count
2102                      completion_line = f"{icon} [{idx+1}/{n_tasks}] {label}  ({dur}s)"
2103                      if spinner_ref:
2104                          try:
2105                              spinner_ref.print_above(completion_line)
2106                          except Exception:
2107                              print(f"  {completion_line}")
2108                      else:
2109                          print(f"  {completion_line}")
2110  
2111                      # Update spinner text to show remaining count
2112                      if spinner_ref and remaining > 0:
2113                          try:
2114                              spinner_ref.update_text(
2115                                  f"🔀 {remaining} task{'s' if remaining != 1 else ''} remaining"
2116                              )
2117                          except Exception as e:
2118                              logger.debug("Spinner update_text failed: %s", e)
2119  
2120          # Sort by task_index so results match input order
2121          results.sort(key=lambda r: r["task_index"])
2122  
2123      # Notify parent's memory provider of delegation outcomes
2124      if (
2125          parent_agent
2126          and hasattr(parent_agent, "_memory_manager")
2127          and parent_agent._memory_manager
2128      ):
2129          for entry in results:
2130              try:
2131                  _task_goal = (
2132                      task_list[entry["task_index"]]["goal"]
2133                      if entry["task_index"] < len(task_list)
2134                      else ""
2135                  )
2136                  parent_agent._memory_manager.on_delegation(
2137                      task=_task_goal,
2138                      result=entry.get("summary", "") or "",
2139                      child_session_id=(
2140                          getattr(children[entry["task_index"]][2], "session_id", "")
2141                          if entry["task_index"] < len(children)
2142                          else ""
2143                      ),
2144                  )
2145              except Exception:
2146                  pass
2147  
2148      # Fire subagent_stop hooks once per child, serialised on the parent thread.
2149      # This keeps Python-plugin and shell-hook callbacks off of the worker threads
2150      # that ran the children, so hook authors don't need to reason about
2151      # concurrent invocation.  Role was captured into the entry dict in
2152      # _run_single_child (or the fabricated-entry branches above) before the
2153      # child was closed.
2154      _parent_session_id = getattr(parent_agent, "session_id", None)
2155      try:
2156          from hermes_cli.plugins import invoke_hook as _invoke_hook
2157      except Exception:
2158          _invoke_hook = None
2159      # Aggregate child spend here so the parent's footer/UI reflect the true
2160      # cost of a subagent-heavy turn.  Port of Kilo-Org/kilocode#9448.  Each
2161      # child's cost was captured in _run_single_child before its AIAgent was
2162      # closed; we fold them into the parent in one pass alongside the
2163      # subagent_stop hook loop so we don't walk `results` twice.
2164      _children_cost_total = 0.0
2165      for entry in results:
2166          child_role = entry.pop("_child_role", None)
2167          child_cost = entry.pop("_child_cost_usd", 0.0)
2168          try:
2169              if child_cost:
2170                  _children_cost_total += float(child_cost)
2171          except (TypeError, ValueError):
2172              pass
2173          if _invoke_hook is None:
2174              continue
2175          try:
2176              _invoke_hook(
2177                  "subagent_stop",
2178                  parent_session_id=_parent_session_id,
2179                  child_role=child_role,
2180                  child_summary=entry.get("summary"),
2181                  child_status=entry.get("status"),
2182                  duration_ms=int((entry.get("duration_seconds") or 0) * 1000),
2183              )
2184          except Exception:
2185              logger.debug("subagent_stop hook invocation failed", exc_info=True)
2186  
2187      # Fold the aggregated child cost into the parent's session total.  This is
2188      # additive — each delegate_task call contributes its own children — so
2189      # nested orchestrator→worker trees roll up naturally: each layer's own
2190      # delegate_task() folds its direct children in, and when the orchestrator
2191      # itself finishes, its parent folds the orchestrator's now-inflated total
2192      # on top.  Degrades silently if the parent lacks the counter (older test
2193      # fixtures, etc.).
2194      if _children_cost_total > 0.0:
2195          try:
2196              current = float(getattr(parent_agent, "session_estimated_cost_usd", 0.0) or 0.0)
2197              parent_agent.session_estimated_cost_usd = current + _children_cost_total
2198              # Upgrade the cost_source so the UI doesn't label a partially-real
2199              # total as "none" when the parent itself hadn't billed any calls
2200              # yet (rare but possible when the parent's only action this turn
2201              # was delegate_task).
2202              if getattr(parent_agent, "session_cost_source", "none") in (None, "", "none"):
2203                  parent_agent.session_cost_source = "subagent"
2204              if getattr(parent_agent, "session_cost_status", "unknown") in (None, "", "unknown"):
2205                  parent_agent.session_cost_status = "estimated"
2206          except Exception:
2207              logger.debug("Subagent cost rollup failed", exc_info=True)
2208  
2209      total_duration = round(time.monotonic() - overall_start, 2)
2210  
2211      return json.dumps(
2212          {
2213              "results": results,
2214              "total_duration_seconds": total_duration,
2215          },
2216          ensure_ascii=False,
2217      )
2218  
2219  
2220  def _resolve_child_credential_pool(effective_provider: Optional[str], parent_agent):
2221      """Resolve a credential pool for the child agent.
2222  
2223      Rules:
2224      1. Same provider as the parent -> share the parent's pool so cooldown state
2225         and rotation stay synchronized.
2226      2. Different provider -> try to load that provider's own pool.
2227      3. No pool available -> return None and let the child keep the inherited
2228         fixed credential behavior.
2229      """
2230      if not effective_provider:
2231          return getattr(parent_agent, "_credential_pool", None)
2232  
2233      parent_provider = getattr(parent_agent, "provider", None) or ""
2234      parent_pool = getattr(parent_agent, "_credential_pool", None)
2235      if parent_pool is not None and effective_provider == parent_provider:
2236          return parent_pool
2237  
2238      try:
2239          from agent.credential_pool import load_pool
2240  
2241          pool = load_pool(effective_provider)
2242          if pool is not None and pool.has_credentials():
2243              return pool
2244      except Exception as exc:
2245          logger.debug(
2246              "Could not load credential pool for child provider '%s': %s",
2247              effective_provider,
2248              exc,
2249          )
2250      return None
2251  
2252  
2253  def _resolve_delegation_credentials(cfg: dict, parent_agent) -> dict:
2254      """Resolve credentials for subagent delegation.
2255  
2256      If ``delegation.base_url`` is configured, subagents use that direct
2257      OpenAI-compatible endpoint. ``delegation.api_key`` overrides the key; when
2258      omitted, ``api_key`` is returned as ``None`` so ``_build_child_agent``
2259      inherits the parent agent's key (``effective_api_key = override_api_key or
2260      parent_api_key``). This lets providers that store their key outside
2261      ``OPENAI_API_KEY`` (e.g. ``MINIMAX_API_KEY``, ``DASHSCOPE_API_KEY``) work
2262      without a duplicate config entry.
2263  
2264      Otherwise, if ``delegation.provider`` is configured, the full credential
2265      bundle (base_url, api_key, api_mode, provider) is resolved via the runtime
2266      provider system — the same path used by CLI/gateway startup. This lets
2267      subagents run on a completely different provider:model pair.
2268  
2269      If neither base_url nor provider is configured, returns None values so the
2270      child inherits everything from the parent agent.
2271  
2272      Raises ValueError with a user-friendly message on credential failure.
2273      """
2274      configured_model = str(cfg.get("model") or "").strip() or None
2275      configured_provider = str(cfg.get("provider") or "").strip() or None
2276      configured_base_url = str(cfg.get("base_url") or "").strip() or None
2277      configured_api_key = str(cfg.get("api_key") or "").strip() or None
2278  
2279      if configured_base_url:
2280          # When delegation.api_key is not set, return None so _build_child_agent
2281          # falls back to the parent agent's API key via the credential inheritance
2282          # path (effective_api_key = override_api_key or parent_api_key). This
2283          # lets providers that store their key in a non-OPENAI_API_KEY env var
2284          # (e.g. MINIMAX_API_KEY, DASHSCOPE_API_KEY) work without requiring
2285          # callers to duplicate the key under delegation.api_key.
2286          api_key = configured_api_key  # None → inherited from parent in _build_child_agent
2287  
2288          base_lower = configured_base_url.lower()
2289          provider = "custom"
2290          api_mode = "chat_completions"
2291          if (
2292              base_url_hostname(configured_base_url) == "chatgpt.com"
2293              and "/backend-api/codex" in base_lower
2294          ):
2295              provider = "openai-codex"
2296              api_mode = "codex_responses"
2297          elif base_url_hostname(configured_base_url) == "api.anthropic.com":
2298              provider = "anthropic"
2299              api_mode = "anthropic_messages"
2300          elif "api.kimi.com/coding" in base_lower:
2301              provider = "custom"
2302              api_mode = "anthropic_messages"
2303  
2304          return {
2305              "model": configured_model,
2306              "provider": provider,
2307              "base_url": configured_base_url,
2308              "api_key": api_key,
2309              "api_mode": api_mode,
2310          }
2311  
2312      if not configured_provider:
2313          # No provider override — child inherits everything from parent
2314          return {
2315              "model": configured_model,
2316              "provider": None,
2317              "base_url": None,
2318              "api_key": None,
2319              "api_mode": None,
2320          }
2321  
2322      # Provider is configured — resolve full credentials
2323      try:
2324          from hermes_cli.runtime_provider import resolve_runtime_provider
2325  
2326          runtime = resolve_runtime_provider(requested=configured_provider, target_model=configured_model)
2327      except Exception as exc:
2328          raise ValueError(
2329              f"Cannot resolve delegation provider '{configured_provider}': {exc}. "
2330              f"Check that the provider is configured (API key set, valid provider name), "
2331              f"or set delegation.base_url/delegation.api_key for a direct endpoint. "
2332              f"Available providers: openrouter, nous, zai, kimi-coding, minimax."
2333          ) from exc
2334  
2335      api_key = runtime.get("api_key", "")
2336      if not api_key:
2337          raise ValueError(
2338              f"Delegation provider '{configured_provider}' resolved but has no API key. "
2339              f"Set the appropriate environment variable or run 'hermes auth'."
2340          )
2341  
2342      return {
2343          "model": configured_model or runtime.get("model") or None,
2344          "provider": runtime.get("provider"),
2345          "base_url": runtime.get("base_url"),
2346          "api_key": api_key,
2347          "api_mode": runtime.get("api_mode"),
2348          "command": runtime.get("command"),
2349          "args": list(runtime.get("args") or []),
2350      }
2351  
2352  
2353  def _load_config() -> dict:
2354      """Load delegation config from CLI_CONFIG or persistent config.
2355  
2356      Checks the runtime config (cli.py CLI_CONFIG) first, then falls back
2357      to the persistent config (hermes_cli/config.py load_config()) so that
2358      ``delegation.model`` / ``delegation.provider`` are picked up regardless
2359      of the entry point (CLI, gateway, cron).
2360      """
2361      try:
2362          from cli import CLI_CONFIG
2363  
2364          cfg = CLI_CONFIG.get("delegation") or {}
2365          if cfg:
2366              return cfg
2367      except Exception:
2368          pass
2369      try:
2370          from hermes_cli.config import load_config
2371  
2372          full = load_config()
2373          return full.get("delegation") or {}
2374      except Exception:
2375          return {}
2376  
2377  
2378  # ---------------------------------------------------------------------------
2379  # OpenAI Function-Calling Schema
2380  # ---------------------------------------------------------------------------
2381  
2382  DELEGATE_TASK_SCHEMA = {
2383      "name": "delegate_task",
2384      "description": (
2385          "Spawn one or more subagents to work on tasks in isolated contexts. "
2386          "Each subagent gets its own conversation, terminal session, and toolset. "
2387          "Only the final summary is returned -- intermediate tool results "
2388          "never enter your context window.\n\n"
2389          "TWO MODES (one of 'goal' or 'tasks' is required):\n"
2390          "1. Single task: provide 'goal' (+ optional context, toolsets)\n"
2391          "2. Batch (parallel): provide 'tasks' array with up to delegation.max_concurrent_children items (default 3, configurable via config.yaml, no hard ceiling). "
2392          "All run concurrently and results are returned together. Nested delegation requires role='orchestrator' and delegation.max_spawn_depth >= 2.\n\n"
2393          "WHEN TO USE delegate_task:\n"
2394          "- Reasoning-heavy subtasks (debugging, code review, research synthesis)\n"
2395          "- Tasks that would flood your context with intermediate data\n"
2396          "- Parallel independent workstreams (research A and B simultaneously)\n\n"
2397          "WHEN NOT TO USE (use these instead):\n"
2398          "- Mechanical multi-step work with no reasoning needed -> use execute_code\n"
2399          "- Single tool call -> just call the tool directly\n"
2400          "- Tasks needing user interaction -> subagents cannot use clarify\n"
2401          "- Durable long-running work that must outlive the current turn -> "
2402          "use cronjob (action='create') or terminal(background=True, "
2403          "notify_on_complete=True) instead. delegate_task runs SYNCHRONOUSLY "
2404          "inside the parent turn: if the parent is interrupted (user sends a "
2405          "new message, /stop, /new) the child is cancelled with status="
2406          "'interrupted' and its work is discarded. Children cannot continue "
2407          "in the background.\n\n"
2408          "IMPORTANT:\n"
2409          "- Subagents have NO memory of your conversation. Pass all relevant "
2410          "info (file paths, error messages, constraints) via the 'context' field.\n"
2411          "- If the user is writing in a non-English language, or asked for "
2412          "output in a specific language / tone / style, say so in 'context' "
2413          "(e.g. \"respond in Chinese\", \"return output in Japanese\"). "
2414          "Otherwise subagents default to English and their summaries will "
2415          "contaminate your final reply with the wrong language.\n"
2416          "- Subagent summaries are SELF-REPORTS, not verified facts. A subagent "
2417          "that claims \"uploaded successfully\" or \"file written\" may be wrong. "
2418          "For operations with external side-effects (HTTP POST/PUT, remote "
2419          "writes, file creation at shared paths, publishing), require the "
2420          "subagent to return a verifiable handle (URL, ID, absolute path, HTTP "
2421          "status) and verify it yourself — fetch the URL, stat the file, read "
2422          "back the content — before telling the user the operation succeeded.\n"
2423          "- Leaf subagents (role='leaf', the default) CANNOT call: "
2424          "delegate_task, clarify, memory, send_message, execute_code.\n"
2425          "- Orchestrator subagents (role='orchestrator') retain "
2426          "delegate_task so they can spawn their own workers, but still "
2427          "cannot use clarify, memory, send_message, or execute_code. "
2428          "Orchestrators are bounded by delegation.max_spawn_depth "
2429          "(default 2) and can be disabled globally via "
2430          "delegation.orchestrator_enabled=false.\n"
2431          "- Each subagent gets its own terminal session (separate working directory and state).\n"
2432          "- Results are always returned as an array, one entry per task."
2433      ),
2434      "parameters": {
2435          "type": "object",
2436          "properties": {
2437              "goal": {
2438                  "type": "string",
2439                  "description": (
2440                      "What the subagent should accomplish. Be specific and "
2441                      "self-contained -- the subagent knows nothing about your "
2442                      "conversation history."
2443                  ),
2444              },
2445              "context": {
2446                  "type": "string",
2447                  "description": (
2448                      "Background information the subagent needs: file paths, "
2449                      "error messages, project structure, constraints. The more "
2450                      "specific you are, the better the subagent performs."
2451                  ),
2452              },
2453              "toolsets": {
2454                  "type": "array",
2455                  "items": {"type": "string"},
2456                  "description": (
2457                      "Toolsets to enable for this subagent. "
2458                      "Default: inherits your enabled toolsets. "
2459                      f"Available toolsets: {_TOOLSET_LIST_STR}. "
2460                      "Common patterns: ['terminal', 'file'] for code work, "
2461                      "['web'] for research, ['browser'] for web interaction, "
2462                      "['terminal', 'file', 'web'] for full-stack tasks."
2463                  ),
2464              },
2465              "tasks": {
2466                  "type": "array",
2467                  "items": {
2468                      "type": "object",
2469                      "properties": {
2470                          "goal": {"type": "string", "description": "Task goal"},
2471                          "context": {
2472                              "type": "string",
2473                              "description": "Task-specific context",
2474                          },
2475                          "toolsets": {
2476                              "type": "array",
2477                              "items": {"type": "string"},
2478                              "description": f"Toolsets for this specific task. Available: {_TOOLSET_LIST_STR}. Use 'web' for network access, 'terminal' for shell, 'browser' for web interaction.",
2479                          },
2480                          "acp_command": {
2481                              "type": "string",
2482                              "description": "Per-task ACP command override (e.g. 'claude'). Overrides the top-level acp_command for this task only.",
2483                          },
2484                          "acp_args": {
2485                              "type": "array",
2486                              "items": {"type": "string"},
2487                              "description": "Per-task ACP args override.",
2488                          },
2489                          "role": {
2490                              "type": "string",
2491                              "enum": ["leaf", "orchestrator"],
2492                              "description": "Per-task role override. See top-level 'role' for semantics.",
2493                          },
2494                      },
2495                      "required": ["goal"],
2496                  },
2497                  # No maxItems — the runtime limit is configurable via
2498                  # delegation.max_concurrent_children (default 3) and
2499                  # enforced with a clear error in delegate_task().
2500                  "description": (
2501                      "Batch mode: tasks to run in parallel (limit configurable via delegation.max_concurrent_children, default 3). Each gets "
2502                      "its own subagent with isolated context and terminal session. "
2503                      "When provided, top-level goal/context/toolsets are ignored."
2504                  ),
2505              },
2506              "role": {
2507                  "type": "string",
2508                  "enum": ["leaf", "orchestrator"],
2509                  "description": (
2510                      "Role of the child agent. 'leaf' (default) = focused "
2511                      "worker, cannot delegate further. 'orchestrator' = can "
2512                      "use delegate_task to spawn its own workers. Requires "
2513                      "delegation.max_spawn_depth >= 2 in config; ignored "
2514                      "(treated as 'leaf') when the child would exceed "
2515                      "max_spawn_depth or when "
2516                      "delegation.orchestrator_enabled=false."
2517                  ),
2518              },
2519              "acp_command": {
2520                  "type": "string",
2521                  "description": (
2522                      "Override ACP command for child agents (e.g. 'claude', 'copilot'). "
2523                      "When set, children use ACP subprocess transport instead of inheriting "
2524                      "the parent's transport. Enables spawning Claude Code (claude --acp --stdio) "
2525                      "or other ACP-capable agents from any parent, including Discord/Telegram/CLI."
2526                  ),
2527              },
2528              "acp_args": {
2529                  "type": "array",
2530                  "items": {"type": "string"},
2531                  "description": (
2532                      "Arguments for the ACP command (default: ['--acp', '--stdio']). "
2533                      "Only used when acp_command is set. Example: ['--acp', '--stdio', '--model', 'claude-opus-4-6']"
2534                  ),
2535              },
2536          },
2537          "required": [],
2538      },
2539  }
2540  
2541  
2542  # --- Registry ---
2543  from tools.registry import registry, tool_error
2544  
2545  registry.register(
2546      name="delegate_task",
2547      toolset="delegation",
2548      schema=DELEGATE_TASK_SCHEMA,
2549      handler=lambda args, **kw: delegate_task(
2550          goal=args.get("goal"),
2551          context=args.get("context"),
2552          toolsets=args.get("toolsets"),
2553          tasks=args.get("tasks"),
2554          max_iterations=args.get("max_iterations"),
2555          acp_command=args.get("acp_command"),
2556          acp_args=args.get("acp_args"),
2557          role=args.get("role"),
2558          parent_agent=kw.get("parent_agent"),
2559      ),
2560      check_fn=check_delegate_requirements,
2561      emoji="🔀",
2562  )