app_base.py
"""Base App class for all SAM applications with broker and database health checks."""

import importlib
import logging
import math
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FuturesTimeoutError

from solace_ai_connector.common.messaging.solace_messaging import ConnectionStatus
from solace_ai_connector.common.monitoring import Monitoring
from solace_ai_connector.flow.app import App
from sqlalchemy import text

log = logging.getLogger(__name__)

# Default timeout for database health checks (in seconds)
DB_HEALTH_CHECK_TIMEOUT_SECONDS = 5.0

# Config keys for custom health checks
CUSTOM_STARTUP_CHECK_KEY = "custom_startup_check"
CUSTOM_READY_CHECK_KEY = "custom_ready_check"


class SamAppBase(App):
    """
    Base class for all SAM applications.

    Extends solace-ai-connector's App class with broker connection and database
    health checks for the is_startup_complete() and is_ready() methods.

    When using dev_mode (DevBroker), broker health checks always return True since
    the DevBroker doesn't have real connection issues to monitor.

    When using a real Solace broker, health checks return True only when
    the broker connection status is CONNECTED.

    When using SQL-based session services, health checks also verify database
    connectivity by testing the connection to each configured database.

    Optional user-supplied checks can be plugged in through the ``health_check``
    section of the app config (``custom_startup_check`` / ``custom_ready_check``,
    each given as "module.path:function_name").
    """

    def _is_dev_mode(self) -> bool:
        """
        Check if the broker is configured in dev mode.

        Returns:
            True if dev_mode is enabled or no broker config exists, False otherwise.
        """
        broker_config = self.app_info.get("broker")
        if broker_config is None or broker_config == {}:
            return True  # No broker config at all: treat as dev mode

        dev_mode = broker_config.get("dev_mode", False)

        # Handle boolean
        if isinstance(dev_mode, bool):
            return dev_mode

        # Handle string "true" (case insensitive)
        if isinstance(dev_mode, str):
            return dev_mode.lower() == "true"

        return False

    def _get_health_check_config(self) -> dict:
        """
        Return the ``health_check`` section of app_info as a dict.

        A key that is present but empty in YAML (``health_check:``) loads as
        None; that, and any other non-dict value, is normalised to an empty
        dict so callers can always use ``.get`` on the result.

        Returns:
            The health_check config mapping, or an empty dict.
        """
        config = self.app_info.get("health_check")
        if config is None:
            return {}
        if not isinstance(config, dict):
            log.warning(
                "Invalid health_check config (expected a mapping): %s, ignoring",
                config,
            )
            return {}
        return config

    def _get_db_health_check_timeout(self) -> float:
        """
        Get the database health check timeout from configuration.

        Reads from app_info['health_check']['database_timeout_seconds'].
        Falls back to DB_HEALTH_CHECK_TIMEOUT_SECONDS if not configured or if
        the configured value is not a finite, positive number.

        Returns:
            Timeout in seconds for database health checks.
        """
        raw_timeout = self._get_health_check_config().get(
            "database_timeout_seconds", DB_HEALTH_CHECK_TIMEOUT_SECONDS
        )

        if raw_timeout is None:
            return DB_HEALTH_CHECK_TIMEOUT_SECONDS

        try:
            timeout = float(raw_timeout)
        except (TypeError, ValueError):
            timeout = None

        # NaN slips past a plain "<= 0" test and infinity cannot be passed to
        # Future.result() (the underlying lock wait rejects both), so require a
        # finite positive number.
        if timeout is None or not math.isfinite(timeout) or timeout <= 0:
            log.warning(
                "Invalid database_timeout_seconds value: %s, using default: %s",
                raw_timeout,
                DB_HEALTH_CHECK_TIMEOUT_SECONDS,
            )
            return DB_HEALTH_CHECK_TIMEOUT_SECONDS

        return timeout

    def _is_broker_connected(self) -> bool:
        """
        Check if the broker connection is healthy.

        When using dev_mode, this always returns True since the DevBroker
        doesn't have real connection state to check.

        When using a real Solace broker, this checks the Monitoring singleton's
        connection status and returns True only if CONNECTED.

        Returns:
            True if broker is connected (or in dev_mode), False otherwise.
        """
        # Dev mode always returns True
        if self._is_dev_mode():
            log.debug("Broker health check: dev_mode enabled, returning True")
            return True

        # For real broker, check the Monitoring singleton's connection status
        status = Monitoring().get_connection_status()
        is_connected = status == ConnectionStatus.CONNECTED

        if not is_connected:
            log.debug(
                "Broker health check: connection status is %s, returning False",
                status,
            )

        return is_connected

    def _get_db_engines_from_components(self) -> list:
        """
        Collect database engines from all components.

        Traverses flows and component groups to find components with:
        - get_db_engine() method (Gateway/Platform pattern)
        - session_service.db_engine attribute (Agent pattern)

        Returns:
            List of SQLAlchemy Engine objects.
        """
        engines = []

        if not hasattr(self, "flows") or not self.flows:
            return engines

        for flow in self.flows:
            if not hasattr(flow, "component_groups") or not flow.component_groups:
                continue

            for group in flow.component_groups:
                for wrapper in group:
                    # Get the actual component from wrapper if needed
                    component = getattr(wrapper, "component", wrapper)

                    # Check for get_db_engine() method (Gateway/Platform pattern)
                    if hasattr(component, "get_db_engine") and callable(
                        component.get_db_engine
                    ):
                        engine = component.get_db_engine()
                        if engine is not None:
                            engines.append(engine)
                    # Check for session_service.db_engine (Agent pattern)
                    elif hasattr(component, "session_service"):
                        session_svc = component.session_service
                        if hasattr(session_svc, "db_engine") and session_svc.db_engine:
                            engines.append(session_svc.db_engine)

        return engines

    def _test_single_db_connection(self, engine) -> bool:
        """
        Test a single database connection by running ``SELECT 1``.

        Args:
            engine: SQLAlchemy engine to test

        Returns:
            True if the query succeeds; any connection/query error propagates
            to the caller.
        """
        with engine.connect() as conn:
            conn.execute(text("SELECT 1"))
        return True

    def _get_health_check_executor(self) -> ThreadPoolExecutor:
        """
        Get or create a shared ThreadPoolExecutor for health check operations.

        Reuses a single executor instance to avoid thread and memory leaks
        from creating new ThreadPoolExecutor instances on every health check
        call.

        Returns:
            A shared ThreadPoolExecutor with max_workers=2.
        """
        if getattr(self, "_health_check_executor", None) is None:
            self._health_check_executor = ThreadPoolExecutor(
                max_workers=2, thread_name_prefix="db-health-check"
            )
        return self._health_check_executor

    def _is_database_connected(
        self, timeout: float = DB_HEALTH_CHECK_TIMEOUT_SECONDS
    ) -> bool:
        """
        Check if all configured databases are connected.

        Collects database engines from components and tests each connection
        by executing a simple query with a timeout. Returns True only if ALL
        databases are reachable within the timeout period.

        If no databases are configured, returns True.

        Uses a shared ThreadPoolExecutor to avoid thread/memory leaks from
        creating new executors on every health check call.

        Args:
            timeout: Maximum time in seconds to wait for each database connection
                test. Engines are tested one after another, so the worst case
                total wait is roughly ``timeout * number_of_engines``.
                Defaults to DB_HEALTH_CHECK_TIMEOUT_SECONDS (5 seconds).

        Returns:
            True if all databases are connected (or none configured), False otherwise.
        """
        engines = self._get_db_engines_from_components()

        if not engines:
            return True

        executor = self._get_health_check_executor()

        for engine in engines:
            future = None
            try:
                future = executor.submit(self._test_single_db_connection, engine)
                future.result(timeout=timeout)
            except FuturesTimeoutError:
                # If both workers are still blocked on earlier probes, this
                # probe may not have started yet. Cancel it so queued probes do
                # not pile up across repeated health checks against a hung DB.
                # (A probe that is already running cannot be cancelled.)
                if future is not None:
                    future.cancel()
                log.warning(
                    "Database health check failed: timed out after %.1f seconds",
                    timeout,
                )
                return False
            except Exception as e:
                log.warning(
                    "Database health check failed: %s",
                    str(e),
                )
                return False

        return True

    def _load_custom_check(self, check_path: str):
        """
        Load a custom health check callable from a module path.

        Successfully loaded callables are cached per path; failures are not
        cached, so a fixed module is picked up on a later call.

        Args:
            check_path: Path in format "module.path:function_name"

        Returns:
            The callable function, or None if loading fails for any reason
            (malformed path, import error, error raised while importing the
            module, missing attribute, or non-callable attribute).
        """
        if not hasattr(self, "_custom_check_cache"):
            self._custom_check_cache = {}

        if not isinstance(check_path, str) or ":" not in check_path:
            log.error(
                "Invalid custom health check path '%s': "
                "expected format 'module.path:function_name'",
                check_path,
            )
            return None

        if check_path in self._custom_check_cache:
            return self._custom_check_cache[check_path]

        module_path, function_name = check_path.rsplit(":", 1)
        # ":func" or "mod:" would otherwise reach import_module("") (which
        # raises ValueError) or getattr(module, "").
        if not module_path or not function_name:
            log.error(
                "Invalid custom health check path '%s': "
                "expected format 'module.path:function_name'",
                check_path,
            )
            return None

        try:
            module = importlib.import_module(module_path)
        except Exception as e:
            # Covers ImportError as well as any exception raised by the
            # module's own top-level code, so a broken plugin reports
            # "unhealthy" instead of crashing the health probe.
            log.error(
                "Failed to import custom health check module '%s': %s",
                check_path,
                e,
            )
            return None

        try:
            func = getattr(module, function_name)
        except AttributeError as e:
            log.error(
                "Custom health check function not found '%s': %s",
                check_path,
                e,
            )
            return None

        if not callable(func):
            log.error(
                "Custom health check '%s' is not callable",
                check_path,
            )
            return None

        # Cache the callable
        self._custom_check_cache[check_path] = func
        log.info("Loaded custom health check: %s", check_path)
        return func

    def _run_custom_check(self, check_key: str) -> bool:
        """
        Run a custom health check if configured.

        The custom health check function receives the application instance,
        allowing it to access app_info, flows, and other application state.

        Args:
            check_key: The config key for the custom check
                (CUSTOM_STARTUP_CHECK_KEY or CUSTOM_READY_CHECK_KEY)

        Returns:
            True if check passes or no custom check configured, False if the
            check fails, returns a non-boolean, raises, or cannot be loaded.
        """
        check_path = self._get_health_check_config().get(check_key)

        if not check_path:
            return True  # No custom check configured

        func = self._load_custom_check(check_path)
        if func is None:
            # Loading failed - treat as unhealthy
            return False

        try:
            result = func(self)
        except Exception as e:
            log.error(
                "Custom health check '%s' raised exception: %s",
                check_path,
                e,
            )
            return False

        if not isinstance(result, bool):
            log.warning(
                "Custom health check '%s' returned non-boolean value: %s, "
                "treating as unhealthy",
                check_path,
                result,
            )
            return False

        if not result:
            log.warning("Custom health check '%s' returned False", check_path)

        return result

    def _check_health(self, custom_check_key: str) -> bool:
        """
        Run the broker, database, and one custom health check in order.

        Checks short-circuit: the database probe is skipped when the broker is
        down, and the custom check is skipped when any database is down.

        Args:
            custom_check_key: Config key of the custom check to run
                (CUSTOM_STARTUP_CHECK_KEY or CUSTOM_READY_CHECK_KEY).

        Returns:
            True only if every check passes.
        """
        timeout = self._get_db_health_check_timeout()
        return (
            self._is_broker_connected()
            and self._is_database_connected(timeout)
            and self._run_custom_check(custom_check_key)
        )

    def is_startup_complete(self) -> bool:
        """
        Check if the app has completed its startup/initialization phase.

        Returns True if:
        - Broker is connected (or using dev_mode)
        - All configured databases are connected
        - Custom startup check passes (if configured)

        Returns False if:
        - Broker is DISCONNECTED or RECONNECTING
        - Any configured database is unreachable
        - Custom startup check returns False or raises an exception

        Returns:
            bool: True if startup is complete, False if still initializing
        """
        return self._check_health(CUSTOM_STARTUP_CHECK_KEY)

    def is_ready(self) -> bool:
        """
        Check if the app is ready to process messages.

        Returns True if:
        - Broker is connected (or using dev_mode)
        - All configured databases are connected
        - Custom ready check passes (if configured)

        Returns False if:
        - Broker is DISCONNECTED or RECONNECTING
        - Any configured database is unreachable
        - Custom ready check returns False or raises an exception

        Returns:
            bool: True if the app is ready, False otherwise
        """
        return self._check_health(CUSTOM_READY_CHECK_KEY)