config.py
  1  # Copyright 2025 Alibaba Group Holding Ltd.
  2  #
  3  # Licensed under the Apache License, Version 2.0 (the "License");
  4  # you may not use this file except in compliance with the License.
  5  # You may obtain a copy of the License at
  6  #
  7  #     http://www.apache.org/licenses/LICENSE-2.0
  8  #
  9  # Unless required by applicable law or agreed to in writing, software
 10  # distributed under the License is distributed on an "AS IS" BASIS,
 11  # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 12  # See the License for the specific language governing permissions and
 13  # limitations under the License.
 14  
 15  """
 16  Application configuration management for sandbox server.
 17  
 18  Loads configuration from a TOML file (default: ~/.sandbox.toml) and exposes
 19  helpers to access the parsed settings throughout the application.
 20  """
 21  
 22  from __future__ import annotations
 23  
 24  import ipaddress
 25  import logging
 26  import os
 27  import re
 28  from pathlib import Path
 29  from typing import Any, ClassVar, Dict, Literal, Optional
 30  
 31  from pydantic import BaseModel, Field, ValidationError, model_validator
 32  
 33  try:  # Python 3.11+
 34      import tomllib  # type: ignore[attr-defined]
 35  except ModuleNotFoundError:  # Python 3.10 fallback
 36      import tomli as tomllib  # type: ignore[import]
 37  
 38  logger = logging.getLogger(__name__)
 39  
 40  CONFIG_ENV_VAR = "SANDBOX_CONFIG_PATH"
 41  DEFAULT_CONFIG_PATH = Path.home() / ".sandbox.toml"
 42  
 43  _HOSTNAME_RE = re.compile(r"^(?=.{1,253}$)(?!-)[A-Za-z0-9-]{1,63}(?:\.(?!-)[A-Za-z0-9-]{1,63})*$")
 44  _WILDCARD_DOMAIN_RE = re.compile(r"^\*\.(?!-)[A-Za-z0-9-]{1,63}(?:\.[A-Za-z0-9-]{1,63})+$")
 45  _IPV4_WITH_PORT_RE = re.compile(r"^(?P<ip>(?:\d{1,3}\.){3}\d{1,3})(?::(?P<port>\d{1,5}))?$")
 46  
 47  INGRESS_MODE_DIRECT = "direct"
 48  INGRESS_MODE_GATEWAY = "gateway"
 49  GATEWAY_ROUTE_MODE_WILDCARD = "wildcard"
 50  GATEWAY_ROUTE_MODE_HEADER = "header"
 51  GATEWAY_ROUTE_MODE_URI = "uri"
 52  
 53  EGRESS_MODE_DNS = "dns"
 54  EGRESS_MODE_DNS_NFT = "dns+nft"
 55  
 56  
 57  def _is_valid_ip(host: str) -> bool:
 58      try:
 59          ipaddress.ip_address(host)
 60          return True
 61      except ValueError:
 62          return False
 63  
 64  
 65  def _is_valid_ip_or_ip_port(address: str) -> bool:
 66      match = _IPV4_WITH_PORT_RE.match(address)
 67      if not match:
 68          return False
 69      ip_str = match.group("ip")
 70      if not _is_valid_ip(ip_str):
 71          return False
 72      port_str = match.group("port")
 73      if port_str is None:
 74          return True
 75      try:
 76          port = int(port_str)
 77      except ValueError:
 78          return False
 79      return 1 <= port <= 65535
 80  
 81  
 82  def _is_valid_hostname(address: str) -> bool:
 83      host = address
 84      port_str: Optional[str] = None
 85  
 86      if ":" in address:
 87          host, _, port_str = address.rpartition(":")
 88          if not host or not port_str:
 89              return False
 90          if not port_str.isdigit():
 91              return False
 92          port = int(port_str)
 93          if not (1 <= port <= 65535):
 94              return False
 95  
 96      return bool(_HOSTNAME_RE.match(host))
 97  
 98  
 99  def _is_wildcard_domain(host: str) -> bool:
100      return bool(_WILDCARD_DOMAIN_RE.match(host))
101  
102  
class RenewIntentRedisConfig(BaseModel):
    """🧪 [EXPERIMENTAL] Redis list consumer for renew-intent queue (ingress gateway path)."""

    # Switch for the Redis consumer; off by default.
    enabled: bool = Field(
        description=(
            "🧪 [EXPERIMENTAL] When true, server workers consume renew intents from Redis "
            "(ingress gateway path)."
        ),
        default=False,
    )
    # Connection string; only mandatory once ``enabled`` is true (see validator below).
    dsn: Optional[str] = Field(
        description=(
            '🧪 [EXPERIMENTAL] Redis DSN (e.g. "redis://127.0.0.1:6379/0"). '
            "Required when redis.enabled is true."
        ),
        default=None,
    )
    # Redis list key holding the queued intents; must be non-empty.
    queue_key: str = Field(
        description="🧪 [EXPERIMENTAL] Redis List key for LPUSH/BRPOP renew-intent JSON payloads.",
        default="opensandbox:renew:intent",
        min_length=1,
    )
    # Worker task count; at least one.
    consumer_concurrency: int = Field(
        description="🧪 [EXPERIMENTAL] Number of concurrent BRPOP worker tasks.",
        default=8,
        ge=1,
    )

    @model_validator(mode="after")
    def require_dsn_when_redis_enabled(self) -> "RenewIntentRedisConfig":
        """Reject an enabled Redis consumer whose DSN is missing or blank."""
        if not self.enabled:
            return self

        dsn_text = "" if self.dsn is None else str(self.dsn).strip()
        if not dsn_text:
            raise ValueError(
                "[renew_intent] redis.dsn must be set when redis.enabled is true."
            )
        return self
138  
139  
class RenewIntentConfig(BaseModel):
    """🧪 [EXPERIMENTAL] Renew sandbox expiration when access is observed (proxy and/or Redis queue)."""

    # Master switch; off by default.
    enabled: bool = Field(
        description=(
            "🧪 [EXPERIMENTAL] Master switch for auto-renew on reverse-proxy access and/or Redis "
            "ingress intents. When false, renew-intent logic is off."
        ),
        default=False,
    )
    # Per-sandbox cooldown between renewals, at least one second.
    min_interval_seconds: int = Field(
        description=(
            "🧪 [EXPERIMENTAL] Minimum seconds between successful renewals for the same sandbox "
            "(cooldown)."
        ),
        default=60,
        ge=1,
    )
    # Nested Redis consumer settings; defaults to a disabled consumer.
    redis: RenewIntentRedisConfig = Field(
        description=(
            "🧪 [EXPERIMENTAL] Redis queue consumer for ingress gateway renew-intent mode. "
            "In TOML, set keys under the same [renew_intent] table as redis.enabled, "
            "redis.dsn, redis.queue_key, redis.consumer_concurrency (dotted keys)."
        ),
        default_factory=RenewIntentRedisConfig,
    )
166  
167  
class GatewayRouteModeConfig(BaseModel):
    """Routing strategy for gateway ingress exposure."""

    # Pydantic v2 configuration. The class-based ``Config`` form is deprecated
    # in v2 and emits a deprecation warning; ``model_config`` is the supported
    # replacement (a plain dict is accepted in place of ``ConfigDict``).
    model_config = {"populate_by_name": True}

    # Required; restricted to the three GATEWAY_ROUTE_MODE_* values.
    mode: Literal[
        GATEWAY_ROUTE_MODE_WILDCARD,
        GATEWAY_ROUTE_MODE_HEADER,
        GATEWAY_ROUTE_MODE_URI,
    ] = Field(
        ...,
        description="Routing mode used by the gateway (wildcard, header, uri).",
    )
182  
183  
class GatewayConfig(BaseModel):
    """Gateway mode configuration for ingress exposure."""

    # Required, non-empty. Format rules depend on the route mode and are
    # enforced by IngressConfig's validator, not here.
    address: str = Field(
        ...,
        min_length=1,
        description="Gateway host used to expose sandboxes (domain or IP, may include :port; scheme is not allowed).",
    )
    # Required routing strategy block.
    route: GatewayRouteModeConfig = Field(..., description="Routing mode configuration used by the gateway.")
196  
197  
class IngressConfig(BaseModel):
    """Configuration for exposing sandbox ingress."""

    # Exposure mode; "direct" unless configured otherwise.
    mode: Literal[INGRESS_MODE_DIRECT, INGRESS_MODE_GATEWAY] = Field(
        description="Ingress exposure mode (direct or gateway).",
        default=INGRESS_MODE_DIRECT,
    )
    # Present only in gateway mode (enforced by the validator below).
    gateway: Optional[GatewayConfig] = Field(
        description="Gateway configuration required when mode = 'gateway'.",
        default=None,
    )

    @model_validator(mode="after")
    def validate_ingress_mode(self) -> "IngressConfig":
        """Check mode/gateway consistency and the gateway address format.

        Raises:
            ValueError: If the gateway block is missing or unexpected for the
                selected mode, or if the gateway address does not fit the
                configured route mode.
        """
        in_gateway_mode = self.mode == INGRESS_MODE_GATEWAY

        if in_gateway_mode and self.gateway is None:
            raise ValueError("gateway block must be provided when ingress.mode = 'gateway'.")
        if self.mode == INGRESS_MODE_DIRECT and self.gateway is not None:
            raise ValueError("gateway block must be omitted unless ingress.mode = 'gateway'.")

        # Nothing further to check outside gateway mode.
        if not (in_gateway_mode and self.gateway):
            return self

        route_mode = self.gateway.route.mode
        address = self.gateway.address

        if "://" in address:
            raise ValueError("ingress.gateway.address must not include a scheme; clients choose http/https.")

        # Wildcard routing: the address itself must be a "*.domain" pattern.
        if route_mode == GATEWAY_ROUTE_MODE_WILDCARD:
            if _is_wildcard_domain(address):
                return self
            raise ValueError(
                "ingress.gateway.address must be a wildcard domain (e.g., *.example.com) "
                "when gateway.route.mode is wildcard."
            )

        # Every other route mode forbids "*" anywhere in the address.
        if "*" in address:
            raise ValueError(
                "ingress.gateway.address must not contain wildcard when gateway.route.mode is not wildcard."
            )

        if route_mode == GATEWAY_ROUTE_MODE_HEADER:
            if not _is_valid_hostname(address) and not _is_valid_ip_or_ip_port(address):
                raise ValueError(
                    "ingress.gateway.address must be a valid hostname, hostname:port, IP, or IP:port "
                    "when gateway.route.mode is header."
                )
        elif route_mode == GATEWAY_ROUTE_MODE_URI and not address.strip():
            raise ValueError(
                "ingress.gateway.address must not be empty when gateway.route.mode is uri."
            )

        return self
247  
248  
class LogConfig(BaseModel):
    """Logging configuration."""

    # Level name for the server process; only a minimum length is enforced here.
    level: str = Field(
        default="INFO",
        description="Python logging level for the server process.",
        min_length=3,
    )
    # The description previously named /var/log/opensandbox/ as the default
    # location, which contradicted DEFAULT_FILE_PATH / DEFAULT_ACCESS_FILE_PATH
    # below (both under ~/logs/opensandbox/); it now states the real default.
    file_enabled: bool = Field(
        default=False,
        description=(
            "When true, logs are written to rotating files instead of stdout. "
            "Uses default paths (~/logs/opensandbox/) unless file_path/access_file_path are set."
        ),
    )
    file_path: Optional[str] = Field(
        default=None,
        description=(
            "Path to the main log file. When file_enabled=true and this is unset, "
            "defaults to ~/logs/opensandbox/server.log."
        ),
    )
    access_file_path: Optional[str] = Field(
        default=None,
        description=(
            "Path to the HTTP access log file. When file_enabled=true, access logs are written "
            "to a separate file by default (~/logs/opensandbox/access.log). Set this to override "
            "the path. Example: '~/logs/opensandbox/access.log'."
        ),
    )
    file_max_bytes: int = Field(
        default=100 * 1024 * 1024,  # 100MB
        ge=1,
        description="Maximum size of each log file in bytes before rotation (default: 100MB).",
    )
    file_backup_count: int = Field(
        default=5,
        ge=0,
        description="Number of backup log files to keep after rotation (default: 5).",
    )

    # Default paths when file_enabled=true and user paths are not set.
    # Uses ~/logs/opensandbox/ which is writable for non-root users.
    DEFAULT_FILE_PATH: ClassVar[str] = str(Path.home() / "logs" / "opensandbox" / "server.log")
    DEFAULT_ACCESS_FILE_PATH: ClassVar[str] = str(Path.home() / "logs" / "opensandbox" / "access.log")

    def resolved_file_path(self) -> Optional[str]:
        """Return the effective main log file path.

        Returns:
            None when file logging is disabled; otherwise ``file_path`` if it
            is set and non-empty, else ``DEFAULT_FILE_PATH``. The value is
            returned as configured (no ``~`` expansion is performed here).
        """
        if not self.file_enabled:
            return None
        return self.file_path or self.DEFAULT_FILE_PATH

    def resolved_access_file_path(self) -> Optional[str]:
        """Return the effective access log file path.

        Returns:
            None when file logging is disabled; otherwise ``access_file_path``
            if it is set and non-empty, else ``DEFAULT_ACCESS_FILE_PATH``. The
            value is returned as configured (no ``~`` expansion is performed here).
        """
        if not self.file_enabled:
            return None
        return self.access_file_path or self.DEFAULT_ACCESS_FILE_PATH
306  
307  
class ServerConfig(BaseModel):
    """FastAPI server configuration."""

    # Bind address; defaults to all interfaces.
    host: str = Field(
        description="Interface bound by the lifecycle API server.",
        default="0.0.0.0",
        min_length=1,
    )
    # Listening port, constrained to the valid TCP range.
    port: int = Field(
        description="Port exposed by the lifecycle API server.",
        default=8080,
        ge=1,
        le=65535,
    )
    timeout_keep_alive: int = Field(
        description=(
            "Idle keep-alive timeout in seconds passed to uvicorn. "
            "Connections idle longer than this may be closed by the server."
        ),
        default=30,
        ge=1,
    )
    # Unset by default.
    api_key: Optional[str] = Field(
        description="Global API key for authenticating incoming lifecycle API calls.",
        default=None,
    )
    # Unset by default; no format validation is applied to this value here.
    eip: Optional[str] = Field(
        description="Bound public IP. When set, used as the host part when returning sandbox endpoints.",
        default=None,
    )
    # When provided, must be at least 60 seconds.
    max_sandbox_timeout_seconds: Optional[int] = Field(
        description=(
            "Maximum allowed sandbox TTL in seconds for requests that specify timeout. "
            "Omit from config to disable the server-side upper bound."
        ),
        default=None,
        ge=60,
    )
346  
347  
class KubernetesRuntimeConfig(BaseModel):
    """Kubernetes-specific runtime configuration."""

    kubeconfig_path: Optional[str] = Field(
        description="Absolute path to the kubeconfig file used for API authentication.",
        default=None,
    )

    # --- Informer cache settings ---
    informer_enabled: bool = Field(
        description=(
            "[Beta] Enable informer-backed cache for workload reads. "
            "Keeps a watch to reduce API pressure; set false to disable."
        ),
        default=True,
    )
    informer_resync_seconds: int = Field(
        description=(
            "[Beta] Full resync interval for informer cache (seconds). "
            "Shorter intervals refresh the cache more eagerly."
        ),
        default=300,
        ge=1,
    )
    informer_watch_timeout_seconds: int = Field(
        description=(
            "[Beta] Watch timeout (seconds) before restarting the informer stream."
        ),
        default=60,
        ge=1,
    )

    # --- API rate limiting (0 disables the respective limit) ---
    read_qps: float = Field(
        description=(
            "Maximum read requests per second to the Kubernetes API (get/list). "
            "0 means unlimited (no rate limiting)."
        ),
        default=0.0,
        ge=0,
    )
    read_burst: int = Field(
        description=(
            "Burst size for the read rate limiter. "
            "0 means use read_qps as burst (minimum 1)."
        ),
        default=0,
        ge=0,
    )
    write_qps: float = Field(
        description=(
            "Maximum write requests per second to the Kubernetes API (create/delete/patch). "
            "0 means unlimited (no rate limiting)."
        ),
        default=0.0,
        ge=0,
    )
    write_burst: int = Field(
        description=(
            "Burst size for the write rate limiter. "
            "0 means use write_qps as burst (minimum 1)."
        ),
        default=0,
        ge=0,
    )

    # --- Workload placement and provider selection ---
    namespace: Optional[str] = Field(
        description="Namespace used for sandbox workloads.",
        default=None,
    )
    service_account: Optional[str] = Field(
        description="Service account bound to sandbox workloads.",
        default=None,
    )
    workload_provider: Optional[str] = Field(
        description="Workload provider type. If not specified, uses the first registered provider.",
        default=None,
    )
    batchsandbox_template_file: Optional[str] = Field(
        description="Path to BatchSandbox CR YAML template file. Used when workload_provider is 'batchsandbox'.",
        default=None,
    )

    # --- Sandbox creation wait behaviour ---
    sandbox_create_timeout_seconds: int = Field(
        description="Timeout in seconds to wait for a sandbox to become ready (IP assigned) after creation.",
        default=60,
        ge=1,
    )
    sandbox_create_poll_interval_seconds: float = Field(
        description="Polling interval in seconds when waiting for a sandbox to become ready after creation.",
        default=1.0,
        gt=0,
    )

    # Quoted forward reference: ExecdInitResources is declared further down in this module.
    execd_init_resources: Optional["ExecdInitResources"] = Field(
        description=(
            "Resource requests/limits for the execd init container. "
            "If unset, no resource constraints are applied."
        ),
        default=None,
    )
442  
443  
class ExecdInitResources(BaseModel):
    """Resource requests and limits for the execd init container."""

    # Both mappings are optional string-to-string tables and default to unset.
    limits: Optional[Dict[str, str]] = Field(
        description='Resource limits, e.g. {cpu = "100m", memory = "128Mi"}.',
        default=None,
    )
    requests: Optional[Dict[str, str]] = Field(
        description='Resource requests, e.g. {cpu = "50m", memory = "64Mi"}.',
        default=None,
    )
455  
456  
class AgentSandboxRuntimeConfig(BaseModel):
    """Agent-sandbox runtime configuration."""

    template_file: Optional[str] = Field(
        description="Path to Sandbox CR YAML template file for agent-sandbox.",
        default=None,
    )
    # Only "Delete" or "Retain" are accepted; "Delete" is the default.
    shutdown_policy: Literal["Delete", "Retain"] = Field(
        description="Shutdown policy applied when a sandbox expires (Delete or Retain).",
        default="Delete",
    )
    ingress_enabled: bool = Field(
        description="Whether ingress routing to agent-sandbox pods is expected to be enabled.",
        default=True,
    )
472  
473  
class StorageConfig(BaseModel):
    """Volume and storage configuration for sandbox mounts."""

    # Empty by default. Entries are stored as given; this model does not itself
    # verify that they are absolute paths.
    allowed_host_paths: list[str] = Field(
        description=(
            "Allowlist of host path prefixes permitted for host bind mounts. "
            "If empty, host bind mounts are rejected. "
            "Each entry must be an absolute path (e.g., '/data/opensandbox')."
        ),
        default_factory=list,
    )
    volume_default_size: str = Field(
        description=(
            "Default storage size for auto-created PVCs when the caller does "
            "not specify a size in the PVC provisioning hints."
        ),
        default="1Gi",
    )
    ossfs_mount_root: str = Field(
        description=(
            "Host-side root directory where OSSFS mounts are resolved. "
            "Resolved OSSFS host paths are built as "
            "'ossfs_mount_root/<bucket>/<volume.subPath?>'."
        ),
        default="/mnt/ossfs",
    )
500  
# Default for EgressConfig.disable_ipv6.
DEFAULT_EGRESS_DISABLE_IPV6 = True


class EgressConfig(BaseModel):
    """Egress sidecar configuration."""

    # Optional; when provided it must be a non-empty string.
    image: Optional[str] = Field(
        description="Container image for the egress sidecar (used when network policy is requested).",
        default=None,
        min_length=1,
    )
    # Restricted to the two EGRESS_MODE_* values; "dns" by default.
    mode: Literal[EGRESS_MODE_DNS, EGRESS_MODE_DNS_NFT] = Field(
        description="Egress enforcement passed to the sidecar as OPENSANDBOX_EGRESS_MODE (dns or dns+nft).",
        default=EGRESS_MODE_DNS,
    )
    disable_ipv6: bool = Field(
        description=(
            "Default true: egress IPv6 support is incomplete, especially on Kubernetes runtime. "
            "Set false only if you intentionally leave IPv6 enabled in the sandbox netns "
            "(e.g. IPv4-only CNI or experimenting with IPv6 egress despite gaps)."
        ),
        default=DEFAULT_EGRESS_DISABLE_IPV6,
    )
526  
527  
class RuntimeConfig(BaseModel):
    """Runtime selection (docker, kubernetes, etc.)."""

    # Both fields are required (no defaults).
    type: Literal["docker", "kubernetes"] = Field(..., description="Active sandbox runtime implementation.")
    execd_image: str = Field(
        ...,
        min_length=1,
        description="Container image that contains the execd binary for sandbox initialization.",
    )
540  
541  
class SecureRuntimeConfig(BaseModel):
    """Secure container runtime configuration (gVisor, Kata, Firecracker)."""

    # The empty string (the default) means no secure runtime is configured.
    type: Literal["", "gvisor", "kata", "firecracker"] = Field(
        description=(
            "Secure runtime type. Empty means no secure runtime. "
            "gVisor uses runsc OCI runtime. "
            "Kata uses kata-runtime (OCI) or kata-qemu (RuntimeClass). "
            "Firecracker uses kata-fc (RuntimeClass, Kubernetes only)."
        ),
        default="",
    )
    docker_runtime: Optional[str] = Field(
        description=(
            "OCI runtime name for Docker (e.g., 'runsc' for gVisor, 'kata-runtime' for Kata). "
            "When specified, the Docker daemon will use this runtime instead of runc."
        ),
        default=None,
    )
    k8s_runtime_class: Optional[str] = Field(
        description=(
            "Kubernetes RuntimeClass name for secure containers. "
            "Common values: 'gvisor', 'kata-qemu', 'kata-fc'. "
            "When specified, pods will have runtimeClassName set to this value."
        ),
        default=None,
    )

    @model_validator(mode="after")
    def validate_secure_runtime(self) -> "SecureRuntimeConfig":
        """Ensure the runtime-name fields are consistent with ``type``.

        Raises:
            ValueError: If runtime names are given while ``type`` is empty, if
                firecracker lacks ``k8s_runtime_class``, or if gvisor/kata have
                neither runtime name set.
        """
        has_docker_runtime = self.docker_runtime is not None
        has_runtime_class = self.k8s_runtime_class is not None

        # No secure runtime selected: neither runtime name may be present.
        if self.type == "":
            if has_docker_runtime or has_runtime_class:
                raise ValueError(
                    "docker_runtime and k8s_runtime_class must be omitted when secure_runtime.type is empty."
                )
            return self

        # Firecracker needs a RuntimeClass; a docker_runtime value is tolerated
        # alongside it and is not rejected by this check.
        if self.type == "firecracker" and not has_runtime_class:
            raise ValueError(
                "secure_runtime.k8s_runtime_class is required when secure_runtime.type is 'firecracker'."
            )

        # gVisor and Kata need at least one of the two runtime names.
        if self.type in ("gvisor", "kata") and not (has_docker_runtime or has_runtime_class):
            raise ValueError(
                f"At least one of secure_runtime.docker_runtime or secure_runtime.k8s_runtime_class "
                f"must be specified when secure_runtime.type is '{self.type}'."
            )

        return self
597  
598  
class DockerConfig(BaseModel):
    """Docker runtime specific settings."""

    network_mode: str = Field(
        description="Docker network mode for sandbox containers (host, bridge, or a custom user-defined network name).",
        default="host",
    )
    # When provided, must be at least one second.
    api_timeout: Optional[int] = Field(
        description="Docker API timeout in seconds. If unset, default is 180.",
        default=None,
        ge=1,
    )
    host_ip: Optional[str] = Field(
        description=(
            "Docker host IP or hostname for bridge-mode endpoint URLs when the server runs in a container."
        ),
        default=None,
    )

    # --- Container hardening ---
    # The factory builds a fresh list for each model instance.
    drop_capabilities: list[str] = Field(
        description=(
            "Linux capabilities to drop from sandbox containers. Defaults to a conservative set to reduce host impact."
        ),
        default_factory=lambda: [
            "AUDIT_WRITE",
            "MKNOD",
            "NET_ADMIN",
            "NET_RAW",
            "SYS_ADMIN",
            "SYS_MODULE",
            "SYS_PTRACE",
            "SYS_TIME",
            "SYS_TTY_CONFIG",
        ],
    )
    apparmor_profile: Optional[str] = Field(
        description=(
            "Optional AppArmor profile name applied to sandbox containers. Leave unset to let Docker choose the default."
        ),
        default=None,
    )
    no_new_privileges: bool = Field(
        description="Enable the kernel no_new_privileges flag to block privilege escalation inside the container.",
        default=True,
    )
    seccomp_profile: Optional[str] = Field(
        description=(
            "Optional seccomp profile name or path applied to sandbox containers. Leave unset to use Docker's default profile."
        ),
        default=None,
    )
    # Defaults to 4096; None is accepted as a value.
    pids_limit: Optional[int] = Field(
        description="Maximum number of processes allowed per sandbox container. Set to null to disable the limit.",
        default=4096,
        ge=1,
    )
654  
655  
class AppConfig(BaseModel):
    """Root application configuration model."""

    server: ServerConfig = Field(default_factory=ServerConfig)
    log: LogConfig = Field(
        description="Logging configuration (level, file output, rotation).",
        default_factory=LogConfig,
    )
    renew_intent: RenewIntentConfig = Field(
        description="Auto-renew sandbox expiration when reverse-proxy access is observed.",
        default_factory=RenewIntentConfig,
    )
    # The only required section.
    runtime: RuntimeConfig = Field(..., description="Sandbox runtime configuration.")
    kubernetes: Optional[KubernetesRuntimeConfig] = None
    agent_sandbox: Optional["AgentSandboxRuntimeConfig"] = None
    ingress: Optional[IngressConfig] = None
    docker: DockerConfig = Field(default_factory=DockerConfig)
    storage: StorageConfig = Field(default_factory=StorageConfig)
    egress: Optional[EgressConfig] = None
    secure_runtime: Optional[SecureRuntimeConfig] = Field(
        description="Secure container runtime configuration (gVisor, Kata, Firecracker).",
        default=None,
    )

    @model_validator(mode="after")
    def validate_runtime_blocks(self) -> "AppConfig":
        """Cross-check optional sections against the selected runtime type.

        For the docker runtime, the kubernetes/agent_sandbox sections,
        non-direct ingress and the firecracker secure runtime are rejected.
        For the kubernetes runtime, missing ``kubernetes`` (and, for the
        agent-sandbox provider, ``agent_sandbox``) sections are filled with
        defaults.

        Raises:
            ValueError: If a section conflicts with the runtime type.
        """
        runtime_type = self.runtime.type
        if runtime_type not in ("docker", "kubernetes"):
            raise ValueError(f"Unsupported runtime type '{self.runtime.type}'.")

        if runtime_type == "docker":
            if self.kubernetes is not None:
                raise ValueError("Kubernetes block must be omitted when runtime.type = 'docker'.")
            if self.agent_sandbox is not None:
                raise ValueError("agent_sandbox block must be omitted when runtime.type = 'docker'.")
            if self.ingress is not None and self.ingress.mode != INGRESS_MODE_DIRECT:
                raise ValueError("ingress.mode must be 'direct' when runtime.type = 'docker'.")
            if self.secure_runtime is not None and self.secure_runtime.type == "firecracker":
                raise ValueError(
                    "secure_runtime.type 'firecracker' is only compatible with runtime.type='kubernetes'."
                )
            return self

        # Kubernetes runtime: make sure a kubernetes section always exists.
        if self.kubernetes is None:
            self.kubernetes = KubernetesRuntimeConfig()

        # Provider names are compared case-insensitively.
        uses_agent_sandbox = (self.kubernetes.workload_provider or "").lower() == "agent-sandbox"
        if uses_agent_sandbox:
            if self.agent_sandbox is None:
                self.agent_sandbox = AgentSandboxRuntimeConfig()
        elif self.agent_sandbox is not None:
            raise ValueError(
                "agent_sandbox block requires kubernetes.workload_provider = 'agent-sandbox'."
            )
        return self
705  
706  
# Module-level cache: the most recently loaded configuration and the path it
# was resolved from. Both stay None until load_config()/get_config_path() run.
_config: AppConfig | None = None
_config_path: Path | None = None
709  
710  
def _resolve_config_path(path: str | Path | None = None) -> Path:
    """Pick the config file location.

    Precedence: the explicit ``path`` argument, then the environment variable
    named by ``CONFIG_ENV_VAR``, then ``DEFAULT_CONFIG_PATH``. A leading ``~``
    is expanded for the first two sources.
    """
    # The environment is consulted only when no explicit path was supplied.
    chosen = path or os.environ.get(CONFIG_ENV_VAR)
    if chosen:
        return Path(chosen).expanduser()
    return DEFAULT_CONFIG_PATH
719  
720  
def _load_toml_data(path: Path) -> dict[str, Any]:
    """Parse the TOML file at ``path``.

    Returns:
        The parsed mapping, or an empty dict when ``path`` does not exist.

    Raises:
        Exception: Any error raised while opening or parsing the file is
            logged and then re-raised unchanged.
    """
    if path.exists():
        try:
            with path.open("rb") as stream:
                parsed = tomllib.load(stream)
                logger.info("Loaded configuration from %s", path)
                return parsed
        except Exception as exc:  # noqa: BLE001
            logger.error("Failed to read config file %s: %s", path, exc)
            raise

    logger.info("Config file %s not found. Using default configuration.", path)
    return {}
735  
736  
def load_config(path: str | Path | None = None) -> AppConfig:
    """
    Read the TOML configuration, validate it, and cache it at module level.

    Args:
        path: Optional explicit config path. When omitted, the SANDBOX_CONFIG_PATH
              env var is used, then ~/.sandbox.toml.

    Returns:
        AppConfig: The validated configuration, also stored as the module-wide config.

    Raises:
        ValidationError: If the TOML contents do not match the AppConfig schema.
        Exception: For any IO or parsing errors.
    """
    global _config, _config_path

    config_file = _resolve_config_path(path)
    settings = _load_toml_data(config_file)

    try:
        parsed = AppConfig(**settings)
    except ValidationError as exc:
        logger.error("Invalid configuration in %s: %s", config_file, exc)
        raise

    # The cached state is replaced only after validation has succeeded.
    _config = parsed
    _config_path = config_file
    return parsed
765  
766  
def get_config() -> AppConfig:
    """
    Return the cached configuration, loading it on first use.

    Returns:
        AppConfig: Currently active configuration.
    """
    global _config
    if _config is not None:
        return _config
    # First access: load via the default path resolution.
    _config = load_config()
    return _config
778  
779  
def get_config_path() -> Path:
    """Return the cached config path, resolving and caching it if not yet known."""
    global _config_path
    if _config_path is not None:
        return _config_path
    _config_path = _resolve_config_path()
    return _config_path
786  
787  
# Public API of this module. The list previously omitted the public
# AgentSandboxRuntimeConfig and ExecdInitResources models and the
# GATEWAY_ROUTE_MODE_* constants, although the sibling models and the
# INGRESS_MODE_* / EGRESS_MODE_* constants were exported; they are added here.
__all__ = [
    "AppConfig",
    "RenewIntentConfig",
    "RenewIntentRedisConfig",
    "ServerConfig",
    "LogConfig",
    "RuntimeConfig",
    "IngressConfig",
    "GatewayConfig",
    "GatewayRouteModeConfig",
    "INGRESS_MODE_DIRECT",
    "INGRESS_MODE_GATEWAY",
    "GATEWAY_ROUTE_MODE_WILDCARD",
    "GATEWAY_ROUTE_MODE_HEADER",
    "GATEWAY_ROUTE_MODE_URI",
    "DockerConfig",
    "StorageConfig",
    "KubernetesRuntimeConfig",
    "ExecdInitResources",
    "AgentSandboxRuntimeConfig",
    "EgressConfig",
    "EGRESS_MODE_DNS",
    "EGRESS_MODE_DNS_NFT",
    "SecureRuntimeConfig",
    "DEFAULT_CONFIG_PATH",
    "CONFIG_ENV_VAR",
    "get_config",
    "get_config_path",
    "load_config",
]