component.py
1 """ 2 Base Component class for Gateway implementations in the Solace AI Connector. 3 """ 4 5 import logging 6 import asyncio 7 import base64 8 import queue 9 import re 10 import uuid 11 from datetime import datetime, timezone 12 from typing import Any, Dict, Optional, List, Tuple, Union 13 14 from google.adk.artifacts import BaseArtifactService 15 16 from ...common.agent_registry import AgentRegistry 17 from ...common.gateway_registry import GatewayRegistry 18 from ...common.sac.sam_component_base import SamComponentBase 19 from ...core_a2a.service import CoreA2AService 20 from ...agent.adk.services import initialize_artifact_service 21 from ...common.services.identity_service import ( 22 BaseIdentityService, 23 create_identity_service, 24 ) 25 from .task_context import TaskContextManager 26 from .auth_interface import AuthHandler 27 from .. import constants 28 from ...common.a2a.types import ContentPart 29 from ...common.utils.rbac_utils import validate_agent_access 30 from a2a.types import ( 31 Message as A2AMessage, 32 AgentCard, 33 JSONRPCResponse, 34 Task, 35 TaskState, 36 TaskStatusUpdateEvent, 37 TaskArtifactUpdateEvent, 38 JSONRPCError, 39 TextPart, 40 DataPart, 41 FilePart, 42 FileWithBytes, 43 Artifact as A2AArtifact, 44 ) 45 from ...common import a2a 46 from ...common.a2a.utils import is_gateway_card 47 from ...common.utils.embeds import ( 48 resolve_embeds_in_string, 49 evaluate_embed, 50 LATE_EMBED_TYPES, 51 EARLY_EMBED_TYPES, 52 resolve_embeds_recursively_in_string, 53 ) 54 from ...common.utils.embeds.types import ResolutionMode 55 from ...agent.utils.artifact_helpers import ( 56 load_artifact_content_or_metadata, 57 format_artifact_uri, 58 ) 59 from ...common.utils.mime_helpers import is_text_based_mime_type 60 from solace_ai_connector.common.message import ( 61 Message as SolaceMessage, 62 ) 63 from solace_ai_connector.common.event import Event, EventType 64 from abc import abstractmethod 65 66 from ...common.middleware.registry import MiddlewareRegistry 67 68 log = logging.getLogger(__name__) 69 70 info = { 71 "class_name": "BaseGatewayComponent", 72 "description": ( 73 "Abstract base component for A2A gateways. Handles common service " 74 "initialization and provides a framework for platform-specific logic. " 75 "Configuration is typically derived from the parent BaseGatewayApp's app_config." 76 ), 77 "config_parameters": [], 78 "input_schema": { 79 "type": "object", 80 "description": "Not typically used directly; component reacts to events from its input queue.", 81 }, 82 "output_schema": { 83 "type": "object", 84 "description": "Not typically used directly; component sends data to external systems.", 85 }, 86 } 87 88 89 class BaseGatewayComponent(SamComponentBase): 90 """ 91 Abstract base class for Gateway components. 92 93 Initializes shared services and manages the core lifecycle for processing 94 A2A messages and interacting with an external communication platform. 95 """ 96 97 _RESOLVE_EMBEDS_IN_FINAL_RESPONSE = False 98 99 def get_config(self, key: str, default: Any = None) -> Any: 100 """ 101 Overrides the default get_config to first look inside the nested 102 'app_config' dictionary that BaseGatewayApp places in the component_config. 103 This is the primary way gateway components should access their configuration. 104 """ 105 if "app_config" in self.component_config: 106 value = self.component_config["app_config"].get(key) 107 if value is not None: 108 return value 109 110 return super().get_config(key, default) 111 112 def __init__( 113 self, 114 resolve_artifact_uris_in_gateway: bool = True, 115 supports_inline_artifact_resolution: bool = False, 116 filter_tool_data_parts: bool = True, 117 **kwargs: Any 118 ): 119 """ 120 Initialize the BaseGatewayComponent. 121 122 Args: 123 resolve_artifact_uris_in_gateway: If True, resolves artifact URIs before sending to external. 124 supports_inline_artifact_resolution: If True, SIGNAL_ARTIFACT_RETURN embeds are converted 125 to FileParts during embed resolution. If False (default), signals are passed through 126 for the gateway to handle manually. Use False for legacy gateways (e.g., Slack), 127 True for modern gateways that support inline artifact rendering (e.g., HTTP SSE). 128 filter_tool_data_parts: If True (default), filters out tool-related DataParts (tool_call, 129 tool_result, etc.) from final Task messages before sending to gateway. Use True for 130 gateways that don't want to display internal tool execution details (e.g., Slack), 131 False for gateways that display all parts (e.g., HTTP SSE Web UI). 132 **kwargs: Additional arguments passed to parent class. 133 """ 134 super().__init__(info, **kwargs) 135 self.resolve_artifact_uris_in_gateway = resolve_artifact_uris_in_gateway 136 self.supports_inline_artifact_resolution = supports_inline_artifact_resolution 137 self.filter_tool_data_parts = filter_tool_data_parts 138 log.info("%s Initializing Base Gateway Component...", self.log_identifier) 139 140 try: 141 # Note: self.namespace and self.max_message_size_bytes are initialized in SamComponentBase 142 self.gateway_id: str = self.get_config("gateway_id") 143 if not self.gateway_id: 144 raise ValueError("Gateway ID must be configured in the app_config.") 145 146 self.enable_embed_resolution: bool = self.get_config( 147 "enable_embed_resolution", True 148 ) 149 self.gateway_max_artifact_resolve_size_bytes: int = self.get_config( 150 "gateway_max_artifact_resolve_size_bytes" 151 ) 152 self.gateway_recursive_embed_depth: int = self.get_config( 153 "gateway_recursive_embed_depth" 154 ) 155 self.artifact_handling_mode: str = self.get_config( 156 "artifact_handling_mode", "embed" 157 ) 158 _ = self.get_config("artifact_service") 159 160 log.info( 161 "%s Retrieved common configs: Namespace=%s, GatewayID=%s", 162 self.log_identifier, 163 self.namespace, 164 self.gateway_id, 165 ) 166 167 except Exception as e: 168 log.error( 169 "%s Failed to retrieve essential configuration: %s", 170 self.log_identifier, 171 e, 172 ) 173 raise ValueError(f"Configuration retrieval error: {e}") from e 174 175 self.agent_registry: AgentRegistry = AgentRegistry() 176 self.gateway_registry: GatewayRegistry = GatewayRegistry() 177 self.core_a2a_service: CoreA2AService = CoreA2AService( 178 agent_registry=self.agent_registry, 179 namespace=self.namespace, 180 component_id="WebUI" 181 ) 182 self.shared_artifact_service: Optional[BaseArtifactService] = ( 183 initialize_artifact_service(self) 184 ) 185 186 self.task_context_manager: TaskContextManager = TaskContextManager() 187 self.internal_event_queue: queue.Queue = queue.Queue() 188 189 identity_service_config = self.get_config("identity_service") 190 self.identity_service: Optional[BaseIdentityService] = create_identity_service( 191 identity_service_config, self 192 ) 193 194 self._config_resolver = MiddlewareRegistry.get_config_resolver() 195 log.info( 196 "%s Middleware system initialized (using default configuration resolver).", 197 self.log_identifier, 198 ) 199 200 self._gateway_card_publishing_config = self.get_config( 201 "gateway_card_publishing", 202 {"enabled": True, "interval_seconds": 30} 203 ) 204 self._gateway_card_config = self.get_config("gateway_card", {}) 205 self._gateway_card_timer_id = f"publish_gateway_card_{self.gateway_id}" 206 207 # Task timeout: max idle time (seconds) waiting for agent activity before canceling 208 self.task_timeout_seconds: int = self.get_config( 209 "task_timeout_seconds", constants.DEFAULT_TASK_TIMEOUT_SECONDS 210 ) 211 212 # Authentication handler (optional, enterprise feature) 213 self.auth_handler: Optional[AuthHandler] = None 214 215 # Setup authentication if enabled (subclasses override _setup_auth) 216 self._setup_auth() 217 218 log.info( 219 "%s Initialized Base Gateway Component.", self.log_identifier 220 ) 221 222 def _setup_auth(self) -> None: 223 """ 224 Setup authentication handler if enabled. 225 226 This method is called during initialization and can be overridden 227 by subclasses to customize auth setup. The default implementation 228 does nothing - subclasses should override to enable auth. 229 230 Example override in subclass: 231 def _setup_auth(self): 232 if self.get_config('enable_auth', False): 233 from enterprise.auth import SAMOAuth2Handler 234 self.auth_handler = SAMOAuth2Handler(self.config) 235 """ 236 # Base implementation: no auth 237 # Subclasses (like GenericGateway) override to enable auth 238 pass 239 240 async def _inject_auth_headers(self, headers: Dict[str, str]) -> Dict[str, str]: 241 """ 242 Inject authentication headers if authenticated. 243 244 This helper method should be called before making outgoing HTTP requests 245 to add authentication headers (e.g., Bearer tokens) to the request. 246 247 Args: 248 headers: Existing headers dictionary 249 250 Returns: 251 Headers dictionary with auth headers added (if authenticated) 252 253 Example: 254 headers = {"Content-Type": "application/json"} 255 headers = await self._inject_auth_headers(headers) 256 # headers now includes Authorization if authenticated 257 """ 258 if self.auth_handler: 259 try: 260 auth_headers = await self.auth_handler.get_auth_headers() 261 headers.update(auth_headers) 262 except Exception as e: 263 log.warning( 264 "%s Failed to get auth headers: %s", 265 self.log_identifier, 266 e 267 ) 268 269 return headers 270 271 async def authenticate_and_enrich_user( 272 self, external_event_data: Any 273 ) -> Optional[Dict[str, Any]]: 274 """ 275 Orchestrates the full authentication and identity enrichment flow. 276 This method should be called by gateway handlers. 277 """ 278 log_id_prefix = f"{self.log_identifier}[AuthAndEnrich]" 279 280 auth_claims = await self._extract_initial_claims(external_event_data) 281 if not auth_claims: 282 log.warning( 283 "%s Initial claims extraction failed or returned no identity.", 284 log_id_prefix, 285 ) 286 return None 287 288 if self.identity_service: 289 enriched_profile = await self.identity_service.get_user_profile(auth_claims) 290 if enriched_profile: 291 final_profile = enriched_profile.copy() 292 final_profile.update(auth_claims) 293 log.info( 294 "%s Successfully merged auth claims and enriched profile for user: %s", 295 log_id_prefix, 296 auth_claims.get("id"), 297 ) 298 return final_profile 299 else: 300 log.debug( 301 "%s IdentityService found no profile for user: %s. Using claims only.", 302 log_id_prefix, 303 auth_claims.get("id"), 304 ) 305 306 return auth_claims 307 308 async def submit_a2a_task( 309 self, 310 target_agent_name: str, 311 a2a_parts: List[ContentPart], 312 external_request_context: Dict[str, Any], 313 user_identity: Any, 314 is_streaming: bool = True, 315 api_version: str = "v2", 316 task_id_override: str | None = None, 317 metadata: dict[str, Any] | None = None, 318 ) -> str: 319 log_id_prefix = f"{self.log_identifier}[SubmitA2ATask]" 320 log.info( 321 "%s Submitting task for user_identity: %s", 322 log_id_prefix, 323 user_identity.get("id", user_identity), 324 ) 325 326 if not isinstance(user_identity, dict) or not user_identity.get("id"): 327 log.error( 328 "%s Authentication failed or returned invalid profile. Denying task submission.", 329 log_id_prefix, 330 ) 331 raise PermissionError("User not authenticated or identity is invalid.") 332 333 force_identity_str = self.get_config("force_user_identity") 334 if force_identity_str: 335 original_identity_id = user_identity.get("id") 336 user_identity = {"id": force_identity_str, "name": force_identity_str} 337 log.warning( 338 "%s DEVELOPMENT MODE: Forcing user_identity from '%s' to '%s'", 339 log_id_prefix, 340 original_identity_id, 341 force_identity_str, 342 ) 343 344 config_resolver = MiddlewareRegistry.get_config_resolver() 345 gateway_context = { 346 "gateway_id": self.gateway_id, 347 "gateway_app_config": self.component_config.get("app_config", {}), 348 } 349 350 try: 351 user_config = await config_resolver.resolve_user_config( 352 user_identity, gateway_context, {} 353 ) 354 log.debug( 355 "%s Resolved user configuration for user_identity '%s': %s", 356 log_id_prefix, 357 user_identity.get("id"), 358 {k: v for k, v in user_config.items() if not k.startswith("_")}, 359 ) 360 except Exception as config_err: 361 log.exception( 362 "%s Error resolving user configuration for '%s': %s. Proceeding with default configuration.", 363 log_id_prefix, 364 user_identity.get("id"), 365 config_err, 366 ) 367 user_config = {} 368 369 user_config["user_profile"] = user_identity 370 371 # Validate user has permission to access this target agent 372 validate_agent_access( 373 user_config=user_config, 374 target_agent_name=target_agent_name, 375 validation_context={ 376 "gateway_id": self.gateway_id, 377 "source": "gateway_request", 378 }, 379 log_identifier=log_id_prefix, 380 ) 381 382 external_request_context["user_identity"] = user_identity 383 external_request_context["a2a_user_config"] = user_config 384 external_request_context["api_version"] = api_version 385 external_request_context["is_streaming"] = is_streaming 386 external_request_context["target_agent_name"] = target_agent_name 387 log.debug( 388 "%s Stored user_identity, configuration, api_version (%s), and is_streaming (%s) in external_request_context.", 389 log_id_prefix, 390 api_version, 391 is_streaming, 392 ) 393 394 now = datetime.now(timezone.utc) 395 timestamp_str = now.isoformat() 396 timestamp_header_part = TextPart( 397 text=f"Request received by gateway at: {timestamp_str}" 398 ) 399 if not isinstance(a2a_parts, list): 400 a2a_parts = list(a2a_parts) 401 a2a_parts.insert(0, timestamp_header_part) 402 log.debug("%s Prepended timestamp to a2a_parts.", log_id_prefix) 403 404 a2a_session_id = external_request_context.get("a2a_session_id") 405 user_id_for_a2a = external_request_context.get( 406 "user_id_for_a2a", user_identity.get("id") 407 ) 408 409 system_purpose = self.get_config("system_purpose", "") 410 response_format = self.get_config("response_format", "") 411 412 if not a2a_session_id: 413 a2a_session_id = f"gdk-session-{uuid.uuid4().hex}" 414 log.warning( 415 "%s 'a2a_session_id' not found in external_request_context, generated: %s", 416 self.log_identifier, 417 a2a_session_id, 418 ) 419 external_request_context["a2a_session_id"] = a2a_session_id 420 421 a2a_metadata = { 422 "agent_name": target_agent_name, 423 "system_purpose": system_purpose, 424 "response_format": response_format, 425 } 426 427 # Add session behavior if provided by adapter 428 session_behavior = external_request_context.get("session_behavior") 429 if session_behavior: 430 a2a_metadata["sessionBehavior"] = session_behavior 431 log.debug( 432 "%s Setting sessionBehavior to: %s", log_id_prefix, session_behavior 433 ) 434 435 invoked_artifacts = external_request_context.get("invoked_with_artifacts") 436 if invoked_artifacts: 437 a2a_metadata["invoked_with_artifacts"] = invoked_artifacts 438 log.debug( 439 "%s Found %d artifact identifiers in external context to pass to agent.", 440 log_id_prefix, 441 len(invoked_artifacts), 442 ) 443 444 if metadata: 445 a2a_metadata.update(metadata) 446 447 # This correlation ID is used by the gateway to track the task 448 if task_id_override: 449 task_id = task_id_override 450 else: 451 task_id = f"gdk-task-{uuid.uuid4().hex}" 452 453 prepared_a2a_parts = await self._prepare_parts_for_publishing( 454 parts=a2a_parts, 455 user_id=user_id_for_a2a, 456 session_id=a2a_session_id, 457 target_agent_name=target_agent_name, 458 ) 459 460 a2a_message = a2a.create_user_message( 461 parts=prepared_a2a_parts, 462 metadata=a2a_metadata, 463 context_id=a2a_session_id, 464 ) 465 466 if is_streaming: 467 a2a_request = a2a.create_send_streaming_message_request( 468 message=a2a_message, task_id=task_id 469 ) 470 else: 471 a2a_request = a2a.create_send_message_request( 472 message=a2a_message, task_id=task_id 473 ) 474 475 payload = a2a_request.model_dump(by_alias=True, exclude_none=True) 476 target_topic = a2a.get_agent_request_topic(self.namespace, target_agent_name) 477 478 user_properties = { 479 "clientId": self.gateway_id, 480 "userId": user_id_for_a2a, 481 } 482 if user_config: 483 user_properties["a2aUserConfig"] = user_config 484 485 # Enterprise feature: Add signed user claims if trust manager available 486 if hasattr(self, "trust_manager") and self.trust_manager: 487 log.debug( 488 "%s Attempting to sign user claims for task %s", 489 log_id_prefix, 490 task_id, 491 ) 492 try: 493 auth_token = self.trust_manager.sign_user_claims( 494 user_info=user_identity, task_id=task_id 495 ) 496 user_properties["authToken"] = auth_token 497 log.debug( 498 "%s Successfully signed user claims for task %s", 499 log_id_prefix, 500 task_id, 501 ) 502 except Exception as e: 503 log.error( 504 "%s Failed to sign user claims for task %s: %s", 505 log_id_prefix, 506 task_id, 507 e, 508 ) 509 # Continue without token - enterprise feature is optional 510 else: 511 log.debug( 512 "%s Trust Manager not available, proceeding without authentication token", 513 log_id_prefix, 514 ) 515 516 user_properties["replyTo"] = a2a.get_gateway_response_topic( 517 self.namespace, self.gateway_id, task_id 518 ) 519 if is_streaming: 520 user_properties["a2aStatusTopic"] = a2a.get_gateway_status_topic( 521 self.namespace, self.gateway_id, task_id 522 ) 523 524 self.task_context_manager.store_context(task_id, external_request_context) 525 log.info("%s Stored external context for task_id: %s", log_id_prefix, task_id) 526 527 self.publish_a2a_message( 528 payload=payload, topic=target_topic, user_properties=user_properties 529 ) 530 log.info( 531 "%s Submitted A2A task %s to agent %s. Streaming: %s", 532 log_id_prefix, 533 task_id, 534 target_agent_name, 535 is_streaming, 536 ) 537 return task_id 538 539 def _handle_message(self, message: SolaceMessage, topic: str) -> None: 540 """ 541 Override to use queue-based pattern instead of direct async. 542 543 Gateway uses an internal queue for message processing to ensure 544 strict ordering and backpressure handling. 545 546 Args: 547 message: The Solace message 548 topic: The topic the message was received on 549 """ 550 log.debug( 551 "%s Received SolaceMessage on topic: %s. Bridging to internal queue.", 552 self.log_identifier, 553 topic, 554 ) 555 556 try: 557 msg_data_for_processor = { 558 "topic": topic, 559 "payload": message.get_payload(), 560 "user_properties": message.get_user_properties(), 561 "_original_broker_message": message, 562 } 563 self.internal_event_queue.put_nowait(msg_data_for_processor) 564 except queue.Full: 565 log.error( 566 "%s Internal event queue full. Cannot bridge message.", 567 self.log_identifier, 568 ) 569 raise 570 except Exception as e: 571 log.exception( 572 "%s Error bridging message to internal queue: %s", 573 self.log_identifier, 574 e, 575 ) 576 raise 577 578 async def _handle_message_async(self, message, topic: str) -> None: 579 """ 580 Not used by gateway - we override _handle_message() instead. 581 582 This is here to satisfy the abstract method requirement, but the 583 gateway uses the queue-based pattern via _handle_message() override. 584 """ 585 raise NotImplementedError( 586 "Gateway uses queue-based message handling via _handle_message() override" 587 ) 588 589 async def _handle_resolved_signals( 590 self, 591 external_request_context: Dict, 592 signals: List[Tuple[None, str, Any]], 593 original_rpc_id: Optional[str], 594 is_finalizing_context: bool = False, 595 ): 596 log_id_prefix = f"{self.log_identifier}[SignalHandler]" 597 if not signals: 598 return 599 600 for signal_tuple in signals: 601 if ( 602 isinstance(signal_tuple, tuple) 603 and len(signal_tuple) == 3 604 and signal_tuple[0] is None 605 ): 606 signal_type = signal_tuple[1] 607 signal_data = signal_tuple[2] 608 609 if signal_type == "SIGNAL_STATUS_UPDATE": 610 status_text = signal_data 611 log.info( 612 "%s Handling SIGNAL_STATUS_UPDATE: '%s'", 613 log_id_prefix, 614 status_text, 615 ) 616 if is_finalizing_context: 617 log.debug( 618 "%s Suppressing SIGNAL_STATUS_UPDATE ('%s') during finalizing context.", 619 log_id_prefix, 620 status_text, 621 ) 622 continue 623 try: 624 signal_a2a_message = a2a.create_agent_data_message( 625 data={ 626 "type": "agent_progress_update", 627 "status_text": status_text, 628 }, 629 part_metadata={"source": "gateway_signal"}, 630 ) 631 a2a_task_id_for_signal = external_request_context.get( 632 "a2a_task_id_for_event", original_rpc_id 633 ) 634 if not a2a_task_id_for_signal: 635 log.error( 636 "%s Cannot determine A2A task ID for signal event. Skipping.", 637 log_id_prefix, 638 ) 639 continue 640 641 signal_event = a2a.create_status_update( 642 task_id=a2a_task_id_for_signal, 643 context_id=external_request_context.get("a2a_session_id"), 644 message=signal_a2a_message, 645 is_final=False, 646 ) 647 await self._send_update_to_external( 648 external_request_context=external_request_context, 649 event_data=signal_event, 650 is_final_chunk_of_update=True, 651 ) 652 log.debug( 653 "%s Sent status signal as TaskStatusUpdateEvent.", 654 log_id_prefix, 655 ) 656 except Exception as e: 657 log.exception( 658 "%s Error sending status signal: %s", log_id_prefix, e 659 ) 660 elif signal_type == "SIGNAL_ARTIFACT_RETURN": 661 # Handle artifact return signal for legacy gateways 662 # During finalizing context (final Task), suppress this to avoid duplicates 663 # since the same signal might appear in both streaming and final responses 664 if is_finalizing_context: 665 log.debug( 666 "%s Suppressing SIGNAL_ARTIFACT_RETURN during finalizing context to avoid duplicate: %s", 667 log_id_prefix, 668 signal_data, 669 ) 670 continue 671 672 log.info( 673 "%s Handling SIGNAL_ARTIFACT_RETURN for legacy gateway: %s", 674 log_id_prefix, 675 signal_data, 676 ) 677 try: 678 filename = signal_data.get("filename") 679 version = signal_data.get("version") 680 681 if not filename: 682 log.error( 683 "%s SIGNAL_ARTIFACT_RETURN missing filename. Skipping.", 684 log_id_prefix, 685 ) 686 continue 687 688 # Load artifact content (not just metadata) for legacy gateways 689 # Legacy gateways like Slack need the actual bytes to upload files 690 artifact_data = await load_artifact_content_or_metadata( 691 self.shared_artifact_service, 692 app_name=external_request_context.get( 693 "app_name_for_artifacts", self.gateway_id 694 ), 695 user_id=external_request_context.get("user_id_for_artifacts"), 696 session_id=external_request_context.get("a2a_session_id"), 697 filename=filename, 698 version=version, 699 load_metadata_only=False, # Load full content for legacy gateways 700 ) 701 702 if artifact_data.get("status") != "success": 703 log.error( 704 "%s Failed to load artifact content for %s v%s", 705 log_id_prefix, 706 filename, 707 version, 708 ) 709 continue 710 711 # Get content and ensure it's bytes 712 content = artifact_data.get("content") 713 if not content: 714 log.error( 715 "%s No content found in artifact %s v%s", 716 log_id_prefix, 717 filename, 718 version, 719 ) 720 continue 721 722 # Convert to bytes if it's a string (text-based artifacts) 723 if isinstance(content, str): 724 content_bytes = content.encode("utf-8") 725 elif isinstance(content, bytes): 726 content_bytes = content 727 else: 728 log.error( 729 "%s Artifact content is neither string nor bytes: %s", 730 log_id_prefix, 731 type(content), 732 ) 733 continue 734 735 # Resolve any late embeds inside the artifact content before returning. 736 content_bytes = await self._resolve_embeds_in_artifact_content( 737 content_bytes=content_bytes, 738 mime_type=artifact_data.get("metadata", {}).get( 739 "mime_type" 740 ), 741 filename=filename, 742 external_request_context=external_request_context, 743 log_id_prefix=log_id_prefix, 744 ) 745 746 # Create FilePart with bytes for legacy gateway to upload 747 file_part = a2a.create_file_part_from_bytes( 748 content_bytes=content_bytes, 749 name=filename, 750 mime_type=artifact_data.get("metadata", {}).get( 751 "mime_type" 752 ), 753 ) 754 755 # Create artifact with the file part 756 # Import Part type for wrapping 757 from a2a.types import Artifact, Part 758 artifact = Artifact( 759 artifact_id=str(uuid.uuid4().hex), 760 parts=[Part(root=file_part)], 761 name=filename, 762 description=f"Artifact: {filename}", 763 ) 764 765 # Send as TaskArtifactUpdateEvent 766 a2a_task_id_for_signal = external_request_context.get( 767 "a2a_task_id_for_event", original_rpc_id 768 ) 769 770 if not a2a_task_id_for_signal: 771 log.error( 772 "%s Cannot determine A2A task ID for artifact signal. Skipping.", 773 log_id_prefix, 774 ) 775 continue 776 777 artifact_event = a2a.create_artifact_update( 778 task_id=a2a_task_id_for_signal, 779 context_id=external_request_context.get("a2a_session_id"), 780 artifact=artifact, 781 ) 782 783 await self._send_update_to_external( 784 external_request_context=external_request_context, 785 event_data=artifact_event, 786 is_final_chunk_of_update=False, 787 ) 788 log.info( 789 "%s Sent artifact signal as TaskArtifactUpdateEvent for %s", 790 log_id_prefix, 791 filename, 792 ) 793 except Exception as e: 794 log.exception( 795 "%s Error sending artifact signal: %s", log_id_prefix, e 796 ) 797 elif signal_type == "SIGNAL_ARTIFACT_CREATION_COMPLETE": 798 # Handle artifact creation completion for legacy gateways 799 # This is similar to SIGNAL_ARTIFACT_RETURN but for newly created artifacts 800 log.info( 801 "%s Handling SIGNAL_ARTIFACT_CREATION_COMPLETE for legacy gateway: %s", 802 log_id_prefix, 803 signal_data, 804 ) 805 try: 806 filename = signal_data.get("filename") 807 version = signal_data.get("version") 808 809 if not filename: 810 log.error( 811 "%s SIGNAL_ARTIFACT_CREATION_COMPLETE missing filename. Skipping.", 812 log_id_prefix, 813 ) 814 continue 815 816 # Load artifact content (not just metadata) for legacy gateways 817 # Legacy gateways like Slack need the actual bytes to upload files 818 artifact_data = await load_artifact_content_or_metadata( 819 self.shared_artifact_service, 820 app_name=external_request_context.get( 821 "app_name_for_artifacts", self.gateway_id 822 ), 823 user_id=external_request_context.get("user_id_for_artifacts"), 824 session_id=external_request_context.get("a2a_session_id"), 825 filename=filename, 826 version=version, 827 load_metadata_only=False, # Load full content for legacy gateways 828 ) 829 830 if artifact_data.get("status") != "success": 831 log.error( 832 "%s Failed to load artifact content for %s v%s", 833 log_id_prefix, 834 filename, 835 version, 836 ) 837 continue 838 839 # Get content and ensure it's bytes 840 content = artifact_data.get("content") 841 if not content: 842 log.error( 843 "%s No content found in artifact %s v%s", 844 log_id_prefix, 845 filename, 846 version, 847 ) 848 continue 849 850 # Convert to bytes if it's a string (text-based artifacts) 851 if isinstance(content, str): 852 content_bytes = content.encode("utf-8") 853 elif isinstance(content, bytes): 854 content_bytes = content 855 else: 856 log.error( 857 "%s Artifact content is neither string nor bytes: %s", 858 log_id_prefix, 859 type(content), 860 ) 861 continue 862 863 # Create FilePart with bytes for legacy gateway to upload 864 file_part = a2a.create_file_part_from_bytes( 865 content_bytes=content_bytes, 866 name=filename, 867 mime_type=signal_data.get("mime_type") or artifact_data.get("metadata", {}).get("mime_type"), 868 ) 869 870 # Create artifact with the file part 871 # Import Part type for wrapping 872 from a2a.types import Artifact, Part 873 artifact = Artifact( 874 artifact_id=str(uuid.uuid4().hex), 875 parts=[Part(root=file_part)], 876 name=filename, 877 description=f"Artifact: {filename}", 878 ) 879 880 # Send as TaskArtifactUpdateEvent 881 a2a_task_id_for_signal = external_request_context.get( 882 "a2a_task_id_for_event", original_rpc_id 883 ) 884 885 if not a2a_task_id_for_signal: 886 log.error( 887 "%s Cannot determine A2A task ID for artifact creation signal. Skipping.", 888 log_id_prefix, 889 ) 890 continue 891 892 artifact_event = a2a.create_artifact_update( 893 task_id=a2a_task_id_for_signal, 894 context_id=external_request_context.get("a2a_session_id"), 895 artifact=artifact, 896 ) 897 898 await self._send_update_to_external( 899 external_request_context=external_request_context, 900 event_data=artifact_event, 901 is_final_chunk_of_update=False, 902 ) 903 log.info( 904 "%s Sent artifact creation completion as TaskArtifactUpdateEvent for %s", 905 log_id_prefix, 906 filename, 907 ) 908 except Exception as e: 909 log.exception( 910 "%s Error sending artifact creation completion signal: %s", log_id_prefix, e 911 ) 912 elif signal_type == "SIGNAL_DEEP_RESEARCH_REPORT": 913 # Handle deep research report signal for legacy gateways 914 # For legacy gateways, we send the report as a file attachment 915 if is_finalizing_context: 916 log.debug( 917 "%s Suppressing SIGNAL_DEEP_RESEARCH_REPORT during finalizing context to avoid duplicate: %s", 918 log_id_prefix, 919 signal_data, 920 ) 921 continue 922 923 try: 924 filename = signal_data.get("filename") 925 version = signal_data.get("version") 926 927 if not filename: 928 log.error( 929 "%s SIGNAL_DEEP_RESEARCH_REPORT missing filename. Skipping.", 930 log_id_prefix, 931 ) 932 continue 933 934 # Load artifact content for legacy gateways 935 artifact_data = await load_artifact_content_or_metadata( 936 self.shared_artifact_service, 937 app_name=external_request_context.get( 938 "app_name_for_artifacts", self.gateway_id 939 ), 940 user_id=external_request_context.get("user_id_for_artifacts"), 941 session_id=external_request_context.get("a2a_session_id"), 942 filename=filename, 943 version=version, 944 load_metadata_only=False, 945 ) 946 947 if artifact_data.get("status") != "success": 948 log.error( 949 "%s Failed to load deep research report content for %s v%s", 950 log_id_prefix, 951 filename, 952 version, 953 ) 954 continue 955 956 content = artifact_data.get("content") 957 if not content: 958 log.error( 959 "%s No content found in deep research report %s v%s", 960 log_id_prefix, 961 filename, 962 version, 963 ) 964 continue 965 966 # Convert to bytes if it's a string 967 if isinstance(content, str): 968 content_bytes = content.encode("utf-8") 969 elif isinstance(content, bytes): 970 content_bytes = content 971 else: 972 log.error( 973 "%s Deep research report content is neither string nor bytes: %s", 974 log_id_prefix, 975 type(content), 976 ) 977 continue 978 979 # Create FilePart with bytes for legacy gateway to upload 980 file_part = a2a.create_file_part_from_bytes( 981 content_bytes=content_bytes, 982 name=filename, 983 mime_type=artifact_data.get("metadata", {}).get( 984 "mime_type", "text/markdown" 985 ), 986 ) 987 988 # Create artifact with the file part 989 from a2a.types import Artifact, Part 990 artifact = Artifact( 991 artifact_id=str(uuid.uuid4().hex), 992 parts=[Part(root=file_part)], 993 name=filename, 994 description=f"Deep Research Report: {filename}", 995 ) 996 997 # Send as TaskArtifactUpdateEvent 998 a2a_task_id_for_signal = external_request_context.get( 999 "a2a_task_id_for_event", original_rpc_id 1000 ) 1001 1002 if not a2a_task_id_for_signal: 1003 log.error( 1004 "%s Cannot determine A2A task ID for deep research report signal. Skipping.", 1005 log_id_prefix, 1006 ) 1007 continue 1008 1009 artifact_event = a2a.create_artifact_update( 1010 task_id=a2a_task_id_for_signal, 1011 context_id=external_request_context.get("a2a_session_id"), 1012 artifact=artifact, 1013 ) 1014 1015 await self._send_update_to_external( 1016 external_request_context=external_request_context, 1017 event_data=artifact_event, 1018 is_final_chunk_of_update=False, 1019 ) 1020 log.info( 1021 "%s Sent deep research report as TaskArtifactUpdateEvent for %s", 1022 log_id_prefix, 1023 filename, 1024 ) 1025 except Exception as e: 1026 log.exception( 1027 "%s Error sending deep research report signal: %s", log_id_prefix, e 1028 ) 1029 else: 1030 log.warning( 1031 "%s Received unhandled signal type during embed resolution: %s", 1032 log_id_prefix, 1033 signal_type, 1034 ) 1035 1036 async def _resolve_embeds_in_artifact_content( 1037 self, 1038 content_bytes: bytes, 1039 mime_type: Optional[str], 1040 filename: str, 1041 external_request_context: Dict[str, Any], 1042 log_id_prefix: str, 1043 ) -> bytes: 1044 """ 1045 Checks if content is text-based and, if so, resolves late embeds within it. 1046 Returns the (potentially modified) content as bytes. 1047 """ 1048 if is_text_based_mime_type(mime_type): 1049 log.info( 1050 "%s Artifact '%s' is text-based (%s). Resolving late embeds.", 1051 log_id_prefix, 1052 filename, 1053 mime_type, 1054 ) 1055 try: 1056 decoded_content = content_bytes.decode("utf-8") 1057 1058 # Construct context and config for the resolver 1059 embed_eval_context = { 1060 "artifact_service": self.shared_artifact_service, 1061 "session_context": { 1062 "app_name": external_request_context.get( 1063 "app_name_for_artifacts", self.gateway_id 1064 ), 1065 "user_id": external_request_context.get( 1066 "user_id_for_artifacts" 1067 ), 1068 "session_id": external_request_context.get("a2a_session_id"), 1069 }, 1070 } 1071 embed_eval_config = { 1072 "gateway_max_artifact_resolve_size_bytes": self.gateway_max_artifact_resolve_size_bytes, 1073 "gateway_recursive_embed_depth": self.gateway_recursive_embed_depth, 1074 } 1075 1076 resolved_string = await resolve_embeds_recursively_in_string( 1077 text=decoded_content, 1078 context=embed_eval_context, 1079 resolver_func=evaluate_embed, 1080 types_to_resolve=LATE_EMBED_TYPES, 1081 resolution_mode=ResolutionMode.RECURSIVE_ARTIFACT_CONTENT, 1082 log_identifier=f"{log_id_prefix}[RecursiveResolve]", 1083 config=embed_eval_config, 1084 max_depth=self.gateway_recursive_embed_depth, 1085 ) 1086 resolved_bytes = resolved_string.encode("utf-8") 1087 log.info( 1088 "%s Successfully resolved embeds in '%s'. New size: %d bytes.", 1089 log_id_prefix, 1090 filename, 1091 len(resolved_bytes), 1092 ) 1093 return resolved_bytes 1094 except Exception as resolve_err: 1095 log.error( 1096 "%s Failed to resolve embeds within artifact '%s': %s. Returning raw content.", 1097 log_id_prefix, 1098 filename, 1099 resolve_err, 1100 ) 1101 return content_bytes 1102 1103 async def _resolve_uri_in_file_part( 1104 self, file_part: FilePart, external_request_context: Dict[str, Any] 1105 ): 1106 """ 1107 Checks if a FilePart has a resolvable URI and, if so, 1108 resolves it and mutates the part in-place by calling the common utility. 1109 After resolving the URI, it also resolves any late embeds within the content. 1110 """ 1111 await a2a.resolve_file_part_uri( 1112 part=file_part, 1113 artifact_service=self.shared_artifact_service, 1114 log_identifier=self.log_identifier, 1115 ) 1116 1117 # After resolving the URI to get the content, resolve any late embeds inside it. 1118 if file_part.file and isinstance(file_part.file, FileWithBytes): 1119 # The content is a base64 encoded string in the `bytes` attribute. 1120 # We need to decode it to raw bytes for processing. 1121 try: 1122 content_bytes = base64.b64decode(file_part.file.bytes) 1123 except Exception as e: 1124 log.error( 1125 "%s Failed to base64 decode file content for embed resolution: %s", 1126 f"{self.log_identifier}[UriResolve]", 1127 e, 1128 ) 1129 return 1130 1131 resolved_bytes = await self._resolve_embeds_in_artifact_content( 1132 content_bytes=content_bytes, 1133 mime_type=file_part.file.mime_type, 1134 filename=file_part.file.name, 1135 external_request_context=external_request_context, 1136 log_id_prefix=f"{self.log_identifier}[UriResolve]", 1137 ) 1138 # Re-encode the resolved content back to a base64 string for the FileWithBytes model. 1139 file_part.file.bytes = base64.b64encode(resolved_bytes).decode("utf-8") 1140 1141 async def _resolve_uris_in_parts_list( 1142 self, parts: List[ContentPart], external_request_context: Dict[str, Any] 1143 ): 1144 """Iterates over a list of part objects and resolves any FilePart URIs.""" 1145 if not parts: 1146 return 1147 for part in parts: 1148 if isinstance(part, FilePart): 1149 await self._resolve_uri_in_file_part(part, external_request_context) 1150 1151 async def _resolve_uris_in_payload( 1152 self, parsed_event: Any, external_request_context: Dict[str, Any] 1153 ): 1154 """ 1155 Dispatcher that calls the appropriate targeted URI resolver based on the 1156 Pydantic model type of the event. 1157 """ 1158 parts_to_resolve: List[ContentPart] = [] 1159 if isinstance(parsed_event, TaskStatusUpdateEvent): 1160 message = a2a.get_message_from_status_update(parsed_event) 1161 if message: 1162 parts_to_resolve.extend(a2a.get_parts_from_message(message)) 1163 elif isinstance(parsed_event, TaskArtifactUpdateEvent): 1164 artifact = a2a.get_artifact_from_artifact_update(parsed_event) 1165 if artifact: 1166 parts_to_resolve.extend(a2a.get_parts_from_artifact(artifact)) 1167 elif isinstance(parsed_event, Task): 1168 if parsed_event.status and parsed_event.status.message: 1169 parts_to_resolve.extend( 1170 a2a.get_parts_from_message(parsed_event.status.message) 1171 ) 1172 if parsed_event.artifacts: 1173 for artifact in parsed_event.artifacts: 1174 parts_to_resolve.extend(a2a.get_parts_from_artifact(artifact)) 1175 1176 if parts_to_resolve: 1177 await self._resolve_uris_in_parts_list( 1178 parts_to_resolve, external_request_context 1179 ) 1180 else: 1181 log.debug( 1182 "%s Payload type '%s' did not yield any parts for URI resolution. Skipping.", 1183 self.log_identifier, 1184 type(parsed_event).__name__, 1185 ) 1186 1187 async def _handle_discovery_message(self, payload: Dict) -> bool: 1188 """Handles incoming agent and gateway discovery messages.""" 1189 try: 1190 agent_card = AgentCard(**payload) 1191 1192 # Route to appropriate registry based on card type 1193 if is_gateway_card(agent_card): 1194 # This is a gateway card - track in gateway registry 1195 is_new = self.gateway_registry.add_or_update_gateway(agent_card) 1196 if is_new: 1197 gateway_type = self.gateway_registry.get_gateway_type(agent_card.name) 1198 log.info( 1199 "%s New gateway discovered: %s (type: %s)", 1200 self.log_identifier, 1201 agent_card.name, 1202 gateway_type or "unknown" 1203 ) 1204 else: 1205 log.debug( 1206 "%s Gateway heartbeat received: %s", 1207 self.log_identifier, 1208 agent_card.name 1209 ) 1210 else: 1211 # This is an agent card - use existing logic 1212 self.core_a2a_service.process_discovery_message(agent_card) 1213 1214 return True 1215 except Exception as e: 1216 log.error( 1217 "%s Failed to process discovery message: %s. Payload: %s", 1218 self.log_identifier, 1219 e, 1220 payload, 1221 ) 1222 return False 1223 1224 async def _prepare_parts_for_publishing( 1225 self, 1226 parts: List[ContentPart], 1227 user_id: str, 1228 session_id: str, 1229 target_agent_name: str, 1230 ) -> List[ContentPart]: 1231 """ 1232 Prepares message parts for publishing according to the configured artifact_handling_mode 1233 by calling the common utility function. 1234 """ 1235 processed_parts: List[ContentPart] = [] 1236 for part in parts: 1237 if isinstance(part, FilePart): 1238 processed_part = await a2a.prepare_file_part_for_publishing( 1239 part=part, 1240 mode=self.artifact_handling_mode, 1241 artifact_service=self.shared_artifact_service, 1242 user_id=user_id, 1243 session_id=session_id, 1244 target_agent_name=target_agent_name, 1245 log_identifier=self.log_identifier, 1246 ) 1247 if processed_part: 1248 processed_parts.append(processed_part) 1249 else: 1250 processed_parts.append(part) 1251 return processed_parts 1252 1253 def _should_include_data_part_in_final_output(self, part: Any) -> bool: 1254 """ 1255 Determines if a DataPart should be included in the final output sent to the gateway. 1256 1257 This filters out internal/tool-related DataParts that shouldn't be shown to end users. 1258 Gateways can override this method for custom filtering logic. 1259 1260 Args: 1261 part: The part to check (expected to be a DataPart) 1262 1263 Returns: 1264 True if the part should be included, False if it should be filtered out 1265 """ 1266 from a2a.types import DataPart 1267 1268 if not isinstance(part, DataPart): 1269 return True 1270 1271 # Check if this is a tool result by looking at metadata 1272 # Tool results have metadata.tool_name set 1273 if part.metadata and part.metadata.get("tool_name"): 1274 # This is a tool result - filter it out 1275 return False 1276 1277 # Get the type of the data part 1278 data_type = part.data.get("type") if part.data else None 1279 1280 # Filter out tool-related data parts that are internal 1281 tool_related_types = { 1282 "tool_call", 1283 "tool_result", 1284 "tool_error", 1285 "tool_execution", 1286 } 1287 1288 if data_type in tool_related_types: 1289 return False 1290 1291 # Handle artifact_creation_progress based on gateway capabilities 1292 if data_type == "artifact_creation_progress": 1293 # For modern gateways (HTTP SSE), keep these to display progress bubbles 1294 # For legacy gateways (Slack), filter them out as they'll be converted to FileParts 1295 if self.supports_inline_artifact_resolution: 1296 return True # Keep for HTTP SSE 1297 else: 1298 return False # Filter for Slack (will be converted to FileParts instead) 1299 1300 # Keep user-facing data parts like general progress updates 1301 user_facing_types = { 1302 "agent_progress_update", 1303 "thinking_content", 1304 } 1305 1306 if data_type in user_facing_types: 1307 return True 1308 1309 # Default: include unknown types (to avoid hiding potentially useful info) 1310 return True 1311 1312 async def _resolve_embeds_and_handle_signals( 1313 self, 1314 event_with_parts: Union[TaskStatusUpdateEvent, Task, TaskArtifactUpdateEvent], 1315 external_request_context: Dict[str, Any], 1316 a2a_task_id: str, 1317 original_rpc_id: Optional[str], 1318 is_finalizing_context: bool = False, 1319 ) -> bool: 1320 if not self.enable_embed_resolution: 1321 return False 1322 1323 log_id_prefix = f"{self.log_identifier}[EmbedResolve:{a2a_task_id}]" 1324 content_modified = False 1325 1326 embed_eval_context = { 1327 "artifact_service": self.shared_artifact_service, 1328 "session_context": { 1329 "app_name": external_request_context.get( 1330 "app_name_for_artifacts", self.gateway_id 1331 ), 1332 "user_id": external_request_context.get("user_id_for_artifacts"), 1333 "session_id": external_request_context.get("a2a_session_id"), 1334 }, 1335 } 1336 embed_eval_config = { 1337 "gateway_max_artifact_resolve_size_bytes": self.gateway_max_artifact_resolve_size_bytes, 1338 "gateway_recursive_embed_depth": self.gateway_recursive_embed_depth, 1339 } 1340 1341 parts_owner: Optional[Union[A2AMessage, A2AArtifact]] = None 1342 if isinstance(event_with_parts, (TaskStatusUpdateEvent, Task)): 1343 if event_with_parts.status and event_with_parts.status.message: 1344 parts_owner = event_with_parts.status.message 1345 elif isinstance(event_with_parts, TaskArtifactUpdateEvent): 1346 if event_with_parts.artifact: 1347 parts_owner = event_with_parts.artifact 1348 1349 if not (parts_owner and parts_owner.parts): 1350 return False 1351 1352 is_streaming_status_update = isinstance(event_with_parts, TaskStatusUpdateEvent) 1353 stream_buffer_key = f"{a2a_task_id}_stream_buffer" 1354 current_buffer = "" 1355 if is_streaming_status_update: 1356 current_buffer = ( 1357 self.task_context_manager.get_context(stream_buffer_key) or "" 1358 ) 1359 1360 original_parts: List[ContentPart] = ( 1361 a2a.get_parts_from_message(parts_owner) 1362 if isinstance(parts_owner, A2AMessage) 1363 else a2a.get_parts_from_artifact(parts_owner) 1364 ) 1365 1366 new_parts: List[ContentPart] = [] 1367 other_signals = [] 1368 1369 for part in original_parts: 1370 if isinstance(part, TextPart) and part.text: 1371 text_to_resolve = current_buffer + part.text 1372 current_buffer = "" # Buffer is now being processed 1373 1374 # Debug: Log the text before embed resolution 1375 log.debug( 1376 "%s Input text for embed resolution (len=%d): %s", 1377 log_id_prefix, 1378 len(text_to_resolve), 1379 text_to_resolve[:500] + "..." if len(text_to_resolve) > 500 else text_to_resolve, 1380 ) 1381 1382 ( 1383 resolved_text, 1384 processed_idx, 1385 signals_with_placeholders, 1386 ) = await resolve_embeds_in_string( 1387 text=text_to_resolve, 1388 context=embed_eval_context, 1389 resolver_func=evaluate_embed, 1390 types_to_resolve=LATE_EMBED_TYPES.union({"status_update"}), 1391 resolution_mode=ResolutionMode.A2A_MESSAGE_TO_USER, 1392 log_identifier=log_id_prefix, 1393 config=embed_eval_config, 1394 ) 1395 1396 # Debug: Log the resolved text 1397 log.debug( 1398 "%s Resolved text (processed_idx=%d, signals=%d, len=%d): %s", 1399 log_id_prefix, 1400 processed_idx, 1401 len(signals_with_placeholders), 1402 len(resolved_text), 1403 resolved_text[:500] + "..." if len(resolved_text) > 500 else resolved_text, 1404 ) 1405 1406 if not signals_with_placeholders: 1407 new_parts.append(a2a.create_text_part(text=resolved_text)) 1408 else: 1409 placeholder_map = {p: s for _, s, p in signals_with_placeholders} 1410 split_pattern = ( 1411 f"({'|'.join(re.escape(p) for p in placeholder_map.keys())})" 1412 ) 1413 text_fragments = re.split(split_pattern, resolved_text) 1414 1415 for i, fragment in enumerate(text_fragments): 1416 if not fragment: 1417 continue 1418 if fragment in placeholder_map: 1419 signal_tuple = placeholder_map[fragment] 1420 signal_type, signal_data = signal_tuple[1], signal_tuple[2] 1421 if signal_type == "SIGNAL_ARTIFACT_RETURN": 1422 # Only convert to FilePart if gateway supports inline artifact resolution 1423 if self.supports_inline_artifact_resolution: 1424 try: 1425 filename, version = ( 1426 signal_data["filename"], 1427 signal_data["version"], 1428 ) 1429 artifact_data = ( 1430 await load_artifact_content_or_metadata( 1431 self.shared_artifact_service, 1432 **embed_eval_context["session_context"], 1433 filename=filename, 1434 version=version, 1435 load_metadata_only=True, 1436 ) 1437 ) 1438 if artifact_data.get("status") == "success": 1439 uri = format_artifact_uri( 1440 **embed_eval_context["session_context"], 1441 filename=filename, 1442 version=artifact_data.get("version"), 1443 ) 1444 new_parts.append( 1445 a2a.create_file_part_from_uri( 1446 uri, 1447 filename, 1448 artifact_data.get("metadata", {}).get( 1449 "mime_type" 1450 ), 1451 ) 1452 ) 1453 else: 1454 new_parts.append( 1455 a2a.create_text_part( 1456 f"[Error: Artifact '{filename}' v{version} not found.]" 1457 ) 1458 ) 1459 except Exception as e: 1460 log.exception( 1461 "%s Error handling SIGNAL_ARTIFACT_RETURN: %s", 1462 log_id_prefix, 1463 e, 1464 ) 1465 new_parts.append( 1466 a2a.create_text_part( 1467 f"[Error: Could not retrieve artifact '{signal_data.get('filename')}'.]" 1468 ) 1469 ) 1470 else: 1471 # Legacy gateway mode: pass signal through for gateway to handle 1472 other_signals.append(signal_tuple) 1473 elif signal_type == "SIGNAL_DEEP_RESEARCH_REPORT": 1474 # Deep research reports should be rendered by the frontend component 1475 # For modern gateways (HTTP SSE), create a DataPart with artifact reference 1476 # For legacy gateways, pass through as signal 1477 if self.supports_inline_artifact_resolution: 1478 try: 1479 filename = signal_data["filename"] 1480 version = signal_data["version"] 1481 log.info( 1482 "%s Converting SIGNAL_DEEP_RESEARCH_REPORT to DataPart for frontend rendering: %s v%s", 1483 log_id_prefix, 1484 filename, 1485 version, 1486 ) 1487 # Create a DataPart that the frontend can use to render DeepResearchReportBubble 1488 # The frontend will fetch the artifact content separately 1489 artifact_data = ( 1490 await load_artifact_content_or_metadata( 1491 self.shared_artifact_service, 1492 **embed_eval_context["session_context"], 1493 filename=filename, 1494 version=version, 1495 load_metadata_only=True, 1496 ) 1497 ) 1498 if artifact_data.get("status") == "success": 1499 uri = format_artifact_uri( 1500 **embed_eval_context["session_context"], 1501 filename=filename, 1502 version=artifact_data.get("version"), 1503 ) 1504 # Create a DataPart with deep_research_report type 1505 # This will be rendered by DeepResearchReportBubble in the frontend 1506 data_part = a2a.create_data_part( 1507 data={ 1508 "type": "deep_research_report", 1509 "filename": filename, 1510 "version": artifact_data.get("version"), 1511 "uri": uri, 1512 }, 1513 metadata={"source": "deep_research_tool"}, 1514 ) 1515 new_parts.append(data_part) 1516 else: 1517 new_parts.append( 1518 a2a.create_text_part( 1519 f"[Error: Deep research report '{filename}' v{version} not found.]" 1520 ) 1521 ) 1522 except Exception as e: 1523 log.exception( 1524 "%s Error handling SIGNAL_DEEP_RESEARCH_REPORT: %s", 1525 log_id_prefix, 1526 e, 1527 ) 1528 new_parts.append( 1529 a2a.create_text_part( 1530 f"[Error: Could not retrieve deep research report '{signal_data.get('filename')}'.]" 1531 ) 1532 ) 1533 else: 1534 # Legacy gateway mode: pass signal through for gateway to handle 1535 other_signals.append(signal_tuple) 1536 elif signal_type == "SIGNAL_INLINE_BINARY_CONTENT": 1537 signal_data["content_bytes"] = signal_data.get("bytes") 1538 del signal_data["bytes"] 1539 new_parts.append( 1540 a2a.create_file_part_from_bytes(**signal_data) 1541 ) 1542 else: 1543 other_signals.append(signal_tuple) 1544 else: 1545 # Check if the non-placeholder fragment is just whitespace 1546 # and is between two placeholders. If so, drop it. 1547 is_just_whitespace = not fragment.strip() 1548 prev_fragment_was_placeholder = ( 1549 i > 0 and text_fragments[i - 1] in placeholder_map 1550 ) 1551 next_fragment_is_placeholder = ( 1552 i < len(text_fragments) - 1 1553 and text_fragments[i + 1] in placeholder_map 1554 ) 1555 1556 if ( 1557 is_just_whitespace 1558 and prev_fragment_was_placeholder 1559 and next_fragment_is_placeholder 1560 ): 1561 log.debug( 1562 "%s Dropping whitespace fragment between two file signals.", 1563 log_id_prefix, 1564 ) 1565 continue 1566 1567 new_parts.append(a2a.create_text_part(text=fragment)) 1568 1569 if is_streaming_status_update: 1570 current_buffer = text_to_resolve[processed_idx:] 1571 1572 elif isinstance(part, FilePart) and part.file: 1573 # Handle recursive embeds in text-based FileParts 1574 new_parts.append(part) # Placeholder for now 1575 elif isinstance(part, DataPart): 1576 # Handle special DataPart types 1577 data_type = part.data.get("type") if part.data else None 1578 1579 if data_type == "template_block": 1580 # Resolve template block and replace with resolved text 1581 try: 1582 from ...common.utils.templates import resolve_template_blocks_in_string 1583 1584 # Reconstruct the template block syntax 1585 data_artifact = part.data.get("data_artifact", "") 1586 jsonpath = part.data.get("jsonpath") 1587 limit = part.data.get("limit") 1588 template_content = part.data.get("template_content", "") 1589 1590 # Build params string 1591 params_parts = [f'data="{data_artifact}"'] 1592 if jsonpath: 1593 params_parts.append(f'jsonpath="{jsonpath}"') 1594 if limit is not None: 1595 params_parts.append(f'limit="{limit}"') 1596 params_str = " ".join(params_parts) 1597 1598 # Reconstruct full template block 1599 template_block = f"«««template: {params_str}\n{template_content}\n»»»" 1600 1601 log.debug( 1602 "%s Resolving template block inline: data=%s", 1603 log_id_prefix, 1604 data_artifact, 1605 ) 1606 1607 # Resolve the template 1608 resolved_text = await resolve_template_blocks_in_string( 1609 text=template_block, 1610 artifact_service=self.shared_artifact_service, 1611 session_context={ 1612 "app_name": external_request_context.get( 1613 "app_name_for_artifacts", self.gateway_id 1614 ), 1615 "user_id": external_request_context.get("user_id_for_artifacts"), 1616 "session_id": external_request_context.get("a2a_session_id"), 1617 }, 1618 log_identifier=f"{log_id_prefix}[TemplateResolve]", 1619 ) 1620 1621 log.info( 1622 "%s Template resolved successfully. Output length: %d", 1623 log_id_prefix, 1624 len(resolved_text), 1625 ) 1626 1627 # Replace the DataPart with a TextPart containing the resolved content 1628 new_parts.append(a2a.create_text_part(text=resolved_text)) 1629 1630 except Exception as e: 1631 log.error( 1632 "%s Failed to resolve template block: %s", 1633 log_id_prefix, 1634 e, 1635 exc_info=True, 1636 ) 1637 # Send error message as TextPart 1638 error_text = f"[Template rendering error: {str(e)}]" 1639 new_parts.append(a2a.create_text_part(text=error_text)) 1640 1641 elif ( 1642 data_type == "artifact_creation_progress" 1643 and not self.supports_inline_artifact_resolution 1644 ): 1645 # Legacy gateway mode: convert completed artifact creation to FilePart 1646 status = part.data.get("status") 1647 if status == "completed" and not is_finalizing_context: 1648 # Extract artifact info from the DataPart 1649 filename = part.data.get("filename") 1650 version = part.data.get("version") 1651 mime_type = part.data.get("mime_type") 1652 1653 if filename and version is not None: 1654 log.info( 1655 "%s Converting artifact creation completion to FilePart for legacy gateway: %s v%s", 1656 log_id_prefix, 1657 filename, 1658 version, 1659 ) 1660 # This will be sent as an artifact signal, so don't add to new_parts 1661 # Instead, add to other_signals for processing 1662 signal_tuple = ( 1663 None, 1664 "SIGNAL_ARTIFACT_CREATION_COMPLETE", 1665 { 1666 "filename": filename, 1667 "version": version, 1668 "mime_type": mime_type, 1669 }, 1670 ) 1671 other_signals.append(signal_tuple) 1672 else: 1673 # Missing required info, keep the DataPart as-is 1674 new_parts.append(part) 1675 elif status == "completed" and is_finalizing_context: 1676 # Suppress during finalizing to avoid duplicates 1677 log.debug( 1678 "%s Suppressing artifact creation completion during finalizing context for %s", 1679 log_id_prefix, 1680 part.data.get("filename"), 1681 ) 1682 continue 1683 else: 1684 # Keep in-progress or failed status DataParts 1685 new_parts.append(part) 1686 else: 1687 # Not an artifact creation DataPart, or modern gateway - keep as-is 1688 new_parts.append(part) 1689 else: 1690 new_parts.append(part) 1691 1692 if other_signals: 1693 await self._handle_resolved_signals( 1694 external_request_context, 1695 other_signals, 1696 original_rpc_id, 1697 is_finalizing_context, 1698 ) 1699 1700 if new_parts != original_parts: 1701 content_modified = True 1702 if isinstance(parts_owner, A2AMessage): 1703 if isinstance(event_with_parts, TaskStatusUpdateEvent): 1704 event_with_parts.status.message = a2a.update_message_parts( 1705 parts_owner, new_parts 1706 ) 1707 elif isinstance(event_with_parts, Task): 1708 event_with_parts.status.message = a2a.update_message_parts( 1709 parts_owner, new_parts 1710 ) 1711 elif isinstance(parts_owner, A2AArtifact): 1712 event_with_parts.artifact = a2a.update_artifact_parts( 1713 parts_owner, new_parts 1714 ) 1715 1716 if is_streaming_status_update: 1717 self.task_context_manager.store_context(stream_buffer_key, current_buffer) 1718 1719 return content_modified or bool(other_signals) 1720 1721 async def _process_parsed_a2a_event( 1722 self, 1723 parsed_event: Union[ 1724 Task, TaskStatusUpdateEvent, TaskArtifactUpdateEvent, JSONRPCError 1725 ], 1726 external_request_context: Dict[str, Any], 1727 a2a_task_id: str, 1728 original_rpc_id: Optional[str], 1729 ) -> None: 1730 """ 1731 Processes a parsed A2A event: resolves embeds, handles signals, 1732 sends to external, and manages context. 1733 """ 1734 log_id_prefix = f"{self.log_identifier}[ProcessParsed:{a2a_task_id}]" 1735 is_truly_final_event_for_context_cleanup = False 1736 is_finalizing_context_for_embeds = False 1737 1738 if isinstance(parsed_event, JSONRPCError): 1739 log.warning( 1740 "%s Handling JSONRPCError for task %s.", log_id_prefix, a2a_task_id 1741 ) 1742 await self._send_error_to_external(external_request_context, parsed_event) 1743 is_truly_final_event_for_context_cleanup = True 1744 else: 1745 content_was_modified_or_signals_handled = False 1746 1747 if isinstance(parsed_event, TaskStatusUpdateEvent) and parsed_event.final: 1748 is_finalizing_context_for_embeds = True 1749 elif isinstance(parsed_event, Task): 1750 is_finalizing_context_for_embeds = True 1751 1752 if not isinstance(parsed_event, JSONRPCError): 1753 content_was_modified_or_signals_handled = ( 1754 await self._resolve_embeds_and_handle_signals( 1755 parsed_event, 1756 external_request_context, 1757 a2a_task_id, 1758 original_rpc_id, 1759 is_finalizing_context=is_finalizing_context_for_embeds, 1760 ) 1761 ) 1762 1763 if self.resolve_artifact_uris_in_gateway: 1764 log.debug( 1765 "%s Resolving artifact URIs before sending to external...", 1766 log_id_prefix, 1767 ) 1768 await self._resolve_uris_in_payload( 1769 parsed_event, external_request_context 1770 ) 1771 1772 send_this_event_to_external = True 1773 is_final_chunk_of_status_update = False 1774 1775 if isinstance(parsed_event, TaskStatusUpdateEvent): 1776 # Try enterprise handling for input_required state (OAuth authentication) 1777 if parsed_event.status and parsed_event.status.state == TaskState.input_required: 1778 try: 1779 from solace_agent_mesh_enterprise.auth.input_required import ( 1780 handle_input_required_request, 1781 ) 1782 parsed_event = handle_input_required_request( 1783 parsed_event, 1784 a2a_task_id, 1785 self # Gateway component for caching 1786 ) 1787 except ImportError: 1788 pass # Enterprise not installed 1789 1790 is_final_chunk_of_status_update = parsed_event.final 1791 if ( 1792 not ( 1793 parsed_event.status 1794 and parsed_event.status.message 1795 and parsed_event.status.message.parts 1796 ) 1797 and not parsed_event.metadata 1798 and not is_final_chunk_of_status_update 1799 and not content_was_modified_or_signals_handled 1800 ): 1801 send_this_event_to_external = False 1802 log.debug( 1803 "%s Suppressing empty intermediate status update.", 1804 log_id_prefix, 1805 ) 1806 elif isinstance(parsed_event, TaskArtifactUpdateEvent): 1807 if ( 1808 not (parsed_event.artifact and parsed_event.artifact.parts) 1809 and not content_was_modified_or_signals_handled 1810 ): 1811 send_this_event_to_external = False 1812 log.debug("%s Suppressing empty artifact update.", log_id_prefix) 1813 elif isinstance(parsed_event, Task): 1814 is_truly_final_event_for_context_cleanup = True 1815 1816 if ( 1817 self._RESOLVE_EMBEDS_IN_FINAL_RESPONSE 1818 and parsed_event.status 1819 and parsed_event.status.message 1820 and parsed_event.status.message.parts 1821 ): 1822 log.debug( 1823 "%s Resolving embeds in final task response...", log_id_prefix 1824 ) 1825 message = parsed_event.status.message 1826 combined_text = a2a.get_text_from_message(message) 1827 data_parts = a2a.get_data_parts_from_message(message) 1828 file_parts = a2a.get_file_parts_from_message(message) 1829 non_text_parts = data_parts + file_parts 1830 1831 if combined_text: 1832 embed_eval_context = { 1833 "artifact_service": self.shared_artifact_service, 1834 "session_context": { 1835 "app_name": external_request_context.get( 1836 "app_name_for_artifacts", self.gateway_id 1837 ), 1838 "user_id": external_request_context.get( 1839 "user_id_for_artifacts" 1840 ), 1841 "session_id": external_request_context.get( 1842 "a2a_session_id" 1843 ), 1844 }, 1845 } 1846 embed_eval_config = { 1847 "gateway_max_artifact_resolve_size_bytes": self.gateway_max_artifact_resolve_size_bytes, 1848 "gateway_recursive_embed_depth": self.gateway_recursive_embed_depth, 1849 } 1850 all_embed_types = EARLY_EMBED_TYPES.union(LATE_EMBED_TYPES) 1851 resolved_text, _, signals = await resolve_embeds_in_string( 1852 text=combined_text, 1853 context=embed_eval_context, 1854 resolver_func=evaluate_embed, 1855 types_to_resolve=all_embed_types, 1856 resolution_mode=ResolutionMode.A2A_MESSAGE_TO_USER, 1857 log_identifier=log_id_prefix, 1858 config=embed_eval_config, 1859 ) 1860 if signals: 1861 log.debug( 1862 "%s Handling %d signals found during final response embed resolution.", 1863 log_id_prefix, 1864 len(signals), 1865 ) 1866 await self._handle_resolved_signals( 1867 external_request_context, 1868 signals, 1869 original_rpc_id, 1870 is_finalizing_context=True, 1871 ) 1872 1873 new_parts = ( 1874 [a2a.create_text_part(text=resolved_text)] 1875 if resolved_text 1876 else [] 1877 ) 1878 new_parts.extend(non_text_parts) 1879 parsed_event.status.message = a2a.update_message_parts( 1880 message=parsed_event.status.message, 1881 new_parts=new_parts, 1882 ) 1883 log.info( 1884 "%s Final response text updated with resolved embeds.", 1885 log_id_prefix, 1886 ) 1887 1888 final_buffer_key = f"{a2a_task_id}_stream_buffer" 1889 remaining_buffer = self.task_context_manager.get_context( 1890 final_buffer_key 1891 ) 1892 if remaining_buffer: 1893 log.info( 1894 "%s Flushing remaining buffer for task %s before final response.", 1895 log_id_prefix, 1896 a2a_task_id, 1897 ) 1898 embed_eval_context = { 1899 "artifact_service": self.shared_artifact_service, 1900 "session_context": { 1901 "app_name": external_request_context.get( 1902 "app_name_for_artifacts", self.gateway_id 1903 ), 1904 "user_id": external_request_context.get( 1905 "user_id_for_artifacts" 1906 ), 1907 "session_id": external_request_context.get( 1908 "a2a_session_id" 1909 ), 1910 }, 1911 } 1912 embed_eval_config = { 1913 "gateway_max_artifact_resolve_size_bytes": self.gateway_max_artifact_resolve_size_bytes, 1914 "gateway_recursive_embed_depth": self.gateway_recursive_embed_depth, 1915 } 1916 resolved_remaining_text, _, signals = ( 1917 await resolve_embeds_in_string( 1918 text=remaining_buffer, 1919 context=embed_eval_context, 1920 resolver_func=evaluate_embed, 1921 types_to_resolve=LATE_EMBED_TYPES.copy(), 1922 resolution_mode=ResolutionMode.A2A_MESSAGE_TO_USER, 1923 log_identifier=log_id_prefix, 1924 config=embed_eval_config, 1925 ) 1926 ) 1927 await self._handle_resolved_signals( 1928 external_request_context, 1929 signals, 1930 original_rpc_id, 1931 is_finalizing_context=True, 1932 ) 1933 if resolved_remaining_text: 1934 flush_message = a2a.create_agent_text_message( 1935 text=resolved_remaining_text 1936 ) 1937 flush_event = a2a.create_status_update( 1938 task_id=a2a_task_id, 1939 context_id=external_request_context.get("a2a_session_id"), 1940 message=flush_message, 1941 is_final=False, 1942 ) 1943 await self._send_update_to_external( 1944 external_request_context, flush_event, True 1945 ) 1946 self.task_context_manager.remove_context(final_buffer_key) 1947 1948 if send_this_event_to_external: 1949 if isinstance(parsed_event, Task): 1950 # Filter DataParts from final Task if gateway has filtering enabled 1951 # This prevents tool results and other internal data from appearing in user-facing output 1952 if ( 1953 self.filter_tool_data_parts 1954 and parsed_event.status 1955 and parsed_event.status.message 1956 and parsed_event.status.message.parts 1957 ): 1958 original_parts = a2a.get_parts_from_message( 1959 parsed_event.status.message 1960 ) 1961 filtered_parts = [ 1962 part 1963 for part in original_parts 1964 if self._should_include_data_part_in_final_output(part) 1965 ] 1966 if len(filtered_parts) != len(original_parts): 1967 log.debug( 1968 "%s Filtered %d DataParts from final Task message", 1969 log_id_prefix, 1970 len(original_parts) - len(filtered_parts), 1971 ) 1972 parsed_event.status.message = a2a.update_message_parts( 1973 parsed_event.status.message, filtered_parts 1974 ) 1975 1976 await self._send_final_response_to_external( 1977 external_request_context, parsed_event 1978 ) 1979 elif isinstance( 1980 parsed_event, (TaskStatusUpdateEvent, TaskArtifactUpdateEvent) 1981 ): 1982 final_chunk_flag = ( 1983 is_final_chunk_of_status_update 1984 if isinstance(parsed_event, TaskStatusUpdateEvent) 1985 else False 1986 ) 1987 await self._send_update_to_external( 1988 external_request_context, parsed_event, final_chunk_flag 1989 ) 1990 1991 # Manage task timeout timer: cancel on final events, reset on intermediate activity 1992 if is_truly_final_event_for_context_cleanup: 1993 self._cancel_task_timeout(a2a_task_id) 1994 else: 1995 self._start_task_timeout(a2a_task_id) 1996 1997 if is_truly_final_event_for_context_cleanup: 1998 log.info( 1999 "%s Truly final event processed for task %s. Closing connections and removing context.", 2000 log_id_prefix, 2001 a2a_task_id, 2002 ) 2003 await self._close_external_connections(external_request_context) 2004 self.task_context_manager.remove_context(a2a_task_id) 2005 self.task_context_manager.remove_context(f"{a2a_task_id}_stream_buffer") 2006 2007 async def _handle_agent_event( 2008 self, topic: str, payload: Dict, task_id_from_topic: str 2009 ) -> bool: 2010 """ 2011 Handles messages received on gateway response and status topics. 2012 Parses the payload, retrieves context using task_id_from_topic, and dispatches for processing. 2013 """ 2014 try: 2015 rpc_response = JSONRPCResponse.model_validate(payload) 2016 except Exception as e: 2017 log.error( 2018 "%s Failed to parse payload as JSONRPCResponse for topic %s (Task ID from topic: %s): %s. Payload: %s", 2019 self.log_identifier, 2020 topic, 2021 task_id_from_topic, 2022 e, 2023 payload, 2024 ) 2025 return False 2026 2027 original_rpc_id = str(a2a.get_response_id(rpc_response)) 2028 2029 external_request_context = self.task_context_manager.get_context( 2030 task_id_from_topic 2031 ) 2032 if not external_request_context: 2033 log.warning( 2034 "%s No external context found for A2A Task ID: %s (from topic). Ignoring message. Topic: %s, RPC ID: %s", 2035 self.log_identifier, 2036 task_id_from_topic, 2037 topic, 2038 original_rpc_id, 2039 ) 2040 return True 2041 2042 external_request_context["a2a_task_id_for_event"] = task_id_from_topic 2043 external_request_context["original_rpc_id"] = original_rpc_id 2044 2045 parsed_event_obj: Union[ 2046 Task, TaskStatusUpdateEvent, TaskArtifactUpdateEvent, JSONRPCError, None 2047 ] = None 2048 error = a2a.get_response_error(rpc_response) 2049 if error: 2050 parsed_event_obj = error 2051 else: 2052 result = a2a.get_response_result(rpc_response) 2053 if result: 2054 # The result is already a parsed Pydantic model. 2055 parsed_event_obj = result 2056 2057 # Validate task ID match 2058 actual_task_id = None 2059 if isinstance(parsed_event_obj, Task): 2060 actual_task_id = parsed_event_obj.id 2061 elif isinstance( 2062 parsed_event_obj, (TaskStatusUpdateEvent, TaskArtifactUpdateEvent) 2063 ): 2064 actual_task_id = parsed_event_obj.task_id 2065 2066 if ( 2067 task_id_from_topic 2068 and actual_task_id 2069 and actual_task_id != task_id_from_topic 2070 ): 2071 log.error( 2072 "%s Task ID mismatch! Expected: %s, Got from payload: %s.", 2073 self.log_identifier, 2074 task_id_from_topic, 2075 actual_task_id, 2076 ) 2077 parsed_event_obj = None 2078 2079 if not parsed_event_obj: 2080 log.error( 2081 "%s Failed to parse or validate A2A event from RPC result for task %s. Result: %s", 2082 self.log_identifier, 2083 task_id_from_topic, 2084 a2a.get_response_result(rpc_response) or "N/A", 2085 ) 2086 generic_error = JSONRPCError( 2087 code=-32000, message="Invalid event structure received from agent." 2088 ) 2089 await self._send_error_to_external(external_request_context, generic_error) 2090 await self._close_external_connections(external_request_context) 2091 self.task_context_manager.remove_context(task_id_from_topic) 2092 self.task_context_manager.remove_context( 2093 f"{task_id_from_topic}_stream_buffer" 2094 ) 2095 return False 2096 2097 try: 2098 await self._process_parsed_a2a_event( 2099 parsed_event_obj, 2100 external_request_context, 2101 task_id_from_topic, 2102 original_rpc_id, 2103 ) 2104 return True 2105 except Exception as e: 2106 log.exception( 2107 "%s Error in _process_parsed_a2a_event for task %s: %s", 2108 self.log_identifier, 2109 task_id_from_topic, 2110 e, 2111 ) 2112 error_obj = JSONRPCError( 2113 code=-32000, message=f"Gateway processing error: {e}" 2114 ) 2115 await self._send_error_to_external(external_request_context, error_obj) 2116 await self._close_external_connections(external_request_context) 2117 self.task_context_manager.remove_context(task_id_from_topic) 2118 self.task_context_manager.remove_context( 2119 f"{task_id_from_topic}_stream_buffer" 2120 ) 2121 return False 2122 2123 # --- Task Timeout Methods --- 2124 2125 def _task_timeout_timer_id(self, task_id: str) -> str: 2126 return f"task_timeout_{task_id}" 2127 2128 def _start_task_timeout(self, task_id: str) -> None: 2129 """Start (or restart) the idle timeout timer for a task.""" 2130 if self.task_timeout_seconds <= 0: 2131 return 2132 timer_id = self._task_timeout_timer_id(task_id) 2133 # Cancel existing timer before starting a new one (reset) 2134 self.cancel_timer(timer_id) 2135 log.debug( 2136 "%s Starting task timeout timer for task %s (%d seconds)", 2137 self.log_identifier, 2138 task_id, 2139 self.task_timeout_seconds, 2140 ) 2141 SamComponentBase.add_timer( 2142 self, 2143 delay_ms=self.task_timeout_seconds * 1000, 2144 timer_id=timer_id, 2145 interval_ms=None, 2146 callback=lambda td, tid=task_id: self._on_task_timeout(tid), 2147 ) 2148 2149 def _cancel_task_timeout(self, task_id: str) -> None: 2150 """Cancel the idle timeout timer for a task.""" 2151 if self.task_timeout_seconds <= 0: 2152 return 2153 self.cancel_timer(self._task_timeout_timer_id(task_id)) 2154 2155 def _on_task_timeout(self, task_id: str) -> None: 2156 """Timer callback when a task times out. Schedules async cleanup on the event loop.""" 2157 log.warning( 2158 "%s Task %s timed out after %d seconds of inactivity", 2159 self.log_identifier, 2160 task_id, 2161 self.task_timeout_seconds, 2162 ) 2163 asyncio.run_coroutine_threadsafe( 2164 self._handle_task_timeout(task_id), self.get_async_loop() 2165 ) 2166 2167 async def _handle_task_timeout(self, task_id: str) -> None: 2168 """Handle a timed-out task. Subclasses should override to send errors to the adapter.""" 2169 log.warning( 2170 "%s Task %s timed out but _handle_task_timeout is not overridden", 2171 self.log_identifier, 2172 task_id, 2173 ) 2174 2175 async def _async_setup_and_run(self) -> None: 2176 """Main async logic for the gateway component.""" 2177 # Call base class to initialize Trust Manager 2178 await super()._async_setup_and_run() 2179 2180 if self._gateway_card_publishing_config.get("enabled", True): 2181 self._start_gateway_card_publishing() 2182 2183 log.info( 2184 "%s Starting _start_listener() to initiate external platform connection.", 2185 self.log_identifier, 2186 ) 2187 self._start_listener() 2188 2189 await self._message_processor_loop() 2190 2191 def _pre_async_cleanup(self) -> None: 2192 """Pre-cleanup actions for the gateway component.""" 2193 # Cleanup Trust Manager if present (ENTERPRISE FEATURE) 2194 if self.trust_manager: 2195 try: 2196 log.info("%s Cleaning up Trust Manager...", self.log_identifier) 2197 self.trust_manager.cleanup(self.cancel_timer) 2198 log.info("%s Trust Manager cleanup complete", self.log_identifier) 2199 except Exception as e: 2200 log.error( 2201 "%s Error during Trust Manager cleanup: %s", self.log_identifier, e 2202 ) 2203 2204 log.info("%s Calling _stop_listener()...", self.log_identifier) 2205 self._stop_listener() 2206 2207 if self.internal_event_queue: 2208 log.info( 2209 "%s Signaling _message_processor_loop to stop by putting sentinel on queue...", 2210 self.log_identifier, 2211 ) 2212 # This unblocks the `self.internal_event_queue.get()` call in the loop 2213 self.internal_event_queue.put(None) 2214 2215 async def _message_processor_loop(self): 2216 log.debug("%s Starting message processor loop as an asyncio task...", self.log_identifier) 2217 loop = self.get_async_loop() 2218 2219 while not self.stop_signal.is_set(): 2220 original_broker_message: Optional[SolaceMessage] = None 2221 item = None 2222 processed_successfully = False 2223 topic = None 2224 2225 try: 2226 item = await loop.run_in_executor(None, self.internal_event_queue.get) 2227 2228 if item is None: 2229 log.info( 2230 "%s Received shutdown sentinel. Exiting message processor loop.", 2231 self.log_identifier, 2232 ) 2233 break 2234 2235 topic = item.get("topic") 2236 payload = item.get("payload") 2237 original_broker_message = item.get("_original_broker_message") 2238 2239 if not topic or payload is None or not original_broker_message: 2240 log.warning( 2241 "%s Invalid item received from internal queue: %s", 2242 self.log_identifier, 2243 item, 2244 ) 2245 processed_successfully = False 2246 continue 2247 2248 if a2a.topic_matches_subscription( 2249 topic, a2a.get_discovery_subscription_topic(self.namespace) 2250 ): 2251 processed_successfully = await self._handle_discovery_message( 2252 payload 2253 ) 2254 elif ( 2255 hasattr(self, "trust_manager") 2256 and self.trust_manager 2257 and self.trust_manager.is_trust_card_topic(topic) 2258 ): 2259 await self.trust_manager.handle_trust_card_message(payload, topic) 2260 processed_successfully = True 2261 elif a2a.topic_matches_subscription( 2262 topic, 2263 a2a.get_gateway_response_subscription_topic( 2264 self.namespace, self.gateway_id 2265 ), 2266 ) or a2a.topic_matches_subscription( 2267 topic, 2268 a2a.get_gateway_status_subscription_topic( 2269 self.namespace, self.gateway_id 2270 ), 2271 ): 2272 task_id_from_topic: Optional[str] = None 2273 response_sub = a2a.get_gateway_response_subscription_topic( 2274 self.namespace, self.gateway_id 2275 ) 2276 status_sub = a2a.get_gateway_status_subscription_topic( 2277 self.namespace, self.gateway_id 2278 ) 2279 2280 if a2a.topic_matches_subscription(topic, response_sub): 2281 task_id_from_topic = a2a.extract_task_id_from_topic( 2282 topic, response_sub, self.log_identifier 2283 ) 2284 elif a2a.topic_matches_subscription(topic, status_sub): 2285 task_id_from_topic = a2a.extract_task_id_from_topic( 2286 topic, status_sub, self.log_identifier 2287 ) 2288 2289 if task_id_from_topic: 2290 processed_successfully = await self._handle_agent_event( 2291 topic, payload, task_id_from_topic 2292 ) 2293 else: 2294 log.error( 2295 "%s Could not extract task_id from topic %s for _handle_agent_event. Ignoring.", 2296 self.log_identifier, 2297 topic, 2298 ) 2299 processed_successfully = False 2300 else: 2301 log.warning( 2302 "%s Received message on unhandled topic: %s. Acknowledging.", 2303 self.log_identifier, 2304 topic, 2305 ) 2306 processed_successfully = True 2307 2308 except queue.Empty: 2309 continue 2310 except asyncio.CancelledError: 2311 log.info("%s Message processor loop cancelled.", self.log_identifier) 2312 break 2313 except Exception as e: 2314 log.exception( 2315 "%s Unhandled error in message processor loop: %s", 2316 self.log_identifier, 2317 e, 2318 ) 2319 processed_successfully = False 2320 await asyncio.sleep(1) 2321 finally: 2322 if original_broker_message: 2323 if processed_successfully: 2324 original_broker_message.call_acknowledgements() 2325 else: 2326 original_broker_message.call_negative_acknowledgements() 2327 log.warning( 2328 "%s NACKed SolaceMessage for topic: %s", 2329 self.log_identifier, 2330 topic or "unknown", 2331 ) 2332 2333 if item and item is not None: 2334 self.internal_event_queue.task_done() 2335 2336 log.info("%s Message processor loop finished.", self.log_identifier) 2337 2338 2339 2340 @abstractmethod 2341 async def _extract_initial_claims( 2342 self, external_event_data: Any 2343 ) -> Optional[Dict[str, Any]]: 2344 """ 2345 Extracts the primary identity claims from a platform-specific event. 2346 This method MUST be implemented by derived gateway components. 2347 2348 Args: 2349 external_event_data: Raw event data from the external platform 2350 (e.g., FastAPIRequest, Slack event dictionary). 2351 2352 Returns: 2353 A dictionary of initial claims, which MUST include an 'id' key. 2354 Example: {"id": "user@example.com", "source": "slack_api"} 2355 Return None if authentication fails. 2356 """ 2357 pass 2358 2359 @abstractmethod 2360 async def _translate_external_input( 2361 self, external_event: Any 2362 ) -> Tuple[str, List[ContentPart], Dict[str, Any]]: 2363 """ 2364 Translates raw platform-specific event data into A2A task parameters. 2365 2366 Args: 2367 external_event: Raw event data from the external platform 2368 (e.g., FastAPIRequest, Slack event dictionary). 2369 2370 Returns: 2371 A tuple containing: 2372 - target_agent_name (str): The name of the A2A agent to target. 2373 - a2a_parts (List[ContentPart]): A list of A2A Part objects. 2374 - external_request_context (Dict[str, Any]): Context for TaskContextManager. 2375 """ 2376 pass 2377 2378 @abstractmethod 2379 def _start_listener(self) -> None: 2380 pass 2381 2382 @abstractmethod 2383 def _stop_listener(self) -> None: 2384 pass 2385 2386 @abstractmethod 2387 async def _send_update_to_external( 2388 self, 2389 external_request_context: Dict[str, Any], 2390 event_data: Union[TaskStatusUpdateEvent, TaskArtifactUpdateEvent], 2391 is_final_chunk_of_update: bool, 2392 ) -> None: 2393 pass 2394 2395 @abstractmethod 2396 async def _send_final_response_to_external( 2397 self, external_request_context: Dict[str, Any], task_data: Task 2398 ) -> None: 2399 pass 2400 2401 @abstractmethod 2402 async def _send_error_to_external( 2403 self, external_request_context: Dict[str, Any], error_data: JSONRPCError 2404 ) -> None: 2405 pass 2406 2407 async def _close_external_connections( 2408 self, external_request_context: Dict[str, Any] 2409 ) -> None: 2410 """Close external connections (e.g., SSE) during context cleanup. 2411 2412 Called after the final event is fully processed, ensuring any 2413 pending status updates are delivered before the connection closes. 2414 Subclasses should override to perform transport-specific cleanup. 2415 """ 2416 pass 2417 2418 def _detect_gateway_type(self) -> str: 2419 """Auto-detect gateway type from component class or configuration.""" 2420 configured_type = self.get_config("gateway_type") 2421 if configured_type: 2422 return configured_type 2423 2424 class_name = self.__class__.__name__ 2425 if "WebUI" in class_name or "HttpSse" in class_name: 2426 return "http_sse" 2427 2428 if hasattr(self, 'adapter') and self.adapter: 2429 adapter_name = self.adapter.__class__.__name__.lower() 2430 if "rest" in adapter_name: 2431 return "rest" 2432 if "slack" in adapter_name: 2433 return "slack" 2434 if "teams" in adapter_name: 2435 return "teams" 2436 2437 return "generic" 2438 2439 def _build_gateway_card(self) -> AgentCard: 2440 """Build gateway discovery card as AgentCard with gateway extension.""" 2441 from a2a.types import AgentCapabilities, AgentExtension 2442 2443 gateway_type = self._detect_gateway_type() 2444 gateway_url = f"solace:{self.namespace}/a2a/v1/gateway/request/{self.gateway_id}" 2445 description = self._gateway_card_config.get( 2446 "description", 2447 f"{gateway_type.upper()} Gateway" 2448 ) 2449 2450 gateway_role_extension = AgentExtension( 2451 uri="https://solace.com/a2a/extensions/sam/gateway-role", 2452 required=False, 2453 params={ 2454 "gateway_id": self.gateway_id, 2455 "gateway_type": gateway_type, 2456 "namespace": self.namespace, 2457 } 2458 ) 2459 2460 extensions = [gateway_role_extension] 2461 2462 deployment_config = self.get_config("deployment", {}) 2463 deployment_id = deployment_config.get("id") if isinstance(deployment_config, dict) else None 2464 if deployment_id: 2465 deployment_extension = AgentExtension( 2466 uri="https://solace.com/a2a/extensions/sam/deployment", 2467 required=False, 2468 params={ 2469 "deployment_id": deployment_id, 2470 } 2471 ) 2472 extensions.append(deployment_extension) 2473 2474 try: 2475 from solace_agent_mesh import __version__ as sam_version 2476 except ImportError: 2477 sam_version = "unknown" 2478 2479 gateway_card = AgentCard( 2480 name=self.gateway_id, 2481 url=gateway_url, 2482 description=description, 2483 version=sam_version, 2484 protocol_version="1.0", 2485 capabilities=AgentCapabilities( 2486 supports_streaming=True, 2487 supports_cancellation=True, 2488 extensions=extensions 2489 ), 2490 default_input_modes=self._gateway_card_config.get("defaultInputModes", ["text"]), 2491 default_output_modes=self._gateway_card_config.get("defaultOutputModes", ["text"]), 2492 skills=self._gateway_card_config.get("skills", []), 2493 ) 2494 2495 return gateway_card 2496 2497 def _publish_gateway_card(self) -> None: 2498 """Publish gateway card to gateway discovery topic.""" 2499 try: 2500 gateway_card = self._build_gateway_card() 2501 discovery_topic = a2a.get_gateway_discovery_topic(self.namespace) 2502 2503 payload = gateway_card.model_dump(by_alias=True, exclude_none=True) 2504 self.publish_a2a_message(payload, discovery_topic) 2505 2506 log.debug( 2507 "%s Published gateway card: gateway_id=%s, type=%s, topic=%s", 2508 self.log_identifier, 2509 self.gateway_id, 2510 self._detect_gateway_type(), 2511 discovery_topic 2512 ) 2513 except Exception as e: 2514 log.error( 2515 "%s Failed to publish gateway card: %s", 2516 self.log_identifier, 2517 e, 2518 exc_info=True 2519 ) 2520 2521 def _start_gateway_card_publishing(self) -> None: 2522 """Start periodic gateway card publishing.""" 2523 interval_seconds = self._gateway_card_publishing_config.get("interval_seconds", 30) 2524 2525 if interval_seconds <= 0: 2526 log.info( 2527 "%s Gateway card publishing disabled (interval_seconds=%d)", 2528 self.log_identifier, 2529 interval_seconds 2530 ) 2531 return 2532 2533 log.info( 2534 "%s Starting gateway card publishing every %d seconds", 2535 self.log_identifier, 2536 interval_seconds 2537 ) 2538 2539 SamComponentBase.add_timer( 2540 self, 2541 delay_ms=1000, 2542 timer_id=self._gateway_card_timer_id, 2543 interval_ms=interval_seconds * 1000, 2544 callback=lambda timer_data: self._publish_gateway_card() 2545 ) 2546 2547 def _get_component_id(self) -> str: 2548 """Returns the gateway ID as the component identifier.""" 2549 return self.gateway_id 2550 2551 def _get_component_type(self) -> str: 2552 """Returns 'gateway' as the component type.""" 2553 return "gateway" 2554 2555 def invoke(self, message, data): 2556 if isinstance(message, SolaceMessage): 2557 message.call_acknowledgements() 2558 log.warning("%s Invoke method called unexpectedly.", self.log_identifier) 2559 return None