auth.py
  1  # Copyright 2025 Alibaba Group Holding Ltd.
  2  #
  3  # Licensed under the Apache License, Version 2.0 (the "License");
  4  # you may not use this file except in compliance with the License.
  5  # You may obtain a copy of the License at
  6  #
  7  #     http://www.apache.org/licenses/LICENSE-2.0
  8  #
  9  # Unless required by applicable law or agreed to in writing, software
 10  # distributed under the License is distributed on an "AS IS" BASIS,
 11  # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 12  # See the License for the specific language governing permissions and
 13  # limitations under the License.
 14  
 15  """
 16  Authentication middleware for OpenSandbox Lifecycle API.
 17  
 18  This module implements API Key authentication as specified in the OpenAPI spec.
 19  API keys are configured via config.toml and validated against the OPEN-SANDBOX-API-KEY header.
 20  """
 21  
 22  import re
 23  from typing import Callable, Optional
 24  
 25  from fastapi import Request, Response, status
 26  from fastapi.responses import JSONResponse
 27  from starlette.middleware.base import BaseHTTPMiddleware
 28  
 29  from opensandbox_server.config import AppConfig, get_config
 30  
 31  SANDBOX_API_KEY_HEADER = "OPEN-SANDBOX-API-KEY"
 32  
 33  
 34  class AuthMiddleware(BaseHTTPMiddleware):
 35      """
 36      Middleware for API Key authentication.
 37  
 38      Validates the OPEN-SANDBOX-API-KEY header for all requests except health check.
 39      Returns 401 Unauthorized if authentication fails.
 40      """
 41  
 42      # Paths that don't require authentication
 43      EXEMPT_PATHS = ["/health", "/docs", "/redoc", "/openapi.json"]
 44  
 45      # Strict pattern for proxy-to-sandbox: /sandboxes/{id}/proxy/{port}/... with numeric port only.
 46      # Matches the actual route in proxy.py; rejects path traversal (..) and malformed port.
 47      _PROXY_PATH_RE = re.compile(r"^(/v1)?/sandboxes/[^/]+/proxy/\d+(/|$)")
 48  
 49      @staticmethod
 50      def _is_proxy_path(path: str) -> bool:
 51          """True only for the exact proxy-route shape; rejects path traversal (..)."""
 52          if ".." in path:
 53              return False
 54          return bool(AuthMiddleware._PROXY_PATH_RE.match(path))
 55  
 56      def __init__(self, app, config: Optional[AppConfig] = None):
 57          """
 58          Initialize authentication middleware.
 59  
 60          Args:
 61              app: FastAPI application instance
 62              config: Optional application configuration (for dependency injection)
 63          """
 64          super().__init__(app)
 65          self.config = config or get_config()
 66          # Read the API key directly from config; suitable for dev/test usage
 67          self.valid_api_keys = self._load_api_keys()
 68  
 69      def _load_api_keys(self) -> set:
 70          """
 71          Load valid API keys from configuration.
 72  
 73          Returns:
 74              set: Set of valid API keys
 75          """
 76          # Supports a single API key from config; extend later for secret managers
 77          api_key = self.config.server.api_key
 78          # Treat empty string as no key configured
 79          if api_key and api_key.strip():
 80              return {api_key}
 81          return set()
 82  
 83      async def dispatch(self, request: Request, call_next: Callable) -> Response:
 84          """
 85          Process each request and validate authentication.
 86  
 87          Args:
 88              request: Incoming HTTP request
 89              call_next: Next middleware or route handler
 90  
 91          Returns:
 92              Response: HTTP response
 93          """
 94          # Skip authentication for exempt paths
 95          if any(request.url.path.startswith(path) for path in self.EXEMPT_PATHS):
 96              return await call_next(request)
 97  
 98          # Skip authentication only for the exact proxy-to-sandbox route shape
 99          # (no path traversal, no loose substring match)
100          if self._is_proxy_path(request.url.path):
101              return await call_next(request)
102  
103          # If no API keys are configured, skip authentication
104          if not self.valid_api_keys:
105              return await call_next(request)
106  
107          # Extract API key from header
108          api_key = request.headers.get(SANDBOX_API_KEY_HEADER)
109  
110          # Validate API key
111          if not api_key:
112              return JSONResponse(
113                  status_code=status.HTTP_401_UNAUTHORIZED,
114                  content={
115                      "code": "MISSING_API_KEY",
116                      "message": "Authentication credentials are missing. "
117                                f"Provide API key via {SANDBOX_API_KEY_HEADER} header.",
118                  },
119              )
120  
121          # Enforce strict comparison whenever API keys are configured
122          if self.valid_api_keys and api_key not in self.valid_api_keys:
123              return JSONResponse(
124                  status_code=status.HTTP_401_UNAUTHORIZED,
125                  content={
126                      "code": "INVALID_API_KEY",
127                      "message": "Authentication credentials are invalid. "
128                                "Check your API key and try again.",
129                  },
130              )
131  
132          # Authentication successful, proceed to next middleware/handler
133          response = await call_next(request)
134          return response