/ src / solace_agent_mesh / agent / adk / callbacks.py
callbacks.py
   1  """
   2  ADK Callbacks for the A2A Host Component.
   3  Includes dynamic instruction injection, artifact metadata injection,
   4  embed resolution, and logging.
   5  """
   6  import logging
   7  import json
   8  import asyncio
   9  import time
  10  import uuid
  11  import re
  12  from typing import Any, Dict, Optional, TYPE_CHECKING, List
  13  from collections import defaultdict
  14  
  15  from google.adk.tools import BaseTool, ToolContext
  16  from google.adk.artifacts import BaseArtifactService
  17  from google.adk.agents.callback_context import CallbackContext
  18  from google.adk.models.llm_request import LlmRequest
  19  from google.adk.models.llm_response import LlmResponse
  20  from google.genai import types as adk_types
  21  from google.adk.tools.mcp_tool import MCPTool
  22  
  23  from .intelligent_mcp_callbacks import (
  24      save_mcp_response_as_artifact_intelligent,
  25      McpSaveStatus,
  26  )
  27  
  28  from ...agent.utils.artifact_helpers import (
  29      METADATA_SUFFIX,
  30      format_metadata_for_llm,
  31  )
  32  from ...agent.utils.context_helpers import (
  33      get_original_session_id,
  34      get_session_from_callback_context,
  35  )
  36  from ..tools.tool_definition import BuiltinTool
  37  from ..tools.workflow_tool import WorkflowAgentTool, WORKFLOW_TOOL_PREFIX
  38  from ..tools.peer_agent_tool import PEER_TOOL_PREFIX
  39  
  40  from ...common.utils.embeds import (
  41      EMBED_DELIMITER_OPEN,
  42      EMBED_DELIMITER_CLOSE,
  43      EMBED_CHAIN_DELIMITER,
  44      EARLY_EMBED_TYPES,
  45      evaluate_embed,
  46      resolve_embeds_in_string,
  47  )
  48  from ...common.utils.embeds.types import ResolutionMode
  49  
  50  from ...common.utils.embeds.modifiers import MODIFIER_IMPLEMENTATIONS
  51  
  52  from ...common import a2a
  53  from ...common.a2a.types import ArtifactInfo
  54  from ...common.constants import ARTIFACT_TAG_WORKING
  55  from ...common.data_parts import (
  56      AgentProgressUpdateData,
  57      ArtifactCreationProgressData,
  58      LlmInvocationData,
  59      ThinkingContentData,
  60      ToolInvocationStartData,
  61      ToolResultData,
  62      TemplateBlockData,
  63  )
  64  
  65  
# Dictionary key name "appended_artifact_metadata".
# NOTE(review): not referenced in the portion of the file visible here;
# presumably the key under which artifact metadata is appended to a tool
# response -- confirm against its users further down the module.
METADATA_RESPONSE_KEY = "appended_artifact_metadata"
  67  from ..tools.builtin_artifact_tools import _internal_create_artifact
  68  from ...agent.adk.tool_wrapper import ADKToolWrapper
  69  
  70  # Import the new parser and its events
  71  from pydantic import BaseModel
  72  from ...agent.adk.stream_parser import (
  73      FencedBlockStreamParser,
  74      BlockStartedEvent,
  75      BlockProgressedEvent,
  76      BlockCompletedEvent,
  77      BlockInvalidatedEvent,
  78      TemplateBlockStartedEvent,
  79      TemplateBlockCompletedEvent,
  80      ARTIFACT_BLOCK_DELIMITER_OPEN,
  81      ARTIFACT_BLOCK_DELIMITER_CLOSE,
  82      TEMPLATE_LIQUID_START_SEQUENCE,
  83  )
  84  
  85  
# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# Callback-state key used by process_artifact_blocks_callback: it is set to
# True as soon as a partial (streaming) LlmResponse is seen, and a non-partial
# response is only run through the block parser when the flag is still unset.
# NOTE(review): the "temp:" prefix presumably marks the value as temporary
# state in ADK -- confirm against the ADK state documentation.
A2A_LLM_STREAM_CHUNKS_PROCESSED_KEY = "temp:llm_stream_chunks_processed"

# Imported for static type checkers only; the callbacks below refer to the
# class through the string annotation "SamAgentComponent".
if TYPE_CHECKING:
    from ..sac.component import SamAgentComponent
  92  
  93  
  94  def _parse_tags_param(tags_str: Optional[str]) -> List[str]:
  95      """Parse comma-separated tags string into a list.
  96  
  97      Args:
  98          tags_str: Comma-separated string of tags, or None/empty string
  99  
 100      Returns:
 101          List of trimmed, non-empty tag strings (empty list if no valid tags)
 102      """
 103      if not tags_str:
 104          return []
 105      return [t.strip() for t in tags_str.split(",") if t.strip()]
 106  
 107  
 108  async def _publish_data_part_status_update(
 109      host_component: "SamAgentComponent",
 110      a2a_context: Dict[str, Any],
 111      data_part_model: BaseModel,
 112  ):
 113      """Helper to construct and publish a TaskStatusUpdateEvent with a DataPart.
 114  
 115      This function delegates to the host component's publish_data_signal_from_thread method,
 116      which handles the async loop check and scheduling internally.
 117      """
 118      host_component.publish_data_signal_from_thread(
 119          a2a_context=a2a_context,
 120          signal_data=data_part_model,
 121          skip_buffer_flush=False,
 122          log_identifier=host_component.log_identifier,
 123      )
 124  
 125  
 126  async def _resolve_early_embeds_in_chunk(
 127      chunk: str,
 128      callback_context: CallbackContext,
 129      host_component: "SamAgentComponent",
 130      log_identifier: str,
 131  ) -> str:
 132      """
 133      Resolves early embeds in an artifact chunk before streaming to the browser.
 134  
 135      Args:
 136          chunk: The text chunk containing potential embeds
 137          callback_context: The ADK callback context with services
 138          host_component: The host component instance
 139          log_identifier: Identifier for logging
 140  
 141      Returns:
 142          The chunk with early embeds resolved
 143      """
 144      if not chunk or EMBED_DELIMITER_OPEN not in chunk:
 145          return chunk
 146  
 147      try:
 148          # Build resolution context from callback_context (pattern from EmbedResolvingMCPToolset)
 149          invocation_context = callback_context._invocation_context
 150          if not invocation_context:
 151              log.warning(
 152                  "%s No invocation context available for embed resolution",
 153                  log_identifier,
 154              )
 155              return chunk
 156  
 157          session_context = invocation_context.session
 158          if not session_context:
 159              log.warning(
 160                  "%s No session context available for embed resolution", log_identifier
 161              )
 162              return chunk
 163  
 164          resolution_context = {
 165              "artifact_service": invocation_context.artifact_service,
 166              "session_context": {
 167                  "session_id": get_original_session_id(invocation_context),
 168                  "user_id": session_context.user_id,
 169                  "app_name": session_context.app_name,
 170              },
 171          }
 172  
 173          # Resolve only early embeds (math, datetime, uuid, artifact_meta)
 174          resolved_text, processed_until, _ = await resolve_embeds_in_string(
 175              text=chunk,
 176              context=resolution_context,
 177              resolver_func=evaluate_embed,
 178              types_to_resolve=EARLY_EMBED_TYPES,  # Only resolve early embeds
 179              resolution_mode=ResolutionMode.ARTIFACT_STREAMING,  # New mode
 180              log_identifier=log_identifier,
 181              config=None,  # Could pass host_component config if needed
 182          )
 183  
 184          # SAFETY CHECK: If resolver buffered something, parser has a bug
 185          if processed_until < len(chunk):
 186              log.error(
 187                  "%s PARSER BUG DETECTED: Resolver buffered partial embed. "
 188                  "Chunk ends with: %r. Returning unresolved chunk to avoid corruption.",
 189                  log_identifier,
 190                  chunk[-50:] if len(chunk) > 50 else chunk,
 191              )
 192              # Fallback: return original unresolved chunk (degraded but not corrupted)
 193              return chunk
 194  
 195          return resolved_text
 196  
 197      except Exception as e:
 198          log.error(
 199              "%s Error resolving embeds in chunk: %s", log_identifier, e, exc_info=True
 200          )
 201          return chunk  # Return original chunk on error
 202  
 203  
async def process_thinking_content_callback(
    callback_context: CallbackContext,
    llm_response: LlmResponse,
    host_component: "SamAgentComponent",
) -> Optional[LlmResponse]:
    """
    ADK after_model_callback to intercept and stream thinking/reasoning content.

    Checks for thinking content in the LlmResponse custom_metadata (set by
    the LiteLlm adapter when the model produces reasoning tokens) and publishes
    it as a ThinkingContentData signal via the A2A data part mechanism.

    Thinking content is stripped from the LlmResponse so it does not appear
    in the main text stream. The frontend renders it in a collapsible block.

    Two flags kept in the session state drive the behaviour:

    * ``thinking_phase_active`` - True while thinking chunks are being
      streamed; when it is cleared a ThinkingContentData with
      ``is_complete=True`` and empty content is published.
    * ``thinking_just_yielded`` - True right after a thinking chunk was
      published, so that the next response without custom_metadata is not
      treated as the end of the thinking phase.

    Args:
        callback_context: ADK callback context; its state supplies
            ``a2a_context`` and it is used to look up the session.
        llm_response: The model response (or streaming chunk) to inspect.
            It may be mutated in place: its content is replaced by an empty
            model Content for streamed thinking chunks, and the
            ``thinking_content`` metadata entry is removed in the
            non-streaming case.
        host_component: Host component used to publish the data signals.

    Returns:
        Always None; the response is only ever modified in place.
    """
    log_identifier = "[Callback:ProcessThinkingContent]"

    # Without an A2A context there is nothing to publish to, so do nothing.
    a2a_context = callback_context.state.get("a2a_context")
    if not a2a_context:
        return None

    # Track thinking phase state in session to detect transitions
    session = get_session_from_callback_context(callback_context)
    thinking_phase_key = "thinking_phase_active"
    thinking_just_yielded_key = "thinking_just_yielded"

    # Check for phase transition BEFORE the metadata guard so that
    # chunks with no custom_metadata (text after thinking) still close
    # the thinking phase.
    if not llm_response.custom_metadata:
        # A thinking chunk was published immediately before this response:
        # consume the flag and leave the thinking phase open.
        if session.state.get(thinking_just_yielded_key):
            session.state[thinking_just_yielded_key] = False
            log.debug(
                "%s Skipping is_complete: text chunk follows thinking chunk in same delta",
                log_identifier,
            )
            return None

        # Otherwise a metadata-free response while the phase is active ends
        # the phase: clear the flag and emit the empty is_complete signal.
        if session.state.get(thinking_phase_key):
            session.state[thinking_phase_key] = False
            thinking_complete = ThinkingContentData(
                content="",
                is_complete=True,
            )
            await _publish_data_part_status_update(
                host_component, a2a_context, thinking_complete
            )
            log.debug("%s Thinking phase completed (no metadata), sent is_complete signal", log_identifier)
        return None

    # From here on custom_metadata is present (truthy).
    is_thinking = llm_response.custom_metadata.get("is_thinking_content", False)
    thinking_text_full = llm_response.custom_metadata.get("thinking_content")

    # Bug 5 fix: reset thinking_just_yielded at the start of each metadata-bearing
    # response so stale flags from a previous turn don't carry over.
    session.state[thinking_just_yielded_key] = False

    if is_thinking and llm_response.content and llm_response.content.parts:
        # Streaming thinking chunk — publish each text part as a signal
        session.state[thinking_phase_key] = True
        # Bug 6 fix: mark that a thinking chunk was just yielded so the next
        # no-metadata yield (text portion of the same delta) is not mistaken
        # for a thinking-phase-end transition.
        session.state[thinking_just_yielded_key] = True
        for part in llm_response.content.parts:
            # Parts without text (None or empty string) are skipped.
            if part.text:
                thinking_data = ThinkingContentData(
                    content=part.text,
                    is_complete=False,
                )
                await _publish_data_part_status_update(
                    host_component, a2a_context, thinking_data
                )
                log.debug(
                    "%s Published thinking chunk (%d chars)",
                    log_identifier,
                    len(part.text),
                )

        # Strip thinking content from the LlmResponse so it does not
        # appear in the main text stream
        llm_response.content = adk_types.Content(role="model", parts=[])
        return None

    # Check if thinking phase just ended (transition from thinking to regular content)
    # NOTE(review): because the next branch is an `elif`, a response that both
    # closes an active thinking phase and carries "thinking_content" metadata
    # only gets the empty is_complete signal; the full text is neither
    # published nor removed from the metadata. Confirm this is intended.
    if session.state.get(thinking_phase_key) and not is_thinking:
        session.state[thinking_phase_key] = False
        thinking_complete = ThinkingContentData(
            content="",
            is_complete=True,
        )
        await _publish_data_part_status_update(
            host_component, a2a_context, thinking_complete
        )
        log.debug("%s Thinking phase completed, sent is_complete signal", log_identifier)

    elif thinking_text_full:
        # Non-streaming: full thinking content in metadata (from non-streaming response)
        # It is published once, already marked complete.
        thinking_data = ThinkingContentData(
            content=thinking_text_full,
            is_complete=True,
        )
        await _publish_data_part_status_update(
            host_component, a2a_context, thinking_data
        )
        log.debug(
            "%s Published full thinking content (%d chars)",
            log_identifier,
            len(thinking_text_full),
        )
        # Remove from metadata to avoid downstream confusion
        llm_response.custom_metadata.pop("thinking_content", None)

    return None
 318  
 319  
 320  async def process_artifact_blocks_callback(
 321      callback_context: CallbackContext,
 322      llm_response: LlmResponse,
 323      host_component: "SamAgentComponent",
 324  ) -> Optional[LlmResponse]:
 325      """
 326      Orchestrates the parsing of fenced artifact blocks from an LLM stream
 327      by delegating to a FencedBlockStreamParser instance.
 328      This callback is stateful across streaming chunks within a single turn.
 329      """
 330      log_identifier = "[Callback:ProcessArtifactBlocks]"
 331      parser_state_key = "fenced_block_parser"
 332      session = get_session_from_callback_context(callback_context)
 333  
 334      parser: FencedBlockStreamParser = session.state.get(parser_state_key)
 335      if parser is None:
 336          log.debug("%s New turn. Creating new FencedBlockStreamParser.", log_identifier)
 337          parser = FencedBlockStreamParser(progress_update_interval_bytes=250)
 338          session.state[parser_state_key] = parser
 339          session.state["completed_artifact_blocks_list"] = []
 340          session.state["completed_template_blocks_list"] = []
 341          session.state["artifact_chars_sent"] = (
 342              0  # Reset character tracking for new turn
 343          )
 344  
 345      stream_chunks_were_processed = callback_context.state.get(
 346          A2A_LLM_STREAM_CHUNKS_PROCESSED_KEY, False
 347      )
 348      if llm_response.partial:
 349          callback_context.state[A2A_LLM_STREAM_CHUNKS_PROCESSED_KEY] = True
 350  
 351      if llm_response.partial or not stream_chunks_were_processed:
 352          processed_parts: List[adk_types.Part] = []
 353          original_parts = llm_response.content.parts if llm_response.content else []
 354          a2a_context = callback_context.state.get("a2a_context")
 355  
 356          for part in original_parts:
 357              if part.text is not None:
 358                  parser_result = parser.process_chunk(part.text)
 359  
 360                  if llm_response.partial:
 361                      if parser_result.user_facing_text:
 362                          processed_parts.append(
 363                              adk_types.Part(text=parser_result.user_facing_text)
 364                          )
 365                  else:
 366                      processed_parts.append(part)
 367  
 368                  for event in parser_result.events:
 369                      if isinstance(event, BlockStartedEvent):
 370                          log.info(
 371                              "%s Event: BlockStarted. Params: %s",
 372                              log_identifier,
 373                              event.params,
 374                          )
 375                          # Reset character tracking for this new artifact block
 376                          session.state["artifact_chars_sent"] = 0
 377  
 378                          filename = event.params.get("filename", "unknown_artifact")
 379                          if filename == "unknown_artifact":
 380                              log.warning(
 381                                  "%s Fenced artifact block started without a 'filename' parameter.",
 382                                  log_identifier,
 383                              )
 384                          description = event.params.get("description")
 385                          if filename == "unknown_artifact":
 386                              log.warning(
 387                                  "%s Fenced artifact block started without a 'filename' parameter.",
 388                                  log_identifier,
 389                              )
 390                          # Extract tags from params (comma-separated string to list)
 391                          tags = _parse_tags_param(event.params.get("tags")) or None
 392                          if a2a_context:
 393                              status_text = f"Receiving artifact `{filename}`..."
 394                              if description:
 395                                  status_text = (
 396                                      f"Receiving artifact `{filename}`: {description}"
 397                                  )
 398                              progress_data = AgentProgressUpdateData(
 399                                  status_text=status_text
 400                              )
 401                              await _publish_data_part_status_update(
 402                                  host_component, a2a_context, progress_data
 403                              )
 404                              # Also send an initial in-progress event to create the UI bubble
 405                              artifact_progress_data = ArtifactCreationProgressData(
 406                                  filename=filename,
 407                                  description=description,
 408                                  status="in-progress",
 409                                  bytes_transferred=0,
 410                                  artifact_chunk=None,
 411                                  tags=tags,
 412                              )
 413  
 414                              await _publish_data_part_status_update(
 415                                  host_component, a2a_context, artifact_progress_data
 416                              )
 417                          params_str = " ".join(
 418                              [f'{k}="{v}"' for k, v in event.params.items()]
 419                          )
 420                          original_text = f"{ARTIFACT_BLOCK_DELIMITER_OPEN}save_artifact: {params_str}\n"
 421                          session.state["artifact_block_original_text"] = original_text
 422  
 423                      elif isinstance(event, BlockProgressedEvent):
 424                          log.debug(
 425                              "%s Event: BlockProgressed. Size: %d",
 426                              log_identifier,
 427                              event.buffered_size,
 428                          )
 429                          params = event.params
 430                          filename = params.get("filename", "unknown_artifact")
 431                          if filename == "unknown_artifact":
 432                              log.warning(
 433                                  "%s Fenced artifact block progressed without a 'filename' parameter.",
 434                                  log_identifier,
 435                              )
 436                          if a2a_context:
 437                              # Resolve early embeds in the chunk before streaming
 438                              resolved_chunk = await _resolve_early_embeds_in_chunk(
 439                                  chunk=event.chunk,
 440                                  callback_context=callback_context,
 441                                  host_component=host_component,
 442                                  log_identifier=f"{log_identifier}[ResolveChunk]",
 443                              )
 444                              # Extract tags from params (comma-separated string to list)
 445                              tags = _parse_tags_param(params.get("tags")) or None
 446  
 447                              progress_data = ArtifactCreationProgressData(
 448                                  filename=filename,
 449                                  description=params.get("description"),
 450                                  status="in-progress",
 451                                  bytes_transferred=event.buffered_size,
 452                                  artifact_chunk=resolved_chunk,  # Resolved chunk
 453                                  tags=tags,
 454                              )
 455  
 456                              # Track the cumulative character count of what we've sent
 457                              # We need character count (not bytes) to slice correctly later
 458                              previous_char_count = session.state.get(
 459                                  "artifact_chars_sent", 0
 460                              )
 461                              new_char_count = previous_char_count + len(event.chunk)
 462                              session.state["artifact_chars_sent"] = new_char_count
 463  
 464                              await _publish_data_part_status_update(
 465                                  host_component, a2a_context, progress_data
 466                              )
 467  
 468                      elif isinstance(event, BlockCompletedEvent):
 469                          log.debug(
 470                              "%s Event: BlockCompleted. Content length: %d",
 471                              log_identifier,
 472                              len(event.content),
 473                          )
 474                          original_text = session.state.get(
 475                              "artifact_block_original_text", ""
 476                          )
 477                          original_text += event.content
 478                          original_text += "»»»"
 479  
 480                          tool_context_for_call = ToolContext(
 481                              callback_context._invocation_context
 482                          )
 483  
 484                          params = event.params
 485                          filename = params.get("filename")
 486                          if not filename or not filename.strip():
 487                              log.warning(
 488                                  "%s Fenced artifact block is missing a valid 'filename'. Failing operation.",
 489                                  log_identifier,
 490                              )
 491                              session.state["completed_artifact_blocks_list"].append(
 492                                  {
 493                                      "filename": (
 494                                          "unknown_artifact"
 495                                          if filename is None
 496                                          else filename
 497                                      ),
 498                                      "version": 0,
 499                                      "status": "error",
 500                                      "original_text": original_text,
 501                                  }
 502                              )
 503                              if a2a_context:
 504                                  if not filename or not filename.strip():
 505                                      filename = "unknown_artifact"
 506                                  progress_data = ArtifactCreationProgressData(
 507                                      filename=filename or "unknown_artifact",
 508                                      description=params.get("description"),
 509                                      status="failed",
 510                                      bytes_transferred=0,
 511                                  )
 512                                  await _publish_data_part_status_update(
 513                                      host_component, a2a_context, progress_data
 514                                  )
 515                              continue
 516  
 517                          kwargs_for_call = {
 518                              "filename": filename,
 519                              "content": event.content,
 520                              "mime_type": params.get("mime_type"),
 521                              "description": params.get("description"),
 522                              "metadata_json": params.get("metadata"),
 523                              "tool_context": tool_context_for_call,
 524                          }
 525                          if "schema_max_keys" in params:
 526                              try:
 527                                  kwargs_for_call["schema_max_keys"] = int(
 528                                      params["schema_max_keys"]
 529                                  )
 530                              except (ValueError, TypeError):
 531                                  log.warning(
 532                                      "%s Invalid 'schema_max_keys' value '%s'. Ignoring.",
 533                                      log_identifier,
 534                                      params["schema_max_keys"],
 535                                  )
 536                          # Extract tags from params (comma-separated string to list)
 537                          tags_list = _parse_tags_param(params.get("tags"))
 538  
 539                          # Auto-tag artifacts as internal when created during structured invocation
 540                          logical_task_id_for_tags = a2a_context.get("logical_task_id")
 541                          if logical_task_id_for_tags:
 542                              with host_component.active_tasks_lock:
 543                                  task_ctx = host_component.active_tasks.get(logical_task_id_for_tags)
 544                              if task_ctx and task_ctx.get_flag("is_structured_invocation"):
 545                                  if ARTIFACT_TAG_WORKING not in tags_list:
 546                                      tags_list.append(ARTIFACT_TAG_WORKING)
 547  
 548                          if tags_list:
 549                              kwargs_for_call["tags"] = tags_list
 550                          wrapped_creator = ADKToolWrapper(
 551                              original_func=_internal_create_artifact,
 552                              tool_config=None,  # No specific config for this internal tool
 553                              tool_name="_internal_create_artifact",
 554                              origin="internal",
 555                              resolution_type="early",
 556                          )
 557                          save_result = await wrapped_creator(**kwargs_for_call)
 558  
 559                          if save_result.status in ["success", "partial"]:
 560                              status_for_tool = "success"
 561                              version_for_tool = save_result.data.get("data_version", 1) if save_result.data else 1
 562                              try:
 563                                  logical_task_id = a2a_context.get("logical_task_id")
 564                                  if logical_task_id:
 565                                      with host_component.active_tasks_lock:
 566                                          task_context = host_component.active_tasks.get(
 567                                              logical_task_id
 568                                          )
 569                                      if task_context:
 570                                          task_context.register_produced_artifact(
 571                                              filename, version_for_tool
 572                                          )
 573                                          log.info(
 574                                              "%s Registered inline artifact '%s' v%d for task %s.",
 575                                              log_identifier,
 576                                              filename,
 577                                              version_for_tool,
 578                                              logical_task_id,
 579                                          )
 580                                      else:
 581                                          log.warning(
 582                                              "%s TaskExecutionContext not found for task %s, cannot register inline artifact '%s'.",
 583                                              log_identifier,
 584                                              logical_task_id,
 585                                              filename,
 586                                          )
 587                                  else:
 588                                      log.warning(
 589                                          "%s No logical_task_id, cannot register inline artifact.",
 590                                          log_identifier,
 591                                      )
 592                              except Exception as e_track:
 593                                  log.error(
 594                                      "%s Failed to track inline artifact: %s",
 595                                      log_identifier,
 596                                      e_track,
 597                                  )
 598  
 599                              # Send final progress update with any remaining content not yet sent
 600                              if a2a_context:
 601                                  # Check if there's unsent content (content after last progress event)
 602                                  total_bytes = len(event.content.encode("utf-8"))
 603                                  chars_already_sent = session.state.get(
 604                                      "artifact_chars_sent", 0
 605                                  )
 606  
 607                                  if chars_already_sent < len(event.content):
 608                                      # There's unsent content - send it as a final progress update
 609                                      final_chunk = event.content[chars_already_sent:]
 610  
 611                                      # Resolve embeds in final chunk
 612                                      resolved_final_chunk = await _resolve_early_embeds_in_chunk(
 613                                          chunk=final_chunk,
 614                                          callback_context=callback_context,
 615                                          host_component=host_component,
 616                                          log_identifier=f"{log_identifier}[ResolveFinalChunk]",
 617                                      )
 618  
 619                                      final_progress_data = ArtifactCreationProgressData(
 620                                          filename=filename,
 621                                          description=params.get("description"),
 622                                          status="in-progress",
 623                                          bytes_transferred=total_bytes,
 624                                          artifact_chunk=resolved_final_chunk,  # Resolved final chunk
 625                                      )
 626  
 627                                      await _publish_data_part_status_update(
 628                                          host_component, a2a_context, final_progress_data
 629                                      )
 630  
 631                              # Publish completion status immediately via SSE
 632                              if a2a_context:
 633                                  # Get tags (already parsed above as kwargs_for_call["tags"])
 634                                  completion_tags = kwargs_for_call.get("tags")
 635                                  progress_data = ArtifactCreationProgressData(
 636                                      filename=filename,
 637                                      description=params.get("description"),
 638                                      status="completed",
 639                                      bytes_transferred=len(event.content),
 640                                      mime_type=params.get("mime_type"),
 641                                      version=version_for_tool,
 642                                      tags=completion_tags,
 643                                  )
 644                                  await _publish_data_part_status_update(
 645                                      host_component, a2a_context, progress_data
 646                                  )
 647                          else:
 648                              status_for_tool = "error"
 649                              version_for_tool = 0
 650                              # Publish failure status immediately via SSE
 651                              if a2a_context:
 652                                  # Get tags (already parsed above as kwargs_for_call["tags"])
 653                                  failure_tags = kwargs_for_call.get("tags")
 654                                  progress_data = ArtifactCreationProgressData(
 655                                      filename=filename,
 656                                      description=params.get("description"),
 657                                      status="failed",
 658                                      bytes_transferred=len(event.content),
 659                                      tags=failure_tags,
 660                                  )
 661                                  await _publish_data_part_status_update(
 662                                      host_component, a2a_context, progress_data
 663                                  )
 664  
 665                          session.state["completed_artifact_blocks_list"].append(
 666                              {
 667                                  "filename": filename,
 668                                  "version": version_for_tool,
 669                                  "status": status_for_tool,
 670                                  "description": params.get("description"),
 671                                  "mime_type": params.get("mime_type"),
 672                                  "bytes_transferred": len(event.content),
 673                                  "original_text": original_text,
 674                                  "tags": kwargs_for_call.get("tags"),
 675                              }
 676                          )
 677  
 678                      elif isinstance(event, TemplateBlockStartedEvent):
 679                          log.debug(
 680                              "%s Event: TemplateBlockStarted. Params: %s",
 681                              log_identifier,
 682                              event.params,
 683                          )
 684  
 685                      elif isinstance(event, TemplateBlockCompletedEvent):
 686                          log.debug(
 687                              "%s Event: TemplateBlockCompleted. Template length: %d",
 688                              log_identifier,
 689                              len(event.template_content),
 690                          )
 691  
 692                          # Create a TemplateBlockData message to send to the gateway
 693                          template_id = str(uuid.uuid4())
 694                          params = event.params
 695  
 696                          data_artifact = params.get("data")
 697                          if not data_artifact:
 698                              log.warning(
 699                                  "%s Template block is missing 'data' parameter. Skipping.",
 700                                  log_identifier,
 701                              )
 702                              continue
 703  
 704                          template_data = TemplateBlockData(
 705                              template_id=template_id,
 706                              data_artifact=data_artifact,
 707                              jsonpath=params.get("jsonpath"),
 708                              limit=(
 709                                  int(params.get("limit"))
 710                                  if params.get("limit")
 711                                  else None
 712                              ),
 713                              template_content=event.template_content,
 714                          )
 715  
 716                          # Publish A2A status update with template metadata
 717                          if a2a_context:
 718                              await _publish_data_part_status_update(
 719                                  host_component, a2a_context, template_data
 720                              )
 721                              log.info(
 722                                  "%s Published TemplateBlockData with ID: %s",
 723                                  log_identifier,
 724                                  template_id,
 725                              )
 726  
 727                          # Reconstruct the original template block text for peer-to-peer responses
 728                          # Peer agents don't receive TemplateBlockData signals, so they need
 729                          # the original block text to pass templates through to the gateway
 730                          params_str = " ".join([f'{k}="{v}"' for k, v in params.items()])
 731                          original_template_text = (
 732                              f"{TEMPLATE_LIQUID_START_SEQUENCE} {params_str}\n"
 733                              f"{event.template_content}"
 734                              f"{ARTIFACT_BLOCK_DELIMITER_CLOSE}"
 735                          )
 736  
 737                          # For RUN_BASED sessions (peer-to-peer agent requests), preserve the
 738                          # template block in the response text at its original position.
 739                          # This allows the calling agent to forward it to the gateway.
 740                          # Gateway requests use streaming sessions and receive TemplateBlockData
 741                          # signals instead.
 742                          is_run_based = a2a_context and a2a_context.get(
 743                              "is_run_based_session", False
 744                          )
 745                          if is_run_based and llm_response.partial:
 746                              processed_parts.append(
 747                                  adk_types.Part(text=original_template_text)
 748                              )
 749                              log.debug(
 750                                  "%s Preserved template block in RUN_BASED peer response. Template ID: %s",
 751                                  log_identifier,
 752                                  template_id,
 753                              )
 754  
 755                          # Store template_id and original text in session for potential future use
 756                          # (Gateway will handle the actual resolution via signals,
 757                          # but peer agents need the original text in their responses)
 758                          if (
 759                              "completed_template_blocks_list" not in session.state
 760                              or session.state["completed_template_blocks_list"] is None
 761                          ):
 762                              session.state["completed_template_blocks_list"] = []
 763                          session.state["completed_template_blocks_list"].append(
 764                              {
 765                                  "template_id": template_id,
 766                                  "data_artifact": data_artifact,
 767                                  "original_text": original_template_text,
 768                              }
 769                          )
 770  
 771                      elif isinstance(event, BlockInvalidatedEvent):
 772                          log.debug(
 773                              "%s Event: BlockInvalidated. Rolled back: '%s'",
 774                              log_identifier,
 775                              event.rolled_back_text,
 776                          )
 777              else:
 778                  processed_parts.append(part)
 779  
 780          if llm_response.partial:
 781              if llm_response.content:
 782                  llm_response.content.parts = processed_parts
 783              elif processed_parts:
 784                  llm_response.content = adk_types.Content(parts=processed_parts)
 785      else:
 786          log.debug(
 787              "%s Ignoring text content of final aggregated response because stream was already processed.",
 788              log_identifier,
 789          )
 790  
 791      if not llm_response.partial and not llm_response.interrupted:
 792          log.debug(
 793              "%s Final, non-interrupted stream chunk received. Finalizing parser.",
 794              log_identifier,
 795          )
 796          final_parser_result = parser.finalize()
 797  
 798          for event in final_parser_result.events:
 799              if isinstance(event, BlockInvalidatedEvent):
 800                  # This event is emitted when an unterminated block is detected at the end of a turn.
 801                  # The rolled_back_text contains the original opening line + buffered content.
 802                  # We need to:
 803                  # 1. Send a "cancelled" status to clean up any in-progress UI
 804                  # 2. Send the original text as a status update so the frontend can display it
 805                  log.warning(
 806                      "%s Unterminated artifact block detected at end of turn. Rolled back text length: %d",
 807                      log_identifier,
 808                      len(event.rolled_back_text),
 809                  )
 810                  
 811                  # Try to extract filename from the rolled back text for the cancelled status
 812                  # The format is: «««save_artifact: filename="test.md" ...\n...content...
 813                  filename = "unknown_artifact"
 814                  filename_match = re.search(r'filename="([^"]+)"', event.rolled_back_text)
 815                  if filename_match:
 816                      filename = filename_match.group(1)
 817                  
 818                  # Send a "cancelled" status to clean up any in-progress UI that was created
 819                  # when the block started. This tells the frontend to remove the artifact bubble.
 820                  a2a_context = callback_context.state.get("a2a_context")
 821                  if a2a_context:
 822                      cancelled_progress_data = ArtifactCreationProgressData(
 823                          filename=filename,
 824                          status="cancelled",  # Use "cancelled" instead of "failed" for unterminated blocks
 825                          bytes_transferred=0,
 826                          # Include the rolled back text so the frontend can display it
 827                          # This is sent along with the cancelled status so the frontend
 828                          # can show the original text that was incorrectly parsed as an artifact
 829                          rolled_back_text=event.rolled_back_text,
 830                      )
 831                      await _publish_data_part_status_update(
 832                          host_component, a2a_context, cancelled_progress_data
 833                      )
 834                      log.debug(
 835                          "%s Sent 'cancelled' status with rolled back text for unterminated artifact block: %s",
 836                          log_identifier,
 837                          filename,
 838                      )
 839                  
 840                  # Note: We no longer need to store the original text in completed_artifact_blocks_list
 841                  # because we're sending it directly with the cancelled status.
 842                  # The frontend will handle displaying the text when it receives the cancelled status.
 843  
 844          # If there was any rolled-back text from finalization, append it
 845          if final_parser_result.user_facing_text:
 846              if (
 847                  llm_response.content
 848                  and llm_response.content.parts
 849                  and llm_response.content.parts[-1].text is not None
 850              ):
 851                  llm_response.content.parts[
 852                      -1
 853                  ].text += final_parser_result.user_facing_text
 854              else:
 855                  if llm_response.content is None:
 856                      llm_response.content = adk_types.Content(parts=[])
 857                  elif llm_response.content.parts is None:
 858                      llm_response.content.parts = []
 859                  llm_response.content.parts.append(
 860                      adk_types.Part(text=final_parser_result.user_facing_text)
 861                  )
 862  
 863          # Check if any blocks were completed and need to be injected into the final response
 864          completed_blocks_list = session.state.get("completed_artifact_blocks_list")
 865          if completed_blocks_list:
 866              log.info(
 867                  "%s Injecting info for %d saved artifact(s) into final LlmResponse.",
 868                  log_identifier,
 869                  len(completed_blocks_list),
 870              )
 871  
 872              # Get a2a_context for sending signals
 873              a2a_context = callback_context.state.get("a2a_context")
 874  
 875              tool_call_parts = []
 876              for block_info in completed_blocks_list:
 877                  function_call_id = f"host-notify-{uuid.uuid4()}"
 878                  notify_tool_call = adk_types.FunctionCall(
 879                      name="_notify_artifact_save",
 880                      args={
 881                          "filename": block_info["filename"],
 882                          "version": block_info["version"],
 883                          "status": block_info["status"],
 884                      },
 885                      id=function_call_id,
 886                  )
 887                  tool_call_parts.append(adk_types.Part(function_call=notify_tool_call))
 888  
 889                  # Send artifact saved notification now that we have the function_call_id
 890                  # This ensures the signal and tool call arrive together
 891                  if block_info["status"] == "success" and a2a_context:
 892                      try:
 893                          artifact_info = ArtifactInfo(
 894                              filename=block_info["filename"],
 895                              version=block_info["version"],
 896                              mime_type=block_info.get("mime_type")
 897                              or "application/octet-stream",
 898                              size=block_info.get("bytes_transferred", 0),
 899                              description=block_info.get("description"),
 900                              version_count=None,  # Count not available in save context
 901                          )
 902                          await host_component.notify_artifact_saved(
 903                              artifact_info=artifact_info,
 904                              a2a_context=a2a_context,
 905                              function_call_id=function_call_id,
 906                          )
 907                          log.debug(
 908                              "%s Published artifact saved notification for fenced block: %s (function_call_id=%s)",
 909                              log_identifier,
 910                              block_info["filename"],
 911                              function_call_id,
 912                          )
 913                      except Exception as signal_err:
 914                          log.warning(
 915                              "%s Failed to publish artifact saved notification: %s",
 916                              log_identifier,
 917                              signal_err,
 918                          )
 919  
 920              existing_parts = llm_response.content.parts if llm_response.content else []
 921              final_existing_parts = existing_parts
 922  
 923              if llm_response.content is None:
 924                  llm_response.content = adk_types.Content(parts=[])
 925  
 926              llm_response.content.parts = tool_call_parts + final_existing_parts
 927  
 928              llm_response.turn_complete = True
 929              llm_response.partial = False
 930  
 931          session.state[parser_state_key] = None
 932          session.state["completed_artifact_blocks_list"] = None
 933          session.state["artifact_block_original_text"] = None
 934          session.state["completed_template_blocks_list"] = None
 935          log.debug("%s Cleaned up parser session state.", log_identifier)
 936  
 937      return None
 938  
 939  
 940  def create_dangling_tool_call_repair_content(
 941      dangling_calls: List[adk_types.FunctionCall], error_message: str
 942  ) -> adk_types.Content:
 943      """
 944      Creates a synthetic ADK Content object to repair a dangling tool call.
 945  
 946      Args:
 947          dangling_calls: The list of FunctionCall objects that need a response.
 948          error_message: The error message to include in the response.
 949  
 950      Returns:
 951          An ADK Content object with role='tool' containing the error response.
 952      """
 953      error_response_parts = []
 954      for fc in dangling_calls:
 955          error_response_part = adk_types.Part.from_function_response(
 956              name=fc.name,
 957              response={"status": "error", "message": error_message},
 958          )
 959          error_response_part.function_response.id = fc.id
 960          error_response_parts.append(error_response_part)
 961  
 962      return adk_types.Content(role="tool", parts=error_response_parts)
 963  
 964  
 965  def repair_history_callback(
 966      callback_context: CallbackContext, llm_request: LlmRequest
 967  ) -> Optional[LlmResponse]:
 968      """
 969      ADK before_model_callback to proactively check for and repair dangling
 970      tool calls in the conversation history before it's sent to the LLM.
 971      This acts as a "suspender" to catch any history corruption.
 972      """
 973      log_identifier = "[Callback:RepairHistory]"
 974      if not llm_request.contents:
 975          return None
 976  
 977      history_modified = False
 978      i = 0
 979      while i < len(llm_request.contents):
 980          content = llm_request.contents[i]
 981          function_calls = []
 982          if content and content.role == "model" and content.parts:
 983              function_calls = [p.function_call for p in content.parts if p.function_call]
 984  
 985          if function_calls:
 986              next_content_is_valid_response = False
 987              if (i + 1) < len(llm_request.contents):
 988                  next_content = llm_request.contents[i + 1]
 989                  if (
 990                      next_content.role in ["user", "tool"]
 991                      and next_content.parts
 992                      and any(p.function_response for p in next_content.parts)
 993                  ):
 994                      next_content_is_valid_response = True
 995  
 996              if not next_content_is_valid_response:
 997                  log.warning(
 998                      "%s Found dangling tool call in history for tool(s): %s. Repairing.",
 999                      log_identifier,
1000                      [fc.name for fc in function_calls],
1001                  )
1002                  repair_content = create_dangling_tool_call_repair_content(
1003                      dangling_calls=function_calls,
1004                      error_message="The previous tool call did not complete successfully and was automatically repaired.",
1005                  )
1006                  llm_request.contents.insert(i + 1, repair_content)
1007                  history_modified = True
1008                  i += 1
1009          i += 1
1010  
1011      if history_modified:
1012          log.info(
1013              "%s History was modified to repair dangling tool calls.", log_identifier
1014          )
1015  
1016      return None
1017  
1018  
1019  def _recursively_clean_pydantic_types(data: Any) -> Any:
1020      """
1021      Recursively traverses a data structure (dicts, lists) and converts
1022      Pydantic-specific types like AnyUrl to their primitive string representation
1023      to ensure JSON serializability.
1024      """
1025      if isinstance(data, dict):
1026          return {
1027              key: _recursively_clean_pydantic_types(value) for key, value in data.items()
1028          }
1029      elif isinstance(data, list):
1030          return [_recursively_clean_pydantic_types(item) for item in data]
1031      # Check for Pydantic's AnyUrl without a direct import to avoid dependency issues.
1032      elif type(data).__name__ == "AnyUrl" and hasattr(data, "__str__"):
1033          return str(data)
1034      return data
1035  
1036  
1037  def _mcp_response_contains_non_text(mcp_response_dict: Dict[str, Any]) -> bool:
1038      """
1039      Checks if the 'content' list in an MCP response dictionary contains any
1040      items that are not of type 'text'.
1041      """
1042      if not isinstance(mcp_response_dict, dict):
1043          return False
1044  
1045      content_list = mcp_response_dict.get("content")
1046      if not isinstance(content_list, list):
1047          return False
1048  
1049      for item in content_list:
1050          if isinstance(item, dict) and item.get("type") != "text":
1051              return True
1052      return False
1053  
1054  
1055  async def manage_large_mcp_tool_responses_callback(
1056      tool: BaseTool,
1057      args: Dict[str, Any],
1058      tool_context: ToolContext,
1059      tool_response: Any,
1060      host_component: "SamAgentComponent",
1061  ) -> Optional[Dict[str, Any]]:
1062      """
1063      Manages large or non-textual responses from MCP tools.
1064  
1065      This callback intercepts the response from an MCPTool. Based on the response's
1066      size and content type, it performs one or more of the following actions:
1067      1.  **Saves as Artifact:** If the response size exceeds a configured threshold,
1068          or if it contains non-textual content (like images), it calls the
1069          `save_mcp_response_as_artifact_intelligent` function to save the
1070          response as one or more typed artifacts.
1071      2.  **Truncates for LLM:** If the response size exceeds a configured limit for
1072          the LLM, it truncates the content to a preview string.
1073      3.  **Constructs Final Response:** It builds a new dictionary to be returned
1074          to the LLM, which includes:
1075          - A `message_to_llm` summarizing what was done (e.g., saved, truncated).
1076          - `saved_mcp_response_artifact_details` with the result of the save operation.
1077          - `mcp_tool_output` containing either the original response or the truncated preview.
1078          - A `status` field indicating the outcome (e.g., 'processed_and_saved').
1079  
1080      The `tool_response` is the direct output from the tool's `run_async` method.
1081      """
1082      log_identifier = f"[Callback:ManageLargeMCPResponse:{tool.name}]"
1083      log.info(
1084          "%s Starting callback for tool response, type: %s",
1085          log_identifier,
1086          type(tool_response).__name__,
1087      )
1088  
1089      if tool_response is None:
1090          return None
1091  
1092      if not isinstance(tool, MCPTool):
1093          log.debug(
1094              "%s Tool is not an MCPTool. Skipping large response handling.",
1095              log_identifier,
1096          )
1097          return (
1098              tool_response
1099              if isinstance(tool_response, dict)
1100              else {"result": tool_response}
1101          )
1102  
1103      log.debug(
1104          "%s Tool is an MCPTool. Proceeding with large response handling.",
1105          log_identifier,
1106      )
1107  
1108      if hasattr(tool_response, "model_dump"):
1109          mcp_response_dict = tool_response.model_dump(exclude_none=True)
1110          log.debug("%s Converted MCPTool response object to dictionary.", log_identifier)
1111      elif isinstance(tool_response, dict):
1112          mcp_response_dict = tool_response
1113          log.debug("%s MCPTool response is already a dictionary.", log_identifier)
1114      else:
1115          log.warning(
1116              "%s MCPTool response is not a Pydantic model or dict (type: %s). Attempting to proceed, but serialization might fail.",
1117              log_identifier,
1118              type(tool_response),
1119          )
1120          mcp_response_dict = tool_response
1121  
1122      # Clean any Pydantic-specific types before serialization
1123      mcp_response_dict = _recursively_clean_pydantic_types(mcp_response_dict)
1124      cleaned_args = _recursively_clean_pydantic_types(args)
1125  
1126      try:
1127          save_threshold = host_component.get_config(
1128              "mcp_tool_response_save_threshold_bytes", 2048
1129          )
1130          llm_max_bytes = host_component.get_config("mcp_tool_llm_return_max_bytes", 4096)
1131          log.debug(
1132              "%s Config: save_threshold=%d bytes, llm_max_bytes=%d bytes.",
1133              log_identifier,
1134              save_threshold,
1135              llm_max_bytes,
1136          )
1137      except Exception as e:
1138          log.error(
1139              "%s Error retrieving configuration: %s. Using defaults.", log_identifier, e
1140          )
1141          save_threshold = 2048
1142          llm_max_bytes = 4096
1143  
1144      contains_non_text_content = _mcp_response_contains_non_text(mcp_response_dict)
1145      if not contains_non_text_content:
1146          try:
1147              serialized_original_response_str = json.dumps(mcp_response_dict)
1148              original_response_bytes = len(
1149                  serialized_original_response_str.encode("utf-8")
1150              )
1151              log.debug(
1152                  "%s Original response size: %d bytes.",
1153                  log_identifier,
1154                  original_response_bytes,
1155              )
1156          except TypeError as e:
1157              log.error(
1158                  "%s Failed to serialize original MCP tool response dictionary: %s. Returning original response object.",
1159                  log_identifier,
1160                  e,
1161              )
1162              return tool_response
1163          needs_truncation_for_llm = original_response_bytes > llm_max_bytes
1164          needs_saving_as_artifact = (
1165              original_response_bytes > save_threshold
1166          ) or needs_truncation_for_llm
1167      else:
1168          needs_truncation_for_llm = False
1169          needs_saving_as_artifact = True
1170  
1171      save_result = None
1172      if needs_saving_as_artifact:
1173          save_result = await save_mcp_response_as_artifact_intelligent(
1174              tool, tool_context, host_component, mcp_response_dict, cleaned_args
1175          )
1176          if save_result.status == McpSaveStatus.ERROR:
1177              log.warning(
1178                  "%s Failed to save artifact: %s. Proceeding without saved artifact details.",
1179                  log_identifier,
1180                  save_result.message,
1181              )
1182  
1183      final_llm_response_dict: Dict[str, Any] = {}
1184      message_parts_for_llm: list[str] = []
1185  
1186      if needs_truncation_for_llm:
1187          # ALL-OR-NOTHING APPROACH: Do not include truncated data to prevent LLM hallucination.
1188          # When LLMs receive partial data, they tend to confidently fill in gaps with
1189          # hallucinated information. By withholding partial data entirely, we force the LLM
1190          # to use reliable mechanisms (template_liquid, load_artifact) to access the full data.
1191          final_llm_response_dict["mcp_tool_output"] = {
1192              "type": "data_in_artifact_only",
1193              "message": "Data exceeds size limit. Full data saved as artifact - use template_liquid, load_artifact or other artifact analysis tools to process and access.",
1194          }
1195          message_parts_for_llm.append(
1196              f"The response from tool '{tool.name}' was too large ({original_response_bytes} bytes) to display directly. "
1197              "The data has NOT been included here to prevent incomplete information. "
1198              "You MUST use template_liquid (for displaying to users) or load_artifact or other "
1199              "artifact analysis tools (for processing) to access the full data."
1200          )
1201          log.debug(
1202              "%s MCP tool output withheld from LLM (all-or-nothing approach).",
1203              log_identifier,
1204          )
1205  
1206      if needs_saving_as_artifact:
1207          if save_result and save_result.status in [
1208              McpSaveStatus.SUCCESS,
1209              McpSaveStatus.PARTIAL_SUCCESS,
1210          ]:
1211              final_llm_response_dict["saved_mcp_response_artifact_details"] = (
1212                  save_result.model_dump(exclude_none=True)
1213              )
1214  
1215              total_artifacts = len(save_result.artifacts_saved)
1216              if total_artifacts > 0:
1217                  first_artifact = save_result.artifacts_saved[0]
1218                  filename = first_artifact.data_filename
1219                  version = first_artifact.data_version
1220                  if total_artifacts > 1:
1221                      artifact_msg = f"The full response has been saved as {total_artifacts} artifacts, starting with '{filename}' (version {version})."
1222                  else:
1223                      artifact_msg = f"The full response has been saved as artifact '{filename}' (version {version})."
1224  
1225                  # When data was too large and truncated, provide explicit guidance
1226                  if needs_truncation_for_llm:
1227                      artifact_msg += (
1228                          f' To display this data to the user, use template_liquid with data="{filename}". '
1229                          f'To process the data yourself, use load_artifact("{filename}").'
1230                      )
1231                  message_parts_for_llm.append(artifact_msg)
1232              elif save_result.fallback_artifact:
1233                  filename = save_result.fallback_artifact.data_filename
1234                  version = save_result.fallback_artifact.data_version
1235                  artifact_msg = f"The full response has been saved as artifact '{filename}' (version {version})."
1236                  if needs_truncation_for_llm:
1237                      artifact_msg += (
1238                          f' To display this data to the user, use template_liquid with data="{filename}". '
1239                          f'To process the data yourself, use load_artifact("{filename}").'
1240                      )
1241                  message_parts_for_llm.append(artifact_msg)
1242  
1243              log.debug(
1244                  "%s Added saved artifact details to LLM response.", log_identifier
1245              )
1246          else:
1247              message_parts_for_llm.append(
1248                  "Saving the full response as an artifact failed."
1249              )
1250              if save_result:
1251                  final_llm_response_dict["saved_mcp_response_artifact_details"] = (
1252                      save_result.model_dump(exclude_none=True)
1253                  )
1254              log.warning(
1255                  "%s Artifact save failed, error details included in LLM response.",
1256                  log_identifier,
1257              )
1258      else:
1259          final_llm_response_dict["mcp_tool_output"] = mcp_response_dict
1260  
1261      if needs_saving_as_artifact and (
1262          save_result
1263          and save_result.status in [McpSaveStatus.SUCCESS, McpSaveStatus.PARTIAL_SUCCESS]
1264      ):
1265          if needs_truncation_for_llm:
1266              # Data was too large - withheld from LLM, only available via artifact
1267              final_llm_response_dict["status"] = "processed_saved_artifact_only"
1268          else:
1269              final_llm_response_dict["status"] = "processed_and_saved"
1270      elif needs_saving_as_artifact:
1271          if needs_truncation_for_llm:
1272              final_llm_response_dict["status"] = "processed_artifact_only_save_failed"
1273          else:
1274              final_llm_response_dict["status"] = "processed_save_failed"
1275      elif needs_truncation_for_llm:
1276          # This case shouldn't happen (truncation implies saving), but handle it
1277          final_llm_response_dict["status"] = "processed_data_withheld"
1278      else:
1279          final_llm_response_dict["status"] = "processed"
1280  
1281      if not message_parts_for_llm:
1282          message_parts_for_llm.append(f"Response from tool '{tool.name}' processed.")
1283      final_llm_response_dict["message_to_llm"] = " ".join(message_parts_for_llm)
1284  
1285      log.info(
1286          "%s Returning processed response for LLM. Final status: %s",
1287          log_identifier,
1288          final_llm_response_dict.get("status", "unknown"),
1289      )
1290      return final_llm_response_dict
1291  
1292  
def _generate_fenced_block_syntax_rules() -> str:
    """Build the syntax rules shared by every fenced block type.

    The text is assembled from three consecutive sections: the exact syntax
    pattern, the numbered formatting rules, and a list of common errors. The
    module-level artifact block delimiters are interpolated throughout.

    Returns:
        The rules text, which starts and ends with a newline.
    """
    block_open = ARTIFACT_BLOCK_DELIMITER_OPEN
    block_close = ARTIFACT_BLOCK_DELIMITER_CLOSE
    # Deliberately truncated delimiters, shown to the model as wrong examples.
    one_char_open = block_open[0:1]
    two_char_open = block_open[0:2]

    syntax_section = f"""
**Fenced Block Syntax Rules (Applies to `save_artifact` and `template_liquid`):**
To create content blocks, you MUST use the EXACT syntax shown below.

**EXACT SYNTAX (copy this pattern exactly):**
{block_open}keyword: parameter="value" ...
The content for the block goes here.
It can span multiple lines.
{block_close}
"""

    rules_section = f"""
**CRITICAL FORMATTING RULES:**
  1. The opening delimiter MUST be EXACTLY `{block_open}`.
  2. Immediately after the delimiter, write the keyword (`save_artifact` or `template_liquid`) followed by a colon, with NO space before the colon (e.g., `{block_open}save_artifact:`).
  3. All parameters (like `filename`, `data`, `mime_type`) must be on the SAME line as the opening delimiter.
  4. All parameter values **MUST** be enclosed in double quotes (e.g., `filename="example.txt"`).
  5. You **MUST NOT** use double quotes `"` inside parameter values. Use single quotes or rephrase instead.
  6. The block's content begins on the line immediately following the parameters.
  7. Close the block with EXACTLY `{block_close}` (three angle brackets) on its own line.
  8. Do NOT surround the block with triple backticks (```). The `{block_open}` and `{block_close}` delimiters are sufficient.
"""

    errors_section = f"""
**COMMON ERRORS TO AVOID:**
  ❌ WRONG: `{one_char_open}template_liquid:` (only 1 angle brackets)
  ❌ WRONG: `{two_char_open}save_artifact:` (only 2 angle brackets)
  ❌ WRONG: `{block_open}save_artifact` (missing colon)
  ✅ CORRECT: `{block_open}save_artifact: filename="test.txt" mime_type="text/plain"`
"""

    return syntax_section + rules_section + errors_section
1323  
1324  
def _generate_fenced_artifact_instruction() -> str:
    """Build the prompt section describing the `save_artifact` fenced block.

    Covers when to create artifacts, how created artifacts behave, the
    supported block parameters, and how to tag working files. The opening
    block delimiter and the working-file tag come from module-level constants.

    Returns:
        The instruction text, ending with a newline.
    """
    block_open = ARTIFACT_BLOCK_DELIMITER_OPEN
    working_tag = ARTIFACT_TAG_WORKING
    instruction = f"""\
**Creating Text-Based Artifacts (`{block_open}save_artifact: ...`):**

When to Create Artifacts:
Create an artifact when the content provides value as a standalone file, such as:
- Content with special formatting (HTML, Markdown, CSS).
- Documents intended for use outside the conversation (reports, emails).
- Structured reference content (schedules, guides, templates).
- Substantial text documents or technical documentation.

When NOT to Create Artifacts:
- Simple answers, explanations, or conversational responses.
- Brief advice, opinions, or short lists.

Behavior of Created Artifacts:
- They are sent to the user as an interactive file component.
- The user can see the content, so there is no need to return or embed it again.

Parameters for `{block_open}save_artifact: ...`:
- `filename="your_filename.ext"` (REQUIRED)
- `mime_type="text/plain"` (optional, defaults to text/plain)
- `description="A brief description."` (optional)
- `tags="tag1,tag2"` (optional, comma-separated list of tags for categorization)

Tagging Working Files:
Add `tags="{working_tag}"` to artifacts that are intermediate or internal (e.g., scratch data, temp files, intermediate results used as input to further processing). These are hidden from the user's file list by default. Do NOT tag artifacts that are the final deliverable for the user.

The system will automatically save the content and confirm it in the next turn.
"""
    return instruction
1357  
1358  
def _generate_inline_template_instruction() -> str:
    """Generates the instruction text for using inline Liquid templates.

    The text explains when to use `template_liquid` fenced blocks, their
    parameters, the data context exposed to the template, a CSV example,
    pipe-escaping guidance, negative examples, and a warning about math embeds.

    Returns:
        The instruction text, ending with a newline.
    """
    open_delim = ARTIFACT_BLOCK_DELIMITER_OPEN
    close_delim = ARTIFACT_BLOCK_DELIMITER_CLOSE
    # Math embeds use the embed delimiters, not the fenced-block delimiters,
    # so the example in the final paragraph must be built from these.
    embed_open_delim = EMBED_DELIMITER_OPEN
    embed_close_delim = EMBED_DELIMITER_CLOSE
    # NOTE: this is an f-string, so literal Liquid output braces `{{ ... }}`
    # must be written as `{{{{ ... }}}}` and tag braces `{% ... %}` as
    # `{{% ... %}}`; a plain `{{ x }}` would render as the invalid `{ x }`.
    return f"""\
**Inline Liquid Templates (`{open_delim}template_liquid: ...`):**

Use inline Liquid templates to dynamically render data from artifacts for user-friendly display. This is faster and more accurate than reading the artifact and reformatting it yourself.

IMPORTANT: Template Format
- Templates use Liquid template syntax (same as Shopify templates - NOTE that Jekyll extensions are NOT supported).

When to Use Inline Templates:
- Formatting CSV, JSON, or YAML data into tables or lists.
- Applying simple transformations (filtering, limiting rows).

Parameters for `{open_delim}template_liquid: ...`:
- `data="filename.ext"` (REQUIRED): The data artifact to render. Can include version: `data="file.csv:2"`.
- `jsonpath="$.expression"` (optional): JSONPath to extract a subset of JSON/YAML data.
- `limit="N"` (optional): Limit to the first N rows (CSV) or items (JSON/YAML arrays).

Data Context for Liquid Templates:
- CSV data: Available as `headers` (array of column names) and `data_rows` (array of row arrays).
- JSON/YAML arrays: Available as `items`.
- JSON/YAML objects: Keys are directly available (e.g., `name`, `email`).

Example - CSV Table:
{open_delim}template_liquid: data="sales_data.csv" limit="5"
| {{% for h in headers %}}{{{{ h }}}} | {{% endfor %}}
|{{% for h in headers %}}---|{{% endfor %}}
{{% for row in data_rows %}}| {{% for cell in row %}}{{{{ cell }}}} | {{% endfor %}}{{% endfor %}}
{close_delim}

**IMPORTANT - Pipe Characters in Markdown Tables:**
Text data may contain "|" characters which will break markdown table rendering by pushing data into wrong columns. For text fields that might contain pipes, use the `replace` filter to escape them:
`{{{{ item.summary | replace: "|", "&#124;" }}}}`
Only apply this to text fields that might contain pipes - numerical columns don't need it.

Negative Examples
Use {{{{ issues.size }}}} instead of {{{{ issues|length }}}}
Use {{{{ forloop.index }}}} instead of {{{{ loop.index }}}} (Liquid uses forloop not loop)
Use {{{{ issue.fields.description | truncate: 200 }}}} instead of slicing with [:200]
Do not use Jekyll-specific tags or filters (e.g., `{{% assign %}}`, `{{% capture %}}`, `where`, `sort`, `where_exp`, etc.)

The rendered output will appear inline in your response automatically.

**IMPORTANT - No Math Embeds Inside template_liquid:**
Never place math embeds (e.g., `{embed_open_delim}math:...{embed_close_delim}`) inside a `template_liquid` block. Math embeds are resolved at a different stage and will not work correctly within Liquid templates. If you need to perform calculations on data, do the math outside the template_liquid block using separate math embeds, or use Liquid's built-in arithmetic filters (e.g., `| plus:`, `| minus:`, `| times:`, `| divided_by:`).
"""
1408  
1409  
def _generate_artifact_creation_instruction() -> str:
    """Return the static guidance on when to create text-based artifacts.

    Returns:
        A fixed text block whose non-empty lines are indented by four spaces.
        It begins with a newline and ends with a newline followed by a
        four-space indent.
    """
    indent = "    "
    guidance_lines = (
        "**Creating Text-Based Artifacts:**",
        "",
        "When to Create Text-based Artifacts:",
        "Create an artifact when the content provides value as a standalone file:",
        "- Content with special formatting (HTML, Markdown, CSS, structured markup) that requires proper rendering",
        "- Content explicitly intended for use outside this conversation (reports, emails, presentations, reference documents)",
        "- Structured reference content users will save or follow (schedules, guides, templates)",
        "- Content that will be edited, expanded, or reused",
        "- Substantial text documents",
        "- Technical documentation meant as reference material",
        "",
        "When NOT to Create Text-based Artifacts:",
        "- Simple answers, explanations, or conversational responses",
        "- Brief advice, opinions, or quick information",
        "- Short lists, summaries, or single paragraphs",
        "- Temporary content only relevant to the immediate conversation",
        "- Basic explanations that don't require reference material",
    )
    # Blank separator lines stay completely empty; every other line is indented.
    body = "\n".join(indent + line if line else line for line in guidance_lines)
    return "\n" + body + "\n" + indent
1430  
1431  
def _generate_examples_instruction() -> str:
    """Build the six worked examples shown to the model.

    The examples combine status-update embeds, `save_artifact` and
    `template_liquid` fenced blocks, and math/datetime/artifact_content
    embeds. Liquid snippets are kept in plain (non f-string) literals so their
    braces need no escaping; everything else is interpolated with the
    module-level block and embed delimiters.

    Returns:
        The concatenated example text, indented by four spaces.
    """
    block_open = ARTIFACT_BLOCK_DELIMITER_OPEN
    block_close = ARTIFACT_BLOCK_DELIMITER_CLOSE
    embed_open = EMBED_DELIMITER_OPEN
    embed_close = EMBED_DELIMITER_CLOSE

    # Liquid table for CSV data, reused by Examples 1 and 3.
    csv_table_liquid = """| {% for h in headers %}{{ h }} | {% endfor %}
    |{% for h in headers %}---|{% endfor %}
    {% for row in data_rows %}| {% for cell in row %}{{ cell }} | {% endfor %}{% endfor %}
    """

    # Liquid table for the Jira JSON data in Example 4.
    jira_table_liquid = """| Key | Summary | Status | Type | Assignee | Updated |
    |-----|---------|--------|------|----------|---------|
    {% for item in items %}| [{{ item.key }}](https://jira.example.com/browse/{{ item.key }}) | {{ item.summary | replace: "|", "&#124;" }} | {{ item.status }} | {{ item.type }} | {{ item.assignee }} | {{ item.updated }} |
    {% endfor %}"""

    # CSS rules contain literal braces, so they also live in a plain literal.
    html_style_rules = """
        <style>
            body { font-family: Arial, sans-serif; margin: 20px; }
            .metric { background: #f0f0f0; padding: 10px; margin: 10px 0; }
            img { max-width: 100%; height: auto; }
    """

    example_1 = (
        f"""\
    Example 1:
    - User: "Create a markdown file with your two csv files as tables."
    <note>There are two csv files already uploaded: data1.csv and data2.csv</note>
    - OrchestratorAgent:
    {embed_open}status_update:Creating Markdown tables from CSV files...{embed_close}
    {block_open}save_artifact: filename="data_tables.md" mime_type="text/markdown" description="Markdown tables from CSV files"
    # Data Tables
    ## Data 1
    {block_open}template_liquid: data="data1.csv"
    """
        + csv_table_liquid
        + f"""{block_close}
    ## Data 2
    {block_open}template_liquid: data="data2.csv"
    """
        + csv_table_liquid
        + f"""{block_close}
    {block_close}
"""
    )

    example_2 = (
        f"""\
    Example 2:
    - User: "Create a text file with the result of sqrt(12345) + sqrt(67890) + sqrt(13579) + sqrt(24680)."
    - OrchestratorAgent:
    {embed_open}status_update:Calculating and creating text file...{embed_close}
    {block_open}save_artifact: filename="math.txt" mime_type="text/plain" description="Result of sqrt(12345) + sqrt(67890) + sqrt(13579) + sqrt(24680)"
    result = {embed_open}math: sqrt(12345) + sqrt(67890) + sqrt(13579) + sqrt(24680) | .2f{embed_close}
    {block_close}
"""
        # Separator line that consists of the four-space indent only.
        + "    \n"
    )

    example_3 = (
        f"""\
    Example 3:
    - User: "Show me the first 10 entries from data1.csv"
    - OrchestratorAgent:
    {embed_open}status_update:Loading CSV data...{embed_close}
    {block_open}template_liquid: data="data1.csv" limit="10"
    """
        + csv_table_liquid
        + f"{block_close}\n\n"
    )

    example_4 = (
        f"""\
    Example 4:
    - User: "Show me the Jira issues as a table"
    <note>There is a JSON artifact jira_issues.json with items containing key, summary, status, type, assignee, updated fields</note>
    - OrchestratorAgent:
    {embed_open}status_update:Rendering Jira issues table...{embed_close}
    {block_open}template_liquid: data="jira_issues.json" limit="10"
    """
        + jira_table_liquid
        + f"""
    {block_close}
    <note>The replace filter on item.summary escapes any pipe characters that would break the markdown table. Only apply to text fields that might contain pipes.</note>

"""
    )

    example_5 = f"""\
    Example 5:
    - User: "Search the database for all orders from last month"
    - OrchestratorAgent:
    {embed_open}status_update:Querying order database...{embed_close}
    [calls search_database tool with no visible text]
    [After getting results:]
    Found 247 orders from last month totaling $45,231.

"""

    example_6 = (
        f"""\
    Example 6:
    - User: "Create an HTML with the chart image you just generated with the customer data."
    - OrchestratorAgent:
    {embed_open}status_update:Generating HTML report with chart...{embed_close}
    {block_open}save_artifact: filename="customer_analysis.html" mime_type="text/html" description="Interactive customer analysis dashboard"
    <!DOCTYPE html>
    <html>
    <head>
        <title>Customer Chart - {embed_open}datetime:%Y-%m-%d{embed_close}</title>
    """
        + html_style_rules
        + f"""    </style>
        </head>
    <body>
    <h1>Customer Analysis Report</h1>
    <p>Generated: {embed_open}datetime:iso{embed_close}</p>
"""
        # Whitespace-only line (eight spaces) inside the HTML body.
        + "        \n"
        + f"""    <h2>Customer Distribution Chart</h2>
    <img src="{embed_open}artifact_content:customer_chart.png >>> format:datauri{embed_close}" alt="Customer Distribution">
"""
        # Whitespace-only line (four spaces) before the closing body tag.
        + "    \n"
        + f"""    </body>
    </html>
    {block_close}

    """
    )

    return "".join(
        (example_1, example_2, example_3, example_4, example_5, example_6)
    )
1538  
1539  
def _generate_embed_instruction(
    include_artifact_content: bool,
    log_identifier: str,
    suppress_artifact_return: bool = False,
) -> Optional[str]:
    """Generates the instruction text for using embeds.

    The text always contains the early-resolved embed types and the
    `status_update` embed. The `artifact_return` section and the
    `artifact_content` section are appended conditionally, followed by a
    closing reminder about exact syntax.

    Args:
        include_artifact_content: Whether to include artifact_content embed instructions.
        log_identifier: Logging prefix. Not referenced by this function; kept
            for interface compatibility.
        suppress_artifact_return: If True, omit artifact_return directives (used in SI mode
            where only the result embed matters).

    Returns:
        The assembled instruction text.
    """
    open_delim = EMBED_DELIMITER_OPEN
    close_delim = EMBED_DELIMITER_CLOSE
    chain_delim = EMBED_CHAIN_DELIMITER
    early_types = "`math`, `datetime`, `uuid`, `artifact_meta`"

    # apply_to_template has been deprecated, so it is not advertised to the LLM.
    modifier_list = ", ".join(
        f"`{prefix}`"
        for prefix in MODIFIER_IMPLEMENTATIONS
        if prefix != "apply_to_template"
    )

    base_instruction = f"""\
**Using Dynamic Embeds in Responses:**

You can use dynamic embeds in your text responses and tool parameters using the syntax {open_delim}type:expression {chain_delim} format{close_delim}. NOTE that this differs from 'save_artifact', which has  different delimiters. This allows you to
always have correct information in your output. Specifically, make sure you always use embeds for math, even if it is simple. You will make mistakes if you try to do math yourself.
Use HTML entities to escape the delimiters.
This host resolves the following embed types *early* (before sending to the LLM or tool): {early_types}. This means the embed is replaced with its resolved value.
- `{open_delim}math:expression | .2f{close_delim}`: Evaluates the math expression using asteval - this must just be plain math (plus random(), randint() and uniform()), don't import anything. Optional format specifier follows Python's format(). Use this for all math calculations rather than doing it yourself. Don't give approximations.
- `{open_delim}datetime:format_or_keyword{close_delim}`: Inserts current date/time. Use Python strftime format (e.g., `%Y-%m-%d`) or keywords (`iso`, `timestamp`, `date`, `time`, `now`).
- `{open_delim}uuid:{close_delim}`: Inserts a random UUID.
- `{open_delim}artifact_meta:filename[:version]{close_delim}`: Inserts a summary of the artifact's metadata (latest version if unspecified).
- `{open_delim}status_update:Your message here{close_delim}`: Generates an immediate, distinct status message event that is displayed to the user (e.g., 'Thinking...', 'Searching database...'). This message appears in a status area, not as part of the main chat conversation. Use this to provide interim feedback during processing.

Examples:
- `{open_delim}status_update:Analyzing data...{close_delim}` (Shows 'Analyzing data...' as a status update)
- `The result of 23.5 * 4.2 is {open_delim}math:23.5 * 4.2 | .2f{close_delim}` (Embeds calculated result with 2 decimal places)
"""

    if not suppress_artifact_return:
        base_instruction += f"""\
The following embeds are resolved *late* (by the gateway before final display):
- `{open_delim}artifact_return:filename[:version]{close_delim}`: Attaches an artifact to your message so the user receives the file. The embed itself is removed from the text.

  **CRITICAL - Returning Artifacts to Users:**
  Only artifacts created with the `{open_delim}save_artifact:...{close_delim}` fenced block syntax are automatically sent to the user.

  **You MUST use artifact_return for:**
  - Artifacts created by tools (e.g., image generation, chart creation, file conversion)
  - Artifacts created by other agents you called
  - Artifacts from MCP servers

  **When deciding whether to return an artifact:**
  - Return artifacts the user explicitly requested or that answer their question
  - Return final outputs (charts, reports, images, documents)
  - Do NOT return intermediate/temporary artifacts (e.g., temp files, internal data)

  **Example - Tool creates an image:**
  User: "Create a chart of sales data"
  [You call a charting tool that creates sales_chart.png]
  Your response: "Here's the sales chart. {open_delim}artifact_return:sales_chart.png{close_delim}"

  **Example - Agent creates a report:**
  User: "Generate a quarterly report"
  [You call ReportAgent which creates quarterly_report.pdf]
  Your response: "The quarterly report is ready. {open_delim}artifact_return:quarterly_report.pdf{close_delim}"
"""

    # In the two <img ...> examples below, the closing backtick must come after
    # `">` so that the whole tag sits inside one inline code span.
    artifact_content_instruction = f"""
- `{open_delim}artifact_content:filename[:version] {chain_delim} modifier1:value1 {chain_delim} ... {chain_delim} format:output_format{close_delim}`: Embeds artifact content after applying a chain of modifiers. This is resolved *late* (typically by a gateway before final display).
    - If this embed resolves to binary content (like an image), it will be automatically converted into an attached file, similar to `artifact_return`.
    - Use `{chain_delim}` to separate the artifact identifier from the modifier steps and the final format step.
    - Available modifiers: {modifier_list}.
    - The `format:output_format` step *must* be the last step in the chain. Supported formats include `text`, `datauri`, `json`, `json_pretty`, `csv`. Formatting as datauri, will include the data URI prefix, so do not add it yourself.
    - Use `artifact_meta` first to check size; embedding large files may fail.
    - Efficient workflows for large artifacts:
        - To extract specific line ranges: `load_artifact(filename, version, include_line_numbers=True)` to identify lines, then use `slice_lines:start:end` modifier to extract that range.
        - To fill templates with many placeholders: use `artifact_search_and_replace_regex` with `replacements` array (single atomic operation instead of multiple calls).
        - Line numbers are display-only; `slice_lines` always operates on original content.
    - Examples:
        - `<img src="{open_delim}artifact_content:image.png {chain_delim} format:datauri{close_delim}">` (Embed image as data URI - NOTE that this includes the datauri prefix. Do not add it yourself.)
        - `{open_delim}artifact_content:data.json {chain_delim} jsonpath:$.items[*] {chain_delim} select_fields:name,status {chain_delim} format:json_pretty{close_delim}` (Extract and format JSON fields)
        - `{open_delim}artifact_content:logs.txt {chain_delim} grep:ERROR {chain_delim} head:10 {chain_delim} format:text{close_delim}` (Get first 10 error lines)
        - `{open_delim}artifact_content:config.json {chain_delim} jsonpath:$.userPreferences.theme {chain_delim} format:text{close_delim}` (Extract a single value from a JSON artifact)
        - `{open_delim}artifact_content:server.log {chain_delim} tail:100 {chain_delim} grep:WARN {chain_delim} format:text{close_delim}` (Get warning lines from the last 100 lines of a log file)
        - `{open_delim}artifact_content:template.html {chain_delim} slice_lines:10:50 {chain_delim} format:text{close_delim}` (Extract lines 10-50 from a large file)
        - `<img src="{open_delim}artifact_content:diagram.png {chain_delim} format:datauri{close_delim}">` (Embed a PNG diagram as a data URI)
"""

    final_instruction = base_instruction
    if include_artifact_content:
        final_instruction += artifact_content_instruction

    final_instruction += f"""
Ensure the syntax is exactly `{open_delim}type:expression{close_delim}` or `{open_delim}type:expression {chain_delim} ... {chain_delim} format:output_format{close_delim}` with no extra spaces around delimiters (`{open_delim}`, `{close_delim}`, `{chain_delim}`, `:`, `|`). Malformed directives will be ignored."""

    return final_instruction
1641  
1642  
def _generate_conversation_flow_instruction() -> str:
    """Build the prompt section on conversation flow and response formatting.

    The text has two parts: the response-content rules (including the
    mandatory status_update guidance with good and bad patterns) and a set of
    worked examples. Both interpolate the module-level embed delimiters.

    Returns:
        The instruction text, ending with a newline.
    """
    embed_open = EMBED_DELIMITER_OPEN
    embed_close = EMBED_DELIMITER_CLOSE

    rules_section = f"""\
**Conversation Flow and Response Formatting:**

**CRITICAL: Minimize Narration - Maximize Results**

You do NOT need to produce visible text on every turn. Many turns should contain ONLY status updates and tool calls, with NO visible text at all.
Only produce visible text when you have actual results, answers, or insights to share with the user.

Response Content Rules:
1. Visible responses should contain ONLY:
   - Direct answers to the user's question
   - Analysis and insights derived from tool results
   - Final results and data
   - Follow-up questions when needed
   - Plans for complex multi-step tasks

2. DO NOT include visible text for:
   - Process narration ("Let me...", "I'll...", "Now I will...")
   - Acknowledgments of tool calls ("I'm calling...", "Searching...")
   - Descriptions of what you're about to do
   - Play-by-play commentary on your actions
   - Transitional phrases between tool calls

3. **MANDATORY: Emit a status_update embed BEFORE EVERY tool call.** This is required — the user sees a progress timeline and needs to know what is happening. The status text must be:
   - Objective and user-centric (describe what is happening, not what tool is being called)
   - Contextual (include the specific topic, query, or subject being worked on)
   - Concise (one short sentence, no more than 80 characters)
   - NEVER expose internal tool names, agent names, or implementation details

   Good status_update examples:
   - "{embed_open}status_update:Searching for current TSLA stock price...{embed_close}"
   - "{embed_open}status_update:Looking up weather in Ottawa...{embed_close}"
   - "{embed_open}status_update:Analyzing the quarterly sales data...{embed_close}"
   - "{embed_open}status_update:Creating a summary report...{embed_close}"
   - "{embed_open}status_update:Checking available flight options...{embed_close}"

   Bad status_update examples (DO NOT use these patterns):
   - "{embed_open}status_update:Calling web_search_google tool...{embed_close}" (exposes tool name)
   - "{embed_open}status_update:Delegating to ResearchAgent...{embed_close}" (exposes agent name)
   - "{embed_open}status_update:Processing...{embed_close}" (too vague, no context)
   - "{embed_open}status_update:Using peer_ResearchAgent...{embed_close}" (exposes internal name)

4. NEVER mix process narration with status updates - if you use a status_update embed, do NOT repeat that information in visible text.

5. When delegating to another agent (peer_ tools), still emit a status_update describing what you're asking them to do, NOT which agent you're calling.
"""

    examples_section = f"""
Examples:

**Excellent (status update before tool call, no visible text):**
"{embed_open}status_update:Searching for current TSLA stock price...{embed_close}" [then calls web_search_google tool, no visible text]

**Excellent (status update before peer delegation):**
"{embed_open}status_update:Researching Tesla stock performance...{embed_close}" [then calls peer_ResearchAgent, no visible text]

**Good (visible text only contains results):**
"{embed_open}status_update:Analyzing Q4 sales data...{embed_close}" [calls tool]
"Sales increased 23% in Q4, driven primarily by enterprise accounts."

**Bad (no status_update before tool call):**
[calls tool directly without any status_update embed]

**Bad (unnecessary narration instead of status_update):**
"Let me retrieve the sales data for you." [then calls tool]

**Bad (exposes internal names):**
"{embed_open}status_update:Calling web_search_google...{embed_close}" [then calls tool]

Remember: The user sees a progress timeline. Every tool call MUST be preceded by a contextual status_update embed. Never expose tool or agent names in status updates.
"""

    return rules_section + examples_section
1716  
1717  
def _generate_tool_instructions_from_registry(
    active_tools: List[BuiltinTool],
    log_identifier: str,
) -> str:
    """Render a per-category listing of the given tools for the LLM prompt.

    Tools are ordered by (category, name). Tools whose name starts with an
    underscore are omitted. Each remaining tool becomes one bullet with a
    backtick-wrapped signature and its description.

    Args:
        active_tools: Tool definitions to describe.
        log_identifier: Logging prefix. Not referenced by this function.

    Returns:
        Newline-joined text with one heading line per category followed by
        that category's bullets, or an empty string when there is nothing to
        list.
    """
    if not active_tools:
        return ""

    bullets_by_category = defaultdict(list)
    ordered_tools = sorted(active_tools, key=lambda t: (t.category, t.name))

    for tool in ordered_tools:
        # Underscore-prefixed tools are internal and never advertised.
        if tool.name.startswith("_"):
            continue

        rendered_params = []
        if tool.parameters and tool.parameters.properties:
            for param_name, param_schema in tool.parameters.properties.items():
                # Fall back to "any" when the schema carries no usable type.
                if param_schema and hasattr(param_schema, "type") and param_schema.type:
                    type_label = param_schema.type.name.lower()
                else:
                    type_label = "any"

                rendered = f"{param_name}: {type_label}"
                if param_name not in (tool.parameters.required or []):
                    rendered = f"Optional[{rendered}]"
                rendered_params.append(rendered)

        joined_params = ", ".join(rendered_params)
        summary = tool.description or "No description available."
        bullets_by_category[tool.category].append(
            f"- `{tool.name}({joined_params})`: {summary}"
        )

    output_lines = []
    for category in sorted(bullets_by_category):
        heading = category.replace("_", " ").title()
        output_lines.append(f"You have access to the following '{heading}' tools:")
        output_lines.extend(bullets_by_category[category])

    return "\n".join(output_lines)
1759  
1760  
def inject_dynamic_instructions_callback(
    callback_context: CallbackContext,
    llm_request: LlmRequest,
    host_component: "SamAgentComponent",
    active_builtin_tools: List[BuiltinTool],
) -> Optional[LlmResponse]:
    """
    ADK before_model_callback to inject instructions based on host config.
    Modifies the llm_request directly.

    Instruction blocks are collected in order (planning/formatting rules,
    LLM identity, fenced-block rules, agent-provided instructions, embed and
    conversation-flow rules, built-in tool listing, peer/project tool
    instructions from callback state, workflow rules, examples) and appended
    to ``llm_request.config.system_instruction`` separated by ``---`` rules.
    When the invocation is about to hit ``max_llm_calls``, a "final
    interaction" notice is additionally appended to ``llm_request.contents``.

    Args:
        callback_context: ADK callback context; its state and session are
            read, and two "thinking" flags on the session state are reset.
        llm_request: The outgoing request; mutated in place.
        host_component: Host component supplying configuration and
            agent-provided instructions. If falsy, nothing is injected.
        active_builtin_tools: Built-in tools to describe in the prompt.

    Returns:
        Always ``None``.
    """
    log_identifier = "[Callback:InjectInstructions]"
    log.debug("%s Running instruction injection callback...", log_identifier)

    # Reset the per-call "thinking" flags on the session before anything else.
    session = get_session_from_callback_context(callback_context)
    session.state["thinking_phase_active"] = False
    session.state["thinking_just_yielded"] = False

    if not host_component:
        log.error(
            "%s Host component instance not provided. Cannot inject instructions.",
            log_identifier,
        )
        return None

    injected_instructions = []

    planning_instruction = """\
Parallel Tool Calling:
The system is capable of calling multiple tools in parallel to speed up processing. Please try to run tools in parallel when they don't depend on each other. This saves money and time, providing faster results to the user.

**Response Formatting - CRITICAL**:
In most cases when calling tools, you should produce NO visible text at all - only status_update embeds and the tool calls themselves.
The user can see a progress timeline showing your status updates, so narrating your actions is redundant and creates noise.

**MANDATORY: You MUST emit a status_update embed BEFORE EVERY tool call.** The user's progress timeline depends on these. Status text must describe what you're doing in user-friendly terms — never expose tool names or agent names.

If you do include visible text:
- It must contain actual results, insights, or answers - NOT process narration
- Do NOT end with a colon (":") before tool calls, as this leaves it hanging
- Prefer ending with a period (".") if you must include visible text

Examples:
 - BEST: "{open_delim}status_update:Looking up current TSLA stock price...{close_delim}" [then calls tool, NO visible text]
 - BAD: "Let me search for that information." [then calls tool]
 - BAD: "Searching for information..." [then calls tool]
 - BAD: [calls tool directly without any status_update embed]

**CRITICAL - No Links From Training Data**:
- DO NOT include URLs, links, or markdown links from your training data in responses
- NEVER include markdown links like [text](url) or raw URLs like https://example.com unless they came from a tool result
- If a delegated agent's response contains [[cite:searchN]] citations, those are properly formatted - preserve them exactly
- If a delegated agent's response has no links, do NOT add any links yourself
- The ONLY acceptable links are those returned by tools (web search, deep research, etc.) with proper citation format
- Your role is to coordinate and present results, not to augment them with links from your training data

Embeds in responses from agents:
To be efficient, peer agents may respond with artifact_content in their responses. These will not be resolved until they are sent back to a gateway. If it makes
sense, just carry that embed forward to your response to the user. For example, if you ask for an org chart from another agent and its response contains an embed like
`{open_delim}artifact_content:org_chart.md{close_delim}`, you can just include that embed in your response to the user. The gateway will resolve it and display the org chart.

Similarly, template_liquid blocks in peer agent responses can be carried forward to your response to the user for resolution by the gateway.

When faced with a complex goal or request that involves multiple steps, data retrieval, or artifact summarization to produce a new report or document, you MUST first create a plan.
Simple, direct requests like 'create an image of a dog' or 'write an email to thank my boss' do not require a plan.

If a plan is created:
1. It should be a terse, hierarchical list describing the steps needed, with each checkbox item on its own line.
2. Use '⬜' for pending items, '✅' for completed items, and '❌' for cancelled items.
3. If the plan changes significantly during execution, restate the updated plan.
4. As items are completed, update the plan to check them off.

"""
    # The template above is a plain string, so the delimiter placeholders must
    # be substituted explicitly; otherwise the model is shown the literal text
    # "{open_delim}" instead of the real embed delimiters. str.replace is used
    # (rather than str.format) so any other braces in the prose stay literal.
    planning_instruction = planning_instruction.replace(
        "{open_delim}", EMBED_DELIMITER_OPEN
    ).replace("{close_delim}", EMBED_DELIMITER_CLOSE)
    injected_instructions.append(planning_instruction)

    # Inject LLM self-awareness: tell the agent which model it is running as.
    _model_config = host_component.get_config("model", {})
    if isinstance(_model_config, dict):
        _llm_model_name = _model_config.get("model", "")
    elif isinstance(_model_config, str):
        _llm_model_name = _model_config
    else:
        _llm_model_name = ""
    # A dict config may carry a non-string "model" value; skip it rather than
    # letting rsplit() raise and abort the whole callback.
    if _llm_model_name and isinstance(_llm_model_name, str):
        # Show only the final path segment (e.g. "provider/model" -> "model").
        _llm_model_name_display = _llm_model_name.rsplit("/", 1)[-1]
        injected_instructions.append(
            f"**Your LLM Identity:**\n"
            f"You are running as the `{_llm_model_name_display}` language model. "
            "If a user asks which AI model or LLM you are, you may truthfully state this."
        )
        log.debug(
            "%s Injected LLM self-awareness instruction (model: %s).",
            log_identifier,
            _llm_model_name_display,
        )

    # Add the consolidated block instructions
    injected_instructions.append(_generate_fenced_artifact_instruction())
    injected_instructions.append(_generate_inline_template_instruction())
    injected_instructions.append(_generate_fenced_block_syntax_rules())

    # Agent-provided instructions: a callback and a static string are handled
    # independently, so both are injected when both are configured.
    agent_instruction_str: Optional[str] = None
    if host_component._agent_system_instruction_callback:
        log.debug(
            "%s Calling agent-provided system instruction callback.", log_identifier
        )
        try:
            agent_instruction_str = host_component._agent_system_instruction_callback(
                callback_context, llm_request
            )
            if agent_instruction_str and isinstance(agent_instruction_str, str):
                injected_instructions.append(agent_instruction_str)
                log.info(
                    "%s Injected instructions from agent callback.", log_identifier
                )
            elif agent_instruction_str:
                log.warning(
                    "%s Agent instruction callback returned non-string type: %s. Ignoring.",
                    log_identifier,
                    type(agent_instruction_str),
                )
        except Exception as e_cb:
            # Best effort: a failing agent callback must not block the request.
            log.error(
                "%s Error in agent-provided system instruction callback: %s. Skipping.",
                log_identifier,
                e_cb,
            )
    if host_component._agent_system_instruction_string:
        log.debug(
            "%s Using agent-provided static system instruction string.", log_identifier
        )
        agent_instruction_str = host_component._agent_system_instruction_string
        if agent_instruction_str and isinstance(agent_instruction_str, str):
            injected_instructions.append(agent_instruction_str)
            log.info("%s Injected static instructions from agent.", log_identifier)

    # Debug-only dump of the request contents.
    contents = llm_request.contents
    if contents:
        log.debug("\n\n### LLM Request Contents ###")
        for content in contents:
            if content.parts:
                for part in content.parts:
                    if part.text:
                        log.debug("Content part: %s", part.text)
                    elif part.function_call:
                        log.debug("Function call: %s", part.function_call.name)
                    elif part.function_response:
                        log.debug("Function response: %s", part.function_response)
                    else:
                        log.debug("raw: %s", part)
        log.debug("### End LLM Request Contents ###\n\n")

    if host_component.get_config("enable_embed_resolution", True):
        include_artifact_content_instr = host_component.get_config(
            "enable_artifact_content_instruction", True
        )
        # In structured invocation mode, suppress artifact_return directives
        # since only the result embed matters
        is_si_mode = False
        a2a_context = callback_context.state.get("a2a_context")
        if a2a_context:
            logical_task_id = a2a_context.get("logical_task_id")
            if logical_task_id:
                # Hold the lock only for the lookup itself.
                with host_component.active_tasks_lock:
                    task_context = host_component.active_tasks.get(logical_task_id)
                if task_context:
                    is_si_mode = task_context.get_flag("structured_invocation", False)
        instruction = _generate_embed_instruction(
            include_artifact_content_instr,
            log_identifier,
            suppress_artifact_return=is_si_mode,
        )
        if instruction:
            injected_instructions.append(instruction)
            log.debug(
                "%s Prepared embed instructions (artifact_content included: %s).",
                log_identifier,
                include_artifact_content_instr,
            )

        instruction = _generate_conversation_flow_instruction()
        if instruction:
            injected_instructions.append(instruction)
            log.debug("%s Prepared conversation flow instructions.", log_identifier)

    if active_builtin_tools:
        instruction = _generate_tool_instructions_from_registry(
            active_builtin_tools, log_identifier
        )
        if instruction:
            injected_instructions.append(instruction)
            log.debug(
                "%s Prepared instructions for %d active built-in tools.",
                log_identifier,
                len(active_builtin_tools),
            )

    peer_instructions = callback_context.state.get("peer_tool_instructions")
    if peer_instructions and isinstance(peer_instructions, str):
        injected_instructions.append(peer_instructions)
        log.debug(
            "%s Injected peer discovery instructions from callback state.",
            log_identifier,
        )

    project_tool_instructions = callback_context.state.get(
        "project_tool_instructions"
    )
    if project_tool_instructions and isinstance(project_tool_instructions, str):
        injected_instructions.append(project_tool_instructions)
        log.debug(
            "%s Injected project tool instructions from callback state.",
            log_identifier,
        )

    # Check for WorkflowAgentTool instances and inject specific instructions
    has_workflow_tools = bool(llm_request.tools_dict) and any(
        isinstance(tool, WorkflowAgentTool)
        for tool in llm_request.tools_dict.values()
    )

    if has_workflow_tools:
        workflow_instruction = (
            "**Workflow Execution:**\n"
            "You have access to workflow tools (prefixed with `workflow_`). These tools represent structured business processes.\n"
            "They support two modes of invocation:\n"
            "1. **Parameter Mode:** Provide arguments directly matching the tool's schema. Use this for new data or simple inputs.\n"
            "2. **Artifact Mode:** Provide a single `input_artifact` argument with the filename of an existing JSON artifact. "
            "Use this when passing large datasets or outputs from previous steps to avoid re-tokenizing.\n"
            "Do NOT provide both parameters and `input_artifact` simultaneously."
        )
        injected_instructions.append(workflow_instruction)
        log.debug("%s Injected workflow execution instructions.", log_identifier)

    # When this call is the last one allowed by run_config.max_llm_calls, add
    # a notice to the conversation contents asking the model to wrap up.
    last_call_notification_message_added = False
    try:
        invocation_context = callback_context._invocation_context
        if invocation_context and invocation_context.run_config:
            current_llm_calls = (
                invocation_context._invocation_cost_manager._number_of_llm_calls
            )
            max_llm_calls = invocation_context.run_config.max_llm_calls

            log.debug(
                "%s Checking for last LLM call: current_calls=%d, max_calls=%s",
                log_identifier,
                current_llm_calls,
                max_llm_calls,
            )

            if (
                max_llm_calls
                and max_llm_calls > 0
                and current_llm_calls >= (max_llm_calls - 1)
            ):
                last_call_text = (
                    "IMPORTANT: This is your final allowed interaction for the current request. "
                    "Please inform the user that to continue this line of inquiry, they will need to "
                    "make a new request or explicitly ask to continue if the interface supports it. "
                    "Summarize your current findings and conclude your response."
                )
                if llm_request.contents is None:
                    llm_request.contents = []

                last_call_content = adk_types.Content(
                    role="model",
                    parts=[adk_types.Part(text=last_call_text)],
                )
                llm_request.contents.append(last_call_content)
                last_call_notification_message_added = True
                log.info(
                    "%s Added 'last LLM call' notification as a 'model' message to llm_request.contents. Current calls (%d) reached max_llm_calls (%d).",
                    log_identifier,
                    current_llm_calls,
                    max_llm_calls,
                )
    except Exception as e_last_call:
        # The notice is optional; failures here only get logged.
        log.error(
            "%s Error checking/injecting last LLM call notification message: %s",
            log_identifier,
            e_last_call,
        )

    injected_instructions.append(_generate_examples_instruction())

    if injected_instructions:
        combined_instructions = "\n\n---\n\n".join(injected_instructions)
        if llm_request.config is None:
            log.warning(
                "%s llm_request.config is None, cannot append system instructions.",
                log_identifier,
            )
        else:
            if llm_request.config.system_instruction is None:
                llm_request.config.system_instruction = ""

            # Append to any existing system instruction rather than replace it.
            if llm_request.config.system_instruction:
                llm_request.config.system_instruction += (
                    "\n\n---\n\n" + combined_instructions
                )
            else:
                llm_request.config.system_instruction = combined_instructions
            log.info(
                "%s Injected %d dynamic instruction block(s) into llm_request.config.system_instruction.",
                log_identifier,
                len(injected_instructions),
            )
    elif not last_call_notification_message_added:
        log.debug(
            "%s No dynamic instructions (system or last_call message) were injected based on config.",
            log_identifier,
        )

    return None
2071  
2072  
async def after_tool_callback_inject_metadata(
    tool: BaseTool,
    args: Dict,
    tool_context: ToolContext,
    tool_response: Dict,
    host_component: "SamAgentComponent",
) -> Optional[Dict]:
    """
    ADK after_tool_callback that attaches companion metadata for artifacts
    a tool just created to the tool's response dictionary.

    For every non-metadata entry in ``tool_context.actions.artifact_delta``
    the matching ``<filename><METADATA_SUFFIX>`` artifact is loaded at the
    same version, parsed as JSON, formatted for the LLM and collected. The
    collected texts are joined and stored under ``METADATA_RESPONSE_KEY``.

    Args:
        tool: The tool that ran; only its name is used, for log prefixes.
        args: Tool arguments (not used here).
        tool_context: Supplies the artifact delta and invocation context.
        tool_response: The tool's response; mutated when metadata is found.
        host_component: Host component; must be truthy for the callback to run.

    Returns:
        The mutated ``tool_response`` when metadata was injected, otherwise
        ``None``.
    """
    log_identifier = f"[Callback:InjectMetadata:{tool.name}]"
    log.info(
        "%s Starting metadata injection for tool response, type: %s",
        log_identifier,
        type(tool_response).__name__,
    )

    if not host_component:
        log.error(
            "%s Host component instance not provided. Cannot proceed.",
            log_identifier,
        )
        return None

    delta = tool_context.actions.artifact_delta
    if not delta:
        log.debug(
            "%s No artifact delta found. Skipping metadata injection.", log_identifier
        )
        return None

    invocation = tool_context._invocation_context
    artifact_service: Optional[BaseArtifactService] = invocation.artifact_service
    if not artifact_service:
        log.error(
            "%s ArtifactService not available. Cannot load metadata.",
            log_identifier,
        )
        return None

    app_name = invocation.app_name
    user_id = invocation.user_id
    session_id = get_original_session_id(invocation)

    collected = []

    for filename, version in delta.items():
        # Metadata companions are never described by metadata of their own.
        if filename.endswith(METADATA_SUFFIX):
            log.debug(
                "%s Skipping metadata artifact '%s' itself.", log_identifier, filename
            )
            continue

        metadata_filename = f"{filename}{METADATA_SUFFIX}"
        log.debug(
            "%s Found data artifact '%s' v%d. Attempting to load metadata '%s' v%d.",
            log_identifier,
            filename,
            version,
            metadata_filename,
            version,
        )

        try:
            companion = await artifact_service.load_artifact(
                app_name=app_name,
                user_id=user_id,
                session_id=session_id,
                filename=metadata_filename,
                version=version,
            )

            if not (companion and companion.inline_data):
                log.warning(
                    "%s Companion metadata artifact '%s' v%d not found or empty.",
                    log_identifier,
                    metadata_filename,
                    version,
                )
                continue

            try:
                raw_json = companion.inline_data.data.decode("utf-8")
                metadata_dict = json.loads(raw_json)
                # Stamp the entry with the data artifact's identity.
                metadata_dict["version"] = version
                metadata_dict["filename"] = filename
                collected.append(format_metadata_for_llm(metadata_dict))
                log.info(
                    "%s Successfully loaded and formatted metadata for '%s' v%d.",
                    log_identifier,
                    filename,
                    version,
                )
            except json.JSONDecodeError as json_err:
                log.warning(
                    "%s Failed to parse metadata JSON for '%s' v%d: %s",
                    log_identifier,
                    metadata_filename,
                    version,
                    json_err,
                )
            except Exception as fmt_err:
                log.warning(
                    "%s Failed to format metadata for '%s' v%d: %s",
                    log_identifier,
                    metadata_filename,
                    version,
                    fmt_err,
                )

        except Exception as load_err:
            # One failing artifact must not prevent processing of the others.
            log.error(
                "%s Error loading companion metadata artifact '%s' v%d: %s",
                log_identifier,
                metadata_filename,
                version,
                load_err,
            )

    if not collected:
        log.debug(
            "%s No metadata loaded or formatted. Returning original tool response.",
            log_identifier,
        )
        return None

    if not isinstance(tool_response, dict):
        log.error(
            "%s Tool response is not a dictionary. Cannot inject metadata. Type: %s",
            log_identifier,
            type(tool_response),
        )
        return None

    tool_response[METADATA_RESPONSE_KEY] = "\n\n".join(collected)
    log.info(
        "%s Injected metadata for %d artifact(s) into tool response key '%s'.",
        log_identifier,
        len(collected),
        METADATA_RESPONSE_KEY,
    )
    return tool_response
2218  
2219  
async def track_produced_artifacts_callback(
    tool: BaseTool,
    args: Dict,
    tool_context: ToolContext,
    tool_response: Dict,
    host_component: "SamAgentComponent",
) -> Optional[Dict]:
    """
    ADK after_tool_callback that records every artifact a tool produced.

    Reads ``tool_context.actions.artifact_delta`` and registers each
    non-metadata entry on the task's execution context, which is looked up in
    ``host_component.active_tasks`` by the ``logical_task_id`` stored in the
    ``a2a_context`` state entry.

    Args:
        tool: The tool that ran; only its name is used, for log prefixes.
        args: Tool arguments (not used here).
        tool_context: Supplies the artifact delta and the state.
        tool_response: The tool's response (not used or modified here).
        host_component: Host component holding the active task registry.

    Returns:
        Always ``None``; the tool response is never replaced.
    """
    log_identifier = f"[Callback:TrackArtifacts:{tool.name}]"
    log.debug("%s Starting artifact tracking for tool response.", log_identifier)

    produced = tool_context.actions.artifact_delta
    if not produced:
        log.debug("%s No artifact delta found. Skipping tracking.", log_identifier)
        return None

    if not host_component:
        log.error(
            "%s Host component instance not provided. Cannot proceed.", log_identifier
        )
        return None

    try:
        task_id = tool_context.state.get("a2a_context", {}).get("logical_task_id")
        if not task_id:
            log.warning(
                "%s Could not find logical_task_id in tool_context. Cannot track artifacts.",
                log_identifier,
            )
            return None

        # Hold the lock only while reading the registry.
        with host_component.active_tasks_lock:
            execution_context = host_component.active_tasks.get(task_id)

        if not execution_context:
            log.warning(
                "%s TaskExecutionContext not found for task %s. Cannot track artifacts.",
                log_identifier,
                task_id,
            )
            return None

        # Companion metadata files are not tracked as produced artifacts.
        data_artifacts = (
            (name, ver)
            for name, ver in produced.items()
            if not name.endswith(METADATA_SUFFIX)
        )
        for name, ver in data_artifacts:
            log.info(
                "%s Registering produced artifact '%s' v%d for task %s.",
                log_identifier,
                name,
                ver,
                task_id,
            )
            execution_context.register_produced_artifact(name, ver)

    except Exception as e:
        log.exception(
            "%s Error during artifact tracking callback: %s", log_identifier, e
        )

    return None
2284  
2285  
def log_streaming_chunk_callback(
    callback_context: CallbackContext,
    llm_response: LlmResponse,
    host_component: "SamAgentComponent",
) -> Optional[LlmResponse]:
    """
    ADK after_model_callback to log the content of each LLM response chunk
    *after* potential modification by other callbacks (like embed resolution).

    The chunk is summarised as the concatenated text of its parts,
    ``[Non-text parts]`` when it has parts but no text, ``[ERROR: ...]`` when
    only an error message is present, or ``None`` otherwise, and written at
    DEBUG level together with the ``partial`` and ``turn_complete`` flags.

    Args:
        callback_context: ADK callback context (not used here).
        llm_response: The response chunk to summarise.
        host_component: Host component (not used here).

    Returns:
        Always ``None``; the response is never modified.
    """
    log_identifier = "[Callback:LogChunk]"
    try:
        content_str = "None"
        is_partial = llm_response.partial
        is_final = llm_response.turn_complete
        if llm_response.content and llm_response.content.parts:
            texts = [p.text for p in llm_response.content.parts if p.text]
            content_str = '"' + "".join(texts) + '"' if texts else "[Non-text parts]"
        elif llm_response.error_message:
            content_str = f"[ERROR: {llm_response.error_message}]"

        # Emit the summary; previously it was built and then discarded, so
        # the callback never logged anything.
        log.debug(
            "%s Chunk (partial=%s, final=%s): %s",
            log_identifier,
            is_partial,
            is_final,
            content_str,
        )

    except Exception as e:
        # Logging is best effort and must never break response handling.
        log.error("%s Error logging LLM chunk: %s", log_identifier, e)

    return None
2310  
2311  
2312  def _sanitize_bytes_in_dict_inplace(obj):
2313      """Recursively replace bytes values **in place** in a dict/list with a placeholder string.
2314  
2315      Mutates *obj* directly — callers should not rely on a return value.
2316  
2317      This is needed because LLM request dumps may contain inline_data with raw
2318      bytes (e.g., from inline vision images) which cannot be JSON-serialized
2319      for publishing over the Solace broker.
2320      """
2321      if isinstance(obj, dict):
2322          for key, value in obj.items():
2323              if isinstance(value, (bytes, bytearray)):
2324                  obj[key] = f"<binary data: {len(value)} bytes>"
2325              elif isinstance(value, (dict, list)):
2326                  _sanitize_bytes_in_dict_inplace(value)
2327      elif isinstance(obj, list):
2328          for i, item in enumerate(obj):
2329              if isinstance(item, (bytes, bytearray)):
2330                  obj[i] = f"<binary data: {len(item)} bytes>"
2331              elif isinstance(item, (dict, list)):
2332                  _sanitize_bytes_in_dict_inplace(item)
2333  
2334  
def solace_llm_invocation_callback(
    callback_context: CallbackContext,
    llm_request: LlmRequest,
    host_component: "SamAgentComponent",
) -> Optional[LlmResponse]:
    """
    ADK before_model_callback that publishes a Solace status message
    announcing an LLM invocation.

    The request is dumped, stripped of raw bytes, wrapped in an
    ``LlmInvocationData`` signal and scheduled for publishing on the host
    component's async loop. The configured model name is also stored in the
    callback state under ``"model_name"``.

    Args:
        callback_context: ADK callback context; its state is read and updated.
        llm_request: The outgoing request to report.
        host_component: Host component providing the loop and publisher.

    Returns:
        Always ``None``.
    """
    log_identifier = "[Callback:SolaceLLMInvocation]"
    log.debug(
        "%s Running Solace LLM invocation notification callback...", log_identifier
    )

    if not host_component:
        log.error(
            "%s Host component instance not provided. Cannot send Solace message.",
            log_identifier,
        )
        return None

    state = callback_context.state
    state[A2A_LLM_STREAM_CHUNKS_PROCESSED_KEY] = False
    log.debug(
        "%s Reset %s to False.", log_identifier, A2A_LLM_STREAM_CHUNKS_PROCESSED_KEY
    )

    try:
        a2a_context = state.get("a2a_context")
        if not a2a_context:
            log.error(
                "%s a2a_context not found in callback_context.state. Cannot send Solace message.",
                log_identifier,
            )
            return None

        task_id = a2a_context.get("logical_task_id")
        ctx_id = a2a_context.get("contextId")

        # Store model name in callback state for later use in response callback
        configured_model = host_component.model_config
        state["model_name"] = (
            configured_model.get("model", "unknown")
            if isinstance(configured_model, dict)
            else configured_model
        )

        # Sanitize binary data (e.g., inline_data from images) to make it JSON-serializable.
        # The raw bytes cannot be sent over the Solace broker in status events.
        payload = llm_request.model_dump(exclude_none=True)
        _sanitize_bytes_in_dict_inplace(payload)

        event = a2a.create_data_signal_event(
            task_id=task_id,
            context_id=ctx_id,
            signal_data=LlmInvocationData(request=payload),
            agent_name=host_component.agent_name,
        )

        loop = host_component.get_async_loop()
        if not (loop and loop.is_running()):
            log.error(
                "%s Async loop not available. Cannot publish LLM invocation status update.",
                log_identifier,
            )
            return None

        # Scheduled without waiting: the returned future is not awaited.
        publish_coro = host_component._publish_status_update_with_buffer_flush(
            event,
            a2a_context,
            skip_buffer_flush=False,
        )
        asyncio.run_coroutine_threadsafe(publish_coro, loop)
        log.debug(
            "%s Scheduled LLM invocation status update with buffer flush.",
            log_identifier,
        )

    except Exception as e:
        log.error(
            "%s Error during Solace LLM invocation notification: %s", log_identifier, e
        )

    return None
2417  
2418  
def solace_llm_response_callback(
    callback_context: CallbackContext,
    llm_response: LlmResponse,
    host_component: "SamAgentComponent",
) -> Optional[LlmResponse]:
    """
    ADK after_model_callback to send a Solace message with the LLM's response
    and token usage information.

    For every non-partial response this callback:
    1. Stores a ``parallel_group_id`` in the callback state when the response
       holds more than one function call (and resets it to None otherwise).
    2. Records token usage on the active task context when the response
       carries usage metadata and the task is still registered.
    3. Schedules an ``llm_response`` status update on the host component's
       async loop.

    Args:
        callback_context: ADK callback context. ``state["a2a_context"]`` must
            be set, otherwise nothing is published.
        llm_response: The response produced by the model.
        host_component: The host component used to publish the status update.

    Returns:
        Always None, so the model response is never replaced. Any exception
        raised while building or scheduling the notification is logged and
        swallowed.
    """
    log_identifier = "[Callback:SolaceLLMResponse]"
    if llm_response.partial:  # Don't send partial responses for this notification
        log.debug("%s Skipping partial response", log_identifier)
        return None

    if not host_component:
        log.error(
            "%s Host component instance not provided. Cannot send Solace message.",
            log_identifier,
        )
        return None

    try:
        a2a_context = callback_context.state.get("a2a_context")
        if not a2a_context:
            log.error(
                "%s a2a_context not found in callback_context.state. Cannot send Solace message.",
                log_identifier,
            )
            return None

        agent_name = host_component.get_config("agent_name", "unknown_agent")
        logical_task_id = a2a_context.get("logical_task_id")
        context_id = a2a_context.get("contextId")

        # Check for parallel tool calls - if multiple function_calls in this response,
        # generate a parallel_group_id for the frontend to group them visually
        function_calls = []
        if llm_response.content and llm_response.content.parts:
            function_calls = [
                p for p in llm_response.content.parts if p.function_call
            ]

        if len(function_calls) > 1:
            # `uuid` is the module-level import; no local re-import needed.
            parallel_group_id = f"llm_batch_{uuid.uuid4().hex[:8]}"
            callback_context.state["parallel_group_id"] = parallel_group_id
            log.debug(
                "%s Detected %d parallel tool calls, assigned parallel_group_id=%s",
                log_identifier,
                len(function_calls),
                parallel_group_id,
            )
        else:
            # Clear any previous parallel_group_id
            callback_context.state["parallel_group_id"] = None

        llm_response_data = {
            "type": "llm_response",
            "data": llm_response.model_dump(exclude_none=True),
        }

        # Extract and record token usage
        if llm_response.usage_metadata:
            usage = llm_response.usage_metadata
            model_name = callback_context.state.get("model_name", "unknown")

            usage_dict = {
                "input_tokens": usage.prompt_token_count,
                "output_tokens": usage.candidates_token_count,
                "model": model_name,
            }

            # Check for cached tokens (provider-specific). The attribute may
            # exist but be None, so normalise to 0 before comparing.
            cached_tokens = 0
            if hasattr(usage, "prompt_tokens_details") and usage.prompt_tokens_details:
                cached_tokens = (
                    getattr(usage.prompt_tokens_details, "cached_tokens", 0) or 0
                )
                if cached_tokens > 0:
                    usage_dict["cached_input_tokens"] = cached_tokens

            # Add to response data
            llm_response_data["usage"] = usage_dict

            # Record in task context for aggregation. Only the lookup is done
            # under the lock; recording happens after it is released.
            with host_component.active_tasks_lock:
                task_context = host_component.active_tasks.get(logical_task_id)

            if task_context:
                # Resolve context-window for this model: admin UI value wins,
                # LiteLLM registry is fallback, else None. The result is
                # persisted on the task record so the gateway's indicator can
                # render without cross-service lookups.
                max_input_tokens: Optional[int] = None
                try:
                    agent_model = getattr(
                        getattr(host_component, "adk_agent", None), "model", None
                    )
                    if agent_model is not None and hasattr(
                        agent_model, "get_max_input_tokens"
                    ):
                        max_input_tokens = agent_model.get_max_input_tokens()
                    if log.isEnabledFor(logging.DEBUG):
                        # Diagnostic-only path. Config dict keys are filtered
                        # to exclude auth-adjacent names (api_key, token,
                        # secret, credential, password) to avoid leaking
                        # sensitive config shape into logs.
                        component_model_cfg = host_component.get_config("model", None)
                        if isinstance(component_model_cfg, dict):
                            _SENSITIVE = ("key", "token", "secret", "credential", "password", "auth")
                            cfg_keys = [
                                k for k in component_model_cfg.keys()
                                if not any(s in str(k).lower() for s in _SENSITIVE)
                            ]
                            cfg_max = component_model_cfg.get("max_input_tokens")
                        else:
                            cfg_keys = type(component_model_cfg).__name__
                            cfg_max = None
                        log.debug(
                            "%s Resolved max_input_tokens=%s for model=%s (agent_model_type=%s, cfg_keys=%s, cfg_max=%s)",
                            log_identifier,
                            max_input_tokens,
                            model_name,
                            type(agent_model).__name__ if agent_model else "None",
                            cfg_keys,
                            cfg_max,
                        )
                except Exception:
                    # Best effort: usage is still recorded with max_input_tokens=None.
                    log.exception("%s Could not resolve max_input_tokens", log_identifier)
                task_context.record_token_usage(
                    input_tokens=usage.prompt_token_count,
                    output_tokens=usage.candidates_token_count,
                    model=model_name,
                    source="agent",
                    cached_input_tokens=cached_tokens,
                    max_input_tokens=max_input_tokens,
                )
                # %s rather than %d: the token counts may be None, and %d
                # would make the logging call itself fail to format.
                log.debug(
                    "%s Recorded token usage: input=%s, output=%s, cached=%s, model=%s",
                    log_identifier,
                    usage.prompt_token_count,
                    usage.candidates_token_count,
                    cached_tokens,
                    model_name,
                )

        # This signal doesn't have a dedicated Pydantic model, so we create the
        # DataPart directly and use the lower-level helpers.
        data_part = a2a.create_data_part(data=llm_response_data)
        a2a_message = a2a.create_agent_parts_message(
            parts=[data_part],
            task_id=logical_task_id,
            context_id=context_id,
        )
        status_update_event = a2a.create_status_update(
            task_id=logical_task_id,
            context_id=context_id,
            message=a2a_message,
            is_final=False,
            metadata={"agent_name": agent_name},
        )
        loop = host_component.get_async_loop()
        if loop and loop.is_running():
            # Fire-and-forget: the returned future is not awaited here.
            asyncio.run_coroutine_threadsafe(
                host_component._publish_status_update_with_buffer_flush(
                    status_update_event,
                    a2a_context,
                    skip_buffer_flush=False,
                ),
                loop,
            )
            log.debug(
                "%s Scheduled LLM response status update with buffer flush (final_chunk=%s).",
                log_identifier,
                llm_response.turn_complete,
            )
        else:
            log.error(
                "%s Async loop not available. Cannot publish LLM response status update.",
                log_identifier,
            )

    except Exception as e:
        # log.exception keeps the traceback, matching the other notify callbacks.
        log.exception(
            "%s Error during Solace LLM response notification: %s", log_identifier, e
        )

    return None
2600  
2601  
def notify_tool_invocation_start_callback(
    tool: BaseTool,
    args: Dict[str, Any],
    tool_context: ToolContext,
    host_component: "SamAgentComponent",
) -> None:
    """
    ADK before_tool_callback to send an A2A status message indicating
    that a tool is about to be invoked.

    Args:
        tool: The tool about to run; its name is included in the signal.
        args: The tool arguments. Values that cannot be JSON-encoded are
            replaced by their ``str()`` form in the published signal.
        tool_context: ADK tool context; ``state["a2a_context"]`` must be set,
            otherwise no notification is sent.
        host_component: The host component used to publish the signal.

    Returns:
        None. Publishing errors are logged and swallowed.
    """
    log_identifier = f"[Callback:NotifyToolInvocationStart:{tool.name}]"
    log.debug(
        "%s Triggered for tool '%s' with args: %s", log_identifier, tool.name, args
    )

    if not host_component:
        log.error(
            "%s Host component instance not provided. Cannot send notification.",
            log_identifier,
        )
        return

    a2a_context = tool_context.state.get("a2a_context")
    if not a2a_context:
        log.error(
            "%s a2a_context not found in tool_context.state. Cannot send notification.",
            log_identifier,
        )
        return

    try:
        serializable_args = {}
        for k, v in args.items():
            try:
                # Probe only; the encoded string itself is discarded.
                json.dumps(v)
                serializable_args[k] = v
            except (TypeError, ValueError, OverflowError):
                # ValueError covers circular references, which json.dumps
                # reports differently from unsupported types. Without it a
                # single bad argument would abort the whole notification.
                serializable_args[k] = str(v)

        # Get parallel_group_id from callback state if this is part of a parallel batch
        parallel_group_id = tool_context.state.get("parallel_group_id")

        tool_data = ToolInvocationStartData(
            tool_name=tool.name,
            tool_args=serializable_args,
            function_call_id=tool_context.function_call_id,
            parallel_group_id=parallel_group_id,
        )
        host_component.publish_data_signal_from_thread(
            a2a_context=a2a_context,
            signal_data=tool_data,
            skip_buffer_flush=False,
            log_identifier=log_identifier,
        )
        log.debug(
            "%s Scheduled tool_invocation_start notification.",
            log_identifier,
        )

    except Exception as e:
        log.exception(
            "%s Error publishing tool_invocation_start status update: %s",
            log_identifier,
            e,
        )

    return None
2669  
2670  
def notify_tool_execution_result_callback(
    tool: BaseTool,
    args: Dict[str, Any],
    tool_context: ToolContext,
    tool_response: Any,
    host_component: "SamAgentComponent",
) -> None:
    """
    ADK after_tool_callback to send an A2A status message with the result
    of a tool's execution.

    Args:
        tool: The tool that ran; its name is included in the signal.
        args: The tool arguments (not used by this callback).
        tool_context: ADK tool context; ``state["a2a_context"]`` must be set,
            otherwise no notification is sent.
        tool_response: The tool's result. Objects exposing ``model_dump`` are
            dumped with ``exclude_none=True``; anything else that cannot be
            JSON-encoded is replaced by its ``str()`` form.
        host_component: The host component used to publish the signal.

    Returns:
        None. Publishing errors are logged and swallowed.
    """
    log_identifier = f"[Callback:NotifyToolResult:{tool.name}]"
    log.debug("%s Triggered for tool '%s'", log_identifier, tool.name)

    if not host_component:
        log.error(
            "%s Host component instance not provided. Cannot send notification.",
            log_identifier,
        )
        return

    a2a_context = tool_context.state.get("a2a_context")
    if not a2a_context:
        log.error(
            "%s a2a_context not found in tool_context.state. Cannot send notification.",
            log_identifier,
        )
        return

    # A long-running tool with a falsy response has not finished yet.
    if tool.is_long_running and not tool_response:
        log.debug(
            "%s Tool is long-running and is not yet complete. Don't notify its completion",
            log_identifier,
        )
        return

    try:
        # Attempt to make the response JSON serializable
        serializable_response = tool_response
        if hasattr(tool_response, "model_dump"):
            serializable_response = tool_response.model_dump(exclude_none=True)
        else:
            try:
                # A simple check to see if it can be dumped.
                # This isn't perfect but catches many non-serializable types.
                json.dumps(tool_response)
            except (TypeError, ValueError, OverflowError):
                # ValueError covers circular references, which would
                # otherwise abort the notification instead of degrading
                # to the string form.
                serializable_response = str(tool_response)

        tool_data = ToolResultData(
            tool_name=tool.name,
            result_data=serializable_response,
            function_call_id=tool_context.function_call_id,
        )
        host_component.publish_data_signal_from_thread(
            a2a_context=a2a_context,
            signal_data=tool_data,
            skip_buffer_flush=False,
            log_identifier=log_identifier,
        )
        log.debug(
            "%s Scheduled tool_result notification for function call ID %s.",
            log_identifier,
            tool_context.function_call_id,
        )

    except Exception as e:
        log.exception(
            "%s Error publishing tool_result status update: %s",
            log_identifier,
            e,
        )

    return None
2745  
2746  
def auto_continue_on_max_tokens_callback(
    callback_context: CallbackContext,
    llm_response: LlmResponse,
    host_component: "SamAgentComponent",
) -> Optional[LlmResponse]:
    """
    ADK after_model_callback that resumes generation after an interrupted
    LLM response.

    Two signals count as an interruption:
    1. The explicit ``llm_response.interrupted`` flag.
    2. An implicit signal: the response contains a function call named
       ``_continue``.

    When either is present, a replacement response is returned. It keeps the
    original parts (minus any ``_continue`` calls) and appends a
    ``_continue_generation`` function call. None is returned when the
    ``enable_auto_continuation`` config value is falsy or when no
    interruption signal is found.
    """
    log_identifier = "[Callback:AutoContinue]"

    auto_continue_enabled = host_component.get_config("enable_auto_continuation", True)
    if not auto_continue_enabled:
        log.debug("%s Auto-continuation is disabled. Skipping.", log_identifier)
        return None

    def _is_continue_call(part) -> bool:
        # True for a part whose function call is the model-issued `_continue`.
        call = part.function_call
        return bool(call and call.name == "_continue")

    # Normalise "no content" / "no parts" to an empty list once.
    response_parts = []
    if llm_response.content and llm_response.content.parts:
        response_parts = llm_response.content.parts

    explicit_signal = llm_response.interrupted
    implicit_signal = any(_is_continue_call(p) for p in response_parts)

    if not (explicit_signal or implicit_signal):
        return None

    log.info(
        "%s Interruption signal detected (explicit: %s, implicit: %s). Triggering auto-continuation.",
        log_identifier,
        explicit_signal,
        implicit_signal,
    )

    # Keep everything the model produced except its own `_continue` calls.
    kept_parts = [p for p in response_parts if not _is_continue_call(p)]
    if implicit_signal:
        log.debug(
            "%s Removed implicit '_continue' tool call from response parts.",
            log_identifier,
        )

    continuation_part = adk_types.Part(
        function_call=adk_types.FunctionCall(
            name="_continue_generation",
            args={},
            id=f"host-continue-{uuid.uuid4()}",
        )
    )

    if any(p.text for p in kept_parts):
        final_parts = [*kept_parts, continuation_part]
    else:
        # Without any text, lead with a single-space text part so the event
        # is not filtered out by history processing logic.
        final_parts = [adk_types.Part(text=" "), *kept_parts, continuation_part]
        log.debug(
            "%s Prepended empty text part to ensure event is preserved.", log_identifier
        )

    # The replacement is a non-partial response carrying all parts, so the
    # partial text is saved to history and the tool call is executed.
    return LlmResponse(
        content=adk_types.Content(role="model", parts=final_parts),
        partial=False,
        custom_metadata={
            "was_interrupted": True,
        },
    )
2827  
2828  
def preregister_long_running_tools_callback(
    callback_context: CallbackContext,
    llm_response: LlmResponse,
    host_component: "SamAgentComponent",
) -> Optional[LlmResponse]:
    """
    ADK after_model_callback to pre-register all long-running tool calls
    before any tool execution begins. This prevents race conditions where
    one tool completes before another has registered.

    The race condition occurs because tools are executed via asyncio.gather
    (non-deterministic order) and each tool calls register_parallel_call_sent()
    inside its run_async(). If Tool A completes before Tool B even registers,
    the system thinks all calls are done (completed=1, total=1).

    By pre-registering all long-running tools in this callback (which runs
    BEFORE tool execution), we ensure the total count is set correctly upfront.

    Args:
        callback_context: ADK callback context; ``state["a2a_context"]`` is
            needed to locate the task.
        llm_response: The model response whose function calls are inspected.
        host_component: The host component holding ``active_tasks``.

    Returns:
        Always None; the response is never altered.
    """
    log_identifier = "[Callback:PreregisterLongRunning]"

    # Only process non-partial responses with function calls
    if llm_response.partial:
        return None

    if not llm_response.content or not llm_response.content.parts:
        return None

    # Find all long-running tool calls (identified by peer_ or workflow_ prefix).
    # A function call's name can be missing; such calls are skipped rather
    # than crashing on `None.startswith`.
    long_running_prefixes = (PEER_TOOL_PREFIX, WORKFLOW_TOOL_PREFIX)
    long_running_calls = [
        part.function_call
        for part in llm_response.content.parts
        if part.function_call
        and part.function_call.name
        and part.function_call.name.startswith(long_running_prefixes)
    ]

    if not long_running_calls:
        return None

    # Get task context
    a2a_context = callback_context.state.get("a2a_context")
    if not a2a_context:
        log.warning("%s No a2a_context, cannot pre-register tools", log_identifier)
        return None

    logical_task_id = a2a_context.get("logical_task_id")
    invocation_id = callback_context._invocation_context.invocation_id

    with host_component.active_tasks_lock:
        task_context = host_component.active_tasks.get(logical_task_id)

    if not task_context:
        log.warning(
            "%s TaskContext not found for %s, cannot pre-register",
            log_identifier,
            logical_task_id,
        )
        return None

    # One registration per long-running call, all made before tools start.
    for _ in long_running_calls:
        task_context.register_parallel_call_sent(invocation_id)

    log.info(
        "%s Pre-registered %d long-running tool call(s) for invocation %s (task %s)",
        log_identifier,
        len(long_running_calls),
        invocation_id,
        logical_task_id,
    )

    return None  # Don't alter the response
2900  
2901  
2902  # ============================================================================
2903  # Tool Name Sanitization Callback
2904  # ============================================================================
2905  
# Bedrock tool name validation pattern: must start with letter or underscore, then alphanumeric/underscore/hyphen
# Note: We allow underscore prefix for internal tools like _notify_artifact_save
# The end is anchored with \Z rather than $ because $ also matches just before a
# trailing newline, which would let a name such as "tool\n" pass validation.
VALID_TOOL_NAME_PATTERN = re.compile(r'^[a-zA-Z_][a-zA-Z0-9_-]*\Z')
2909  
2910  
def sanitize_tool_names_callback(
    callback_context: CallbackContext,
    llm_response: LlmResponse,
    host_component: "SamAgentComponent",
) -> Optional[LlmResponse]:
    """
    ADK after_model_callback to sanitize and validate tool names in LLM responses.

    This callback catches hallucinated tool names that would cause
    errors when sent back to the LLM provider.

    Each function call in the response is classified as one of:
    - placeholder hallucination: the name starts with ``$``;
    - redundant internal call: ``_notify_artifact_save`` whose id does not
      start with ``host-notify-`` (dropped silently, no error reported);
    - invalid format: the name does not match ``VALID_TOOL_NAME_PATTERN``
      (a missing name is treated as invalid);
    - valid: kept as-is.

    When an invalid tool name is detected:
    1. The invalid function_call is removed from the response
    2. Synthetic error function-responses are built and stored in
       ``callback_context.state["sanitized_tool_error_content"]``
    3. The response is returned with ``turn_complete=False``

    This prevents the entire request from failing with a provider error.

    Returns:
        None when the response is partial, has no parts, or every tool call
        is valid; otherwise a new LlmResponse with the offending calls removed.
    """
    log_identifier = "[Callback:SanitizeToolNames]"

    # Only process non-partial responses with function calls
    if llm_response.partial:
        return None

    if not llm_response.content or not llm_response.content.parts:
        return None

    # Find all function calls and check for invalid tool names
    invalid_calls = []  # Calls that need error responses
    silently_dropped_calls = []  # Calls to drop without error (e.g., redundant internal tool calls)
    valid_parts = []

    for part in llm_response.content.parts:
        if not part.function_call:
            # Non-function-call parts (text, etc.) are always kept
            valid_parts.append(part)
            continue

        # A missing name is normalised to "" so it falls into the
        # invalid-format branch instead of raising on `.startswith`.
        tool_name = part.function_call.name or ""
        function_call_id = part.function_call.id or ""

        # Check for placeholder patterns (tool names starting with $)
        # This catches $FUNCTION_NAME, $ARTIFACT_TOOL, $TOOL_NAME, etc.
        is_placeholder = tool_name.startswith('$')

        # Check against Bedrock's validation pattern
        is_valid_format = VALID_TOOL_NAME_PATTERN.match(tool_name) is not None

        # Check for unauthorized _notify_artifact_save calls
        # This tool should only be called by the system with host-notify- prefix
        # LLM-generated calls (without this prefix) should be silently dropped
        # since the artifact was already saved and a system call was injected
        is_unauthorized_internal_tool = (
            tool_name == "_notify_artifact_save"
            and not function_call_id.startswith("host-notify-")
        )

        if is_placeholder:
            log.warning(
                "%s Detected hallucinated placeholder tool name: '%s'. "
                "This is a known LLM hallucination from training data examples.",
                log_identifier,
                tool_name,
            )
            invalid_calls.append((part.function_call, "placeholder_hallucination"))
        elif is_unauthorized_internal_tool:
            # Silently drop LLM-generated _notify_artifact_save calls
            # The system already injected a valid call, so this is redundant
            log.info(
                "%s Silently dropping redundant LLM-generated call to internal tool: '%s' (id=%s). "
                "The system already injected a valid call for this artifact save.",
                log_identifier,
                tool_name,
                function_call_id,
            )
            silently_dropped_calls.append(part.function_call)
            # Don't add to invalid_calls - we don't want to send an error response
        elif not is_valid_format:
            # Report the pattern actually enforced, so the message cannot
            # drift from the validation rule.
            log.warning(
                "%s Detected invalid tool name format: '%s'. "
                "Tool names must match pattern: %s",
                log_identifier,
                tool_name,
                VALID_TOOL_NAME_PATTERN.pattern,
            )
            invalid_calls.append((part.function_call, "invalid_format"))
        else:
            # Valid tool call, keep it
            valid_parts.append(part)

    if not invalid_calls and not silently_dropped_calls:
        # All tool names are valid, no modification needed
        return None

    # A single-space text part stands in when nothing valid is left, so the
    # returned content is never empty.
    remaining_parts = valid_parts if valid_parts else [adk_types.Part(text=" ")]

    # If we only have silently dropped calls (no invalid calls needing error responses),
    # just return a modified response without forcing another turn
    if not invalid_calls:
        log.info(
            "%s Silently dropped %d redundant internal tool call(s). No error response needed.",
            log_identifier,
            len(silently_dropped_calls),
        )
        # Return modified response with the dropped calls removed, preserving turn_complete
        return LlmResponse(
            content=adk_types.Content(
                role="model",
                parts=remaining_parts,
            ),
            partial=False,
            turn_complete=llm_response.turn_complete,  # Preserve original turn_complete
        )

    # Log the invalid calls for debugging
    for fc, reason in invalid_calls:
        log.error(
            "%s Removing invalid tool call: name='%s', reason='%s', args=%s",
            log_identifier,
            fc.name,
            reason,
            fc.args,
        )

    # Create synthetic error responses for the invalid calls
    # This informs the LLM that its tool call was invalid
    error_response_parts = []
    for fc, reason in invalid_calls:
        if reason == "placeholder_hallucination":
            error_message = (
                f"ERROR: '{fc.name}' is not a valid tool name. "
                "You appear to have hallucinated a placeholder. "
                "Please use only the actual tools available to you. "
                "Review the available tools and try again with a real tool name."
            )
        else:
            # Wording matches VALID_TOOL_NAME_PATTERN, which also accepts a
            # leading underscore.
            error_message = (
                f"ERROR: '{fc.name}' is not a valid tool name format. "
                "Tool names must start with a letter or underscore and contain only "
                "letters, numbers, underscores, and hyphens. "
                "Please use only the actual tools available to you."
            )

        error_response_part = adk_types.Part.from_function_response(
            name=fc.name,
            response={"status": "error", "message": error_message},
        )
        # Preserve the function call ID for proper pairing
        if fc.id:
            error_response_part.function_response.id = fc.id
        error_response_parts.append(error_response_part)

    # At this point invalid_calls is non-empty, so there is always at least
    # one error response. The former fallback return after this block could
    # never execute and has been removed.
    modified_response = LlmResponse(
        content=adk_types.Content(
            role="model",
            parts=remaining_parts,
        ),
        partial=False,
        turn_complete=False,  # Force another turn to handle the error
    )

    # Create a synthetic tool response content and store it in callback state
    # for potential use by other callbacks
    error_content = adk_types.Content(
        role="tool",
        parts=error_response_parts,
    )
    callback_context.state["sanitized_tool_error_content"] = error_content

    log.info(
        "%s Sanitized %d invalid tool call(s). Response modified to continue gracefully.",
        log_identifier,
        len(invalid_calls),
    )

    return modified_response
3110  
3111  
3112  # ============================================================================
3113  # OpenAPI Tool Audit Logging Callbacks
3114  # ============================================================================
3115  
3116  
def _is_openapi_tool(tool: BaseTool) -> bool:
    """
    Report whether a tool is tagged as OpenAPI-based.

    Args:
        tool: The tool to inspect

    Returns:
        True when the tool's ``origin`` attribute equals "openapi"; False
        when the attribute is missing or holds any other value
    """
    # `origin` is the marker SAM sets on the tool at initialization.
    origin = getattr(tool, "origin", None)
    return origin == "openapi"
3129  
3130  
def _extract_openapi_base_url(tool: BaseTool) -> Optional[str]:
    """
    Extract base URL from an OpenAPI tool.

    Candidate locations are tried in this order, and the first non-empty
    value wins: ``tool.endpoint.base_url``, ``tool.base_url``,
    ``tool._base_url``, then the ``"base_url"`` key of a dict-valued
    ``tool._config``.

    Args:
        tool: The tool to inspect

    Returns:
        The base URL as a string, or None when no candidate holds a value
        or when reading the tool raises
    """
    try:
        # endpoint.base_url (RestApiTool has this). A None/empty value must
        # fall through to the other candidates instead of becoming "None".
        endpoint = getattr(tool, "endpoint", None)
        endpoint_url = getattr(endpoint, "base_url", None)
        if endpoint_url:
            return str(endpoint_url)

        public_url = getattr(tool, "base_url", None)
        if public_url:
            return str(public_url)

        private_url = getattr(tool, "_base_url", None)
        if private_url:
            return str(private_url)

        config = getattr(tool, "_config", None)
        if isinstance(config, dict):
            config_url = config.get("base_url")
            if config_url:
                return str(config_url)

    except Exception as e:
        log.debug("Could not extract base URL: %s", e)

    return None
3151  
3152  
def _extract_openapi_http_method(tool: BaseTool) -> Optional[str]:
    """
    Extract HTTP method from OpenAPI tool.

    Args:
        tool: The tool to inspect

    Returns:
        The upper-cased string form of ``tool.endpoint.method``, or None when
        the tool has no endpoint, the endpoint has no ``method`` attribute,
        or the method is None
    """
    # Get from tool's endpoint (RestApiTool)
    method = getattr(getattr(tool, "endpoint", None), "method", None)
    if method is None:
        # Avoid turning a missing method into the truthy string "NONE".
        return None
    return str(method).upper()
3159  
3160  
def _extract_openapi_operation_id(tool: BaseTool) -> Optional[str]:
    """Return ``tool.operation.operationId`` when both attributes exist, else None."""
    # The operation object comes from the tool (RestApiTool); a missing
    # `operation` collapses to None, which in turn has no `operationId`.
    operation = getattr(tool, "operation", None)
    return getattr(operation, "operationId", None)
3167  
3168  
def _extract_openapi_metadata(tool: BaseTool) -> Dict[str, Optional[str]]:
    """
    Collect the OpenAPI details of a tool into a single dict.

    Returns:
        Dict with keys: operation_id, base_url, http_method, endpoint_path, tool_uri.
        ``tool_uri`` is the base URL joined to the endpoint path with a single
        slash when both are present; otherwise it equals ``base_url``.
    """
    operation_id = _extract_openapi_operation_id(tool)
    base_url = _extract_openapi_base_url(tool)
    http_method = _extract_openapi_http_method(tool)

    # Endpoint path template (safe, non-sensitive); None when the tool has
    # no endpoint or the endpoint has no path.
    endpoint_path = getattr(getattr(tool, "endpoint", None), "path", None)

    if base_url and endpoint_path:
        # Trim the slashes at the seam so exactly one separates the halves.
        tool_uri = f"{base_url.rstrip('/')}/{endpoint_path.lstrip('/')}"
    else:
        tool_uri = base_url

    return {
        "operation_id": operation_id,
        "base_url": base_url,
        "http_method": http_method,
        "endpoint_path": endpoint_path,
        "tool_uri": tool_uri,
    }
3199  
3200  
def audit_log_openapi_tool_invocation_start(
    tool: BaseTool,
    args: Dict[str, Any],
    tool_context: ToolContext,
    host_component: "SamAgentComponent",
) -> None:
    """
    ADK before_tool_callback for OpenAPI tools - writes the "call started" audit entry.

    Tools rejected by ``_is_openapi_tool`` are ignored. For the rest, the
    current time in epoch milliseconds is stored under
    ``tool_context.state["audit_start_time_ms"]`` and one INFO record is
    logged with the action, user, agent and tool URI.

    Args:
        tool: The tool being invoked
        args: Tool arguments (never read here, so never logged)
        tool_context: ADK tool context (source of the session and the state)
        host_component: The SamAgentComponent host (provides ``agent_name``)
    """
    if not _is_openapi_tool(tool):
        return

    # Session and user stay None when there is no invocation context/session.
    session_id = None
    user_id = None
    inv_ctx = tool_context._invocation_context
    if inv_ctx and inv_ctx.session:
        session = inv_ctx.session
        session_id = session.id
        user_id = session.user_id

    meta = _extract_openapi_metadata(tool)
    verb = meta["http_method"]
    op_id = meta["operation_id"]

    # Action label: "<METHOD>: <operation_id>" when both are known, otherwise
    # the operation id alone, otherwise "unknown".
    if verb and op_id:
        action = f"{verb}: {op_id}"
    else:
        action = op_id or "unknown"
    correlation_tag = f"corr:{session_id or 'unknown'}"

    # Start timestamp (epoch ms) kept in the tool state for latency calculation.
    tool_context.state["audit_start_time_ms"] = int(time.time() * 1000)

    agent_name = host_component.agent_name
    audit_fields = {
        "user_id": user_id,
        "agent_id": agent_name,
        "tool_name": tool.name,
        "session_id": session_id,
        "operation_id": op_id,
        "tool_uri": meta["tool_uri"],
    }

    # MCP-style format: [openapi-tool] [corr:xxx] message
    log.info(
        "[openapi-tool] [%s] Tool call: %s - User: %s, Agent: %s, URI: %s",
        correlation_tag,
        action,
        user_id,
        agent_name,
        meta["tool_uri"],
        extra=audit_fields,
    )
3256  
3257  
async def audit_log_openapi_tool_execution_result(
    tool: BaseTool,
    args: Dict[str, Any],
    tool_context: ToolContext,
    tool_response: Any,
    host_component: "SamAgentComponent",
) -> Optional[Dict[str, Any]]:
    """
    ADK after_tool_callback for OpenAPI tools - writes the "call finished" audit entry.

    Tools rejected by ``_is_openapi_tool`` are ignored. Otherwise exactly one
    record is logged, chosen from the shape of ``tool_response``:

    * ERROR   - the response is a dict containing an ``"error"`` key
    * WARNING - the response is a dict whose ``"pending"`` value equals True
    * INFO    - anything else (treated as success)

    Latency is the difference between now and the epoch-millisecond value
    found under ``tool_context.state["audit_start_time_ms"]``; it is None when
    that value is absent or falsy.

    Args:
        tool: The tool that was executed
        args: Tool arguments (never read here, so never logged)
        tool_context: ADK tool context (source of the session and the state)
        tool_response: Tool response; only its error/pending markers are inspected
        host_component: The SamAgentComponent host (provides ``agent_name``)

    Returns:
        None (does not modify the response)
    """
    if not _is_openapi_tool(tool):
        return None

    # Session and user stay None when there is no invocation context/session.
    session_id = None
    user_id = None
    inv_ctx = tool_context._invocation_context
    if inv_ctx and inv_ctx.session:
        session = inv_ctx.session
        session_id = session.id
        user_id = session.user_id

    meta = _extract_openapi_metadata(tool)

    # Classify the outcome; only dict responses can signal error/pending.
    has_error = False
    is_pending_auth = False
    error_type = None
    if isinstance(tool_response, dict):
        has_error = "error" in tool_response
        # Equality (not identity) is intentional here: any value equal to
        # True counts as pending.
        is_pending_auth = tool_response.get("pending") == True  # noqa: E712

        if has_error:
            error_data = tool_response.get("error", {})
            if isinstance(error_data, dict):
                # Structured error: prefer "type", fall back to "code".
                error_type = error_data.get("type") or error_data.get("code")
            elif isinstance(error_data, str):
                # Free-text error: first keyword group that matches wins, so
                # the order of this table matters.
                keyword_labels = (
                    (("auth", "unauthorized"), "auth_error"),
                    (("not found", "404"), "not_found"),
                    (("timeout",), "timeout"),
                    (("network", "connection"), "network_error"),
                )
                lowered = error_data.lower()
                for needles, label in keyword_labels:
                    if any(needle in lowered for needle in needles):
                        error_type = label
                        break

    # Latency relative to the stored start timestamp, if there is one.
    started_ms = tool_context.state.get("audit_start_time_ms")
    latency_ms = (int(time.time() * 1000) - started_ms) if started_ms else None

    # Action label: "<METHOD>: <operation_id>" when both are known, otherwise
    # the operation id alone, otherwise "unknown".
    verb = meta["http_method"]
    op_id = meta["operation_id"]
    if verb and op_id:
        action = f"{verb}: {op_id}"
    else:
        action = op_id or "unknown"
    correlation_tag = f"corr:{session_id or 'unknown'}"

    # Structured fields common to all three outcomes.
    audit_fields = {
        "user_id": user_id,
        "agent_id": host_component.agent_name,
        "tool_name": tool.name,
        "session_id": session_id,
        "operation_id": op_id,
        "tool_uri": meta["tool_uri"],
    }

    if has_error:
        # The error type segment is only present when a type was determined.
        pieces = [
            f"[openapi-tool] [{correlation_tag}] {action} failed - Path: {meta['endpoint_path'] or 'unknown'}"
        ]
        if error_type:
            pieces.append(f", Error Type: {error_type}")
        pieces.append(f", Latency: {latency_ms}ms, User: {user_id}")

        log.error(
            "".join(pieces),
            extra={
                **audit_fields,
                "endpoint_path": meta["endpoint_path"],
                "error_type": error_type,
            },
        )
    elif is_pending_auth:
        log.warning(
            "[openapi-tool] [%s] %s pending auth - Latency: %sms, User: %s",
            correlation_tag,
            action,
            latency_ms,
            user_id,
            extra={**audit_fields, "status": "pending_auth"},
        )
    else:
        # SUCCESS format
        log.info(
            "[openapi-tool] [%s] %s completed - Latency: %sms, User: %s",
            correlation_tag,
            action,
            latency_ms,
            user_id,
            extra=audit_fields,
        )

    return None
3386  
3387  
def apply_model_override_callback(
    callback_context: CallbackContext,
    llm_request: LlmRequest,
    host_component: "SamAgentComponent",
) -> Optional[LlmResponse]:
    """Apply (or clear) the per-request model override from A2A task metadata.

    The override is looked up at
    ``state["a2a_context"]["original_message_metadata"]["model_override"]``.
    It is a full LiteLLM-ready config dict (same format as
    ``ModelConfigService.get_by_alias(raw=True)``) and is applied via a
    ``ContextVar`` that ``LiteLlm.generate_content_async`` consumes.

    The override is accepted only when it is a dict with a truthy ``"model"``
    entry; in every other case ``set_model_override(None)`` is called.

    Args:
        callback_context: ADK callback context; only its ``state`` is read.
        llm_request: The outgoing LLM request (not read or modified here).
        host_component: The SamAgentComponent host (not used here).

    Returns:
        Always None.
    """
    # Inline import to avoid circular dependency (callbacks ← setup → lite_llm)
    from .models.lite_llm import set_model_override

    a2a_context = callback_context.state.get("a2a_context")

    # Resolve the override first; None means "no override for this request".
    override = None
    if a2a_context:
        task_metadata = a2a_context.get("original_message_metadata") or {}
        candidate = task_metadata.get("model_override")
        if isinstance(candidate, dict) and candidate.get("model"):
            override = candidate

    set_model_override(override)
    if override is not None:
        log.info(
            "[Callback:ModelOverride] Set per-request model override: %s",
            override["model"],
        )

    return None