# api/health_router.py
  1  """
  2  Router for API health checking
  3  ----------------------------------------------
  4  This module implements routes to check the status of the API and associated services.
  5  """
  6  
  7  import os
  8  import time
  9  import platform
 10  import logging
 11  import psutil
 12  from typing import Dict, Any, Optional
 13  
 14  import torch
 15  from fastapi import APIRouter, Depends, Request, Response, status
 16  from pydantic import BaseModel
 17  
 18  # Logging configuration
 19  logger = logging.getLogger("api.health")
 20  
 21  # Create router
 22  health_router = APIRouter(
 23      prefix="/health",
 24      tags=["Health"],
 25  )
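
# Usage note (illustrative, not part of this module): the router is typically
# mounted on the main FastAPI application. The app module and variable names
# below are assumptions, not definitions from this codebase.
#
#   from fastapi import FastAPI
#   app = FastAPI()
#   app.include_router(health_router)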

# Pydantic models
class HealthResponse(BaseModel):
    """Model for health check response"""
    status: str
    version: str
    timestamp: float
    uptime: float

class HealthDetailedResponse(HealthResponse):
    """Model for detailed health check response"""
    environment: str
    system_info: Dict[str, Any]
    resources: Dict[str, Any]
    gpu_info: Optional[Dict[str, Any]] = None
    services: Dict[str, Any]

# Global variable for start time
start_time = time.time()

@health_router.get("", response_model=HealthResponse)
async def health_check():
    """Simple API health check"""
    return {
        "status": "ok",
        "version": os.getenv("VERSION", "1.0.0"),
        "timestamp": time.time(),
        "uptime": time.time() - start_time
    }
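
# Example response (illustrative values) for GET /health when the router is
# mounted without an extra prefix; VERSION defaults to "1.0.0" when unset:
#   {"status": "ok", "version": "1.0.0", "timestamp": 1700000000.0, "uptime": 42.7}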

@health_router.get("/detailed", response_model=HealthDetailedResponse)
async def detailed_health_check():
    """Detailed health check of the API and system resources"""
    # System information
    system_info = {
        "platform": platform.platform(),
        "python_version": platform.python_version(),
        "processor": platform.processor() or "Not available",
        "hostname": platform.node()
    }

    # Resource information
    cpu_percent = psutil.cpu_percent(interval=0.1)
    memory = psutil.virtual_memory()
    disk = psutil.disk_usage('/')

    resources = {
        "cpu_percent": cpu_percent,
        "memory_total": memory.total,
        "memory_available": memory.available,
        "memory_percent": memory.percent,
        "disk_total": disk.total,
        "disk_free": disk.free,
        "disk_percent": disk.percent
    }

    # GPU information if available
    gpu_info = None
    try:
        if torch.cuda.is_available():
            gpu_info = {
                "available": True,
                "device_count": torch.cuda.device_count(),
                "current_device": torch.cuda.current_device(),
                "devices": []
            }

            for i in range(torch.cuda.device_count()):
                device_info = {
                    "name": torch.cuda.get_device_name(i),
                    "capability": torch.cuda.get_device_capability(i),
                    "properties": {
                        "total_memory": torch.cuda.get_device_properties(i).total_memory,
                    }
                }
                gpu_info["devices"].append(device_info)

                # Add usage statistics if available (pynvml is optional)
                try:
                    import pynvml
                    pynvml.nvmlInit()
                    handle = pynvml.nvmlDeviceGetHandleByIndex(i)
                    utilization = pynvml.nvmlDeviceGetUtilizationRates(handle)
                    memory_info = pynvml.nvmlDeviceGetMemoryInfo(handle)

                    device_info["utilization"] = {
                        "gpu_util": utilization.gpu,
                        "memory_util": utilization.memory,
                        "memory_used": memory_info.used,
                        "memory_free": memory_info.free
                    }
                except Exception as e:
                    logger.debug(f"Unable to get detailed GPU statistics: {str(e)}")
    except Exception as e:
        logger.debug(f"Error retrieving GPU information: {str(e)}")

    # Services status
    services = check_services()

    return {
        "status": "ok" if all(s["status"] == "ok" for s in services.values()) else "degraded",
        "version": os.getenv("VERSION", "1.0.0"),
        "timestamp": time.time(),
        "uptime": time.time() - start_time,
        "environment": os.getenv("ENVIRONMENT", "development"),
        "system_info": system_info,
        "resources": resources,
        "gpu_info": gpu_info,
        "services": services
    }

def check_services() -> Dict[str, Any]:
    """Checks the status of dependent services"""
    services = {}

    # Database check
    try:
        from database import get_db_connection
        db = get_db_connection()
        # Execute a simple query to check the connection
        db.command("ping")
        services["database"] = {
            "status": "ok",
            "message": "Database connection established"
        }
    except Exception as e:
        logger.warning(f"Database connection error: {str(e)}")
        services["database"] = {
            "status": "error",
            "message": f"Connection error: {str(e)}"
        }

    # Models check
    try:
        from model_manager import ModelManager
        manager = ModelManager.get_instance()
        services["models"] = {
            "status": "ok",
            "message": "Model manager initialized",
            "loaded_models": len(manager.loaded_models)
        }
    except Exception as e:
        logger.warning(f"Problem with model manager: {str(e)}")
        services["models"] = {
            "status": "warning",
            "message": f"Warning: {str(e)}"
        }

    # Filesystem check
    upload_dir = os.path.join(os.getcwd(), "uploads")
    results_dir = os.path.join(os.getcwd(), "results")

    if not os.path.exists(upload_dir) or not os.access(upload_dir, os.W_OK):
        services["filesystem"] = {
            "status": "error",
            "message": "Upload directory not writable"
        }
    elif not os.path.exists(results_dir) or not os.access(results_dir, os.W_OK):
        services["filesystem"] = {
            "status": "error",
            "message": "Results directory not writable"
        }
    else:
        services["filesystem"] = {
            "status": "ok",
            "message": "Filesystem is writable"
        }

    return services

@health_router.get("/ping")
async def ping():
    """Simple ping endpoint to check if the API is responding"""
    return {"ping": "pong"}

@health_router.get("/ready")
async def readiness_probe(response: Response):
    """
    Readiness probe for Kubernetes or other orchestrators
    Checks if the API is ready to receive requests
    """
    services = check_services()

    # If a critical service is in error, the API is not ready
    critical_services = ["database", "filesystem"]
    for service_name in critical_services:
        if service_name in services and services[service_name]["status"] == "error":
            response.status_code = status.HTTP_503_SERVICE_UNAVAILABLE
            return {"status": "not_ready", "reason": services[service_name]["message"]}

    return {"status": "ready"}

@health_router.get("/live")
async def liveness_probe():
    """
    Liveness probe for Kubernetes or other orchestrators
    Checks if the API is alive
    """
    return {"status": "alive"}
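
# Deployment note (illustrative): with this router mounted at its "/health"
# prefix, Kubernetes probes can target the /health/live and /health/ready
# endpoints above. The container port and timing values in this sketch are
# assumptions, not values defined by this module.
#
#   livenessProbe:
#     httpGet: {path: /health/live, port: 8000}
#     periodSeconds: 10
#   readinessProbe:
#     httpGet: {path: /health/ready, port: 8000}
#     periodSeconds: 10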