/ src / solace_agent_mesh / common / app_base.py
app_base.py
  1  """Base App class for all SAM applications with broker and database health checks."""
  2  
  3  import importlib
  4  import logging
  5  from concurrent.futures import ThreadPoolExecutor
  6  from concurrent.futures import TimeoutError as FuturesTimeoutError
  7  
  8  from solace_ai_connector.common.messaging.solace_messaging import ConnectionStatus
  9  from solace_ai_connector.common.monitoring import Monitoring
 10  from solace_ai_connector.flow.app import App
 11  from sqlalchemy import text
 12  
# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# Default timeout for database health checks (in seconds). Used when
# app_info['health_check']['database_timeout_seconds'] is missing or invalid.
DB_HEALTH_CHECK_TIMEOUT_SECONDS = 5.0

# Config keys for custom health checks. Each is looked up under
# app_info['health_check'] and names a callable in
# "module.path:function_name" form.
CUSTOM_STARTUP_CHECK_KEY = "custom_startup_check"
CUSTOM_READY_CHECK_KEY = "custom_ready_check"
 21  
 22  
 23  class SamAppBase(App):
 24      """
 25      Base class for all SAM applications.
 26  
 27      Extends solace-ai-connector's App class with broker connection and database
 28      health checks for the is_startup_complete() and is_ready() methods.
 29  
 30      When using dev_mode (DevBroker), broker health checks always return True since
 31      the DevBroker doesn't have real connection issues to monitor.
 32  
 33      When using a real Solace broker, health checks return True only when
 34      the broker connection status is CONNECTED.
 35  
 36      When using SQL-based session services, health checks also verify database
 37      connectivity by testing the connection to each configured database.
 38      """
 39  
 40      def _is_dev_mode(self) -> bool:
 41          """
 42          Check if the broker is configured in dev mode.
 43  
 44          Returns:
 45              True if dev_mode is enabled or no broker config exists, False otherwise.
 46          """
 47          broker_config = self.app_info.get("broker")
 48          if broker_config is None or broker_config == {}:
 49              return True  # No config means assume dev mode for safety
 50  
 51          dev_mode = broker_config.get("dev_mode", False)
 52  
 53          # Handle boolean
 54          if isinstance(dev_mode, bool):
 55              return dev_mode
 56  
 57          # Handle string "true" (case insensitive)
 58          if isinstance(dev_mode, str):
 59              return dev_mode.lower() == "true"
 60  
 61          return False
 62  
 63      def _get_db_health_check_timeout(self) -> float:
 64          """
 65          Get the database health check timeout from configuration.
 66  
 67          Reads from app_info['health_check']['database_timeout_seconds'].
 68          Falls back to DB_HEALTH_CHECK_TIMEOUT_SECONDS if not configured.
 69  
 70          Returns:
 71              Timeout in seconds for database health checks.
 72          """
 73          health_check_config = self.app_info.get("health_check", {})
 74          timeout = health_check_config.get(
 75              "database_timeout_seconds", DB_HEALTH_CHECK_TIMEOUT_SECONDS
 76          )
 77  
 78          # Ensure we have a valid positive number
 79          if timeout is None:
 80              return DB_HEALTH_CHECK_TIMEOUT_SECONDS
 81  
 82          try:
 83              timeout = float(timeout)
 84              if timeout <= 0:
 85                  log.warning(
 86                      "Invalid database_timeout_seconds value: %s, using default: %s",
 87                      timeout,
 88                      DB_HEALTH_CHECK_TIMEOUT_SECONDS,
 89                  )
 90                  return DB_HEALTH_CHECK_TIMEOUT_SECONDS
 91              return timeout
 92          except (TypeError, ValueError):
 93              log.warning(
 94                  "Invalid database_timeout_seconds value: %s, using default: %s",
 95                  timeout,
 96                  DB_HEALTH_CHECK_TIMEOUT_SECONDS,
 97              )
 98              return DB_HEALTH_CHECK_TIMEOUT_SECONDS
 99  
100      def _is_broker_connected(self) -> bool:
101          """
102          Check if the broker connection is healthy.
103  
104          When using dev_mode, this always returns True since the DevBroker
105          doesn't have real connection state to check.
106  
107          When using a real Solace broker, this checks the Monitoring singleton's
108          connection status and returns True only if CONNECTED.
109  
110          Returns:
111              True if broker is connected (or in dev_mode), False otherwise.
112          """
113          # Dev mode always returns True
114          if self._is_dev_mode():
115              log.debug("Broker health check: dev_mode enabled, returning True")
116              return True
117  
118          # For real broker, check the Monitoring singleton's connection status
119          monitoring = Monitoring()
120          status = monitoring.get_connection_status()
121  
122          is_connected = status == ConnectionStatus.CONNECTED
123  
124          if not is_connected:
125              log.debug(
126                  "Broker health check: connection status is %s, returning False",
127                  status,
128              )
129  
130          return is_connected
131  
132      def _get_db_engines_from_components(self) -> list:
133          """
134          Collect database engines from all components.
135  
136          Traverses flows and component groups to find components with:
137          - get_db_engine() method (Gateway/Platform pattern)
138          - session_service.db_engine attribute (Agent pattern)
139  
140          Returns:
141              List of SQLAlchemy Engine objects.
142          """
143          engines = []
144  
145          if not hasattr(self, "flows") or not self.flows:
146              return engines
147  
148          for flow in self.flows:
149              if not hasattr(flow, "component_groups") or not flow.component_groups:
150                  continue
151  
152              for group in flow.component_groups:
153                  for wrapper in group:
154                      # Get the actual component from wrapper if needed
155                      component = getattr(wrapper, "component", wrapper)
156  
157                      # Check for get_db_engine() method (Gateway/Platform pattern)
158                      if hasattr(component, "get_db_engine") and callable(
159                          component.get_db_engine
160                      ):
161                          engine = component.get_db_engine()
162                          if engine is not None:
163                              engines.append(engine)
164                      # Check for session_service.db_engine (Agent pattern)
165                      elif hasattr(component, "session_service"):
166                          session_svc = component.session_service
167                          if hasattr(session_svc, "db_engine") and session_svc.db_engine:
168                              engines.append(session_svc.db_engine)
169  
170          return engines
171  
172      def _test_single_db_connection(self, engine) -> bool:
173          """
174          Test a single database connection.
175  
176          Args:
177              engine: SQLAlchemy engine to test
178  
179          Returns:
180              True if connection successful, False otherwise.
181          """
182          with engine.connect() as conn:
183              conn.execute(text("SELECT 1"))
184          return True
185  
186      def _get_health_check_executor(self) -> ThreadPoolExecutor:
187          """
188          Get or create a shared ThreadPoolExecutor for health check operations.
189  
190          Reuses a single executor instance to avoid thread and memory leaks
191          from creating new ThreadPoolExecutor instances on every health check
192          call. 
193  
194          Returns:
195              A shared ThreadPoolExecutor with max_workers=2.
196          """
197          if not hasattr(self, "_health_check_executor") or self._health_check_executor is None:
198              self._health_check_executor = ThreadPoolExecutor(
199                  max_workers=2, thread_name_prefix="db-health-check"
200              )
201          return self._health_check_executor
202  
203      def _is_database_connected(
204          self, timeout: float = DB_HEALTH_CHECK_TIMEOUT_SECONDS
205      ) -> bool:
206          """
207          Check if all configured databases are connected.
208  
209          Collects database engines from components and tests each connection
210          by executing a simple query with a timeout. Returns True only if ALL
211          databases are reachable within the timeout period.
212  
213          If no databases are configured, returns True.
214  
215          Uses a shared ThreadPoolExecutor to avoid thread/memory leaks from
216          creating new executors on every health check call.
217  
218          Args:
219              timeout: Maximum time in seconds to wait for each database connection
220                       test. Defaults to DB_HEALTH_CHECK_TIMEOUT_SECONDS (5 seconds).
221  
222          Returns:
223              True if all databases are connected (or none configured), False otherwise.
224          """
225          engines = self._get_db_engines_from_components()
226  
227          if not engines:
228              return True
229  
230          executor = self._get_health_check_executor()
231  
232          for engine in engines:
233              try:
234                  future = executor.submit(self._test_single_db_connection, engine)
235                  future.result(timeout=timeout)
236              except FuturesTimeoutError:
237                  log.warning(
238                      "Database health check failed: timed out after %.1f seconds",
239                      timeout,
240                  )
241                  return False
242              except Exception as e:
243                  log.warning(
244                      "Database health check failed: %s",
245                      str(e),
246                  )
247                  return False
248  
249          return True
250  
251      def _load_custom_check(self, check_path: str):
252          """
253          Load a custom health check callable from a module path.
254  
255          Args:
256              check_path: Path in format "module.path:function_name"
257  
258          Returns:
259              The callable function, or None if loading fails.
260          """
261          # Use cached callable if available
262          if not hasattr(self, "_custom_check_cache"):
263              self._custom_check_cache = {}
264  
265          if check_path in self._custom_check_cache:
266              return self._custom_check_cache[check_path]
267  
268          try:
269              if ":" not in check_path:
270                  log.error(
271                      "Invalid custom health check path '%s': "
272                      "expected format 'module.path:function_name'",
273                      check_path,
274                  )
275                  return None
276  
277              module_path, function_name = check_path.rsplit(":", 1)
278              module = importlib.import_module(module_path)
279              func = getattr(module, function_name)
280  
281              if not callable(func):
282                  log.error(
283                      "Custom health check '%s' is not callable",
284                      check_path,
285                  )
286                  return None
287  
288              # Cache the callable
289              self._custom_check_cache[check_path] = func
290              log.info("Loaded custom health check: %s", check_path)
291              return func
292  
293          except ImportError as e:
294              log.error(
295                  "Failed to import custom health check module '%s': %s",
296                  check_path,
297                  e,
298              )
299              return None
300          except AttributeError as e:
301              log.error(
302                  "Custom health check function not found '%s': %s",
303                  check_path,
304                  e,
305              )
306              return None
307  
308      def _run_custom_check(self, check_key: str) -> bool:
309          """
310          Run a custom health check if configured.
311  
312          The custom health check function receives the application instance,
313          allowing it to access app_info, flows, and other application state.
314  
315          Args:
316              check_key: The config key for the custom check
317                         (CUSTOM_STARTUP_CHECK_KEY or CUSTOM_READY_CHECK_KEY)
318  
319          Returns:
320              True if check passes or no custom check configured, False if check fails.
321          """
322          health_check_config = self.app_info.get("health_check", {})
323          check_path = health_check_config.get(check_key)
324  
325          if not check_path:
326              return True  # No custom check configured
327  
328          func = self._load_custom_check(check_path)
329          if func is None:
330              # Loading failed - treat as unhealthy
331              return False
332  
333          try:
334              result = func(self)
335  
336              if not isinstance(result, bool):
337                  log.warning(
338                      "Custom health check '%s' returned non-boolean value: %s, "
339                      "treating as unhealthy",
340                      check_path,
341                      result,
342                  )
343                  return False
344  
345              if not result:
346                  log.warning("Custom health check '%s' returned False", check_path)
347  
348              return result
349  
350          except Exception as e:
351              log.error(
352                  "Custom health check '%s' raised exception: %s",
353                  check_path,
354                  e,
355              )
356              return False
357  
358      def is_startup_complete(self) -> bool:
359          """
360          Check if the app has completed its startup/initialization phase.
361  
362          Returns True if:
363          - Broker is connected (or using dev_mode)
364          - All configured databases are connected
365          - Custom startup check passes (if configured)
366  
367          Returns False if:
368          - Broker is DISCONNECTED or RECONNECTING
369          - Any configured database is unreachable
370          - Custom startup check returns False or raises an exception
371  
372          Returns:
373              bool: True if startup is complete, False if still initializing
374          """
375          timeout = self._get_db_health_check_timeout()
376          return (
377              self._is_broker_connected()
378              and self._is_database_connected(timeout)
379              and self._run_custom_check(CUSTOM_STARTUP_CHECK_KEY)
380          )
381  
382      def is_ready(self) -> bool:
383          """
384          Check if the app is ready to process messages.
385  
386          Returns True if:
387          - Broker is connected (or using dev_mode)
388          - All configured databases are connected
389          - Custom ready check passes (if configured)
390  
391          Returns False if:
392          - Broker is DISCONNECTED or RECONNECTING
393          - Any configured database is unreachable
394          - Custom ready check returns False or raises an exception
395  
396          Returns:
397              bool: True if the app is ready, False otherwise
398          """
399          timeout = self._get_db_health_check_timeout()
400          return (
401              self._is_broker_connected()
402              and self._is_database_connected(timeout)
403              and self._run_custom_check(CUSTOM_READY_CHECK_KEY)
404          )