callbacks.py
1 """ 2 ADK Callbacks for the A2A Host Component. 3 Includes dynamic instruction injection, artifact metadata injection, 4 embed resolution, and logging. 5 """ 6 import logging 7 import json 8 import asyncio 9 import time 10 import uuid 11 import re 12 from typing import Any, Dict, Optional, TYPE_CHECKING, List 13 from collections import defaultdict 14 15 from google.adk.tools import BaseTool, ToolContext 16 from google.adk.artifacts import BaseArtifactService 17 from google.adk.agents.callback_context import CallbackContext 18 from google.adk.models.llm_request import LlmRequest 19 from google.adk.models.llm_response import LlmResponse 20 from google.genai import types as adk_types 21 from google.adk.tools.mcp_tool import MCPTool 22 23 from .intelligent_mcp_callbacks import ( 24 save_mcp_response_as_artifact_intelligent, 25 McpSaveStatus, 26 ) 27 28 from ...agent.utils.artifact_helpers import ( 29 METADATA_SUFFIX, 30 format_metadata_for_llm, 31 ) 32 from ...agent.utils.context_helpers import ( 33 get_original_session_id, 34 get_session_from_callback_context, 35 ) 36 from ..tools.tool_definition import BuiltinTool 37 from ..tools.workflow_tool import WorkflowAgentTool, WORKFLOW_TOOL_PREFIX 38 from ..tools.peer_agent_tool import PEER_TOOL_PREFIX 39 40 from ...common.utils.embeds import ( 41 EMBED_DELIMITER_OPEN, 42 EMBED_DELIMITER_CLOSE, 43 EMBED_CHAIN_DELIMITER, 44 EARLY_EMBED_TYPES, 45 evaluate_embed, 46 resolve_embeds_in_string, 47 ) 48 from ...common.utils.embeds.types import ResolutionMode 49 50 from ...common.utils.embeds.modifiers import MODIFIER_IMPLEMENTATIONS 51 52 from ...common import a2a 53 from ...common.a2a.types import ArtifactInfo 54 from ...common.constants import ARTIFACT_TAG_WORKING 55 from ...common.data_parts import ( 56 AgentProgressUpdateData, 57 ArtifactCreationProgressData, 58 LlmInvocationData, 59 ThinkingContentData, 60 ToolInvocationStartData, 61 ToolResultData, 62 TemplateBlockData, 63 ) 64 65 66 METADATA_RESPONSE_KEY = 
"appended_artifact_metadata" 67 from ..tools.builtin_artifact_tools import _internal_create_artifact 68 from ...agent.adk.tool_wrapper import ADKToolWrapper 69 70 # Import the new parser and its events 71 from pydantic import BaseModel 72 from ...agent.adk.stream_parser import ( 73 FencedBlockStreamParser, 74 BlockStartedEvent, 75 BlockProgressedEvent, 76 BlockCompletedEvent, 77 BlockInvalidatedEvent, 78 TemplateBlockStartedEvent, 79 TemplateBlockCompletedEvent, 80 ARTIFACT_BLOCK_DELIMITER_OPEN, 81 ARTIFACT_BLOCK_DELIMITER_CLOSE, 82 TEMPLATE_LIQUID_START_SEQUENCE, 83 ) 84 85 86 log = logging.getLogger(__name__) 87 88 A2A_LLM_STREAM_CHUNKS_PROCESSED_KEY = "temp:llm_stream_chunks_processed" 89 90 if TYPE_CHECKING: 91 from ..sac.component import SamAgentComponent 92 93 94 def _parse_tags_param(tags_str: Optional[str]) -> List[str]: 95 """Parse comma-separated tags string into a list. 96 97 Args: 98 tags_str: Comma-separated string of tags, or None/empty string 99 100 Returns: 101 List of trimmed, non-empty tag strings (empty list if no valid tags) 102 """ 103 if not tags_str: 104 return [] 105 return [t.strip() for t in tags_str.split(",") if t.strip()] 106 107 108 async def _publish_data_part_status_update( 109 host_component: "SamAgentComponent", 110 a2a_context: Dict[str, Any], 111 data_part_model: BaseModel, 112 ): 113 """Helper to construct and publish a TaskStatusUpdateEvent with a DataPart. 114 115 This function delegates to the host component's publish_data_signal_from_thread method, 116 which handles the async loop check and scheduling internally. 
117 """ 118 host_component.publish_data_signal_from_thread( 119 a2a_context=a2a_context, 120 signal_data=data_part_model, 121 skip_buffer_flush=False, 122 log_identifier=host_component.log_identifier, 123 ) 124 125 126 async def _resolve_early_embeds_in_chunk( 127 chunk: str, 128 callback_context: CallbackContext, 129 host_component: "SamAgentComponent", 130 log_identifier: str, 131 ) -> str: 132 """ 133 Resolves early embeds in an artifact chunk before streaming to the browser. 134 135 Args: 136 chunk: The text chunk containing potential embeds 137 callback_context: The ADK callback context with services 138 host_component: The host component instance 139 log_identifier: Identifier for logging 140 141 Returns: 142 The chunk with early embeds resolved 143 """ 144 if not chunk or EMBED_DELIMITER_OPEN not in chunk: 145 return chunk 146 147 try: 148 # Build resolution context from callback_context (pattern from EmbedResolvingMCPToolset) 149 invocation_context = callback_context._invocation_context 150 if not invocation_context: 151 log.warning( 152 "%s No invocation context available for embed resolution", 153 log_identifier, 154 ) 155 return chunk 156 157 session_context = invocation_context.session 158 if not session_context: 159 log.warning( 160 "%s No session context available for embed resolution", log_identifier 161 ) 162 return chunk 163 164 resolution_context = { 165 "artifact_service": invocation_context.artifact_service, 166 "session_context": { 167 "session_id": get_original_session_id(invocation_context), 168 "user_id": session_context.user_id, 169 "app_name": session_context.app_name, 170 }, 171 } 172 173 # Resolve only early embeds (math, datetime, uuid, artifact_meta) 174 resolved_text, processed_until, _ = await resolve_embeds_in_string( 175 text=chunk, 176 context=resolution_context, 177 resolver_func=evaluate_embed, 178 types_to_resolve=EARLY_EMBED_TYPES, # Only resolve early embeds 179 resolution_mode=ResolutionMode.ARTIFACT_STREAMING, # New mode 
180 log_identifier=log_identifier, 181 config=None, # Could pass host_component config if needed 182 ) 183 184 # SAFETY CHECK: If resolver buffered something, parser has a bug 185 if processed_until < len(chunk): 186 log.error( 187 "%s PARSER BUG DETECTED: Resolver buffered partial embed. " 188 "Chunk ends with: %r. Returning unresolved chunk to avoid corruption.", 189 log_identifier, 190 chunk[-50:] if len(chunk) > 50 else chunk, 191 ) 192 # Fallback: return original unresolved chunk (degraded but not corrupted) 193 return chunk 194 195 return resolved_text 196 197 except Exception as e: 198 log.error( 199 "%s Error resolving embeds in chunk: %s", log_identifier, e, exc_info=True 200 ) 201 return chunk # Return original chunk on error 202 203 204 async def process_thinking_content_callback( 205 callback_context: CallbackContext, 206 llm_response: LlmResponse, 207 host_component: "SamAgentComponent", 208 ) -> Optional[LlmResponse]: 209 """ 210 ADK after_model_callback to intercept and stream thinking/reasoning content. 211 212 Checks for thinking content in the LlmResponse custom_metadata (set by 213 the LiteLlm adapter when the model produces reasoning tokens) and publishes 214 it as a ThinkingContentData signal via the A2A data part mechanism. 215 216 Thinking content is stripped from the LlmResponse so it does not appear 217 in the main text stream. The frontend renders it in a collapsible block. 
218 """ 219 log_identifier = "[Callback:ProcessThinkingContent]" 220 221 a2a_context = callback_context.state.get("a2a_context") 222 if not a2a_context: 223 return None 224 225 # Track thinking phase state in session to detect transitions 226 session = get_session_from_callback_context(callback_context) 227 thinking_phase_key = "thinking_phase_active" 228 thinking_just_yielded_key = "thinking_just_yielded" 229 230 # Check for phase transition BEFORE the metadata guard so that 231 # chunks with no custom_metadata (text after thinking) still close 232 # the thinking phase. 233 if not llm_response.custom_metadata: 234 if session.state.get(thinking_just_yielded_key): 235 session.state[thinking_just_yielded_key] = False 236 log.debug( 237 "%s Skipping is_complete: text chunk follows thinking chunk in same delta", 238 log_identifier, 239 ) 240 return None 241 242 if session.state.get(thinking_phase_key): 243 session.state[thinking_phase_key] = False 244 thinking_complete = ThinkingContentData( 245 content="", 246 is_complete=True, 247 ) 248 await _publish_data_part_status_update( 249 host_component, a2a_context, thinking_complete 250 ) 251 log.debug("%s Thinking phase completed (no metadata), sent is_complete signal", log_identifier) 252 return None 253 254 is_thinking = llm_response.custom_metadata.get("is_thinking_content", False) 255 thinking_text_full = llm_response.custom_metadata.get("thinking_content") 256 257 # Bug 5 fix: reset thinking_just_yielded at the start of each metadata-bearing 258 # response so stale flags from a previous turn don't carry over. 
259 session.state[thinking_just_yielded_key] = False 260 261 if is_thinking and llm_response.content and llm_response.content.parts: 262 # Streaming thinking chunk — publish each text part as a signal 263 session.state[thinking_phase_key] = True 264 # Bug 6 fix: mark that a thinking chunk was just yielded so the next 265 # no-metadata yield (text portion of the same delta) is not mistaken 266 # for a thinking-phase-end transition. 267 session.state[thinking_just_yielded_key] = True 268 for part in llm_response.content.parts: 269 if part.text: 270 thinking_data = ThinkingContentData( 271 content=part.text, 272 is_complete=False, 273 ) 274 await _publish_data_part_status_update( 275 host_component, a2a_context, thinking_data 276 ) 277 log.debug( 278 "%s Published thinking chunk (%d chars)", 279 log_identifier, 280 len(part.text), 281 ) 282 283 # Strip thinking content from the LlmResponse so it does not 284 # appear in the main text stream 285 llm_response.content = adk_types.Content(role="model", parts=[]) 286 return None 287 288 # Check if thinking phase just ended (transition from thinking to regular content) 289 if session.state.get(thinking_phase_key) and not is_thinking: 290 session.state[thinking_phase_key] = False 291 thinking_complete = ThinkingContentData( 292 content="", 293 is_complete=True, 294 ) 295 await _publish_data_part_status_update( 296 host_component, a2a_context, thinking_complete 297 ) 298 log.debug("%s Thinking phase completed, sent is_complete signal", log_identifier) 299 300 elif thinking_text_full: 301 # Non-streaming: full thinking content in metadata (from non-streaming response) 302 thinking_data = ThinkingContentData( 303 content=thinking_text_full, 304 is_complete=True, 305 ) 306 await _publish_data_part_status_update( 307 host_component, a2a_context, thinking_data 308 ) 309 log.debug( 310 "%s Published full thinking content (%d chars)", 311 log_identifier, 312 len(thinking_text_full), 313 ) 314 # Remove from metadata to avoid 
downstream confusion 315 llm_response.custom_metadata.pop("thinking_content", None) 316 317 return None 318 319 320 async def process_artifact_blocks_callback( 321 callback_context: CallbackContext, 322 llm_response: LlmResponse, 323 host_component: "SamAgentComponent", 324 ) -> Optional[LlmResponse]: 325 """ 326 Orchestrates the parsing of fenced artifact blocks from an LLM stream 327 by delegating to a FencedBlockStreamParser instance. 328 This callback is stateful across streaming chunks within a single turn. 329 """ 330 log_identifier = "[Callback:ProcessArtifactBlocks]" 331 parser_state_key = "fenced_block_parser" 332 session = get_session_from_callback_context(callback_context) 333 334 parser: FencedBlockStreamParser = session.state.get(parser_state_key) 335 if parser is None: 336 log.debug("%s New turn. Creating new FencedBlockStreamParser.", log_identifier) 337 parser = FencedBlockStreamParser(progress_update_interval_bytes=250) 338 session.state[parser_state_key] = parser 339 session.state["completed_artifact_blocks_list"] = [] 340 session.state["completed_template_blocks_list"] = [] 341 session.state["artifact_chars_sent"] = ( 342 0 # Reset character tracking for new turn 343 ) 344 345 stream_chunks_were_processed = callback_context.state.get( 346 A2A_LLM_STREAM_CHUNKS_PROCESSED_KEY, False 347 ) 348 if llm_response.partial: 349 callback_context.state[A2A_LLM_STREAM_CHUNKS_PROCESSED_KEY] = True 350 351 if llm_response.partial or not stream_chunks_were_processed: 352 processed_parts: List[adk_types.Part] = [] 353 original_parts = llm_response.content.parts if llm_response.content else [] 354 a2a_context = callback_context.state.get("a2a_context") 355 356 for part in original_parts: 357 if part.text is not None: 358 parser_result = parser.process_chunk(part.text) 359 360 if llm_response.partial: 361 if parser_result.user_facing_text: 362 processed_parts.append( 363 adk_types.Part(text=parser_result.user_facing_text) 364 ) 365 else: 366 
processed_parts.append(part) 367 368 for event in parser_result.events: 369 if isinstance(event, BlockStartedEvent): 370 log.info( 371 "%s Event: BlockStarted. Params: %s", 372 log_identifier, 373 event.params, 374 ) 375 # Reset character tracking for this new artifact block 376 session.state["artifact_chars_sent"] = 0 377 378 filename = event.params.get("filename", "unknown_artifact") 379 if filename == "unknown_artifact": 380 log.warning( 381 "%s Fenced artifact block started without a 'filename' parameter.", 382 log_identifier, 383 ) 384 description = event.params.get("description") 385 if filename == "unknown_artifact": 386 log.warning( 387 "%s Fenced artifact block started without a 'filename' parameter.", 388 log_identifier, 389 ) 390 # Extract tags from params (comma-separated string to list) 391 tags = _parse_tags_param(event.params.get("tags")) or None 392 if a2a_context: 393 status_text = f"Receiving artifact `{filename}`..." 394 if description: 395 status_text = ( 396 f"Receiving artifact `{filename}`: {description}" 397 ) 398 progress_data = AgentProgressUpdateData( 399 status_text=status_text 400 ) 401 await _publish_data_part_status_update( 402 host_component, a2a_context, progress_data 403 ) 404 # Also send an initial in-progress event to create the UI bubble 405 artifact_progress_data = ArtifactCreationProgressData( 406 filename=filename, 407 description=description, 408 status="in-progress", 409 bytes_transferred=0, 410 artifact_chunk=None, 411 tags=tags, 412 ) 413 414 await _publish_data_part_status_update( 415 host_component, a2a_context, artifact_progress_data 416 ) 417 params_str = " ".join( 418 [f'{k}="{v}"' for k, v in event.params.items()] 419 ) 420 original_text = f"{ARTIFACT_BLOCK_DELIMITER_OPEN}save_artifact: {params_str}\n" 421 session.state["artifact_block_original_text"] = original_text 422 423 elif isinstance(event, BlockProgressedEvent): 424 log.debug( 425 "%s Event: BlockProgressed. 
Size: %d", 426 log_identifier, 427 event.buffered_size, 428 ) 429 params = event.params 430 filename = params.get("filename", "unknown_artifact") 431 if filename == "unknown_artifact": 432 log.warning( 433 "%s Fenced artifact block progressed without a 'filename' parameter.", 434 log_identifier, 435 ) 436 if a2a_context: 437 # Resolve early embeds in the chunk before streaming 438 resolved_chunk = await _resolve_early_embeds_in_chunk( 439 chunk=event.chunk, 440 callback_context=callback_context, 441 host_component=host_component, 442 log_identifier=f"{log_identifier}[ResolveChunk]", 443 ) 444 # Extract tags from params (comma-separated string to list) 445 tags = _parse_tags_param(params.get("tags")) or None 446 447 progress_data = ArtifactCreationProgressData( 448 filename=filename, 449 description=params.get("description"), 450 status="in-progress", 451 bytes_transferred=event.buffered_size, 452 artifact_chunk=resolved_chunk, # Resolved chunk 453 tags=tags, 454 ) 455 456 # Track the cumulative character count of what we've sent 457 # We need character count (not bytes) to slice correctly later 458 previous_char_count = session.state.get( 459 "artifact_chars_sent", 0 460 ) 461 new_char_count = previous_char_count + len(event.chunk) 462 session.state["artifact_chars_sent"] = new_char_count 463 464 await _publish_data_part_status_update( 465 host_component, a2a_context, progress_data 466 ) 467 468 elif isinstance(event, BlockCompletedEvent): 469 log.debug( 470 "%s Event: BlockCompleted. 
Content length: %d", 471 log_identifier, 472 len(event.content), 473 ) 474 original_text = session.state.get( 475 "artifact_block_original_text", "" 476 ) 477 original_text += event.content 478 original_text += "»»»" 479 480 tool_context_for_call = ToolContext( 481 callback_context._invocation_context 482 ) 483 484 params = event.params 485 filename = params.get("filename") 486 if not filename or not filename.strip(): 487 log.warning( 488 "%s Fenced artifact block is missing a valid 'filename'. Failing operation.", 489 log_identifier, 490 ) 491 session.state["completed_artifact_blocks_list"].append( 492 { 493 "filename": ( 494 "unknown_artifact" 495 if filename is None 496 else filename 497 ), 498 "version": 0, 499 "status": "error", 500 "original_text": original_text, 501 } 502 ) 503 if a2a_context: 504 if not filename or not filename.strip(): 505 filename = "unknown_artifact" 506 progress_data = ArtifactCreationProgressData( 507 filename=filename or "unknown_artifact", 508 description=params.get("description"), 509 status="failed", 510 bytes_transferred=0, 511 ) 512 await _publish_data_part_status_update( 513 host_component, a2a_context, progress_data 514 ) 515 continue 516 517 kwargs_for_call = { 518 "filename": filename, 519 "content": event.content, 520 "mime_type": params.get("mime_type"), 521 "description": params.get("description"), 522 "metadata_json": params.get("metadata"), 523 "tool_context": tool_context_for_call, 524 } 525 if "schema_max_keys" in params: 526 try: 527 kwargs_for_call["schema_max_keys"] = int( 528 params["schema_max_keys"] 529 ) 530 except (ValueError, TypeError): 531 log.warning( 532 "%s Invalid 'schema_max_keys' value '%s'. 
Ignoring.", 533 log_identifier, 534 params["schema_max_keys"], 535 ) 536 # Extract tags from params (comma-separated string to list) 537 tags_list = _parse_tags_param(params.get("tags")) 538 539 # Auto-tag artifacts as internal when created during structured invocation 540 logical_task_id_for_tags = a2a_context.get("logical_task_id") 541 if logical_task_id_for_tags: 542 with host_component.active_tasks_lock: 543 task_ctx = host_component.active_tasks.get(logical_task_id_for_tags) 544 if task_ctx and task_ctx.get_flag("is_structured_invocation"): 545 if ARTIFACT_TAG_WORKING not in tags_list: 546 tags_list.append(ARTIFACT_TAG_WORKING) 547 548 if tags_list: 549 kwargs_for_call["tags"] = tags_list 550 wrapped_creator = ADKToolWrapper( 551 original_func=_internal_create_artifact, 552 tool_config=None, # No specific config for this internal tool 553 tool_name="_internal_create_artifact", 554 origin="internal", 555 resolution_type="early", 556 ) 557 save_result = await wrapped_creator(**kwargs_for_call) 558 559 if save_result.status in ["success", "partial"]: 560 status_for_tool = "success" 561 version_for_tool = save_result.data.get("data_version", 1) if save_result.data else 1 562 try: 563 logical_task_id = a2a_context.get("logical_task_id") 564 if logical_task_id: 565 with host_component.active_tasks_lock: 566 task_context = host_component.active_tasks.get( 567 logical_task_id 568 ) 569 if task_context: 570 task_context.register_produced_artifact( 571 filename, version_for_tool 572 ) 573 log.info( 574 "%s Registered inline artifact '%s' v%d for task %s.", 575 log_identifier, 576 filename, 577 version_for_tool, 578 logical_task_id, 579 ) 580 else: 581 log.warning( 582 "%s TaskExecutionContext not found for task %s, cannot register inline artifact '%s'.", 583 log_identifier, 584 logical_task_id, 585 filename, 586 ) 587 else: 588 log.warning( 589 "%s No logical_task_id, cannot register inline artifact.", 590 log_identifier, 591 ) 592 except Exception as e_track: 593 
log.error( 594 "%s Failed to track inline artifact: %s", 595 log_identifier, 596 e_track, 597 ) 598 599 # Send final progress update with any remaining content not yet sent 600 if a2a_context: 601 # Check if there's unsent content (content after last progress event) 602 total_bytes = len(event.content.encode("utf-8")) 603 chars_already_sent = session.state.get( 604 "artifact_chars_sent", 0 605 ) 606 607 if chars_already_sent < len(event.content): 608 # There's unsent content - send it as a final progress update 609 final_chunk = event.content[chars_already_sent:] 610 611 # Resolve embeds in final chunk 612 resolved_final_chunk = await _resolve_early_embeds_in_chunk( 613 chunk=final_chunk, 614 callback_context=callback_context, 615 host_component=host_component, 616 log_identifier=f"{log_identifier}[ResolveFinalChunk]", 617 ) 618 619 final_progress_data = ArtifactCreationProgressData( 620 filename=filename, 621 description=params.get("description"), 622 status="in-progress", 623 bytes_transferred=total_bytes, 624 artifact_chunk=resolved_final_chunk, # Resolved final chunk 625 ) 626 627 await _publish_data_part_status_update( 628 host_component, a2a_context, final_progress_data 629 ) 630 631 # Publish completion status immediately via SSE 632 if a2a_context: 633 # Get tags (already parsed above as kwargs_for_call["tags"]) 634 completion_tags = kwargs_for_call.get("tags") 635 progress_data = ArtifactCreationProgressData( 636 filename=filename, 637 description=params.get("description"), 638 status="completed", 639 bytes_transferred=len(event.content), 640 mime_type=params.get("mime_type"), 641 version=version_for_tool, 642 tags=completion_tags, 643 ) 644 await _publish_data_part_status_update( 645 host_component, a2a_context, progress_data 646 ) 647 else: 648 status_for_tool = "error" 649 version_for_tool = 0 650 # Publish failure status immediately via SSE 651 if a2a_context: 652 # Get tags (already parsed above as kwargs_for_call["tags"]) 653 failure_tags = 
kwargs_for_call.get("tags") 654 progress_data = ArtifactCreationProgressData( 655 filename=filename, 656 description=params.get("description"), 657 status="failed", 658 bytes_transferred=len(event.content), 659 tags=failure_tags, 660 ) 661 await _publish_data_part_status_update( 662 host_component, a2a_context, progress_data 663 ) 664 665 session.state["completed_artifact_blocks_list"].append( 666 { 667 "filename": filename, 668 "version": version_for_tool, 669 "status": status_for_tool, 670 "description": params.get("description"), 671 "mime_type": params.get("mime_type"), 672 "bytes_transferred": len(event.content), 673 "original_text": original_text, 674 "tags": kwargs_for_call.get("tags"), 675 } 676 ) 677 678 elif isinstance(event, TemplateBlockStartedEvent): 679 log.debug( 680 "%s Event: TemplateBlockStarted. Params: %s", 681 log_identifier, 682 event.params, 683 ) 684 685 elif isinstance(event, TemplateBlockCompletedEvent): 686 log.debug( 687 "%s Event: TemplateBlockCompleted. Template length: %d", 688 log_identifier, 689 len(event.template_content), 690 ) 691 692 # Create a TemplateBlockData message to send to the gateway 693 template_id = str(uuid.uuid4()) 694 params = event.params 695 696 data_artifact = params.get("data") 697 if not data_artifact: 698 log.warning( 699 "%s Template block is missing 'data' parameter. 
Skipping.", 700 log_identifier, 701 ) 702 continue 703 704 template_data = TemplateBlockData( 705 template_id=template_id, 706 data_artifact=data_artifact, 707 jsonpath=params.get("jsonpath"), 708 limit=( 709 int(params.get("limit")) 710 if params.get("limit") 711 else None 712 ), 713 template_content=event.template_content, 714 ) 715 716 # Publish A2A status update with template metadata 717 if a2a_context: 718 await _publish_data_part_status_update( 719 host_component, a2a_context, template_data 720 ) 721 log.info( 722 "%s Published TemplateBlockData with ID: %s", 723 log_identifier, 724 template_id, 725 ) 726 727 # Reconstruct the original template block text for peer-to-peer responses 728 # Peer agents don't receive TemplateBlockData signals, so they need 729 # the original block text to pass templates through to the gateway 730 params_str = " ".join([f'{k}="{v}"' for k, v in params.items()]) 731 original_template_text = ( 732 f"{TEMPLATE_LIQUID_START_SEQUENCE} {params_str}\n" 733 f"{event.template_content}" 734 f"{ARTIFACT_BLOCK_DELIMITER_CLOSE}" 735 ) 736 737 # For RUN_BASED sessions (peer-to-peer agent requests), preserve the 738 # template block in the response text at its original position. 739 # This allows the calling agent to forward it to the gateway. 740 # Gateway requests use streaming sessions and receive TemplateBlockData 741 # signals instead. 742 is_run_based = a2a_context and a2a_context.get( 743 "is_run_based_session", False 744 ) 745 if is_run_based and llm_response.partial: 746 processed_parts.append( 747 adk_types.Part(text=original_template_text) 748 ) 749 log.debug( 750 "%s Preserved template block in RUN_BASED peer response. 
Template ID: %s", 751 log_identifier, 752 template_id, 753 ) 754 755 # Store template_id and original text in session for potential future use 756 # (Gateway will handle the actual resolution via signals, 757 # but peer agents need the original text in their responses) 758 if ( 759 "completed_template_blocks_list" not in session.state 760 or session.state["completed_template_blocks_list"] is None 761 ): 762 session.state["completed_template_blocks_list"] = [] 763 session.state["completed_template_blocks_list"].append( 764 { 765 "template_id": template_id, 766 "data_artifact": data_artifact, 767 "original_text": original_template_text, 768 } 769 ) 770 771 elif isinstance(event, BlockInvalidatedEvent): 772 log.debug( 773 "%s Event: BlockInvalidated. Rolled back: '%s'", 774 log_identifier, 775 event.rolled_back_text, 776 ) 777 else: 778 processed_parts.append(part) 779 780 if llm_response.partial: 781 if llm_response.content: 782 llm_response.content.parts = processed_parts 783 elif processed_parts: 784 llm_response.content = adk_types.Content(parts=processed_parts) 785 else: 786 log.debug( 787 "%s Ignoring text content of final aggregated response because stream was already processed.", 788 log_identifier, 789 ) 790 791 if not llm_response.partial and not llm_response.interrupted: 792 log.debug( 793 "%s Final, non-interrupted stream chunk received. Finalizing parser.", 794 log_identifier, 795 ) 796 final_parser_result = parser.finalize() 797 798 for event in final_parser_result.events: 799 if isinstance(event, BlockInvalidatedEvent): 800 # This event is emitted when an unterminated block is detected at the end of a turn. 801 # The rolled_back_text contains the original opening line + buffered content. 802 # We need to: 803 # 1. Send a "cancelled" status to clean up any in-progress UI 804 # 2. Send the original text as a status update so the frontend can display it 805 log.warning( 806 "%s Unterminated artifact block detected at end of turn. 
Rolled back text length: %d", 807 log_identifier, 808 len(event.rolled_back_text), 809 ) 810 811 # Try to extract filename from the rolled back text for the cancelled status 812 # The format is: «««save_artifact: filename="test.md" ...\n...content... 813 filename = "unknown_artifact" 814 filename_match = re.search(r'filename="([^"]+)"', event.rolled_back_text) 815 if filename_match: 816 filename = filename_match.group(1) 817 818 # Send a "cancelled" status to clean up any in-progress UI that was created 819 # when the block started. This tells the frontend to remove the artifact bubble. 820 a2a_context = callback_context.state.get("a2a_context") 821 if a2a_context: 822 cancelled_progress_data = ArtifactCreationProgressData( 823 filename=filename, 824 status="cancelled", # Use "cancelled" instead of "failed" for unterminated blocks 825 bytes_transferred=0, 826 # Include the rolled back text so the frontend can display it 827 # This is sent along with the cancelled status so the frontend 828 # can show the original text that was incorrectly parsed as an artifact 829 rolled_back_text=event.rolled_back_text, 830 ) 831 await _publish_data_part_status_update( 832 host_component, a2a_context, cancelled_progress_data 833 ) 834 log.debug( 835 "%s Sent 'cancelled' status with rolled back text for unterminated artifact block: %s", 836 log_identifier, 837 filename, 838 ) 839 840 # Note: We no longer need to store the original text in completed_artifact_blocks_list 841 # because we're sending it directly with the cancelled status. 842 # The frontend will handle displaying the text when it receives the cancelled status. 
843 844 # If there was any rolled-back text from finalization, append it 845 if final_parser_result.user_facing_text: 846 if ( 847 llm_response.content 848 and llm_response.content.parts 849 and llm_response.content.parts[-1].text is not None 850 ): 851 llm_response.content.parts[ 852 -1 853 ].text += final_parser_result.user_facing_text 854 else: 855 if llm_response.content is None: 856 llm_response.content = adk_types.Content(parts=[]) 857 elif llm_response.content.parts is None: 858 llm_response.content.parts = [] 859 llm_response.content.parts.append( 860 adk_types.Part(text=final_parser_result.user_facing_text) 861 ) 862 863 # Check if any blocks were completed and need to be injected into the final response 864 completed_blocks_list = session.state.get("completed_artifact_blocks_list") 865 if completed_blocks_list: 866 log.info( 867 "%s Injecting info for %d saved artifact(s) into final LlmResponse.", 868 log_identifier, 869 len(completed_blocks_list), 870 ) 871 872 # Get a2a_context for sending signals 873 a2a_context = callback_context.state.get("a2a_context") 874 875 tool_call_parts = [] 876 for block_info in completed_blocks_list: 877 function_call_id = f"host-notify-{uuid.uuid4()}" 878 notify_tool_call = adk_types.FunctionCall( 879 name="_notify_artifact_save", 880 args={ 881 "filename": block_info["filename"], 882 "version": block_info["version"], 883 "status": block_info["status"], 884 }, 885 id=function_call_id, 886 ) 887 tool_call_parts.append(adk_types.Part(function_call=notify_tool_call)) 888 889 # Send artifact saved notification now that we have the function_call_id 890 # This ensures the signal and tool call arrive together 891 if block_info["status"] == "success" and a2a_context: 892 try: 893 artifact_info = ArtifactInfo( 894 filename=block_info["filename"], 895 version=block_info["version"], 896 mime_type=block_info.get("mime_type") 897 or "application/octet-stream", 898 size=block_info.get("bytes_transferred", 0), 899 
description=block_info.get("description"), 900 version_count=None, # Count not available in save context 901 ) 902 await host_component.notify_artifact_saved( 903 artifact_info=artifact_info, 904 a2a_context=a2a_context, 905 function_call_id=function_call_id, 906 ) 907 log.debug( 908 "%s Published artifact saved notification for fenced block: %s (function_call_id=%s)", 909 log_identifier, 910 block_info["filename"], 911 function_call_id, 912 ) 913 except Exception as signal_err: 914 log.warning( 915 "%s Failed to publish artifact saved notification: %s", 916 log_identifier, 917 signal_err, 918 ) 919 920 existing_parts = llm_response.content.parts if llm_response.content else [] 921 final_existing_parts = existing_parts 922 923 if llm_response.content is None: 924 llm_response.content = adk_types.Content(parts=[]) 925 926 llm_response.content.parts = tool_call_parts + final_existing_parts 927 928 llm_response.turn_complete = True 929 llm_response.partial = False 930 931 session.state[parser_state_key] = None 932 session.state["completed_artifact_blocks_list"] = None 933 session.state["artifact_block_original_text"] = None 934 session.state["completed_template_blocks_list"] = None 935 log.debug("%s Cleaned up parser session state.", log_identifier) 936 937 return None 938 939 940 def create_dangling_tool_call_repair_content( 941 dangling_calls: List[adk_types.FunctionCall], error_message: str 942 ) -> adk_types.Content: 943 """ 944 Creates a synthetic ADK Content object to repair a dangling tool call. 945 946 Args: 947 dangling_calls: The list of FunctionCall objects that need a response. 948 error_message: The error message to include in the response. 949 950 Returns: 951 An ADK Content object with role='tool' containing the error response. 
def repair_history_callback(
    callback_context: CallbackContext, llm_request: LlmRequest
) -> Optional[LlmResponse]:
    """
    ADK before_model_callback that repairs dangling tool calls in the history.

    Walks ``llm_request.contents`` and, for every model turn containing
    function calls, checks that the entry immediately after it is a "user" or
    "tool" turn carrying at least one function response. When it is not, a
    synthetic error response (one part per call) is inserted right after the
    model turn so the history sent to the LLM is well formed.
    This acts as a "suspender" to catch any history corruption.

    Args:
        callback_context: The ADK callback context (not read here).
        llm_request: The outgoing request; its ``contents`` list is modified
            in place when a repair is needed.

    Returns:
        Always None, so the (possibly repaired) request proceeds to the model.
    """
    log_identifier = "[Callback:RepairHistory]"
    if not llm_request.contents:
        return None

    history_modified = False
    i = 0
    while i < len(llm_request.contents):
        content = llm_request.contents[i]
        function_calls = []
        if content and content.role == "model" and content.parts:
            function_calls = [p.function_call for p in content.parts if p.function_call]

        if function_calls:
            has_next = (i + 1) < len(llm_request.contents)
            next_content = llm_request.contents[i + 1] if has_next else None
            # A missing or None follow-up entry counts as "no response", the
            # same way a None current entry is tolerated above, instead of
            # raising AttributeError on `.role`.
            next_content_is_valid_response = bool(
                next_content
                and next_content.role in ["user", "tool"]
                and next_content.parts
                and any(p.function_response for p in next_content.parts)
            )

            if not next_content_is_valid_response:
                log.warning(
                    "%s Found dangling tool call in history for tool(s): %s. Repairing.",
                    log_identifier,
                    [fc.name for fc in function_calls],
                )
                repair_content = create_dangling_tool_call_repair_content(
                    dangling_calls=function_calls,
                    error_message="The previous tool call did not complete successfully and was automatically repaired.",
                )
                llm_request.contents.insert(i + 1, repair_content)
                history_modified = True
                # Step over the entry that was just inserted.
                i += 1
        i += 1

    if history_modified:
        log.info(
            "%s History was modified to repair dangling tool calls.", log_identifier
        )

    return None
Repairing.", 999 log_identifier, 1000 [fc.name for fc in function_calls], 1001 ) 1002 repair_content = create_dangling_tool_call_repair_content( 1003 dangling_calls=function_calls, 1004 error_message="The previous tool call did not complete successfully and was automatically repaired.", 1005 ) 1006 llm_request.contents.insert(i + 1, repair_content) 1007 history_modified = True 1008 i += 1 1009 i += 1 1010 1011 if history_modified: 1012 log.info( 1013 "%s History was modified to repair dangling tool calls.", log_identifier 1014 ) 1015 1016 return None 1017 1018 1019 def _recursively_clean_pydantic_types(data: Any) -> Any: 1020 """ 1021 Recursively traverses a data structure (dicts, lists) and converts 1022 Pydantic-specific types like AnyUrl to their primitive string representation 1023 to ensure JSON serializability. 1024 """ 1025 if isinstance(data, dict): 1026 return { 1027 key: _recursively_clean_pydantic_types(value) for key, value in data.items() 1028 } 1029 elif isinstance(data, list): 1030 return [_recursively_clean_pydantic_types(item) for item in data] 1031 # Check for Pydantic's AnyUrl without a direct import to avoid dependency issues. 1032 elif type(data).__name__ == "AnyUrl" and hasattr(data, "__str__"): 1033 return str(data) 1034 return data 1035 1036 1037 def _mcp_response_contains_non_text(mcp_response_dict: Dict[str, Any]) -> bool: 1038 """ 1039 Checks if the 'content' list in an MCP response dictionary contains any 1040 items that are not of type 'text'. 
1041 """ 1042 if not isinstance(mcp_response_dict, dict): 1043 return False 1044 1045 content_list = mcp_response_dict.get("content") 1046 if not isinstance(content_list, list): 1047 return False 1048 1049 for item in content_list: 1050 if isinstance(item, dict) and item.get("type") != "text": 1051 return True 1052 return False 1053 1054 1055 async def manage_large_mcp_tool_responses_callback( 1056 tool: BaseTool, 1057 args: Dict[str, Any], 1058 tool_context: ToolContext, 1059 tool_response: Any, 1060 host_component: "SamAgentComponent", 1061 ) -> Optional[Dict[str, Any]]: 1062 """ 1063 Manages large or non-textual responses from MCP tools. 1064 1065 This callback intercepts the response from an MCPTool. Based on the response's 1066 size and content type, it performs one or more of the following actions: 1067 1. **Saves as Artifact:** If the response size exceeds a configured threshold, 1068 or if it contains non-textual content (like images), it calls the 1069 `save_mcp_response_as_artifact_intelligent` function to save the 1070 response as one or more typed artifacts. 1071 2. **Truncates for LLM:** If the response size exceeds a configured limit for 1072 the LLM, it truncates the content to a preview string. 1073 3. **Constructs Final Response:** It builds a new dictionary to be returned 1074 to the LLM, which includes: 1075 - A `message_to_llm` summarizing what was done (e.g., saved, truncated). 1076 - `saved_mcp_response_artifact_details` with the result of the save operation. 1077 - `mcp_tool_output` containing either the original response or the truncated preview. 1078 - A `status` field indicating the outcome (e.g., 'processed_and_saved'). 1079 1080 The `tool_response` is the direct output from the tool's `run_async` method. 
async def manage_large_mcp_tool_responses_callback(
    tool: BaseTool,
    args: Dict[str, Any],
    tool_context: ToolContext,
    tool_response: Any,
    host_component: "SamAgentComponent",
) -> Optional[Dict[str, Any]]:
    """
    Manages large or non-textual responses from MCP tools.

    This callback intercepts the response from an MCPTool. Based on the response's
    size and content type, it performs one or more of the following actions:
    1.  **Saves as Artifact:** If the serialized response size exceeds
        `mcp_tool_response_save_threshold_bytes` (default 2048) or
        `mcp_tool_llm_return_max_bytes` (default 4096), or if it contains
        non-textual content (like images), it calls
        `save_mcp_response_as_artifact_intelligent` to save the response as
        one or more typed artifacts.
    2.  **Withholds from LLM:** If the response size exceeds
        `mcp_tool_llm_return_max_bytes`, the data is NOT passed to the LLM at
        all (no truncated preview); a placeholder pointing at the artifact is
        returned instead.
    3.  **Constructs Final Response:** It builds a new dictionary to be returned
        to the LLM, which includes:
        - A `message_to_llm` summarizing what was done (e.g., saved, withheld).
        - `saved_mcp_response_artifact_details` with the result of the save
          operation (only when a save was attempted).
        - `mcp_tool_output` containing either the original response (when no
          save was needed) or the "data_in_artifact_only" placeholder (when
          the data was withheld). It is absent when the response was saved
          but not withheld.
        - A `status` field indicating the outcome (e.g., 'processed_and_saved').

    The `tool_response` is the direct output from the tool's `run_async` method.

    Args:
        tool: The tool that produced the response; only MCPTool instances are
            processed.
        args: The arguments the tool was called with; cleaned and forwarded to
            the artifact save helper.
        tool_context: The ADK tool context, forwarded to the save helper.
        tool_response: The raw tool output (Pydantic model, dict, or other).
        host_component: Host component used for configuration lookup and
            forwarded to the save helper.

    Returns:
        None when `tool_response` is None; for non-MCP tools, the response
        itself (wrapped as {"result": ...} when it is not a dict); the original
        `tool_response` when it cannot be JSON-serialized; otherwise the
        constructed response dictionary described above.
    """
    log_identifier = f"[Callback:ManageLargeMCPResponse:{tool.name}]"
    log.info(
        "%s Starting callback for tool response, type: %s",
        log_identifier,
        type(tool_response).__name__,
    )

    if tool_response is None:
        return None

    if not isinstance(tool, MCPTool):
        log.debug(
            "%s Tool is not an MCPTool. Skipping large response handling.",
            log_identifier,
        )
        # Non-MCP tools pass through untouched, only normalized to a dict.
        return (
            tool_response
            if isinstance(tool_response, dict)
            else {"result": tool_response}
        )

    log.debug(
        "%s Tool is an MCPTool. Proceeding with large response handling.",
        log_identifier,
    )

    # Normalize the response into a plain dictionary where possible.
    if hasattr(tool_response, "model_dump"):
        mcp_response_dict = tool_response.model_dump(exclude_none=True)
        log.debug("%s Converted MCPTool response object to dictionary.", log_identifier)
    elif isinstance(tool_response, dict):
        mcp_response_dict = tool_response
        log.debug("%s MCPTool response is already a dictionary.", log_identifier)
    else:
        log.warning(
            "%s MCPTool response is not a Pydantic model or dict (type: %s). Attempting to proceed, but serialization might fail.",
            log_identifier,
            type(tool_response),
        )
        mcp_response_dict = tool_response

    # Clean any Pydantic-specific types before serialization
    mcp_response_dict = _recursively_clean_pydantic_types(mcp_response_dict)
    cleaned_args = _recursively_clean_pydantic_types(args)

    # Read size thresholds; any failure falls back to the built-in defaults.
    try:
        save_threshold = host_component.get_config(
            "mcp_tool_response_save_threshold_bytes", 2048
        )
        llm_max_bytes = host_component.get_config("mcp_tool_llm_return_max_bytes", 4096)
        log.debug(
            "%s Config: save_threshold=%d bytes, llm_max_bytes=%d bytes.",
            log_identifier,
            save_threshold,
            llm_max_bytes,
        )
    except Exception as e:
        log.error(
            "%s Error retrieving configuration: %s. Using defaults.", log_identifier, e
        )
        save_threshold = 2048
        llm_max_bytes = 4096

    contains_non_text_content = _mcp_response_contains_non_text(mcp_response_dict)
    if not contains_non_text_content:
        # Text-only responses: decide by the UTF-8 size of the JSON form.
        try:
            serialized_original_response_str = json.dumps(mcp_response_dict)
            original_response_bytes = len(
                serialized_original_response_str.encode("utf-8")
            )
            log.debug(
                "%s Original response size: %d bytes.",
                log_identifier,
                original_response_bytes,
            )
        except TypeError as e:
            log.error(
                "%s Failed to serialize original MCP tool response dictionary: %s. Returning original response object.",
                log_identifier,
                e,
            )
            return tool_response
        needs_truncation_for_llm = original_response_bytes > llm_max_bytes
        # Withholding always implies saving, so the data stays reachable.
        needs_saving_as_artifact = (
            original_response_bytes > save_threshold
        ) or needs_truncation_for_llm
    else:
        # Non-text content is always saved and never size-checked.
        # NOTE: original_response_bytes is not set on this path; it is only
        # read below when needs_truncation_for_llm is True.
        needs_truncation_for_llm = False
        needs_saving_as_artifact = True

    save_result = None
    if needs_saving_as_artifact:
        save_result = await save_mcp_response_as_artifact_intelligent(
            tool, tool_context, host_component, mcp_response_dict, cleaned_args
        )
        if save_result.status == McpSaveStatus.ERROR:
            log.warning(
                "%s Failed to save artifact: %s. Proceeding without saved artifact details.",
                log_identifier,
                save_result.message,
            )

    final_llm_response_dict: Dict[str, Any] = {}
    message_parts_for_llm: list[str] = []

    if needs_truncation_for_llm:
        # ALL-OR-NOTHING APPROACH: Do not include truncated data to prevent LLM hallucination.
        # When LLMs receive partial data, they tend to confidently fill in gaps with
        # hallucinated information. By withholding partial data entirely, we force the LLM
        # to use reliable mechanisms (template_liquid, load_artifact) to access the full data.
        final_llm_response_dict["mcp_tool_output"] = {
            "type": "data_in_artifact_only",
            "message": "Data exceeds size limit. Full data saved as artifact - use template_liquid, load_artifact or other artifact analysis tools to process and access.",
        }
        message_parts_for_llm.append(
            f"The response from tool '{tool.name}' was too large ({original_response_bytes} bytes) to display directly. "
            "The data has NOT been included here to prevent incomplete information. "
            "You MUST use template_liquid (for displaying to users) or load_artifact or other "
            "artifact analysis tools (for processing) to access the full data."
        )
        log.debug(
            "%s MCP tool output withheld from LLM (all-or-nothing approach).",
            log_identifier,
        )

    if needs_saving_as_artifact:
        if save_result and save_result.status in [
            McpSaveStatus.SUCCESS,
            McpSaveStatus.PARTIAL_SUCCESS,
        ]:
            final_llm_response_dict["saved_mcp_response_artifact_details"] = (
                save_result.model_dump(exclude_none=True)
            )

            # Name the first saved artifact (or the fallback) in the LLM message.
            total_artifacts = len(save_result.artifacts_saved)
            if total_artifacts > 0:
                first_artifact = save_result.artifacts_saved[0]
                filename = first_artifact.data_filename
                version = first_artifact.data_version
                if total_artifacts > 1:
                    artifact_msg = f"The full response has been saved as {total_artifacts} artifacts, starting with '{filename}' (version {version})."
                else:
                    artifact_msg = f"The full response has been saved as artifact '{filename}' (version {version})."

                # When data was too large and truncated, provide explicit guidance
                if needs_truncation_for_llm:
                    artifact_msg += (
                        f' To display this data to the user, use template_liquid with data="{filename}". '
                        f'To process the data yourself, use load_artifact("{filename}").'
                    )
                message_parts_for_llm.append(artifact_msg)
            elif save_result.fallback_artifact:
                filename = save_result.fallback_artifact.data_filename
                version = save_result.fallback_artifact.data_version
                artifact_msg = f"The full response has been saved as artifact '{filename}' (version {version})."
                if needs_truncation_for_llm:
                    artifact_msg += (
                        f' To display this data to the user, use template_liquid with data="{filename}". '
                        f'To process the data yourself, use load_artifact("{filename}").'
                    )
                message_parts_for_llm.append(artifact_msg)

            log.debug(
                "%s Added saved artifact details to LLM response.", log_identifier
            )
        else:
            message_parts_for_llm.append(
                "Saving the full response as an artifact failed."
            )
            if save_result:
                final_llm_response_dict["saved_mcp_response_artifact_details"] = (
                    save_result.model_dump(exclude_none=True)
                )
            log.warning(
                "%s Artifact save failed, error details included in LLM response.",
                log_identifier,
            )
    else:
        # Small, text-only response: hand the full data to the LLM.
        # NOTE(review): when a save was needed but the data was not withheld,
        # no "mcp_tool_output" key is set at all - confirm this is intended.
        final_llm_response_dict["mcp_tool_output"] = mcp_response_dict

    # Derive the status from (save needed, save succeeded, data withheld).
    if needs_saving_as_artifact and (
        save_result
        and save_result.status in [McpSaveStatus.SUCCESS, McpSaveStatus.PARTIAL_SUCCESS]
    ):
        if needs_truncation_for_llm:
            # Data was too large - withheld from LLM, only available via artifact
            final_llm_response_dict["status"] = "processed_saved_artifact_only"
        else:
            final_llm_response_dict["status"] = "processed_and_saved"
    elif needs_saving_as_artifact:
        if needs_truncation_for_llm:
            final_llm_response_dict["status"] = "processed_artifact_only_save_failed"
        else:
            final_llm_response_dict["status"] = "processed_save_failed"
    elif needs_truncation_for_llm:
        # This case shouldn't happen (truncation implies saving), but handle it
        final_llm_response_dict["status"] = "processed_data_withheld"
    else:
        final_llm_response_dict["status"] = "processed"

    if not message_parts_for_llm:
        message_parts_for_llm.append(f"Response from tool '{tool.name}' processed.")
    final_llm_response_dict["message_to_llm"] = " ".join(message_parts_for_llm)

    log.info(
        "%s Returning processed response for LLM. Final status: %s",
        log_identifier,
        final_llm_response_dict.get("status", "unknown"),
    )
    return final_llm_response_dict
def _generate_fenced_block_syntax_rules() -> str:
    """Build the syntax rules shared by all fenced blocks.

    Returns:
        Prompt text covering the `save_artifact` and `template_liquid` block
        syntax, rendered with the configured artifact block delimiters.
    """
    fence_open = ARTIFACT_BLOCK_DELIMITER_OPEN
    fence_close = ARTIFACT_BLOCK_DELIMITER_CLOSE
    # Deliberately shortened openers, shown to the model as mistakes to avoid.
    one_char_open = fence_open[0:1]
    two_char_open = fence_open[0:2]

    rules = f"""
**Fenced Block Syntax Rules (Applies to `save_artifact` and `template_liquid`):**
To create content blocks, you MUST use the EXACT syntax shown below.

**EXACT SYNTAX (copy this pattern exactly):**
{fence_open}keyword: parameter="value" ...
The content for the block goes here.
It can span multiple lines.
{fence_close}

**CRITICAL FORMATTING RULES:**
1. The opening delimiter MUST be EXACTLY `{fence_open}`.
2. Immediately after the delimiter, write the keyword (`save_artifact` or `template_liquid`) followed by a colon, with NO space before the colon (e.g., `{fence_open}save_artifact:`).
3. All parameters (like `filename`, `data`, `mime_type`) must be on the SAME line as the opening delimiter.
4. All parameter values **MUST** be enclosed in double quotes (e.g., `filename="example.txt"`).
5. You **MUST NOT** use double quotes `"` inside parameter values. Use single quotes or rephrase instead.
6. The block's content begins on the line immediately following the parameters.
7. Close the block with EXACTLY `{fence_close}` (three angle brackets) on its own line.
8. Do NOT surround the block with triple backticks (```). The `{fence_open}` and `{fence_close}` delimiters are sufficient.

**COMMON ERRORS TO AVOID:**
❌ WRONG: `{one_char_open}template_liquid:` (only 1 angle brackets)
❌ WRONG: `{two_char_open}save_artifact:` (only 2 angle brackets)
❌ WRONG: `{fence_open}save_artifact` (missing colon)
✅ CORRECT: `{fence_open}save_artifact: filename="test.txt" mime_type="text/plain"`
"""
    return rules
def _generate_fenced_artifact_instruction() -> str:
    """Build the prompt section on creating artifacts with fenced blocks.

    Returns:
        Prompt text on when and how to emit a `save_artifact` block, rendered
        with the configured opening delimiter and the working-file tag.
    """
    fence_open = ARTIFACT_BLOCK_DELIMITER_OPEN
    working_tag = ARTIFACT_TAG_WORKING

    instruction = f"""\
**Creating Text-Based Artifacts (`{fence_open}save_artifact: ...`):**

When to Create Artifacts:
Create an artifact when the content provides value as a standalone file, such as:
- Content with special formatting (HTML, Markdown, CSS).
- Documents intended for use outside the conversation (reports, emails).
- Structured reference content (schedules, guides, templates).
- Substantial text documents or technical documentation.

When NOT to Create Artifacts:
- Simple answers, explanations, or conversational responses.
- Brief advice, opinions, or short lists.

Behavior of Created Artifacts:
- They are sent to the user as an interactive file component.
- The user can see the content, so there is no need to return or embed it again.

Parameters for `{fence_open}save_artifact: ...`:
- `filename="your_filename.ext"` (REQUIRED)
- `mime_type="text/plain"` (optional, defaults to text/plain)
- `description="A brief description."` (optional)
- `tags="tag1,tag2"` (optional, comma-separated list of tags for categorization)

Tagging Working Files:
Add `tags="{working_tag}"` to artifacts that are intermediate or internal (e.g., scratch data, temp files, intermediate results used as input to further processing). These are hidden from the user's file list by default. Do NOT tag artifacts that are the final deliverable for the user.

The system will automatically save the content and confirm it in the next turn.
"""
    return instruction
def _generate_inline_template_instruction() -> str:
    """Generates the instruction text for using inline Liquid templates.

    The text is an f-string, so literal Liquid braces must be doubled:
    ``{{{{ x }}}}`` renders as ``{{ x }}`` and ``{{% tag %}}`` as ``{% tag %}``.

    Returns:
        Prompt text describing the `template_liquid` fenced block, its
        parameters, the data context, and common Liquid pitfalls.
    """
    open_delim = ARTIFACT_BLOCK_DELIMITER_OPEN
    close_delim = ARTIFACT_BLOCK_DELIMITER_CLOSE
    # Math embeds use the embed delimiters, not the fenced-block delimiters.
    embed_open_delim = EMBED_DELIMITER_OPEN
    embed_close_delim = EMBED_DELIMITER_CLOSE
    return f"""\
**Inline Liquid Templates (`{open_delim}template_liquid: ...`):**

Use inline Liquid templates to dynamically render data from artifacts for user-friendly display. This is faster and more accurate than reading the artifact and reformatting it yourself.

IMPORTANT: Template Format
- Templates use Liquid template syntax (same as Shopify templates - NOTE that Jekyll extensions are NOT supported).

When to Use Inline Templates:
- Formatting CSV, JSON, or YAML data into tables or lists.
- Applying simple transformations (filtering, limiting rows).

Parameters for `{open_delim}template_liquid: ...`:
- `data="filename.ext"` (REQUIRED): The data artifact to render. Can include version: `data="file.csv:2"`.
- `jsonpath="$.expression"` (optional): JSONPath to extract a subset of JSON/YAML data.
- `limit="N"` (optional): Limit to the first N rows (CSV) or items (JSON/YAML arrays).

Data Context for Liquid Templates:
- CSV data: Available as `headers` (array of column names) and `data_rows` (array of row arrays).
- JSON/YAML arrays: Available as `items`.
- JSON/YAML objects: Keys are directly available (e.g., `name`, `email`).

Example - CSV Table:
{open_delim}template_liquid: data="sales_data.csv" limit="5"
| {{% for h in headers %}}{{{{ h }}}} | {{% endfor %}}
|{{% for h in headers %}}---|{{% endfor %}}
{{% for row in data_rows %}}| {{% for cell in row %}}{{{{ cell }}}} | {{% endfor %}}{{% endfor %}}
{close_delim}

**IMPORTANT - Pipe Characters in Markdown Tables:**
Text data may contain "|" characters which will break markdown table rendering by pushing data into wrong columns. For text fields that might contain pipes, use the `replace` filter to escape them:
`{{{{ item.summary | replace: "|", "&#124;" }}}}`
Only apply this to text fields that might contain pipes - numerical columns don't need it.

Negative Examples
Use {{{{ issues.size }}}} instead of {{{{ issues|length }}}}
Use {{{{ forloop.index }}}} instead of {{{{ loop.index }}}} (Liquid uses forloop not loop)
Use {{{{ issue.fields.description | truncate: 200 }}}} instead of slicing with [:200]
Do not use Jekyll-specific tags or filters (e.g., `{{% assign %}}`, `{{% capture %}}`, `where`, `sort`, `where_exp`, etc.)

The rendered output will appear inline in your response automatically.

**IMPORTANT - No Math Embeds Inside template_liquid:**
Never place math embeds (e.g., `{embed_open_delim}math:...{embed_close_delim}`) inside a `template_liquid` block. Math embeds are resolved at a different stage and will not work correctly within Liquid templates. If you need to perform calculations on data, do the math outside the template_liquid block using separate math embeds, or use Liquid's built-in arithmetic filters (e.g., `| plus:`, `| minus:`, `| times:`, `| divided_by:`).
"""
1407 """ 1408 1409 1410 def _generate_artifact_creation_instruction() -> str: 1411 return """ 1412 **Creating Text-Based Artifacts:** 1413 1414 When to Create Text-based Artifacts: 1415 Create an artifact when the content provides value as a standalone file: 1416 - Content with special formatting (HTML, Markdown, CSS, structured markup) that requires proper rendering 1417 - Content explicitly intended for use outside this conversation (reports, emails, presentations, reference documents) 1418 - Structured reference content users will save or follow (schedules, guides, templates) 1419 - Content that will be edited, expanded, or reused 1420 - Substantial text documents 1421 - Technical documentation meant as reference material 1422 1423 When NOT to Create Text-based Artifacts: 1424 - Simple answers, explanations, or conversational responses 1425 - Brief advice, opinions, or quick information 1426 - Short lists, summaries, or single paragraphs 1427 - Temporary content only relevant to the immediate conversation 1428 - Basic explanations that don't require reference material 1429 """ 1430 1431 1432 def _generate_examples_instruction() -> str: 1433 open_delim = ARTIFACT_BLOCK_DELIMITER_OPEN 1434 close_delim = ARTIFACT_BLOCK_DELIMITER_CLOSE 1435 embed_open_delim = EMBED_DELIMITER_OPEN 1436 embed_close_delim = EMBED_DELIMITER_CLOSE 1437 1438 return ( 1439 f"""\ 1440 Example 1: 1441 - User: "Create a markdown file with your two csv files as tables." 
def _generate_examples_instruction() -> str:
    """Generates worked examples combining embeds and fenced blocks.

    The text alternates between f-strings (which interpolate the configured
    delimiters) and plain strings (which carry literal Liquid/CSS braces
    without any doubling).

    Returns:
        Prompt text containing six numbered examples.
    """
    open_delim = ARTIFACT_BLOCK_DELIMITER_OPEN
    close_delim = ARTIFACT_BLOCK_DELIMITER_CLOSE
    embed_open_delim = EMBED_DELIMITER_OPEN
    embed_close_delim = EMBED_DELIMITER_CLOSE

    return (
        f"""\
Example 1:
- User: "Create a markdown file with your two csv files as tables."
<note>There are two csv files already uploaded: data1.csv and data2.csv</note>
- OrchestratorAgent:
{embed_open_delim}status_update:Creating Markdown tables from CSV files...{embed_close_delim}
{open_delim}save_artifact: filename="data_tables.md" mime_type="text/markdown" description="Markdown tables from CSV files"
# Data Tables
## Data 1
{open_delim}template_liquid: data="data1.csv"
"""
        + """| {% for h in headers %}{{ h }} | {% endfor %}
|{% for h in headers %}---|{% endfor %}
{% for row in data_rows %}| {% for cell in row %}{{ cell }} | {% endfor %}{% endfor %}
"""
        + f"""{close_delim}
## Data 2
{open_delim}template_liquid: data="data2.csv"
"""
        + """| {% for h in headers %}{{ h }} | {% endfor %}
|{% for h in headers %}---|{% endfor %}
{% for row in data_rows %}| {% for cell in row %}{{ cell }} | {% endfor %}{% endfor %}
"""
        + f"""{close_delim}
{close_delim}
Example 2:
- User: "Create a text file with the result of sqrt(12345) + sqrt(67890) + sqrt(13579) + sqrt(24680)."
- OrchestratorAgent:
{embed_open_delim}status_update:Calculating and creating text file...{embed_close_delim}
{open_delim}save_artifact: filename="math.txt" mime_type="text/plain" description="Result of sqrt(12345) + sqrt(67890) + sqrt(13579) + sqrt(24680)"
result = {embed_open_delim}math: sqrt(12345) + sqrt(67890) + sqrt(13579) + sqrt(24680) | .2f{embed_close_delim}
{close_delim}

Example 3:
- User: "Show me the first 10 entries from data1.csv"
- OrchestratorAgent:
{embed_open_delim}status_update:Loading CSV data...{embed_close_delim}
{open_delim}template_liquid: data="data1.csv" limit="10"
"""
        + """| {% for h in headers %}{{ h }} | {% endfor %}
|{% for h in headers %}---|{% endfor %}
{% for row in data_rows %}| {% for cell in row %}{{ cell }} | {% endfor %}{% endfor %}
"""
        + f"""{close_delim}

Example 4:
- User: "Show me the Jira issues as a table"
<note>There is a JSON artifact jira_issues.json with items containing key, summary, status, type, assignee, updated fields</note>
- OrchestratorAgent:
{embed_open_delim}status_update:Rendering Jira issues table...{embed_close_delim}
{open_delim}template_liquid: data="jira_issues.json" limit="10"
"""
        # The replacement must be the HTML entity: replacing "|" with "|"
        # would leave the pipes in place and still break the table.
        + """| Key | Summary | Status | Type | Assignee | Updated |
|-----|---------|--------|------|----------|---------|
{% for item in items %}| [{{ item.key }}](https://jira.example.com/browse/{{ item.key }}) | {{ item.summary | replace: "|", "&#124;" }} | {{ item.status }} | {{ item.type }} | {{ item.assignee }} | {{ item.updated }} |
{% endfor %}"""
        + f"""
{close_delim}
<note>The replace filter on item.summary escapes any pipe characters that would break the markdown table. Only apply to text fields that might contain pipes.</note>

Example 5:
- User: "Search the database for all orders from last month"
- OrchestratorAgent:
{embed_open_delim}status_update:Querying order database...{embed_close_delim}
[calls search_database tool with no visible text]
[After getting results:]
Found 247 orders from last month totaling $45,231.

Example 6:
- User: "Create an HTML with the chart image you just generated with the customer data."
- OrchestratorAgent:
{embed_open_delim}status_update:Generating HTML report with chart...{embed_close_delim}
{open_delim}save_artifact: filename="customer_analysis.html" mime_type="text/html" description="Interactive customer analysis dashboard"
<!DOCTYPE html>
<html>
<head>
    <title>Customer Chart - {embed_open_delim}datetime:%Y-%m-%d{embed_close_delim}</title>
"""
        + """
    <style>
        body { font-family: Arial, sans-serif; margin: 20px; }
        .metric { background: #f0f0f0; padding: 10px; margin: 10px 0; }
        img { max-width: 100%; height: auto; }
"""
        + f"""    </style>
</head>
<body>
    <h1>Customer Analysis Report</h1>
    <p>Generated: {embed_open_delim}datetime:iso{embed_close_delim}</p>

    <h2>Customer Distribution Chart</h2>
    <img src="{embed_open_delim}artifact_content:customer_chart.png >>> format:datauri{embed_close_delim}" alt="Customer Distribution">

</body>
</html>
{close_delim}

"""
    )
def _generate_embed_instruction(
    include_artifact_content: bool,
    log_identifier: str,
    suppress_artifact_return: bool = False,
) -> Optional[str]:
    """Generates the instruction text for using embeds.

    Args:
        include_artifact_content: Whether to include artifact_content embed instructions.
        log_identifier: Logging prefix. Currently unused; kept so existing
            callers keep working.
        suppress_artifact_return: If True, omit artifact_return directives (used in SI mode
            where only the result embed matters).

    Returns:
        The assembled instruction text.
    """
    open_delim = EMBED_DELIMITER_OPEN
    close_delim = EMBED_DELIMITER_CLOSE
    chain_delim = EMBED_CHAIN_DELIMITER
    # save_artifact is a fenced block, so it is shown with the fenced-block
    # delimiters rather than the embed delimiters.
    block_open_delim = ARTIFACT_BLOCK_DELIMITER_OPEN
    block_close_delim = ARTIFACT_BLOCK_DELIMITER_CLOSE
    early_types = "`math`, `datetime`, `uuid`, `artifact_meta`"

    # apply_to_template has been deprecated, so it is not advertised.
    modifier_list = ", ".join(
        f"`{prefix}`"
        for prefix in MODIFIER_IMPLEMENTATIONS
        if prefix != "apply_to_template"
    )

    base_instruction = f"""\
**Using Dynamic Embeds in Responses:**

You can use dynamic embeds in your text responses and tool parameters using the syntax {open_delim}type:expression {chain_delim} format{close_delim}. NOTE that this differs from 'save_artifact', which has different delimiters. This allows you to
always have correct information in your output. Specifically, make sure you always use embeds for math, even if it is simple. You will make mistakes if you try to do math yourself.
Use HTML entities to escape the delimiters.
This host resolves the following embed types *early* (before sending to the LLM or tool): {early_types}. This means the embed is replaced with its resolved value.
- `{open_delim}math:expression | .2f{close_delim}`: Evaluates the math expression using asteval - this must just be plain math (plus random(), randint() and uniform()), don't import anything. Optional format specifier follows Python's format(). Use this for all math calculations rather than doing it yourself. Don't give approximations.
- `{open_delim}datetime:format_or_keyword{close_delim}`: Inserts current date/time. Use Python strftime format (e.g., `%Y-%m-%d`) or keywords (`iso`, `timestamp`, `date`, `time`, `now`).
- `{open_delim}uuid:{close_delim}`: Inserts a random UUID.
- `{open_delim}artifact_meta:filename[:version]{close_delim}`: Inserts a summary of the artifact's metadata (latest version if unspecified).
- `{open_delim}status_update:Your message here{close_delim}`: Generates an immediate, distinct status message event that is displayed to the user (e.g., 'Thinking...', 'Searching database...'). This message appears in a status area, not as part of the main chat conversation. Use this to provide interim feedback during processing.

Examples:
- `{open_delim}status_update:Analyzing data...{close_delim}` (Shows 'Analyzing data...' as a status update)
- `The result of 23.5 * 4.2 is {open_delim}math:23.5 * 4.2 | .2f{close_delim}` (Embeds calculated result with 2 decimal places)
"""

    if not suppress_artifact_return:
        base_instruction += f"""\
The following embeds are resolved *late* (by the gateway before final display):
- `{open_delim}artifact_return:filename[:version]{close_delim}`: Attaches an artifact to your message so the user receives the file. The embed itself is removed from the text.

**CRITICAL - Returning Artifacts to Users:**
Only artifacts created with the `{block_open_delim}save_artifact:...{block_close_delim}` fenced block syntax are automatically sent to the user.

**You MUST use artifact_return for:**
- Artifacts created by tools (e.g., image generation, chart creation, file conversion)
- Artifacts created by other agents you called
- Artifacts from MCP servers

**When deciding whether to return an artifact:**
- Return artifacts the user explicitly requested or that answer their question
- Return final outputs (charts, reports, images, documents)
- Do NOT return intermediate/temporary artifacts (e.g., temp files, internal data)

**Example - Tool creates an image:**
User: "Create a chart of sales data"
[You call a charting tool that creates sales_chart.png]
Your response: "Here's the sales chart. {open_delim}artifact_return:sales_chart.png{close_delim}"

**Example - Agent creates a report:**
User: "Generate a quarterly report"
[You call ReportAgent which creates quarterly_report.pdf]
Your response: "The quarterly report is ready. {open_delim}artifact_return:quarterly_report.pdf{close_delim}"
"""

    # In the <img> examples the closing backtick comes after the full tag,
    # so the inline-code span covers the whole example.
    artifact_content_instruction = f"""
- `{open_delim}artifact_content:filename[:version] {chain_delim} modifier1:value1 {chain_delim} ... {chain_delim} format:output_format{close_delim}`: Embeds artifact content after applying a chain of modifiers. This is resolved *late* (typically by a gateway before final display).
    - If this embed resolves to binary content (like an image), it will be automatically converted into an attached file, similar to `artifact_return`.
    - Use `{chain_delim}` to separate the artifact identifier from the modifier steps and the final format step.
    - Available modifiers: {modifier_list}.
    - The `format:output_format` step *must* be the last step in the chain. Supported formats include `text`, `datauri`, `json`, `json_pretty`, `csv`. Formatting as datauri, will include the data URI prefix, so do not add it yourself.
    - Use `artifact_meta` first to check size; embedding large files may fail.
    - Efficient workflows for large artifacts:
        - To extract specific line ranges: `load_artifact(filename, version, include_line_numbers=True)` to identify lines, then use `slice_lines:start:end` modifier to extract that range.
        - To fill templates with many placeholders: use `artifact_search_and_replace_regex` with `replacements` array (single atomic operation instead of multiple calls).
        - Line numbers are display-only; `slice_lines` always operates on original content.
    - Examples:
        - `<img src="{open_delim}artifact_content:image.png {chain_delim} format:datauri{close_delim}">` (Embed image as data URI - NOTE that this includes the datauri prefix. Do not add it yourself.)
        - `{open_delim}artifact_content:data.json {chain_delim} jsonpath:$.items[*] {chain_delim} select_fields:name,status {chain_delim} format:json_pretty{close_delim}` (Extract and format JSON fields)
        - `{open_delim}artifact_content:logs.txt {chain_delim} grep:ERROR {chain_delim} head:10 {chain_delim} format:text{close_delim}` (Get first 10 error lines)
        - `{open_delim}artifact_content:config.json {chain_delim} jsonpath:$.userPreferences.theme {chain_delim} format:text{close_delim}` (Extract a single value from a JSON artifact)
        - `{open_delim}artifact_content:server.log {chain_delim} tail:100 {chain_delim} grep:WARN {chain_delim} format:text{close_delim}` (Get warning lines from the last 100 lines of a log file)
        - `{open_delim}artifact_content:template.html {chain_delim} slice_lines:10:50 {chain_delim} format:text{close_delim}` (Extract lines 10-50 from a large file)
        - `<img src="{open_delim}artifact_content:diagram.png {chain_delim} format:datauri{close_delim}">` (Embed a PNG diagram as a data URI)
"""

    final_instruction = base_instruction
    if include_artifact_content:
        final_instruction += artifact_content_instruction

    final_instruction += f"""
Ensure the syntax is exactly `{open_delim}type:expression{close_delim}` or `{open_delim}type:expression {chain_delim} ... {chain_delim} format:output_format{close_delim}` with no extra spaces around delimiters (`{open_delim}`, `{close_delim}`, `{chain_delim}`, `:`, `|`). Malformed directives will be ignored."""

    return final_instruction
+= f""" 1638 Ensure the syntax is exactly `{open_delim}type:expression{close_delim}` or `{open_delim}type:expression {chain_delim} ... {chain_delim} format:output_format{close_delim}` with no extra spaces around delimiters (`{open_delim}`, `{close_delim}`, `{chain_delim}`, `:`, `|`). Malformed directives will be ignored.""" 1639 1640 return final_instruction 1641 1642 1643 def _generate_conversation_flow_instruction() -> str: 1644 """Generates instruction text for conversation flow and response formatting.""" 1645 open_delim = EMBED_DELIMITER_OPEN 1646 close_delim = EMBED_DELIMITER_CLOSE 1647 return f"""\ 1648 **Conversation Flow and Response Formatting:** 1649 1650 **CRITICAL: Minimize Narration - Maximize Results** 1651 1652 You do NOT need to produce visible text on every turn. Many turns should contain ONLY status updates and tool calls, with NO visible text at all. 1653 Only produce visible text when you have actual results, answers, or insights to share with the user. 1654 1655 Response Content Rules: 1656 1. Visible responses should contain ONLY: 1657 - Direct answers to the user's question 1658 - Analysis and insights derived from tool results 1659 - Final results and data 1660 - Follow-up questions when needed 1661 - Plans for complex multi-step tasks 1662 1663 2. DO NOT include visible text for: 1664 - Process narration ("Let me...", "I'll...", "Now I will...") 1665 - Acknowledgments of tool calls ("I'm calling...", "Searching...") 1666 - Descriptions of what you're about to do 1667 - Play-by-play commentary on your actions 1668 - Transitional phrases between tool calls 1669 1670 3. **MANDATORY: Emit a status_update embed BEFORE EVERY tool call.** This is required — the user sees a progress timeline and needs to know what is happening. 
The status text must be: 1671 - Objective and user-centric (describe what is happening, not what tool is being called) 1672 - Contextual (include the specific topic, query, or subject being worked on) 1673 - Concise (one short sentence, no more than 80 characters) 1674 - NEVER expose internal tool names, agent names, or implementation details 1675 1676 Good status_update examples: 1677 - "{open_delim}status_update:Searching for current TSLA stock price...{close_delim}" 1678 - "{open_delim}status_update:Looking up weather in Ottawa...{close_delim}" 1679 - "{open_delim}status_update:Analyzing the quarterly sales data...{close_delim}" 1680 - "{open_delim}status_update:Creating a summary report...{close_delim}" 1681 - "{open_delim}status_update:Checking available flight options...{close_delim}" 1682 1683 Bad status_update examples (DO NOT use these patterns): 1684 - "{open_delim}status_update:Calling web_search_google tool...{close_delim}" (exposes tool name) 1685 - "{open_delim}status_update:Delegating to ResearchAgent...{close_delim}" (exposes agent name) 1686 - "{open_delim}status_update:Processing...{close_delim}" (too vague, no context) 1687 - "{open_delim}status_update:Using peer_ResearchAgent...{close_delim}" (exposes internal name) 1688 1689 4. NEVER mix process narration with status updates - if you use a status_update embed, do NOT repeat that information in visible text. 1690 1691 5. When delegating to another agent (peer_ tools), still emit a status_update describing what you're asking them to do, NOT which agent you're calling. 
1692 1693 Examples: 1694 1695 **Excellent (status update before tool call, no visible text):** 1696 "{open_delim}status_update:Searching for current TSLA stock price...{close_delim}" [then calls web_search_google tool, no visible text] 1697 1698 **Excellent (status update before peer delegation):** 1699 "{open_delim}status_update:Researching Tesla stock performance...{close_delim}" [then calls peer_ResearchAgent, no visible text] 1700 1701 **Good (visible text only contains results):** 1702 "{open_delim}status_update:Analyzing Q4 sales data...{close_delim}" [calls tool] 1703 "Sales increased 23% in Q4, driven primarily by enterprise accounts." 1704 1705 **Bad (no status_update before tool call):** 1706 [calls tool directly without any status_update embed] 1707 1708 **Bad (unnecessary narration instead of status_update):** 1709 "Let me retrieve the sales data for you." [then calls tool] 1710 1711 **Bad (exposes internal names):** 1712 "{open_delim}status_update:Calling web_search_google...{close_delim}" [then calls tool] 1713 1714 Remember: The user sees a progress timeline. Every tool call MUST be preceded by a contextual status_update embed. Never expose tool or agent names in status updates. 
def inject_dynamic_instructions_callback(
    callback_context: CallbackContext,
    llm_request: LlmRequest,
    host_component: "SamAgentComponent",
    active_builtin_tools: List[BuiltinTool],
) -> Optional[LlmResponse]:
    """
    ADK before_model_callback to inject instructions based on host config.
    Modifies the llm_request directly.

    The callback resets the thinking-phase flags in session state, collects a
    list of instruction blocks (planning/formatting rules, LLM identity,
    fenced-block rules, agent-provided instructions, embed and conversation
    flow instructions, built-in/peer/project/workflow tool instructions and
    examples) and appends them, separated by horizontal rules, to
    ``llm_request.config.system_instruction``. When the invocation is about to
    reach ``max_llm_calls`` a "final interaction" notice is appended to
    ``llm_request.contents`` as a ``model`` message.

    Args:
        callback_context: ADK callback context; its state is read for
            ``a2a_context``, ``peer_tool_instructions`` and
            ``project_tool_instructions``.
        llm_request: The request about to be sent to the model; mutated in place.
        host_component: Host component supplying config, agent instruction
            hooks and the active task registry.
        active_builtin_tools: Built-in tools to describe in the instructions.

    Returns:
        Always None, so the model call proceeds with the modified request.
    """
    log_identifier = "[Callback:InjectInstructions]"
    log.debug("%s Running instruction injection callback...", log_identifier)

    session = get_session_from_callback_context(callback_context)
    session.state["thinking_phase_active"] = False
    session.state["thinking_just_yielded"] = False

    if not host_component:
        log.error(
            "%s Host component instance not provided. Cannot inject instructions.",
            log_identifier,
        )
        return None

    injected_instructions = []

    # The planning text embeds example directives. It must be an f-string with
    # the delimiters bound locally; otherwise the literal placeholders
    # "{open_delim}" / "{close_delim}" are sent to the model instead of the
    # real embed delimiters.
    open_delim = EMBED_DELIMITER_OPEN
    close_delim = EMBED_DELIMITER_CLOSE
    planning_instruction = f"""\
Parallel Tool Calling:
The system is capable of calling multiple tools in parallel to speed up processing. Please try to run tools in parallel when they don't depend on each other. This saves money and time, providing faster results to the user.

**Response Formatting - CRITICAL**:
In most cases when calling tools, you should produce NO visible text at all - only status_update embeds and the tool calls themselves.
The user can see a progress timeline showing your status updates, so narrating your actions is redundant and creates noise.

**MANDATORY: You MUST emit a status_update embed BEFORE EVERY tool call.** The user's progress timeline depends on these. Status text must describe what you're doing in user-friendly terms — never expose tool names or agent names.

If you do include visible text:
- It must contain actual results, insights, or answers - NOT process narration
- Do NOT end with a colon (":") before tool calls, as this leaves it hanging
- Prefer ending with a period (".") if you must include visible text

Examples:
- BEST: "{open_delim}status_update:Looking up current TSLA stock price...{close_delim}" [then calls tool, NO visible text]
- BAD: "Let me search for that information." [then calls tool]
- BAD: "Searching for information..." [then calls tool]
- BAD: [calls tool directly without any status_update embed]

**CRITICAL - No Links From Training Data**:
- DO NOT include URLs, links, or markdown links from your training data in responses
- NEVER include markdown links like [text](url) or raw URLs like https://example.com unless they came from a tool result
- If a delegated agent's response contains [[cite:searchN]] citations, those are properly formatted - preserve them exactly
- If a delegated agent's response has no links, do NOT add any links yourself
- The ONLY acceptable links are those returned by tools (web search, deep research, etc.) with proper citation format
- Your role is to coordinate and present results, not to augment them with links from your training data

Embeds in responses from agents:
To be efficient, peer agents may respond with artifact_content in their responses. These will not be resolved until they are sent back to a gateway. If it makes
sense, just carry that embed forward to your response to the user. For example, if you ask for an org chart from another agent and its response contains an embed like
`{open_delim}artifact_content:org_chart.md{close_delim}`, you can just include that embed in your response to the user. The gateway will resolve it and display the org chart.

Similarly, template_liquid blocks in peer agent responses can be carried forward to your response to the user for resolution by the gateway.

When faced with a complex goal or request that involves multiple steps, data retrieval, or artifact summarization to produce a new report or document, you MUST first create a plan.
Simple, direct requests like 'create an image of a dog' or 'write an email to thank my boss' do not require a plan.

If a plan is created:
1. It should be a terse, hierarchical list describing the steps needed, with each checkbox item on its own line.
2. Use '⬜' for pending items, '✅' for completed items, and '❌' for cancelled items.
3. If the plan changes significantly during execution, restate the updated plan.
4. As items are completed, update the plan to check them off.

"""
    injected_instructions.append(planning_instruction)

    # Inject LLM self-awareness: tell the agent which model it is running as.
    _model_config = host_component.get_config("model", {})
    if isinstance(_model_config, dict):
        _llm_model_name = _model_config.get("model", "")
    elif isinstance(_model_config, str):
        _llm_model_name = _model_config
    else:
        _llm_model_name = ""
    if _llm_model_name:
        # Keep only the last path segment (drops any "provider/" style prefix).
        _llm_model_name_display = _llm_model_name.rsplit("/", 1)[-1]
        injected_instructions.append(
            f"**Your LLM Identity:**\n"
            f"You are running as the `{_llm_model_name_display}` language model. "
            "If a user asks which AI model or LLM you are, you may truthfully state this."
        )
        log.debug("%s Injected LLM self-awareness instruction (model: %s).", log_identifier, _llm_model_name_display)

    # Add the consolidated block instructions
    injected_instructions.append(_generate_fenced_artifact_instruction())
    injected_instructions.append(_generate_inline_template_instruction())
    injected_instructions.append(_generate_fenced_block_syntax_rules())

    agent_instruction_str: Optional[str] = None
    if host_component._agent_system_instruction_callback:
        log.debug(
            "%s Calling agent-provided system instruction callback.", log_identifier
        )
        try:
            agent_instruction_str = host_component._agent_system_instruction_callback(
                callback_context, llm_request
            )
            if agent_instruction_str and isinstance(agent_instruction_str, str):
                injected_instructions.append(agent_instruction_str)
                log.info(
                    "%s Injected instructions from agent callback.", log_identifier
                )
            elif agent_instruction_str:
                log.warning(
                    "%s Agent instruction callback returned non-string type: %s. Ignoring.",
                    log_identifier,
                    type(agent_instruction_str),
                )
        except Exception as e_cb:
            # Best effort: a failing agent hook must not block the model call.
            log.error(
                "%s Error in agent-provided system instruction callback: %s. Skipping.",
                log_identifier,
                e_cb,
            )
    if host_component._agent_system_instruction_string:
        log.debug(
            "%s Using agent-provided static system instruction string.", log_identifier
        )
        agent_instruction_str = host_component._agent_system_instruction_string
        if agent_instruction_str and isinstance(agent_instruction_str, str):
            injected_instructions.append(agent_instruction_str)
            log.info("%s Injected static instructions from agent.", log_identifier)

    contents = llm_request.contents
    # Only walk the (potentially large) request contents when DEBUG output
    # would actually be emitted.
    if contents and log.isEnabledFor(logging.DEBUG):
        log.debug("\n\n### LLM Request Contents ###")
        for content in contents:
            if content.parts:
                for part in content.parts:
                    if part.text:
                        log.debug("Content part: %s", part.text)
                    elif part.function_call:
                        log.debug("Function call: %s", part.function_call.name)
                    elif part.function_response:
                        log.debug("Function response: %s", part.function_response)
                    else:
                        log.debug("raw: %s", part)
        log.debug("### End LLM Request Contents ###\n\n")

    if host_component.get_config("enable_embed_resolution", True):
        include_artifact_content_instr = host_component.get_config(
            "enable_artifact_content_instruction", True
        )
        # In structured invocation mode, suppress artifact_return directives
        # since only the result embed matters
        is_si_mode = False
        a2a_context = callback_context.state.get("a2a_context")
        if a2a_context:
            logical_task_id = a2a_context.get("logical_task_id")
            if logical_task_id:
                with host_component.active_tasks_lock:
                    task_context = host_component.active_tasks.get(logical_task_id)
                if task_context:
                    is_si_mode = task_context.get_flag("structured_invocation", False)
        instruction = _generate_embed_instruction(
            include_artifact_content_instr, log_identifier,
            suppress_artifact_return=is_si_mode,
        )
        if instruction:
            injected_instructions.append(instruction)
            log.debug(
                "%s Prepared embed instructions (artifact_content included: %s).",
                log_identifier,
                include_artifact_content_instr,
            )

    instruction = _generate_conversation_flow_instruction()
    if instruction:
        injected_instructions.append(instruction)
        log.debug("%s Prepared conversation flow instructions.", log_identifier)

    if active_builtin_tools:
        instruction = _generate_tool_instructions_from_registry(
            active_builtin_tools, log_identifier
        )
        if instruction:
            injected_instructions.append(instruction)
            log.debug(
                "%s Prepared instructions for %d active built-in tools.",
                log_identifier,
                len(active_builtin_tools),
            )

    peer_instructions = callback_context.state.get("peer_tool_instructions")
    if peer_instructions and isinstance(peer_instructions, str):
        injected_instructions.append(peer_instructions)
        log.debug(
            "%s Injected peer discovery instructions from callback state.",
            log_identifier,
        )

    project_tool_instructions = callback_context.state.get(
        "project_tool_instructions"
    )
    if project_tool_instructions and isinstance(project_tool_instructions, str):
        injected_instructions.append(project_tool_instructions)
        log.debug(
            "%s Injected project tool instructions from callback state.",
            log_identifier,
        )

    # Check for WorkflowAgentTool instances and inject specific instructions
    has_workflow_tools = False
    if llm_request.tools_dict:
        has_workflow_tools = any(
            isinstance(tool, WorkflowAgentTool)
            for tool in llm_request.tools_dict.values()
        )

    if has_workflow_tools:
        workflow_instruction = (
            "**Workflow Execution:**\n"
            "You have access to workflow tools (prefixed with `workflow_`). These tools represent structured business processes.\n"
            "They support two modes of invocation:\n"
            "1. **Parameter Mode:** Provide arguments directly matching the tool's schema. Use this for new data or simple inputs.\n"
            "2. **Artifact Mode:** Provide a single `input_artifact` argument with the filename of an existing JSON artifact. "
            "Use this when passing large datasets or outputs from previous steps to avoid re-tokenizing.\n"
            "Do NOT provide both parameters and `input_artifact` simultaneously."
        )
        injected_instructions.append(workflow_instruction)
        log.debug("%s Injected workflow execution instructions.", log_identifier)

    last_call_notification_message_added = False
    try:
        invocation_context = callback_context._invocation_context
        if invocation_context and invocation_context.run_config:
            current_llm_calls = (
                invocation_context._invocation_cost_manager._number_of_llm_calls
            )
            max_llm_calls = invocation_context.run_config.max_llm_calls

            log.debug(
                "%s Checking for last LLM call: current_calls=%d, max_calls=%s",
                log_identifier,
                current_llm_calls,
                max_llm_calls,
            )

            # The notice is added once the call count reaches max - 1, i.e.
            # when the request being built is the last one allowed.
            if (
                max_llm_calls
                and max_llm_calls > 0
                and current_llm_calls >= (max_llm_calls - 1)
            ):
                last_call_text = (
                    "IMPORTANT: This is your final allowed interaction for the current request. "
                    "Please inform the user that to continue this line of inquiry, they will need to "
                    "make a new request or explicitly ask to continue if the interface supports it. "
                    "Summarize your current findings and conclude your response."
                )
                if llm_request.contents is None:
                    llm_request.contents = []

                last_call_content = adk_types.Content(
                    role="model",
                    parts=[adk_types.Part(text=last_call_text)],
                )
                llm_request.contents.append(last_call_content)
                last_call_notification_message_added = True
                log.info(
                    "%s Added 'last LLM call' notification as a 'model' message to llm_request.contents. Current calls (%d) reached max_llm_calls (%d).",
                    log_identifier,
                    current_llm_calls,
                    max_llm_calls,
                )
    except Exception as e_last_call:
        # Relies on private ADK attributes; failure here must not block the call.
        log.error(
            "%s Error checking/injecting last LLM call notification message: %s",
            log_identifier,
            e_last_call,
        )

    injected_instructions.append(_generate_examples_instruction())

    if injected_instructions:
        combined_instructions = "\n\n---\n\n".join(injected_instructions)
        if llm_request.config is None:
            log.warning(
                "%s llm_request.config is None, cannot append system instructions.",
                log_identifier,
            )
        else:
            if llm_request.config.system_instruction is None:
                llm_request.config.system_instruction = ""

            if llm_request.config.system_instruction:
                llm_request.config.system_instruction += (
                    "\n\n---\n\n" + combined_instructions
                )
            else:
                llm_request.config.system_instruction = combined_instructions
            log.info(
                "%s Injected %d dynamic instruction block(s) into llm_request.config.system_instruction.",
                log_identifier,
                len(injected_instructions),
            )
    elif not last_call_notification_message_added:
        log.debug(
            "%s No dynamic instructions (system or last_call message) were injected based on config.",
            log_identifier,
        )

    return None
""" 2081 ADK after_tool_callback to automatically load and inject metadata for 2082 newly created artifacts into the tool's response dictionary. 2083 """ 2084 log_identifier = f"[Callback:InjectMetadata:{tool.name}]" 2085 log.info( 2086 "%s Starting metadata injection for tool response, type: %s", 2087 log_identifier, 2088 type(tool_response).__name__, 2089 ) 2090 2091 if not host_component: 2092 log.error( 2093 "%s Host component instance not provided. Cannot proceed.", 2094 log_identifier, 2095 ) 2096 return None 2097 2098 if not tool_context.actions.artifact_delta: 2099 log.debug( 2100 "%s No artifact delta found. Skipping metadata injection.", log_identifier 2101 ) 2102 return None 2103 2104 artifact_service: Optional[BaseArtifactService] = ( 2105 tool_context._invocation_context.artifact_service 2106 ) 2107 if not artifact_service: 2108 log.error( 2109 "%s ArtifactService not available. Cannot load metadata.", 2110 log_identifier, 2111 ) 2112 return None 2113 2114 app_name = tool_context._invocation_context.app_name 2115 user_id = tool_context._invocation_context.user_id 2116 session_id = get_original_session_id(tool_context._invocation_context) 2117 2118 metadata_texts = [] 2119 2120 for filename, version in tool_context.actions.artifact_delta.items(): 2121 if filename.endswith(METADATA_SUFFIX): 2122 log.debug( 2123 "%s Skipping metadata artifact '%s' itself.", log_identifier, filename 2124 ) 2125 continue 2126 2127 metadata_filename = f"{filename}{METADATA_SUFFIX}" 2128 log.debug( 2129 "%s Found data artifact '%s' v%d. 
Attempting to load metadata '%s' v%d.", 2130 log_identifier, 2131 filename, 2132 version, 2133 metadata_filename, 2134 version, 2135 ) 2136 2137 try: 2138 metadata_part = await artifact_service.load_artifact( 2139 app_name=app_name, 2140 user_id=user_id, 2141 session_id=session_id, 2142 filename=metadata_filename, 2143 version=version, 2144 ) 2145 2146 if metadata_part and metadata_part.inline_data: 2147 try: 2148 metadata_dict = json.loads( 2149 metadata_part.inline_data.data.decode("utf-8") 2150 ) 2151 metadata_dict["version"] = version 2152 metadata_dict["filename"] = filename 2153 formatted_text = format_metadata_for_llm(metadata_dict) 2154 metadata_texts.append(formatted_text) 2155 log.info( 2156 "%s Successfully loaded and formatted metadata for '%s' v%d.", 2157 log_identifier, 2158 filename, 2159 version, 2160 ) 2161 except json.JSONDecodeError as json_err: 2162 log.warning( 2163 "%s Failed to parse metadata JSON for '%s' v%d: %s", 2164 log_identifier, 2165 metadata_filename, 2166 version, 2167 json_err, 2168 ) 2169 except Exception as fmt_err: 2170 log.warning( 2171 "%s Failed to format metadata for '%s' v%d: %s", 2172 log_identifier, 2173 metadata_filename, 2174 version, 2175 fmt_err, 2176 ) 2177 else: 2178 log.warning( 2179 "%s Companion metadata artifact '%s' v%d not found or empty.", 2180 log_identifier, 2181 metadata_filename, 2182 version, 2183 ) 2184 2185 except Exception as load_err: 2186 log.error( 2187 "%s Error loading companion metadata artifact '%s' v%d: %s", 2188 log_identifier, 2189 metadata_filename, 2190 version, 2191 load_err, 2192 ) 2193 2194 if metadata_texts: 2195 if not isinstance(tool_response, dict): 2196 log.error( 2197 "%s Tool response is not a dictionary. Cannot inject metadata. 
Type: %s", 2198 log_identifier, 2199 type(tool_response), 2200 ) 2201 return None 2202 2203 combined_metadata_text = "\n\n".join(metadata_texts) 2204 tool_response[METADATA_RESPONSE_KEY] = combined_metadata_text 2205 log.info( 2206 "%s Injected metadata for %d artifact(s) into tool response key '%s'.", 2207 log_identifier, 2208 len(metadata_texts), 2209 METADATA_RESPONSE_KEY, 2210 ) 2211 return tool_response 2212 else: 2213 log.debug( 2214 "%s No metadata loaded or formatted. Returning original tool response.", 2215 log_identifier, 2216 ) 2217 return None 2218 2219 2220 async def track_produced_artifacts_callback( 2221 tool: BaseTool, 2222 args: Dict, 2223 tool_context: ToolContext, 2224 tool_response: Dict, 2225 host_component: "SamAgentComponent", 2226 ) -> Optional[Dict]: 2227 """ 2228 ADK after_tool_callback to automatically track all artifacts created by a tool. 2229 It inspects the artifact_delta and registers the created artifacts in the 2230 TaskExecutionContext. 2231 """ 2232 log_identifier = f"[Callback:TrackArtifacts:{tool.name}]" 2233 log.debug("%s Starting artifact tracking for tool response.", log_identifier) 2234 2235 if not tool_context.actions.artifact_delta: 2236 log.debug("%s No artifact delta found. Skipping tracking.", log_identifier) 2237 return None 2238 2239 if not host_component: 2240 log.error( 2241 "%s Host component instance not provided. Cannot proceed.", log_identifier 2242 ) 2243 return None 2244 2245 try: 2246 a2a_context = tool_context.state.get("a2a_context", {}) 2247 logical_task_id = a2a_context.get("logical_task_id") 2248 if not logical_task_id: 2249 log.warning( 2250 "%s Could not find logical_task_id in tool_context. Cannot track artifacts.", 2251 log_identifier, 2252 ) 2253 return None 2254 2255 with host_component.active_tasks_lock: 2256 task_context = host_component.active_tasks.get(logical_task_id) 2257 2258 if not task_context: 2259 log.warning( 2260 "%s TaskExecutionContext not found for task %s. 
def _sanitize_bytes_in_dict_inplace(obj):
    """Swap every bytes-like value nested in *obj* for a short placeholder string.

    *obj* is modified **in place** and nothing is returned. Only ``dict`` and
    ``list`` containers are traversed; any other object (including tuples) is
    left untouched.

    LLM request dumps can carry raw ``inline_data`` bytes (for instance inline
    vision images). Those cannot be JSON-serialized for publishing over the
    Solace broker, so each one is replaced with ``"<binary data: N bytes>"``.
    """
    if isinstance(obj, dict):
        entries = obj.items()
    elif isinstance(obj, list):
        entries = enumerate(obj)
    else:
        # Not a container we know how to rewrite in place.
        return

    for slot, element in entries:
        if isinstance(element, (bytes, bytearray)):
            # Overwriting an existing key/index is safe while iterating.
            obj[slot] = f"<binary data: {len(element)} bytes>"
            continue
        if isinstance(element, (dict, list)):
            _sanitize_bytes_in_dict_inplace(element)
Cannot send Solace message.", 2366 log_identifier, 2367 ) 2368 return None 2369 2370 logical_task_id = a2a_context.get("logical_task_id") 2371 context_id = a2a_context.get("contextId") 2372 2373 # Store model name in callback state for later use in response callback 2374 model_name = host_component.model_config 2375 if isinstance(model_name, dict): 2376 model_name = model_name.get("model", "unknown") 2377 callback_context.state["model_name"] = model_name 2378 2379 request_dump = llm_request.model_dump(exclude_none=True) 2380 # Sanitize binary data (e.g., inline_data from images) to make it JSON-serializable. 2381 # The raw bytes cannot be sent over the Solace broker in status events. 2382 _sanitize_bytes_in_dict_inplace(request_dump) 2383 llm_data = LlmInvocationData(request=request_dump) 2384 status_update_event = a2a.create_data_signal_event( 2385 task_id=logical_task_id, 2386 context_id=context_id, 2387 signal_data=llm_data, 2388 agent_name=host_component.agent_name, 2389 ) 2390 2391 loop = host_component.get_async_loop() 2392 if loop and loop.is_running(): 2393 asyncio.run_coroutine_threadsafe( 2394 host_component._publish_status_update_with_buffer_flush( 2395 status_update_event, 2396 a2a_context, 2397 skip_buffer_flush=False, 2398 ), 2399 loop, 2400 ) 2401 log.debug( 2402 "%s Scheduled LLM invocation status update with buffer flush.", 2403 log_identifier, 2404 ) 2405 else: 2406 log.error( 2407 "%s Async loop not available. 
def solace_llm_response_callback(
    callback_context: CallbackContext,
    llm_response: LlmResponse,
    host_component: "SamAgentComponent",
) -> Optional[LlmResponse]:
    """
    ADK after_model_callback to send a Solace message with the LLM's response
    and token usage information.

    Partial responses are skipped. For a complete response this callback:
      1. Stores ``parallel_group_id`` in callback state when the response holds
         more than one function call (and resets it to ``None`` otherwise).
      2. Records token usage on the task context registered under the logical
         task id, when usage metadata and a task context are both present.
      3. Schedules an ``llm_response`` status update on the host component's
         async loop.

    Args:
        callback_context: ADK callback context; ``state["a2a_context"]`` is read.
        llm_response: The model response being reported.
        host_component: The hosting component used for config, task lookup
            and publishing.

    Returns:
        Always ``None`` (the response is never altered). Failures are logged
        and swallowed so a notification problem cannot break the model turn.
    """
    log_identifier = "[Callback:SolaceLLMResponse]"
    if llm_response.partial:  # Don't send partial responses for this notification
        log.debug("%s Skipping partial response", log_identifier)
        return None

    if not host_component:
        log.error(
            "%s Host component instance not provided. Cannot send Solace message.",
            log_identifier,
        )
        return None

    try:
        a2a_context = callback_context.state.get("a2a_context")
        if not a2a_context:
            log.error(
                "%s a2a_context not found in callback_context.state. Cannot send Solace message.",
                log_identifier,
            )
            return None

        agent_name = host_component.get_config("agent_name", "unknown_agent")
        logical_task_id = a2a_context.get("logical_task_id")

        # Check for parallel tool calls - if multiple function_calls in this response,
        # generate a parallel_group_id for the frontend to group them visually
        function_calls = []
        if llm_response.content and llm_response.content.parts:
            function_calls = [
                p for p in llm_response.content.parts if p.function_call
            ]

        if len(function_calls) > 1:
            # `uuid` is imported at module level; no function-local import needed.
            parallel_group_id = f"llm_batch_{uuid.uuid4().hex[:8]}"
            callback_context.state["parallel_group_id"] = parallel_group_id
            log.debug(
                "%s Detected %d parallel tool calls, assigned parallel_group_id=%s",
                log_identifier,
                len(function_calls),
                parallel_group_id,
            )
        else:
            # Clear any previous parallel_group_id
            callback_context.state["parallel_group_id"] = None

        llm_response_data = {
            "type": "llm_response",
            "data": llm_response.model_dump(exclude_none=True),
        }

        # Extract and record token usage
        if llm_response.usage_metadata:
            usage = llm_response.usage_metadata
            model_name = callback_context.state.get("model_name", "unknown")

            usage_dict = {
                "input_tokens": usage.prompt_token_count,
                "output_tokens": usage.candidates_token_count,
                "model": model_name,
            }

            # Check for cached tokens (provider-specific).
            # The attribute can be present but None; coerce to 0 so the
            # comparison below cannot raise TypeError (which would abort both
            # the usage recording and the status update).
            cached_tokens = 0
            prompt_details = getattr(usage, "prompt_tokens_details", None)
            if prompt_details:
                cached_tokens = getattr(prompt_details, "cached_tokens", 0) or 0
                if cached_tokens > 0:
                    usage_dict["cached_input_tokens"] = cached_tokens

            # Add to response data
            llm_response_data["usage"] = usage_dict

            # Record in task context for aggregation. Only the lookup is done
            # under the lock; the recording itself happens after release.
            with host_component.active_tasks_lock:
                task_context = host_component.active_tasks.get(logical_task_id)

            if task_context:
                # Resolve context-window for this model: admin UI value wins,
                # LiteLLM registry is fallback, else None. The result is
                # persisted on the task record so the gateway's indicator can
                # render without cross-service lookups.
                max_input_tokens: Optional[int] = None
                try:
                    agent_model = getattr(
                        getattr(host_component, "adk_agent", None), "model", None
                    )
                    if agent_model is not None and hasattr(
                        agent_model, "get_max_input_tokens"
                    ):
                        max_input_tokens = agent_model.get_max_input_tokens()
                    if log.isEnabledFor(logging.DEBUG):
                        # Diagnostic-only path. Config dict keys are filtered
                        # to exclude auth-adjacent names (api_key, token,
                        # secret, credential, password) to avoid leaking
                        # sensitive config shape into logs.
                        component_model_cfg = host_component.get_config("model", None)
                        if isinstance(component_model_cfg, dict):
                            _SENSITIVE = (
                                "key",
                                "token",
                                "secret",
                                "credential",
                                "password",
                                "auth",
                            )
                            cfg_keys = [
                                k
                                for k in component_model_cfg.keys()
                                if not any(s in str(k).lower() for s in _SENSITIVE)
                            ]
                            cfg_max = component_model_cfg.get("max_input_tokens")
                        else:
                            cfg_keys = type(component_model_cfg).__name__
                            cfg_max = None
                        log.debug(
                            "%s Resolved max_input_tokens=%s for model=%s (agent_model_type=%s, cfg_keys=%s, cfg_max=%s)",
                            log_identifier,
                            max_input_tokens,
                            model_name,
                            type(agent_model).__name__ if agent_model else "None",
                            cfg_keys,
                            cfg_max,
                        )
                except Exception:
                    log.exception(
                        "%s Could not resolve max_input_tokens", log_identifier
                    )
                task_context.record_token_usage(
                    input_tokens=usage.prompt_token_count,
                    output_tokens=usage.candidates_token_count,
                    model=model_name,
                    source="agent",
                    cached_input_tokens=cached_tokens,
                    max_input_tokens=max_input_tokens,
                )
                # %s (not %d): a provider may leave a count unset (None).
                log.debug(
                    "%s Recorded token usage: input=%s, output=%s, cached=%s, model=%s",
                    log_identifier,
                    usage.prompt_token_count,
                    usage.candidates_token_count,
                    cached_tokens,
                    model_name,
                )

        # This signal doesn't have a dedicated Pydantic model, so we create the
        # DataPart directly and use the lower-level helpers.
        data_part = a2a.create_data_part(data=llm_response_data)
        a2a_message = a2a.create_agent_parts_message(
            parts=[data_part],
            task_id=logical_task_id,
            context_id=a2a_context.get("contextId"),
        )
        status_update_event = a2a.create_status_update(
            task_id=logical_task_id,
            context_id=a2a_context.get("contextId"),
            message=a2a_message,
            is_final=False,
            metadata={"agent_name": agent_name},
        )
        loop = host_component.get_async_loop()
        if loop and loop.is_running():
            asyncio.run_coroutine_threadsafe(
                host_component._publish_status_update_with_buffer_flush(
                    status_update_event,
                    a2a_context,
                    skip_buffer_flush=False,
                ),
                loop,
            )
            log.debug(
                "%s Scheduled LLM response status update with buffer flush (final_chunk=%s).",
                log_identifier,
                llm_response.turn_complete,
            )
        else:
            log.error(
                "%s Async loop not available. Cannot publish LLM response status update.",
                log_identifier,
            )

    except Exception as e:
        # log.exception keeps the traceback, matching the tool callbacks in this module.
        log.exception(
            "%s Error during Solace LLM response notification: %s", log_identifier, e
        )

    return None
def notify_tool_invocation_start_callback(
    tool: BaseTool,
    args: Dict[str, Any],
    tool_context: ToolContext,
    host_component: "SamAgentComponent",
) -> None:
    """
    ADK before_tool_callback to send an A2A status message indicating
    that a tool is about to be invoked.

    Args:
        tool: The tool about to run; its ``name`` is reported.
        args: The tool arguments. Values that cannot be JSON-encoded are
            replaced by their ``str()`` form in the published signal.
        tool_context: ADK tool context; ``state["a2a_context"]`` and
            ``state["parallel_group_id"]`` are read.
        host_component: The hosting component used to publish the signal.

    Returns:
        None. Publishing errors are logged and swallowed.
    """
    log_identifier = f"[Callback:NotifyToolInvocationStart:{tool.name}]"
    log.debug(
        "%s Triggered for tool '%s' with args: %s", log_identifier, tool.name, args
    )

    if not host_component:
        log.error(
            "%s Host component instance not provided. Cannot send notification.",
            log_identifier,
        )
        return

    a2a_context = tool_context.state.get("a2a_context")
    if not a2a_context:
        log.error(
            "%s a2a_context not found in tool_context.state. Cannot send notification.",
            log_identifier,
        )
        return

    try:
        serializable_args = {}
        for k, v in args.items():
            try:
                json.dumps(v)
                serializable_args[k] = v
            except (TypeError, ValueError, OverflowError):
                # json.dumps raises ValueError for circular references, not
                # TypeError; without catching it one odd argument would abort
                # the whole notification.
                serializable_args[k] = str(v)

        # Get parallel_group_id from callback state if this is part of a parallel batch
        parallel_group_id = tool_context.state.get("parallel_group_id")

        tool_data = ToolInvocationStartData(
            tool_name=tool.name,
            tool_args=serializable_args,
            function_call_id=tool_context.function_call_id,
            parallel_group_id=parallel_group_id,
        )
        host_component.publish_data_signal_from_thread(
            a2a_context=a2a_context,
            signal_data=tool_data,
            skip_buffer_flush=False,
            log_identifier=log_identifier,
        )
        log.debug(
            "%s Scheduled tool_invocation_start notification.",
            log_identifier,
        )

    except Exception as e:
        log.exception(
            "%s Error publishing tool_invocation_start status update: %s",
            log_identifier,
            e,
        )

    return None
def notify_tool_execution_result_callback(
    tool: BaseTool,
    args: Dict[str, Any],
    tool_context: ToolContext,
    tool_response: Any,
    host_component: "SamAgentComponent",
) -> None:
    """
    ADK after_tool_callback to send an A2A status message with the result
    of a tool's execution.

    Args:
        tool: The tool that ran; ``name`` and ``is_long_running`` are read.
        args: The tool arguments (unused here).
        tool_context: ADK tool context; ``state["a2a_context"]`` is read.
        tool_response: The tool's result. Objects exposing ``model_dump`` are
            dumped; anything that cannot be JSON-encoded is sent as ``str()``.
        host_component: The hosting component used to publish the signal.

    Returns:
        None. Publishing errors are logged and swallowed.
    """
    log_identifier = f"[Callback:NotifyToolResult:{tool.name}]"
    log.debug("%s Triggered for tool '%s'", log_identifier, tool.name)

    if not host_component:
        log.error(
            "%s Host component instance not provided. Cannot send notification.",
            log_identifier,
        )
        return

    a2a_context = tool_context.state.get("a2a_context")
    if not a2a_context:
        log.error(
            "%s a2a_context not found in tool_context.state. Cannot send notification.",
            log_identifier,
        )
        return

    if tool.is_long_running and not tool_response:
        log.debug(
            "%s Tool is long-running and is not yet complete. Don't notify its completion",
            log_identifier,
        )
        return

    try:
        # Attempt to make the response JSON serializable
        serializable_response = tool_response
        if hasattr(tool_response, "model_dump"):
            serializable_response = tool_response.model_dump(exclude_none=True)
        else:
            try:
                # A simple check to see if it can be dumped.
                # This isn't perfect but catches many non-serializable types.
                json.dumps(tool_response)
            except (TypeError, ValueError, OverflowError):
                # ValueError covers circular references, which json.dumps does
                # not report as TypeError.
                serializable_response = str(tool_response)

        tool_data = ToolResultData(
            tool_name=tool.name,
            result_data=serializable_response,
            function_call_id=tool_context.function_call_id,
        )
        host_component.publish_data_signal_from_thread(
            a2a_context=a2a_context,
            signal_data=tool_data,
            skip_buffer_flush=False,
            log_identifier=log_identifier,
        )
        log.debug(
            "%s Scheduled tool_result notification for function call ID %s.",
            log_identifier,
            tool_context.function_call_id,
        )

    except Exception as e:
        log.exception(
            "%s Error publishing tool_result status update: %s",
            log_identifier,
            e,
        )

    return None
def auto_continue_on_max_tokens_callback(
    callback_context: CallbackContext,
    llm_response: LlmResponse,
    host_component: "SamAgentComponent",
) -> Optional[LlmResponse]:
    """
    ADK after_model_callback that resumes an interrupted LLM response.

    Two signals count as an interruption:
      * ``llm_response.interrupted`` is truthy (explicit), or
      * the model emitted a function call named ``_continue`` (implicit).

    When either is present, a replacement response is returned that keeps the
    original parts (minus any ``_continue`` calls), appends a host-generated
    ``_continue_generation`` function call, and is marked non-partial.

    Returns:
        The replacement ``LlmResponse``, or ``None`` when auto-continuation is
        disabled via ``enable_auto_continuation`` or no interruption was seen.
    """
    tag = "[Callback:AutoContinue]"

    if not host_component.get_config("enable_auto_continuation", True):
        log.debug("%s Auto-continuation is disabled. Skipping.", tag)
        return None

    def is_model_continue_call(part) -> bool:
        call = part.function_call
        return bool(call and call.name == "_continue")

    explicit = llm_response.interrupted
    content = llm_response.content
    response_parts = content.parts if content and content.parts else []
    implicit = any(is_model_continue_call(p) for p in response_parts)

    if not (explicit or implicit):
        return None

    log.info(
        "%s Interruption signal detected (explicit: %s, implicit: %s). Triggering auto-continuation.",
        tag,
        explicit,
        implicit,
    )

    # Keep everything the model produced except its own `_continue` calls.
    kept_parts = [p for p in response_parts if not is_model_continue_call(p)]
    if implicit:
        log.debug(
            "%s Removed implicit '_continue' tool call from response parts.",
            tag,
        )

    resume_part = adk_types.Part(
        function_call=adk_types.FunctionCall(
            name="_continue_generation",
            args={},
            id=f"host-continue-{uuid.uuid4()}",
        )
    )

    if any(p.text for p in kept_parts):
        final_parts = [*kept_parts, resume_part]
    else:
        # Without any text the event could be dropped by history processing,
        # so lead with a single-space text part.
        final_parts = [adk_types.Part(text=" "), *kept_parts, resume_part]
        log.debug(
            "%s Prepended empty text part to ensure event is preserved.", tag
        )

    # A fresh, non-partial response: the partial text is saved to history and
    # the continuation tool call gets executed.
    return LlmResponse(
        content=adk_types.Content(role="model", parts=final_parts),
        partial=False,
        custom_metadata={
            "was_interrupted": True,
        },
    )
def preregister_long_running_tools_callback(
    callback_context: CallbackContext,
    llm_response: LlmResponse,
    host_component: "SamAgentComponent",
) -> Optional[LlmResponse]:
    """
    ADK after_model_callback to pre-register all long-running tool calls
    before any tool execution begins. This prevents race conditions where
    one tool completes before another has registered.

    The race condition occurs because tools are executed via asyncio.gather
    (non-deterministic order) and each tool calls register_parallel_call_sent()
    inside its run_async(). If Tool A completes before Tool B even registers,
    the system thinks all calls are done (completed=1, total=1).

    By pre-registering all long-running tools in this callback (which runs
    BEFORE tool execution), we ensure the total count is set correctly upfront.

    Args:
        callback_context: ADK callback context; ``state["a2a_context"]`` and
            the invocation id are read from it.
        llm_response: The model response whose function calls are inspected.
        host_component: The hosting component holding ``active_tasks``.

    Returns:
        Always ``None``; the response is never altered.
    """
    log_identifier = "[Callback:PreregisterLongRunning]"

    # Only process non-partial responses with function calls
    if llm_response.partial:
        return None

    if not llm_response.content or not llm_response.content.parts:
        return None

    # Find all long-running tool calls (identified by peer_ or workflow_ prefix)
    long_running_calls = []
    for part in llm_response.content.parts:
        function_call = part.function_call
        if not function_call:
            continue
        tool_name = function_call.name
        if not tool_name:
            # A function call may arrive without a name; it cannot match a
            # prefix, and calling .startswith on None would raise here,
            # outside any try block.
            continue
        if tool_name.startswith((PEER_TOOL_PREFIX, WORKFLOW_TOOL_PREFIX)):
            long_running_calls.append(function_call)

    if not long_running_calls:
        return None

    # Get task context
    a2a_context = callback_context.state.get("a2a_context")
    if not a2a_context:
        log.warning("%s No a2a_context, cannot pre-register tools", log_identifier)
        return None

    logical_task_id = a2a_context.get("logical_task_id")
    invocation_id = callback_context._invocation_context.invocation_id

    with host_component.active_tasks_lock:
        task_context = host_component.active_tasks.get(logical_task_id)

    if not task_context:
        log.warning(
            "%s TaskContext not found for %s, cannot pre-register",
            log_identifier,
            logical_task_id,
        )
        return None

    # Register one "sent" call per long-running tool, before any tool runs.
    # (This happens after the lock above is released.)
    for _ in long_running_calls:
        task_context.register_parallel_call_sent(invocation_id)

    log.info(
        "%s Pre-registered %d long-running tool call(s) for invocation %s (task %s)",
        log_identifier,
        len(long_running_calls),
        invocation_id,
        logical_task_id,
    )

    return None  # Don't alter the response
# Bedrock tool name validation pattern: must start with letter or underscore, then alphanumeric/underscore/hyphen
# Note: We allow underscore prefix for internal tools like _notify_artifact_save
VALID_TOOL_NAME_PATTERN = re.compile(r'^[a-zA-Z_][a-zA-Z0-9_-]*$')


def sanitize_tool_names_callback(
    callback_context: CallbackContext,
    llm_response: LlmResponse,
    host_component: "SamAgentComponent",
) -> Optional[LlmResponse]:
    """
    ADK after_model_callback to sanitize and validate tool names in LLM responses.

    This callback catches hallucinated tool names that would cause
    errors when sent back to the LLM provider.

    Each function call in the response is classified as:
      * placeholder  - name starts with ``$``; removed, error response built
      * redundant    - ``_notify_artifact_save`` whose id does not start with
                       ``host-notify-``; removed silently
      * bad format   - name missing or not matching VALID_TOOL_NAME_PATTERN;
                       removed, error response built
      * valid        - kept

    Returns:
        ``None`` for partial responses, responses without parts, and responses
        where nothing was removed. Otherwise a new ``LlmResponse`` holding the
        surviving parts (or a single-space text part if none survive). When
        error responses were built they are stored in
        ``callback_context.state["sanitized_tool_error_content"]``.
    """
    log_identifier = "[Callback:SanitizeToolNames]"

    # Only process non-partial responses with function calls
    if llm_response.partial:
        return None

    if not llm_response.content or not llm_response.content.parts:
        return None

    # Find all function calls and check for invalid tool names
    invalid_calls = []  # Calls that need error responses
    silently_dropped_calls = []  # Calls to drop without error (e.g., redundant internal tool calls)
    valid_parts = []

    for part in llm_response.content.parts:
        if not part.function_call:
            # Non-function-call parts (text, etc.) are always kept
            valid_parts.append(part)
            continue

        # A missing name is treated as an empty string, which fails the format
        # check below instead of raising on .startswith.
        tool_name = part.function_call.name or ""
        function_call_id = part.function_call.id or ""

        # Check for placeholder patterns (tool names starting with $)
        # This catches $FUNCTION_NAME, $ARTIFACT_TOOL, $TOOL_NAME, etc.
        is_placeholder = tool_name.startswith('$')

        # Check against Bedrock's validation pattern
        is_valid_format = VALID_TOOL_NAME_PATTERN.match(tool_name) is not None

        # Check for unauthorized _notify_artifact_save calls
        # This tool should only be called by the system with host-notify- prefix
        # LLM-generated calls (without this prefix) should be silently dropped
        # since the artifact was already saved and a system call was injected
        is_unauthorized_internal_tool = (
            tool_name == "_notify_artifact_save"
            and not function_call_id.startswith("host-notify-")
        )

        if is_placeholder:
            log.warning(
                "%s Detected hallucinated placeholder tool name: '%s'. "
                "This is a known LLM hallucination from training data examples.",
                log_identifier,
                tool_name,
            )
            invalid_calls.append((part.function_call, "placeholder_hallucination"))
        elif is_unauthorized_internal_tool:
            # Silently drop LLM-generated _notify_artifact_save calls
            # The system already injected a valid call, so this is redundant
            log.info(
                "%s Silently dropping redundant LLM-generated call to internal tool: '%s' (id=%s). "
                "The system already injected a valid call for this artifact save.",
                log_identifier,
                tool_name,
                function_call_id,
            )
            silently_dropped_calls.append(part.function_call)
            # Don't add to invalid_calls - we don't want to send an error response
        elif not is_valid_format:
            # Report the pattern actually enforced rather than a hand-copied one.
            log.warning(
                "%s Detected invalid tool name format: '%s'. "
                "Tool names must match pattern: %s",
                log_identifier,
                tool_name,
                VALID_TOOL_NAME_PATTERN.pattern,
            )
            invalid_calls.append((part.function_call, "invalid_format"))
        else:
            # Valid tool call, keep it
            valid_parts.append(part)

    if not invalid_calls and not silently_dropped_calls:
        # All tool names are valid, no modification needed
        return None

    # If we only have silently dropped calls (no invalid calls needing error responses),
    # just return a modified response without forcing another turn
    if not invalid_calls:
        log.info(
            "%s Silently dropped %d redundant internal tool call(s). No error response needed.",
            log_identifier,
            len(silently_dropped_calls),
        )
        # Return modified response with the dropped calls removed, preserving turn_complete
        return LlmResponse(
            content=adk_types.Content(
                role="model",
                parts=valid_parts if valid_parts else [adk_types.Part(text=" ")],
            ),
            partial=False,
            turn_complete=llm_response.turn_complete,  # Preserve original turn_complete
        )

    # Log the invalid calls for debugging
    for fc, reason in invalid_calls:
        log.error(
            "%s Removing invalid tool call: name='%s', reason='%s', args=%s",
            log_identifier,
            fc.name,
            reason,
            fc.args,
        )

    # Create synthetic error responses for the invalid calls
    # This informs the LLM that its tool call was invalid
    error_response_parts = []
    for fc, reason in invalid_calls:
        if reason == "placeholder_hallucination":
            error_message = (
                f"ERROR: '{fc.name}' is not a valid tool name. "
                "You appear to have hallucinated a placeholder. "
                "Please use only the actual tools available to you. "
                "Review the available tools and try again with a real tool name."
            )
        else:
            # Wording matches VALID_TOOL_NAME_PATTERN, which also accepts a
            # leading underscore.
            error_message = (
                f"ERROR: '{fc.name}' is not a valid tool name format. "
                "Tool names must start with a letter or underscore and contain only letters, numbers, underscores, and hyphens. "
                "Please use only the actual tools available to you."
            )

        error_response_part = adk_types.Part.from_function_response(
            name=fc.name,
            response={"status": "error", "message": error_message},
        )
        # Preserve the function call ID for proper pairing
        if fc.id:
            error_response_part.function_response.id = fc.id
        error_response_parts.append(error_response_part)

    # invalid_calls is non-empty here, so error_response_parts is too.
    # NOTE(review): when every part was an invalid call the response falls back
    # to a single-space text part. An apology-text fallback used to sit after
    # this point but could never execute; confirm whether it should be revived.
    modified_response = LlmResponse(
        content=adk_types.Content(
            role="model",
            parts=valid_parts if valid_parts else [adk_types.Part(text=" ")],
        ),
        partial=False,
        turn_complete=False,  # Force another turn to handle the error
    )

    # Store the synthetic tool-response content in callback state for
    # potential use by other callbacks.
    error_content = adk_types.Content(
        role="tool",
        parts=error_response_parts,
    )
    callback_context.state["sanitized_tool_error_content"] = error_content

    log.info(
        "%s Sanitized %d invalid tool call(s). Response modified to continue gracefully.",
        log_identifier,
        len(invalid_calls),
    )

    return modified_response
def _is_openapi_tool(tool: BaseTool) -> bool:
    """
    Check if a tool is an OpenAPI-based RestApiTool.

    Args:
        tool: The tool to check

    Returns:
        True if the tool's ``origin`` attribute equals ``"openapi"``,
        False otherwise (including when the attribute is absent).
    """
    # Check the origin attribute set by SAM at initialization
    return getattr(tool, "origin", None) == "openapi"


def _extract_openapi_base_url(tool: BaseTool) -> Optional[str]:
    """Extract base URL from an OpenAPI tool.

    Sources are tried in order: ``tool.endpoint.base_url``, ``tool.base_url``,
    ``tool._base_url``, then ``tool._config["base_url"]``. An attribute that
    exists but is empty/None is skipped so later sources still get a chance
    (previously an unset ``endpoint.base_url`` produced the string "None").

    Returns:
        The base URL, or None when no source provides one or lookup fails.
    """
    try:
        lookups = (
            (getattr(tool, "endpoint", None), "base_url"),  # RestApiTool endpoint
            (tool, "base_url"),
            (tool, "_base_url"),
        )
        for owner, attr in lookups:
            value = getattr(owner, attr, None)
            if value:
                return str(value)

        if hasattr(tool, "_config") and isinstance(tool._config, dict):
            return tool._config.get("base_url")

    except Exception as e:
        log.debug("Could not extract base URL: %s", e)

    return None


def _extract_openapi_http_method(tool: BaseTool) -> Optional[str]:
    """Extract HTTP method from OpenAPI tool.

    Returns:
        ``tool.endpoint.method`` upper-cased, or None when the tool has no
        endpoint or the method is unset (rather than the string "NONE").
    """
    # Get from tool's endpoint (RestApiTool)
    method = getattr(getattr(tool, "endpoint", None), "method", None)
    return str(method).upper() if method else None


def _extract_openapi_operation_id(tool: BaseTool) -> Optional[str]:
    """Extract operation ID from OpenAPI tool.

    Returns:
        ``tool.operation.operationId`` when present, otherwise None.
    """
    # Get from tool's operation (RestApiTool)
    return getattr(getattr(tool, "operation", None), "operationId", None)
def _extract_openapi_metadata(tool: BaseTool) -> Dict[str, Optional[str]]:
    """
    Collect the OpenAPI details of a tool into a single dict.

    Returns:
        Dict with keys: operation_id, base_url, http_method, endpoint_path, tool_uri.
        ``tool_uri`` is the base URL joined to the endpoint path template with
        exactly one slash between them, or just the base URL when no path is
        available.
    """
    info: Dict[str, Optional[str]] = {
        "operation_id": _extract_openapi_operation_id(tool),
        "base_url": _extract_openapi_base_url(tool),
        "http_method": _extract_openapi_http_method(tool),
    }

    # Endpoint path template (safe, non-sensitive)
    path_template = None
    if hasattr(tool, "endpoint") and hasattr(tool.endpoint, "path"):
        path_template = tool.endpoint.path
    info["endpoint_path"] = path_template

    # URI = base URL + path template
    uri = info["base_url"]
    if uri and path_template:
        uri = f"{uri.rstrip('/')}/{path_template.lstrip('/')}"
    info["tool_uri"] = uri

    return info


def audit_log_openapi_tool_invocation_start(
    tool: BaseTool,
    args: Dict[str, Any],
    tool_context: ToolContext,
    host_component: "SamAgentComponent",
) -> None:
    """
    ADK before_tool_callback for OpenAPI tools - logs invocation start.

    Non-OpenAPI tools are ignored. For OpenAPI tools the current wall-clock
    time in milliseconds is stored under ``audit_start_time_ms`` in the tool
    context state and one INFO audit line is emitted.

    Args:
        tool: The tool being invoked
        args: Tool arguments (NOT logged)
        tool_context: ADK tool context
        host_component: The SamAgentComponent host
    """
    # Only process OpenAPI tools (RestApiTool instances created from OpenAPI specs)
    if not _is_openapi_tool(tool):
        return

    # Session details, when an invocation context with a session is available
    invocation_context = tool_context._invocation_context
    session = invocation_context.session if invocation_context else None
    session_id = session.id if session else None
    user_id = session.user_id if session else None

    metadata = _extract_openapi_metadata(tool)
    http_method = metadata['http_method']
    operation_id = metadata['operation_id']

    # Action label: "<METHOD>: <operation>" when both are known
    if http_method and operation_id:
        action = f"{http_method}: {operation_id}"
    else:
        action = operation_id or "unknown"

    if session_id:
        correlation_tag = f"corr:{session_id}"
    else:
        correlation_tag = "corr:unknown"

    # Store start time for latency calculation
    tool_context.state["audit_start_time_ms"] = int(time.time() * 1000)

    # Log in MCP-style format: [openapi-tool] [corr:xxx] message
    audit_fields = {
        "user_id": user_id,
        "agent_id": host_component.agent_name,
        "tool_name": tool.name,
        "session_id": session_id,
        "operation_id": operation_id,
        "tool_uri": metadata['tool_uri'],
    }
    log.info(
        "[openapi-tool] [%s] Tool call: %s - User: %s, Agent: %s, URI: %s",
        correlation_tag,
        action,
        user_id,
        host_component.agent_name,
        metadata['tool_uri'],
        extra=audit_fields,
    )
async def audit_log_openapi_tool_execution_result(
    tool: BaseTool,
    args: Dict[str, Any],
    tool_context: ToolContext,
    tool_response: Any,
    host_component: "SamAgentComponent",
) -> Optional[Dict[str, Any]]:
    """
    ADK after_tool_callback for OpenAPI tools - logs execution result.

    Exactly one audit line is emitted per OpenAPI tool call: ERROR when the
    response dict contains an ``error`` key, WARNING when its ``pending`` value
    equals True, INFO otherwise. Latency is derived from the
    ``audit_start_time_ms`` value in the tool context state, when present.

    Args:
        tool: The tool that was executed
        args: Tool arguments (NOT logged)
        tool_context: ADK tool context
        tool_response: Tool response (NOT logged for sensitive data)
        host_component: The SamAgentComponent host

    Returns:
        None (does not modify the response)
    """
    # Only process OpenAPI tools (RestApiTool instances created from OpenAPI specs)
    if not _is_openapi_tool(tool):
        return None

    # Session details, when an invocation context with a session is available
    invocation_context = tool_context._invocation_context
    session = invocation_context.session if invocation_context else None
    session_id = session.id if session else None
    user_id = session.user_id if session else None

    metadata = _extract_openapi_metadata(tool)

    # Outcome classification (only dict responses carry error/pending markers)
    has_error = False
    is_pending_auth = False
    error_type = None

    if isinstance(tool_response, dict):
        has_error = "error" in tool_response
        # Equality (not identity) is kept on purpose to match existing behavior.
        is_pending_auth = tool_response.get("pending") == True  # noqa: E712

        if has_error:
            error_data = tool_response.get("error", {})
            if isinstance(error_data, dict):
                # Structured error: prefer "type", then "code"
                error_type = error_data.get("type") or error_data.get("code")
            elif isinstance(error_data, str):
                # Free-text error: first matching keyword group wins
                keyword_labels = (
                    (("auth", "unauthorized"), "auth_error"),
                    (("not found", "404"), "not_found"),
                    (("timeout",), "timeout"),
                    (("network", "connection"), "network_error"),
                )
                lowered = error_data.lower()
                for keywords, label in keyword_labels:
                    if any(word in lowered for word in keywords):
                        error_type = label
                        break

    # Latency since the start callback stamped the state
    latency_ms = None
    started_at_ms = tool_context.state.get("audit_start_time_ms")
    if started_at_ms:
        latency_ms = int(time.time() * 1000) - started_at_ms

    # Action label and correlation tag
    http_method = metadata['http_method']
    operation_id = metadata['operation_id']
    if http_method and operation_id:
        action = f"{http_method}: {operation_id}"
    else:
        action = operation_id or "unknown"
    correlation_tag = f"corr:{session_id}" if session_id else "corr:unknown"

    # Structured fields shared by all three outcomes
    audit_fields = {
        "user_id": user_id,
        "agent_id": host_component.agent_name,
        "tool_name": tool.name,
        "session_id": session_id,
        "operation_id": operation_id,
        "tool_uri": metadata['tool_uri'],
    }

    if has_error:
        # Error line, with the error type included only when one was derived
        pieces = [
            f"[openapi-tool] [{correlation_tag}] {action} failed - Path: {metadata['endpoint_path'] or 'unknown'}"
        ]
        if error_type:
            pieces.append(f", Error Type: {error_type}")
        pieces.append(f", Latency: {latency_ms}ms, User: {user_id}")

        log.error(
            "".join(pieces),
            extra={
                **audit_fields,
                "endpoint_path": metadata['endpoint_path'],
                "error_type": error_type,
            },
        )
        return None

    if is_pending_auth:
        log.warning(
            "[openapi-tool] [%s] %s pending auth - Latency: %sms, User: %s",
            correlation_tag,
            action,
            latency_ms,
            user_id,
            extra={**audit_fields, "status": "pending_auth"},
        )
        return None

    # SUCCESS format
    log.info(
        "[openapi-tool] [%s] %s completed - Latency: %sms, User: %s",
        correlation_tag,
        action,
        latency_ms,
        user_id,
        extra=audit_fields,
    )
    return None
def apply_model_override_callback(
    callback_context: CallbackContext,
    llm_request: LlmRequest,
    host_component: "SamAgentComponent",
) -> Optional[LlmResponse]:
    """Apply (or clear) the per-request model override from A2A task metadata.

    ``model_override`` is read from ``original_message_metadata`` inside the
    ``a2a_context`` state entry. It is a full LiteLLM-ready config dict (same
    format as ``ModelConfigService.get_by_alias(raw=True)``) and is handed to
    ``set_model_override``, a ``ContextVar`` setter that
    ``LiteLlm.generate_content_async`` consumes. The override is only applied
    when it is a dict with a truthy ``"model"`` entry; in every other case the
    override is reset to ``None``.

    Returns:
        Always ``None``; the request proceeds to the model unchanged.
    """
    # Inline import to avoid circular dependency (callbacks ← setup → lite_llm)
    from .models.lite_llm import set_model_override

    override = None
    a2a_context = callback_context.state.get("a2a_context")
    if a2a_context:
        message_metadata = a2a_context.get("original_message_metadata") or {}
        candidate = message_metadata.get("model_override")
        if isinstance(candidate, dict) and candidate.get("model"):
            override = candidate

    set_model_override(override)
    if override is not None:
        log.info(
            "[Callback:ModelOverride] Set per-request model override: %s",
            override["model"],
        )

    return None