run.py
1 """ 2 CLI command for running SAM, sending a task, and stopping - all in one command. 3 """ 4 import asyncio 5 import sys 6 import time 7 import uuid 8 from pathlib import Path 9 from typing import Optional, List, Dict, Any 10 11 import click 12 import httpx 13 14 from cli.utils import error_exit 15 16 from .common import ( 17 fetch_available_agents, 18 get_agent_name_from_cards, 19 execute_task, 20 ) 21 from .sam_runner import SAMRunner, discover_config_files 22 23 24 async def wait_for_agents( 25 url: str, 26 target_agent: str, 27 token: Optional[str], 28 timeout: float, 29 poll_interval: float, 30 stabilization_wait: float, 31 debug_fn, 32 ) -> tuple[List[Dict[str, Any]], str]: 33 """ 34 Wait for agents to become available. 35 36 Algorithm: 37 1. Poll /api/v1/agentCards until we get a response with at least one agent 38 2. Wait stabilization_wait seconds for all agents to register 39 3. Poll again to get the final list 40 4. Verify target agent exists 41 42 Returns: 43 Tuple of (agent_cards, resolved_agent_name) 44 45 Raises: 46 TimeoutError: If agents don't become ready within timeout 47 """ 48 start_time = time.time() 49 first_agents_time = None 50 51 while time.time() - start_time < timeout: 52 try: 53 agent_cards = await fetch_available_agents(url, token) 54 55 if agent_cards: 56 if first_agents_time is None: 57 # First time we see agents - start stabilization wait 58 first_agents_time = time.time() 59 agent_names = [c.get("name", "?") for c in agent_cards] 60 debug_fn(f"First agents detected: {agent_names}") 61 debug_fn(f"Waiting {stabilization_wait}s for stabilization...") 62 await asyncio.sleep(stabilization_wait) 63 # Poll again after stabilization 64 agent_cards = await fetch_available_agents(url, token) 65 agent_names = [c.get("name", "?") for c in agent_cards] 66 debug_fn(f"After stabilization: {agent_names}") 67 68 # Check if target agent is available 69 resolved_agent = get_agent_name_from_cards(agent_cards, target_agent) 70 if resolved_agent: 71 return agent_cards, resolved_agent 72 73 debug_fn(f"Target agent '{target_agent}' not yet available") 74 75 except httpx.ConnectError: 76 debug_fn("Gateway not yet responding...") 77 except httpx.HTTPStatusError as e: 78 debug_fn(f"Gateway error: {e.response.status_code}") 79 80 await asyncio.sleep(poll_interval) 81 82 raise TimeoutError(f"Timeout waiting for agent '{target_agent}' after {timeout}s") 83 84 85 @click.command("run") 86 @click.argument("message", required=True) 87 @click.option( 88 "--config", 89 "-c", 90 "config_paths", 91 multiple=True, 92 type=click.Path(exists=True, dir_okay=True, resolve_path=True), 93 help="YAML config files or directories (can be used multiple times). Defaults to configs/ directory.", 94 ) 95 @click.option( 96 "--skip", 97 "-s", 98 "skip_files", 99 multiple=True, 100 help="File name(s) to exclude from configs (e.g., -s my_agent.yaml).", 101 ) 102 @click.option( 103 "--url", 104 "-u", 105 envvar="SAM_WEBUI_URL", 106 default="http://localhost:8000", 107 help="Base URL of the webui gateway (default: http://localhost:8000)", 108 ) 109 @click.option( 110 "--agent", 111 "-a", 112 envvar="SAM_AGENT", 113 default="orchestrator", 114 help="Target agent name (default: orchestrator)", 115 ) 116 @click.option( 117 "--session-id", 118 default=None, 119 help="Session ID for context continuity (generates new if not provided)", 120 ) 121 @click.option( 122 "--token", 123 "-t", 124 envvar="SAM_AUTH_TOKEN", 125 default=None, 126 help="Bearer token for authentication", 127 ) 128 @click.option( 129 "--file", 130 "-f", 131 "files", 132 multiple=True, 133 type=click.Path(exists=True, dir_okay=False, resolve_path=True), 134 help="File(s) to attach (can be used multiple times)", 135 ) 136 @click.option( 137 "--timeout", 138 default=300, 139 type=int, 140 help="Timeout in seconds for task execution (default: 300)", 141 ) 142 @click.option( 143 "--startup-timeout", 144 default=60, 145 type=int, 146 help="Timeout in seconds for agent readiness (default: 60)", 147 ) 148 @click.option( 149 "--output-dir", 150 "-o", 151 default=None, 152 type=click.Path(), 153 help="Output directory for artifacts and logs (default: /tmp/sam-task-run-{taskId})", 154 ) 155 @click.option( 156 "--quiet", 157 "-q", 158 is_flag=True, 159 help="Suppress streaming output, only show final result", 160 ) 161 @click.option( 162 "--no-stim", 163 is_flag=True, 164 help="Do not fetch the STIM file on completion", 165 ) 166 @click.option( 167 "--system-env", 168 is_flag=True, 169 help="Use system environment variables only; do not load .env file.", 170 ) 171 @click.option( 172 "--debug", 173 is_flag=True, 174 help="Enable debug output", 175 ) 176 def run_task( 177 message: str, 178 config_paths: tuple, 179 skip_files: tuple, 180 url: str, 181 agent: str, 182 session_id: Optional[str], 183 token: Optional[str], 184 files: tuple, 185 timeout: int, 186 startup_timeout: int, 187 output_dir: Optional[str], 188 quiet: bool, 189 no_stim: bool, 190 system_env: bool, 191 debug: bool, 192 ): 193 """ 194 Start SAM, send a task, stream the response, and stop. 195 196 This command runs SAM with the specified configuration, waits for agents 197 to become ready, sends a task, streams the response, and then cleanly 198 shuts down SAM. 199 200 MESSAGE is the prompt text to send to the agent. 201 202 \b 203 Examples: 204 # Basic usage with default configs 205 sam task run "What agents are available?" 206 207 # Specify config files 208 sam task run "Hello" -c examples/agents/orchestrator.yaml -c examples/gateways/webui.yaml 209 210 # With file attachment 211 sam task run "Summarize this document" --file ./document.pdf -c configs/ 212 213 # Target specific agent 214 sam task run "Analyze data" --agent data_analyst -c configs/ 215 """ 216 try: 217 exit_code = asyncio.run( 218 _run_task_main( 219 message=message, 220 config_paths=config_paths, 221 skip_files=skip_files, 222 url=url, 223 agent=agent, 224 session_id=session_id, 225 token=token, 226 files=list(files), 227 timeout=timeout, 228 startup_timeout=startup_timeout, 229 output_dir=output_dir, 230 quiet=quiet, 231 no_stim=no_stim, 232 system_env=system_env, 233 debug=debug, 234 ) 235 ) 236 sys.exit(exit_code) 237 except KeyboardInterrupt: 238 click.echo("\n\nTask cancelled by user.") 239 sys.exit(1) 240 except Exception as e: 241 if debug: 242 import traceback 243 traceback.print_exc() 244 error_exit(f"Error: {e}") 245 246 247 async def _run_task_main( 248 message: str, 249 config_paths: tuple, 250 skip_files: tuple, 251 url: str, 252 agent: str, 253 session_id: Optional[str], 254 token: Optional[str], 255 files: List[str], 256 timeout: int, 257 startup_timeout: int, 258 output_dir: Optional[str], 259 quiet: bool, 260 no_stim: bool, 261 system_env: bool, 262 debug: bool, 263 ) -> int: 264 """Main async implementation of the task run command.""" 265 266 def _debug(msg: str): 267 if debug: 268 click.echo(click.style(f"[DEBUG] {msg}", fg="yellow"), err=True) 269 270 def _info(msg: str): 271 if not quiet: 272 click.echo(msg) 273 274 # Discover config files 275 try: 276 config_files = discover_config_files(config_paths, skip_files) 277 except FileNotFoundError as e: 278 click.echo(click.style(str(e), fg="red"), err=True) 279 return 1 280 281 if not config_files: 282 click.echo(click.style("No configuration files found.", fg="red"), err=True) 283 return 1 284 285 _info(click.style(f"Starting SAM with {len(config_files)} config file(s)...", fg="blue")) 286 for cf in config_files: 287 _debug(f" Config: {cf}") 288 289 # Create output directory with unique name to avoid collisions between concurrent runs 290 output_path = Path(output_dir) if output_dir else Path(f"/tmp/sam-task-run-{uuid.uuid4()}") 291 output_path.mkdir(parents=True, exist_ok=True) 292 log_file = output_path / "sam.log" 293 294 # Create SAM runner 295 sam_runner = SAMRunner( 296 config_files=config_files, 297 log_file=log_file, 298 load_env=not system_env, 299 ) 300 301 exit_code = 1 302 303 try: 304 # Start SAM 305 sam_runner.start() 306 _info(click.style("SAM started.", fg="green")) 307 308 # Wait for agents to be ready 309 _info(click.style(f"Waiting for agents (timeout: {startup_timeout}s)...", fg="blue")) 310 311 try: 312 agent_cards, resolved_agent = await wait_for_agents( 313 url=url, 314 target_agent=agent, 315 token=token, 316 timeout=startup_timeout, 317 poll_interval=1.0, 318 stabilization_wait=2.0, 319 debug_fn=_debug, 320 ) 321 except TimeoutError as e: 322 click.echo(click.style(str(e), fg="red"), err=True) 323 click.echo(click.style(f"Check {log_file} for SAM startup logs.", fg="yellow"), err=True) 324 return 1 325 326 agent_names = [c.get("name", "?") for c in agent_cards] 327 _info(click.style(f"Agents ready: {', '.join(agent_names)}", fg="green")) 328 329 if resolved_agent != agent: 330 _info(click.style(f"Using agent: {resolved_agent}", fg="yellow")) 331 332 _info("") 333 334 # Send the task 335 exit_code = await execute_task( 336 message=message, 337 url=url, 338 agent=resolved_agent, 339 session_id=session_id, 340 token=token, 341 files=files, 342 timeout=timeout, 343 output_dir=output_path, 344 quiet=quiet, 345 no_stim=no_stim, 346 debug=debug, 347 ) 348 349 finally: 350 # Always stop SAM 351 _info("") 352 _info(click.style("Stopping SAM...", fg="blue")) 353 sam_runner.stop() 354 _info(click.style("Done.", fg="green")) 355 356 return exit_code