# health_router.py
"""
Router for API health checking
----------------------------------------------
This module implements routes to check the status of the API and associated services.
"""

import os
import time
import platform
import logging
import psutil
from typing import Dict, Any, Optional

import torch
from fastapi import APIRouter, Depends, Request, Response, status
from pydantic import BaseModel

# Logging configuration
logger = logging.getLogger("api.health")

# Create router
health_router = APIRouter(
    prefix="/health",
    tags=["Health"],
)


# Pydantic models
class HealthResponse(BaseModel):
    """Model for health check response"""
    status: str
    version: str
    timestamp: float
    uptime: float


class HealthDetailedResponse(HealthResponse):
    """Model for detailed health check response"""
    environment: str
    system_info: Dict[str, Any]
    resources: Dict[str, Any]
    gpu_info: Optional[Dict[str, Any]] = None
    services: Dict[str, Any]


# Process start time (module import time), used to compute uptime.
start_time = time.time()


@health_router.get("", response_model=HealthResponse)
async def health_check():
    """Simple API health check.

    Returns a minimal liveness payload: status flag, configured version,
    current timestamp and process uptime in seconds.
    """
    return {
        "status": "ok",
        "version": os.getenv("VERSION", "1.0.0"),
        "timestamp": time.time(),
        "uptime": time.time() - start_time
    }


def _get_system_info() -> Dict[str, Any]:
    """Collect basic host/platform information for the detailed report."""
    return {
        "platform": platform.platform(),
        "python_version": platform.python_version(),
        # platform.processor() may return "" on some systems
        "processor": platform.processor() or "Not available",
        "hostname": platform.node()
    }


def _get_resource_usage() -> Dict[str, Any]:
    """Collect CPU, memory and disk usage via psutil.

    cpu_percent uses a short 0.1 s sampling interval so the endpoint
    stays responsive while still returning a meaningful value.
    """
    memory = psutil.virtual_memory()
    disk = psutil.disk_usage('/')
    return {
        "cpu_percent": psutil.cpu_percent(interval=0.1),
        "memory_total": memory.total,
        "memory_available": memory.available,
        "memory_percent": memory.percent,
        "disk_total": disk.total,
        "disk_free": disk.free,
        "disk_percent": disk.percent
    }


def _get_gpu_info() -> Optional[Dict[str, Any]]:
    """Collect CUDA GPU information.

    Returns None when CUDA is unavailable or information retrieval fails.
    Detailed utilization statistics require the optional pynvml package;
    they are skipped (with a debug log) when it is missing.
    """
    gpu_info = None
    try:
        if not torch.cuda.is_available():
            return None

        device_count = torch.cuda.device_count()
        gpu_info = {
            "available": True,
            "device_count": device_count,
            "current_device": torch.cuda.current_device(),
            "devices": []
        }

        # pynvml is optional; initialize it ONCE for all devices instead of
        # re-importing and re-initializing inside the per-device loop.
        nvml = None
        try:
            import pynvml
            pynvml.nvmlInit()
            nvml = pynvml
        except Exception as e:  # ImportError or NVML initialization failure
            logger.debug(f"Unable to get detailed GPU statistics: {str(e)}")

        for i in range(device_count):
            device_info = {
                "name": torch.cuda.get_device_name(i),
                "capability": torch.cuda.get_device_capability(i),
                "properties": {
                    "total_memory": torch.cuda.get_device_properties(i).total_memory,
                }
            }
            gpu_info["devices"].append(device_info)

            # Add usage statistics if available
            if nvml is not None:
                try:
                    handle = nvml.nvmlDeviceGetHandleByIndex(i)
                    utilization = nvml.nvmlDeviceGetUtilizationRates(handle)
                    memory_info = nvml.nvmlDeviceGetMemoryInfo(handle)
                    device_info["utilization"] = {
                        "gpu_util": utilization.gpu,
                        "memory_util": utilization.memory,
                        "memory_used": memory_info.used,
                        "memory_free": memory_info.free
                    }
                except Exception as e:
                    logger.debug(f"Unable to get detailed GPU statistics: {str(e)}")
    except Exception as e:
        # GPU information is best-effort: never let it break the health check.
        logger.debug(f"Error retrieving GPU information: {str(e)}")

    return gpu_info


@health_router.get("/detailed", response_model=HealthDetailedResponse)
async def detailed_health_check():
    """Detailed health check of the API and system resources.

    Aggregates platform info, resource usage, optional GPU statistics
    and the status of dependent services. Overall status is "ok" only
    when every service reports "ok", otherwise "degraded".
    """
    services = check_services()

    return {
        "status": "ok" if all(s["status"] == "ok" for s in services.values()) else "degraded",
        "version": os.getenv("VERSION", "1.0.0"),
        "timestamp": time.time(),
        "uptime": time.time() - start_time,
        "environment": os.getenv("ENVIRONMENT", "development"),
        "system_info": _get_system_info(),
        "resources": _get_resource_usage(),
        "gpu_info": _get_gpu_info(),
        "services": services
    }


def _check_database() -> Dict[str, Any]:
    """Check database connectivity with a lightweight ping command."""
    try:
        from database import get_db_connection
        db = get_db_connection()
        # Execute a simple query to check the connection
        db.command("ping")
        return {
            "status": "ok",
            "message": "Database connection established"
        }
    except Exception as e:
        logger.warning(f"Database connection error: {str(e)}")
        return {
            "status": "error",
            "message": f"Connection error: {str(e)}"
        }


def _check_models() -> Dict[str, Any]:
    """Check that the model manager is initialized and report loaded models.

    A failure is reported as "warning" (not "error") because the API can
    still serve non-model routes without the manager.
    """
    try:
        from model_manager import ModelManager
        manager = ModelManager.get_instance()
        return {
            "status": "ok",
            "message": "Model manager initialized",
            "loaded_models": len(manager.loaded_models)
        }
    except Exception as e:
        logger.warning(f"Problem with model manager: {str(e)}")
        return {
            "status": "warning",
            "message": f"Warning: {str(e)}"
        }


def _check_filesystem() -> Dict[str, Any]:
    """Check that the upload and results directories exist and are writable."""
    upload_dir = os.path.join(os.getcwd(), "uploads")
    results_dir = os.path.join(os.getcwd(), "results")

    if not os.path.exists(upload_dir) or not os.access(upload_dir, os.W_OK):
        return {
            "status": "error",
            "message": "Upload directory not writable"
        }
    if not os.path.exists(results_dir) or not os.access(results_dir, os.W_OK):
        return {
            "status": "error",
            "message": "Results directory not writable"
        }
    return {
        "status": "ok",
        "message": "Filesystem is writable"
    }


def check_services() -> Dict[str, Any]:
    """Checks the status of dependent services.

    Returns a mapping of service name -> {"status", "message", ...} where
    status is one of "ok", "warning" or "error".
    """
    return {
        "database": _check_database(),
        "models": _check_models(),
        "filesystem": _check_filesystem(),
    }


@health_router.get("/ping")
async def ping():
    """Simple ping endpoint to check if the API is responding"""
    return {"ping": "pong"}


@health_router.get("/ready")
async def readiness_probe(response: Response):
    """
    Readiness probe for Kubernetes or other orchestrators
    Checks if the API is ready to receive requests
    """
    services = check_services()

    # If a critical service is in error, the API is not ready.
    # "models" is intentionally excluded: a model-manager warning does
    # not prevent the API from accepting requests.
    critical_services = ["database", "filesystem"]
    for service_name in critical_services:
        if service_name in services and services[service_name]["status"] == "error":
            response.status_code = status.HTTP_503_SERVICE_UNAVAILABLE
            return {"status": "not_ready", "reason": services[service_name]["message"]}

    return {"status": "ready"}


@health_router.get("/live")
async def liveness_probe():
    """
    Liveness probe for Kubernetes or other orchestrators
    Checks if the API is alive
    """
    return {"status": "alive"}