prompt_manager.py
1 """ 2 Prompt Manager for Ag3ntum. 3 4 Manages prompt loading, caching, user overrides, and rendering. 5 Provides the main interface for prompt operations. 6 """ 7 import logging 8 from pathlib import Path 9 from typing import Any, Optional 10 11 import yaml 12 13 from ..config import CONFIG_DIR, PROMPTS_DIR, USERS_DIR 14 from .prompt_engine import PromptTemplateEngine, PromptMetadata 15 from .prompt_context import build_prompt_context, PromptContext 16 17 logger = logging.getLogger(__name__) 18 19 20 class PromptManager: 21 """ 22 Singleton manager for prompt loading and rendering. 23 24 Handles: 25 - Global prompt loading from prompts/ 26 - User override merging from users/{user}/.prompts/ 27 - Override allowlist enforcement 28 - Hot reload via clear_cache() 29 """ 30 31 _instance: Optional["PromptManager"] = None 32 33 def __init__(self) -> None: 34 self._prompts_dir = PROMPTS_DIR 35 self._engine = PromptTemplateEngine(base_dir=self._prompts_dir) 36 self._overrides_config = self._load_overrides_config() 37 38 @classmethod 39 def get_instance(cls) -> "PromptManager": 40 """Get or create the singleton instance.""" 41 if cls._instance is None: 42 cls._instance = cls() 43 return cls._instance 44 45 @classmethod 46 def reset_instance(cls) -> None: 47 """Reset singleton (for testing).""" 48 cls._instance = None 49 50 def _load_overrides_config(self) -> dict[str, Any]: 51 """Load prompt override allowlist configuration.""" 52 config_path = CONFIG_DIR / "prompt-overrides.yaml" 53 if not config_path.exists(): 54 return {"allowed_overrides": {}} 55 56 try: 57 with config_path.open(encoding="utf-8") as f: 58 return yaml.safe_load(f) or {"allowed_overrides": {}} 59 except Exception as e: 60 logger.error(f"Failed to load prompt-overrides.yaml: {e}") 61 return {"allowed_overrides": {}} 62 63 def _is_override_allowed(self, category: str, filename: str) -> bool: 64 """Check if a user override is allowed for this prompt.""" 65 allowed = self._overrides_config.get("allowed_overrides", {}) 66 category_rules = allowed.get(category, []) 67 68 for rule in category_rules: 69 if rule == "*.md": 70 return True 71 if rule == filename: 72 return True 73 74 return False 75 76 def _get_user_override_path( 77 self, 78 username: str, 79 category: str, 80 filename: str, 81 ) -> Optional[Path]: 82 """Get user override path if it exists and is allowed.""" 83 if not self._is_override_allowed(category, filename): 84 return None 85 86 user_prompts_dir = USERS_DIR / username / ".prompts" 87 override_path = user_prompts_dir / category / filename 88 89 if override_path.exists(): 90 return override_path 91 92 return None 93 94 def build_system_prompt( 95 self, 96 username: Optional[str] = None, 97 role: str = "default", 98 model: str = "claude-sonnet-4-20250514", 99 session_id: Optional[str] = None, 100 docker_workspace_path: str = "", 101 permissions: Optional[dict[str, Any]] = None, 102 enable_skills: bool = True, 103 external_mounts: Optional[dict[str, Any]] = None, 104 dynamic_mounts: Optional[list] = None, 105 original_path_mounts: Optional[list] = None, 106 ssh_profiles: Optional[dict[str, Any]] = None, 107 ) -> str: 108 """ 109 Build the complete system prompt. 110 111 Loads and renders all prompt components, applying user overrides 112 where allowed. 113 114 Args: 115 username: User for override lookup (None for global only) 116 role: Role template name 117 model: Model name for context 118 session_id: Session identifier 119 docker_workspace_path: Internal Docker path (agent sees / as root) 120 permissions: Permission profile 121 enable_skills: Skills enabled flag 122 external_mounts: External mounts config 123 dynamic_mounts: Dynamic mount list for this session 124 original_path_mounts: Original-path mount list 125 126 Returns: 127 Complete rendered system prompt 128 129 Raises: 130 FileNotFoundError: If role file not found 131 """ 132 # Load role content 133 role_path = self._prompts_dir / "roles" / f"{role}.md" 134 if username: 135 user_role = self._get_user_override_path(username, "roles", f"{role}.md") 136 if user_role: 137 role_path = user_role 138 139 if not role_path.exists(): 140 raise FileNotFoundError( 141 f"Role file not found: {role_path}. " 142 f"Create the role file in prompts/roles/{role}.md" 143 ) 144 145 role_content = role_path.read_text(encoding="utf-8").strip() 146 147 # Build context 148 context = build_prompt_context( 149 docker_workspace_path=docker_workspace_path, 150 session_id=session_id, 151 model=model, 152 role_content=role_content, 153 permissions=permissions, 154 enable_skills=enable_skills, 155 enable_external_mounts=bool(external_mounts), 156 external_mounts=external_mounts, 157 dynamic_mounts=dynamic_mounts, 158 original_path_mounts=original_path_mounts, 159 ssh_profiles=ssh_profiles, 160 ) 161 162 # Load and render each system prompt component 163 prompt_parts = [] 164 165 system_prompts_dir = self._prompts_dir / "system-prompts" 166 if system_prompts_dir.exists(): 167 for prompt_file in sorted(system_prompts_dir.glob("*.md")): 168 # Check for user override 169 if username: 170 override = self._get_user_override_path( 171 username, "system-prompts", prompt_file.name 172 ) 173 if override: 174 prompt_file = override 175 176 rendered = self._engine.load_and_render(prompt_file, context) 177 if rendered.strip(): 178 prompt_parts.append(rendered) 179 180 return "\n\n".join(prompt_parts) 181 182 def render_subagent_prompt( 183 self, 184 template_path: str, 185 context: Optional[PromptContext] = None, 186 ) -> str: 187 """ 188 Render a subagent prompt template. 189 190 Args: 191 template_path: Relative path within prompts dir (e.g., "subagents/general-purpose/prompt.md") 192 context: Optional context; if None, builds a minimal context 193 194 Returns: 195 Rendered prompt string 196 """ 197 full_path = self._prompts_dir / template_path 198 if not full_path.exists(): 199 raise FileNotFoundError(f"Subagent prompt not found: {full_path}") 200 201 if context is None: 202 context = build_prompt_context(enable_skills=True) 203 204 return self._engine.load_and_render(full_path, context) 205 206 def get_system_reminder( 207 self, 208 reminder_name: str, 209 context: PromptContext, 210 ) -> Optional[str]: 211 """ 212 Get a rendered system reminder. 213 214 Args: 215 reminder_name: Name of the reminder (without .md extension) 216 context: PromptContext for rendering 217 218 Returns: 219 Rendered reminder or None if not found 220 """ 221 reminder_path = self._prompts_dir / "system-reminders" / f"{reminder_name}.md" 222 if not reminder_path.exists(): 223 return None 224 225 return self._engine.load_and_render(reminder_path, context) 226 227 def get_available_roles(self) -> list[str]: 228 """Get list of available role templates.""" 229 roles_dir = self._prompts_dir / "roles" 230 if not roles_dir.exists(): 231 return [] 232 return sorted([ 233 f.stem for f in roles_dir.glob("*.md") 234 if f.is_file() 235 ]) 236 237 def get_prompt_modules(self) -> list[str]: 238 """Get list of system prompt modules.""" 239 system_dir = self._prompts_dir / "system-prompts" 240 if not system_dir.exists(): 241 return [] 242 return sorted([ 243 f.stem for f in system_dir.glob("*.md") 244 if f.is_file() 245 ]) 246 247 def reload(self) -> int: 248 """ 249 Reload all prompts by clearing cache. 250 251 Returns: 252 Number of cache entries cleared 253 """ 254 self._overrides_config = self._load_overrides_config() 255 return self._engine.clear_cache() 256 257 258 def get_prompt_manager() -> PromptManager: 259 """Get the global PromptManager singleton.""" 260 return PromptManager.get_instance()