server.py
"""
Test Static File Server for integration testing of web_request tool.
Serves static test files and supports dynamic response configuration.
"""

import asyncio
import json
import logging
import os
import threading
import time
from pathlib import Path
from typing import Any, Dict, List, Optional

import uvicorn
from fastapi import FastAPI, Request
from fastapi.responses import Response, JSONResponse
from starlette.responses import FileResponse


class TestStaticFileServer:
    """
    A lightweight HTTP server for serving static test files during integration tests.

    Provides:
    - Static file serving from a test data directory
    - Dynamic response configuration for testing edge cases
    - Request capture for test assertions
    - Health check endpoint

    The server runs uvicorn on its own event loop in a daemon thread, so
    ``start()`` returns immediately; poll ``started`` to know when the
    socket is actually accepting connections.
    """

    # The class name starts with "Test", so pytest would otherwise try to
    # collect it as a test class and warn about its __init__ constructor.
    __test__ = False

    # Extension -> MIME type table used for static files. Anything not
    # listed is served as "application/octet-stream".
    _CONTENT_TYPES: Dict[str, str] = {
        ".json": "application/json",
        ".html": "text/html",
        ".htm": "text/html",
        ".txt": "text/plain",
        ".xml": "application/xml",
        ".csv": "text/csv",
        ".png": "image/png",
        ".jpg": "image/jpeg",
        ".jpeg": "image/jpeg",
        ".gif": "image/gif",
        ".pdf": "application/pdf",
        ".zip": "application/zip",
    }

    def __init__(
        self,
        host: str = "127.0.0.1",
        port: int = 8089,
        content_dir: Optional[str] = None
    ):
        """
        Creates the server object without starting it.

        Args:
            host: Interface to bind to.
            port: TCP port to bind to.
            content_dir: Directory holding the static files. When None,
                defaults to ``test_data/web_content`` under the parent of
                this file's directory. The directory is created if missing.
        """
        self.host = host
        self.port = port

        # Determine content directory
        if content_dir is None:
            # Default to test_data/web_content relative to this file
            pkg_dir = Path(__file__).parent.parent
            self.content_dir = pkg_dir / "test_data" / "web_content"
        else:
            self.content_dir = Path(content_dir)

        self.content_dir.mkdir(parents=True, exist_ok=True)

        # Server state
        self._uvicorn_server: Optional[uvicorn.Server] = None
        self._server_thread: Optional[threading.Thread] = None
        self._app = FastAPI()

        # Request capture
        self.captured_requests: List[Dict[str, Any]] = []

        # Dynamic response configuration
        self._configured_responses: Dict[str, Dict[str, Any]] = {}
        self._response_lock = threading.Lock()

        # Setup logger
        self._setup_logger()

        # Setup routes
        self._setup_routes()

        self.logger.info(
            f"TestStaticFileServer initialized. Content dir: {self.content_dir}"
        )

    def _setup_logger(self):
        """Sets up a dedicated logger for the TestStaticFileServer.

        The logger is shared by name across instances, so any handlers left
        over from a previous instance are removed first. Log records go to
        ``test_static_file_server.log`` in the current working directory.
        """
        self.logger = logging.getLogger("TestStaticFileServer")
        self.logger.setLevel(logging.DEBUG)
        self.logger.propagate = False

        # Remove existing handlers. They must also be closed: removeHandler()
        # alone leaves the underlying log file open, leaking one descriptor
        # per server instance created in the same process.
        for handler in self.logger.handlers[:]:
            self.logger.removeHandler(handler)
            handler.close()

        # Add file handler
        log_file_path = os.path.join(os.getcwd(), "test_static_file_server.log")
        file_handler = logging.FileHandler(log_file_path, mode="a")
        file_handler.setFormatter(
            logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
        )
        self.logger.addHandler(file_handler)
        self.logger.info(
            f"TestStaticFileServer logger initialized. Logging to: {log_file_path}"
        )

    def _capture_request(
        self, path: str, request: Request, body: Optional[bytes] = None
    ) -> None:
        """Appends one entry describing ``request`` to ``captured_requests``.

        The "body" key is only present when ``body`` is given (POST requests);
        it is decoded as UTF-8 with undecodable bytes replaced.
        """
        entry: Dict[str, Any] = {
            "path": path,
            "method": request.method,
            "headers": dict(request.headers),
        }
        if body is not None:
            entry["body"] = body.decode("utf-8", errors="replace")
        entry["timestamp"] = time.time()
        self.captured_requests.append(entry)

    def _lookup_configured_response(self, path: str) -> Optional[Dict[str, Any]]:
        """Returns the configured response dict for ``path``, or None."""
        with self._response_lock:
            return self._configured_responses.get(path)

    @staticmethod
    def _build_configured_response(
        config: Dict[str, Any], default_content_type: str
    ) -> Response:
        """Turns a configured response dict into a Response object."""
        return Response(
            content=config["content"],
            status_code=config["status_code"],
            media_type=config.get("content_type", default_content_type),
        )

    def _setup_routes(self):
        """Sets up FastAPI routes.

        Registration order matters: ``/health`` is registered before the
        catch-all GET route so that it is matched first.
        """

        @self._app.get("/health")
        async def health_check():
            """Health check endpoint."""
            return JSONResponse({"status": "ok"})

        @self._app.post("/{path:path}")
        async def handle_post(path: str, request: Request):
            """Handles POST requests."""
            request_path = f"/{path}"

            # Capture request
            body = await request.body()
            self._capture_request(request_path, request, body)

            self.logger.debug(f"POST request for: {request_path}")

            # Check for configured response
            config = self._lookup_configured_response(request_path)
            if config is not None:
                self.logger.info(
                    f"Serving configured POST response for {request_path} "
                    f"(status: {config['status_code']})"
                )
                return self._build_configured_response(config, "application/json")

            # Default POST response (simulating a typical REST API)
            self.logger.info(f"Serving default POST response for {request_path}")
            return JSONResponse(
                status_code=201,
                content={
                    "id": 101,
                    "created": True,
                    "message": "Resource created successfully"
                }
            )

        @self._app.get("/{filename:path}")
        async def serve_file(filename: str, request: Request):
            """Serves static files or configured responses."""
            request_path = f"/{filename}"

            # Capture request
            self._capture_request(request_path, request)

            self.logger.debug(f"Request for: {request_path}")

            # Check for configured response
            config = self._lookup_configured_response(request_path)
            if config is not None:
                self.logger.info(
                    f"Serving configured response for {request_path} "
                    f"(status: {config['status_code']})"
                )
                return self._build_configured_response(
                    config, "application/octet-stream"
                )

            not_found = JSONResponse(
                status_code=404,
                content={"error": "File not found", "path": filename}
            )

            # Serve static file. The path is normalised (without resolving
            # symlinks) and must stay below the content directory: joining an
            # absolute path ("//etc/passwd") or one containing ".." would
            # otherwise escape it.
            root = Path(os.path.abspath(self.content_dir))
            file_path = Path(os.path.abspath(root / filename))

            if root not in file_path.parents:
                self.logger.warning(
                    f"Rejected path outside content dir: {request_path}"
                )
                return not_found

            # is_file() rather than exists(): a directory exists but cannot
            # be sent by FileResponse, which would fail while responding.
            if not file_path.is_file():
                self.logger.warning(f"File not found: {file_path}")
                return not_found

            # Determine content type from extension
            content_type = self._get_content_type(file_path)

            self.logger.info(
                f"Serving file: {filename} (type: {content_type})"
            )

            return FileResponse(
                path=file_path,
                media_type=content_type,
            )

    def _get_content_type(self, file_path: Path) -> str:
        """Determines content type from file extension.

        The lookup is case-insensitive; unknown extensions map to
        "application/octet-stream".
        """
        return self._CONTENT_TYPES.get(
            file_path.suffix.lower(), "application/octet-stream"
        )

    @property
    def url(self) -> str:
        """Returns the base URL of the running server."""
        return f"http://{self.host}:{self.port}"

    @property
    def started(self) -> bool:
        """Checks if the uvicorn server instance is started."""
        server = self._uvicorn_server
        return server is not None and server.started

    def start(self):
        """Starts the FastAPI server in a separate thread.

        Returns immediately after launching a daemon thread; it does not wait
        for the socket to be bound. Calling it while the server thread is
        alive logs a warning and does nothing.
        """
        if self._server_thread is not None and self._server_thread.is_alive():
            self.logger.warning("TestStaticFileServer is already running.")
            return

        config = uvicorn.Config(
            self._app, host=self.host, port=self.port, log_level="warning"
        )
        # Bind the server to a local so the thread keeps using this exact
        # instance even if stop() clears self._uvicorn_server concurrently.
        server = uvicorn.Server(config)
        self._uvicorn_server = server

        async def async_serve_wrapper():
            """Coroutine to run the server's serve() method."""
            try:
                await server.serve()
            except asyncio.CancelledError:
                self.logger.info("Server.serve() task was cancelled.")
            except Exception as e:
                self.logger.error(f"Error during server.serve(): {e}", exc_info=True)

        def run_server_in_new_loop():
            """Target function for the server thread."""
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
            try:
                loop.run_until_complete(async_serve_wrapper())
            except KeyboardInterrupt:
                self.logger.info("KeyboardInterrupt in server thread.")
            finally:
                try:
                    # Cancel whatever is still pending and let the tasks
                    # observe the cancellation before the loop is closed.
                    all_tasks = asyncio.all_tasks(loop)
                    if all_tasks:
                        for task in all_tasks:
                            task.cancel()
                        loop.run_until_complete(
                            asyncio.gather(*all_tasks, return_exceptions=True)
                        )
                    if hasattr(loop, "shutdown_asyncgens"):
                        loop.run_until_complete(loop.shutdown_asyncgens())
                except Exception as e:
                    self.logger.error(
                        f"Error during loop shutdown: {e}", exc_info=True
                    )
                finally:
                    loop.close()
                    self.logger.info("Event loop in server thread closed.")

        self._server_thread = threading.Thread(
            target=run_server_in_new_loop, daemon=True
        )
        self._server_thread.start()

        self.logger.info(f"TestStaticFileServer starting on {self.url}...")

    def stop(self):
        """Stops the FastAPI server.

        Asks uvicorn to exit and waits up to 5 seconds for the server thread.
        The server and thread references are cleared even when the thread
        did not finish in time (a warning is logged in that case).
        """
        if self._uvicorn_server:
            self._uvicorn_server.should_exit = True

        if self._server_thread and self._server_thread.is_alive():
            self.logger.info("TestStaticFileServer stopping, joining thread...")
            self._server_thread.join(timeout=5.0)
            if self._server_thread.is_alive():
                self.logger.warning("Server thread did not exit cleanly.")

        self._server_thread = None
        self._uvicorn_server = None
        self.logger.info("TestStaticFileServer stopped.")

    def configure_response(
        self,
        path: str,
        status_code: int,
        content: bytes,
        content_type: str = "application/octet-stream"
    ) -> None:
        """
        Configures a dynamic response for a specific path.

        A configured response takes precedence over static files and over the
        default POST response for the same path. Configuring a path again
        replaces the previous entry.

        Args:
            path: The path to configure (e.g., "/custom.json")
            status_code: HTTP status code to return
            content: Response content as bytes
            content_type: MIME type of the content
        """
        with self._response_lock:
            self._configured_responses[path] = {
                "status_code": status_code,
                "content": content,
                "content_type": content_type,
            }

        self.logger.info(
            f"Configured response for {path}: status={status_code}, "
            f"type={content_type}, size={len(content)} bytes"
        )

    def configure_error_response(self, path: str, status_code: int) -> None:
        """
        Configures an error response for a specific path.

        The body is a JSON object with "error" and "path" keys.

        Args:
            path: The path to configure
            status_code: HTTP error status code (e.g., 404, 500)
        """
        error_content = {
            "error": f"HTTP {status_code}",
            "path": path,
        }

        # json.dumps, not str(): the dict repr uses single quotes and is not
        # valid JSON even though the response is labelled application/json.
        self.configure_response(
            path=path,
            status_code=status_code,
            content=json.dumps(error_content).encode("utf-8"),
            content_type="application/json"
        )

    def clear_configured_responses(self) -> None:
        """Clears all configured responses."""
        with self._response_lock:
            count = len(self._configured_responses)
            self._configured_responses.clear()

        if count > 0:
            self.logger.debug(f"Cleared {count} configured responses.")

    def get_file_url(self, filename: str) -> str:
        """
        Returns the full URL for a file.

        Args:
            filename: Name of the file (e.g., "sample.json")

        Returns:
            Full URL to the file
        """
        return f"{self.url}/{filename}"

    def get_captured_requests(self) -> List[Dict[str, Any]]:
        """Returns all captured requests.

        The returned list is a shallow copy: the list itself is independent
        of the server's, but the request dicts are shared.
        """
        return self.captured_requests.copy()

    def clear_captured_requests(self) -> None:
        """Clears captured requests."""
        count = len(self.captured_requests)
        self.captured_requests.clear()

        if count > 0:
            self.logger.debug(f"Cleared {count} captured requests.")