protocol.py
1 """ 2 Helpers for A2A protocol-level concerns, such as topic construction and 3 parsing of JSON-RPC requests and responses. 4 """ 5 import logging 6 import re 7 import uuid 8 from typing import Any, Dict, Optional, Tuple, Union 9 10 from a2a.types import ( 11 A2ARequest, 12 CancelTaskRequest, 13 GetTaskSuccessResponse, 14 InternalError, 15 InvalidRequestError, 16 JSONRPCError, 17 JSONRPCResponse, 18 JSONRPCSuccessResponse, 19 Message, 20 MessageSendParams, 21 SendMessageRequest, 22 SendMessageSuccessResponse, 23 SendStreamingMessageRequest, 24 SendStreamingMessageSuccessResponse, 25 Task, 26 TaskArtifactUpdateEvent, 27 TaskIdParams, 28 TaskStatusUpdateEvent, 29 ) 30 31 log = logging.getLogger(__name__) 32 33 # --- Topic Construction Helpers --- 34 35 A2A_VERSION = "v1" 36 A2A_BASE_PATH = f"a2a/{A2A_VERSION}" 37 38 39 def get_a2a_base_topic(namespace: str) -> str: 40 """Returns the base topic prefix for all A2A communication.""" 41 if not namespace: 42 raise ValueError("A2A namespace cannot be empty.") 43 return f"{namespace.rstrip('/')}/{A2A_BASE_PATH}" 44 45 46 def get_agent_discovery_topic(namespace: str) -> str: 47 """Returns the topic for publishing agent card discovery.""" 48 return f"{get_a2a_base_topic(namespace)}/discovery/agentcards" 49 50 51 def get_gateway_discovery_topic(namespace: str) -> str: 52 """Returns the topic for publishing gateway card discovery.""" 53 return f"{get_a2a_base_topic(namespace)}/discovery/gatewaycards" 54 55 56 def get_discovery_subscription_topic(namespace: str) -> str: 57 """ 58 Returns a wildcard subscription topic for receiving all discovery messages 59 (both agents and gateways). 60 """ 61 return f"{get_a2a_base_topic(namespace)}/discovery/>" 62 63 64 def get_agent_request_topic(namespace: str, agent_name: str) -> str: 65 """Returns the topic for sending requests to a specific agent.""" 66 if not agent_name: 67 raise ValueError("Agent name cannot be empty.") 68 return f"{get_a2a_base_topic(namespace)}/agent/request/{agent_name}" 69 70 71 def get_gateway_status_topic(namespace: str, gateway_id: str, task_id: str) -> str: 72 """ 73 Returns the specific topic for an agent to publish status updates TO a specific gateway instance. 74 """ 75 if not gateway_id: 76 raise ValueError("Gateway ID cannot be empty.") 77 if not task_id: 78 raise ValueError("Task ID cannot be empty.") 79 return f"{get_a2a_base_topic(namespace)}/gateway/status/{gateway_id}/{task_id}" 80 81 82 def get_gateway_response_topic(namespace: str, gateway_id: str, task_id: str) -> str: 83 """ 84 Returns the specific topic for an agent to publish the final response TO a specific gateway instance. 85 Includes task_id for potential correlation/filtering, though gateway might subscribe more broadly. 86 """ 87 if not gateway_id: 88 raise ValueError("Gateway ID cannot be empty.") 89 if not task_id: 90 raise ValueError("Task ID cannot be empty.") 91 return f"{get_a2a_base_topic(namespace)}/gateway/response/{gateway_id}/{task_id}" 92 93 94 def get_gateway_status_subscription_topic(namespace: str, self_gateway_id: str) -> str: 95 """ 96 Returns the wildcard topic for a gateway instance to subscribe to receive status updates 97 intended for it. 98 """ 99 if not self_gateway_id: 100 raise ValueError("Gateway ID is required for gateway status subscription") 101 return f"{get_a2a_base_topic(namespace)}/gateway/status/{self_gateway_id}/>" 102 103 104 def get_gateway_response_subscription_topic( 105 namespace: str, self_gateway_id: str 106 ) -> str: 107 """ 108 Returns the wildcard topic for a gateway instance to subscribe to receive final responses 109 intended for it. 110 """ 111 if not self_gateway_id: 112 raise ValueError("Gateway ID is required for gateway response subscription") 113 return f"{get_a2a_base_topic(namespace)}/gateway/response/{self_gateway_id}/>" 114 115 116 def get_peer_agent_status_topic( 117 namespace: str, delegating_agent_name: str, sub_task_id: str 118 ) -> str: 119 """ 120 Returns the topic for publishing status updates for a sub-task *back to the delegating agent*. 121 This topic includes the delegating agent's name. 122 """ 123 if not delegating_agent_name: 124 raise ValueError("delegating_agent_name is required for peer status topic") 125 return ( 126 f"{get_a2a_base_topic(namespace)}/agent/status/{delegating_agent_name}/{sub_task_id}" 127 ) 128 129 130 def get_agent_response_topic( 131 namespace: str, delegating_agent_name: str, sub_task_id: str 132 ) -> str: 133 """ 134 Returns the specific topic for publishing the final response for a sub-task 135 back to the delegating agent. Includes the delegating agent's name. 136 """ 137 if not delegating_agent_name: 138 raise ValueError("delegating_agent_name is required for peer response topic") 139 if not sub_task_id: 140 raise ValueError("sub_task_id is required for peer response topic") 141 return f"{get_a2a_base_topic(namespace)}/agent/response/{delegating_agent_name}/{sub_task_id}" 142 143 144 def get_agent_response_subscription_topic(namespace: str, self_agent_name: str) -> str: 145 """ 146 Returns the wildcard topic for an agent to subscribe to receive responses 147 for tasks it delegated. Includes the agent's own name. 148 """ 149 if not self_agent_name: 150 raise ValueError("self_agent_name is required for agent response subscription") 151 return f"{get_a2a_base_topic(namespace)}/agent/response/{self_agent_name}/>" 152 153 154 def get_agent_status_subscription_topic(namespace: str, self_agent_name: str) -> str: 155 """ 156 Returns the wildcard topic for an agent to subscribe to receive status updates 157 for tasks it delegated. Includes the agent's own name. 158 """ 159 if not self_agent_name: 160 raise ValueError("self_agent_name is required for agent status subscription") 161 return f"{get_a2a_base_topic(namespace)}/agent/status/{self_agent_name}/>" 162 163 164 def get_client_response_topic(namespace: str, client_id: str) -> str: 165 """Returns the topic for publishing the final response TO a specific client.""" 166 if not client_id: 167 raise ValueError("Client ID cannot be empty.") 168 return f"{get_a2a_base_topic(namespace)}/client/response/{client_id}" 169 170 171 def get_client_status_topic(namespace: str, client_id: str, task_id: str) -> str: 172 """ 173 Returns the specific topic for publishing status updates for a task *to the original client*. 174 This topic is client and task-specific. 175 """ 176 if not client_id: 177 raise ValueError("Client ID cannot be empty.") 178 if not task_id: 179 raise ValueError("Task ID cannot be empty.") 180 return f"{get_a2a_base_topic(namespace)}/client/status/{client_id}/{task_id}" 181 182 183 def get_client_status_subscription_topic(namespace: str, client_id: str) -> str: 184 """ 185 Returns the wildcard topic for a client to subscribe to receive status updates 186 for tasks it initiated. Includes the client's own ID. 187 """ 188 if not client_id: 189 raise ValueError("Client ID cannot be empty.") 190 return f"{get_a2a_base_topic(namespace)}/client/status/{client_id}/>" 191 192 193 def get_sam_events_topic(namespace: str, category: str, action: str) -> str: 194 """Returns SAM system events topic.""" 195 if not namespace: 196 raise ValueError("Namespace cannot be empty.") 197 if not category: 198 raise ValueError("Category cannot be empty.") 199 if not action: 200 raise ValueError("Action cannot be empty.") 201 return f"{namespace.rstrip('/')}/sam/events/{category}/{action}" 202 203 204 def get_feedback_topic(namespace: str) -> str: 205 """Returns the topic for publishing user feedback events.""" 206 return f"{namespace.rstrip('/')}/sam/v1/feedback/submit" 207 208 209 def get_sam_events_subscription_topic(namespace: str, category: str) -> str: 210 """Returns SAM system events subscription topic.""" 211 if not namespace: 212 raise ValueError("Namespace cannot be empty.") 213 if not category: 214 raise ValueError("Category cannot be empty.") 215 return f"{namespace.rstrip('/')}/sam/events/{category}/>" 216 217 218 def get_trust_card_topic(namespace: str, component_type: str, component_id: str) -> str: 219 """ 220 Returns the topic for publishing a Trust Card. 221 222 IMPORTANT: The component_id parameter MUST be the exact broker client-username 223 that the component uses to authenticate with the Solace broker. This is critical 224 for trust verification - trust cards are validated against the actual broker 225 authentication identity. 226 227 Args: 228 namespace: SAM namespace 229 component_type: Type of component ("gateway", "agent", etc.) 230 component_id: MUST be the broker client-username (from broker_username config). 231 DO NOT use arbitrary IDs like agent_name or gateway_id unless they 232 match the broker_username exactly. 233 234 Returns: 235 Topic string: {namespace}/a2a/v1/trust/{component_type}/{component_id} 236 237 Raises: 238 ValueError: If any parameter is empty 239 240 Security Note: 241 Trust card verification relies on matching the topic component_id with the 242 authenticated broker client-username. Using a different value breaks the 243 security model and trust chain verification. 244 """ 245 if not namespace: 246 raise ValueError("Namespace cannot be empty.") 247 if not component_type: 248 raise ValueError("Component type cannot be empty.") 249 if not component_id: 250 raise ValueError("Component ID cannot be empty.") 251 return f"{get_a2a_base_topic(namespace)}/trust/{component_type}/{component_id}" 252 253 254 def get_trust_card_subscription_topic(namespace: str, component_type: Optional[str] = None) -> str: 255 """ 256 Returns subscription pattern for Trust Cards. 257 258 Args: 259 namespace: SAM namespace 260 component_type: Optional - subscribe to specific type, or None for all types 261 262 Returns: 263 Subscription pattern 264 """ 265 if not namespace: 266 raise ValueError("Namespace cannot be empty.") 267 268 if component_type: 269 return f"{get_a2a_base_topic(namespace)}/trust/{component_type}/*" 270 else: 271 return f"{get_a2a_base_topic(namespace)}/trust/*/*" 272 273 274 def extract_trust_card_info_from_topic(topic: str) -> tuple[str, str]: 275 """ 276 Extracts component type and ID from trust card topic. 277 278 Args: 279 topic: Trust card topic 280 281 Returns: 282 Tuple of (component_type, component_id) 283 284 Raises: 285 ValueError: If topic format is invalid 286 """ 287 parts = topic.split('/') 288 if len(parts) < 6 or parts[1] != 'a2a' or parts[2] != 'v1' or parts[3] != 'trust': 289 raise ValueError(f"Invalid trust card topic format: {topic}") 290 291 component_type = parts[4] 292 component_id = parts[5] 293 return component_type, component_id 294 295 296 def subscription_to_regex(subscription: str) -> str: 297 """Converts a Solace topic subscription string to a regex pattern.""" 298 # Escape regex special characters except for Solace wildcards 299 pattern = re.escape(subscription) 300 # Replace Solace single-level wildcard '*' with regex equivalent '[^/]+' 301 pattern = pattern.replace(r"\*", r"[^/]+") 302 # Replace Solace multi-level wildcard '>' at the end with regex equivalent '.*' 303 if pattern.endswith(r"/>"): 304 pattern = pattern[:-1] + r".*" # Remove escaped '>' and add '.*' 305 return pattern 306 307 308 def topic_matches_subscription(topic: str, subscription: str) -> bool: 309 """Checks if a topic matches a Solace subscription pattern.""" 310 regex_pattern = subscription_to_regex(subscription) 311 return re.fullmatch(regex_pattern, topic) is not None 312 313 314 # --- JSON-RPC Envelope Helpers --- 315 316 317 def get_request_id(request: A2ARequest) -> str | int: 318 """Gets the JSON-RPC request ID from any A2A request object.""" 319 return request.root.id 320 321 322 def get_request_method(request: A2ARequest) -> str: 323 """Gets the JSON-RPC method name from any A2A request object.""" 324 return request.root.method 325 326 327 def get_message_from_send_request(request: A2ARequest) -> Optional[Message]: 328 """ 329 Safely gets the Message object from a SendMessageRequest or 330 SendStreamingMessageRequest. Returns None for other request types. 331 """ 332 if isinstance(request.root, (SendMessageRequest, SendStreamingMessageRequest)): 333 return request.root.params.message 334 return None 335 336 337 def get_task_id_from_cancel_request(request: A2ARequest) -> Optional[str]: 338 """Safely gets the task ID from a CancelTaskRequest.""" 339 if isinstance(request.root, CancelTaskRequest): 340 return request.root.params.id 341 return None 342 343 344 def get_response_id(response: JSONRPCResponse) -> Optional[Union[str, int]]: 345 """Safely gets the ID from any JSON-RPC response object.""" 346 if hasattr(response.root, "id"): 347 return response.root.id 348 return None 349 350 351 def get_response_result(response: JSONRPCResponse) -> Optional[Any]: 352 """Safely gets the result object from any successful JSON-RPC response.""" 353 if hasattr(response.root, "result"): 354 return response.root.result 355 return None 356 357 358 def get_response_error(response: JSONRPCResponse) -> Optional[JSONRPCError]: 359 """Safely gets the error object from any JSON-RPC error response.""" 360 if hasattr(response.root, "error"): 361 return response.root.error 362 return None 363 364 365 def get_error_message(error: JSONRPCError) -> str: 366 """Safely gets the message string from a JSONRPCError object.""" 367 return error.message 368 369 370 def get_error_code(error: JSONRPCError) -> int: 371 """Safely gets the code from a JSONRPCError object.""" 372 return error.code 373 374 375 def get_error_data(error: JSONRPCError) -> Optional[Any]: 376 """Safely gets the data from a JSONRPCError object.""" 377 return error.data 378 379 380 def create_success_response( 381 result: Any, request_id: Optional[Union[str, int]] 382 ) -> JSONRPCResponse: 383 """ 384 Creates a successful JSON-RPC response object by wrapping the result in the 385 appropriate specific success response model based on the result's type. 386 387 Args: 388 result: The result payload (e.g., Task, TaskStatusUpdateEvent). 389 request_id: The ID of the original request. 390 391 Returns: 392 A new `JSONRPCResponse` object. 393 394 Raises: 395 TypeError: If the result type is not a supported A2A model. 396 """ 397 specific_response: Any 398 if isinstance(result, (TaskStatusUpdateEvent, TaskArtifactUpdateEvent)): 399 specific_response = SendStreamingMessageSuccessResponse( 400 id=request_id, result=result 401 ) 402 elif isinstance(result, Task): 403 # When returning a final task, GetTaskSuccessResponse is a suitable choice. 404 specific_response = GetTaskSuccessResponse(id=request_id, result=result) 405 else: 406 raise TypeError( 407 f"Unsupported result type for create_success_response: {type(result).__name__}" 408 ) 409 410 return JSONRPCResponse(root=specific_response) 411 412 413 def create_internal_error_response( 414 message: str, 415 request_id: Optional[Union[str, int]], 416 data: Optional[Dict[str, Any]] = None, 417 ) -> JSONRPCResponse: 418 """ 419 Creates a JSON-RPC response object for an InternalError. 420 421 Args: 422 message: The error message. 423 request_id: The ID of the original request. 424 data: Optional structured data to include with the error. 425 426 Returns: 427 A new `JSONRPCResponse` object containing an `InternalError`. 428 """ 429 error = create_internal_error(message=message, data=data) 430 return JSONRPCResponse(id=request_id, error=error) 431 432 433 def create_invalid_request_error_response( 434 message: str, 435 request_id: Optional[Union[str, int]], 436 data: Optional[Any] = None, 437 ) -> JSONRPCResponse: 438 """ 439 Creates a JSON-RPC response object for an InvalidRequestError. 440 441 Args: 442 message: The error message. 443 request_id: The ID of the original request. 444 data: Optional structured data to include with the error. 445 446 Returns: 447 A new `JSONRPCResponse` object containing an `InvalidRequestError`. 448 """ 449 error = create_invalid_request_error(message=message, data=data) 450 return JSONRPCResponse(id=request_id, error=error) 451 452 453 def create_internal_error( 454 message: str, 455 data: Optional[Dict[str, Any]] = None, 456 ) -> InternalError: 457 """ 458 Creates an InternalError object. 459 460 Args: 461 message: The error message. 462 data: Optional structured data to include with the error. 463 464 Returns: 465 A new `InternalError` object. 466 """ 467 return InternalError(message=message, data=data) 468 469 470 def create_invalid_request_error( 471 message: str, data: Optional[Any] = None 472 ) -> InvalidRequestError: 473 """ 474 Creates an InvalidRequestError object. 475 476 Args: 477 message: The error message. 478 data: Optional structured data to include with the error. 479 480 Returns: 481 A new `InvalidRequestError` object. 482 """ 483 return InvalidRequestError(message=message, data=data) 484 485 486 def create_generic_success_response( 487 result: Any, request_id: Optional[Union[str, int]] = None 488 ) -> JSONRPCSuccessResponse: 489 """ 490 Creates a generic successful JSON-RPC response object. 491 Note: This is for non-A2A-spec-compliant endpoints that use a similar structure. 492 493 Args: 494 result: The result payload for the response. 495 request_id: The ID of the original request. 496 497 Returns: 498 A new `JSONRPCSuccessResponse` object. 499 """ 500 return JSONRPCSuccessResponse(id=request_id, result=result) 501 502 503 def create_error_response( 504 error: JSONRPCError, 505 request_id: Optional[Union[str, int]], 506 ) -> JSONRPCResponse: 507 """ 508 Creates a JSON-RPC error response object from a given error model. 509 510 Args: 511 error: The JSONRPCError model instance. 512 request_id: The ID of the original request. 513 514 Returns: 515 A new `JSONRPCResponse` object containing the error. 516 """ 517 return JSONRPCResponse(id=request_id, error=error) 518 519 520 def create_cancel_task_request(task_id: str) -> CancelTaskRequest: 521 """ 522 Creates a CancelTaskRequest object. 523 524 Args: 525 task_id: The ID of the task to cancel. 526 527 Returns: 528 A new `CancelTaskRequest` object. 529 """ 530 params = TaskIdParams(id=task_id) 531 return CancelTaskRequest(id=uuid.uuid4().hex, params=params) 532 533 534 def create_send_message_request( 535 message: Message, 536 task_id: str, 537 metadata: Optional[Dict[str, Any]] = None, 538 ) -> SendMessageRequest: 539 """ 540 Creates a SendMessageRequest object. 541 542 Args: 543 message: The A2AMessage object to send. 544 task_id: The unique ID for the task. 545 metadata: Optional metadata for the send request. 546 547 Returns: 548 A new `SendMessageRequest` object. 549 """ 550 send_params = MessageSendParams(message=message, metadata=metadata) 551 return SendMessageRequest(id=task_id, params=send_params) 552 553 554 def create_send_streaming_message_request( 555 message: Message, 556 task_id: str, 557 metadata: Optional[Dict[str, Any]] = None, 558 ) -> SendStreamingMessageRequest: 559 """ 560 Creates a SendStreamingMessageRequest object. 561 562 Args: 563 message: The A2AMessage object to send. 564 task_id: The unique ID for the task. 565 metadata: Optional metadata for the send request. 566 567 Returns: 568 A new `SendStreamingMessageRequest` object. 569 """ 570 send_params = MessageSendParams(message=message, metadata=metadata) 571 return SendStreamingMessageRequest(id=task_id, params=send_params) 572 573 574 def create_send_message_success_response( 575 result: Union[Task, Message], request_id: Optional[Union[str, int]] 576 ) -> SendMessageSuccessResponse: 577 """ 578 Creates a SendMessageSuccessResponse object. 579 580 Args: 581 result: The result payload (Task or Message). 582 request_id: The ID of the original request. 583 584 Returns: 585 A new `SendMessageSuccessResponse` object. 586 """ 587 return SendMessageSuccessResponse(id=request_id, result=result) 588 589 590 def create_send_streaming_message_success_response( 591 result: Union[Task, Message, TaskStatusUpdateEvent, TaskArtifactUpdateEvent], 592 request_id: Optional[Union[str, int]], 593 ) -> SendStreamingMessageSuccessResponse: 594 """ 595 Creates a SendStreamingMessageSuccessResponse object. 596 597 Args: 598 result: The result payload. 599 request_id: The ID of the original request. 600 601 Returns: 602 A new `SendStreamingMessageSuccessResponse` object. 603 """ 604 return SendStreamingMessageSuccessResponse(id=request_id, result=result) 605 606 607 def extract_task_id_from_topic( 608 topic: str, subscription_pattern: str, log_identifier: str 609 ) -> Optional[str]: 610 """Extracts the task ID from the end of a topic string based on the subscription.""" 611 base_regex_str = subscription_to_regex(subscription_pattern).replace(r".*", "") 612 match = re.match(base_regex_str, topic) 613 if match: 614 task_id_part = topic[match.end() :] 615 task_id = task_id_part.lstrip("/") 616 if task_id: 617 log.debug( 618 "%s Extracted Task ID '%s' from topic '%s'", 619 log_identifier, 620 task_id, 621 topic, 622 ) 623 return task_id 624 log.warning( 625 "%s Could not extract Task ID from topic '%s' using pattern '%s'", 626 log_identifier, 627 topic, 628 subscription_pattern, 629 ) 630 return None 631 632 633 # --- Client Event Helpers --- 634 635 636 def is_client_event(obj: Any) -> bool: 637 """ 638 Checks if an object is a ClientEvent tuple (Task, UpdateEvent). 639 640 A ClientEvent is a tuple with 2 elements where the first element is a Task 641 and the second is either a TaskStatusUpdateEvent, TaskArtifactUpdateEvent, or None. 642 643 Args: 644 obj: The object to check. 645 646 Returns: 647 True if the object is a ClientEvent tuple, False otherwise. 648 """ 649 if not isinstance(obj, tuple) or len(obj) != 2: 650 return False 651 652 task, update_event = obj 653 654 # First element must be a Task 655 if not isinstance(task, Task): 656 return False 657 658 # Second element must be an update event or None 659 if update_event is not None and not isinstance( 660 update_event, (TaskStatusUpdateEvent, TaskArtifactUpdateEvent) 661 ): 662 return False 663 664 return True 665 666 667 def is_message_object(obj: Any) -> bool: 668 """ 669 Checks if an object is a Message. 670 671 Args: 672 obj: The object to check. 673 674 Returns: 675 True if the object is a Message, False otherwise. 676 """ 677 return isinstance(obj, Message) 678 679 680 def unpack_client_event( 681 event: tuple, 682 ) -> Tuple[Task, Optional[Union[TaskStatusUpdateEvent, TaskArtifactUpdateEvent]]]: 683 """ 684 Safely unpacks a ClientEvent tuple into its components. 685 686 Args: 687 event: A ClientEvent tuple (Task, UpdateEvent). 688 689 Returns: 690 A tuple of (Task, Optional[UpdateEvent]) where UpdateEvent can be 691 TaskStatusUpdateEvent, TaskArtifactUpdateEvent, or None. 692 693 Raises: 694 ValueError: If the event is not a valid ClientEvent tuple. 695 """ 696 if not is_client_event(event): 697 raise ValueError( 698 f"Expected a ClientEvent tuple, got {type(event).__name__}" 699 ) 700 701 task, update_event = event 702 return task, update_event