/ src / solace_agent_mesh / gateway / base / component.py
component.py
   1  """
   2  Base Component class for Gateway implementations in the Solace AI Connector.
   3  """
   4  
   5  import logging
   6  import asyncio
   7  import base64
   8  import queue
   9  import re
  10  import uuid
  11  from datetime import datetime, timezone
  12  from typing import Any, Dict, Optional, List, Tuple, Union
  13  
  14  from google.adk.artifacts import BaseArtifactService
  15  
  16  from ...common.agent_registry import AgentRegistry
  17  from ...common.gateway_registry import GatewayRegistry
  18  from ...common.sac.sam_component_base import SamComponentBase
  19  from ...core_a2a.service import CoreA2AService
  20  from ...agent.adk.services import initialize_artifact_service
  21  from ...common.services.identity_service import (
  22      BaseIdentityService,
  23      create_identity_service,
  24  )
  25  from .task_context import TaskContextManager
  26  from .auth_interface import AuthHandler
  27  from .. import constants
  28  from ...common.a2a.types import ContentPart
  29  from ...common.utils.rbac_utils import validate_agent_access
  30  from a2a.types import (
  31      Message as A2AMessage,
  32      AgentCard,
  33      JSONRPCResponse,
  34      Task,
  35      TaskState,
  36      TaskStatusUpdateEvent,
  37      TaskArtifactUpdateEvent,
  38      JSONRPCError,
  39      TextPart,
  40      DataPart,
  41      FilePart,
  42      FileWithBytes,
  43      Artifact as A2AArtifact,
  44  )
  45  from ...common import a2a
  46  from ...common.a2a.utils import is_gateway_card
  47  from ...common.utils.embeds import (
  48      resolve_embeds_in_string,
  49      evaluate_embed,
  50      LATE_EMBED_TYPES,
  51      EARLY_EMBED_TYPES,
  52      resolve_embeds_recursively_in_string,
  53  )
  54  from ...common.utils.embeds.types import ResolutionMode
  55  from ...agent.utils.artifact_helpers import (
  56      load_artifact_content_or_metadata,
  57      format_artifact_uri,
  58  )
  59  from ...common.utils.mime_helpers import is_text_based_mime_type
  60  from solace_ai_connector.common.message import (
  61      Message as SolaceMessage,
  62  )
  63  from solace_ai_connector.common.event import Event, EventType
  64  from abc import abstractmethod
  65  
  66  from ...common.middleware.registry import MiddlewareRegistry
  67  
  68  log = logging.getLogger(__name__)
  69  
  70  info = {
  71      "class_name": "BaseGatewayComponent",
  72      "description": (
  73          "Abstract base component for A2A gateways. Handles common service "
  74          "initialization and provides a framework for platform-specific logic. "
  75          "Configuration is typically derived from the parent BaseGatewayApp's app_config."
  76      ),
  77      "config_parameters": [],
  78      "input_schema": {
  79          "type": "object",
  80          "description": "Not typically used directly; component reacts to events from its input queue.",
  81      },
  82      "output_schema": {
  83          "type": "object",
  84          "description": "Not typically used directly; component sends data to external systems.",
  85      },
  86  }
  87  
  88  
  89  class BaseGatewayComponent(SamComponentBase):
  90      """
  91      Abstract base class for Gateway components.
  92  
  93      Initializes shared services and manages the core lifecycle for processing
  94      A2A messages and interacting with an external communication platform.
  95      """
  96  
  97      _RESOLVE_EMBEDS_IN_FINAL_RESPONSE = False
  98  
  99      def get_config(self, key: str, default: Any = None) -> Any:
 100          """
 101          Overrides the default get_config to first look inside the nested
 102          'app_config' dictionary that BaseGatewayApp places in the component_config.
 103          This is the primary way gateway components should access their configuration.
 104          """
 105          if "app_config" in self.component_config:
 106              value = self.component_config["app_config"].get(key)
 107              if value is not None:
 108                  return value
 109  
 110          return super().get_config(key, default)
 111  
    def __init__(
        self,
        resolve_artifact_uris_in_gateway: bool = True,
        supports_inline_artifact_resolution: bool = False,
        filter_tool_data_parts: bool = True,
        **kwargs: Any
    ):
        """
        Initialize the BaseGatewayComponent.

        Reads the common gateway configuration, creates the shared services
        (agent/gateway registries, core A2A service, artifact service, task
        context manager, identity service), captures gateway-card and task
        timeout settings, and finally calls the ``_setup_auth`` hook.

        Args:
            resolve_artifact_uris_in_gateway: If True, resolves artifact URIs before sending to external.
            supports_inline_artifact_resolution: If True, SIGNAL_ARTIFACT_RETURN embeds are converted
                to FileParts during embed resolution. If False (default), signals are passed through
                for the gateway to handle manually. Use False for legacy gateways (e.g., Slack),
                True for modern gateways that support inline artifact rendering (e.g., HTTP SSE).
            filter_tool_data_parts: If True (default), filters out tool-related DataParts (tool_call,
                tool_result, etc.) from final Task messages before sending to gateway. Use True for
                gateways that don't want to display internal tool execution details (e.g., Slack),
                False for gateways that display all parts (e.g., HTTP SSE Web UI).
            **kwargs: Additional arguments passed to parent class.

        Raises:
            ValueError: If ``gateway_id`` is not configured, or if any other
                lookup inside the guarded configuration block fails (the
                original error is chained as the cause).
        """
        super().__init__(info, **kwargs)
        self.resolve_artifact_uris_in_gateway = resolve_artifact_uris_in_gateway
        self.supports_inline_artifact_resolution = supports_inline_artifact_resolution
        self.filter_tool_data_parts = filter_tool_data_parts
        log.info("%s Initializing Base Gateway Component...", self.log_identifier)

        try:
            # Note: self.namespace and self.max_message_size_bytes are initialized in SamComponentBase
            self.gateway_id: str = self.get_config("gateway_id")
            if not self.gateway_id:
                raise ValueError("Gateway ID must be configured in the app_config.")

            self.enable_embed_resolution: bool = self.get_config(
                "enable_embed_resolution", True
            )
            # No default is supplied for the next two settings, so they are
            # None unless the configuration (or the parent lookup) provides them.
            self.gateway_max_artifact_resolve_size_bytes: int = self.get_config(
                "gateway_max_artifact_resolve_size_bytes"
            )
            self.gateway_recursive_embed_depth: int = self.get_config(
                "gateway_recursive_embed_depth"
            )
            self.artifact_handling_mode: str = self.get_config(
                "artifact_handling_mode", "embed"
            )
            # The value is discarded on purpose.
            # NOTE(review): presumably read here only so a lookup failure
            # surfaces inside this guarded block - confirm.
            _ = self.get_config("artifact_service")

            log.info(
                "%s Retrieved common configs: Namespace=%s, GatewayID=%s",
                self.log_identifier,
                self.namespace,
                self.gateway_id,
            )

        except Exception as e:
            # Any failure above (including the ValueError for a missing
            # gateway_id) is logged and re-raised as a ValueError.
            log.error(
                "%s Failed to retrieve essential configuration: %s",
                self.log_identifier,
                e,
            )
            raise ValueError(f"Configuration retrieval error: {e}") from e

        self.agent_registry: AgentRegistry = AgentRegistry()
        self.gateway_registry: GatewayRegistry = GatewayRegistry()
        # NOTE(review): component_id is hard-coded to "WebUI" even though this
        # is the shared base class for every gateway - confirm this is intended.
        self.core_a2a_service: CoreA2AService = CoreA2AService(
            agent_registry=self.agent_registry,
            namespace=self.namespace,
            component_id="WebUI"
        )
        # The component itself is handed to the factory, so the configuration
        # read above must already be in place at this point.
        self.shared_artifact_service: Optional[BaseArtifactService] = (
            initialize_artifact_service(self)
        )

        self.task_context_manager: TaskContextManager = TaskContextManager()
        # Created without a maxsize, i.e. an unbounded queue.
        self.internal_event_queue: queue.Queue = queue.Queue()

        identity_service_config = self.get_config("identity_service")
        self.identity_service: Optional[BaseIdentityService] = create_identity_service(
            identity_service_config, self
        )

        self._config_resolver = MiddlewareRegistry.get_config_resolver()
        log.info(
            "%s Middleware system initialized (using default configuration resolver).",
            self.log_identifier,
        )

        # Gateway card publishing defaults to enabled with a 30 second interval
        # when the setting is not configured.
        self._gateway_card_publishing_config = self.get_config(
            "gateway_card_publishing",
            {"enabled": True, "interval_seconds": 30}
        )
        self._gateway_card_config = self.get_config("gateway_card", {})
        self._gateway_card_timer_id = f"publish_gateway_card_{self.gateway_id}"

        # Task timeout: max idle time (seconds) waiting for agent activity before canceling
        self.task_timeout_seconds: int = self.get_config(
            "task_timeout_seconds", constants.DEFAULT_TASK_TIMEOUT_SECONDS
        )

        # Authentication handler (optional, enterprise feature)
        self.auth_handler: Optional[AuthHandler] = None

        # Setup authentication if enabled (subclasses override _setup_auth)
        # Called last so an override can rely on everything initialized above.
        self._setup_auth()

        log.info(
            "%s Initialized Base Gateway Component.", self.log_identifier
        )
 221  
 222      def _setup_auth(self) -> None:
 223          """
 224          Setup authentication handler if enabled.
 225  
 226          This method is called during initialization and can be overridden
 227          by subclasses to customize auth setup. The default implementation
 228          does nothing - subclasses should override to enable auth.
 229  
 230          Example override in subclass:
 231              def _setup_auth(self):
 232                  if self.get_config('enable_auth', False):
 233                      from enterprise.auth import SAMOAuth2Handler
 234                      self.auth_handler = SAMOAuth2Handler(self.config)
 235          """
 236          # Base implementation: no auth
 237          # Subclasses (like GenericGateway) override to enable auth
 238          pass
 239  
 240      async def _inject_auth_headers(self, headers: Dict[str, str]) -> Dict[str, str]:
 241          """
 242          Inject authentication headers if authenticated.
 243  
 244          This helper method should be called before making outgoing HTTP requests
 245          to add authentication headers (e.g., Bearer tokens) to the request.
 246  
 247          Args:
 248              headers: Existing headers dictionary
 249  
 250          Returns:
 251              Headers dictionary with auth headers added (if authenticated)
 252  
 253          Example:
 254              headers = {"Content-Type": "application/json"}
 255              headers = await self._inject_auth_headers(headers)
 256              # headers now includes Authorization if authenticated
 257          """
 258          if self.auth_handler:
 259              try:
 260                  auth_headers = await self.auth_handler.get_auth_headers()
 261                  headers.update(auth_headers)
 262              except Exception as e:
 263                  log.warning(
 264                      "%s Failed to get auth headers: %s",
 265                      self.log_identifier,
 266                      e
 267                  )
 268  
 269          return headers
 270  
 271      async def authenticate_and_enrich_user(
 272          self, external_event_data: Any
 273      ) -> Optional[Dict[str, Any]]:
 274          """
 275          Orchestrates the full authentication and identity enrichment flow.
 276          This method should be called by gateway handlers.
 277          """
 278          log_id_prefix = f"{self.log_identifier}[AuthAndEnrich]"
 279  
 280          auth_claims = await self._extract_initial_claims(external_event_data)
 281          if not auth_claims:
 282              log.warning(
 283                  "%s Initial claims extraction failed or returned no identity.",
 284                  log_id_prefix,
 285              )
 286              return None
 287  
 288          if self.identity_service:
 289              enriched_profile = await self.identity_service.get_user_profile(auth_claims)
 290              if enriched_profile:
 291                  final_profile = enriched_profile.copy()
 292                  final_profile.update(auth_claims)
 293                  log.info(
 294                      "%s Successfully merged auth claims and enriched profile for user: %s",
 295                      log_id_prefix,
 296                      auth_claims.get("id"),
 297                  )
 298                  return final_profile
 299              else:
 300                  log.debug(
 301                      "%s IdentityService found no profile for user: %s. Using claims only.",
 302                      log_id_prefix,
 303                      auth_claims.get("id"),
 304                  )
 305  
 306          return auth_claims
 307  
 308      async def submit_a2a_task(
 309          self,
 310          target_agent_name: str,
 311          a2a_parts: List[ContentPart],
 312          external_request_context: Dict[str, Any],
 313          user_identity: Any,
 314          is_streaming: bool = True,
 315          api_version: str = "v2",
 316          task_id_override: str | None = None,
 317          metadata: dict[str, Any] | None = None,
 318      ) -> str:
 319          log_id_prefix = f"{self.log_identifier}[SubmitA2ATask]"
 320          log.info(
 321              "%s Submitting task for user_identity: %s",
 322              log_id_prefix,
 323              user_identity.get("id", user_identity),
 324          )
 325  
 326          if not isinstance(user_identity, dict) or not user_identity.get("id"):
 327              log.error(
 328                  "%s Authentication failed or returned invalid profile. Denying task submission.",
 329                  log_id_prefix,
 330              )
 331              raise PermissionError("User not authenticated or identity is invalid.")
 332  
 333          force_identity_str = self.get_config("force_user_identity")
 334          if force_identity_str:
 335              original_identity_id = user_identity.get("id")
 336              user_identity = {"id": force_identity_str, "name": force_identity_str}
 337              log.warning(
 338                  "%s DEVELOPMENT MODE: Forcing user_identity from '%s' to '%s'",
 339                  log_id_prefix,
 340                  original_identity_id,
 341                  force_identity_str,
 342              )
 343  
 344          config_resolver = MiddlewareRegistry.get_config_resolver()
 345          gateway_context = {
 346              "gateway_id": self.gateway_id,
 347              "gateway_app_config": self.component_config.get("app_config", {}),
 348          }
 349  
 350          try:
 351              user_config = await config_resolver.resolve_user_config(
 352                  user_identity, gateway_context, {}
 353              )
 354              log.debug(
 355                  "%s Resolved user configuration for user_identity '%s': %s",
 356                  log_id_prefix,
 357                  user_identity.get("id"),
 358                  {k: v for k, v in user_config.items() if not k.startswith("_")},
 359              )
 360          except Exception as config_err:
 361              log.exception(
 362                  "%s Error resolving user configuration for '%s': %s. Proceeding with default configuration.",
 363                  log_id_prefix,
 364                  user_identity.get("id"),
 365                  config_err,
 366              )
 367              user_config = {}
 368  
 369          user_config["user_profile"] = user_identity
 370  
 371          # Validate user has permission to access this target agent
 372          validate_agent_access(
 373              user_config=user_config,
 374              target_agent_name=target_agent_name,
 375              validation_context={
 376                  "gateway_id": self.gateway_id,
 377                  "source": "gateway_request",
 378              },
 379              log_identifier=log_id_prefix,
 380          )
 381  
 382          external_request_context["user_identity"] = user_identity
 383          external_request_context["a2a_user_config"] = user_config
 384          external_request_context["api_version"] = api_version
 385          external_request_context["is_streaming"] = is_streaming
 386          external_request_context["target_agent_name"] = target_agent_name
 387          log.debug(
 388              "%s Stored user_identity, configuration, api_version (%s), and is_streaming (%s) in external_request_context.",
 389              log_id_prefix,
 390              api_version,
 391              is_streaming,
 392          )
 393  
 394          now = datetime.now(timezone.utc)
 395          timestamp_str = now.isoformat()
 396          timestamp_header_part = TextPart(
 397              text=f"Request received by gateway at: {timestamp_str}"
 398          )
 399          if not isinstance(a2a_parts, list):
 400              a2a_parts = list(a2a_parts)
 401          a2a_parts.insert(0, timestamp_header_part)
 402          log.debug("%s Prepended timestamp to a2a_parts.", log_id_prefix)
 403  
 404          a2a_session_id = external_request_context.get("a2a_session_id")
 405          user_id_for_a2a = external_request_context.get(
 406              "user_id_for_a2a", user_identity.get("id")
 407          )
 408  
 409          system_purpose = self.get_config("system_purpose", "")
 410          response_format = self.get_config("response_format", "")
 411  
 412          if not a2a_session_id:
 413              a2a_session_id = f"gdk-session-{uuid.uuid4().hex}"
 414              log.warning(
 415                  "%s 'a2a_session_id' not found in external_request_context, generated: %s",
 416                  self.log_identifier,
 417                  a2a_session_id,
 418              )
 419              external_request_context["a2a_session_id"] = a2a_session_id
 420  
 421          a2a_metadata = {
 422              "agent_name": target_agent_name,
 423              "system_purpose": system_purpose,
 424              "response_format": response_format,
 425          }
 426  
 427          # Add session behavior if provided by adapter
 428          session_behavior = external_request_context.get("session_behavior")
 429          if session_behavior:
 430              a2a_metadata["sessionBehavior"] = session_behavior
 431              log.debug(
 432                  "%s Setting sessionBehavior to: %s", log_id_prefix, session_behavior
 433              )
 434  
 435          invoked_artifacts = external_request_context.get("invoked_with_artifacts")
 436          if invoked_artifacts:
 437              a2a_metadata["invoked_with_artifacts"] = invoked_artifacts
 438              log.debug(
 439                  "%s Found %d artifact identifiers in external context to pass to agent.",
 440                  log_id_prefix,
 441                  len(invoked_artifacts),
 442              )
 443          
 444          if metadata:
 445              a2a_metadata.update(metadata)
 446  
 447          # This correlation ID is used by the gateway to track the task
 448          if task_id_override:
 449              task_id = task_id_override
 450          else:
 451              task_id = f"gdk-task-{uuid.uuid4().hex}"
 452  
 453          prepared_a2a_parts = await self._prepare_parts_for_publishing(
 454              parts=a2a_parts,
 455              user_id=user_id_for_a2a,
 456              session_id=a2a_session_id,
 457              target_agent_name=target_agent_name,
 458          )
 459  
 460          a2a_message = a2a.create_user_message(
 461              parts=prepared_a2a_parts,
 462              metadata=a2a_metadata,
 463              context_id=a2a_session_id,
 464          )
 465  
 466          if is_streaming:
 467              a2a_request = a2a.create_send_streaming_message_request(
 468                  message=a2a_message, task_id=task_id
 469              )
 470          else:
 471              a2a_request = a2a.create_send_message_request(
 472                  message=a2a_message, task_id=task_id
 473              )
 474  
 475          payload = a2a_request.model_dump(by_alias=True, exclude_none=True)
 476          target_topic = a2a.get_agent_request_topic(self.namespace, target_agent_name)
 477  
 478          user_properties = {
 479              "clientId": self.gateway_id,
 480              "userId": user_id_for_a2a,
 481          }
 482          if user_config:
 483              user_properties["a2aUserConfig"] = user_config
 484  
 485          # Enterprise feature: Add signed user claims if trust manager available
 486          if hasattr(self, "trust_manager") and self.trust_manager:
 487              log.debug(
 488                  "%s Attempting to sign user claims for task %s",
 489                  log_id_prefix,
 490                  task_id,
 491              )
 492              try:
 493                  auth_token = self.trust_manager.sign_user_claims(
 494                      user_info=user_identity, task_id=task_id
 495                  )
 496                  user_properties["authToken"] = auth_token
 497                  log.debug(
 498                      "%s Successfully signed user claims for task %s",
 499                      log_id_prefix,
 500                      task_id,
 501                  )
 502              except Exception as e:
 503                  log.error(
 504                      "%s Failed to sign user claims for task %s: %s",
 505                      log_id_prefix,
 506                      task_id,
 507                      e,
 508                  )
 509                  # Continue without token - enterprise feature is optional
 510          else:
 511              log.debug(
 512                  "%s Trust Manager not available, proceeding without authentication token",
 513                  log_id_prefix,
 514              )
 515  
 516          user_properties["replyTo"] = a2a.get_gateway_response_topic(
 517              self.namespace, self.gateway_id, task_id
 518          )
 519          if is_streaming:
 520              user_properties["a2aStatusTopic"] = a2a.get_gateway_status_topic(
 521                  self.namespace, self.gateway_id, task_id
 522              )
 523  
 524          self.task_context_manager.store_context(task_id, external_request_context)
 525          log.info("%s Stored external context for task_id: %s", log_id_prefix, task_id)
 526  
 527          self.publish_a2a_message(
 528              payload=payload, topic=target_topic, user_properties=user_properties
 529          )
 530          log.info(
 531              "%s Submitted A2A task %s to agent %s. Streaming: %s",
 532              log_id_prefix,
 533              task_id,
 534              target_agent_name,
 535              is_streaming,
 536          )
 537          return task_id
 538  
 539      def _handle_message(self, message: SolaceMessage, topic: str) -> None:
 540          """
 541          Override to use queue-based pattern instead of direct async.
 542  
 543          Gateway uses an internal queue for message processing to ensure
 544          strict ordering and backpressure handling.
 545  
 546          Args:
 547              message: The Solace message
 548              topic: The topic the message was received on
 549          """
 550          log.debug(
 551              "%s Received SolaceMessage on topic: %s. Bridging to internal queue.",
 552              self.log_identifier,
 553              topic,
 554          )
 555  
 556          try:
 557              msg_data_for_processor = {
 558                  "topic": topic,
 559                  "payload": message.get_payload(),
 560                  "user_properties": message.get_user_properties(),
 561                  "_original_broker_message": message,
 562              }
 563              self.internal_event_queue.put_nowait(msg_data_for_processor)
 564          except queue.Full:
 565              log.error(
 566                  "%s Internal event queue full. Cannot bridge message.",
 567                  self.log_identifier,
 568              )
 569              raise
 570          except Exception as e:
 571              log.exception(
 572                  "%s Error bridging message to internal queue: %s",
 573                  self.log_identifier,
 574                  e,
 575              )
 576              raise
 577  
 578      async def _handle_message_async(self, message, topic: str) -> None:
 579          """
 580          Not used by gateway - we override _handle_message() instead.
 581  
 582          This is here to satisfy the abstract method requirement, but the
 583          gateway uses the queue-based pattern via _handle_message() override.
 584          """
 585          raise NotImplementedError(
 586              "Gateway uses queue-based message handling via _handle_message() override"
 587          )
 588  
 589      async def _handle_resolved_signals(
 590          self,
 591          external_request_context: Dict,
 592          signals: List[Tuple[None, str, Any]],
 593          original_rpc_id: Optional[str],
 594          is_finalizing_context: bool = False,
 595      ):
 596          log_id_prefix = f"{self.log_identifier}[SignalHandler]"
 597          if not signals:
 598              return
 599  
 600          for signal_tuple in signals:
 601              if (
 602                  isinstance(signal_tuple, tuple)
 603                  and len(signal_tuple) == 3
 604                  and signal_tuple[0] is None
 605              ):
 606                  signal_type = signal_tuple[1]
 607                  signal_data = signal_tuple[2]
 608  
 609                  if signal_type == "SIGNAL_STATUS_UPDATE":
 610                      status_text = signal_data
 611                      log.info(
 612                          "%s Handling SIGNAL_STATUS_UPDATE: '%s'",
 613                          log_id_prefix,
 614                          status_text,
 615                      )
 616                      if is_finalizing_context:
 617                          log.debug(
 618                              "%s Suppressing SIGNAL_STATUS_UPDATE ('%s') during finalizing context.",
 619                              log_id_prefix,
 620                              status_text,
 621                          )
 622                          continue
 623                      try:
 624                          signal_a2a_message = a2a.create_agent_data_message(
 625                              data={
 626                                  "type": "agent_progress_update",
 627                                  "status_text": status_text,
 628                              },
 629                              part_metadata={"source": "gateway_signal"},
 630                          )
 631                          a2a_task_id_for_signal = external_request_context.get(
 632                              "a2a_task_id_for_event", original_rpc_id
 633                          )
 634                          if not a2a_task_id_for_signal:
 635                              log.error(
 636                                  "%s Cannot determine A2A task ID for signal event. Skipping.",
 637                                  log_id_prefix,
 638                              )
 639                              continue
 640  
 641                          signal_event = a2a.create_status_update(
 642                              task_id=a2a_task_id_for_signal,
 643                              context_id=external_request_context.get("a2a_session_id"),
 644                              message=signal_a2a_message,
 645                              is_final=False,
 646                          )
 647                          await self._send_update_to_external(
 648                              external_request_context=external_request_context,
 649                              event_data=signal_event,
 650                              is_final_chunk_of_update=True,
 651                          )
 652                          log.debug(
 653                              "%s Sent status signal as TaskStatusUpdateEvent.",
 654                              log_id_prefix,
 655                          )
 656                      except Exception as e:
 657                          log.exception(
 658                              "%s Error sending status signal: %s", log_id_prefix, e
 659                          )
 660                  elif signal_type == "SIGNAL_ARTIFACT_RETURN":
 661                      # Handle artifact return signal for legacy gateways
 662                      # During finalizing context (final Task), suppress this to avoid duplicates
 663                      # since the same signal might appear in both streaming and final responses
 664                      if is_finalizing_context:
 665                          log.debug(
 666                              "%s Suppressing SIGNAL_ARTIFACT_RETURN during finalizing context to avoid duplicate: %s",
 667                              log_id_prefix,
 668                              signal_data,
 669                          )
 670                          continue
 671  
 672                      log.info(
 673                          "%s Handling SIGNAL_ARTIFACT_RETURN for legacy gateway: %s",
 674                          log_id_prefix,
 675                          signal_data,
 676                      )
 677                      try:
 678                          filename = signal_data.get("filename")
 679                          version = signal_data.get("version")
 680  
 681                          if not filename:
 682                              log.error(
 683                                  "%s SIGNAL_ARTIFACT_RETURN missing filename. Skipping.",
 684                                  log_id_prefix,
 685                              )
 686                              continue
 687  
 688                          # Load artifact content (not just metadata) for legacy gateways
 689                          # Legacy gateways like Slack need the actual bytes to upload files
 690                          artifact_data = await load_artifact_content_or_metadata(
 691                              self.shared_artifact_service,
 692                              app_name=external_request_context.get(
 693                                  "app_name_for_artifacts", self.gateway_id
 694                              ),
 695                              user_id=external_request_context.get("user_id_for_artifacts"),
 696                              session_id=external_request_context.get("a2a_session_id"),
 697                              filename=filename,
 698                              version=version,
 699                              load_metadata_only=False,  # Load full content for legacy gateways
 700                          )
 701  
 702                          if artifact_data.get("status") != "success":
 703                              log.error(
 704                                  "%s Failed to load artifact content for %s v%s",
 705                                  log_id_prefix,
 706                                  filename,
 707                                  version,
 708                              )
 709                              continue
 710  
 711                          # Get content and ensure it's bytes
 712                          content = artifact_data.get("content")
 713                          if not content:
 714                              log.error(
 715                                  "%s No content found in artifact %s v%s",
 716                                  log_id_prefix,
 717                                  filename,
 718                                  version,
 719                              )
 720                              continue
 721  
 722                          # Convert to bytes if it's a string (text-based artifacts)
 723                          if isinstance(content, str):
 724                              content_bytes = content.encode("utf-8")
 725                          elif isinstance(content, bytes):
 726                              content_bytes = content
 727                          else:
 728                              log.error(
 729                                  "%s Artifact content is neither string nor bytes: %s",
 730                                  log_id_prefix,
 731                                  type(content),
 732                              )
 733                              continue
 734  
 735                          # Resolve any late embeds inside the artifact content before returning.
 736                          content_bytes = await self._resolve_embeds_in_artifact_content(
 737                              content_bytes=content_bytes,
 738                              mime_type=artifact_data.get("metadata", {}).get(
 739                                  "mime_type"
 740                              ),
 741                              filename=filename,
 742                              external_request_context=external_request_context,
 743                              log_id_prefix=log_id_prefix,
 744                          )
 745  
 746                          # Create FilePart with bytes for legacy gateway to upload
 747                          file_part = a2a.create_file_part_from_bytes(
 748                              content_bytes=content_bytes,
 749                              name=filename,
 750                              mime_type=artifact_data.get("metadata", {}).get(
 751                                  "mime_type"
 752                              ),
 753                          )
 754  
 755                          # Create artifact with the file part
 756                          # Import Part type for wrapping
 757                          from a2a.types import Artifact, Part
 758                          artifact = Artifact(
 759                              artifact_id=str(uuid.uuid4().hex),
 760                              parts=[Part(root=file_part)],
 761                              name=filename,
 762                              description=f"Artifact: {filename}",
 763                          )
 764  
 765                          # Send as TaskArtifactUpdateEvent
 766                          a2a_task_id_for_signal = external_request_context.get(
 767                              "a2a_task_id_for_event", original_rpc_id
 768                          )
 769  
 770                          if not a2a_task_id_for_signal:
 771                              log.error(
 772                                  "%s Cannot determine A2A task ID for artifact signal. Skipping.",
 773                                  log_id_prefix,
 774                              )
 775                              continue
 776  
 777                          artifact_event = a2a.create_artifact_update(
 778                              task_id=a2a_task_id_for_signal,
 779                              context_id=external_request_context.get("a2a_session_id"),
 780                              artifact=artifact,
 781                          )
 782  
 783                          await self._send_update_to_external(
 784                              external_request_context=external_request_context,
 785                              event_data=artifact_event,
 786                              is_final_chunk_of_update=False,
 787                          )
 788                          log.info(
 789                              "%s Sent artifact signal as TaskArtifactUpdateEvent for %s",
 790                              log_id_prefix,
 791                              filename,
 792                          )
 793                      except Exception as e:
 794                          log.exception(
 795                              "%s Error sending artifact signal: %s", log_id_prefix, e
 796                          )
 797                  elif signal_type == "SIGNAL_ARTIFACT_CREATION_COMPLETE":
 798                      # Handle artifact creation completion for legacy gateways
 799                      # This is similar to SIGNAL_ARTIFACT_RETURN but for newly created artifacts
 800                      log.info(
 801                          "%s Handling SIGNAL_ARTIFACT_CREATION_COMPLETE for legacy gateway: %s",
 802                          log_id_prefix,
 803                          signal_data,
 804                      )
 805                      try:
 806                          filename = signal_data.get("filename")
 807                          version = signal_data.get("version")
 808  
 809                          if not filename:
 810                              log.error(
 811                                  "%s SIGNAL_ARTIFACT_CREATION_COMPLETE missing filename. Skipping.",
 812                                  log_id_prefix,
 813                              )
 814                              continue
 815  
 816                          # Load artifact content (not just metadata) for legacy gateways
 817                          # Legacy gateways like Slack need the actual bytes to upload files
 818                          artifact_data = await load_artifact_content_or_metadata(
 819                              self.shared_artifact_service,
 820                              app_name=external_request_context.get(
 821                                  "app_name_for_artifacts", self.gateway_id
 822                              ),
 823                              user_id=external_request_context.get("user_id_for_artifacts"),
 824                              session_id=external_request_context.get("a2a_session_id"),
 825                              filename=filename,
 826                              version=version,
 827                              load_metadata_only=False,  # Load full content for legacy gateways
 828                          )
 829  
 830                          if artifact_data.get("status") != "success":
 831                              log.error(
 832                                  "%s Failed to load artifact content for %s v%s",
 833                                  log_id_prefix,
 834                                  filename,
 835                                  version,
 836                              )
 837                              continue
 838  
 839                          # Get content and ensure it's bytes
 840                          content = artifact_data.get("content")
 841                          if not content:
 842                              log.error(
 843                                  "%s No content found in artifact %s v%s",
 844                                  log_id_prefix,
 845                                  filename,
 846                                  version,
 847                              )
 848                              continue
 849  
 850                          # Convert to bytes if it's a string (text-based artifacts)
 851                          if isinstance(content, str):
 852                              content_bytes = content.encode("utf-8")
 853                          elif isinstance(content, bytes):
 854                              content_bytes = content
 855                          else:
 856                              log.error(
 857                                  "%s Artifact content is neither string nor bytes: %s",
 858                                  log_id_prefix,
 859                                  type(content),
 860                              )
 861                              continue
 862  
 863                          # Create FilePart with bytes for legacy gateway to upload
 864                          file_part = a2a.create_file_part_from_bytes(
 865                              content_bytes=content_bytes,
 866                              name=filename,
 867                              mime_type=signal_data.get("mime_type") or artifact_data.get("metadata", {}).get("mime_type"),
 868                          )
 869  
 870                          # Create artifact with the file part
 871                          # Import Part type for wrapping
 872                          from a2a.types import Artifact, Part
 873                          artifact = Artifact(
 874                              artifact_id=str(uuid.uuid4().hex),
 875                              parts=[Part(root=file_part)],
 876                              name=filename,
 877                              description=f"Artifact: {filename}",
 878                          )
 879  
 880                          # Send as TaskArtifactUpdateEvent
 881                          a2a_task_id_for_signal = external_request_context.get(
 882                              "a2a_task_id_for_event", original_rpc_id
 883                          )
 884  
 885                          if not a2a_task_id_for_signal:
 886                              log.error(
 887                                  "%s Cannot determine A2A task ID for artifact creation signal. Skipping.",
 888                                  log_id_prefix,
 889                              )
 890                              continue
 891  
 892                          artifact_event = a2a.create_artifact_update(
 893                              task_id=a2a_task_id_for_signal,
 894                              context_id=external_request_context.get("a2a_session_id"),
 895                              artifact=artifact,
 896                          )
 897  
 898                          await self._send_update_to_external(
 899                              external_request_context=external_request_context,
 900                              event_data=artifact_event,
 901                              is_final_chunk_of_update=False,
 902                          )
 903                          log.info(
 904                              "%s Sent artifact creation completion as TaskArtifactUpdateEvent for %s",
 905                              log_id_prefix,
 906                              filename,
 907                          )
 908                      except Exception as e:
 909                          log.exception(
 910                              "%s Error sending artifact creation completion signal: %s", log_id_prefix, e
 911                          )
 912                  elif signal_type == "SIGNAL_DEEP_RESEARCH_REPORT":
 913                      # Handle deep research report signal for legacy gateways
 914                      # For legacy gateways, we send the report as a file attachment
 915                      if is_finalizing_context:
 916                          log.debug(
 917                              "%s Suppressing SIGNAL_DEEP_RESEARCH_REPORT during finalizing context to avoid duplicate: %s",
 918                              log_id_prefix,
 919                              signal_data,
 920                          )
 921                          continue
 922  
 923                      try:
 924                          filename = signal_data.get("filename")
 925                          version = signal_data.get("version")
 926  
 927                          if not filename:
 928                              log.error(
 929                                  "%s SIGNAL_DEEP_RESEARCH_REPORT missing filename. Skipping.",
 930                                  log_id_prefix,
 931                              )
 932                              continue
 933  
 934                          # Load artifact content for legacy gateways
 935                          artifact_data = await load_artifact_content_or_metadata(
 936                              self.shared_artifact_service,
 937                              app_name=external_request_context.get(
 938                                  "app_name_for_artifacts", self.gateway_id
 939                              ),
 940                              user_id=external_request_context.get("user_id_for_artifacts"),
 941                              session_id=external_request_context.get("a2a_session_id"),
 942                              filename=filename,
 943                              version=version,
 944                              load_metadata_only=False,
 945                          )
 946  
 947                          if artifact_data.get("status") != "success":
 948                              log.error(
 949                                  "%s Failed to load deep research report content for %s v%s",
 950                                  log_id_prefix,
 951                                  filename,
 952                                  version,
 953                              )
 954                              continue
 955  
 956                          content = artifact_data.get("content")
 957                          if not content:
 958                              log.error(
 959                                  "%s No content found in deep research report %s v%s",
 960                                  log_id_prefix,
 961                                  filename,
 962                                  version,
 963                              )
 964                              continue
 965  
 966                          # Convert to bytes if it's a string
 967                          if isinstance(content, str):
 968                              content_bytes = content.encode("utf-8")
 969                          elif isinstance(content, bytes):
 970                              content_bytes = content
 971                          else:
 972                              log.error(
 973                                  "%s Deep research report content is neither string nor bytes: %s",
 974                                  log_id_prefix,
 975                                  type(content),
 976                              )
 977                              continue
 978  
 979                          # Create FilePart with bytes for legacy gateway to upload
 980                          file_part = a2a.create_file_part_from_bytes(
 981                              content_bytes=content_bytes,
 982                              name=filename,
 983                              mime_type=artifact_data.get("metadata", {}).get(
 984                                  "mime_type", "text/markdown"
 985                              ),
 986                          )
 987  
 988                          # Create artifact with the file part
 989                          from a2a.types import Artifact, Part
 990                          artifact = Artifact(
 991                              artifact_id=str(uuid.uuid4().hex),
 992                              parts=[Part(root=file_part)],
 993                              name=filename,
 994                              description=f"Deep Research Report: {filename}",
 995                          )
 996  
 997                          # Send as TaskArtifactUpdateEvent
 998                          a2a_task_id_for_signal = external_request_context.get(
 999                              "a2a_task_id_for_event", original_rpc_id
1000                          )
1001  
1002                          if not a2a_task_id_for_signal:
1003                              log.error(
1004                                  "%s Cannot determine A2A task ID for deep research report signal. Skipping.",
1005                                  log_id_prefix,
1006                              )
1007                              continue
1008  
1009                          artifact_event = a2a.create_artifact_update(
1010                              task_id=a2a_task_id_for_signal,
1011                              context_id=external_request_context.get("a2a_session_id"),
1012                              artifact=artifact,
1013                          )
1014  
1015                          await self._send_update_to_external(
1016                              external_request_context=external_request_context,
1017                              event_data=artifact_event,
1018                              is_final_chunk_of_update=False,
1019                          )
1020                          log.info(
1021                              "%s Sent deep research report as TaskArtifactUpdateEvent for %s",
1022                              log_id_prefix,
1023                              filename,
1024                          )
1025                      except Exception as e:
1026                          log.exception(
1027                              "%s Error sending deep research report signal: %s", log_id_prefix, e
1028                          )
1029                  else:
1030                      log.warning(
1031                          "%s Received unhandled signal type during embed resolution: %s",
1032                          log_id_prefix,
1033                          signal_type,
1034                      )
1035  
1036      async def _resolve_embeds_in_artifact_content(
1037          self,
1038          content_bytes: bytes,
1039          mime_type: Optional[str],
1040          filename: str,
1041          external_request_context: Dict[str, Any],
1042          log_id_prefix: str,
1043      ) -> bytes:
1044          """
1045          Checks if content is text-based and, if so, resolves late embeds within it.
1046          Returns the (potentially modified) content as bytes.
1047          """
1048          if is_text_based_mime_type(mime_type):
1049              log.info(
1050                  "%s Artifact '%s' is text-based (%s). Resolving late embeds.",
1051                  log_id_prefix,
1052                  filename,
1053                  mime_type,
1054              )
1055              try:
1056                  decoded_content = content_bytes.decode("utf-8")
1057  
1058                  # Construct context and config for the resolver
1059                  embed_eval_context = {
1060                      "artifact_service": self.shared_artifact_service,
1061                      "session_context": {
1062                          "app_name": external_request_context.get(
1063                              "app_name_for_artifacts", self.gateway_id
1064                          ),
1065                          "user_id": external_request_context.get(
1066                              "user_id_for_artifacts"
1067                          ),
1068                          "session_id": external_request_context.get("a2a_session_id"),
1069                      },
1070                  }
1071                  embed_eval_config = {
1072                      "gateway_max_artifact_resolve_size_bytes": self.gateway_max_artifact_resolve_size_bytes,
1073                      "gateway_recursive_embed_depth": self.gateway_recursive_embed_depth,
1074                  }
1075  
1076                  resolved_string = await resolve_embeds_recursively_in_string(
1077                      text=decoded_content,
1078                      context=embed_eval_context,
1079                      resolver_func=evaluate_embed,
1080                      types_to_resolve=LATE_EMBED_TYPES,
1081                      resolution_mode=ResolutionMode.RECURSIVE_ARTIFACT_CONTENT,
1082                      log_identifier=f"{log_id_prefix}[RecursiveResolve]",
1083                      config=embed_eval_config,
1084                      max_depth=self.gateway_recursive_embed_depth,
1085                  )
1086                  resolved_bytes = resolved_string.encode("utf-8")
1087                  log.info(
1088                      "%s Successfully resolved embeds in '%s'. New size: %d bytes.",
1089                      log_id_prefix,
1090                      filename,
1091                      len(resolved_bytes),
1092                  )
1093                  return resolved_bytes
1094              except Exception as resolve_err:
1095                  log.error(
1096                      "%s Failed to resolve embeds within artifact '%s': %s. Returning raw content.",
1097                      log_id_prefix,
1098                      filename,
1099                      resolve_err,
1100                  )
1101          return content_bytes
1102  
1103      async def _resolve_uri_in_file_part(
1104          self, file_part: FilePart, external_request_context: Dict[str, Any]
1105      ):
1106          """
1107          Checks if a FilePart has a resolvable URI and, if so,
1108          resolves it and mutates the part in-place by calling the common utility.
1109          After resolving the URI, it also resolves any late embeds within the content.
1110          """
1111          await a2a.resolve_file_part_uri(
1112              part=file_part,
1113              artifact_service=self.shared_artifact_service,
1114              log_identifier=self.log_identifier,
1115          )
1116  
1117          # After resolving the URI to get the content, resolve any late embeds inside it.
1118          if file_part.file and isinstance(file_part.file, FileWithBytes):
1119              # The content is a base64 encoded string in the `bytes` attribute.
1120              # We need to decode it to raw bytes for processing.
1121              try:
1122                  content_bytes = base64.b64decode(file_part.file.bytes)
1123              except Exception as e:
1124                  log.error(
1125                      "%s Failed to base64 decode file content for embed resolution: %s",
1126                      f"{self.log_identifier}[UriResolve]",
1127                      e,
1128                  )
1129                  return
1130  
1131              resolved_bytes = await self._resolve_embeds_in_artifact_content(
1132                  content_bytes=content_bytes,
1133                  mime_type=file_part.file.mime_type,
1134                  filename=file_part.file.name,
1135                  external_request_context=external_request_context,
1136                  log_id_prefix=f"{self.log_identifier}[UriResolve]",
1137              )
1138              # Re-encode the resolved content back to a base64 string for the FileWithBytes model.
1139              file_part.file.bytes = base64.b64encode(resolved_bytes).decode("utf-8")
1140  
1141      async def _resolve_uris_in_parts_list(
1142          self, parts: List[ContentPart], external_request_context: Dict[str, Any]
1143      ):
1144          """Iterates over a list of part objects and resolves any FilePart URIs."""
1145          if not parts:
1146              return
1147          for part in parts:
1148              if isinstance(part, FilePart):
1149                  await self._resolve_uri_in_file_part(part, external_request_context)
1150  
1151      async def _resolve_uris_in_payload(
1152          self, parsed_event: Any, external_request_context: Dict[str, Any]
1153      ):
1154          """
1155          Dispatcher that calls the appropriate targeted URI resolver based on the
1156          Pydantic model type of the event.
1157          """
1158          parts_to_resolve: List[ContentPart] = []
1159          if isinstance(parsed_event, TaskStatusUpdateEvent):
1160              message = a2a.get_message_from_status_update(parsed_event)
1161              if message:
1162                  parts_to_resolve.extend(a2a.get_parts_from_message(message))
1163          elif isinstance(parsed_event, TaskArtifactUpdateEvent):
1164              artifact = a2a.get_artifact_from_artifact_update(parsed_event)
1165              if artifact:
1166                  parts_to_resolve.extend(a2a.get_parts_from_artifact(artifact))
1167          elif isinstance(parsed_event, Task):
1168              if parsed_event.status and parsed_event.status.message:
1169                  parts_to_resolve.extend(
1170                      a2a.get_parts_from_message(parsed_event.status.message)
1171                  )
1172              if parsed_event.artifacts:
1173                  for artifact in parsed_event.artifacts:
1174                      parts_to_resolve.extend(a2a.get_parts_from_artifact(artifact))
1175  
1176          if parts_to_resolve:
1177              await self._resolve_uris_in_parts_list(
1178                  parts_to_resolve, external_request_context
1179              )
1180          else:
1181              log.debug(
1182                  "%s Payload type '%s' did not yield any parts for URI resolution. Skipping.",
1183                  self.log_identifier,
1184                  type(parsed_event).__name__,
1185              )
1186  
1187      async def _handle_discovery_message(self, payload: Dict) -> bool:
1188          """Handles incoming agent and gateway discovery messages."""
1189          try:
1190              agent_card = AgentCard(**payload)
1191  
1192              # Route to appropriate registry based on card type
1193              if is_gateway_card(agent_card):
1194                  # This is a gateway card - track in gateway registry
1195                  is_new = self.gateway_registry.add_or_update_gateway(agent_card)
1196                  if is_new:
1197                      gateway_type = self.gateway_registry.get_gateway_type(agent_card.name)
1198                      log.info(
1199                          "%s New gateway discovered: %s (type: %s)",
1200                          self.log_identifier,
1201                          agent_card.name,
1202                          gateway_type or "unknown"
1203                      )
1204                  else:
1205                      log.debug(
1206                          "%s Gateway heartbeat received: %s",
1207                          self.log_identifier,
1208                          agent_card.name
1209                      )
1210              else:
1211                  # This is an agent card - use existing logic
1212                  self.core_a2a_service.process_discovery_message(agent_card)
1213  
1214              return True
1215          except Exception as e:
1216              log.error(
1217                  "%s Failed to process discovery message: %s. Payload: %s",
1218                  self.log_identifier,
1219                  e,
1220                  payload,
1221              )
1222              return False
1223  
1224      async def _prepare_parts_for_publishing(
1225          self,
1226          parts: List[ContentPart],
1227          user_id: str,
1228          session_id: str,
1229          target_agent_name: str,
1230      ) -> List[ContentPart]:
1231          """
1232          Prepares message parts for publishing according to the configured artifact_handling_mode
1233          by calling the common utility function.
1234          """
1235          processed_parts: List[ContentPart] = []
1236          for part in parts:
1237              if isinstance(part, FilePart):
1238                  processed_part = await a2a.prepare_file_part_for_publishing(
1239                      part=part,
1240                      mode=self.artifact_handling_mode,
1241                      artifact_service=self.shared_artifact_service,
1242                      user_id=user_id,
1243                      session_id=session_id,
1244                      target_agent_name=target_agent_name,
1245                      log_identifier=self.log_identifier,
1246                  )
1247                  if processed_part:
1248                      processed_parts.append(processed_part)
1249              else:
1250                  processed_parts.append(part)
1251          return processed_parts
1252  
1253      def _should_include_data_part_in_final_output(self, part: Any) -> bool:
1254          """
1255          Determines if a DataPart should be included in the final output sent to the gateway.
1256  
1257          This filters out internal/tool-related DataParts that shouldn't be shown to end users.
1258          Gateways can override this method for custom filtering logic.
1259  
1260          Args:
1261              part: The part to check (expected to be a DataPart)
1262  
1263          Returns:
1264              True if the part should be included, False if it should be filtered out
1265          """
1266          from a2a.types import DataPart
1267  
1268          if not isinstance(part, DataPart):
1269              return True
1270  
1271          # Check if this is a tool result by looking at metadata
1272          # Tool results have metadata.tool_name set
1273          if part.metadata and part.metadata.get("tool_name"):
1274              # This is a tool result - filter it out
1275              return False
1276  
1277          # Get the type of the data part
1278          data_type = part.data.get("type") if part.data else None
1279  
1280          # Filter out tool-related data parts that are internal
1281          tool_related_types = {
1282              "tool_call",
1283              "tool_result",
1284              "tool_error",
1285              "tool_execution",
1286          }
1287  
1288          if data_type in tool_related_types:
1289              return False
1290  
1291          # Handle artifact_creation_progress based on gateway capabilities
1292          if data_type == "artifact_creation_progress":
1293              # For modern gateways (HTTP SSE), keep these to display progress bubbles
1294              # For legacy gateways (Slack), filter them out as they'll be converted to FileParts
1295              if self.supports_inline_artifact_resolution:
1296                  return True  # Keep for HTTP SSE
1297              else:
1298                  return False  # Filter for Slack (will be converted to FileParts instead)
1299  
1300          # Keep user-facing data parts like general progress updates
1301          user_facing_types = {
1302              "agent_progress_update",
1303              "thinking_content",
1304          }
1305  
1306          if data_type in user_facing_types:
1307              return True
1308  
1309          # Default: include unknown types (to avoid hiding potentially useful info)
1310          return True
1311  
1312      async def _resolve_embeds_and_handle_signals(
1313          self,
1314          event_with_parts: Union[TaskStatusUpdateEvent, Task, TaskArtifactUpdateEvent],
1315          external_request_context: Dict[str, Any],
1316          a2a_task_id: str,
1317          original_rpc_id: Optional[str],
1318          is_finalizing_context: bool = False,
1319      ) -> bool:
1320          if not self.enable_embed_resolution:
1321              return False
1322  
1323          log_id_prefix = f"{self.log_identifier}[EmbedResolve:{a2a_task_id}]"
1324          content_modified = False
1325  
1326          embed_eval_context = {
1327              "artifact_service": self.shared_artifact_service,
1328              "session_context": {
1329                  "app_name": external_request_context.get(
1330                      "app_name_for_artifacts", self.gateway_id
1331                  ),
1332                  "user_id": external_request_context.get("user_id_for_artifacts"),
1333                  "session_id": external_request_context.get("a2a_session_id"),
1334              },
1335          }
1336          embed_eval_config = {
1337              "gateway_max_artifact_resolve_size_bytes": self.gateway_max_artifact_resolve_size_bytes,
1338              "gateway_recursive_embed_depth": self.gateway_recursive_embed_depth,
1339          }
1340  
1341          parts_owner: Optional[Union[A2AMessage, A2AArtifact]] = None
1342          if isinstance(event_with_parts, (TaskStatusUpdateEvent, Task)):
1343              if event_with_parts.status and event_with_parts.status.message:
1344                  parts_owner = event_with_parts.status.message
1345          elif isinstance(event_with_parts, TaskArtifactUpdateEvent):
1346              if event_with_parts.artifact:
1347                  parts_owner = event_with_parts.artifact
1348  
1349          if not (parts_owner and parts_owner.parts):
1350              return False
1351  
1352          is_streaming_status_update = isinstance(event_with_parts, TaskStatusUpdateEvent)
1353          stream_buffer_key = f"{a2a_task_id}_stream_buffer"
1354          current_buffer = ""
1355          if is_streaming_status_update:
1356              current_buffer = (
1357                  self.task_context_manager.get_context(stream_buffer_key) or ""
1358              )
1359  
1360          original_parts: List[ContentPart] = (
1361              a2a.get_parts_from_message(parts_owner)
1362              if isinstance(parts_owner, A2AMessage)
1363              else a2a.get_parts_from_artifact(parts_owner)
1364          )
1365  
1366          new_parts: List[ContentPart] = []
1367          other_signals = []
1368  
1369          for part in original_parts:
1370              if isinstance(part, TextPart) and part.text:
1371                  text_to_resolve = current_buffer + part.text
1372                  current_buffer = ""  # Buffer is now being processed
1373  
1374                  # Debug: Log the text before embed resolution
1375                  log.debug(
1376                      "%s Input text for embed resolution (len=%d): %s",
1377                      log_id_prefix,
1378                      len(text_to_resolve),
1379                      text_to_resolve[:500] + "..." if len(text_to_resolve) > 500 else text_to_resolve,
1380                  )
1381  
1382                  (
1383                      resolved_text,
1384                      processed_idx,
1385                      signals_with_placeholders,
1386                  ) = await resolve_embeds_in_string(
1387                      text=text_to_resolve,
1388                      context=embed_eval_context,
1389                      resolver_func=evaluate_embed,
1390                      types_to_resolve=LATE_EMBED_TYPES.union({"status_update"}),
1391                      resolution_mode=ResolutionMode.A2A_MESSAGE_TO_USER,
1392                      log_identifier=log_id_prefix,
1393                      config=embed_eval_config,
1394                  )
1395  
1396                  # Debug: Log the resolved text
1397                  log.debug(
1398                      "%s Resolved text (processed_idx=%d, signals=%d, len=%d): %s",
1399                      log_id_prefix,
1400                      processed_idx,
1401                      len(signals_with_placeholders),
1402                      len(resolved_text),
1403                      resolved_text[:500] + "..." if len(resolved_text) > 500 else resolved_text,
1404                  )
1405  
1406                  if not signals_with_placeholders:
1407                      new_parts.append(a2a.create_text_part(text=resolved_text))
1408                  else:
1409                      placeholder_map = {p: s for _, s, p in signals_with_placeholders}
1410                      split_pattern = (
1411                          f"({'|'.join(re.escape(p) for p in placeholder_map.keys())})"
1412                      )
1413                      text_fragments = re.split(split_pattern, resolved_text)
1414  
1415                      for i, fragment in enumerate(text_fragments):
1416                          if not fragment:
1417                              continue
1418                          if fragment in placeholder_map:
1419                              signal_tuple = placeholder_map[fragment]
1420                              signal_type, signal_data = signal_tuple[1], signal_tuple[2]
1421                              if signal_type == "SIGNAL_ARTIFACT_RETURN":
1422                                  # Only convert to FilePart if gateway supports inline artifact resolution
1423                                  if self.supports_inline_artifact_resolution:
1424                                      try:
1425                                          filename, version = (
1426                                              signal_data["filename"],
1427                                              signal_data["version"],
1428                                          )
1429                                          artifact_data = (
1430                                              await load_artifact_content_or_metadata(
1431                                                  self.shared_artifact_service,
1432                                                  **embed_eval_context["session_context"],
1433                                                  filename=filename,
1434                                                  version=version,
1435                                                  load_metadata_only=True,
1436                                              )
1437                                          )
1438                                          if artifact_data.get("status") == "success":
1439                                              uri = format_artifact_uri(
1440                                                  **embed_eval_context["session_context"],
1441                                                  filename=filename,
1442                                                  version=artifact_data.get("version"),
1443                                              )
1444                                              new_parts.append(
1445                                                  a2a.create_file_part_from_uri(
1446                                                      uri,
1447                                                      filename,
1448                                                      artifact_data.get("metadata", {}).get(
1449                                                          "mime_type"
1450                                                      ),
1451                                                  )
1452                                              )
1453                                          else:
1454                                              new_parts.append(
1455                                                  a2a.create_text_part(
1456                                                      f"[Error: Artifact '{filename}' v{version} not found.]"
1457                                                  )
1458                                              )
1459                                      except Exception as e:
1460                                          log.exception(
1461                                              "%s Error handling SIGNAL_ARTIFACT_RETURN: %s",
1462                                              log_id_prefix,
1463                                              e,
1464                                          )
1465                                          new_parts.append(
1466                                              a2a.create_text_part(
1467                                                  f"[Error: Could not retrieve artifact '{signal_data.get('filename')}'.]"
1468                                              )
1469                                          )
1470                                  else:
1471                                      # Legacy gateway mode: pass signal through for gateway to handle
1472                                      other_signals.append(signal_tuple)
1473                              elif signal_type == "SIGNAL_DEEP_RESEARCH_REPORT":
1474                                  # Deep research reports should be rendered by the frontend component
1475                                  # For modern gateways (HTTP SSE), create a DataPart with artifact reference
1476                                  # For legacy gateways, pass through as signal
1477                                  if self.supports_inline_artifact_resolution:
1478                                      try:
1479                                          filename = signal_data["filename"]
1480                                          version = signal_data["version"]
1481                                          log.info(
1482                                              "%s Converting SIGNAL_DEEP_RESEARCH_REPORT to DataPart for frontend rendering: %s v%s",
1483                                              log_id_prefix,
1484                                              filename,
1485                                              version,
1486                                          )
1487                                          # Create a DataPart that the frontend can use to render DeepResearchReportBubble
1488                                          # The frontend will fetch the artifact content separately
1489                                          artifact_data = (
1490                                              await load_artifact_content_or_metadata(
1491                                                  self.shared_artifact_service,
1492                                                  **embed_eval_context["session_context"],
1493                                                  filename=filename,
1494                                                  version=version,
1495                                                  load_metadata_only=True,
1496                                              )
1497                                          )
1498                                          if artifact_data.get("status") == "success":
1499                                              uri = format_artifact_uri(
1500                                                  **embed_eval_context["session_context"],
1501                                                  filename=filename,
1502                                                  version=artifact_data.get("version"),
1503                                              )
1504                                              # Create a DataPart with deep_research_report type
1505                                              # This will be rendered by DeepResearchReportBubble in the frontend
1506                                              data_part = a2a.create_data_part(
1507                                                  data={
1508                                                      "type": "deep_research_report",
1509                                                      "filename": filename,
1510                                                      "version": artifact_data.get("version"),
1511                                                      "uri": uri,
1512                                                  },
1513                                                  metadata={"source": "deep_research_tool"},
1514                                              )
1515                                              new_parts.append(data_part)
1516                                          else:
1517                                              new_parts.append(
1518                                                  a2a.create_text_part(
1519                                                      f"[Error: Deep research report '{filename}' v{version} not found.]"
1520                                                  )
1521                                              )
1522                                      except Exception as e:
1523                                          log.exception(
1524                                              "%s Error handling SIGNAL_DEEP_RESEARCH_REPORT: %s",
1525                                              log_id_prefix,
1526                                              e,
1527                                          )
1528                                          new_parts.append(
1529                                              a2a.create_text_part(
1530                                                  f"[Error: Could not retrieve deep research report '{signal_data.get('filename')}'.]"
1531                                              )
1532                                          )
1533                                  else:
1534                                      # Legacy gateway mode: pass signal through for gateway to handle
1535                                      other_signals.append(signal_tuple)
1536                              elif signal_type == "SIGNAL_INLINE_BINARY_CONTENT":
1537                                  signal_data["content_bytes"] = signal_data.get("bytes")
1538                                  del signal_data["bytes"]
1539                                  new_parts.append(
1540                                      a2a.create_file_part_from_bytes(**signal_data)
1541                                  )
1542                              else:
1543                                  other_signals.append(signal_tuple)
1544                          else:
1545                              # Check if the non-placeholder fragment is just whitespace
1546                              # and is between two placeholders. If so, drop it.
1547                              is_just_whitespace = not fragment.strip()
1548                              prev_fragment_was_placeholder = (
1549                                  i > 0 and text_fragments[i - 1] in placeholder_map
1550                              )
1551                              next_fragment_is_placeholder = (
1552                                  i < len(text_fragments) - 1
1553                                  and text_fragments[i + 1] in placeholder_map
1554                              )
1555  
1556                              if (
1557                                  is_just_whitespace
1558                                  and prev_fragment_was_placeholder
1559                                  and next_fragment_is_placeholder
1560                              ):
1561                                  log.debug(
1562                                      "%s Dropping whitespace fragment between two file signals.",
1563                                      log_id_prefix,
1564                                  )
1565                                  continue
1566  
1567                              new_parts.append(a2a.create_text_part(text=fragment))
1568  
1569                  if is_streaming_status_update:
1570                      current_buffer = text_to_resolve[processed_idx:]
1571  
1572              elif isinstance(part, FilePart) and part.file:
1573                  # Handle recursive embeds in text-based FileParts
1574                  new_parts.append(part)  # Placeholder for now
1575              elif isinstance(part, DataPart):
1576                  # Handle special DataPart types
1577                  data_type = part.data.get("type") if part.data else None
1578  
1579                  if data_type == "template_block":
1580                      # Resolve template block and replace with resolved text
1581                      try:
1582                          from ...common.utils.templates import resolve_template_blocks_in_string
1583  
1584                          # Reconstruct the template block syntax
1585                          data_artifact = part.data.get("data_artifact", "")
1586                          jsonpath = part.data.get("jsonpath")
1587                          limit = part.data.get("limit")
1588                          template_content = part.data.get("template_content", "")
1589  
1590                          # Build params string
1591                          params_parts = [f'data="{data_artifact}"']
1592                          if jsonpath:
1593                              params_parts.append(f'jsonpath="{jsonpath}"')
1594                          if limit is not None:
1595                              params_parts.append(f'limit="{limit}"')
1596                          params_str = " ".join(params_parts)
1597  
1598                          # Reconstruct full template block
1599                          template_block = f"«««template: {params_str}\n{template_content}\n»»»"
1600  
1601                          log.debug(
1602                              "%s Resolving template block inline: data=%s",
1603                              log_id_prefix,
1604                              data_artifact,
1605                          )
1606  
1607                          # Resolve the template
1608                          resolved_text = await resolve_template_blocks_in_string(
1609                              text=template_block,
1610                              artifact_service=self.shared_artifact_service,
1611                              session_context={
1612                                  "app_name": external_request_context.get(
1613                                      "app_name_for_artifacts", self.gateway_id
1614                                  ),
1615                                  "user_id": external_request_context.get("user_id_for_artifacts"),
1616                                  "session_id": external_request_context.get("a2a_session_id"),
1617                              },
1618                              log_identifier=f"{log_id_prefix}[TemplateResolve]",
1619                          )
1620  
1621                          log.info(
1622                              "%s Template resolved successfully. Output length: %d",
1623                              log_id_prefix,
1624                              len(resolved_text),
1625                          )
1626  
1627                          # Replace the DataPart with a TextPart containing the resolved content
1628                          new_parts.append(a2a.create_text_part(text=resolved_text))
1629  
1630                      except Exception as e:
1631                          log.error(
1632                              "%s Failed to resolve template block: %s",
1633                              log_id_prefix,
1634                              e,
1635                              exc_info=True,
1636                          )
1637                          # Send error message as TextPart
1638                          error_text = f"[Template rendering error: {str(e)}]"
1639                          new_parts.append(a2a.create_text_part(text=error_text))
1640  
1641                  elif (
1642                      data_type == "artifact_creation_progress"
1643                      and not self.supports_inline_artifact_resolution
1644                  ):
1645                      # Legacy gateway mode: convert completed artifact creation to FilePart
1646                      status = part.data.get("status")
1647                      if status == "completed" and not is_finalizing_context:
1648                          # Extract artifact info from the DataPart
1649                          filename = part.data.get("filename")
1650                          version = part.data.get("version")
1651                          mime_type = part.data.get("mime_type")
1652  
1653                          if filename and version is not None:
1654                              log.info(
1655                                  "%s Converting artifact creation completion to FilePart for legacy gateway: %s v%s",
1656                                  log_id_prefix,
1657                                  filename,
1658                                  version,
1659                              )
1660                              # This will be sent as an artifact signal, so don't add to new_parts
1661                              # Instead, add to other_signals for processing
1662                              signal_tuple = (
1663                                  None,
1664                                  "SIGNAL_ARTIFACT_CREATION_COMPLETE",
1665                                  {
1666                                      "filename": filename,
1667                                      "version": version,
1668                                      "mime_type": mime_type,
1669                                  },
1670                              )
1671                              other_signals.append(signal_tuple)
1672                          else:
1673                              # Missing required info, keep the DataPart as-is
1674                              new_parts.append(part)
1675                      elif status == "completed" and is_finalizing_context:
1676                          # Suppress during finalizing to avoid duplicates
1677                          log.debug(
1678                              "%s Suppressing artifact creation completion during finalizing context for %s",
1679                              log_id_prefix,
1680                              part.data.get("filename"),
1681                          )
1682                          continue
1683                      else:
1684                          # Keep in-progress or failed status DataParts
1685                          new_parts.append(part)
1686                  else:
1687                      # Not an artifact creation DataPart, or modern gateway - keep as-is
1688                      new_parts.append(part)
1689              else:
1690                  new_parts.append(part)
1691  
1692          if other_signals:
1693              await self._handle_resolved_signals(
1694                  external_request_context,
1695                  other_signals,
1696                  original_rpc_id,
1697                  is_finalizing_context,
1698              )
1699  
1700          if new_parts != original_parts:
1701              content_modified = True
1702              if isinstance(parts_owner, A2AMessage):
1703                  if isinstance(event_with_parts, TaskStatusUpdateEvent):
1704                      event_with_parts.status.message = a2a.update_message_parts(
1705                          parts_owner, new_parts
1706                      )
1707                  elif isinstance(event_with_parts, Task):
1708                      event_with_parts.status.message = a2a.update_message_parts(
1709                          parts_owner, new_parts
1710                      )
1711              elif isinstance(parts_owner, A2AArtifact):
1712                  event_with_parts.artifact = a2a.update_artifact_parts(
1713                      parts_owner, new_parts
1714                  )
1715  
1716          if is_streaming_status_update:
1717              self.task_context_manager.store_context(stream_buffer_key, current_buffer)
1718  
1719          return content_modified or bool(other_signals)
1720  
1721      async def _process_parsed_a2a_event(
1722          self,
1723          parsed_event: Union[
1724              Task, TaskStatusUpdateEvent, TaskArtifactUpdateEvent, JSONRPCError
1725          ],
1726          external_request_context: Dict[str, Any],
1727          a2a_task_id: str,
1728          original_rpc_id: Optional[str],
1729      ) -> None:
1730          """
1731          Processes a parsed A2A event: resolves embeds, handles signals,
1732          sends to external, and manages context.
1733          """
1734          log_id_prefix = f"{self.log_identifier}[ProcessParsed:{a2a_task_id}]"
1735          is_truly_final_event_for_context_cleanup = False
1736          is_finalizing_context_for_embeds = False
1737  
1738          if isinstance(parsed_event, JSONRPCError):
1739              log.warning(
1740                  "%s Handling JSONRPCError for task %s.", log_id_prefix, a2a_task_id
1741              )
1742              await self._send_error_to_external(external_request_context, parsed_event)
1743              is_truly_final_event_for_context_cleanup = True
1744          else:
1745              content_was_modified_or_signals_handled = False
1746  
1747              if isinstance(parsed_event, TaskStatusUpdateEvent) and parsed_event.final:
1748                  is_finalizing_context_for_embeds = True
1749              elif isinstance(parsed_event, Task):
1750                  is_finalizing_context_for_embeds = True
1751  
1752              if not isinstance(parsed_event, JSONRPCError):
1753                  content_was_modified_or_signals_handled = (
1754                      await self._resolve_embeds_and_handle_signals(
1755                          parsed_event,
1756                          external_request_context,
1757                          a2a_task_id,
1758                          original_rpc_id,
1759                          is_finalizing_context=is_finalizing_context_for_embeds,
1760                      )
1761                  )
1762  
1763              if self.resolve_artifact_uris_in_gateway:
1764                  log.debug(
1765                      "%s Resolving artifact URIs before sending to external...",
1766                      log_id_prefix,
1767                  )
1768                  await self._resolve_uris_in_payload(
1769                      parsed_event, external_request_context
1770                  )
1771  
1772              send_this_event_to_external = True
1773              is_final_chunk_of_status_update = False
1774  
1775              if isinstance(parsed_event, TaskStatusUpdateEvent):
1776                  # Try enterprise handling for input_required state (OAuth authentication)
1777                  if parsed_event.status and parsed_event.status.state == TaskState.input_required:
1778                      try:
1779                          from solace_agent_mesh_enterprise.auth.input_required import (
1780                              handle_input_required_request,
1781                          )
1782                          parsed_event = handle_input_required_request(
1783                              parsed_event,
1784                              a2a_task_id,
1785                              self  # Gateway component for caching
1786                          )
1787                      except ImportError:
1788                          pass  # Enterprise not installed
1789  
1790                  is_final_chunk_of_status_update = parsed_event.final
1791                  if (
1792                      not (
1793                          parsed_event.status
1794                          and parsed_event.status.message
1795                          and parsed_event.status.message.parts
1796                      )
1797                      and not parsed_event.metadata
1798                      and not is_final_chunk_of_status_update
1799                      and not content_was_modified_or_signals_handled
1800                  ):
1801                      send_this_event_to_external = False
1802                      log.debug(
1803                          "%s Suppressing empty intermediate status update.",
1804                          log_id_prefix,
1805                      )
1806              elif isinstance(parsed_event, TaskArtifactUpdateEvent):
1807                  if (
1808                      not (parsed_event.artifact and parsed_event.artifact.parts)
1809                      and not content_was_modified_or_signals_handled
1810                  ):
1811                      send_this_event_to_external = False
1812                      log.debug("%s Suppressing empty artifact update.", log_id_prefix)
1813              elif isinstance(parsed_event, Task):
1814                  is_truly_final_event_for_context_cleanup = True
1815  
1816                  if (
1817                      self._RESOLVE_EMBEDS_IN_FINAL_RESPONSE
1818                      and parsed_event.status
1819                      and parsed_event.status.message
1820                      and parsed_event.status.message.parts
1821                  ):
1822                      log.debug(
1823                          "%s Resolving embeds in final task response...", log_id_prefix
1824                      )
1825                      message = parsed_event.status.message
1826                      combined_text = a2a.get_text_from_message(message)
1827                      data_parts = a2a.get_data_parts_from_message(message)
1828                      file_parts = a2a.get_file_parts_from_message(message)
1829                      non_text_parts = data_parts + file_parts
1830  
1831                      if combined_text:
1832                          embed_eval_context = {
1833                              "artifact_service": self.shared_artifact_service,
1834                              "session_context": {
1835                                  "app_name": external_request_context.get(
1836                                      "app_name_for_artifacts", self.gateway_id
1837                                  ),
1838                                  "user_id": external_request_context.get(
1839                                      "user_id_for_artifacts"
1840                                  ),
1841                                  "session_id": external_request_context.get(
1842                                      "a2a_session_id"
1843                                  ),
1844                              },
1845                          }
1846                          embed_eval_config = {
1847                              "gateway_max_artifact_resolve_size_bytes": self.gateway_max_artifact_resolve_size_bytes,
1848                              "gateway_recursive_embed_depth": self.gateway_recursive_embed_depth,
1849                          }
1850                          all_embed_types = EARLY_EMBED_TYPES.union(LATE_EMBED_TYPES)
1851                          resolved_text, _, signals = await resolve_embeds_in_string(
1852                              text=combined_text,
1853                              context=embed_eval_context,
1854                              resolver_func=evaluate_embed,
1855                              types_to_resolve=all_embed_types,
1856                              resolution_mode=ResolutionMode.A2A_MESSAGE_TO_USER,
1857                              log_identifier=log_id_prefix,
1858                              config=embed_eval_config,
1859                          )
1860                          if signals:
1861                              log.debug(
1862                                  "%s Handling %d signals found during final response embed resolution.",
1863                                  log_id_prefix,
1864                                  len(signals),
1865                              )
1866                              await self._handle_resolved_signals(
1867                                  external_request_context,
1868                                  signals,
1869                                  original_rpc_id,
1870                                  is_finalizing_context=True,
1871                              )
1872  
1873                          new_parts = (
1874                              [a2a.create_text_part(text=resolved_text)]
1875                              if resolved_text
1876                              else []
1877                          )
1878                          new_parts.extend(non_text_parts)
1879                          parsed_event.status.message = a2a.update_message_parts(
1880                              message=parsed_event.status.message,
1881                              new_parts=new_parts,
1882                          )
1883                          log.info(
1884                              "%s Final response text updated with resolved embeds.",
1885                              log_id_prefix,
1886                          )
1887  
1888                  final_buffer_key = f"{a2a_task_id}_stream_buffer"
1889                  remaining_buffer = self.task_context_manager.get_context(
1890                      final_buffer_key
1891                  )
1892                  if remaining_buffer:
1893                      log.info(
1894                          "%s Flushing remaining buffer for task %s before final response.",
1895                          log_id_prefix,
1896                          a2a_task_id,
1897                      )
1898                      embed_eval_context = {
1899                          "artifact_service": self.shared_artifact_service,
1900                          "session_context": {
1901                              "app_name": external_request_context.get(
1902                                  "app_name_for_artifacts", self.gateway_id
1903                              ),
1904                              "user_id": external_request_context.get(
1905                                  "user_id_for_artifacts"
1906                              ),
1907                              "session_id": external_request_context.get(
1908                                  "a2a_session_id"
1909                              ),
1910                          },
1911                      }
1912                      embed_eval_config = {
1913                          "gateway_max_artifact_resolve_size_bytes": self.gateway_max_artifact_resolve_size_bytes,
1914                          "gateway_recursive_embed_depth": self.gateway_recursive_embed_depth,
1915                      }
1916                      resolved_remaining_text, _, signals = (
1917                          await resolve_embeds_in_string(
1918                              text=remaining_buffer,
1919                              context=embed_eval_context,
1920                              resolver_func=evaluate_embed,
1921                              types_to_resolve=LATE_EMBED_TYPES.copy(),
1922                              resolution_mode=ResolutionMode.A2A_MESSAGE_TO_USER,
1923                              log_identifier=log_id_prefix,
1924                              config=embed_eval_config,
1925                          )
1926                      )
1927                      await self._handle_resolved_signals(
1928                          external_request_context,
1929                          signals,
1930                          original_rpc_id,
1931                          is_finalizing_context=True,
1932                      )
1933                      if resolved_remaining_text:
1934                          flush_message = a2a.create_agent_text_message(
1935                              text=resolved_remaining_text
1936                          )
1937                          flush_event = a2a.create_status_update(
1938                              task_id=a2a_task_id,
1939                              context_id=external_request_context.get("a2a_session_id"),
1940                              message=flush_message,
1941                              is_final=False,
1942                          )
1943                          await self._send_update_to_external(
1944                              external_request_context, flush_event, True
1945                          )
1946                      self.task_context_manager.remove_context(final_buffer_key)
1947  
1948              if send_this_event_to_external:
1949                  if isinstance(parsed_event, Task):
1950                      # Filter DataParts from final Task if gateway has filtering enabled
1951                      # This prevents tool results and other internal data from appearing in user-facing output
1952                      if (
1953                          self.filter_tool_data_parts
1954                          and parsed_event.status
1955                          and parsed_event.status.message
1956                          and parsed_event.status.message.parts
1957                      ):
1958                          original_parts = a2a.get_parts_from_message(
1959                              parsed_event.status.message
1960                          )
1961                          filtered_parts = [
1962                              part
1963                              for part in original_parts
1964                              if self._should_include_data_part_in_final_output(part)
1965                          ]
1966                          if len(filtered_parts) != len(original_parts):
1967                              log.debug(
1968                                  "%s Filtered %d DataParts from final Task message",
1969                                  log_id_prefix,
1970                                  len(original_parts) - len(filtered_parts),
1971                              )
1972                              parsed_event.status.message = a2a.update_message_parts(
1973                                  parsed_event.status.message, filtered_parts
1974                              )
1975  
1976                      await self._send_final_response_to_external(
1977                          external_request_context, parsed_event
1978                      )
1979                  elif isinstance(
1980                      parsed_event, (TaskStatusUpdateEvent, TaskArtifactUpdateEvent)
1981                  ):
1982                      final_chunk_flag = (
1983                          is_final_chunk_of_status_update
1984                          if isinstance(parsed_event, TaskStatusUpdateEvent)
1985                          else False
1986                      )
1987                      await self._send_update_to_external(
1988                          external_request_context, parsed_event, final_chunk_flag
1989                      )
1990  
1991          # Manage task timeout timer: cancel on final events, reset on intermediate activity
1992          if is_truly_final_event_for_context_cleanup:
1993              self._cancel_task_timeout(a2a_task_id)
1994          else:
1995              self._start_task_timeout(a2a_task_id)
1996  
1997          if is_truly_final_event_for_context_cleanup:
1998              log.info(
1999                  "%s Truly final event processed for task %s. Closing connections and removing context.",
2000                  log_id_prefix,
2001                  a2a_task_id,
2002              )
2003              await self._close_external_connections(external_request_context)
2004              self.task_context_manager.remove_context(a2a_task_id)
2005              self.task_context_manager.remove_context(f"{a2a_task_id}_stream_buffer")
2006  
2007      async def _handle_agent_event(
2008          self, topic: str, payload: Dict, task_id_from_topic: str
2009      ) -> bool:
2010          """
2011          Handles messages received on gateway response and status topics.
2012          Parses the payload, retrieves context using task_id_from_topic, and dispatches for processing.
2013          """
2014          try:
2015              rpc_response = JSONRPCResponse.model_validate(payload)
2016          except Exception as e:
2017              log.error(
2018                  "%s Failed to parse payload as JSONRPCResponse for topic %s (Task ID from topic: %s): %s. Payload: %s",
2019                  self.log_identifier,
2020                  topic,
2021                  task_id_from_topic,
2022                  e,
2023                  payload,
2024              )
2025              return False
2026  
2027          original_rpc_id = str(a2a.get_response_id(rpc_response))
2028  
2029          external_request_context = self.task_context_manager.get_context(
2030              task_id_from_topic
2031          )
2032          if not external_request_context:
2033              log.warning(
2034                  "%s No external context found for A2A Task ID: %s (from topic). Ignoring message. Topic: %s, RPC ID: %s",
2035                  self.log_identifier,
2036                  task_id_from_topic,
2037                  topic,
2038                  original_rpc_id,
2039              )
2040              return True
2041  
2042          external_request_context["a2a_task_id_for_event"] = task_id_from_topic
2043          external_request_context["original_rpc_id"] = original_rpc_id
2044  
2045          parsed_event_obj: Union[
2046              Task, TaskStatusUpdateEvent, TaskArtifactUpdateEvent, JSONRPCError, None
2047          ] = None
2048          error = a2a.get_response_error(rpc_response)
2049          if error:
2050              parsed_event_obj = error
2051          else:
2052              result = a2a.get_response_result(rpc_response)
2053              if result:
2054                  # The result is already a parsed Pydantic model.
2055                  parsed_event_obj = result
2056  
2057              # Validate task ID match
2058              actual_task_id = None
2059              if isinstance(parsed_event_obj, Task):
2060                  actual_task_id = parsed_event_obj.id
2061              elif isinstance(
2062                  parsed_event_obj, (TaskStatusUpdateEvent, TaskArtifactUpdateEvent)
2063              ):
2064                  actual_task_id = parsed_event_obj.task_id
2065  
2066              if (
2067                  task_id_from_topic
2068                  and actual_task_id
2069                  and actual_task_id != task_id_from_topic
2070              ):
2071                  log.error(
2072                      "%s Task ID mismatch! Expected: %s, Got from payload: %s.",
2073                      self.log_identifier,
2074                      task_id_from_topic,
2075                      actual_task_id,
2076                  )
2077                  parsed_event_obj = None
2078  
2079          if not parsed_event_obj:
2080              log.error(
2081                  "%s Failed to parse or validate A2A event from RPC result for task %s. Result: %s",
2082                  self.log_identifier,
2083                  task_id_from_topic,
2084                  a2a.get_response_result(rpc_response) or "N/A",
2085              )
2086              generic_error = JSONRPCError(
2087                  code=-32000, message="Invalid event structure received from agent."
2088              )
2089              await self._send_error_to_external(external_request_context, generic_error)
2090              await self._close_external_connections(external_request_context)
2091              self.task_context_manager.remove_context(task_id_from_topic)
2092              self.task_context_manager.remove_context(
2093                  f"{task_id_from_topic}_stream_buffer"
2094              )
2095              return False
2096  
2097          try:
2098              await self._process_parsed_a2a_event(
2099                  parsed_event_obj,
2100                  external_request_context,
2101                  task_id_from_topic,
2102                  original_rpc_id,
2103              )
2104              return True
2105          except Exception as e:
2106              log.exception(
2107                  "%s Error in _process_parsed_a2a_event for task %s: %s",
2108                  self.log_identifier,
2109                  task_id_from_topic,
2110                  e,
2111              )
2112              error_obj = JSONRPCError(
2113                  code=-32000, message=f"Gateway processing error: {e}"
2114              )
2115              await self._send_error_to_external(external_request_context, error_obj)
2116              await self._close_external_connections(external_request_context)
2117              self.task_context_manager.remove_context(task_id_from_topic)
2118              self.task_context_manager.remove_context(
2119                  f"{task_id_from_topic}_stream_buffer"
2120              )
2121              return False
2122  
2123      # --- Task Timeout Methods ---
2124  
2125      def _task_timeout_timer_id(self, task_id: str) -> str:
2126          return f"task_timeout_{task_id}"
2127  
2128      def _start_task_timeout(self, task_id: str) -> None:
2129          """Start (or restart) the idle timeout timer for a task."""
2130          if self.task_timeout_seconds <= 0:
2131              return
2132          timer_id = self._task_timeout_timer_id(task_id)
2133          # Cancel existing timer before starting a new one (reset)
2134          self.cancel_timer(timer_id)
2135          log.debug(
2136              "%s Starting task timeout timer for task %s (%d seconds)",
2137              self.log_identifier,
2138              task_id,
2139              self.task_timeout_seconds,
2140          )
2141          SamComponentBase.add_timer(
2142              self,
2143              delay_ms=self.task_timeout_seconds * 1000,
2144              timer_id=timer_id,
2145              interval_ms=None,
2146              callback=lambda td, tid=task_id: self._on_task_timeout(tid),
2147          )
2148  
2149      def _cancel_task_timeout(self, task_id: str) -> None:
2150          """Cancel the idle timeout timer for a task."""
2151          if self.task_timeout_seconds <= 0:
2152              return
2153          self.cancel_timer(self._task_timeout_timer_id(task_id))
2154  
2155      def _on_task_timeout(self, task_id: str) -> None:
2156          """Timer callback when a task times out. Schedules async cleanup on the event loop."""
2157          log.warning(
2158              "%s Task %s timed out after %d seconds of inactivity",
2159              self.log_identifier,
2160              task_id,
2161              self.task_timeout_seconds,
2162          )
2163          asyncio.run_coroutine_threadsafe(
2164              self._handle_task_timeout(task_id), self.get_async_loop()
2165          )
2166  
2167      async def _handle_task_timeout(self, task_id: str) -> None:
2168          """Handle a timed-out task. Subclasses should override to send errors to the adapter."""
2169          log.warning(
2170              "%s Task %s timed out but _handle_task_timeout is not overridden",
2171              self.log_identifier,
2172              task_id,
2173          )
2174  
2175      async def _async_setup_and_run(self) -> None:
2176          """Main async logic for the gateway component."""
2177          # Call base class to initialize Trust Manager
2178          await super()._async_setup_and_run()
2179  
2180          if self._gateway_card_publishing_config.get("enabled", True):
2181              self._start_gateway_card_publishing()
2182  
2183          log.info(
2184              "%s Starting _start_listener() to initiate external platform connection.",
2185              self.log_identifier,
2186          )
2187          self._start_listener()
2188  
2189          await self._message_processor_loop()
2190  
2191      def _pre_async_cleanup(self) -> None:
2192          """Pre-cleanup actions for the gateway component."""
2193          # Cleanup Trust Manager if present (ENTERPRISE FEATURE)
2194          if self.trust_manager:
2195              try:
2196                  log.info("%s Cleaning up Trust Manager...", self.log_identifier)
2197                  self.trust_manager.cleanup(self.cancel_timer)
2198                  log.info("%s Trust Manager cleanup complete", self.log_identifier)
2199              except Exception as e:
2200                  log.error(
2201                      "%s Error during Trust Manager cleanup: %s", self.log_identifier, e
2202                  )
2203  
2204          log.info("%s Calling _stop_listener()...", self.log_identifier)
2205          self._stop_listener()
2206  
2207          if self.internal_event_queue:
2208              log.info(
2209                  "%s Signaling _message_processor_loop to stop by putting sentinel on queue...",
2210                  self.log_identifier,
2211              )
2212              # This unblocks the `self.internal_event_queue.get()` call in the loop
2213              self.internal_event_queue.put(None)
2214  
2215      async def _message_processor_loop(self):
2216          log.debug("%s Starting message processor loop as an asyncio task...", self.log_identifier)
2217          loop = self.get_async_loop()
2218  
2219          while not self.stop_signal.is_set():
2220              original_broker_message: Optional[SolaceMessage] = None
2221              item = None
2222              processed_successfully = False
2223              topic = None
2224  
2225              try:
2226                  item = await loop.run_in_executor(None, self.internal_event_queue.get)
2227  
2228                  if item is None:
2229                      log.info(
2230                          "%s Received shutdown sentinel. Exiting message processor loop.",
2231                          self.log_identifier,
2232                      )
2233                      break
2234  
2235                  topic = item.get("topic")
2236                  payload = item.get("payload")
2237                  original_broker_message = item.get("_original_broker_message")
2238  
2239                  if not topic or payload is None or not original_broker_message:
2240                      log.warning(
2241                          "%s Invalid item received from internal queue: %s",
2242                          self.log_identifier,
2243                          item,
2244                      )
2245                      processed_successfully = False
2246                      continue
2247  
2248                  if a2a.topic_matches_subscription(
2249                      topic, a2a.get_discovery_subscription_topic(self.namespace)
2250                  ):
2251                      processed_successfully = await self._handle_discovery_message(
2252                          payload
2253                      )
2254                  elif (
2255                      hasattr(self, "trust_manager")
2256                      and self.trust_manager
2257                      and self.trust_manager.is_trust_card_topic(topic)
2258                  ):
2259                      await self.trust_manager.handle_trust_card_message(payload, topic)
2260                      processed_successfully = True
2261                  elif a2a.topic_matches_subscription(
2262                      topic,
2263                      a2a.get_gateway_response_subscription_topic(
2264                          self.namespace, self.gateway_id
2265                      ),
2266                  ) or a2a.topic_matches_subscription(
2267                      topic,
2268                      a2a.get_gateway_status_subscription_topic(
2269                          self.namespace, self.gateway_id
2270                      ),
2271                  ):
2272                      task_id_from_topic: Optional[str] = None
2273                      response_sub = a2a.get_gateway_response_subscription_topic(
2274                          self.namespace, self.gateway_id
2275                      )
2276                      status_sub = a2a.get_gateway_status_subscription_topic(
2277                          self.namespace, self.gateway_id
2278                      )
2279  
2280                      if a2a.topic_matches_subscription(topic, response_sub):
2281                          task_id_from_topic = a2a.extract_task_id_from_topic(
2282                              topic, response_sub, self.log_identifier
2283                          )
2284                      elif a2a.topic_matches_subscription(topic, status_sub):
2285                          task_id_from_topic = a2a.extract_task_id_from_topic(
2286                              topic, status_sub, self.log_identifier
2287                          )
2288  
2289                      if task_id_from_topic:
2290                          processed_successfully = await self._handle_agent_event(
2291                              topic, payload, task_id_from_topic
2292                          )
2293                      else:
2294                          log.error(
2295                              "%s Could not extract task_id from topic %s for _handle_agent_event. Ignoring.",
2296                              self.log_identifier,
2297                              topic,
2298                          )
2299                          processed_successfully = False
2300                  else:
2301                      log.warning(
2302                          "%s Received message on unhandled topic: %s. Acknowledging.",
2303                          self.log_identifier,
2304                          topic,
2305                      )
2306                      processed_successfully = True
2307  
2308              except queue.Empty:
2309                  continue
2310              except asyncio.CancelledError:
2311                  log.info("%s Message processor loop cancelled.", self.log_identifier)
2312                  break
2313              except Exception as e:
2314                  log.exception(
2315                      "%s Unhandled error in message processor loop: %s",
2316                      self.log_identifier,
2317                      e,
2318                  )
2319                  processed_successfully = False
2320                  await asyncio.sleep(1)
2321              finally:
2322                  if original_broker_message:
2323                      if processed_successfully:
2324                          original_broker_message.call_acknowledgements()
2325                      else:
2326                          original_broker_message.call_negative_acknowledgements()
2327                          log.warning(
2328                              "%s NACKed SolaceMessage for topic: %s",
2329                              self.log_identifier,
2330                              topic or "unknown",
2331                          )
2332  
2333                  if item and item is not None:
2334                      self.internal_event_queue.task_done()
2335  
2336          log.info("%s Message processor loop finished.", self.log_identifier)
2337  
2338  
2339  
2340      @abstractmethod
2341      async def _extract_initial_claims(
2342          self, external_event_data: Any
2343      ) -> Optional[Dict[str, Any]]:
2344          """
2345          Extracts the primary identity claims from a platform-specific event.
2346          This method MUST be implemented by derived gateway components.
2347  
2348          Args:
2349              external_event_data: Raw event data from the external platform
2350                                   (e.g., FastAPIRequest, Slack event dictionary).
2351  
2352          Returns:
2353              A dictionary of initial claims, which MUST include an 'id' key.
2354              Example: {"id": "user@example.com", "source": "slack_api"}
2355              Return None if authentication fails.
2356          """
2357          pass
2358  
2359      @abstractmethod
2360      async def _translate_external_input(
2361          self, external_event: Any
2362      ) -> Tuple[str, List[ContentPart], Dict[str, Any]]:
2363          """
2364          Translates raw platform-specific event data into A2A task parameters.
2365  
2366          Args:
2367              external_event: Raw event data from the external platform
2368                              (e.g., FastAPIRequest, Slack event dictionary).
2369  
2370          Returns:
2371              A tuple containing:
2372              - target_agent_name (str): The name of the A2A agent to target.
2373              - a2a_parts (List[ContentPart]): A list of A2A Part objects.
2374              - external_request_context (Dict[str, Any]): Context for TaskContextManager.
2375          """
2376          pass
2377  
2378      @abstractmethod
2379      def _start_listener(self) -> None:
2380          pass
2381  
2382      @abstractmethod
2383      def _stop_listener(self) -> None:
2384          pass
2385  
2386      @abstractmethod
2387      async def _send_update_to_external(
2388          self,
2389          external_request_context: Dict[str, Any],
2390          event_data: Union[TaskStatusUpdateEvent, TaskArtifactUpdateEvent],
2391          is_final_chunk_of_update: bool,
2392      ) -> None:
2393          pass
2394  
2395      @abstractmethod
2396      async def _send_final_response_to_external(
2397          self, external_request_context: Dict[str, Any], task_data: Task
2398      ) -> None:
2399          pass
2400  
2401      @abstractmethod
2402      async def _send_error_to_external(
2403          self, external_request_context: Dict[str, Any], error_data: JSONRPCError
2404      ) -> None:
2405          pass
2406  
2407      async def _close_external_connections(
2408          self, external_request_context: Dict[str, Any]
2409      ) -> None:
2410          """Close external connections (e.g., SSE) during context cleanup.
2411  
2412          Called after the final event is fully processed, ensuring any
2413          pending status updates are delivered before the connection closes.
2414          Subclasses should override to perform transport-specific cleanup.
2415          """
2416          pass
2417  
2418      def _detect_gateway_type(self) -> str:
2419          """Auto-detect gateway type from component class or configuration."""
2420          configured_type = self.get_config("gateway_type")
2421          if configured_type:
2422              return configured_type
2423  
2424          class_name = self.__class__.__name__
2425          if "WebUI" in class_name or "HttpSse" in class_name:
2426              return "http_sse"
2427  
2428          if hasattr(self, 'adapter') and self.adapter:
2429              adapter_name = self.adapter.__class__.__name__.lower()
2430              if "rest" in adapter_name:
2431                  return "rest"
2432              if "slack" in adapter_name:
2433                  return "slack"
2434              if "teams" in adapter_name:
2435                  return "teams"
2436  
2437          return "generic"
2438  
2439      def _build_gateway_card(self) -> AgentCard:
2440          """Build gateway discovery card as AgentCard with gateway extension."""
2441          from a2a.types import AgentCapabilities, AgentExtension
2442  
2443          gateway_type = self._detect_gateway_type()
2444          gateway_url = f"solace:{self.namespace}/a2a/v1/gateway/request/{self.gateway_id}"
2445          description = self._gateway_card_config.get(
2446              "description",
2447              f"{gateway_type.upper()} Gateway"
2448          )
2449  
2450          gateway_role_extension = AgentExtension(
2451              uri="https://solace.com/a2a/extensions/sam/gateway-role",
2452              required=False,
2453              params={
2454                  "gateway_id": self.gateway_id,
2455                  "gateway_type": gateway_type,
2456                  "namespace": self.namespace,
2457              }
2458          )
2459  
2460          extensions = [gateway_role_extension]
2461  
2462          deployment_config = self.get_config("deployment", {})
2463          deployment_id = deployment_config.get("id") if isinstance(deployment_config, dict) else None
2464          if deployment_id:
2465              deployment_extension = AgentExtension(
2466                  uri="https://solace.com/a2a/extensions/sam/deployment",
2467                  required=False,
2468                  params={
2469                      "deployment_id": deployment_id,
2470                  }
2471              )
2472              extensions.append(deployment_extension)
2473  
2474          try:
2475              from solace_agent_mesh import __version__ as sam_version
2476          except ImportError:
2477              sam_version = "unknown"
2478  
2479          gateway_card = AgentCard(
2480              name=self.gateway_id,
2481              url=gateway_url,
2482              description=description,
2483              version=sam_version,
2484              protocol_version="1.0",
2485              capabilities=AgentCapabilities(
2486                  supports_streaming=True,
2487                  supports_cancellation=True,
2488                  extensions=extensions
2489              ),
2490              default_input_modes=self._gateway_card_config.get("defaultInputModes", ["text"]),
2491              default_output_modes=self._gateway_card_config.get("defaultOutputModes", ["text"]),
2492              skills=self._gateway_card_config.get("skills", []),
2493          )
2494  
2495          return gateway_card
2496  
2497      def _publish_gateway_card(self) -> None:
2498          """Publish gateway card to gateway discovery topic."""
2499          try:
2500              gateway_card = self._build_gateway_card()
2501              discovery_topic = a2a.get_gateway_discovery_topic(self.namespace)
2502  
2503              payload = gateway_card.model_dump(by_alias=True, exclude_none=True)
2504              self.publish_a2a_message(payload, discovery_topic)
2505  
2506              log.debug(
2507                  "%s Published gateway card: gateway_id=%s, type=%s, topic=%s",
2508                  self.log_identifier,
2509                  self.gateway_id,
2510                  self._detect_gateway_type(),
2511                  discovery_topic
2512              )
2513          except Exception as e:
2514              log.error(
2515                  "%s Failed to publish gateway card: %s",
2516                  self.log_identifier,
2517                  e,
2518                  exc_info=True
2519              )
2520  
2521      def _start_gateway_card_publishing(self) -> None:
2522          """Start periodic gateway card publishing."""
2523          interval_seconds = self._gateway_card_publishing_config.get("interval_seconds", 30)
2524  
2525          if interval_seconds <= 0:
2526              log.info(
2527                  "%s Gateway card publishing disabled (interval_seconds=%d)",
2528                  self.log_identifier,
2529                  interval_seconds
2530              )
2531              return
2532  
2533          log.info(
2534              "%s Starting gateway card publishing every %d seconds",
2535              self.log_identifier,
2536              interval_seconds
2537          )
2538  
2539          SamComponentBase.add_timer(
2540              self,
2541              delay_ms=1000,
2542              timer_id=self._gateway_card_timer_id,
2543              interval_ms=interval_seconds * 1000,
2544              callback=lambda timer_data: self._publish_gateway_card()
2545          )
2546  
2547      def _get_component_id(self) -> str:
2548          """Returns the gateway ID as the component identifier."""
2549          return self.gateway_id
2550  
2551      def _get_component_type(self) -> str:
2552          """Returns 'gateway' as the component type."""
2553          return "gateway"
2554  
2555      def invoke(self, message, data):
2556          if isinstance(message, SolaceMessage):
2557              message.call_acknowledgements()
2558          log.warning("%s Invoke method called unexpectedly.", self.log_identifier)
2559          return None