/ cli / commands / task_cmd / run.py
run.py
  1  """
  2  CLI command for running SAM, sending a task, and stopping - all in one command.
  3  """
  4  import asyncio
  5  import sys
  6  import time
  7  import uuid
  8  from pathlib import Path
  9  from typing import Optional, List, Dict, Any
 10  
 11  import click
 12  import httpx
 13  
 14  from cli.utils import error_exit
 15  
 16  from .common import (
 17      fetch_available_agents,
 18      get_agent_name_from_cards,
 19      execute_task,
 20  )
 21  from .sam_runner import SAMRunner, discover_config_files
 22  
 23  
 24  async def wait_for_agents(
 25      url: str,
 26      target_agent: str,
 27      token: Optional[str],
 28      timeout: float,
 29      poll_interval: float,
 30      stabilization_wait: float,
 31      debug_fn,
 32  ) -> tuple[List[Dict[str, Any]], str]:
 33      """
 34      Wait for agents to become available.
 35  
 36      Algorithm:
 37      1. Poll /api/v1/agentCards until we get a response with at least one agent
 38      2. Wait stabilization_wait seconds for all agents to register
 39      3. Poll again to get the final list
 40      4. Verify target agent exists
 41  
 42      Returns:
 43          Tuple of (agent_cards, resolved_agent_name)
 44  
 45      Raises:
 46          TimeoutError: If agents don't become ready within timeout
 47      """
 48      start_time = time.time()
 49      first_agents_time = None
 50  
 51      while time.time() - start_time < timeout:
 52          try:
 53              agent_cards = await fetch_available_agents(url, token)
 54  
 55              if agent_cards:
 56                  if first_agents_time is None:
 57                      # First time we see agents - start stabilization wait
 58                      first_agents_time = time.time()
 59                      agent_names = [c.get("name", "?") for c in agent_cards]
 60                      debug_fn(f"First agents detected: {agent_names}")
 61                      debug_fn(f"Waiting {stabilization_wait}s for stabilization...")
 62                      await asyncio.sleep(stabilization_wait)
 63                      # Poll again after stabilization
 64                      agent_cards = await fetch_available_agents(url, token)
 65                      agent_names = [c.get("name", "?") for c in agent_cards]
 66                      debug_fn(f"After stabilization: {agent_names}")
 67  
 68                  # Check if target agent is available
 69                  resolved_agent = get_agent_name_from_cards(agent_cards, target_agent)
 70                  if resolved_agent:
 71                      return agent_cards, resolved_agent
 72  
 73                  debug_fn(f"Target agent '{target_agent}' not yet available")
 74  
 75          except httpx.ConnectError:
 76              debug_fn("Gateway not yet responding...")
 77          except httpx.HTTPStatusError as e:
 78              debug_fn(f"Gateway error: {e.response.status_code}")
 79  
 80          await asyncio.sleep(poll_interval)
 81  
 82      raise TimeoutError(f"Timeout waiting for agent '{target_agent}' after {timeout}s")
 83  
 84  
 85  @click.command("run")
 86  @click.argument("message", required=True)
 87  @click.option(
 88      "--config",
 89      "-c",
 90      "config_paths",
 91      multiple=True,
 92      type=click.Path(exists=True, dir_okay=True, resolve_path=True),
 93      help="YAML config files or directories (can be used multiple times). Defaults to configs/ directory.",
 94  )
 95  @click.option(
 96      "--skip",
 97      "-s",
 98      "skip_files",
 99      multiple=True,
100      help="File name(s) to exclude from configs (e.g., -s my_agent.yaml).",
101  )
102  @click.option(
103      "--url",
104      "-u",
105      envvar="SAM_WEBUI_URL",
106      default="http://localhost:8000",
107      help="Base URL of the webui gateway (default: http://localhost:8000)",
108  )
109  @click.option(
110      "--agent",
111      "-a",
112      envvar="SAM_AGENT",
113      default="orchestrator",
114      help="Target agent name (default: orchestrator)",
115  )
116  @click.option(
117      "--session-id",
118      default=None,
119      help="Session ID for context continuity (generates new if not provided)",
120  )
121  @click.option(
122      "--token",
123      "-t",
124      envvar="SAM_AUTH_TOKEN",
125      default=None,
126      help="Bearer token for authentication",
127  )
128  @click.option(
129      "--file",
130      "-f",
131      "files",
132      multiple=True,
133      type=click.Path(exists=True, dir_okay=False, resolve_path=True),
134      help="File(s) to attach (can be used multiple times)",
135  )
136  @click.option(
137      "--timeout",
138      default=300,
139      type=int,
140      help="Timeout in seconds for task execution (default: 300)",
141  )
142  @click.option(
143      "--startup-timeout",
144      default=60,
145      type=int,
146      help="Timeout in seconds for agent readiness (default: 60)",
147  )
148  @click.option(
149      "--output-dir",
150      "-o",
151      default=None,
152      type=click.Path(),
153      help="Output directory for artifacts and logs (default: /tmp/sam-task-run-{taskId})",
154  )
155  @click.option(
156      "--quiet",
157      "-q",
158      is_flag=True,
159      help="Suppress streaming output, only show final result",
160  )
161  @click.option(
162      "--no-stim",
163      is_flag=True,
164      help="Do not fetch the STIM file on completion",
165  )
166  @click.option(
167      "--system-env",
168      is_flag=True,
169      help="Use system environment variables only; do not load .env file.",
170  )
171  @click.option(
172      "--debug",
173      is_flag=True,
174      help="Enable debug output",
175  )
176  def run_task(
177      message: str,
178      config_paths: tuple,
179      skip_files: tuple,
180      url: str,
181      agent: str,
182      session_id: Optional[str],
183      token: Optional[str],
184      files: tuple,
185      timeout: int,
186      startup_timeout: int,
187      output_dir: Optional[str],
188      quiet: bool,
189      no_stim: bool,
190      system_env: bool,
191      debug: bool,
192  ):
193      """
194      Start SAM, send a task, stream the response, and stop.
195  
196      This command runs SAM with the specified configuration, waits for agents
197      to become ready, sends a task, streams the response, and then cleanly
198      shuts down SAM.
199  
200      MESSAGE is the prompt text to send to the agent.
201  
202      \b
203      Examples:
204          # Basic usage with default configs
205          sam task run "What agents are available?"
206  
207          # Specify config files
208          sam task run "Hello" -c examples/agents/orchestrator.yaml -c examples/gateways/webui.yaml
209  
210          # With file attachment
211          sam task run "Summarize this document" --file ./document.pdf -c configs/
212  
213          # Target specific agent
214          sam task run "Analyze data" --agent data_analyst -c configs/
215      """
216      try:
217          exit_code = asyncio.run(
218              _run_task_main(
219                  message=message,
220                  config_paths=config_paths,
221                  skip_files=skip_files,
222                  url=url,
223                  agent=agent,
224                  session_id=session_id,
225                  token=token,
226                  files=list(files),
227                  timeout=timeout,
228                  startup_timeout=startup_timeout,
229                  output_dir=output_dir,
230                  quiet=quiet,
231                  no_stim=no_stim,
232                  system_env=system_env,
233                  debug=debug,
234              )
235          )
236          sys.exit(exit_code)
237      except KeyboardInterrupt:
238          click.echo("\n\nTask cancelled by user.")
239          sys.exit(1)
240      except Exception as e:
241          if debug:
242              import traceback
243              traceback.print_exc()
244          error_exit(f"Error: {e}")
245  
246  
async def _run_task_main(
    message: str,
    config_paths: tuple,
    skip_files: tuple,
    url: str,
    agent: str,
    session_id: Optional[str],
    token: Optional[str],
    files: List[str],
    timeout: int,
    startup_timeout: int,
    output_dir: Optional[str],
    quiet: bool,
    no_stim: bool,
    system_env: bool,
    debug: bool,
) -> int:
    """
    Run one start -> wait -> execute -> stop cycle for ``sam task run``.

    Returns the process exit code. That is whatever ``execute_task`` returns,
    or 1 in three cases: config discovery raises FileNotFoundError, no config
    files are found, or ``wait_for_agents`` raises TimeoutError. Once the
    SAMRunner has been constructed, ``stop()`` is always called on it, even
    when an exception propagates.
    """

    def _debug(msg: str):
        # Debug lines go to stderr, only when --debug is set.
        if not debug:
            return
        click.echo(click.style(f"[DEBUG] {msg}", fg="yellow"), err=True)

    def _info(msg: str):
        # Progress lines go to stdout unless --quiet is set.
        if quiet:
            return
        click.echo(msg)

    def _complain(text: str, color: str = "red"):
        # Error/hint lines are always shown, on stderr.
        click.echo(click.style(text, fg=color), err=True)

    # Resolve which YAML configs SAM should load.
    try:
        discovered = discover_config_files(config_paths, skip_files)
    except FileNotFoundError as exc:
        _complain(str(exc))
        return 1

    if not discovered:
        _complain("No configuration files found.")
        return 1

    _info(click.style(f"Starting SAM with {len(discovered)} config file(s)...", fg="blue"))
    for cfg in discovered:
        _debug(f"  Config: {cfg}")

    # A random UUID suffix keeps concurrent runs from sharing a directory.
    if output_dir:
        output_path = Path(output_dir)
    else:
        output_path = Path(f"/tmp/sam-task-run-{uuid.uuid4()}")
    output_path.mkdir(parents=True, exist_ok=True)
    log_file = output_path / "sam.log"

    runner = SAMRunner(
        config_files=discovered,
        log_file=log_file,
        load_env=not system_env,
    )

    try:
        runner.start()
        _info(click.style("SAM started.", fg="green"))

        _info(click.style(f"Waiting for agents (timeout: {startup_timeout}s)...", fg="blue"))
        try:
            cards, chosen_agent = await wait_for_agents(
                url=url,
                target_agent=agent,
                token=token,
                timeout=startup_timeout,
                poll_interval=1.0,
                stabilization_wait=2.0,
                debug_fn=_debug,
            )
        except TimeoutError as exc:
            _complain(str(exc))
            _complain(f"Check {log_file} for SAM startup logs.", "yellow")
            return 1

        ready_names = ", ".join(card.get("name", "?") for card in cards)
        _info(click.style(f"Agents ready: {ready_names}", fg="green"))

        # The user's name may have been resolved to a different card name.
        if chosen_agent != agent:
            _info(click.style(f"Using agent: {chosen_agent}", fg="yellow"))

        _info("")

        # The `finally` below runs after execute_task completes and before
        # this value is handed back to the caller.
        return await execute_task(
            message=message,
            url=url,
            agent=chosen_agent,
            session_id=session_id,
            token=token,
            files=files,
            timeout=timeout,
            output_dir=output_path,
            quiet=quiet,
            no_stim=no_stim,
            debug=debug,
        )
    finally:
        # Always stop SAM, whatever happened above.
        _info("")
        _info(click.style("Stopping SAM...", fg="blue"))
        runner.stop()
        _info(click.style("Done.", fg="green"))