agent_core.py
1 """ 2 Core agent implementation for Ag3ntum. 3 4 This module contains the main agent execution logic using the Claude Agent SDK. 5 """ 6 import asyncio 7 import json 8 import logging 9 import os 10 from dataclasses import asdict 11 from datetime import datetime 12 from pathlib import Path 13 from typing import Any, Optional, Union 14 15 import uuid as uuid_mod 16 17 from claude_agent_sdk import ( 18 ClaudeAgentOptions, 19 ClaudeSDKClient, 20 HookMatcher, 21 ResultMessage, 22 SystemMessage, 23 ) 24 from .prompt_manager import get_prompt_manager 25 from .prompt_engine import PromptTemplateEngine, PromptContext 26 27 # Import paths from central config 28 from ..config import ( 29 AGENT_DIR, 30 PROMPTS_DIR, 31 LOGS_DIR, 32 SESSIONS_DIR, 33 SKILLS_DIR, 34 USERS_DIR, 35 load_sandboxed_envs, 36 ) 37 import shutil 38 from .exceptions import ( 39 AgentError, 40 MaxTurnsExceededError, 41 ServerError, 42 SessionIncompleteError, 43 ) 44 from .schemas import ( 45 AgentConfig, 46 AgentResult, 47 Checkpoint, 48 CheckpointType, 49 LLMMetrics, 50 SessionContext, 51 TaskStatus, 52 TokenUsage, 53 ) 54 from .sessions import SessionManager 55 from .skills import SkillManager, discover_merged_skills 56 from .skill_tools import SkillToolsManager 57 from .tracer import ExecutionTracer, TracerBase, NullTracer 58 from .trace_processor import TraceProcessor 59 from .permissions import ( 60 create_permission_callback, 61 PermissionDenialTracker, 62 ) 63 from .permission_profiles import PermissionManager 64 from .sandbox import SandboxConfig, SandboxExecutor, SandboxMount 65 from .subagent_manager import get_subagent_manager 66 from .checkpoint_tracker import CheckpointTracker 67 from .hooks import create_pre_compact_hook 68 from .structured_output import parse_structured_output 69 70 # Ensure tools directory is in sys.path for ag3ntum imports 71 import sys 72 _tools_dir = str(AGENT_DIR / "tools") 73 if _tools_dir not in sys.path: 74 sys.path.insert(0, _tools_dir) 75 76 # Import Ag3ntum MCP tools - these are REQUIRED for Ag3ntum to function 77 # If these imports fail, the application should fail fast with a clear error 78 from tools.ag3ntum import ( 79 create_ag3ntum_tools_mcp_server, 80 AG3NTUM_BASH_TOOL, 81 ) 82 83 # Import PathValidator configuration functions 84 from .path_validator import ( 85 configure_path_validator, 86 cleanup_path_validator, 87 set_session_linux_uid, 88 ) 89 90 # Import LLM proxy config for non-Anthropic model routing 91 from ..api.llm_proxy.config import load_llm_proxy_config, ProxyConfigError 92 93 logger = logging.getLogger(__name__) 94 95 96 def determine_session_status( 97 result_text: Optional[str], 98 had_tool_errors: bool, 99 tool_error_count: int = 0, 100 ) -> str: 101 """ 102 Determine the final session status using a priority chain. 103 104 Priority: 105 1. Agent's self-assessment from structured header (request_status) — primary 106 2. Fallback: default to COMPLETE (agent ran to completion without crash) 107 108 Args: 109 result_text: The agent's final message text (may contain structured header). 110 had_tool_errors: Whether any tool errors occurred during execution. 111 tool_error_count: Number of tool errors (for logging). 112 113 Returns: 114 Status string: "COMPLETE", "PARTIAL", or "FAILED". 
115 """ 116 if result_text: 117 header_fields, _ = parse_structured_output(result_text) 118 agent_status = header_fields.get("request_status", "").upper() 119 if agent_status in ("COMPLETE", "PARTIAL", "FAILED"): 120 if had_tool_errors: 121 logger.info( 122 f"Agent self-assessed as {agent_status} despite " 123 f"{tool_error_count} tool error(s)" 124 ) 125 return agent_status 126 127 # No structured header status — fall back to heuristic. 128 # An agent that ran to completion without crash/circuit-breaker/permission-denial 129 # likely completed its task. Default to COMPLETE. 130 if had_tool_errors: 131 logger.warning( 132 f"No agent status header found; " 133 f"{tool_error_count} tool error(s) during execution" 134 ) 135 return "COMPLETE" 136 137 138 def _get_proxy_base_url_for_model( 139 model: str, 140 api_port: int = 40080, 141 session_id: str | None = None, 142 ) -> Optional[str]: 143 """ 144 Check if a model requires routing through the LLM proxy. 145 146 Models defined in config/llm-api-proxy.yaml are routed through the proxy 147 endpoint, which handles format translation for non-Anthropic providers. 148 149 Args: 150 model: The model name (e.g., 'openrouter:openai/gpt-5.2'). 151 api_port: The API server port (default 40080). 152 session_id: Optional session ID for debug file organization. 153 154 Returns: 155 The proxy base URL if the model needs proxy routing, None otherwise. 156 """ 157 try: 158 config = load_llm_proxy_config() 159 except ProxyConfigError as e: 160 logger.debug(f"LLM proxy config not available: {e}") 161 return None 162 163 # Check if model is defined in proxy config 164 if model not in config.models: 165 logger.debug(f"Model '{model}' not in proxy config, using direct Anthropic API") 166 return None 167 168 # Model is defined in proxy config - route through proxy 169 mapping = config.models[model] 170 provider = config.providers.get(mapping.provider) 171 172 if provider is None: 173 logger.warning(f"Model '{model}' references undefined provider '{mapping.provider}'") 174 return None 175 176 # All proxy-defined models go through the proxy, regardless of provider type 177 # The proxy handles routing to the appropriate endpoint 178 # NOTE: SDK appends "/v1/messages" to base_url, so we use /api/llm-proxy (not /api/llm-proxy/v1) 179 # When session_id is provided, embed it in the URL for session-scoped debug output 180 if session_id: 181 proxy_url = f"http://127.0.0.1:{api_port}/api/llm-proxy/s/{session_id}" 182 else: 183 proxy_url = f"http://127.0.0.1:{api_port}/api/llm-proxy" 184 logger.info( 185 f"LLM_PROXY: Model '{model}' → provider '{mapping.provider}' " 186 f"(type={provider.type}) → {proxy_url}" 187 ) 188 return proxy_url 189 190 191 # User prompt template engine (lightweight, for user.md only) 192 _user_prompt_engine = PromptTemplateEngine() 193 194 195 class ClaudeAgent: 196 """ 197 Ag3ntum - Self-Improving Agent. 198 199 Executes tasks using the Claude Agent SDK with configurable 200 tools, prompts, and execution limits. 201 """ 202 203 def __init__( 204 self, 205 config: Optional[AgentConfig] = None, 206 sessions_dir: Optional[Path] = None, 207 logs_dir: Optional[Path] = None, 208 skills_dir: Optional[Path] = None, 209 tracer: Optional[Union[TracerBase, bool]] = True, 210 permission_manager: Optional[PermissionManager] = None, 211 linux_uid: Optional[int] = None, 212 linux_gid: Optional[int] = None, 213 ) -> None: 214 """ 215 Initialize the Claude Agent. 216 217 Args: 218 config: Agent configuration. Uses defaults if not provided. 
219 sessions_dir: Directory for sessions. Defaults to AGENT/sessions. 220 logs_dir: Directory for logs. Defaults to AGENT/logs. 221 skills_dir: Directory for skills. Defaults to AGENT/skills. 222 tracer: Execution tracer for console output. 223 - True (default): Use ExecutionTracer with default settings. 224 - False/None: Disable tracing (NullTracer). 225 - TracerBase instance: Use custom tracer. 226 permission_manager: PermissionManager for permission checking. 227 Required - agent will fail without permission profile. 228 linux_uid: Linux UID for privilege dropping during command execution. 229 When set, sandboxed commands will run as this UID instead of the API user. 230 linux_gid: Linux GID for privilege dropping during command execution. 231 When set, sandboxed commands will run with this GID. 232 """ 233 self._config = config or AgentConfig() 234 self._sessions_dir = sessions_dir or SESSIONS_DIR 235 self._logs_dir = logs_dir or LOGS_DIR 236 self._permission_manager = permission_manager 237 self._linux_uid = linux_uid 238 self._linux_gid = linux_gid 239 240 # SECURITY: Validate that permission_mode is None or empty 241 # Setting permission_mode to any value causes SDK to use --permission-prompt-tool stdio 242 # which bypasses can_use_tool callback and all permission checks 243 if self._config.permission_mode not in (None, "", "null"): 244 logger.warning( 245 f"SECURITY WARNING: permission_mode='{self._config.permission_mode}' is set. " 246 f"This will bypass can_use_tool callback and disable permission checks! " 247 f"Set permission_mode to null in config/agent.yaml to enable security." 248 ) 249 raise AgentError( 250 "permission_mode must be null (not 'default', 'acceptEdits', etc). " 251 "Set it to null in agent.yaml to enable proper permission checking via can_use_tool callback." 252 ) 253 254 # Determine skills directory from parameter, config, or default 255 if skills_dir: 256 self._skills_dir = skills_dir 257 elif self._config.skills_dir: 258 self._skills_dir = Path(self._config.skills_dir) 259 else: 260 self._skills_dir = SKILLS_DIR 261 262 self._session_manager = SessionManager(self._sessions_dir) 263 self._skill_manager = SkillManager(self._skills_dir) 264 self._logs_dir.mkdir(parents=True, exist_ok=True) 265 266 # Setup tracer 267 if tracer is True: 268 self._tracer: TracerBase = ExecutionTracer(verbose=True) 269 elif tracer is False or tracer is None: 270 self._tracer = NullTracer() 271 else: 272 self._tracer = tracer 273 274 # Track permission denials for interruption handling 275 self._denial_tracker = PermissionDenialTracker() 276 self._sandbox_system_message: Optional[str] = None 277 278 # Wire tracer to permission manager for profile notifications 279 if self._permission_manager is not None: 280 self._permission_manager.set_tracer(self._tracer) 281 282 @property 283 def config(self) -> AgentConfig: 284 """Get the agent configuration.""" 285 return self._config 286 287 @property 288 def skill_manager(self) -> SkillManager: 289 """Get the skill manager.""" 290 return self._skill_manager 291 292 @property 293 def tracer(self) -> TracerBase: 294 """Get the execution tracer.""" 295 return self._tracer 296 297 def _load_external_mounts_config(self, username: Optional[str] = None) -> dict: 298 """ 299 Load external mounts configuration for template rendering. 
300 301 Returns a dict suitable for the mounts template with structure: 302 { 303 "ro": [{"name": "downloads", "description": "..."}], 304 "rw": [{"name": "projects", "description": "..."}], 305 "persistent": True/False 306 } 307 308 Args: 309 username: Optional username for persistent storage check. 310 311 Returns: 312 External mounts configuration dict. 313 """ 314 import yaml 315 316 mounts_config = { 317 "ro": [], 318 "rw": [], 319 "persistent": False, 320 } 321 322 # Load mounts manifest if it exists (auto-generated by run.sh) 323 mounts_file = Path("/auto-generated/auto-generated-mounts.yaml") 324 if mounts_file.exists(): 325 try: 326 with open(mounts_file, "r", encoding="utf-8") as f: 327 manifest = yaml.safe_load(f) or {} 328 329 mounts_data = manifest.get("mounts", {}) 330 331 # Read-only mounts 332 if isinstance(mounts_data.get("ro"), list): 333 for mount in mounts_data["ro"]: 334 if isinstance(mount, dict) and mount.get("name"): 335 mounts_config["ro"].append({ 336 "name": mount["name"], 337 "description": mount.get("description", ""), 338 }) 339 340 # Read-write mounts 341 if isinstance(mounts_data.get("rw"), list): 342 for mount in mounts_data["rw"]: 343 if isinstance(mount, dict) and mount.get("name"): 344 mounts_config["rw"].append({ 345 "name": mount["name"], 346 "description": mount.get("description", ""), 347 }) 348 349 # Log successful mount config loading 350 ro_count = len(mounts_config["ro"]) 351 rw_count = len(mounts_config["rw"]) 352 if ro_count > 0 or rw_count > 0: 353 logger.info( 354 f"Loaded external mounts config: {ro_count} RO, {rw_count} RW" 355 ) 356 else: 357 logger.debug("External mounts manifest exists but contains no mounts") 358 359 except Exception as e: 360 logger.warning(f"Failed to load mounts config: {e}") 361 else: 362 logger.debug(f"No external mounts manifest at {mounts_file}") 363 364 # Check if persistent storage exists for user 365 if username: 366 persistent_dir = Path(f"/users/{username}/ag3ntum/persistent") 367 mounts_config["persistent"] = persistent_dir.exists() 368 369 # Load per-user mounts from external-mounts.yaml 370 try: 371 from ..services.mount_service import get_user_mounts 372 user_mounts = get_user_mounts(username) 373 374 # Add user-specific RO mounts 375 mounts_config["user_ro"] = [ 376 {"name": m["name"], "description": m.get("description", "")} 377 for m in user_mounts.get("ro", []) 378 ] 379 380 # Add user-specific RW mounts 381 mounts_config["user_rw"] = [ 382 {"name": m["name"], "description": m.get("description", "")} 383 for m in user_mounts.get("rw", []) 384 ] 385 386 if mounts_config["user_ro"] or mounts_config["user_rw"]: 387 logger.debug( 388 f"Loaded per-user mounts for '{username}': " 389 f"{len(mounts_config['user_ro'])} RO, {len(mounts_config['user_rw'])} RW" 390 ) 391 except Exception as e: 392 logger.debug(f"No per-user mounts for '{username}': {e}") 393 mounts_config["user_ro"] = [] 394 mounts_config["user_rw"] = [] 395 396 return mounts_config 397 398 def _load_original_path_mounts_for_prompt( 399 self, 400 username: Optional[str] = None, 401 dynamic_mounts: Optional[list] = None, 402 ) -> list[dict]: 403 """ 404 Collect all original-path mount info for the mounts template. 405 406 Gathers host paths from: 407 1. The original_paths config section 408 2. All standard external mounts (which auto-get original-path support) 409 3. Dynamic mounts (which also auto-get original-path support) 410 411 Returns list of {"path": "/var/log", "mode": "ro", "description": "..."}. 
412 """ 413 result: list[dict] = [] 414 seen_paths: set[str] = set() 415 416 # 1. Original-path mounts from config 417 if username: 418 try: 419 from ..services.mount_service import get_original_path_mount_service 420 orig_service = get_original_path_mount_service() 421 for mount in orig_service.get_mounts_for_user(username): 422 if mount.path not in seen_paths: 423 seen_paths.add(mount.path) 424 result.append({ 425 "path": mount.path, 426 "mode": mount.mode, 427 "description": mount.description or "", 428 }) 429 except Exception as e: 430 logger.debug(f"No original-path mounts from config: {e}") 431 432 # 2. Standard external mounts (global + per-user) — all have host paths 433 try: 434 from ..services.mount_service import get_all_mounts_with_host_paths 435 ext_mounts = get_all_mounts_with_host_paths(username) 436 for mode in ("ro", "rw"): 437 for m in ext_mounts.get(mode, []): 438 hp = m.get("host_path", "") 439 if hp and hp not in seen_paths: 440 seen_paths.add(hp) 441 result.append({ 442 "path": hp, 443 "mode": mode, 444 "description": m.get("description", ""), 445 }) 446 except Exception as e: 447 logger.debug(f"No external mount host paths for prompt: {e}") 448 449 # 3. Dynamic mounts 450 if dynamic_mounts: 451 for dm in dynamic_mounts: 452 hp = getattr(dm, "host_path", None) or "" 453 if hp and hp not in seen_paths: 454 seen_paths.add(hp) 455 result.append({ 456 "path": hp, 457 "mode": getattr(dm, "mode", "ro"), 458 "description": getattr(dm, "description", ""), 459 }) 460 461 return result 462 463 def _setup_workspace_skills( 464 self, 465 session_id: str, 466 username: Optional[str] = None 467 ) -> None: 468 """ 469 Create merged skills directory with symlinks in workspace. 470 471 Skills are discovered from: 472 1. Global skills: SKILLS_DIR/.claude/skills/ 473 2. User skills: USERS_DIR/<username>/.claude/skills/ 474 475 User skills with the same name override global skills. 476 The SDK discovers skills via setting_sources=["project"] from 477 workspace/.claude/skills/. 478 479 IMPORTANT ARCHITECTURE NOTE: 480 Symlinks must point to paths that work in BOTH Docker and bwrap environments: 481 - MCP tools (Read, Write, Glob, etc.) run in Docker container OUTSIDE bwrap 482 - Bash tool runs INSIDE bwrap sandbox 483 - Both environments now have CONSISTENT mounts (see permissions.yaml): 484 - /skills = ./skills (same in both Docker and bwrap) 485 - /user-skills = per-user skills mount (same in both Docker and bwrap) 486 487 Symlink paths: /skills/.claude/skills/foo, /user-skills/foo 488 These work in both MCP tools and Bash. 489 SECURITY: User skills are per-user mounts to prevent cross-user access. 490 491 Args: 492 session_id: The session ID for workspace access. 493 username: Optional username for user-specific skills. 494 """ 495 if not self._config.enable_skills: 496 return 497 498 workspace_dir = self._session_manager.get_workspace_dir(session_id) 499 skills_target = workspace_dir / ".claude" / "skills" 500 501 # Clean existing and recreate 502 if skills_target.exists(): 503 shutil.rmtree(skills_target) 504 skills_target.mkdir(parents=True, exist_ok=True) 505 506 # NOTE: .claude/ is owned by the API process (UID 45045) with default 507 # umask permissions. It is intentionally NOT world-readable inside bwrap. 508 # The SDK reads skills from the Docker context (outside bwrap), so the 509 # sandbox user does not need access to .claude/ via Bash. 
510 511 # Discover merged skills using shared function (global + user, with user overriding) 512 skill_sources = discover_merged_skills(username=username) 513 514 # Paths used to determine skill source type 515 global_skills_base = SKILLS_DIR / ".claude" / "skills" 516 user_skills_base = USERS_DIR / username / ".claude" / "skills" if username else None 517 518 # Create symlinks pointing to DOCKER paths (not bwrap sandbox paths) 519 # MCP tools run outside bwrap and see Docker's filesystem: 520 # - Global skills: /skills/.claude/skills/<skill_name> 521 # - User skills: /user-skills/<skill_name> (mounted from /users/<username>/.claude/skills) 522 for skill_name, source_path in skill_sources.items(): 523 link_path = skills_target / skill_name 524 525 # User skills override global, so check user first 526 if user_skills_base and str(source_path).startswith(str(user_skills_base)): 527 docker_path = Path("/user-skills") / skill_name 528 else: 529 docker_path = Path("/skills") / ".claude" / "skills" / skill_name 530 531 try: 532 link_path.symlink_to(docker_path) 533 logger.debug(f"Linked skill: {skill_name} -> {docker_path} (source: {source_path})") 534 except Exception as e: 535 logger.warning(f"Failed to create skill symlink {skill_name}: {e}") 536 537 skill_names = sorted(skill_sources.keys()) 538 logger.info( 539 f"Refreshed skills ({len(skill_sources)}): {', '.join(skill_names) if skill_names else 'none'} " 540 f"-> {skills_target}" 541 ) 542 543 def _cleanup_session(self, session_id: str, owner_uid: Optional[int] = None) -> None: 544 """ 545 Clean up session resources after agent run completes. 546 547 Removes copied skills from workspace to save disk space. 548 Session metadata is preserved. Also hardens file permissions 549 to ensure session isolation. 550 551 Args: 552 session_id: The session ID to clean up. 553 owner_uid: Optional owner UID for permission hardening. 554 If not provided, gets owner from directory ownership. 555 """ 556 # Remove skills folder from workspace 557 self._session_manager.cleanup_workspace_skills(session_id) 558 559 # Clear session context from permission manager 560 if self._permission_manager is not None: 561 self._permission_manager.clear_session_context() 562 563 # Clean up PathValidator for this session 564 cleanup_path_validator(session_id) 565 566 # SECURITY: Harden session file permissions after agent run 567 # This ensures all files created during execution have proper 700/600 permissions 568 # with owner-only access (true session isolation) 569 try: 570 from .sessions import ensure_secure_session_files 571 session_dir = self._session_manager.get_session_dir(session_id) 572 573 # Get owner_uid from directory if not provided 574 if owner_uid is None: 575 try: 576 stat = session_dir.stat() 577 owner_uid = stat.st_uid 578 except OSError: 579 pass 580 581 ensure_secure_session_files(session_dir, owner_uid) 582 except Exception as e: 583 # Don't fail cleanup on permission hardening failure 584 logger.warning(f"Failed to harden session permissions for {session_id}: {e}") 585 586 def _build_options( 587 self, 588 session_context: SessionContext, 589 system_prompt: str, 590 trace_processor: Optional[Any] = None, 591 resume_id: Optional[str] = None, 592 fork_session: bool = False, 593 username: Optional[str] = None, 594 dynamic_mounts: Optional[list] = None, 595 ssh_context: Optional[Any] = None, 596 ) -> ClaudeAgentOptions: 597 """ 598 Build ClaudeAgentOptions for the SDK. 599 600 Args: 601 session_context: Session context with session_id and related data. 
602 system_prompt: System prompt (required, must not be empty). 603 trace_processor: Optional trace processor for permission denial tracking. 604 resume_id: Claude's session ID for resuming conversations (optional). 605 fork_session: If True, fork to new session when resuming (optional). 606 username: Optional username for loading user-specific sandboxed environment variables. 607 dynamic_mounts: List of DynamicMountInfo objects for this session (optional). 608 609 Returns: 610 ClaudeAgentOptions configured for execution. 611 612 Raises: 613 AgentError: If required parameters are missing or invalid. 614 """ 615 # Validate required inputs - fail fast 616 if not system_prompt or not system_prompt.strip(): 617 raise AgentError( 618 "system_prompt is required and must not be empty. " 619 "Load prompts from AGENT/prompts/ before calling _build_options." 620 ) 621 all_tools = list(self._config.allowed_tools) 622 if self._config.enable_skills and "Skill" not in all_tools: 623 all_tools.append("Skill") 624 625 # Permission management: permission manager is required 626 if self._permission_manager is None: 627 raise AgentError( 628 "PermissionManager is required. " 629 "Agent cannot run without permission profile." 630 ) 631 632 # Activate permission profile 633 self._permission_manager.activate() 634 635 # Get tool configuration from active profile 636 permission_checked_tools = self._permission_manager.get_permission_checked_tools() 637 sandbox_disabled_tools = self._permission_manager.get_disabled_tools() 638 639 # Pre-approved tools (no permission check needed) 640 allowed_tools = [ 641 t for t in all_tools 642 if t not in permission_checked_tools and t not in sandbox_disabled_tools 643 ] 644 645 # Available tools (excluding completely disabled ones) 646 available_tools = [ 647 t for t in all_tools 648 if t not in sandbox_disabled_tools 649 ] 650 651 # Disabled tools list for SDK 652 disallowed_tools = list(sandbox_disabled_tools) 653 654 active_profile = self._permission_manager.active_profile 655 logger.info( 656 f"SANDBOX: Using profile '{active_profile.name}' for task execution" 657 ) 658 logger.info(f"SANDBOX: permission_checked_tools={permission_checked_tools}") 659 logger.info(f"SANDBOX: available_tools={available_tools}") 660 logger.info(f"SANDBOX: allowed_tools (pre-approved)={allowed_tools}") 661 logger.info(f"SANDBOX: disallowed_tools (blocked)={disallowed_tools}") 662 663 # Build list of accessible directories from the active profile 664 working_dir = Path(self._config.working_dir) if self._config.working_dir else AGENT_DIR 665 profile_dirs = self._permission_manager.get_allowed_dirs() 666 add_dirs = [] 667 for dir_path in profile_dirs: 668 # Resolve relative paths (e.g., "./input") to absolute paths 669 if dir_path.startswith("./"): 670 add_dirs.append(str(working_dir / dir_path[2:])) 671 elif dir_path.startswith("/"): 672 add_dirs.append(dir_path) 673 else: 674 add_dirs.append(str(working_dir / dir_path)) 675 logger.info(f"SANDBOX: Profile allowed_dirs={add_dirs}") 676 677 # Use workspace subdirectory as cwd to prevent reading session logs 678 # The workspace only contains files the agent should access 679 workspace_dir = self._session_manager.get_workspace_dir( 680 session_context.session_id 681 ) 682 683 # Load sandboxed environment variables (global + user-specific overrides) 684 # These will be available inside the bubblewrap sandbox for Ag3ntumBash commands 685 sandboxed_envs = load_sandboxed_envs(username=username) 686 if sandboxed_envs: 687 logger.info( 688 f"SANDBOX: 
Loaded {len(sandboxed_envs)} sandboxed env vars for user '{username}': " 689 f"{list(sandboxed_envs.keys())}" 690 ) 691 692 sandbox_config = self._permission_manager.get_sandbox_config( 693 sandboxed_envs=sandboxed_envs 694 ) 695 696 # Per-user mounts and external mounts are now added after PathValidator 697 # mount loading (see "SECURITY: Mount only authorized mounts" block below) 698 # to avoid loading mount data twice and to ensure consistent path formats. 699 700 # Add bwrap mount for persistent Docker path so workspace symlink resolves 701 # The workspace symlink: ./persistent -> /users/{username}/ag3ntum/persistent 702 # Bwrap must mount the Docker path at the SAME path inside the sandbox, 703 # following the same pattern as external mount symlinks. 704 # (The /persistent mount from permissions.yaml still exists as an alias.) 705 if sandbox_config and sandbox_config.enabled and username: 706 persistent_docker_path = f"/users/{username}/ag3ntum/persistent" 707 if Path(persistent_docker_path).exists(): 708 sandbox_config.dynamic_mounts.append(SandboxMount( 709 source=persistent_docker_path, 710 target=persistent_docker_path, # Same path so symlink works! 711 mode="rw", 712 optional=True, 713 )) 714 logger.debug(f"SANDBOX: Added persistent Docker path mount: {persistent_docker_path}") 715 716 self._sandbox_system_message = self._format_sandbox_system_message( 717 sandbox_config=sandbox_config, 718 workspace_dir=workspace_dir, 719 ) 720 721 # Build custom sandbox executor for bubblewrap isolation 722 # SDK's built-in sandbox doesn't work reliably in Docker environments, 723 # so we use our own bubblewrap wrapper via the permission callback 724 sandbox_executor = self._build_sandbox_executor(sandbox_config, workspace_dir) 725 726 # Create permission callback using the permission manager 727 # Pass tracer's on_permission_check for tracing (if available) 728 # Pass denial tracker to record denials 729 # Pass trace_processor so permission denial shows FAILED status 730 # Pass sandbox_executor to wrap Bash commands in bubblewrap 731 on_permission_check = ( 732 self._tracer.on_permission_check 733 if hasattr(self._tracer, 'on_permission_check') 734 else None 735 ) 736 # Clear any previous denials before starting new run 737 self._denial_tracker.clear() 738 can_use_tool = create_permission_callback( 739 permission_manager=self._permission_manager, 740 on_permission_check=on_permission_check, 741 denial_tracker=self._denial_tracker, 742 trace_processor=trace_processor, 743 system_message_builder=self._sandbox_system_message_builder, 744 ) 745 746 all_tools = available_tools 747 748 # Get session directory for isolated Claude storage (CLAUDE_CONFIG_DIR) 749 session_dir = self._session_manager.get_session_dir(session_context.session_id) 750 751 # Ensure remote-settings.json exists in the session directory. 752 # The Claude Agent SDK binary reads this file with openSync during 753 # initialization and crashes (exit code 1) if it's missing — especially 754 # during session resume. An empty JSON object is a valid default. 
755 remote_settings_path = session_dir / "remote-settings.json" 756 if not remote_settings_path.exists(): 757 try: 758 remote_settings_path.write_text("{}") 759 logger.debug( 760 "Created remote-settings.json for session %s", 761 session_context.session_id, 762 ) 763 except OSError as exc: 764 logger.warning( 765 "Could not create remote-settings.json: %s", exc 766 ) 767 768 # Set up MCP servers for additional tools 769 mcp_servers: dict[str, Any] = {} 770 771 # Configure PathValidator for this session BEFORE creating MCP tools 772 # The validator runs in the main Python process (outside bwrap) and 773 # translates agent paths (/workspace/...) to real Docker paths 774 # 775 # IMPORTANT: Skills paths must be DOCKER paths (not bwrap paths) because 776 # MCP tools (Read, Write, etc.) run outside bwrap and see the Docker filesystem. 777 # Docker mounts: ./skills:/skills, /users/{username}/.claude/skills:/user-skills 778 # So global skills are at /skills/.claude/skills/ and user skills at /user-skills/ 779 try: 780 global_skills = None 781 user_skills = None 782 if self._config.enable_skills: 783 global_skills = Path("/skills/.claude/skills") 784 if username: 785 user_skills = Path("/user-skills") 786 787 # External mount paths (Docker container paths) 788 # With flattened mount structure, all mounts are at /mounts/{name} 789 # Agent sees: /workspace/external/ro/* -> Real path: /mounts/{name} 790 # Agent sees: /workspace/external/rw/* -> Real path: /mounts/{name} 791 # Agent sees: /workspace/persistent/* -> Real path: /users/{username}/ag3ntum/persistent/* 792 persistent_path = Path(f"/users/{username}/ag3ntum/persistent") if username else None 793 794 # Load global mounts from manifest for PathValidator 795 # These are configured via external-mounts.yaml global section 796 # All mounts now appear at /mounts/{name} (flattened structure) 797 from ..services.mount_service import get_global_mounts_for_path_validator 798 global_mounts = get_global_mounts_for_path_validator() 799 global_mounts_ro_paths = global_mounts.get("ro", {}) 800 global_mounts_rw_paths = global_mounts.get("rw", {}) 801 802 # Load per-user mounts for PathValidator 803 # These are configured via external-mounts.yaml per_user section 804 # Mounts appear at /mounts/{name} in Docker (flattened structure) 805 user_mounts_ro_paths: dict[str, Path] = {} 806 user_mounts_rw_paths: dict[str, Path] = {} 807 808 if username: 809 from ..services.mount_service import get_user_mounts 810 try: 811 user_mounts_data = get_user_mounts(username) 812 for mount_info in user_mounts_data.get("ro", []): 813 name = mount_info["name"] 814 mount_path = Path(f"/mounts/{name}") 815 if mount_path.exists() or mount_info.get("optional", True): 816 user_mounts_ro_paths[name] = mount_path 817 for mount_info in user_mounts_data.get("rw", []): 818 name = mount_info["name"] 819 mount_path = Path(f"/mounts/{name}") 820 if mount_path.exists() or mount_info.get("optional", True): 821 user_mounts_rw_paths[name] = mount_path 822 except Exception as e: 823 logger.warning(f"Failed to load per-user mounts for PathValidator: {e}") 824 825 # SECURITY: Mount only authorized mounts individually into bwrap. 826 # Instead of mounting the entire /mounts tree (which exposes ALL 827 # mounts to every user), each authorized mount is added individually. 828 # This prevents cross-user mount visibility via Bash. 829 # 830 # In Docker (nested container mode), the host filesystem is the base 831 # for bwrap. Without --tmpfs /mounts, all Docker volumes at /mounts/ 832 # would be visible. 
The tmpfs creates a clean empty /mounts directory, 833 # then only authorized mounts are bind-mounted into it. 834 if sandbox_config and sandbox_config.enabled: 835 # Create tmpfs at /mounts to hide all Docker mount volumes, 836 # then add only authorized mounts individually below. 837 sandbox_config.tmpfs_paths.append("/mounts") 838 839 bwrap_mount_count = 0 840 all_bwrap_mounts = [ 841 (global_mounts_ro_paths, "ro"), 842 (global_mounts_rw_paths, "rw"), 843 (user_mounts_ro_paths, "ro"), 844 (user_mounts_rw_paths, "rw"), 845 ] 846 for mount_dict, mode in all_bwrap_mounts: 847 for _name, _mount_path in mount_dict.items(): 848 source = str(_mount_path) 849 if Path(source).exists(): 850 sandbox_config.dynamic_mounts.append(SandboxMount( 851 source=source, 852 target=source, 853 mode=mode, 854 optional=True, 855 )) 856 bwrap_mount_count += 1 857 else: 858 logger.warning( 859 f"SANDBOX: Skipping mount '{_name}': path {source} does not exist" 860 ) 861 if bwrap_mount_count > 0: 862 logger.info(f"SANDBOX: Added {bwrap_mount_count} authorized external mounts to bwrap") 863 864 # Build dynamic mount paths for PathValidator 865 dynamic_mounts_ro_paths: dict[str, Path] = {} 866 dynamic_mounts_rw_paths: dict[str, Path] = {} 867 # Also track host paths for original-path support 868 dynamic_host_paths: list[tuple[str, str, str]] = [] # (host_path, container_path, mode) 869 if dynamic_mounts: 870 for mount_info in dynamic_mounts: 871 # Build container path: /mounts/{base}/{subpath} (flattened structure) 872 container_path = f"/mounts/{mount_info.source_base}" 873 if mount_info.source_subpath: 874 container_path = f"{container_path}/{mount_info.source_subpath}" 875 mount_path = Path(container_path) 876 877 if mount_info.mode == "ro": 878 dynamic_mounts_ro_paths[mount_info.alias] = mount_path 879 else: 880 dynamic_mounts_rw_paths[mount_info.alias] = mount_path 881 882 # Also add to sandbox config for Bubblewrap binding 883 sandbox_config.dynamic_mounts.append(SandboxMount( 884 source=container_path, 885 target=container_path, # Same path so symlinks work 886 mode=mount_info.mode, 887 optional=True, 888 )) 889 890 # Track host_path for original-path support (e.g., /var/log) 891 if mount_info.host_path: 892 dynamic_host_paths.append((mount_info.host_path, container_path, mount_info.mode)) 893 894 logger.info( 895 f"Added {len(dynamic_mounts)} dynamic mounts: " 896 f"{len(dynamic_mounts_ro_paths)} RO, {len(dynamic_mounts_rw_paths)} RW" 897 ) 898 899 # Load original-path mounts for this user 900 # These allow accessing paths like /var/log at their original locations 901 original_path_mounts_ro: dict[str, Path] = {} 902 original_path_mounts_rw: dict[str, Path] = {} 903 if username: 904 from ..services.mount_service import get_original_path_mount_service 905 try: 906 orig_mount_service = get_original_path_mount_service() 907 orig_mounts = orig_mount_service.get_mounts_for_user(username) 908 for mount in orig_mounts: 909 docker_path = Path(mount.container_path) 910 if docker_path.exists() or mount.optional: 911 if mount.mode == "ro": 912 original_path_mounts_ro[mount.path] = docker_path 913 else: 914 original_path_mounts_rw[mount.path] = docker_path 915 916 # Add to sandbox config for Bubblewrap binding 917 # Bind Docker path to original location inside sandbox 918 sandbox_config.original_path_mounts.append(SandboxMount( 919 source=mount.container_path, # Docker path 920 target=mount.path, # Original path (bind target) 921 mode=mount.mode, 922 optional=mount.optional, 923 )) 924 if orig_mounts: 925 logger.info( 
926 f"Added {len(orig_mounts)} original-path mounts: " 927 f"{len(original_path_mounts_ro)} RO, {len(original_path_mounts_rw)} RW" 928 ) 929 except Exception as e: 930 logger.warning(f"Failed to load original-path mounts: {e}") 931 932 # Also add external mounts (global and user) to original_path_mounts 933 # This enables agents to use host paths like /var/log directly, 934 # not just internal paths like ./external/ro/global_var_log 935 try: 936 from ..services.mount_service import get_all_mounts_with_host_paths 937 external_mounts_with_host_paths = get_all_mounts_with_host_paths(username) 938 external_original_count = 0 939 940 for mode in ("ro", "rw"): 941 target_dict = original_path_mounts_ro if mode == "ro" else original_path_mounts_rw 942 for mount_info in external_mounts_with_host_paths.get(mode, []): 943 host_path = mount_info["host_path"] 944 container_path = mount_info["container_path"] 945 docker_path = Path(container_path) 946 947 # Skip if already added (from original_paths config) 948 if host_path in target_dict: 949 continue 950 951 target_dict[host_path] = docker_path 952 external_original_count += 1 953 954 # Add to sandbox config for Bubblewrap binding 955 sandbox_config.original_path_mounts.append(SandboxMount( 956 source=container_path, # Docker path 957 target=host_path, # Original host path (bind target) 958 mode=mode, 959 optional=True, 960 )) 961 962 if external_original_count > 0: 963 logger.info( 964 f"Added {external_original_count} external mounts to original-path support " 965 f"(total: {len(original_path_mounts_ro)} RO, {len(original_path_mounts_rw)} RW)" 966 ) 967 except Exception as e: 968 logger.warning(f"Failed to add external mounts to original-path support: {e}") 969 970 # Add dynamic mount host paths to original-path support 971 # This enables agents to use /var/log directly when /var/log is a dynamic mount 972 if dynamic_host_paths: 973 dynamic_original_count = 0 974 for host_path, container_path, mode in dynamic_host_paths: 975 target_dict = original_path_mounts_ro if mode == "ro" else original_path_mounts_rw 976 # Skip if already added 977 if host_path in target_dict: 978 continue 979 target_dict[host_path] = Path(container_path) 980 dynamic_original_count += 1 981 # Add to sandbox config for Bubblewrap binding 982 sandbox_config.original_path_mounts.append(SandboxMount( 983 source=container_path, 984 target=host_path, 985 mode=mode, 986 optional=True, 987 )) 988 if dynamic_original_count > 0: 989 logger.info( 990 f"Added {dynamic_original_count} dynamic mounts to original-path support " 991 f"(total: {len(original_path_mounts_ro)} RO, {len(original_path_mounts_rw)} RW)" 992 ) 993 994 configure_path_validator( 995 session_id=session_context.session_id, 996 workspace_path=workspace_dir, 997 username=username, # Pass username to configure SandboxPathResolver 998 skills_path=self._skills_dir if self._config.enable_skills else None, 999 global_skills_path=global_skills, 1000 user_skills_path=user_skills, 1001 global_mounts_ro=global_mounts_ro_paths if global_mounts_ro_paths else None, 1002 global_mounts_rw=global_mounts_rw_paths if global_mounts_rw_paths else None, 1003 persistent_path=persistent_path if persistent_path and persistent_path.exists() else None, 1004 user_mounts_ro=user_mounts_ro_paths if user_mounts_ro_paths else None, 1005 user_mounts_rw=user_mounts_rw_paths if user_mounts_rw_paths else None, 1006 dynamic_mounts_ro=dynamic_mounts_ro_paths if dynamic_mounts_ro_paths else None, 1007 dynamic_mounts_rw=dynamic_mounts_rw_paths if 
dynamic_mounts_rw_paths else None, 1008 original_path_mounts_ro=original_path_mounts_ro if original_path_mounts_ro else None, 1009 original_path_mounts_rw=original_path_mounts_rw if original_path_mounts_rw else None, 1010 ) 1011 logger.info( 1012 f"PathValidator configured for session {session_context.session_id}, " 1013 f"workspace={workspace_dir}, global_skills={global_skills}, user_skills={user_skills}, " 1014 f"global_mounts={len(global_mounts_ro_paths)} RO/{len(global_mounts_rw_paths)} RW, " 1015 f"persistent={persistent_path if persistent_path and persistent_path.exists() else None}, " 1016 f"user_mounts={len(user_mounts_ro_paths)} RO/{len(user_mounts_rw_paths)} RW, " 1017 f"original_paths={len(original_path_mounts_ro)} RO/{len(original_path_mounts_rw)} RW" 1018 ) 1019 # Register sandbox UID so file tools can chown files to the session user 1020 if self._linux_uid is not None: 1021 set_session_linux_uid(session_context.session_id, self._linux_uid) 1022 1023 except Exception as e: 1024 logger.error(f"Failed to configure PathValidator: {e}") 1025 raise AgentError(f"PathValidator configuration failed: {e}") 1026 1027 # Add unified Ag3ntum MCP server containing ALL tools (Bash + file tools) 1028 # All tools share the same server name "ag3ntum" for consistent naming: 1029 # mcp__ag3ntum__Bash, mcp__ag3ntum__Read, mcp__ag3ntum__Write, etc. 1030 # SECURITY: Bash uses bwrap sandbox, file tools use PathValidator 1031 # NOTE: MCP tools are REQUIRED - fail fast if creation fails 1032 session_id = session_context.session_id 1033 include_bash = AG3NTUM_BASH_TOOL in all_tools 1034 try: 1035 # Create unified MCP server with ALL Ag3ntum tools 1036 # Tool names: mcp__ag3ntum__Bash, mcp__ag3ntum__Read, mcp__ag3ntum__Write, 1037 # mcp__ag3ntum__Edit, mcp__ag3ntum__MultiEdit, mcp__ag3ntum__Glob, 1038 # mcp__ag3ntum__Grep, mcp__ag3ntum__LS, mcp__ag3ntum__WebFetch, 1039 # mcp__ag3ntum__AskUserQuestion 1040 ag3ntum_server = create_ag3ntum_tools_mcp_server( 1041 session_id=session_id, 1042 workspace_path=workspace_dir, 1043 sandbox_executor=sandbox_executor, # SECURITY: Enable bwrap for Bash 1044 include_bash=include_bash, 1045 ssh_context=ssh_context, 1046 server_name="ag3ntum" 1047 ) 1048 mcp_servers["ag3ntum"] = ag3ntum_server 1049 1050 # CRITICAL: Add MCP tool names to all_tools list for subagent access 1051 # The SDK's AgentDefinition.tools filters from the parent's available tools. 1052 # Without this, subagents can't use MCP tools even if specified in their config. 1053 # Tool names follow the mcp__{server}__{tool} convention. 1054 ag3ntum_tool_names = [ 1055 "mcp__ag3ntum__Read", 1056 "mcp__ag3ntum__ReadDocument", 1057 "mcp__ag3ntum__Write", 1058 "mcp__ag3ntum__Edit", 1059 "mcp__ag3ntum__MultiEdit", 1060 "mcp__ag3ntum__Glob", 1061 "mcp__ag3ntum__Grep", 1062 "mcp__ag3ntum__LS", 1063 "mcp__ag3ntum__WebFetch", 1064 "mcp__ag3ntum__AskUserQuestion", 1065 ] 1066 if include_bash: 1067 ag3ntum_tool_names.append("mcp__ag3ntum__Bash") 1068 1069 # Add SSH tool names if SSH context is available 1070 # SSH tools are pre-approved (added to allowed_tools) because 1071 # access is already gated by feature flags + per-user enablement, 1072 # and the tools enforce their own security (command filter, host 1073 # blocking, rate limiting, credential vault). 
1074 if ssh_context is not None: 1075 from tools.ag3ntum.ag3ntum_ssh.tool import ( 1076 AG3NTUM_SSH_EXEC_TOOL, 1077 AG3NTUM_SSH_READ_TOOL, 1078 AG3NTUM_SSH_CONNECT_TOOL, 1079 ) 1080 ssh_tool_names = [ 1081 AG3NTUM_SSH_EXEC_TOOL, 1082 AG3NTUM_SSH_READ_TOOL, 1083 AG3NTUM_SSH_CONNECT_TOOL, 1084 ] 1085 ag3ntum_tool_names.extend(ssh_tool_names) 1086 # Pre-approve SSH tools so the SDK permission system doesn't 1087 # block them — SSH security is enforced by the tools themselves 1088 allowed_tools.extend(ssh_tool_names) 1089 1090 # Add to all_tools so they're available for subagent tool filtering 1091 all_tools.extend(ag3ntum_tool_names) 1092 1093 tool_count = len(ag3ntum_tool_names) 1094 logger.info( 1095 f"Ag3ntum unified MCP server configured ({tool_count} tools, " 1096 f"Bash: {include_bash}, sandbox: {'ENABLED' if sandbox_executor else 'DISABLED'})" 1097 ) 1098 logger.debug(f"MCP tools added to all_tools: {ag3ntum_tool_names}") 1099 except Exception as e: 1100 # MCP tools are critical - fail fast with clear error 1101 raise AgentError( 1102 f"CRITICAL: Failed to create Ag3ntum MCP server. " 1103 f"MCP tools (mcp__ag3ntum__*) are required for Ag3ntum to function. " 1104 f"Error: {e}" 1105 ) 1106 1107 # Add skills MCP server for script-based skills 1108 # SECURITY: Script skills MUST run inside the Bubblewrap sandbox 1109 # Environment variables (sandboxed_envs) are injected via sandbox config's custom_env 1110 if self._config.enable_skills and sandbox_executor is not None: 1111 try: 1112 skill_tools_manager = SkillToolsManager( 1113 skills_dir=self._skills_dir, 1114 workspace_dir=workspace_dir, 1115 sandbox_executor=sandbox_executor, 1116 ) 1117 skill_tools_manager.initialize() 1118 1119 skill_tool_names = skill_tools_manager.get_tool_definitions() 1120 if skill_tool_names: 1121 skills_server = skill_tools_manager.create_mcp_server( 1122 name="skills", 1123 version="1.0.0" 1124 ) 1125 mcp_servers["skills"] = skills_server 1126 logger.info( 1127 f"Skills MCP server configured ({len(skill_tool_names)} script skills, " 1128 f"sandbox: ENABLED, envs via sandbox config)" 1129 ) 1130 except Exception as e: 1131 logger.warning(f"Failed to create skills MCP server: {e}") 1132 elif self._config.enable_skills and sandbox_executor is None: 1133 logger.warning( 1134 "Skills MCP server NOT created: SandboxExecutor is required for script-based skills. " 1135 "Instruction-based skills (SKILL.md) can still use mcp__ag3ntum__Bash." 1136 ) 1137 1138 # Get subagent overrides from the global SubagentManager singleton 1139 # These override Claude Code's built-in subagents (general-purpose, etc.) 
1140 # and disable unwanted ones (claude-code-guide, statusline-setup) 1141 subagent_manager = get_subagent_manager() 1142 agents = subagent_manager.get_agents_dict() 1143 if agents: 1144 logger.info( 1145 f"SUBAGENTS: Using {len(agents)} custom subagent definitions " 1146 f"(enabled: {subagent_manager.list_enabled_agents()}, " 1147 f"disabled: {subagent_manager.list_disabled_agents()})" 1148 ) 1149 1150 # Build environment variables 1151 # Use base_model (without :mode=thinking suffix) for API calls 1152 # Set MAX_THINKING_TOKENS when thinking mode is enabled 1153 env_vars = {"CLAUDE_CONFIG_DIR": str(session_dir)} 1154 thinking_tokens = self._config.effective_thinking_tokens 1155 if thinking_tokens: 1156 env_vars["MAX_THINKING_TOKENS"] = str(thinking_tokens) 1157 logger.info( 1158 f"THINKING: Extended thinking enabled with {thinking_tokens} token budget" 1159 ) 1160 1161 # Check if model needs LLM proxy routing (non-Anthropic models) 1162 # This sets ANTHROPIC_BASE_URL to route requests through our proxy 1163 # session_id is embedded in the URL for session-scoped debug output 1164 proxy_base_url = _get_proxy_base_url_for_model( 1165 self._config.base_model, 1166 session_id=session_context.session_id, 1167 ) 1168 if proxy_base_url: 1169 env_vars["ANTHROPIC_BASE_URL"] = proxy_base_url 1170 logger.info(f"LLM_PROXY: Routing model '{self._config.base_model}' via {proxy_base_url}") 1171 1172 # Build hooks configuration 1173 # PreCompact: logs when context compaction is triggered for diagnostics 1174 hooks_config: dict[str, list] = {} 1175 pre_compact_hook = create_pre_compact_hook(hook_logger=logger) 1176 hooks_config["PreCompact"] = [HookMatcher(hooks=[pre_compact_hook])] 1177 1178 logger.info( 1179 f"SANDBOX: Final ClaudeAgentOptions - " 1180 f"tools={all_tools}, allowed_tools={allowed_tools}, " 1181 f"disallowed_tools={disallowed_tools}, " 1182 f"can_use_tool={'SET' if can_use_tool else 'NONE'}, " 1183 f"cwd={workspace_dir}, " 1184 f"CLAUDE_CONFIG_DIR={session_dir}, " 1185 f"mcp_servers={list(mcp_servers.keys())}, " 1186 f"bwrap_sandbox={'ENABLED' if sandbox_executor else 'DISABLED'}, " 1187 f"resume={resume_id}, fork_session={fork_session}, " 1188 f"agents={list(agents.keys()) if agents else 'none'}, " 1189 f"thinking={'ENABLED (' + str(thinking_tokens) + ' tokens)' if thinking_tokens else 'DISABLED'}" 1190 ) 1191 1192 # Capture SDK stderr for debugging (especially exit code 1 on resume) 1193 def _sdk_stderr_callback(line: str) -> None: 1194 logger.debug("SDK_STDERR: %s", line.rstrip()) 1195 1196 return ClaudeAgentOptions( 1197 system_prompt=system_prompt, 1198 model=self._config.base_model, # Use base model without :mode=thinking suffix 1199 max_turns=self._config.max_turns, 1200 permission_mode=None, # CRITICAL: Explicitly set to None to use can_use_tool callback 1201 tools=all_tools, # Available tools (excluding disabled) 1202 allowed_tools=allowed_tools, # Pre-approved (no permission check) 1203 disallowed_tools=disallowed_tools, # Completely blocked tools 1204 mcp_servers=mcp_servers if mcp_servers else None, 1205 cwd=str(workspace_dir), # Sandboxed workspace, not session dir 1206 add_dirs=add_dirs, 1207 setting_sources=["project"] if self._config.enable_skills else [], 1208 can_use_tool=can_use_tool, # Includes bwrap sandboxing for Bash 1209 env=env_vars, # Per-session storage + thinking config 1210 resume=resume_id, # Claude's session ID for resumption 1211 fork_session=fork_session, # Fork instead of continue when resuming 1212 
enable_file_checkpointing=self._config.enable_file_checkpointing, 1213 max_buffer_size=self._config.max_buffer_size, 1214 output_format=self._config.output_format, 1215 include_partial_messages=self._config.include_partial_messages, 1216 agents=agents if agents else None, # Subagent overrides (global singleton) 1217 hooks=hooks_config if hooks_config else None, # SDK hooks (PreCompact, etc.) 1218 stderr=_sdk_stderr_callback, # Capture SDK process stderr for debugging 1219 ) 1220 1221 def _build_user_prompt( 1222 self, 1223 task: str, 1224 session_context: SessionContext, 1225 parameters: Optional[dict] = None 1226 ) -> str: 1227 """ 1228 Build the user prompt from template. 1229 1230 Args: 1231 task: The task description. 1232 session_context: Session context with session_id. 1233 parameters: Additional template parameters. 1234 1235 Returns: 1236 Rendered user prompt. 1237 1238 Raises: 1239 AgentError: If user prompt template is missing or invalid. 1240 """ 1241 # Validate task is provided 1242 if not task or not task.strip(): 1243 raise AgentError("Task is required and must not be empty") 1244 1245 # Validate user prompt template exists 1246 user_template_path = PROMPTS_DIR / "user.md" 1247 if not user_template_path.exists(): 1248 raise AgentError( 1249 f"User prompt template not found in: {PROMPTS_DIR}\n" 1250 f"Create the template file in AGENT/prompts/user.md" 1251 ) 1252 1253 params = parameters or {} 1254 workspace_dir = self._session_manager.get_workspace_dir( 1255 session_context.session_id 1256 ) 1257 try: 1258 # Build context for user prompt rendering 1259 user_context = PromptContext() 1260 user_context.strings["TASK"] = task 1261 user_context.strings["WORKING_DIR"] = self._config.working_dir or str(workspace_dir) 1262 if params.get("context"): 1263 user_context.strings["CONTEXT"] = params["context"] 1264 user_context.flags["HAS_CONTEXT"] = bool(params.get("context")) 1265 user_prompt = _user_prompt_engine.load_and_render( 1266 user_template_path, user_context 1267 ) 1268 except Exception as e: 1269 raise AgentError(f"Failed to render user prompt template: {e}") from e 1270 1271 if not user_prompt or not user_prompt.strip(): 1272 raise AgentError("User prompt is empty after rendering") 1273 1274 return user_prompt 1275 1276 def _build_sandbox_executor( 1277 self, 1278 sandbox_config: Optional[SandboxConfig], 1279 workspace_dir: Path, 1280 ) -> Optional[SandboxExecutor]: 1281 """ 1282 Build a SandboxExecutor with resolved mounts for bubblewrap isolation. 1283 1284 This creates the executor that will wrap Bash commands in bubblewrap 1285 to provide proper filesystem isolation within Docker containers. 1286 1287 When linux_uid/linux_gid are set on the agent, sandboxed commands will 1288 drop privileges to run as that user instead of the API user (45045). 1289 This ensures files created by the agent are owned by the session user. 1290 1291 Args: 1292 sandbox_config: Sandbox configuration from permissions.yaml. 1293 workspace_dir: Absolute path to the session workspace directory. 1294 1295 Returns: 1296 SandboxExecutor if sandbox is enabled, None otherwise. 
1297 """ 1298 if sandbox_config is None or not sandbox_config.enabled: 1299 logger.info("BWRAP SANDBOX: Disabled in config") 1300 return None 1301 1302 if not sandbox_config.file_sandboxing: 1303 logger.info("BWRAP SANDBOX: File sandboxing disabled") 1304 return None 1305 1306 # Pass linux_uid/linux_gid to executor for privilege dropping 1307 executor = SandboxExecutor( 1308 sandbox_config, 1309 linux_uid=self._linux_uid, 1310 linux_gid=self._linux_gid, 1311 ) 1312 1313 if self._linux_uid is not None: 1314 logger.info(f"BWRAP SANDBOX: Will drop privileges to UID={self._linux_uid}, GID={self._linux_gid}") 1315 1316 # Validate mount sources exist 1317 missing = executor.validate_mount_sources() 1318 if missing: 1319 logger.warning( 1320 f"BWRAP SANDBOX: Some mount sources don't exist: {missing}. " 1321 "Sandbox may fail at runtime." 1322 ) 1323 1324 logger.info( 1325 f"BWRAP SANDBOX: Enabled with {len(sandbox_config.static_mounts)} static mounts, " 1326 f"{len(sandbox_config.session_mounts)} session mounts, " 1327 f"workspace={workspace_dir}" 1328 ) 1329 1330 return executor 1331 1332 def _sandbox_system_message_builder( 1333 self, 1334 tool_name: str, 1335 tool_input: dict[str, Any], 1336 ) -> Optional[str]: 1337 if not self._sandbox_system_message: 1338 return None 1339 if tool_name in { 1340 "Bash", 1341 "Read", 1342 "Write", 1343 "Edit", 1344 "MultiEdit", 1345 "Glob", 1346 "Grep", 1347 "LS", 1348 "WebFetch", 1349 "WebSearch", 1350 }: 1351 return self._sandbox_system_message 1352 return None 1353 1354 def _format_sandbox_system_message( 1355 self, 1356 sandbox_config: Optional[SandboxConfig], 1357 workspace_dir: Path, 1358 ) -> Optional[str]: 1359 if sandbox_config is None: 1360 return None 1361 1362 writable_paths = sandbox_config.writable_paths or [str(workspace_dir)] 1363 readonly_paths = sandbox_config.readonly_paths or [] 1364 network_mode = "enabled" if sandbox_config.network_sandboxing and sandbox_config.enabled else "disabled" 1365 file_mode = "enabled" if sandbox_config.file_sandboxing and sandbox_config.enabled else "disabled" 1366 1367 return ( 1368 "Sandbox policy: " 1369 f"file sandboxing {file_mode}, network sandboxing {network_mode}. " 1370 f"Writable: {', '.join(writable_paths) or 'none'}. " 1371 f"Read-only: {', '.join(readonly_paths) or 'none'}. " 1372 "Do not access paths outside the allowed list or attempt to bypass sandboxing." 1373 ) 1374 1375 def _validate_response(self, response: Optional[ResultMessage]) -> None: 1376 """ 1377 Validate the agent response. 1378 1379 Args: 1380 response: The ResultMessage from the SDK. 1381 1382 Raises: 1383 SessionIncompleteError: If session did not complete. 1384 ServerError: If an API error occurred. 1385 MaxTurnsExceededError: If max turns was exceeded. 
1386 """ 1387 if response is None: 1388 raise SessionIncompleteError("Session did not complete") 1389 if response.is_error: 1390 # Use result field for actual error message, fall back to subtype 1391 error_msg = response.result or response.subtype or "Unknown error" 1392 raise ServerError(f"API error: {error_msg}") 1393 if response.subtype == "error_max_turns": 1394 raise MaxTurnsExceededError( 1395 f"Exceeded {self._config.max_turns} turns" 1396 ) 1397 1398 async def run( 1399 self, 1400 task: str, 1401 system_prompt: Optional[str] = None, 1402 parameters: Optional[dict] = None, 1403 resume_session_id: Optional[str] = None, 1404 fork_session: bool = False, 1405 timeout_seconds: Optional[int] = None, 1406 session_id: Optional[str] = None, 1407 username: Optional[str] = None, 1408 session_context: Optional[SessionContext] = None, 1409 dynamic_mounts: Optional[list] = None, 1410 ssh_context: Optional[Any] = None, 1411 ) -> AgentResult: 1412 """ 1413 Execute the agent with a task. 1414 1415 Timeout is always enforced. Uses config.timeout_seconds (default 1800s = 30 min) 1416 unless overridden via timeout_seconds parameter. 1417 1418 Args: 1419 task: The task description. 1420 system_prompt: Custom system prompt. If None, loads from prompts/system-prompts/. 1421 parameters: Additional template parameters (optional). 1422 resume_session_id: Session ID to resume (optional, for logging - use session_context.claude_session_id). 1423 fork_session: If True, fork to new session when resuming (optional). 1424 timeout_seconds: Override timeout (uses config.timeout_seconds if None). 1425 session_id: Session ID to use for new session (optional, use session_context.session_id instead). 1426 username: Optional username for user-specific features. 1427 session_context: Session context from database. If provided, contains session_id and 1428 claude_session_id for resumption. Caller is responsible for persisting 1429 updates from AgentResult back to database. 1430 dynamic_mounts: List of DynamicMountInfo objects for this session (optional). 1431 1432 Returns: 1433 AgentResult with execution outcome. 1434 1435 Raises: 1436 AgentError: If prompts cannot be loaded or are invalid. 1437 """ 1438 # Determine effective timeout (parameter overrides config) 1439 effective_timeout = timeout_seconds or self._config.timeout_seconds 1440 1441 # Wrap execution with timeout to ensure every run is time-bounded 1442 return await asyncio.wait_for( 1443 self._execute( 1444 task, system_prompt, parameters, resume_session_id, fork_session, 1445 session_id=session_id, username=username, session_context=session_context, 1446 dynamic_mounts=dynamic_mounts, 1447 ssh_context=ssh_context, 1448 ), 1449 timeout=effective_timeout, 1450 ) 1451 1452 async def _execute( 1453 self, 1454 task: str, 1455 system_prompt: Optional[str] = None, 1456 parameters: Optional[dict] = None, 1457 resume_session_id: Optional[str] = None, 1458 fork_session: bool = False, 1459 session_id: Optional[str] = None, 1460 username: Optional[str] = None, 1461 session_context: Optional[SessionContext] = None, 1462 dynamic_mounts: Optional[list] = None, 1463 ssh_context: Optional[Any] = None, 1464 ) -> AgentResult: 1465 """ 1466 Internal execution logic (called by run() with timeout wrapper). 1467 1468 Args: 1469 task: The task description. 1470 system_prompt: Custom system prompt. If None, loads from prompts/system-prompts/. 1471 parameters: Additional template parameters (optional). 
1472 resume_session_id: Session ID to resume (optional, for logging only - use session_context.claude_session_id). 1473 fork_session: If True, fork to new session when resuming (optional). 1474 session_id: Session ID to use for new session (optional, use session_context.session_id instead). 1475 username: Optional username for user-specific features. 1476 session_context: Session context from database. If not provided, a minimal one is created. 1477 dynamic_mounts: List of DynamicMountInfo objects for this session. 1478 1479 Returns: 1480 AgentResult with execution outcome. 1481 1482 Raises: 1483 AgentError: If prompts cannot be loaded or are invalid. 1484 """ 1485 # Session context should be provided by caller (from database) 1486 # If not provided, create a minimal one (for backward compat during transition) 1487 if session_context is None: 1488 # Generate session ID if not provided 1489 if session_id is None: 1490 from .sessions import generate_session_id 1491 session_id = generate_session_id() 1492 session_context = SessionContext( 1493 session_id=session_id, 1494 working_dir=self._config.working_dir or str(AGENT_DIR), 1495 file_checkpointing_enabled=self._config.enable_file_checkpointing, 1496 ) 1497 # Create session directory 1498 self._session_manager.create_session_directory(session_context.session_id) 1499 else: 1500 # Session directory was already created by session_service (backend path). 1501 # After chown to sandbox user, API process may lack group access (770 perms) 1502 # if the container hasn't restarted since user creation (Gotcha #12). 1503 # The agent subprocess runs as sandbox user via bwrap and has full access. 1504 try: 1505 self._session_manager.create_session_directory(session_context.session_id) 1506 except PermissionError: 1507 session_dir = self._session_manager.get_session_dir(session_context.session_id) 1508 if session_dir.exists(): 1509 logger.debug( 1510 f"Session directory {session_dir} exists but API process " 1511 f"lacks group access (expected after user creation without restart)" 1512 ) 1513 else: 1514 raise 1515 1516 # Extract resume_id from session_context for SDK resumption. 1517 # Validate that the conversation data still exists — the SDK crashes 1518 # (exit code 1) if asked to resume a session whose JSONL was lost 1519 # (e.g., after a container rebuild or a previous crash). 1520 resume_id: Optional[str] = None 1521 if session_context.claude_session_id: 1522 resume_id = session_context.claude_session_id 1523 # Check that the SDK's projects/ directory has conversation data 1524 _sess_dir = self._session_manager.get_session_dir( 1525 session_context.session_id 1526 ) 1527 projects_dir = _sess_dir / "projects" 1528 has_conversation_data = ( 1529 projects_dir.is_dir() 1530 and any(projects_dir.rglob("*.jsonl")) 1531 ) 1532 if has_conversation_data: 1533 logger.info( 1534 f"Resuming session: {session_context.session_id} " 1535 f"(Claude session: {resume_id})" 1536 ) 1537 else: 1538 logger.warning( 1539 f"Cannot resume session {session_context.session_id}: " 1540 f"conversation data missing (projects/*.jsonl not found). " 1541 f"Starting fresh conversation instead." 
1542 ) 1543 resume_id = None 1544 1545 # Set session context for session-specific permissions 1546 # This sandboxes the agent to only its own workspace folder 1547 # Inside the sandbox, the workspace is mounted at /workspace and cwd is /workspace 1548 # So relative paths are relative to /workspace (the session workspace directory) 1549 if self._permission_manager is not None: 1550 # Agent's perspective inside sandbox: cwd is /workspace 1551 workspace_path = "." 1552 workspace_absolute = self._session_manager.get_workspace_dir( 1553 session_context.session_id 1554 ) 1555 self._permission_manager.set_session_context( 1556 session_id=session_context.session_id, 1557 workspace_path=workspace_path, 1558 workspace_absolute_path=workspace_absolute, 1559 username=username 1560 ) 1561 1562 # Setup skills access in workspace 1563 # Creates merged .claude/skills/ directory with symlinks to global and user skills 1564 self._setup_workspace_skills(session_context.session_id, username=username) 1565 1566 # Load system prompt from template if not provided 1567 # Done after session creation so permissions reflect session-specific rules 1568 if system_prompt is None: 1569 # Build permission profile data for the template 1570 # Now includes session-specific paths after set_session_context() 1571 permissions_data = None 1572 if self._permission_manager is not None: 1573 active_profile = self._permission_manager.active_profile 1574 # Get allow/deny/allowed_dirs from permissions if available 1575 allow_rules: list[str] = [] 1576 deny_rules: list[str] = [] 1577 allowed_dirs: list[str] = [] 1578 if active_profile.permissions is not None: 1579 allow_rules = active_profile.permissions.allow 1580 deny_rules = active_profile.permissions.deny 1581 allowed_dirs = active_profile.permissions.allowed_dirs 1582 1583 permissions_data = { 1584 "name": active_profile.name, 1585 "description": active_profile.description, 1586 "allow": allow_rules, 1587 "deny": deny_rules, 1588 "enabled_tools": active_profile.tools.enabled, 1589 "disabled_tools": active_profile.tools.disabled, 1590 "allowed_dirs": allowed_dirs, 1591 } 1592 sandbox_config = self._permission_manager.get_sandbox_config() 1593 if sandbox_config is not None: 1594 permissions_data["sandbox"] = { 1595 "enabled": sandbox_config.enabled, 1596 "file_sandboxing": sandbox_config.file_sandboxing, 1597 "network_sandboxing": sandbox_config.network_sandboxing, 1598 "writable_paths": sandbox_config.writable_paths, 1599 "readonly_paths": sandbox_config.readonly_paths, 1600 "network": { 1601 "enabled": sandbox_config.network.enabled, 1602 "allowed_domains": sandbox_config.network.allowed_domains, 1603 "allow_localhost": sandbox_config.network.allow_localhost, 1604 }, 1605 } 1606 1607 # Get workspace directory for template 1608 workspace_dir = self._session_manager.get_workspace_dir( 1609 session_context.session_id 1610 ) 1611 1612 # Custom role can be specified via parameters["role"] to override config 1613 params = parameters or {} 1614 role_name = params.get("role", self._config.role) 1615 1616 try: 1617 system_prompt = get_prompt_manager().build_system_prompt( 1618 username=username, 1619 role=role_name, 1620 model=self._config.model, 1621 session_id=session_context.session_id, 1622 docker_workspace_path=str(workspace_dir), 1623 permissions=permissions_data, 1624 enable_skills=self._config.enable_skills, 1625 external_mounts=self._load_external_mounts_config(username), 1626 dynamic_mounts=dynamic_mounts or [], 1627 
original_path_mounts=self._load_original_path_mounts_for_prompt( 1628 username, dynamic_mounts 1629 ), 1630 ssh_profiles=getattr(ssh_context, "profiles", None) if ssh_context else None, 1631 ) 1632 except FileNotFoundError as e: 1633 raise AgentError(str(e)) from e 1634 except Exception as e: 1635 raise AgentError(f"Failed to render system prompt: {e}") from e 1636 1637 # Validate system prompt is not empty 1638 if not system_prompt or not system_prompt.strip(): 1639 raise AgentError("System prompt is empty after loading/rendering") 1640 1641 # Build path display mapping for transforming internal paths to host paths in output 1642 # This makes agent output more user-friendly (e.g., "/var/log" instead of "./external/ro/global_var_log") 1643 path_display_mapping: dict[str, str] = {} 1644 try: 1645 from ..services.mount_service import get_path_display_mapping 1646 path_display_mapping = get_path_display_mapping(username) 1647 1648 # Also add dynamic mount aliases if they have host paths 1649 if dynamic_mounts: 1650 for mount_info in dynamic_mounts: 1651 if mount_info.host_path and mount_info.alias: 1652 # Dynamic mounts appear at workspace root as {alias} 1653 path_display_mapping[mount_info.alias] = mount_info.host_path 1654 1655 if path_display_mapping: 1656 logger.debug(f"Built path display mapping with {len(path_display_mapping)} entries") 1657 except Exception as e: 1658 logger.warning(f"Failed to build path display mapping: {e}") 1659 1660 # Create trace processor BEFORE options so it can be passed to 1661 # permission callback for correct failure status display 1662 trace_processor = TraceProcessor(self._tracer, path_display_mapping=path_display_mapping) 1663 trace_processor.set_task(task) 1664 trace_processor.set_model(self._config.model) 1665 1666 # Apply model-specific guardrail profile 1667 from src.config import get_model_profile 1668 model_profile = get_model_profile(self._config.model) 1669 trace_processor.configure_guardrails( 1670 max_consecutive_failures=model_profile.get("max_consecutive_failures"), 1671 max_repetitive_calls=model_profile.get("max_repetitive_calls"), 1672 max_silent_turns=model_profile.get("max_silent_turns"), 1673 max_todowrite_only_turns=model_profile.get("max_todowrite_only_turns"), 1674 ) 1675 1676 # Set cumulative stats if resuming a session (for display during execution) 1677 if session_context.cumulative_turns > 0 or session_context.cumulative_cost_usd > 0: 1678 trace_processor.set_cumulative_stats( 1679 cost_usd=session_context.cumulative_cost_usd, 1680 turns=session_context.cumulative_turns, 1681 tokens=session_context.cumulative_total_tokens, 1682 ) 1683 1684 options = self._build_options( 1685 session_context, system_prompt, trace_processor, 1686 resume_id=resume_id, 1687 fork_session=fork_session, 1688 username=username, 1689 dynamic_mounts=dynamic_mounts, 1690 ssh_context=ssh_context, 1691 ) 1692 user_prompt = self._build_user_prompt(task, session_context, parameters) 1693 1694 log_file = self._session_manager.get_log_file(session_context.session_id) 1695 result: Optional[ResultMessage] = None 1696 1697 # Create checkpoint tracker for file change tracking 1698 checkpoint_tracker = CheckpointTracker( 1699 session_id=session_context.session_id, 1700 auto_checkpoint_tools=self._config.auto_checkpoint_tools, 1701 enabled=session_context.file_checkpointing_enabled, 1702 initial_turn_count=session_context.cumulative_turns, 1703 ) 1704 1705 try: 1706 async with ClaudeSDKClient(options=options) as client: 1707 await client.query(user_prompt) 1708 1709 
with log_file.open("w", encoding="utf-8") as f: 1710 async for message in client.receive_response(): 1711 # Write to log file 1712 f.write(json.dumps(asdict(message)) + "\n") 1713 1714 # Process for console tracing 1715 trace_processor.process_message(message) 1716 1717 # Check circuit breaker for repeated tool failures 1718 if trace_processor.circuit_breaker_tripped: 1719 logger.error( 1720 "Circuit breaker tripped for session %s: %s", 1721 session_context.session_id, 1722 trace_processor.circuit_breaker_message, 1723 ) 1724 self._tracer.on_error( 1725 trace_processor.circuit_breaker_message, 1726 error_type="circuit_breaker" 1727 ) 1728 # Force a result if we have a partial one 1729 if isinstance(message, ResultMessage): 1730 result = message 1731 break 1732 1733 # Track checkpoints for file-modifying tools 1734 checkpoint_tracker.process_message(message) 1735 1736 if isinstance(message, ResultMessage): 1737 result = message 1738 1739 # Persist conversation data to project JSONL for --resume support. 1740 # In SDK mode, the binary doesn't write conversations to project files, 1741 # so we must write them ourselves for resume to find them. 1742 if result and getattr(result, "session_id", None): 1743 _session_dir = self._session_manager.get_session_dir( 1744 session_context.session_id 1745 ) 1746 _workspace_dir = self._session_manager.get_workspace_dir( 1747 session_context.session_id 1748 ) 1749 _persist_conversation_for_resume( 1750 session_dir=_session_dir, 1751 workspace_dir=_workspace_dir, 1752 claude_session_id=result.session_id, 1753 user_prompt=user_prompt, 1754 result_text=result.result, 1755 model=self._config.model, 1756 ) 1757 1758 # Compute adjusted turn count: exclude TodoWrite/TodoRead from metrics 1759 # These are planning tools that shouldn't count against the turn budget 1760 _todo_count = trace_processor.todo_tool_count 1761 def _adjusted_turns(raw_turns: int) -> int: 1762 return max(0, raw_turns - _todo_count) 1763 1764 # Check if circuit breaker was tripped 1765 if trace_processor.circuit_breaker_tripped: 1766 error_msg = trace_processor.circuit_breaker_message 1767 self._cleanup_session(session_context.session_id) 1768 1769 # Extract metrics even for failed runs 1770 usage = None 1771 if result: 1772 usage = TokenUsage.from_sdk_usage(result.usage) 1773 1774 # Finalize any orphaned subagents before emitting completion 1775 trace_processor.finalize_orphaned_subagents() 1776 1777 # Emit completion so the UI can close the stream deterministically 1778 self._tracer.on_agent_complete( 1779 status="FAILED", 1780 num_turns=_adjusted_turns(result.num_turns) if result else 0, 1781 duration_ms=result.duration_ms if result else 0, 1782 total_cost_usd=result.total_cost_usd if result else None, 1783 result=error_msg, 1784 session_id=getattr(result, "session_id", None) if result else None, 1785 usage=getattr(result, "usage", None) if result else None, 1786 model=self._config.model, 1787 cumulative_cost_usd=session_context.cumulative_cost_usd, 1788 cumulative_turns=session_context.cumulative_turns, 1789 cumulative_tokens=session_context.cumulative_total_tokens, 1790 ) 1791 1792 return AgentResult( 1793 status=TaskStatus.FAILED, 1794 error=error_msg, 1795 metrics=LLMMetrics( 1796 model=self._config.model, 1797 duration_ms=result.duration_ms if result else 0, 1798 num_turns=_adjusted_turns(result.num_turns) if result else 0, 1799 session_id=result.session_id if result else None, 1800 total_cost_usd=result.total_cost_usd if result else None, 1801 usage=usage, 1802 ) if result else None, 
1803 session_id=session_context.session_id, 1804 ) 1805 1806 self._validate_response(result) 1807 1808 # Check if agent was interrupted due to permission denial 1809 # This happens when interrupt=True is returned from permission callback 1810 if self._denial_tracker.was_interrupted: 1811 denial = self._denial_tracker.last_denial 1812 error_msg = denial.message if denial else "Permission denied" 1813 self._tracer.on_error(error_msg, error_type="permission_denied") 1814 self._cleanup_session(session_context.session_id) 1815 1816 # Extract metrics even for failed runs 1817 usage = None 1818 if result: 1819 usage = TokenUsage.from_sdk_usage(result.usage) 1820 # Note: Session update is now handled by caller via AgentResult.metrics 1821 1822 # Finalize any orphaned subagents before emitting completion 1823 trace_processor.finalize_orphaned_subagents() 1824 1825 # Emit completion so the UI can close the stream deterministically. 1826 if result: 1827 self._tracer.on_agent_complete( 1828 status="FAILED", 1829 num_turns=_adjusted_turns(result.num_turns), 1830 duration_ms=result.duration_ms, 1831 total_cost_usd=result.total_cost_usd, 1832 result=result.result, 1833 session_id=getattr(result, "session_id", None), 1834 usage=getattr(result, "usage", None), 1835 model=self._config.model, 1836 cumulative_cost_usd=session_context.cumulative_cost_usd, 1837 cumulative_turns=session_context.cumulative_turns, 1838 cumulative_tokens=session_context.cumulative_total_tokens, 1839 ) 1840 1841 return AgentResult( 1842 status=TaskStatus.FAILED, 1843 error=error_msg, 1844 metrics=LLMMetrics( 1845 model=self._config.model, 1846 duration_ms=result.duration_ms if result else 0, 1847 num_turns=_adjusted_turns(result.num_turns) if result else 0, 1848 session_id=result.session_id if result else None, 1849 total_cost_usd=result.total_cost_usd if result else None, 1850 usage=usage, 1851 ) if result else None, 1852 session_id=session_context.session_id, 1853 ) 1854 1855 # Normal successful completion 1856 # Clean up session (remove skills, switch to system profile) 1857 self._cleanup_session(session_context.session_id) 1858 1859 # Extract token usage from result (for metrics in AgentResult) 1860 usage = None 1861 if result: 1862 usage = TokenUsage.from_sdk_usage(result.usage) 1863 # Note: Session update is now handled by caller via AgentResult.metrics 1864 1865 # Determine session status using priority chain: 1866 # agent self-assessment > fallback heuristic 1867 raw_status = determine_session_status( 1868 result_text=result.result if result else None, 1869 had_tool_errors=trace_processor.had_tool_errors(), 1870 tool_error_count=trace_processor.tool_error_count, 1871 ) 1872 1873 # Finalize any orphaned subagents before emitting completion 1874 trace_processor.finalize_orphaned_subagents() 1875 1876 # Emit completion so the UI can close the stream cleanly. 
1877 if result: 1878 self._tracer.on_agent_complete( 1879 status=raw_status, 1880 num_turns=_adjusted_turns(result.num_turns), 1881 duration_ms=result.duration_ms, 1882 total_cost_usd=result.total_cost_usd, 1883 result=result.result, 1884 session_id=getattr(result, "session_id", None), 1885 usage=getattr(result, "usage", None), 1886 model=self._config.model, 1887 cumulative_cost_usd=session_context.cumulative_cost_usd, 1888 cumulative_turns=session_context.cumulative_turns, 1889 cumulative_tokens=session_context.cumulative_total_tokens, 1890 ) 1891 1892 return AgentResult( 1893 status=TaskStatus(raw_status), 1894 output=result.result if result else None, 1895 metrics=LLMMetrics( 1896 model=self._config.model, 1897 duration_ms=result.duration_ms, 1898 num_turns=_adjusted_turns(result.num_turns), 1899 session_id=result.session_id, 1900 total_cost_usd=result.total_cost_usd, 1901 usage=usage, 1902 ) if result else None, 1903 session_id=session_context.session_id, 1904 ) 1905 1906 except AgentError as e: 1907 self._tracer.on_error(str(e), error_type="agent_error") 1908 self._cleanup_session(session_context.session_id) 1909 # Note: Session status update is now handled by caller 1910 raise 1911 except asyncio.TimeoutError: 1912 error_msg = f"Timed out after {self._config.timeout_seconds}s" 1913 self._tracer.on_error(error_msg, error_type="timeout") 1914 self._cleanup_session(session_context.session_id) 1915 # Note: Session status update is now handled by caller 1916 raise AgentError(error_msg) 1917 except Exception as e: 1918 self._tracer.on_error(str(e), error_type="error") 1919 self._cleanup_session(session_context.session_id) 1920 # Note: Session status update is now handled by caller 1921 return AgentResult( 1922 status=TaskStatus.ERROR, 1923 error=str(e), 1924 session_id=session_context.session_id, 1925 ) 1926 1927 async def run_with_timeout( 1928 self, 1929 task: str, 1930 system_prompt: Optional[str] = None, 1931 parameters: Optional[dict] = None, 1932 resume_session_id: Optional[str] = None, 1933 fork_session: bool = False, 1934 timeout_seconds: Optional[int] = None, 1935 session_id: Optional[str] = None 1936 ) -> AgentResult: 1937 """ 1938 Execute agent with timeout (alias for run(), kept for backward compatibility). 1939 1940 All runs now enforce timeout by default (30 minutes). 1941 1942 Args: 1943 task: The task description. 1944 system_prompt: Custom system prompt (optional). 1945 parameters: Additional template parameters (optional). 1946 resume_session_id: Session ID to resume (optional). 1947 fork_session: If True, fork to new session when resuming (optional). 1948 timeout_seconds: Override timeout (uses config.timeout_seconds if None). 1949 session_id: Session ID to use for new session (optional). 1950 1951 Returns: 1952 AgentResult with execution outcome. 1953 """ 1954 return await self.run( 1955 task, system_prompt, parameters, resume_session_id, fork_session, 1956 timeout_seconds=timeout_seconds, session_id=session_id 1957 ) 1958 1959 async def compact( 1960 self, 1961 session_id: str, 1962 claude_session_id: str 1963 ) -> dict[str, Any]: 1964 """ 1965 Compact conversation history for a session. 1966 1967 Reduces context size by summarizing older messages while 1968 preserving important context. Uses the SDK's /compact command. 1969 1970 Args: 1971 session_id: The Ag3ntum session ID (for logging). 1972 claude_session_id: The Claude SDK session ID for resumption. 
1973 1974 Returns: 1975 Dict with compaction metadata: 1976 - pre_tokens: Token count before compaction 1977 - post_tokens: Token count after compaction (if available) 1978 - trigger: What triggered the compaction 1979 1980 Raises: 1981 AgentError: If claude_session_id is not provided. 1982 """ 1983 if not claude_session_id: 1984 raise AgentError( 1985 f"Session {session_id} has no Claude session ID to resume" 1986 ) 1987 1988 compact_metadata: dict[str, Any] = {} 1989 1990 async with ClaudeSDKClient( 1991 options=ClaudeAgentOptions( 1992 resume=claude_session_id, 1993 max_turns=1 1994 ) 1995 ) as client: 1996 await client.query("/compact") 1997 1998 async for message in client.receive_response(): 1999 if isinstance(message, SystemMessage): 2000 if message.subtype == "compact_boundary": 2001 compact_metadata = message.data.get("compact_metadata", {}) 2002 2003 logger.info( 2004 f"Compacted session {session_id}: " 2005 f"pre_tokens={compact_metadata.get('pre_tokens')}" 2006 ) 2007 2008 return compact_metadata 2009 2010 # ------------------------------------------------------------------------- 2011 # Checkpoint Management 2012 # 2013 # NOTE: Checkpoint data is now stored in the database (Session.checkpoints_json). 2014 # Callers should use session_service to manage checkpoints. 2015 # These methods are provided for convenience and work with passed-in data. 2016 # ------------------------------------------------------------------------- 2017 2018 def create_checkpoint( 2019 self, 2020 session_id: str, 2021 uuid: str, 2022 turn_number: int, 2023 description: Optional[str] = None 2024 ) -> Checkpoint: 2025 """ 2026 Create a manual checkpoint object. 2027 2028 This creates a Checkpoint object that the caller should persist to the database. 2029 2030 Args: 2031 session_id: The session ID. 2032 uuid: The user message UUID from the SDK. 2033 turn_number: Current cumulative turn number. 2034 description: Optional description of the checkpoint. 2035 2036 Returns: 2037 The created Checkpoint object. Caller must persist to database. 2038 """ 2039 from datetime import datetime 2040 checkpoint = Checkpoint( 2041 uuid=uuid, 2042 created_at=datetime.now(), 2043 checkpoint_type=CheckpointType.MANUAL, 2044 description=description, 2045 turn_number=turn_number, 2046 ) 2047 logger.debug(f"Created manual checkpoint: {checkpoint.to_summary()}") 2048 return checkpoint 2049 2050 async def rewind_to_checkpoint( 2051 self, 2052 session_id: str, 2053 claude_session_id: str, 2054 checkpoint: Checkpoint, 2055 file_checkpointing_enabled: bool = True 2056 ) -> dict[str, Any]: 2057 """ 2058 Rewind files to a specific checkpoint. 2059 2060 This restores all files to their state at the specified checkpoint, 2061 reverting any changes made after that point. 2062 2063 Args: 2064 session_id: The Ag3ntum session ID (for logging). 2065 claude_session_id: The Claude SDK session ID for resumption. 2066 checkpoint: The Checkpoint object to rewind to. 2067 file_checkpointing_enabled: Whether file checkpointing is enabled. 2068 2069 Returns: 2070 Dict with rewind metadata: 2071 - checkpoint: The checkpoint that was rewound to 2072 - success: Whether the rewind succeeded 2073 2074 Raises: 2075 AgentError: If session data is invalid or checkpointing not enabled. 2076 2077 Note: 2078 The caller is responsible for clearing checkpoints after this one 2079 from the database using session_service.clear_checkpoints_after(). 
2080 """ 2081 # Validate file checkpointing is enabled 2082 if not file_checkpointing_enabled: 2083 raise AgentError( 2084 f"File checkpointing is not enabled for session {session_id}. " 2085 "Set enable_file_checkpointing=True in session config." 2086 ) 2087 2088 # Validate session has a resume ID 2089 if not claude_session_id: 2090 raise AgentError( 2091 f"Session {session_id} has no Claude session ID to resume" 2092 ) 2093 2094 # Use SDK to rewind files 2095 async with ClaudeSDKClient( 2096 options=ClaudeAgentOptions( 2097 resume=claude_session_id, 2098 max_turns=1, 2099 enable_file_checkpointing=True, 2100 ) 2101 ) as client: 2102 await client.rewind_files(checkpoint.uuid) 2103 2104 logger.info( 2105 f"Rewound session {session_id} to checkpoint {checkpoint.uuid}" 2106 ) 2107 2108 # Notify tracer if available 2109 if hasattr(self._tracer, 'on_checkpoint_rewind'): 2110 self._tracer.on_checkpoint_rewind(checkpoint, 0) 2111 2112 return { 2113 "checkpoint": checkpoint, 2114 "success": True, 2115 } 2116 2117 async def rewind_to_latest_checkpoint( 2118 self, 2119 session_id: str, 2120 claude_session_id: str, 2121 checkpoints: list[Checkpoint], 2122 file_checkpointing_enabled: bool = True 2123 ) -> dict[str, Any]: 2124 """ 2125 Rewind to the most recent checkpoint. 2126 2127 Convenience method for undoing the last file-modifying operation. 2128 2129 Args: 2130 session_id: The Ag3ntum session ID. 2131 claude_session_id: The Claude SDK session ID for resumption. 2132 checkpoints: List of checkpoints from database (Session.checkpoints_json). 2133 file_checkpointing_enabled: Whether file checkpointing is enabled. 2134 2135 Returns: 2136 Dict with rewind metadata (same as rewind_to_checkpoint). 2137 2138 Raises: 2139 AgentError: If no checkpoints exist or rewind fails. 2140 """ 2141 if len(checkpoints) < 2: 2142 raise AgentError( 2143 f"Session {session_id} needs at least 2 checkpoints to rewind" 2144 ) 2145 2146 # Rewind to the checkpoint before the last one 2147 return await self.rewind_to_checkpoint( 2148 session_id=session_id, 2149 claude_session_id=claude_session_id, 2150 checkpoint=checkpoints[-2], 2151 file_checkpointing_enabled=file_checkpointing_enabled 2152 ) 2153 2154 @staticmethod 2155 def get_checkpoint_summary(checkpoints: list[Checkpoint]) -> list[str]: 2156 """ 2157 Get a human-readable summary of checkpoints. 2158 2159 Args: 2160 checkpoints: List of Checkpoint objects. 2161 2162 Returns: 2163 List of checkpoint summary strings. 2164 """ 2165 return [ 2166 f"[{i}] {cp.to_summary()}" 2167 for i, cp in enumerate(checkpoints) 2168 ] 2169 2170 2171 def _persist_conversation_for_resume( 2172 session_dir: Path, 2173 workspace_dir: Path, 2174 claude_session_id: str, 2175 user_prompt: str, 2176 result_text: Optional[str], 2177 model: str, 2178 ) -> None: 2179 """ 2180 Write conversation data to the Claude Code project JSONL file. 2181 2182 In SDK mode (--output-format stream-json), the Claude Code binary does NOT 2183 persist conversation messages to its project JSONL files — only metadata 2184 (queue-operation, file-history-snapshot). When --resume is used for a 2185 subsequent request, the binary searches these files for conversation data 2186 and fails with "No conversation found" if none exists. 2187 2188 This function writes the user prompt and assistant response to the project 2189 file in the format the binary expects, enabling successful resume. 2190 2191 Args: 2192 session_dir: The session directory (CLAUDE_CONFIG_DIR). 
2193 workspace_dir: The workspace directory (cwd for the agent). 2194 claude_session_id: The Claude session ID from the SDK. 2195 user_prompt: The user's prompt text. 2196 result_text: The assistant's response text (may be None on error). 2197 model: The model name used. 2198 """ 2199 try: 2200 # Compute project slug from workspace path (same algorithm as Claude Code binary) 2201 # Binary replaces both "/" and "_" with "-" and keeps leading "-" 2202 # e.g., /users/greg/sessions/20260212_213024_c37241ab/workspace 2203 # → -users-greg-sessions-20260212-213024-c37241ab-workspace 2204 slug = str(workspace_dir).replace("/", "-").replace("_", "-") 2205 project_dir = session_dir / "projects" / slug 2206 project_dir.mkdir(parents=True, exist_ok=True) 2207 2208 project_file = project_dir / f"{claude_session_id}.jsonl" 2209 2210 from datetime import timezone 2211 now = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z" 2212 user_uuid = str(uuid_mod.uuid4()) 2213 assistant_uuid = str(uuid_mod.uuid4()) 2214 cwd = str(workspace_dir) 2215 2216 # Find parentUuid for the user message by reading the last assistant entry 2217 parent_uuid = None 2218 if project_file.exists(): 2219 try: 2220 with project_file.open("r", encoding="utf-8") as f: 2221 for line in f: 2222 line = line.strip() 2223 if not line: 2224 continue 2225 try: 2226 entry = json.loads(line) 2227 if entry.get("type") == "assistant" and entry.get("uuid"): 2228 parent_uuid = entry["uuid"] 2229 except json.JSONDecodeError: 2230 continue 2231 except Exception: 2232 parent_uuid = None 2233 2234 entries = [] 2235 2236 # User message 2237 entries.append({ 2238 "parentUuid": parent_uuid, 2239 "isSidechain": False, 2240 "userType": "external", 2241 "cwd": cwd, 2242 "sessionId": claude_session_id, 2243 "type": "user", 2244 "message": {"role": "user", "content": user_prompt}, 2245 "uuid": user_uuid, 2246 "timestamp": now, 2247 "permissionMode": "default", 2248 }) 2249 2250 # Assistant message 2251 content_blocks = [] 2252 if result_text: 2253 content_blocks.append({"type": "text", "text": result_text}) 2254 2255 entries.append({ 2256 "parentUuid": user_uuid, 2257 "isSidechain": False, 2258 "userType": "external", 2259 "cwd": cwd, 2260 "sessionId": claude_session_id, 2261 "type": "assistant", 2262 "message": { 2263 "role": "assistant", 2264 "content": content_blocks, 2265 "model": model, 2266 "stop_reason": "end_turn", 2267 "stop_sequence": None, 2268 }, 2269 "uuid": assistant_uuid, 2270 "timestamp": now, 2271 }) 2272 2273 # Append to project file (preserves binary's queue-operation entries) 2274 with project_file.open("a", encoding="utf-8") as f: 2275 for entry in entries: 2276 f.write(json.dumps(entry) + "\n") 2277 2278 logger.debug( 2279 "Persisted conversation for resume: session=%s, file=%s, entries=%d", 2280 claude_session_id, project_file, len(entries), 2281 ) 2282 except Exception as e: 2283 # Non-fatal: resume may fail but current request succeeded 2284 logger.warning( 2285 "Failed to persist conversation for resume (session=%s): %s", 2286 claude_session_id, e, 2287 ) 2288 2289 2290 async def run_agent( 2291 task: str, 2292 config: AgentConfig, 2293 permission_manager: PermissionManager, 2294 system_prompt: Optional[str] = None, 2295 parameters: Optional[dict] = None, 2296 resume_session_id: Optional[str] = None, 2297 fork_session: bool = False, 2298 tracer: Optional[Union[TracerBase, bool]] = True 2299 ) -> AgentResult: 2300 """ 2301 Convenience function to run the agent. 2302 2303 Args: 2304 task: The task description. 
2305 config: AgentConfig loaded from agent.yaml (required). 2306 permission_manager: PermissionManager (required). 2307 system_prompt: Custom system prompt. 2308 parameters: Additional template parameters. 2309 resume_session_id: Session ID to resume. 2310 fork_session: If True, fork to new session when resuming. 2311 tracer: Execution tracer for console output. 2312 - True (default): Use ExecutionTracer with default settings. 2313 - False/None: Disable tracing (NullTracer). 2314 - TracerBase instance: Use custom tracer. 2315 2316 Returns: 2317 AgentResult with execution outcome. 2318 2319 Raises: 2320 AgentError: If permission manager is not provided or prompts are missing. 2321 2322 Example: 2323 from config import AgentConfigLoader 2324 from schemas import AgentConfig 2325 2326 loader = AgentConfigLoader() 2327 yaml_config = loader.get_config() 2328 config = AgentConfig(**yaml_config, working_dir="/path/to/project") 2329 2330 result = await run_agent( 2331 task="List all files", 2332 config=config, 2333 permission_manager=manager 2334 ) 2335 """ 2336 agent = ClaudeAgent( 2337 config, 2338 tracer=tracer, 2339 permission_manager=permission_manager 2340 ) 2341 return await agent.run_with_timeout( 2342 task, system_prompt, parameters, resume_session_id, fork_session 2343 )
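

# ---------------------------------------------------------------------------
# Checkpoint workflow sketch (illustrative only; agent, session, checkpoints,
# and user_message_uuid are assumptions about the caller's code, not names
# defined in this module).
#
#   checkpoint = agent.create_checkpoint(
#       session_id=session.session_id,
#       uuid=user_message_uuid,
#       turn_number=session.cumulative_turns,
#       description="before refactor",
#   )
#   checkpoints.append(checkpoint)  # caller persists to Session.checkpoints_json
#   print("\n".join(ClaudeAgent.get_checkpoint_summary(checkpoints)))
#
#   # Undo the last file-modifying operation (requires at least two tracked
#   # checkpoints, since it rewinds to the checkpoint before the latest one).
#   await agent.rewind_to_latest_checkpoint(
#       session_id=session.session_id,
#       claude_session_id=session.claude_session_id,
#       checkpoints=checkpoints,
#   )
# ---------------------------------------------------------------------------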