/ src / solace_agent_mesh / common / a2a / protocol.py
protocol.py
  1  """
  2  Helpers for A2A protocol-level concerns, such as topic construction and
  3  parsing of JSON-RPC requests and responses.
  4  """
  5  import logging
  6  import re
  7  import uuid
  8  from typing import Any, Dict, Optional, Tuple, Union
  9  
 10  from a2a.types import (
 11      A2ARequest,
 12      CancelTaskRequest,
 13      GetTaskSuccessResponse,
 14      InternalError,
 15      InvalidRequestError,
 16      JSONRPCError,
 17      JSONRPCResponse,
 18      JSONRPCSuccessResponse,
 19      Message,
 20      MessageSendParams,
 21      SendMessageRequest,
 22      SendMessageSuccessResponse,
 23      SendStreamingMessageRequest,
 24      SendStreamingMessageSuccessResponse,
 25      Task,
 26      TaskArtifactUpdateEvent,
 27      TaskIdParams,
 28      TaskStatusUpdateEvent,
 29  )
 30  
# Module-level logger, named after this module via __name__.
log = logging.getLogger(__name__)
 32  
 33  # --- Topic Construction Helpers ---
 34  
# Version segment embedded in the A2A base path.
A2A_VERSION = "v1"
# Path appended to the namespace by get_a2a_base_topic() to form the A2A base topic.
A2A_BASE_PATH = f"a2a/{A2A_VERSION}"
 37  
 38  
 39  def get_a2a_base_topic(namespace: str) -> str:
 40      """Returns the base topic prefix for all A2A communication."""
 41      if not namespace:
 42          raise ValueError("A2A namespace cannot be empty.")
 43      return f"{namespace.rstrip('/')}/{A2A_BASE_PATH}"
 44  
 45  
 46  def get_agent_discovery_topic(namespace: str) -> str:
 47      """Returns the topic for publishing agent card discovery."""
 48      return f"{get_a2a_base_topic(namespace)}/discovery/agentcards"
 49  
 50  
 51  def get_gateway_discovery_topic(namespace: str) -> str:
 52      """Returns the topic for publishing gateway card discovery."""
 53      return f"{get_a2a_base_topic(namespace)}/discovery/gatewaycards"
 54  
 55  
 56  def get_discovery_subscription_topic(namespace: str) -> str:
 57      """
 58      Returns a wildcard subscription topic for receiving all discovery messages
 59      (both agents and gateways).
 60      """
 61      return f"{get_a2a_base_topic(namespace)}/discovery/>"
 62  
 63  
 64  def get_agent_request_topic(namespace: str, agent_name: str) -> str:
 65      """Returns the topic for sending requests to a specific agent."""
 66      if not agent_name:
 67          raise ValueError("Agent name cannot be empty.")
 68      return f"{get_a2a_base_topic(namespace)}/agent/request/{agent_name}"
 69  
 70  
 71  def get_gateway_status_topic(namespace: str, gateway_id: str, task_id: str) -> str:
 72      """
 73      Returns the specific topic for an agent to publish status updates TO a specific gateway instance.
 74      """
 75      if not gateway_id:
 76          raise ValueError("Gateway ID cannot be empty.")
 77      if not task_id:
 78          raise ValueError("Task ID cannot be empty.")
 79      return f"{get_a2a_base_topic(namespace)}/gateway/status/{gateway_id}/{task_id}"
 80  
 81  
 82  def get_gateway_response_topic(namespace: str, gateway_id: str, task_id: str) -> str:
 83      """
 84      Returns the specific topic for an agent to publish the final response TO a specific gateway instance.
 85      Includes task_id for potential correlation/filtering, though gateway might subscribe more broadly.
 86      """
 87      if not gateway_id:
 88          raise ValueError("Gateway ID cannot be empty.")
 89      if not task_id:
 90          raise ValueError("Task ID cannot be empty.")
 91      return f"{get_a2a_base_topic(namespace)}/gateway/response/{gateway_id}/{task_id}"
 92  
 93  
 94  def get_gateway_status_subscription_topic(namespace: str, self_gateway_id: str) -> str:
 95      """
 96      Returns the wildcard topic for a gateway instance to subscribe to receive status updates
 97      intended for it.
 98      """
 99      if not self_gateway_id:
100          raise ValueError("Gateway ID is required for gateway status subscription")
101      return f"{get_a2a_base_topic(namespace)}/gateway/status/{self_gateway_id}/>"
102  
103  
def get_gateway_response_subscription_topic(
    namespace: str, self_gateway_id: str
) -> str:
    """
    Builds the wildcard subscription a gateway instance uses to receive the
    final responses addressed to it.

    Args:
        namespace: The namespace, validated by get_a2a_base_topic().
        self_gateway_id: ID of the subscribing gateway instance.

    Returns:
        The response subscription ending in the multi-level wildcard '>'.

    Raises:
        ValueError: If the gateway ID (or the namespace) is empty.
    """
    if self_gateway_id:
        base_topic = get_a2a_base_topic(namespace)
        return f"{base_topic}/gateway/response/{self_gateway_id}/>"
    raise ValueError("Gateway ID is required for gateway response subscription")
114  
115  
def get_peer_agent_status_topic(
    namespace: str, delegating_agent_name: str, sub_task_id: str
) -> str:
    """
    Returns the topic for publishing status updates for a sub-task *back to the delegating agent*.
    This topic includes the delegating agent's name.

    Args:
        namespace: The namespace, validated by get_a2a_base_topic().
        delegating_agent_name: Name of the agent that delegated the sub-task.
        sub_task_id: ID of the delegated sub-task.

    Returns:
        The peer status topic for the delegating agent and sub-task.

    Raises:
        ValueError: If the delegating agent name, sub-task ID or namespace
            is empty.
    """
    if not delegating_agent_name:
        raise ValueError("delegating_agent_name is required for peer status topic")
    # Mirror get_agent_response_topic: without this check an empty ID would
    # produce a topic with an empty final level.
    if not sub_task_id:
        raise ValueError("sub_task_id is required for peer status topic")
    return (
        f"{get_a2a_base_topic(namespace)}/agent/status/{delegating_agent_name}/{sub_task_id}"
    )
128  
129  
def get_agent_response_topic(
    namespace: str, delegating_agent_name: str, sub_task_id: str
) -> str:
    """
    Builds the topic on which the final response for a sub-task is published
    back to the agent that delegated it.

    Args:
        namespace: The namespace, validated by get_a2a_base_topic().
        delegating_agent_name: Name of the agent that delegated the sub-task.
        sub_task_id: ID of the delegated sub-task.

    Returns:
        The peer response topic for the delegating agent and sub-task.

    Raises:
        ValueError: If the delegating agent name, sub-task ID or namespace
            is empty.
    """
    # Checked in order, so the agent name error wins when both are empty.
    required = (
        (delegating_agent_name, "delegating_agent_name is required for peer response topic"),
        (sub_task_id, "sub_task_id is required for peer response topic"),
    )
    for value, error_text in required:
        if not value:
            raise ValueError(error_text)
    base_topic = get_a2a_base_topic(namespace)
    return f"{base_topic}/agent/response/{delegating_agent_name}/{sub_task_id}"
142  
143  
def get_agent_response_subscription_topic(namespace: str, self_agent_name: str) -> str:
    """
    Builds the wildcard subscription an agent uses to receive responses for
    the tasks it delegated. The agent's own name is part of the topic.

    Args:
        namespace: The namespace, validated by get_a2a_base_topic().
        self_agent_name: Name of the subscribing agent.

    Returns:
        The response subscription ending in the multi-level wildcard '>'.

    Raises:
        ValueError: If the agent name (or the namespace) is empty.
    """
    if self_agent_name:
        base_topic = get_a2a_base_topic(namespace)
        return f"{base_topic}/agent/response/{self_agent_name}/>"
    raise ValueError("self_agent_name is required for agent response subscription")
152  
153  
def get_agent_status_subscription_topic(namespace: str, self_agent_name: str) -> str:
    """
    Builds the wildcard subscription an agent uses to receive status updates
    for the tasks it delegated. The agent's own name is part of the topic.

    Args:
        namespace: The namespace, validated by get_a2a_base_topic().
        self_agent_name: Name of the subscribing agent.

    Returns:
        The status subscription ending in the multi-level wildcard '>'.

    Raises:
        ValueError: If the agent name (or the namespace) is empty.
    """
    if self_agent_name:
        base_topic = get_a2a_base_topic(namespace)
        return f"{base_topic}/agent/status/{self_agent_name}/>"
    raise ValueError("self_agent_name is required for agent status subscription")
162  
163  
def get_client_response_topic(namespace: str, client_id: str) -> str:
    """
    Builds the topic on which the final response is published to one client.

    Args:
        namespace: The namespace, validated by get_a2a_base_topic().
        client_id: ID of the client receiving the response.

    Returns:
        The client response topic.

    Raises:
        ValueError: If the client ID (or the namespace) is empty.
    """
    if client_id:
        base_topic = get_a2a_base_topic(namespace)
        return f"{base_topic}/client/response/{client_id}"
    raise ValueError("Client ID cannot be empty.")
169  
170  
def get_client_status_topic(namespace: str, client_id: str, task_id: str) -> str:
    """
    Builds the client- and task-specific topic on which status updates for a
    task are published to the original client.

    Args:
        namespace: The namespace, validated by get_a2a_base_topic().
        client_id: ID of the client that initiated the task.
        task_id: ID of the task the updates belong to.

    Returns:
        The client status topic.

    Raises:
        ValueError: If the client ID, task ID or namespace is empty.
    """
    # Checked in order, so the client ID error wins when both are empty.
    required = (
        (client_id, "Client ID cannot be empty."),
        (task_id, "Task ID cannot be empty."),
    )
    for value, error_text in required:
        if not value:
            raise ValueError(error_text)
    base_topic = get_a2a_base_topic(namespace)
    return f"{base_topic}/client/status/{client_id}/{task_id}"
181  
182  
def get_client_status_subscription_topic(namespace: str, client_id: str) -> str:
    """
    Builds the wildcard subscription a client uses to receive status updates
    for the tasks it initiated. The client's own ID is part of the topic.

    Args:
        namespace: The namespace, validated by get_a2a_base_topic().
        client_id: ID of the subscribing client.

    Returns:
        The status subscription ending in the multi-level wildcard '>'.

    Raises:
        ValueError: If the client ID (or the namespace) is empty.
    """
    if client_id:
        base_topic = get_a2a_base_topic(namespace)
        return f"{base_topic}/client/status/{client_id}/>"
    raise ValueError("Client ID cannot be empty.")
191  
192  
def get_sam_events_topic(namespace: str, category: str, action: str) -> str:
    """
    Builds the topic for a SAM system event.

    Args:
        namespace: The namespace; trailing '/' characters are stripped.
        category: Event category level of the topic.
        action: Event action level of the topic.

    Returns:
        Topic string: {namespace}/sam/events/{category}/{action}

    Raises:
        ValueError: If namespace, category or action is empty.
    """
    # Checked in order, so the first empty argument determines the message.
    required = (
        (namespace, "Namespace cannot be empty."),
        (category, "Category cannot be empty."),
        (action, "Action cannot be empty."),
    )
    for value, error_text in required:
        if not value:
            raise ValueError(error_text)
    root = namespace.rstrip('/')
    return f"{root}/sam/events/{category}/{action}"
202  
203  
def get_feedback_topic(namespace: str) -> str:
    """
    Returns the topic for publishing user feedback events.

    Args:
        namespace: The namespace; trailing '/' characters are stripped.

    Returns:
        Topic string: {namespace}/sam/v1/feedback/submit

    Raises:
        ValueError: If the namespace is empty.
    """
    # Same guard as the other SAM topic helpers: an empty namespace would
    # otherwise yield a topic with an empty first level.
    if not namespace:
        raise ValueError("Namespace cannot be empty.")
    return f"{namespace.rstrip('/')}/sam/v1/feedback/submit"
207  
208  
def get_sam_events_subscription_topic(namespace: str, category: str) -> str:
    """
    Builds the wildcard subscription for all SAM system events of a category.

    Args:
        namespace: The namespace; trailing '/' characters are stripped.
        category: Event category level of the topic.

    Returns:
        Subscription string: {namespace}/sam/events/{category}/>

    Raises:
        ValueError: If namespace or category is empty.
    """
    # Checked in order, so the namespace error wins when both are empty.
    required = (
        (namespace, "Namespace cannot be empty."),
        (category, "Category cannot be empty."),
    )
    for value, error_text in required:
        if not value:
            raise ValueError(error_text)
    root = namespace.rstrip('/')
    return f"{root}/sam/events/{category}/>"
216  
217  
def get_trust_card_topic(namespace: str, component_type: str, component_id: str) -> str:
    """
    Builds the topic on which a component publishes its Trust Card.

    IMPORTANT: component_id MUST be the exact broker client-username the
    component authenticates to the Solace broker with. Trust cards are
    validated against that broker authentication identity, so this is
    critical for trust verification.

    Args:
        namespace: SAM namespace
        component_type: Type of component ("gateway", "agent", etc.)
        component_id: MUST be the broker client-username (from broker_username config).
                     DO NOT pass arbitrary IDs such as agent_name or gateway_id
                     unless they match the broker_username exactly.

    Returns:
        Topic string: {namespace}/a2a/v1/trust/{component_type}/{component_id}

    Raises:
        ValueError: If any parameter is empty

    Security Note:
        Verification matches the topic's component_id against the
        authenticated broker client-username. Any other value breaks the
        security model and the trust chain verification.
    """
    # Checked in order, so the first empty argument determines the message.
    required = (
        (namespace, "Namespace cannot be empty."),
        (component_type, "Component type cannot be empty."),
        (component_id, "Component ID cannot be empty."),
    )
    for value, error_text in required:
        if not value:
            raise ValueError(error_text)
    base_topic = get_a2a_base_topic(namespace)
    return f"{base_topic}/trust/{component_type}/{component_id}"
252  
253  
def get_trust_card_subscription_topic(namespace: str, component_type: Optional[str] = None) -> str:
    """
    Builds the subscription pattern for Trust Card topics.

    Args:
        namespace: SAM namespace
        component_type: Optional component type to restrict the subscription
            to; when omitted (or empty) all component types are matched.

    Returns:
        Subscription pattern whose component ID level is the single-level
        wildcard '*'; the component type level is also '*' when no type is
        given.

    Raises:
        ValueError: If the namespace is empty.
    """
    if not namespace:
        raise ValueError("Namespace cannot be empty.")

    # Fall back to the single-level wildcard when no type was requested.
    type_level = component_type if component_type else "*"
    return f"{get_a2a_base_topic(namespace)}/trust/{type_level}/*"
272  
273  
def extract_trust_card_info_from_topic(topic: str) -> tuple[str, str]:
    """
    Extracts component type and ID from trust card topic.

    The topic is expected to look like
    {namespace}/a2a/v1/trust/{component_type}/{component_id}, where the
    namespace may itself span several '/'-separated levels. The first
    "a2a/v1/trust" sequence that follows at least one namespace level is
    used as the anchor. Any levels after the component ID are ignored.

    Args:
        topic: Trust card topic

    Returns:
        Tuple of (component_type, component_id)

    Raises:
        ValueError: If topic format is invalid, including an empty
            component type or component ID level.
    """
    parts = topic.split('/')
    # Literal form of the "a2a/v1" base path followed by the trust level.
    marker = ['a2a', 'v1', 'trust']

    # Start at 1 so at least one namespace level precedes the marker, and stop
    # early enough that both the type and ID levels exist after it.
    for start in range(1, len(parts) - 4):
        if parts[start:start + 3] != marker:
            continue
        component_type = parts[start + 3]
        component_id = parts[start + 4]
        if component_type and component_id:
            return component_type, component_id
        # An empty type/ID level can never identify a component.
        break

    raise ValueError(f"Invalid trust card topic format: {topic}")
294  
295  
def subscription_to_regex(subscription: str) -> str:
    """
    Translates a Solace topic subscription into a regex pattern string.

    Args:
        subscription: The subscription, optionally containing the '*'
            single-level wildcard and a trailing '/>' multi-level wildcard.

    Returns:
        A regex pattern in which every '*' becomes '[^/]+' and a trailing
        '/>' becomes '/.*'. All other characters are regex-escaped.
    """
    # Escape first so literal characters such as '.' cannot act as regex syntax.
    escaped = re.escape(subscription)
    # re.escape turns '*' into '\*'; map it to "one or more non-slash characters".
    with_single_level = escaped.replace(r"\*", r"[^/]+")
    if not with_single_level.endswith(r"/>"):
        return with_single_level
    # Drop the trailing '>' (keeping the '/') and match the rest of the topic.
    return with_single_level[:-1] + r".*"
306  
307  
def topic_matches_subscription(topic: str, subscription: str) -> bool:
    """
    Tests whether a topic is matched by a Solace subscription pattern.

    Args:
        topic: The concrete topic to test.
        subscription: The subscription, converted with subscription_to_regex().

    Returns:
        True if the whole topic matches the converted pattern, False otherwise.
    """
    matched = re.fullmatch(subscription_to_regex(subscription), topic)
    return matched is not None
312  
313  
314  # --- JSON-RPC Envelope Helpers ---
315  
316  
def get_request_id(request: A2ARequest) -> str | int:
    """
    Reads the JSON-RPC request ID from an A2A request object.

    Args:
        request: The A2A request wrapper.

    Returns:
        The `id` of the wrapped request (`request.root.id`).
    """
    wrapped_request = request.root
    return wrapped_request.id
320  
321  
def get_request_method(request: A2ARequest) -> str:
    """
    Reads the JSON-RPC method name from an A2A request object.

    Args:
        request: The A2A request wrapper.

    Returns:
        The `method` of the wrapped request (`request.root.method`).
    """
    wrapped_request = request.root
    return wrapped_request.method
325  
326  
def get_message_from_send_request(request: A2ARequest) -> Optional[Message]:
    """
    Reads the Message carried by a send request.

    Args:
        request: The A2A request wrapper.

    Returns:
        The message from the request params when the wrapped request is a
        SendMessageRequest or SendStreamingMessageRequest, otherwise None.
    """
    wrapped_request = request.root
    if not isinstance(wrapped_request, (SendMessageRequest, SendStreamingMessageRequest)):
        return None
    return wrapped_request.params.message
335  
336  
def get_task_id_from_cancel_request(request: A2ARequest) -> Optional[str]:
    """
    Reads the task ID targeted by a cancel request.

    Args:
        request: The A2A request wrapper.

    Returns:
        The `id` from the request params when the wrapped request is a
        CancelTaskRequest, otherwise None.
    """
    wrapped_request = request.root
    if not isinstance(wrapped_request, CancelTaskRequest):
        return None
    return wrapped_request.params.id
342  
343  
def get_response_id(response: JSONRPCResponse) -> Optional[Union[str, int]]:
    """
    Reads the ID from a JSON-RPC response object.

    Args:
        response: The JSON-RPC response wrapper.

    Returns:
        The `id` of the wrapped response, or None when it has no such
        attribute.
    """
    return getattr(response.root, "id", None)
349  
350  
def get_response_result(response: JSONRPCResponse) -> Optional[Any]:
    """
    Reads the result payload from a JSON-RPC response object.

    Args:
        response: The JSON-RPC response wrapper.

    Returns:
        The `result` of the wrapped response, or None when it has no such
        attribute.
    """
    return getattr(response.root, "result", None)
356  
357  
def get_response_error(response: JSONRPCResponse) -> Optional[JSONRPCError]:
    """
    Reads the error object from a JSON-RPC response object.

    Args:
        response: The JSON-RPC response wrapper.

    Returns:
        The `error` of the wrapped response, or None when it has no such
        attribute.
    """
    return getattr(response.root, "error", None)
363  
364  
def get_error_message(error: JSONRPCError) -> str:
    """
    Reads the message string from a JSONRPCError object.

    Args:
        error: The JSON-RPC error.

    Returns:
        The error's `message` attribute.
    """
    message_text = error.message
    return message_text
368  
369  
def get_error_code(error: JSONRPCError) -> int:
    """
    Reads the numeric code from a JSONRPCError object.

    Args:
        error: The JSON-RPC error.

    Returns:
        The error's `code` attribute.
    """
    error_code = error.code
    return error_code
373  
374  
def get_error_data(error: JSONRPCError) -> Optional[Any]:
    """
    Reads the optional data payload from a JSONRPCError object.

    Args:
        error: The JSON-RPC error.

    Returns:
        The error's `data` attribute.
    """
    error_payload = error.data
    return error_payload
378  
379  
def create_success_response(
    result: Any, request_id: Optional[Union[str, int]]
) -> JSONRPCResponse:
    """
    Wraps a result in the success response model matching its type and
    returns it as a JSON-RPC response.

    Args:
        result: The result payload (e.g., Task, TaskStatusUpdateEvent).
        request_id: The ID of the original request.

    Returns:
        A new `JSONRPCResponse` whose root is the specific success response.

    Raises:
        TypeError: If the result type is not a supported A2A model.
    """
    # Streaming update events travel in the streaming success envelope.
    if isinstance(result, (TaskStatusUpdateEvent, TaskArtifactUpdateEvent)):
        streaming_response = SendStreamingMessageSuccessResponse(
            id=request_id, result=result
        )
        return JSONRPCResponse(root=streaming_response)

    # A final Task is returned in a GetTaskSuccessResponse envelope.
    if isinstance(result, Task):
        task_response = GetTaskSuccessResponse(id=request_id, result=result)
        return JSONRPCResponse(root=task_response)

    raise TypeError(
        f"Unsupported result type for create_success_response: {type(result).__name__}"
    )
411  
412  
def create_internal_error_response(
    message: str,
    request_id: Optional[Union[str, int]],
    data: Optional[Dict[str, Any]] = None,
) -> JSONRPCResponse:
    """
    Builds a JSON-RPC response that carries an InternalError.

    Args:
        message: The error message.
        request_id: The ID of the original request.
        data: Optional structured data to attach to the error.

    Returns:
        A new `JSONRPCResponse` object containing an `InternalError`.
    """
    internal_error = create_internal_error(message=message, data=data)
    response = JSONRPCResponse(id=request_id, error=internal_error)
    return response
431  
432  
def create_invalid_request_error_response(
    message: str,
    request_id: Optional[Union[str, int]],
    data: Optional[Any] = None,
) -> JSONRPCResponse:
    """
    Builds a JSON-RPC response that carries an InvalidRequestError.

    Args:
        message: The error message.
        request_id: The ID of the original request.
        data: Optional structured data to attach to the error.

    Returns:
        A new `JSONRPCResponse` object containing an `InvalidRequestError`.
    """
    invalid_request_error = create_invalid_request_error(message=message, data=data)
    response = JSONRPCResponse(id=request_id, error=invalid_request_error)
    return response
451  
452  
def create_internal_error(
    message: str,
    data: Optional[Dict[str, Any]] = None,
) -> InternalError:
    """
    Builds an InternalError model.

    Args:
        message: The error message.
        data: Optional structured data to attach to the error.

    Returns:
        A new `InternalError` object.
    """
    internal_error = InternalError(message=message, data=data)
    return internal_error
468  
469  
def create_invalid_request_error(
    message: str, data: Optional[Any] = None
) -> InvalidRequestError:
    """
    Builds an InvalidRequestError model.

    Args:
        message: The error message.
        data: Optional structured data to attach to the error.

    Returns:
        A new `InvalidRequestError` object.
    """
    invalid_request_error = InvalidRequestError(message=message, data=data)
    return invalid_request_error
484  
485  
def create_generic_success_response(
    result: Any, request_id: Optional[Union[str, int]] = None
) -> JSONRPCSuccessResponse:
    """
    Builds a generic successful JSON-RPC response.
    Note: Intended for non-A2A-spec-compliant endpoints that use a similar structure.

    Args:
        result: The result payload for the response.
        request_id: The ID of the original request.

    Returns:
        A new `JSONRPCSuccessResponse` object.
    """
    success_response = JSONRPCSuccessResponse(id=request_id, result=result)
    return success_response
501  
502  
def create_error_response(
    error: JSONRPCError,
    request_id: Optional[Union[str, int]],
) -> JSONRPCResponse:
    """
    Builds a JSON-RPC error response around an existing error model.

    Args:
        error: The JSONRPCError model instance.
        request_id: The ID of the original request.

    Returns:
        A new `JSONRPCResponse` object containing the error.
    """
    error_response = JSONRPCResponse(id=request_id, error=error)
    return error_response
518  
519  
def create_cancel_task_request(task_id: str) -> CancelTaskRequest:
    """
    Builds a CancelTaskRequest for a task.

    Args:
        task_id: The ID of the task to cancel.

    Returns:
        A new `CancelTaskRequest` object whose request ID is a freshly
        generated UUID4 hex string.
    """
    return CancelTaskRequest(
        id=uuid.uuid4().hex,
        params=TaskIdParams(id=task_id),
    )
532  
533  
def create_send_message_request(
    message: Message,
    task_id: str,
    metadata: Optional[Dict[str, Any]] = None,
) -> SendMessageRequest:
    """
    Builds a SendMessageRequest.

    Args:
        message: The A2AMessage object to send.
        task_id: The unique ID for the task; used as the request ID.
        metadata: Optional metadata for the send request.

    Returns:
        A new `SendMessageRequest` object.
    """
    return SendMessageRequest(
        id=task_id,
        params=MessageSendParams(message=message, metadata=metadata),
    )
552  
553  
def create_send_streaming_message_request(
    message: Message,
    task_id: str,
    metadata: Optional[Dict[str, Any]] = None,
) -> SendStreamingMessageRequest:
    """
    Builds a SendStreamingMessageRequest.

    Args:
        message: The A2AMessage object to send.
        task_id: The unique ID for the task; used as the request ID.
        metadata: Optional metadata for the send request.

    Returns:
        A new `SendStreamingMessageRequest` object.
    """
    return SendStreamingMessageRequest(
        id=task_id,
        params=MessageSendParams(message=message, metadata=metadata),
    )
572  
573  
def create_send_message_success_response(
    result: Union[Task, Message], request_id: Optional[Union[str, int]]
) -> SendMessageSuccessResponse:
    """
    Builds a SendMessageSuccessResponse.

    Args:
        result: The result payload (Task or Message).
        request_id: The ID of the original request.

    Returns:
        A new `SendMessageSuccessResponse` object.
    """
    success_response = SendMessageSuccessResponse(id=request_id, result=result)
    return success_response
588  
589  
def create_send_streaming_message_success_response(
    result: Union[Task, Message, TaskStatusUpdateEvent, TaskArtifactUpdateEvent],
    request_id: Optional[Union[str, int]],
) -> SendStreamingMessageSuccessResponse:
    """
    Builds a SendStreamingMessageSuccessResponse.

    Args:
        result: The result payload.
        request_id: The ID of the original request.

    Returns:
        A new `SendStreamingMessageSuccessResponse` object.
    """
    streaming_response = SendStreamingMessageSuccessResponse(
        id=request_id, result=result
    )
    return streaming_response
605  
606  
def extract_task_id_from_topic(
    topic: str, subscription_pattern: str, log_identifier: str
) -> Optional[str]:
    """
    Pulls the task ID off the end of a topic, using the subscription it was
    received on to locate where the ID starts.

    Args:
        topic: The concrete topic a message arrived on.
        subscription_pattern: The subscription pattern the topic is expected
            to match.
        log_identifier: Prefix placed at the start of the log messages.

    Returns:
        The part of the topic after the subscription's prefix, with leading
        '/' characters removed, or None (after logging a warning) when the
        prefix does not match or nothing remains.
    """
    # Remove the '.*' produced for the multi-level wildcard so the regex only
    # covers the prefix of the topic.
    prefix_regex = subscription_to_regex(subscription_pattern).replace(r".*", "")
    prefix_match = re.match(prefix_regex, topic)

    # Whatever follows the matched prefix is the task ID.
    task_id = topic[prefix_match.end() :].lstrip("/") if prefix_match else ""

    if not task_id:
        log.warning(
            "%s Could not extract Task ID from topic '%s' using pattern '%s'",
            log_identifier,
            topic,
            subscription_pattern,
        )
        return None

    log.debug(
        "%s Extracted Task ID '%s' from topic '%s'",
        log_identifier,
        task_id,
        topic,
    )
    return task_id
631  
632  
633  # --- Client Event Helpers ---
634  
635  
def is_client_event(obj: Any) -> bool:
    """
    Tests whether an object is a ClientEvent tuple (Task, UpdateEvent).

    A ClientEvent is a 2-element tuple whose first element is a Task and
    whose second element is a TaskStatusUpdateEvent, a
    TaskArtifactUpdateEvent, or None.

    Args:
        obj: The object to check.

    Returns:
        True if the object is a ClientEvent tuple, False otherwise.
    """
    is_pair = isinstance(obj, tuple) and len(obj) == 2
    if not is_pair:
        return False

    first, second = obj
    if not isinstance(first, Task):
        return False

    # The update slot may be empty; otherwise it must be a known event type.
    return second is None or isinstance(
        second, (TaskStatusUpdateEvent, TaskArtifactUpdateEvent)
    )
665  
666  
def is_message_object(obj: Any) -> bool:
    """
    Tests whether an object is a Message instance.

    Args:
        obj: The object to check.

    Returns:
        True if the object is a Message, False otherwise.
    """
    is_message = isinstance(obj, Message)
    return is_message
678  
679  
def unpack_client_event(
    event: tuple,
) -> Tuple[Task, Optional[Union[TaskStatusUpdateEvent, TaskArtifactUpdateEvent]]]:
    """
    Validates a ClientEvent tuple and returns its two components.

    Args:
        event: A ClientEvent tuple (Task, UpdateEvent).

    Returns:
        A tuple of (Task, Optional[UpdateEvent]) where UpdateEvent can be
        TaskStatusUpdateEvent, TaskArtifactUpdateEvent, or None.

    Raises:
        ValueError: If the event is not a valid ClientEvent tuple.
    """
    if is_client_event(event):
        current_task, update = event
        return current_task, update

    raise ValueError(
        f"Expected a ClientEvent tuple, got {type(event).__name__}"
    )