app.py
 1  """
 2  Custom Solace AI Connector App class for the GDK-based Test Gateway.
 3  Defines the app-specific configuration schema (if any is needed) and programmatically
 4  creates the TestGatewayComponent.
 5  """
 6  import logging
 7  from typing import Any, Dict, List, Type
 8  
 9  from pydantic import ValidationError
10  
11  from solace_agent_mesh.gateway.base.app import BaseGatewayApp
12  from solace_agent_mesh.gateway.base.component import BaseGatewayComponent
13  
14  from .component import TestGatewayComponent
15  
# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# Descriptive metadata for the App class defined in this module.
info = dict(
    class_name="TestGatewayApp",
    description="App class for the GDK-based Test Gateway used in integration testing.",
)
22  
23  
class TestGatewayApp(BaseGatewayApp):
    """
    Custom App class for the GDK-based Test Gateway.

    - Extends BaseGatewayApp for common gateway functionalities.
    - Specifies TestGatewayComponent as its operational component.
    - Forces ``broker.dev_mode`` to ``True`` in the supplied app configuration.
    """

    # This app declares no gateway-specific configuration parameters.
    SPECIFIC_APP_SCHEMA_PARAMS: List[Dict[str, Any]] = []

    def __init__(self, app_info: Dict[str, Any], **kwargs):
        """
        Initializes the TestGatewayApp.

        The remaining setup is delegated to ``BaseGatewayApp.__init__``.

        Args:
            app_info: App configuration dictionary. It is modified in place:
                a ``broker`` mapping is created when the key is missing or
                explicitly ``None``, and its ``dev_mode`` entry is set to
                ``True`` (overriding any existing value).
            **kwargs: Forwarded unchanged to ``BaseGatewayApp.__init__``.
        """
        log.debug(
            "%s Initializing TestGatewayApp...",
            app_info.get("name", "TestGatewayApp"),
        )

        # ``setdefault`` would keep an explicit ``broker: None`` entry (e.g. an
        # empty key in a config file) and the item assignment below would then
        # raise TypeError, so treat ``None`` the same as a missing key.
        if app_info.get("broker") is None:
            app_info["broker"] = {}
        # NOTE(review): dev_mode presumably makes the connector use its
        # in-memory dev broker rather than a real one - confirm against the
        # base app / connector documentation.
        app_info["broker"]["dev_mode"] = True

        super().__init__(app_info=app_info, **kwargs)
        log.debug("%s TestGatewayApp initialization complete.", self.name)

    def _get_gateway_component_class(self) -> Type[BaseGatewayComponent]:
        """
        Returns the specific gateway component class for this app.

        Returns:
            The ``TestGatewayComponent`` class itself (not an instance).
        """
        return TestGatewayComponent