# component.py
"""
WorkflowExecutorComponent implementation.
Orchestrates workflow execution by coordinating agents.
"""

import logging
import threading
import uuid
import asyncio
import json
from datetime import datetime, timezone
from typing import Any, Dict, Optional, TYPE_CHECKING

from solace_ai_connector.common.message import Message as SolaceMessage
from solace_ai_connector.common.event import Event, EventType

from ..common import a2a
from ..common.sac.sam_component_base import SamComponentBase
from ..common.agent_registry import AgentRegistry
from ..common.constants import (
    ARTIFACT_TAG_WORKING,
    EXTENSION_URI_AGENT_TYPE,
    EXTENSION_URI_SCHEMAS,
)
from ..common.data_parts import (
    WorkflowExecutionStartData,
    WorkflowExecutionResultData,
    WorkflowNodeExecutionStartData,
    WorkflowNodeExecutionResultData,
    WorkflowMapProgressData,
    ArtifactRef,
)
from ..agent.adk.services import (
    initialize_session_service,
    initialize_artifact_service,
)
from ..agent.utils.artifact_helpers import save_artifact_with_metadata
from .app import WorkflowDefinition
from .workflow_execution_context import WorkflowExecutionContext, WorkflowExecutionState
from .dag_executor import DAGExecutor, WorkflowNodeFailureError
from .agent_caller import AgentCaller
from .protocol.event_handlers import (
    handle_task_request,
    handle_agent_response,
    handle_cancel_request,
    handle_agent_card_message,
)

from a2a.types import (
    A2ARequest,
    AgentCard,
    AgentCapabilities,
    TaskState,
    TaskStatusUpdateEvent,
)

# Extension URI advertised on the agent card so UIs can render the workflow DAG.
# (Defined after the import block; it was previously interleaved with imports.)
EXTENSION_URI_WORKFLOW_VISUALIZATION = "https://solace.com/a2a/extensions/sam/workflow-visualization"

log = logging.getLogger(__name__)

info = {
    "class_name": "WorkflowExecutorComponent",
    "description": "Orchestrates workflow execution by coordinating agents.",
    "config_parameters": [],
}


class WorkflowExecutorComponent(SamComponentBase):
    """
    Orchestrates workflow execution by coordinating agents.

    Extends SamComponentBase to leverage:
    - Dedicated asyncio event loop
    - A2A message publishing infrastructure
    - Component lifecycle management
    """

    def __init__(self, **kwargs):
        """
        Initialize workflow executor component.

        Reads the workflow definition and service configuration, then wires up
        the DAG executor, agent caller, and agent registry.
        """
        # Propagate the app-level workflow name to the SAC component name
        # (only if the caller did not already provide one).
        if "component_config" in kwargs and "app_config" in kwargs["component_config"]:
            name = kwargs["component_config"]["app_config"].get("name")
            if name:
                kwargs.setdefault("name", name)

        super().__init__(info, **kwargs)

        # Configuration
        self.workflow_name = self.get_config("name")
        self.namespace = self.get_config("namespace")
        workflow_config = self.get_config("workflow")
        self.auto_summarization_config = self.get_config(
            "auto_summarization", {
                "enabled": True,
                "compaction_percentage": 0.25
            }
        )

        # Parse workflow definition
        self.workflow_definition = WorkflowDefinition.model_validate(workflow_config)

        # Initialize synchronous services
        self.session_service = initialize_session_service(self)
        self.artifact_service = initialize_artifact_service(self)

        # Initialize execution tracking (guarded by a lock: accessed from both
        # the async loop and timer/cleanup threads).
        self.active_workflows: Dict[str, WorkflowExecutionContext] = {}
        self.active_workflows_lock = threading.Lock()

        # Initialize executor components
        self.dag_executor = DAGExecutor(self.workflow_definition, self)
        self.agent_caller = AgentCaller(self)

        # Create agent registry for agent discovery
        self.agent_registry = AgentRegistry()

    def invoke(self, message: SolaceMessage, data: dict) -> Optional[dict]:
        """Placeholder invoke method. Logic in process_event.

        Always returns None; annotated Optional[dict] to match the actual
        behavior (was previously annotated -> dict).
        """
        return None

    def _get_component_id(self) -> str:
        """Returns the workflow name as the component identifier."""
        return self.workflow_name

    def _get_component_type(self) -> str:
        """Returns 'workflow' as the component type."""
        return "workflow"

    async def _handle_message_async(self, message: SolaceMessage, topic: str) -> None:
        """
        Async handler for incoming messages.

        Routes by topic: task/cancel requests, agent discovery cards, and
        agent response/status updates. Unknown topics are ACKed and dropped.
        """
        # Determine message type based on topic
        request_topic = a2a.get_agent_request_topic(self.namespace, self.workflow_name)
        discovery_topic = a2a.get_discovery_subscription_topic(self.namespace)
        response_sub = a2a.get_agent_response_subscription_topic(
            self.namespace, self.workflow_name
        )
        status_sub = a2a.get_agent_status_subscription_topic(
            self.namespace, self.workflow_name
        )
        if topic == request_topic:
            # Check if this is a cancel request or a regular task request
            try:
                payload = message.get_payload()
                method = payload.get("method") if isinstance(payload, dict) else None
                if method == "tasks/cancel":
                    task_id = a2a.get_task_id_from_cancel_request(
                        A2ARequest.model_validate(payload)
                    )
                    await handle_cancel_request(self, task_id, message)
                else:
                    await handle_task_request(self, message)
            except Exception as e:
                log.error(f"{self.log_identifier} Error processing request: {e}")
                # ACK even on failure so the broker does not redeliver a
                # request we cannot process.
                message.call_acknowledgements()
        elif a2a.topic_matches_subscription(topic, discovery_topic):
            handle_agent_card_message(self, message)
        elif a2a.topic_matches_subscription(
            topic, response_sub
        ) or a2a.topic_matches_subscription(topic, status_sub):
            await handle_agent_response(self, message)
        else:
            log.warning(f"{self.log_identifier} Unknown topic: {topic}")
            message.call_acknowledgements()

    async def _async_setup_and_run(self) -> None:
        """
        Async initialization called by SamComponentBase.
        Sets up services and publishes workflow agent card.
        """
        # Set up periodic agent card publishing
        self._setup_periodic_agent_card_publishing()

        # Component is now ready to receive requests
        log.info(f"{self.log_identifier} Workflow ready: {self.workflow_name}")

    def _pre_async_cleanup(self) -> None:
        """Pre-cleanup before async loop stops."""
        pass

    def _setup_periodic_agent_card_publishing(self) -> None:
        """
        Sets up periodic publishing of the workflow agent card.
        Similar to SamAgentComponent's _publish_agent_card method.
        """
        try:
            publish_config = self.get_config("agent_card_publishing", {})
            publish_interval_sec = publish_config.get("interval_seconds")

            if publish_interval_sec and publish_interval_sec > 0:
                log.info(
                    f"{self.log_identifier} Scheduling workflow agent card publishing "
                    f"every {publish_interval_sec} seconds."
                )

                # Publish immediately on first call
                self._publish_workflow_agent_card_sync()

                # Set up periodic timer
                self.add_timer(
                    delay_ms=publish_interval_sec * 1000,
                    timer_id="workflow_agent_card_publish",
                    interval_ms=publish_interval_sec * 1000,
                    callback=lambda timer_data: self._publish_workflow_agent_card_sync(),
                )
            else:
                log.warning(
                    f"{self.log_identifier} Workflow agent card publishing interval not "
                    f"configured or invalid, card will not be published periodically."
                )
        except Exception as e:
            log.exception(
                f"{self.log_identifier} Error during agent card publishing setup: {e}"
            )

    def _publish_workflow_agent_card_sync(self) -> None:
        """
        Synchronous wrapper for publishing workflow agent card.
        Called by timer callback.
        """
        try:
            agent_card = self._create_workflow_agent_card()
            discovery_topic = a2a.get_agent_discovery_topic(self.namespace)
            self.publish_a2a_message(
                payload=agent_card.model_dump(exclude_none=True), topic=discovery_topic
            )
            log.debug(
                f"{self.log_identifier} Published workflow agent card to {discovery_topic}"
            )
        except Exception as e:
            log.error(
                f"{self.log_identifier} Failed to publish workflow agent card: {e}"
            )

    def _get_workflow_config_json(self) -> Dict[str, Any]:
        """Get workflow configuration as a JSON-serializable dict."""
        nodes_json = []
        for node in self.workflow_definition.nodes:
            node_dict = {
                "id": node.id,
                "type": node.type,
            }
            if node.depends_on:
                node_dict["depends_on"] = node.depends_on

            # Add type-specific fields
            if node.type == "agent":
                node_dict["agent_name"] = node.agent_name
                if node.input:
                    node_dict["input"] = node.input
                if node.instruction:
                    node_dict["instruction"] = node.instruction
                if node.input_schema_override:
                    node_dict["input_schema_override"] = node.input_schema_override
                if node.output_schema_override:
                    node_dict["output_schema_override"] = node.output_schema_override
            elif node.type == "switch":
                node_dict["cases"] = [
                    {"condition": c.condition, "node": c.node}
                    for c in node.cases
                ]
                if node.default:
                    node_dict["default"] = node.default
            elif node.type == "map":
                node_dict["node"] = node.node
                node_dict["items"] = node.items
            elif node.type == "loop":
                node_dict["node"] = node.node
                if node.condition:
                    node_dict["condition"] = node.condition
                if node.max_iterations:
                    node_dict["max_iterations"] = node.max_iterations
                if node.delay:
                    node_dict["delay"] = node.delay
            elif node.type == "workflow":
                node_dict["workflow_name"] = node.workflow_name
                if node.input:
                    node_dict["input"] = node.input
                if node.instruction:
                    node_dict["instruction"] = node.instruction
                if node.input_schema_override:
                    node_dict["input_schema_override"] = node.input_schema_override
                if node.output_schema_override:
                    node_dict["output_schema_override"] = node.output_schema_override

            nodes_json.append(node_dict)

        config = {
            "nodes": nodes_json,
        }

        if self.workflow_definition.description:
            config["description"] = self.workflow_definition.description
        if self.workflow_definition.input_schema:
            config["input_schema"] = self.workflow_definition.input_schema
        if self.workflow_definition.output_schema:
            config["output_schema"] = self.workflow_definition.output_schema
        if self.workflow_definition.version:
            config["version"] = self.workflow_definition.version
        if self.workflow_definition.output_mapping:
            config["output_mapping"] = self.workflow_definition.output_mapping

        return config

    def _create_workflow_agent_card(self) -> AgentCard:
        """Create the workflow agent card."""
        # Build extensions list
        extensions_list = []

        from a2a.types import AgentExtension

        # Add agent type extension
        agent_type_extension = AgentExtension(
            uri=EXTENSION_URI_AGENT_TYPE,
            description="Specifies the type of agent (e.g., 'workflow').",
            params={"type": "workflow"},
        )
        extensions_list.append(agent_type_extension)

        # Add schema extension if schemas are defined
        input_schema = self.workflow_definition.input_schema
        output_schema = self.workflow_definition.output_schema

        if input_schema or output_schema:
            schema_params = {}
            if input_schema:
                schema_params["input_schema"] = input_schema
            if output_schema:
                schema_params["output_schema"] = output_schema

            schemas_extension = AgentExtension(
                uri=EXTENSION_URI_SCHEMAS,
                description="Input and output JSON schemas for the workflow.",
                params=schema_params,
            )
            extensions_list.append(schemas_extension)

        # Add workflow configuration extension
        try:
            workflow_config = self._get_workflow_config_json()
            viz_extension = AgentExtension(
                uri=EXTENSION_URI_WORKFLOW_VISUALIZATION,
                description="JSON configuration of the workflow.",
                params={"workflow_config": workflow_config},
            )
            extensions_list.append(viz_extension)
        except Exception as e:
            log.warning(
                f"{self.log_identifier} Failed to generate workflow configuration: {e}"
            )

        capabilities = AgentCapabilities(
            streaming=False,
            extensions=extensions_list if extensions_list else None,
        )

        return AgentCard(
            name=self.workflow_name,
            display_name=self.get_config("display_name"),
            description=self.workflow_definition.description,
            defaultInputModes=["text"],
            defaultOutputModes=["text"],
            skills=self.workflow_definition.skills or [],
            capabilities=capabilities,
            version=self.workflow_definition.version,
            url=f"solace:{a2a.get_agent_request_topic(self.namespace, self.workflow_name)}",
        )

    async def handle_cache_expiry_event(self, cache_data: Dict[str, Any]):
        """Handle agent call timeout via cache expiry.

        The cache key is the sub-task ID and the expired payload is the owning
        workflow task ID; the timed-out node is reported to the DAG executor
        as a structured error result.
        """
        sub_task_id = cache_data.get("key")
        workflow_task_id = cache_data.get("expired_data")

        if not sub_task_id or not workflow_task_id:
            return

        # Find workflow context
        with self.active_workflows_lock:
            workflow_context = self.active_workflows.get(workflow_task_id)

        if not workflow_context:
            log.warning(
                f"{self.log_identifier} Timeout for unknown workflow: {workflow_task_id}"
            )
            return

        # Get node ID for this sub-task
        node_id = workflow_context.get_node_id_for_sub_task(sub_task_id)

        if not node_id:
            return

        # NOTE(review): this reports the configured default timeout in the
        # error message; the actual expiry that fired may have used a
        # different per-node value — confirm against the cache setup.
        timeout_seconds = self.get_config("default_node_timeout_seconds", 300)
        log.error(
            f"{self.log_identifier} Agent call timed out for node '{node_id}' "
            f"(sub-task: {sub_task_id})"
        )

        # Create timeout error
        from ..common.data_parts import StructuredInvocationResult

        result_data = StructuredInvocationResult(
            type="structured_invocation_result",
            status="error",
            error_message=f"Agent timed out after {timeout_seconds} seconds",
        )

        # Handle as node failure
        await self.dag_executor.handle_node_completion(
            workflow_context, sub_task_id, result_data
        )

    async def publish_workflow_event(
        self,
        workflow_context: WorkflowExecutionContext,
        event_data: Any,
    ):
        """Publish a workflow status event."""
        try:
            status_update_event = a2a.create_data_signal_event(
                task_id=workflow_context.a2a_context["logical_task_id"],
                context_id=workflow_context.a2a_context["session_id"],
                signal_data=event_data,
                agent_name=self.workflow_name,
            )

            rpc_response = a2a.create_success_response(
                result=status_update_event,
                request_id=workflow_context.a2a_context["jsonrpc_request_id"],
            )

            target_topic = workflow_context.a2a_context.get(
                "statusTopic"
            ) or a2a.get_gateway_status_topic(
                self.namespace,
                "gateway",  # Placeholder, usually gateway_id
                workflow_context.a2a_context["logical_task_id"],
            )

            self.publish_a2a_message(
                payload=rpc_response.model_dump(exclude_none=True),
                topic=target_topic,
                user_properties={
                    "a2aUserConfig": workflow_context.a2a_context.get(
                        "a2a_user_config", {}
                    )
                },
            )
        except Exception as e:
            log.error(f"{self.log_identifier} Failed to publish workflow event: {e}")

    async def _execute_exit_handlers(
        self,
        workflow_context: WorkflowExecutionContext,
        outcome: str,
        error: Exception = None,
    ):
        """
        Execute exit handlers (onExit) based on workflow outcome.

        Args:
            workflow_context: The workflow context
            outcome: "success" or "failure"
            error: The error if outcome is "failure"
        """
        log_id = f"{self.log_identifier}[ExitHandler:{workflow_context.workflow_task_id}]"

        on_exit = self.workflow_definition.on_exit
        if not on_exit:
            return

        workflow_state = workflow_context.workflow_state

        # Inject workflow status and error into node_outputs for template resolution
        workflow_state.node_outputs["workflow"] = {
            "status": outcome,
            "error": {
                "message": str(error) if error else None,
                "node_id": (
                    workflow_state.error_state.get("failed_node_id")
                    if workflow_state.error_state
                    else None
                ),
            }
            if error
            else None,
        }

        # Determine which handlers to run
        nodes_to_run = []
        if isinstance(on_exit, str):
            # Simple string reference to a single node
            nodes_to_run.append(on_exit)
        else:
            # ExitHandler object with conditional handlers
            if on_exit.always:
                nodes_to_run.append(on_exit.always)
            if outcome == "success" and on_exit.on_success:
                nodes_to_run.append(on_exit.on_success)
            if outcome == "failure" and on_exit.on_failure:
                nodes_to_run.append(on_exit.on_failure)
            if outcome == "cancelled" and on_exit.on_cancel:
                nodes_to_run.append(on_exit.on_cancel)

        # Execute each exit handler node
        for node_id in nodes_to_run:
            try:
                log.info(f"{log_id} Executing exit handler node '{node_id}'")
                await self.dag_executor.execute_node(
                    node_id, workflow_state, workflow_context
                )
            except Exception as e:
                # Log but don't fail the workflow - exit handlers shouldn't break finalization
                log.error(
                    f"{log_id} Exit handler node '{node_id}' failed: {e}. "
                    "Continuing with finalization."
                )

    async def finalize_workflow_success(
        self, workflow_context: WorkflowExecutionContext
    ):
        """Finalize successful workflow execution and publish result."""
        log_id = f"{self.log_identifier}[Workflow:{workflow_context.workflow_task_id}]"
        log.info(f"{log_id} Finalizing workflow success")

        # Execute exit handlers first
        await self._execute_exit_handlers(workflow_context, "success")

        # Construct final output based on output mapping
        final_output = await self._construct_final_output(workflow_context)

        # Create output artifact with the workflow result
        # Use unique filename: <workflow_name>_<4-digit-uuid>_result.json
        unique_suffix = uuid.uuid4().hex[:4]
        output_artifact_name = f"{self.workflow_name}_{unique_suffix}_result.json"

        user_id = workflow_context.a2a_context["user_id"]
        session_id = workflow_context.a2a_context["session_id"]

        # Prepare artifact content - store just the output data
        # (not wrapped in {"status": ..., "output": ...} to match agent artifact format)
        content_bytes = json.dumps(final_output).encode("utf-8")

        # Get output schema from workflow definition for artifact metadata
        output_schema = self.workflow_definition.output_schema
        metadata_dict = {
            "description": f"Output from workflow '{self.workflow_name}'",
            "source": "workflow_execution",
            "workflow_name": self.workflow_name,
        }
        if output_schema:
            metadata_dict["schema"] = output_schema

        try:
            save_result = await save_artifact_with_metadata(
                artifact_service=self.artifact_service,
                app_name=self.workflow_name,
                user_id=user_id,
                session_id=session_id,
                filename=output_artifact_name,
                content_bytes=content_bytes,
                mime_type="application/json",
                metadata_dict=metadata_dict,
                timestamp=datetime.now(timezone.utc),
                tags=[ARTIFACT_TAG_WORKING],
            )

            if save_result["status"] != "success":
                log.error(f"{log_id} Failed to save workflow output artifact: {save_result.get('message')}")
                artifact_version = None
            else:
                artifact_version = save_result.get("data_version")
                log.info(f"{log_id} Created workflow output artifact: {output_artifact_name} v{artifact_version}")
        except Exception as e:
            log.exception(f"{log_id} Error saving workflow output artifact: {e}")
            artifact_version = None

        # Publish completion event
        await self.publish_workflow_event(
            workflow_context,
            WorkflowExecutionResultData(
                type="workflow_execution_result",
                status="success",
                workflow_output=final_output,
            ),
        )

        # Build produced_artifacts list for the response
        produced_artifacts = []
        if artifact_version is not None:
            produced_artifacts.append({
                "filename": output_artifact_name,
                "version": artifact_version,
            })

        # Create response message text that includes artifact reference
        if artifact_version is not None:
            response_text = f"Workflow completed successfully. Output artifact: {output_artifact_name}:v{artifact_version}"
        else:
            response_text = "Workflow completed successfully"

        # Check if this workflow was invoked by another workflow (structured invocation)
        # Check if this was a structured invocation (from gateway, workflow, or agent)
        # This is indicated by either:
        # 1. The is_structured_invocation flag set during request handling, OR
        # 2. Legacy check: replyToTopic containing '/agent/response/'
        reply_to_topic = workflow_context.a2a_context.get("replyToTopic", "")
        is_structured_invocation = workflow_context.a2a_context.get(
            "is_structured_invocation", False
        ) or "/agent/response/" in reply_to_topic

        # Build message parts
        message_parts = []

        if is_structured_invocation and artifact_version is not None:
            # When invoked as a sub-workflow, include StructuredInvocationResult
            # so the parent workflow can process the response
            from ..common.data_parts import ArtifactRef, StructuredInvocationResult
            invocation_result = StructuredInvocationResult(
                type="structured_invocation_result",
                status="success",
                output_artifact_ref=ArtifactRef(
                    name=output_artifact_name, version=artifact_version
                ),
            )
            message_parts.append(a2a.create_data_part(data=invocation_result.model_dump()))

        # Add text part
        message_parts.append(a2a.create_text_part(text=response_text))

        # Create final task response
        final_task = a2a.create_final_task(
            task_id=workflow_context.a2a_context["logical_task_id"],
            context_id=workflow_context.a2a_context["session_id"],
            final_status=a2a.create_task_status(
                state=TaskState.completed,
                message=a2a.create_agent_parts_message(parts=message_parts),
            ),
            metadata={
                "workflow_name": self.workflow_name,
                "agent_name": self.workflow_name,  # For compatibility with peer agent response handling
                "output": final_output,
                "produced_artifacts": produced_artifacts,
            },
        )

        # Publish response
        response_topic = workflow_context.a2a_context.get(
            "replyToTopic"
        ) or a2a.get_client_response_topic(
            self.namespace, workflow_context.a2a_context["client_id"]
        )

        response = a2a.create_success_response(
            result=final_task,
            request_id=workflow_context.a2a_context["jsonrpc_request_id"],
        )

        self.publish_a2a_message(
            payload=response.model_dump(exclude_none=True),
            topic=response_topic,
            user_properties={
                "a2aUserConfig": workflow_context.a2a_context.get("a2a_user_config", {})
            },
        )

        # ACK original message
        original_message = workflow_context.get_original_solace_message()
        if original_message:
            original_message.call_acknowledgements()

        await self._cleanup_workflow_state(workflow_context)

    async def finalize_workflow_failure(
        self, workflow_context: WorkflowExecutionContext, error: Exception
    ):
        """Finalize failed workflow execution and publish error."""
        log_id = f"{self.log_identifier}[Workflow:{workflow_context.workflow_task_id}]"
        log.warning(f"{log_id} Finalizing workflow failure: {error}")

        # Execute exit handlers first (passing error info)
        await self._execute_exit_handlers(workflow_context, "failure", error)

        # Create output artifact with the error information
        # Use unique filename: <workflow_name>_<4-digit-uuid>_result.json
        unique_suffix = uuid.uuid4().hex[:4]
        output_artifact_name = f"{self.workflow_name}_{unique_suffix}_result.json"

        user_id = workflow_context.a2a_context["user_id"]
        session_id = workflow_context.a2a_context["session_id"]

        # Prepare artifact content with status and error
        artifact_content = {
            "status": "error",
            "message": str(error),
        }
        content_bytes = json.dumps(artifact_content).encode("utf-8")

        metadata_dict = {
            "description": f"Error output from workflow '{self.workflow_name}'",
            "source": "workflow_execution",
            "workflow_name": self.workflow_name,
        }

        try:
            save_result = await save_artifact_with_metadata(
                artifact_service=self.artifact_service,
                app_name=self.workflow_name,
                user_id=user_id,
                session_id=session_id,
                filename=output_artifact_name,
                content_bytes=content_bytes,
                mime_type="application/json",
                metadata_dict=metadata_dict,
                timestamp=datetime.now(timezone.utc),
                tags=[ARTIFACT_TAG_WORKING],
            )

            if save_result["status"] != "success":
                log.error(f"{log_id} Failed to save workflow error artifact: {save_result.get('message')}")
                artifact_version = None
            else:
                artifact_version = save_result.get("data_version")
                log.info(f"{log_id} Created workflow error artifact: {output_artifact_name} v{artifact_version}")
        except Exception as e:
            log.exception(f"{log_id} Error saving workflow error artifact: {e}")
            artifact_version = None

        # Publish failure event
        await self.publish_workflow_event(
            workflow_context,
            WorkflowExecutionResultData(
                type="workflow_execution_result",
                status="error",
                error_message=str(error),
            ),
        )

        # Build produced_artifacts list for the response
        produced_artifacts = []
        if artifact_version is not None:
            produced_artifacts.append({
                "filename": output_artifact_name,
                "version": artifact_version,
            })

        # Create response message text that includes artifact reference
        if artifact_version is not None:
            response_text = f"Workflow failed: {str(error)}. Error artifact: {output_artifact_name}:v{artifact_version}"
        else:
            response_text = f"Workflow failed: {str(error)}"

        # Check if this was a structured invocation (from gateway, workflow, or agent)
        reply_to_topic = workflow_context.a2a_context.get("replyToTopic", "")
        is_structured_invocation = workflow_context.a2a_context.get(
            "is_structured_invocation", False
        ) or "/agent/response/" in reply_to_topic

        # Build message parts
        message_parts = []

        if is_structured_invocation:
            # When invoked as structured invocation, include StructuredInvocationResult
            # so the caller (gateway, workflow, agent) can process the error response
            from ..common.data_parts import ArtifactRef, StructuredInvocationResult
            invocation_result = StructuredInvocationResult(
                type="structured_invocation_result",
                status="error",
                error_message=str(error),
                output_artifact_ref=ArtifactRef(
                    name=output_artifact_name, version=artifact_version
                ) if artifact_version is not None else None,
            )
            message_parts.append(a2a.create_data_part(data=invocation_result.model_dump()))

        # Add text part
        message_parts.append(a2a.create_text_part(text=response_text))

        # Create final task response
        final_task = a2a.create_final_task(
            task_id=workflow_context.a2a_context["logical_task_id"],
            context_id=workflow_context.a2a_context["session_id"],
            final_status=a2a.create_task_status(
                state=TaskState.failed,
                message=a2a.create_agent_parts_message(parts=message_parts),
            ),
            metadata={
                "workflow_name": self.workflow_name,
                "agent_name": self.workflow_name,  # For compatibility with peer agent response handling
                "produced_artifacts": produced_artifacts,
            },
        )

        # Publish response
        response_topic = workflow_context.a2a_context.get(
            "replyToTopic"
        ) or a2a.get_client_response_topic(
            self.namespace, workflow_context.a2a_context["client_id"]
        )

        response = a2a.create_success_response(
            result=final_task,
            request_id=workflow_context.a2a_context["jsonrpc_request_id"],
        )

        self.publish_a2a_message(
            payload=response.model_dump(exclude_none=True),
            topic=response_topic,
            user_properties={
                "a2aUserConfig": workflow_context.a2a_context.get("a2a_user_config", {})
            },
        )

        # ACK original message (we handled the error gracefully)
        original_message = workflow_context.get_original_solace_message()
        if original_message:
            original_message.call_acknowledgements()

        await self._cleanup_workflow_state(workflow_context)

    async def finalize_workflow_cancelled(
        self, workflow_context: WorkflowExecutionContext
    ):
        """Finalize cancelled workflow execution and publish cancellation status."""
        log_id = f"{self.log_identifier}[Workflow:{workflow_context.workflow_task_id}]"
        log.info(f"{log_id} Finalizing workflow cancellation")

        # Execute exit handlers (passing cancellation info)
        await self._execute_exit_handlers(workflow_context, "cancelled")

        # Publish cancellation event
        await self.publish_workflow_event(
            workflow_context,
            WorkflowExecutionResultData(
                type="workflow_execution_result",
                status="cancelled",
                error_message="Workflow was cancelled",
            ),
        )

        # Create final task response with cancelled state
        final_task = a2a.create_final_task(
            task_id=workflow_context.a2a_context["logical_task_id"],
            context_id=workflow_context.a2a_context["session_id"],
            final_status=a2a.create_task_status(
                state=TaskState.canceled,
                message=a2a.create_agent_text_message(
                    text="Workflow was cancelled"
                ),
            ),
            metadata={
                "workflow_name": self.workflow_name,
                "agent_name": self.workflow_name,
            },
        )

        # Publish response
        response_topic = workflow_context.a2a_context.get(
            "replyToTopic"
        ) or a2a.get_client_response_topic(
            self.namespace, workflow_context.a2a_context["client_id"]
        )

        response = a2a.create_success_response(
            result=final_task,
            request_id=workflow_context.a2a_context["jsonrpc_request_id"],
        )

        self.publish_a2a_message(
            payload=response.model_dump(exclude_none=True),
            topic=response_topic,
            user_properties={
                "a2aUserConfig": workflow_context.a2a_context.get("a2a_user_config", {})
            },
        )

        # ACK original message
        original_message = workflow_context.get_original_solace_message()
        if original_message:
            original_message.call_acknowledgements()

        await self._cleanup_workflow_state(workflow_context)
        log.info(f"{log_id} Workflow cancellation finalized")

    async def _construct_final_output(
        self, workflow_context: WorkflowExecutionContext
    ) -> Dict[str, Any]:
        """Construct final output from workflow state using output mapping.

        Returns an empty dict when no output_mapping is defined (previously
        this raised AttributeError on a None mapping; output_mapping is
        treated as optional elsewhere in this module).
        """
        mapping = self.workflow_definition.output_mapping or {}
        state = workflow_context.workflow_state

        final_output = {}
        for key, value_def in mapping.items():
            final_output[key] = self.dag_executor.resolve_value(value_def, state)

        return final_output

    async def _update_workflow_state(
        self,
        workflow_context: WorkflowExecutionContext,
        workflow_state: WorkflowExecutionState,
    ):
        """Persist workflow state to session service."""
        session = await self._get_workflow_session(workflow_context)
        session.state["workflow_execution"] = workflow_state.model_dump()
        # Note: Session state is persisted automatically by the SessionService
        # when managed through ADK operations (get_session, append_event, etc.)

    async def _cleanup_workflow_state(self, workflow_context: WorkflowExecutionContext):
        """Clean up workflow state on completion."""
        # Set TTL on session state for auto-cleanup
        session = await self._get_workflow_session(workflow_context)

        # Mark workflow complete
        state = workflow_context.workflow_state
        state.metadata["completion_time"] = datetime.now(timezone.utc).isoformat()
        state.metadata["status"] = "completed"

        session.state["workflow_execution"] = state.model_dump()
        # Note: Session state is persisted automatically by the SessionService

        # Clean up any remaining cache entries for timeout tracking
        # These should normally be removed when nodes complete, but this is a safety net
        for sub_task_id in workflow_context.get_all_sub_task_ids():
            self.cache_service.remove_data(sub_task_id)

        # Remove from active workflows
        with self.active_workflows_lock:
            self.active_workflows.pop(workflow_context.workflow_task_id, None)

    async def _get_workflow_session(self, workflow_context: WorkflowExecutionContext):
        """Retrieve the ADK session for the workflow."""
        return await self.session_service.get_session(
            app_name=self.workflow_name,
            user_id=workflow_context.a2a_context["user_id"],
            session_id=workflow_context.a2a_context["session_id"],
        )

    async def _load_node_output(
        self,
        node_id: str,
        artifact_name: str,
        artifact_version: int,
        workflow_context: WorkflowExecutionContext,
        sub_task_id: Optional[str] = None,
    ) -> Any:
        """Load a node's output artifact.

        Artifacts are namespace-scoped by the ScopedArtifactServiceWrapper,
        so the app_name parameter is automatically transformed to the namespace
        when artifact_scope is "namespace". This allows all agents and workflows
        in the same namespace to access the same artifact store.

        We use the parent workflow session ID to load artifacts, as agents are expected
        to save their outputs to the shared parent session scope.
        """
        user_id = workflow_context.a2a_context["user_id"]
        # Use the parent session ID (caller's session) to ensure artifacts are shared/persisted
        workflow_session_id = workflow_context.a2a_context["session_id"]

        # If sub_task_id is not provided, look it up from the node_id
        if not sub_task_id:
            sub_task_id = workflow_context.get_sub_task_for_node(node_id)
            if not sub_task_id:
                raise ValueError(f"No sub-task ID found for node {node_id}")

        # The app_name doesn't matter in namespace mode - the ScopedArtifactServiceWrapper
        # will replace it with self.namespace. But we pass workflow_name for consistency.
        artifact = await self.artifact_service.load_artifact(
            app_name=self.workflow_name,
            user_id=user_id,
            session_id=workflow_session_id,
            filename=artifact_name,
            version=artifact_version,
        )

        if not artifact or not artifact.inline_data:
            raise ValueError(
                f"Artifact {artifact_name} v{artifact_version} not found in session {workflow_session_id}"
            )

        return json.loads(artifact.inline_data.data.decode("utf-8"))

    def cleanup(self):
        """Clean up resources on component shutdown."""
        log.info(f"{self.log_identifier} Cleaning up workflow executor")

        # Cancel active workflows
        with self.active_workflows_lock:
            for workflow_context in self.active_workflows.values():
                workflow_context.cancel()
            self.active_workflows.clear()

        # Call base class cleanup (stops async loop)
        super().cleanup()