auth.py
1 # Copyright 2025 Alibaba Group Holding Ltd. 2 # 3 # Licensed under the Apache License, Version 2.0 (the "License"); 4 # you may not use this file except in compliance with the License. 5 # You may obtain a copy of the License at 6 # 7 # http://www.apache.org/licenses/LICENSE-2.0 8 # 9 # Unless required by applicable law or agreed to in writing, software 10 # distributed under the License is distributed on an "AS IS" BASIS, 11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 # See the License for the specific language governing permissions and 13 # limitations under the License. 14 15 """ 16 Authentication middleware for OpenSandbox Lifecycle API. 17 18 This module implements API Key authentication as specified in the OpenAPI spec. 19 API keys are configured via config.toml and validated against the OPEN-SANDBOX-API-KEY header. 20 """ 21 22 import re 23 from typing import Callable, Optional 24 25 from fastapi import Request, Response, status 26 from fastapi.responses import JSONResponse 27 from starlette.middleware.base import BaseHTTPMiddleware 28 29 from opensandbox_server.config import AppConfig, get_config 30 31 SANDBOX_API_KEY_HEADER = "OPEN-SANDBOX-API-KEY" 32 33 34 class AuthMiddleware(BaseHTTPMiddleware): 35 """ 36 Middleware for API Key authentication. 37 38 Validates the OPEN-SANDBOX-API-KEY header for all requests except health check. 39 Returns 401 Unauthorized if authentication fails. 40 """ 41 42 # Paths that don't require authentication 43 EXEMPT_PATHS = ["/health", "/docs", "/redoc", "/openapi.json"] 44 45 # Strict pattern for proxy-to-sandbox: /sandboxes/{id}/proxy/{port}/... with numeric port only. 46 # Matches the actual route in proxy.py; rejects path traversal (..) and malformed port. 47 _PROXY_PATH_RE = re.compile(r"^(/v1)?/sandboxes/[^/]+/proxy/\d+(/|$)") 48 49 @staticmethod 50 def _is_proxy_path(path: str) -> bool: 51 """True only for the exact proxy-route shape; rejects path traversal (..).""" 52 if ".." in path: 53 return False 54 return bool(AuthMiddleware._PROXY_PATH_RE.match(path)) 55 56 def __init__(self, app, config: Optional[AppConfig] = None): 57 """ 58 Initialize authentication middleware. 59 60 Args: 61 app: FastAPI application instance 62 config: Optional application configuration (for dependency injection) 63 """ 64 super().__init__(app) 65 self.config = config or get_config() 66 # Read the API key directly from config; suitable for dev/test usage 67 self.valid_api_keys = self._load_api_keys() 68 69 def _load_api_keys(self) -> set: 70 """ 71 Load valid API keys from configuration. 72 73 Returns: 74 set: Set of valid API keys 75 """ 76 # Supports a single API key from config; extend later for secret managers 77 api_key = self.config.server.api_key 78 # Treat empty string as no key configured 79 if api_key and api_key.strip(): 80 return {api_key} 81 return set() 82 83 async def dispatch(self, request: Request, call_next: Callable) -> Response: 84 """ 85 Process each request and validate authentication. 86 87 Args: 88 request: Incoming HTTP request 89 call_next: Next middleware or route handler 90 91 Returns: 92 Response: HTTP response 93 """ 94 # Skip authentication for exempt paths 95 if any(request.url.path.startswith(path) for path in self.EXEMPT_PATHS): 96 return await call_next(request) 97 98 # Skip authentication only for the exact proxy-to-sandbox route shape 99 # (no path traversal, no loose substring match) 100 if self._is_proxy_path(request.url.path): 101 return await call_next(request) 102 103 # If no API keys are configured, skip authentication 104 if not self.valid_api_keys: 105 return await call_next(request) 106 107 # Extract API key from header 108 api_key = request.headers.get(SANDBOX_API_KEY_HEADER) 109 110 # Validate API key 111 if not api_key: 112 return JSONResponse( 113 status_code=status.HTTP_401_UNAUTHORIZED, 114 content={ 115 "code": "MISSING_API_KEY", 116 "message": "Authentication credentials are missing. " 117 f"Provide API key via {SANDBOX_API_KEY_HEADER} header.", 118 }, 119 ) 120 121 # Enforce strict comparison whenever API keys are configured 122 if self.valid_api_keys and api_key not in self.valid_api_keys: 123 return JSONResponse( 124 status_code=status.HTTP_401_UNAUTHORIZED, 125 content={ 126 "code": "INVALID_API_KEY", 127 "message": "Authentication credentials are invalid. " 128 "Check your API key and try again.", 129 }, 130 ) 131 132 # Authentication successful, proceed to next middleware/handler 133 response = await call_next(request) 134 return response