# observability.py
"""
SAM-specific observability monitors.

Provides type-safe monitor classes for instrumenting SAM agents and tools.
These extend solace_ai_connector's OperationMonitor with constrained APIs
to prevent accidental metric explosion.
"""

from solace_ai_connector.common.observability.monitors.operation import OperationMonitor
from solace_ai_connector.common.observability.monitors.remote import RemoteRequestMonitor
from solace_ai_connector.common.observability.monitors.base import MonitorInstance


class AgentMonitor(OperationMonitor):
    """
    Type-safe monitor for agent execution duration.

    Inherits from OperationMonitor but constrains the API to prevent metric explosion.
    Automatically sets type="agent" and operation.name="execute".

    Maps to: operation.duration histogram
    Labels: type="agent", component.name=<agent_name>, operation.name="execute", error.type

    Usage:
        from solace_agent_mesh.common.observability import AgentMonitor
        from solace_ai_connector.common.observability import MonitorLatency

        monitor = MonitorLatency(AgentMonitor.create(name="ResearchAgent"))
        with monitor:
            # agent execution code
    """

    @classmethod
    def create(cls, name: str) -> MonitorInstance:
        """
        Create agent monitor instance.

        Args:
            name: The agent name (e.g., "ResearchAgent", "WebAgent")

        Returns:
            MonitorInstance configured for agent execution tracking
        """
        # component_type and operation are fixed here so callers can only
        # vary the agent name.
        return super().create(
            component_type="agent",
            component_name=name,
            operation="execute",
        )


class ToolMonitor(OperationMonitor):
    """
    Type-safe monitor for tool execution duration (aggregated across all agents).

    Inherits from OperationMonitor but constrains the API to prevent metric explosion.
    Automatically sets type="tool" and operation.name="execute".

    Maps to: operation.duration histogram
    Labels: type="tool", component.name=<tool_name>, operation.name="execute", error.type

    Usage:
        from solace_agent_mesh.common.observability import ToolMonitor
        from solace_ai_connector.common.observability import MonitorLatency

        monitor = MonitorLatency(ToolMonitor.create(name="web_search"))
        with monitor:
            # tool execution code
    """

    @classmethod
    def create(cls, name: str) -> MonitorInstance:
        """
        Create tool monitor instance.

        Args:
            name: The tool name (e.g., "web_search", "deep_research", "query_data_with_sql")

        Returns:
            MonitorInstance configured for tool execution tracking (aggregated across agents)
        """
        # component_type and operation are fixed here so callers can only
        # vary the tool name.
        return super().create(
            component_type="tool",
            component_name=name,
            operation="execute",
        )


class RemoteAgentProxyMonitor(OperationMonitor):
    """
    Type-safe monitor for A2A proxy request duration.

    Inherits from OperationMonitor but constrains the API to prevent metric explosion.
    Automatically sets type="a2a_agent" and operation.name="forward_request".

    Maps to: operation.duration histogram
    Labels: type="a2a_agent", component.name=<agent_name>, operation.name="forward_request", error.type

    Usage:
        from solace_agent_mesh.common.observability import RemoteAgentProxyMonitor
        from solace_ai_connector.common.observability import MonitorLatency

        monitor = MonitorLatency(RemoteAgentProxyMonitor.create("MyAgent"))
        monitor.start()
        try:
            # ... forwarding logic ...
            monitor.stop()
        except Exception as e:
            monitor.error(e)
            raise
    """

    @staticmethod
    def parse_error(exc: Exception) -> str:
        """
        Categorize A2A proxy exceptions into error types for observability.

        Maps exceptions to error.type label values:
        - "auth_error": A2AClientHTTPError with status 401 or 403
        - "4xx_error": A2AClientHTTPError with other 4xx status
        - "5xx_error": A2AClientHTTPError with 5xx status
        - "http_<code>": A2AClientHTTPError with a status outside 4xx/5xx
        - "jsonrpc_error": A2AClientJSONRPCError (protocol-level errors)
        - "timeout": httpx.TimeoutException or built-in TimeoutError
        - "connection_error": ConnectionError
        - Otherwise: whatever OperationMonitor.parse_error returns

        The a2a and httpx packages are imported lazily and an ImportError is
        tolerated, so the corresponding categories are simply skipped when a
        package is not installed.

        Args:
            exc: The exception raised while forwarding the request.

        Returns:
            The error.type label value for the exception.
        """
        try:
            from a2a.client import A2AClientHTTPError

            if isinstance(exc, A2AClientHTTPError):
                code = exc.status_code
                # Auth failures are split out before the generic 4xx bucket.
                if code in (401, 403):
                    return "auth_error"
                if 400 <= code < 500:
                    return "4xx_error"
                if 500 <= code < 600:
                    return "5xx_error"
                return f"http_{code}"
        except ImportError:
            pass

        try:
            from a2a.client.errors import A2AClientJSONRPCError

            if isinstance(exc, A2AClientJSONRPCError):
                return "jsonrpc_error"
        except ImportError:
            pass

        try:
            import httpx

            if isinstance(exc, httpx.TimeoutException):
                return "timeout"
        except ImportError:
            pass

        # Built-in TimeoutError is an OSError subclass but NOT a
        # ConnectionError, so it needs its own check to land in the
        # documented "timeout" bucket (also works when httpx is absent).
        if isinstance(exc, TimeoutError):
            return "timeout"

        if isinstance(exc, ConnectionError):
            return "connection_error"

        return OperationMonitor.parse_error(exc)

    @classmethod
    def create(cls, name: str) -> MonitorInstance:
        """
        Create monitor instance for a forward_request operation to a remote A2A agent.

        Args:
            name: The name of the downstream agent being called (e.g., "MyRemoteAgent").

        Returns:
            MonitorInstance configured for A2A proxy request tracking.
        """
        instance = super().create(
            component_type="a2a_agent",
            component_name=name,
            operation="forward_request",
        )
        # Replace the error parser on the created instance with the
        # A2A-aware categorization defined on this class.
        instance.error_parser = cls.parse_error
        return instance


class ArtifactMonitor(RemoteRequestMonitor):
    """
    Type-safe monitor for artifact service operation duration.

    Uses RemoteRequestMonitor since artifacts are a single external service,
    not a group of equivalent components. Constrains the API via named factory
    methods to prevent metric explosion.

    Maps to: outbound.request.duration histogram
    Labels: service.peer.name="artifact_service",
            operation.name=<operation>, error.type

    Usage:
        from solace_agent_mesh.common.observability import ArtifactMonitor
        from solace_ai_connector.common.observability import MonitorLatency

        with MonitorLatency(ArtifactMonitor.save()):
            result = await service.save_artifact(...)
    """

    @classmethod
    def _create(cls, operation: str) -> MonitorInstance:
        """Internal factory — all public methods delegate here."""
        return MonitorInstance(
            monitor_type=cls.monitor_type,
            labels={
                "service.peer.name": "artifact_service",
                "operation.name": operation,
            },
            error_parser=cls.parse_error,
        )

    @classmethod
    def save(cls) -> MonitorInstance:
        """Create monitor instance for save_artifact operation."""
        return cls._create("save")

    @classmethod
    def load(cls) -> MonitorInstance:
        """Create monitor instance for load_artifact and get_artifact_version operations."""
        return cls._create("load")

    @classmethod
    def delete(cls) -> MonitorInstance:
        """Create monitor instance for delete_artifact operation."""
        return cls._create("delete")

    # NOTE: the name shadows the builtin `list` inside the class body only;
    # it is kept because it is part of the public factory API.
    @classmethod
    def list(cls) -> MonitorInstance:
        """Create monitor instance for all list operations (keys, versions, artifact_versions)."""
        return cls._create("list")