/ src / solace_agent_mesh / common / observability.py
observability.py
  1  """
  2  SAM-specific observability monitors.
  3  
  4  Provides type-safe monitor classes for instrumenting SAM agents and tools.
  5  These extend solace_ai_connector's OperationMonitor with constrained APIs
  6  to prevent accidental metric explosion.
  7  """
  8  
  9  from solace_ai_connector.common.observability.monitors.operation import OperationMonitor
 10  from solace_ai_connector.common.observability.monitors.remote import RemoteRequestMonitor
 11  from solace_ai_connector.common.observability.monitors.base import MonitorInstance
 12  
 13  
 14  class AgentMonitor(OperationMonitor):
 15      """
 16      Type-safe monitor for agent execution duration.
 17  
 18      Inherits from OperationMonitor but constrains the API to prevent metric explosion.
 19      Automatically sets type="agent" and operation.name="execute".
 20  
 21      Maps to: operation.duration histogram
 22      Labels: type="agent", component.name=<agent_name>, operation.name="execute", error.type
 23  
 24      Usage:
 25          from solace_agent_mesh.common.observability import AgentMonitor
 26          from solace_ai_connector.common.observability import MonitorLatency
 27  
 28          monitor = MonitorLatency(AgentMonitor.create(name="ResearchAgent"))
 29          with monitor:
 30              # agent execution code
 31      """
 32  
 33      @classmethod
 34      def create(cls, name: str) -> MonitorInstance:
 35          """
 36          Create agent monitor instance.
 37  
 38          Args:
 39              name: The agent name (e.g., "ResearchAgent", "WebAgent")
 40  
 41          Returns:
 42              MonitorInstance configured for agent execution tracking
 43          """
 44          return super().create(
 45              component_type="agent",
 46              component_name=name,
 47              operation="execute"
 48          )
 49  
 50  
 51  class ToolMonitor(OperationMonitor):
 52      """
 53      Type-safe monitor for tool execution duration (aggregated across all agents).
 54  
 55      Inherits from OperationMonitor but constrains the API to prevent metric explosion.
 56      Automatically sets type="tool" and operation.name="execute".
 57  
 58      Maps to: operation.duration histogram
 59      Labels: type="tool", component.name=<tool_name>, operation.name="execute", error.type
 60  
 61      Usage:
 62          from solace_agent_mesh.common.observability import ToolMonitor
 63          from solace_ai_connector.common.observability import MonitorLatency
 64  
 65          monitor = MonitorLatency(ToolMonitor.create(name="web_search"))
 66          with monitor:
 67              # tool execution code
 68      """
 69  
 70      @classmethod
 71      def create(cls, name: str) -> MonitorInstance:
 72          """
 73          Create tool monitor instance.
 74  
 75          Args:
 76              name: The tool name (e.g., "web_search", "deep_research", "query_data_with_sql")
 77  
 78          Returns:
 79              MonitorInstance configured for tool execution tracking (aggregated across agents)
 80          """
 81          return super().create(
 82              component_type="tool",
 83              component_name=name,
 84              operation="execute"
 85          )
 86  
 87  
 88  class RemoteAgentProxyMonitor(OperationMonitor):
 89      """
 90      Type-safe monitor for A2A proxy request duration.
 91  
 92      Inherits from OperationMonitor but constrains the API to prevent metric explosion.
 93      Automatically sets type="a2a_agent" and operation.name="forward_request".
 94  
 95      Maps to: operation.duration histogram
 96      Labels: type="a2a_agent", component.name=<agent_name>, operation.name="forward_request", error.type
 97  
 98      Usage:
 99          from solace_agent_mesh.common.observability import RemoteAgentProxyMonitor
100          from solace_ai_connector.common.observability import MonitorLatency
101  
102          monitor = MonitorLatency(RemoteAgentProxyMonitor.create("MyAgent"))
103          monitor.start()
104          try:
105              # ... forwarding logic ...
106              monitor.stop()
107          except Exception as e:
108              monitor.error(e)
109              raise
110      """
111  
112      @staticmethod
113      def parse_error(exc: Exception) -> str:
114          """
115          Categorize A2A proxy exceptions into error types for observability.
116  
117          Maps exceptions to error.type label values:
118          - "auth_error": A2AClientHTTPError with status 401 or 403
119          - "4xx_error": A2AClientHTTPError with other 4xx status
120          - "5xx_error": A2AClientHTTPError with 5xx status
121          - "jsonrpc_error": A2AClientJSONRPCError (protocol-level errors)
122          - "timeout": httpx.TimeoutException or built-in TimeoutError
123          - "connection_error": ConnectionError
124          - Exception class name: Fallback for uncategorized errors
125          """
126          try:
127              from a2a.client import A2AClientHTTPError
128  
129              if isinstance(exc, A2AClientHTTPError):
130                  code = exc.status_code
131                  if code in (401, 403):
132                      return "auth_error"
133                  if 400 <= code < 500:
134                      return "4xx_error"
135                  if 500 <= code < 600:
136                      return "5xx_error"
137                  return f"http_{code}"
138          except ImportError:
139              pass
140  
141          try:
142              from a2a.client.errors import A2AClientJSONRPCError
143  
144              if isinstance(exc, A2AClientJSONRPCError):
145                  return "jsonrpc_error"
146          except ImportError:
147              pass
148  
149          try:
150              import httpx
151  
152              if isinstance(exc, httpx.TimeoutException):
153                  return "timeout"
154          except ImportError:
155              pass
156  
157          if isinstance(exc, ConnectionError):
158              return "connection_error"
159  
160          return OperationMonitor.parse_error(exc)
161  
162      @classmethod
163      def create(cls, name: str) -> MonitorInstance:
164          """
165          Create monitor instance for a forward_request operation to a remote A2A agent.
166  
167          Args:
168              name: The name of the downstream agent being called (e.g., "MyRemoteAgent").
169  
170          Returns:
171              MonitorInstance configured for A2A proxy request tracking.
172          """
173          instance = super().create(
174              component_type="a2a_agent",
175              component_name=name,
176              operation="forward_request"
177          )
178          instance.error_parser = cls.parse_error
179          return instance
class ArtifactMonitor(RemoteRequestMonitor):
    """
    Constrained monitor for timing artifact service operations.

    Based on RemoteRequestMonitor because the artifact service is a single
    external service rather than a group of equivalent components. Callers
    pick an operation through one of the named factory methods, which keeps
    the set of operation names fixed and prevents metric explosion.

    Maps to: outbound.request.duration histogram
    Labels: service.peer.name="artifact_service",
            operation.name=<operation>, error.type

    Usage:
        from solace_agent_mesh.common.observability import ArtifactMonitor
        from solace_ai_connector.common.observability import MonitorLatency

        with MonitorLatency(ArtifactMonitor.save()):
            result = await service.save_artifact(...)
    """

    @classmethod
    def _create(cls, operation: str) -> MonitorInstance:
        """Shared constructor behind every public factory method."""
        # The peer name is constant; only the operation label varies.
        artifact_labels = {
            "service.peer.name": "artifact_service",
            "operation.name": operation,
        }
        return MonitorInstance(
            monitor_type=cls.monitor_type,
            labels=artifact_labels,
            error_parser=cls.parse_error,
        )

    @classmethod
    def save(cls) -> MonitorInstance:
        """Return a monitor instance for the save_artifact operation."""
        return cls._create(operation="save")

    @classmethod
    def load(cls) -> MonitorInstance:
        """Return a monitor instance for load_artifact and get_artifact_version operations."""
        return cls._create(operation="load")

    @classmethod
    def delete(cls) -> MonitorInstance:
        """Return a monitor instance for the delete_artifact operation."""
        return cls._create(operation="delete")

    @classmethod
    def list(cls) -> MonitorInstance:
        """Return a monitor instance for all list operations (keys, versions, artifact_versions)."""
        return cls._create(operation="list")