/ cli / src / opensandbox_cli / utils.py
utils.py
  1  # Copyright 2026 Alibaba Group Holding Ltd.
  2  #
  3  # Licensed under the Apache License, Version 2.0 (the "License");
  4  # you may not use this file except in compliance with the License.
  5  # You may obtain a copy of the License at
  6  #
  7  #     http://www.apache.org/licenses/LICENSE-2.0
  8  #
  9  # Unless required by applicable law or agreed to in writing, software
 10  # distributed under the License is distributed on an "AS IS" BASIS,
 11  # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 12  # See the License for the specific language governing permissions and
 13  # limitations under the License.
 14  
 15  """Shared CLI utilities: duration parsing, output selection, error handling, key-value parsing."""
 16  
 17  from __future__ import annotations
 18  
 19  import functools
 20  import re
 21  import sys
 22  from datetime import timedelta
 23  
 24  import click
 25  
 26  from opensandbox_cli.client import ClientContext
 27  
 28  # ---------------------------------------------------------------------------
 29  # Duration parsing  (e.g. "10m", "1h30m", "90s", "2h")
 30  # ---------------------------------------------------------------------------
 31  
 32  _DURATION_RE = re.compile(
 33      r"^(?:(?P<hours>\d+)h)?(?:(?P<minutes>\d+)m)?(?:(?P<seconds>\d+)s)?$"
 34  )
 35  
 36  
 37  def parse_duration(value: str) -> timedelta:
 38      """Parse a human-friendly duration string into a ``timedelta``.
 39  
 40      Supported formats: ``10m``, ``1h30m``, ``90s``, ``2h``, ``1h30m45s``.
 41      A plain integer is treated as seconds.
 42      """
 43      value = value.strip()
 44      if not value:
 45          raise click.BadParameter("Duration cannot be empty")
 46  
 47      # Plain integer → seconds
 48      if value.isdigit():
 49          return timedelta(seconds=int(value))
 50  
 51      m = _DURATION_RE.match(value)
 52      if not m or not m.group(0):
 53          raise click.BadParameter(
 54              f"Invalid duration '{value}'. Use format like 10m, 1h30m, 90s."
 55          )
 56  
 57      hours = int(m.group("hours") or 0)
 58      minutes = int(m.group("minutes") or 0)
 59      seconds = int(m.group("seconds") or 0)
 60      return timedelta(hours=hours, minutes=minutes, seconds=seconds)
 61  
 62  
 63  def parse_nullable_duration(value: str) -> timedelta | None:
 64      """Parse a duration string or the literal ``none``.
 65  
 66      ``none`` means no automatic timeout / manual cleanup mode.
 67      """
 68      normalized = value.strip().lower()
 69      if normalized == "none":
 70          return None
 71      return parse_duration(value)
 72  
 73  
 74  class DurationType(click.ParamType):
 75      """Click parameter type for duration strings."""
 76  
 77      name = "duration"
 78  
 79      def convert(
 80          self, value: str, param: click.Parameter | None, ctx: click.Context | None
 81      ) -> timedelta:
 82          if isinstance(value, timedelta):
 83              return value
 84          try:
 85              return parse_duration(value)
 86          except click.BadParameter:
 87              self.fail(
 88                  f"Invalid duration '{value}'. Use format like 10m, 1h30m, 90s.",
 89                  param,
 90                  ctx,
 91              )
 92  
 93  
 94  DURATION = DurationType()
 95  
 96  
 97  # ---------------------------------------------------------------------------
 98  # Key=Value parsing  (e.g. --env FOO=bar)
 99  # ---------------------------------------------------------------------------
100  
101  
102  class KeyValueType(click.ParamType):
103      """Click parameter type that parses ``KEY=VALUE`` strings into a tuple."""
104  
105      name = "KEY=VALUE"
106  
107      def convert(
108          self, value: str, param: click.Parameter | None, ctx: click.Context | None
109      ) -> tuple[str, str]:
110          if isinstance(value, tuple):
111              return value
112          if "=" not in value:
113              self.fail(f"Expected KEY=VALUE format, got '{value}'", param, ctx)
114          key, _, val = value.partition("=")
115          return (key, val)
116  
117  
118  KEY_VALUE = KeyValueType()
119  
120  
121  # ---------------------------------------------------------------------------
122  # Output helpers
123  # ---------------------------------------------------------------------------
124  
125  
126  def output_option(
127      *choices: str,
128      default: str | None = None,
129      help_text: str | None = None,
130  ):
131      """Attach a command-scoped output option."""
132      option_help = help_text or f"Output format: {', '.join(choices)}."
133      return click.option(
134          "-o",
135          "--output",
136          "output_format",
137          type=click.Choice(list(choices), case_sensitive=False),
138          default=None if default is None else default,
139          show_default=default is not None,
140          help=option_help,
141      )
142  
143  
144  def select_output_format(
145      obj: ClientContext,
146      requested: str | None,
147      *,
148      allowed: tuple[str, ...],
149      fallback: str,
150  ) -> str:
151      """Resolve a command-scoped output format from explicit input, config, and fallback."""
152      if requested:
153          if requested not in allowed:
154              allowed_list = ", ".join(allowed)
155              raise click.ClickException(
156                  f"This command does not support `-o {requested}`. Allowed values: {allowed_list}."
157              )
158          return requested
159  
160      return fallback
161  
162  
163  def prepare_output(
164      obj: ClientContext,
165      requested: str | None,
166      *,
167      allowed: tuple[str, ...],
168      fallback: str,
169  ):
170      """Resolve and attach the formatter for the current command."""
171      fmt = select_output_format(obj, requested, allowed=allowed, fallback=fallback)
172      return obj.make_output(fmt)
173  
174  
175  # ---------------------------------------------------------------------------
176  # Error handling decorator
177  # ---------------------------------------------------------------------------
178  
179  
180  def handle_errors(fn):  # type: ignore[no-untyped-def]
181      """Decorator that catches SDK / HTTP exceptions and prints a friendly message."""
182  
183      @functools.wraps(fn)
184      def wrapper(*args, **kwargs):  # type: ignore[no-untyped-def]
185          try:
186              return fn(*args, **kwargs)
187          except click.exceptions.Exit:
188              raise
189          except click.ClickException:
190              raise
191          except Exception as exc:
192              # Import here to avoid circular imports at module level
193              from opensandbox.exceptions import SandboxException
194  
195              # Try to get the OutputFormatter from the Click context
196              ctx = click.get_current_context(silent=True)
197              obj = getattr(ctx, "obj", None) if ctx else None
198              output = getattr(obj, "output", None) if obj else None
199  
200              if output and hasattr(output, "error_panel"):
201                  if isinstance(exc, SandboxException):
202                      output.error_panel(str(exc), title="Sandbox Error")
203                  else:
204                      output.error_panel(
205                          f"{str(exc)}\n\n[dim]Type: {type(exc).__qualname__}[/]",
206                          title=type(exc).__name__,
207                      )
208              else:
209                  click.secho(f"Error: {exc}", fg="red", err=True)
210              sys.exit(1)
211  
212      return wrapper