server.py
  1  """
  2  Test Static File Server for integration testing of web_request tool.
  3  Serves static test files and supports dynamic response configuration.
  4  """
  5  
  6  import asyncio
  7  import logging
  8  import os
  9  import threading
 10  import time
 11  from pathlib import Path
 12  from typing import Any, Dict, List, Optional
 13  
 14  import uvicorn
 15  from fastapi import FastAPI, Request
 16  from fastapi.responses import Response, JSONResponse
 17  from starlette.responses import FileResponse
 18  
 19  
 20  class TestStaticFileServer:
 21      """
 22      A lightweight HTTP server for serving static test files during integration tests.
 23      
 24      Provides:
 25      - Static file serving from a test data directory
 26      - Dynamic response configuration for testing edge cases
 27      - Request capture for test assertions
 28      - Health check endpoint
 29      """
 30  
 31      def __init__(
 32          self, 
 33          host: str = "127.0.0.1", 
 34          port: int = 8089,
 35          content_dir: Optional[str] = None
 36      ):
 37          self.host = host
 38          self.port = port
 39          
 40          # Determine content directory
 41          if content_dir is None:
 42              # Default to test_data/web_content relative to this file
 43              pkg_dir = Path(__file__).parent.parent
 44              self.content_dir = pkg_dir / "test_data" / "web_content"
 45          else:
 46              self.content_dir = Path(content_dir)
 47          
 48          self.content_dir.mkdir(parents=True, exist_ok=True)
 49          
 50          # Server state
 51          self._uvicorn_server: Optional[uvicorn.Server] = None
 52          self._server_thread: Optional[threading.Thread] = None
 53          self._app = FastAPI()
 54          
 55          # Request capture
 56          self.captured_requests: List[Dict[str, Any]] = []
 57          
 58          # Dynamic response configuration
 59          self._configured_responses: Dict[str, Dict[str, Any]] = {}
 60          self._response_lock = threading.Lock()
 61          
 62          # Setup logger
 63          self._setup_logger()
 64          
 65          # Setup routes
 66          self._setup_routes()
 67          
 68          self.logger.info(
 69              f"TestStaticFileServer initialized. Content dir: {self.content_dir}"
 70          )
 71  
 72      def _setup_logger(self):
 73          """Sets up a dedicated logger for the TestStaticFileServer."""
 74          self.logger = logging.getLogger("TestStaticFileServer")
 75          self.logger.setLevel(logging.DEBUG)
 76          self.logger.propagate = False
 77  
 78          # Remove existing handlers
 79          for handler in self.logger.handlers[:]:
 80              self.logger.removeHandler(handler)
 81  
 82          # Add file handler
 83          log_file_path = os.path.join(os.getcwd(), "test_static_file_server.log")
 84          file_handler = logging.FileHandler(log_file_path, mode="a")
 85          file_handler.setFormatter(
 86              logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
 87          )
 88          self.logger.addHandler(file_handler)
 89          self.logger.info(
 90              f"TestStaticFileServer logger initialized. Logging to: {log_file_path}"
 91          )
 92  
 93      def _setup_routes(self):
 94          """Sets up FastAPI routes."""
 95          
 96          @self._app.get("/health")
 97          async def health_check():
 98              """Health check endpoint."""
 99              return JSONResponse({"status": "ok"})
100          
101          @self._app.post("/{path:path}")
102          async def handle_post(path: str, request: Request):
103              """Handles POST requests."""
104              # Capture request
105              body = await request.body()
106              self.captured_requests.append({
107                  "path": f"/{path}",
108                  "method": "POST",
109                  "headers": dict(request.headers),
110                  "body": body.decode("utf-8", errors="replace"),
111                  "timestamp": time.time(),
112              })
113              
114              self.logger.debug(f"POST request for: /{path}")
115              
116              # Check for configured response
117              with self._response_lock:
118                  if f"/{path}" in self._configured_responses:
119                      config = self._configured_responses[f"/{path}"]
120                      self.logger.info(
121                          f"Serving configured POST response for /{path} "
122                          f"(status: {config['status_code']})"
123                      )
124                      
125                      return Response(
126                          content=config["content"],
127                          status_code=config["status_code"],
128                          media_type=config.get("content_type", "application/json"),
129                      )
130              
131              # Default POST response (simulating a typical REST API)
132              self.logger.info(f"Serving default POST response for /{path}")
133              return JSONResponse(
134                  status_code=201,
135                  content={
136                      "id": 101,
137                      "created": True,
138                      "message": "Resource created successfully"
139                  }
140              )
141          
142          @self._app.get("/{filename:path}")
143          async def serve_file(filename: str, request: Request):
144              """Serves static files or configured responses."""
145              # Capture request
146              self.captured_requests.append({
147                  "path": f"/{filename}",
148                  "method": request.method,
149                  "headers": dict(request.headers),
150                  "timestamp": time.time(),
151              })
152              
153              self.logger.debug(f"Request for: /{filename}")
154              
155              # Check for configured response
156              with self._response_lock:
157                  if f"/{filename}" in self._configured_responses:
158                      config = self._configured_responses[f"/{filename}"]
159                      self.logger.info(
160                          f"Serving configured response for /{filename} "
161                          f"(status: {config['status_code']})"
162                      )
163                      
164                      return Response(
165                          content=config["content"],
166                          status_code=config["status_code"],
167                          media_type=config.get("content_type", "application/octet-stream"),
168                      )
169              
170              # Serve static file
171              file_path = self.content_dir / filename
172              
173              if not file_path.exists():
174                  self.logger.warning(f"File not found: {file_path}")
175                  return JSONResponse(
176                      status_code=404,
177                      content={"error": "File not found", "path": filename}
178                  )
179              
180              # Determine content type from extension
181              content_type = self._get_content_type(file_path)
182              
183              self.logger.info(
184                  f"Serving file: {filename} (type: {content_type})"
185              )
186              
187              return FileResponse(
188                  path=file_path,
189                  media_type=content_type,
190              )
191  
192      def _get_content_type(self, file_path: Path) -> str:
193          """Determines content type from file extension."""
194          extension = file_path.suffix.lower()
195          
196          content_types = {
197              ".json": "application/json",
198              ".html": "text/html",
199              ".htm": "text/html",
200              ".txt": "text/plain",
201              ".xml": "application/xml",
202              ".csv": "text/csv",
203              ".png": "image/png",
204              ".jpg": "image/jpeg",
205              ".jpeg": "image/jpeg",
206              ".gif": "image/gif",
207              ".pdf": "application/pdf",
208              ".zip": "application/zip",
209          }
210          
211          return content_types.get(extension, "application/octet-stream")
212  
213      @property
214      def url(self) -> str:
215          """Returns the base URL of the running server."""
216          return f"http://{self.host}:{self.port}"
217  
218      @property
219      def started(self) -> bool:
220          """Checks if the uvicorn server instance is started."""
221          return self._uvicorn_server is not None and self._uvicorn_server.started
222  
223      def start(self):
224          """Starts the FastAPI server in a separate thread."""
225          if self._server_thread is not None and self._server_thread.is_alive():
226              self.logger.warning("TestStaticFileServer is already running.")
227              return
228  
229          config = uvicorn.Config(
230              self._app, host=self.host, port=self.port, log_level="warning"
231          )
232          self._uvicorn_server = uvicorn.Server(config)
233  
234          async def async_serve_wrapper():
235              """Coroutine to run the server's serve() method."""
236              try:
237                  if self._uvicorn_server:
238                      await self._uvicorn_server.serve()
239              except asyncio.CancelledError:
240                  self.logger.info("Server.serve() task was cancelled.")
241              except Exception as e:
242                  self.logger.error(f"Error during server.serve(): {e}", exc_info=True)
243  
244          def run_server_in_new_loop():
245              """Target function for the server thread."""
246              loop = asyncio.new_event_loop()
247              asyncio.set_event_loop(loop)
248              try:
249                  loop.run_until_complete(async_serve_wrapper())
250              except KeyboardInterrupt:
251                  self.logger.info("KeyboardInterrupt in server thread.")
252              finally:
253                  try:
254                      all_tasks = asyncio.all_tasks(loop)
255                      if all_tasks:
256                          for task in all_tasks:
257                              task.cancel()
258                          loop.run_until_complete(
259                              asyncio.gather(*all_tasks, return_exceptions=True)
260                          )
261                      if hasattr(loop, "shutdown_asyncgens"):
262                          loop.run_until_complete(loop.shutdown_asyncgens())
263                  except Exception as e:
264                      self.logger.error(
265                          f"Error during loop shutdown: {e}", exc_info=True
266                      )
267                  finally:
268                      loop.close()
269                      self.logger.info("Event loop in server thread closed.")
270  
271          self._server_thread = threading.Thread(
272              target=run_server_in_new_loop, daemon=True
273          )
274          self._server_thread.start()
275  
276          self.logger.info(f"TestStaticFileServer starting on {self.url}...")
277  
278      def stop(self):
279          """Stops the FastAPI server."""
280          if self._uvicorn_server:
281              self._uvicorn_server.should_exit = True
282  
283          if self._server_thread and self._server_thread.is_alive():
284              self.logger.info("TestStaticFileServer stopping, joining thread...")
285              self._server_thread.join(timeout=5.0)
286              if self._server_thread.is_alive():
287                  self.logger.warning("Server thread did not exit cleanly.")
288          
289          self._server_thread = None
290          self._uvicorn_server = None
291          self.logger.info("TestStaticFileServer stopped.")
292  
293      def configure_response(
294          self,
295          path: str,
296          status_code: int,
297          content: bytes,
298          content_type: str = "application/octet-stream"
299      ) -> None:
300          """
301          Configures a dynamic response for a specific path.
302          
303          Args:
304              path: The path to configure (e.g., "/custom.json")
305              status_code: HTTP status code to return
306              content: Response content as bytes
307              content_type: MIME type of the content
308          """
309          with self._response_lock:
310              self._configured_responses[path] = {
311                  "status_code": status_code,
312                  "content": content,
313                  "content_type": content_type,
314              }
315          
316          self.logger.info(
317              f"Configured response for {path}: status={status_code}, "
318              f"type={content_type}, size={len(content)} bytes"
319          )
320  
321      def configure_error_response(self, path: str, status_code: int) -> None:
322          """
323          Configures an error response for a specific path.
324          
325          Args:
326              path: The path to configure
327              status_code: HTTP error status code (e.g., 404, 500)
328          """
329          error_content = {
330              "error": f"HTTP {status_code}",
331              "path": path,
332          }
333          
334          self.configure_response(
335              path=path,
336              status_code=status_code,
337              content=str(error_content).encode("utf-8"),
338              content_type="application/json"
339          )
340  
341      def clear_configured_responses(self) -> None:
342          """Clears all configured responses."""
343          with self._response_lock:
344              count = len(self._configured_responses)
345              self._configured_responses.clear()
346          
347          if count > 0:
348              self.logger.debug(f"Cleared {count} configured responses.")
349  
350      def get_file_url(self, filename: str) -> str:
351          """
352          Returns the full URL for a file.
353          
354          Args:
355              filename: Name of the file (e.g., "sample.json")
356              
357          Returns:
358              Full URL to the file
359          """
360          return f"{self.url}/{filename}"
361  
362      def get_captured_requests(self) -> List[Dict[str, Any]]:
363          """Returns all captured requests."""
364          return self.captured_requests.copy()
365  
366      def clear_captured_requests(self) -> None:
367          """Clears captured requests."""
368          count = len(self.captured_requests)
369          self.captured_requests.clear()
370          
371          if count > 0:
372              self.logger.debug(f"Cleared {count} captured requests.")