/ src / solace_agent_mesh / workflow / component.py
component.py
  1  """
  2  WorkflowExecutorComponent implementation.
  3  Orchestrates workflow execution by coordinating agents.
  4  """
  5  
  6  import logging
  7  import threading
  8  import uuid
  9  import asyncio
 10  import json
 11  from datetime import datetime, timezone
 12  from typing import Any, Dict, Optional, TYPE_CHECKING
 13  
 14  from solace_ai_connector.common.message import Message as SolaceMessage
 15  from solace_ai_connector.common.event import Event, EventType
 16  
 17  from ..common import a2a
 18  from ..common.sac.sam_component_base import SamComponentBase
 19  from ..common.agent_registry import AgentRegistry
 20  from ..common.constants import (
 21      ARTIFACT_TAG_WORKING,
 22      EXTENSION_URI_AGENT_TYPE,
 23      EXTENSION_URI_SCHEMAS,
 24  )
 25  
 26  EXTENSION_URI_WORKFLOW_VISUALIZATION = "https://solace.com/a2a/extensions/sam/workflow-visualization"
 27  from ..common.data_parts import (
 28      WorkflowExecutionStartData,
 29      WorkflowExecutionResultData,
 30      WorkflowNodeExecutionStartData,
 31      WorkflowNodeExecutionResultData,
 32      WorkflowMapProgressData,
 33      ArtifactRef,
 34  )
 35  from ..agent.adk.services import (
 36      initialize_session_service,
 37      initialize_artifact_service,
 38  )
 39  from ..agent.utils.artifact_helpers import save_artifact_with_metadata
 40  from .app import WorkflowDefinition
 41  from .workflow_execution_context import WorkflowExecutionContext, WorkflowExecutionState
 42  from .dag_executor import DAGExecutor, WorkflowNodeFailureError
 43  from .agent_caller import AgentCaller
 44  from .protocol.event_handlers import (
 45      handle_task_request,
 46      handle_agent_response,
 47      handle_cancel_request,
 48      handle_agent_card_message,
 49  )
 50  
 51  from a2a.types import (
 52      A2ARequest,
 53      AgentCard,
 54      AgentCapabilities,
 55      TaskState,
 56      TaskStatusUpdateEvent,
 57  )
 58  
 59  log = logging.getLogger(__name__)
 60  
 61  info = {
 62      "class_name": "WorkflowExecutorComponent",
 63      "description": "Orchestrates workflow execution by coordinating agents.",
 64      "config_parameters": [],
 65  }
 66  
 67  
 68  class WorkflowExecutorComponent(SamComponentBase):
 69      """
 70      Orchestrates workflow execution by coordinating agents.
 71  
 72      Extends SamComponentBase to leverage:
 73      - Dedicated asyncio event loop
 74      - A2A message publishing infrastructure
 75      - Component lifecycle management
 76      """
 77  
 78      def __init__(self, **kwargs):
 79          """
 80          Initialize workflow executor component.
 81          """
 82          if "component_config" in kwargs and "app_config" in kwargs["component_config"]:
 83              name = kwargs["component_config"]["app_config"].get("name")
 84              if name:
 85                  kwargs.setdefault("name", name)
 86  
 87          super().__init__(info, **kwargs)
 88  
 89          # Configuration
 90          self.workflow_name = self.get_config("name")
 91          self.namespace = self.get_config("namespace")
 92          workflow_config = self.get_config("workflow")
 93          self.auto_summarization_config = self.get_config(
 94              "auto_summarization", {
 95                  "enabled": True,
 96                  "compaction_percentage": 0.25
 97              }
 98          )
 99  
100          # Parse workflow definition
101          self.workflow_definition = WorkflowDefinition.model_validate(workflow_config)
102  
103          # Initialize synchronous services
104          self.session_service = initialize_session_service(self)
105          self.artifact_service = initialize_artifact_service(self)
106  
107          # Initialize execution tracking
108          self.active_workflows: Dict[str, WorkflowExecutionContext] = {}
109          self.active_workflows_lock = threading.Lock()
110  
111          # Initialize executor components
112          self.dag_executor = DAGExecutor(self.workflow_definition, self)
113          self.agent_caller = AgentCaller(self)
114  
115          # Create agent registry for agent discovery
116          self.agent_registry = AgentRegistry()
117  
118      def invoke(self, message: SolaceMessage, data: dict) -> dict:
119          """Placeholder invoke method. Logic in process_event."""
120          return None
121  
122      def _get_component_id(self) -> str:
123          """Returns the workflow name as the component identifier."""
124          return self.workflow_name
125  
126      def _get_component_type(self) -> str:
127          """Returns 'workflow' as the component type."""
128          return "workflow"
129  
130      async def _handle_message_async(self, message: SolaceMessage, topic: str) -> None:
131          """
132          Async handler for incoming messages.
133          """
134          # Determine message type based on topic
135          request_topic = a2a.get_agent_request_topic(self.namespace, self.workflow_name)
136          discovery_topic = a2a.get_discovery_subscription_topic(self.namespace)
137          response_sub = a2a.get_agent_response_subscription_topic(
138              self.namespace, self.workflow_name
139          )
140          status_sub = a2a.get_agent_status_subscription_topic(
141              self.namespace, self.workflow_name
142          )
143          if topic == request_topic:
144              # Check if this is a cancel request or a regular task request
145              try:
146                  payload = message.get_payload()
147                  method = payload.get("method") if isinstance(payload, dict) else None
148                  if method == "tasks/cancel":
149                      task_id = a2a.get_task_id_from_cancel_request(
150                          A2ARequest.model_validate(payload)
151                      )
152                      await handle_cancel_request(self, task_id, message)
153                  else:
154                      await handle_task_request(self, message)
155              except Exception as e:
156                  log.error(f"{self.log_identifier} Error processing request: {e}")
157                  message.call_acknowledgements()
158          elif a2a.topic_matches_subscription(topic, discovery_topic):
159              handle_agent_card_message(self, message)
160          elif a2a.topic_matches_subscription(
161              topic, response_sub
162          ) or a2a.topic_matches_subscription(topic, status_sub):
163              await handle_agent_response(self, message)
164          else:
165              log.warning(f"{self.log_identifier} Unknown topic: {topic}")
166              message.call_acknowledgements()
167  
168      async def _async_setup_and_run(self) -> None:
169          """
170          Async initialization called by SamComponentBase.
171          Sets up services and publishes workflow agent card.
172          """
173          # Set up periodic agent card publishing
174          self._setup_periodic_agent_card_publishing()
175  
176          # Component is now ready to receive requests
177          log.info(f"{self.log_identifier} Workflow ready: {self.workflow_name}")
178  
179      def _pre_async_cleanup(self) -> None:
180          """Pre-cleanup before async loop stops."""
181          pass
182  
183      def _setup_periodic_agent_card_publishing(self) -> None:
184          """
185          Sets up periodic publishing of the workflow agent card.
186          Similar to SamAgentComponent's _publish_agent_card method.
187          """
188          try:
189              publish_config = self.get_config("agent_card_publishing", {})
190              publish_interval_sec = publish_config.get("interval_seconds")
191  
192              if publish_interval_sec and publish_interval_sec > 0:
193                  log.info(
194                      f"{self.log_identifier} Scheduling workflow agent card publishing "
195                      f"every {publish_interval_sec} seconds."
196                  )
197  
198                  # Publish immediately on first call
199                  self._publish_workflow_agent_card_sync()
200  
201                  # Set up periodic timer
202                  self.add_timer(
203                      delay_ms=publish_interval_sec * 1000,
204                      timer_id="workflow_agent_card_publish",
205                      interval_ms=publish_interval_sec * 1000,
206                      callback=lambda timer_data: self._publish_workflow_agent_card_sync(),
207                  )
208              else:
209                  log.warning(
210                      f"{self.log_identifier} Workflow agent card publishing interval not "
211                      f"configured or invalid, card will not be published periodically."
212                  )
213          except Exception as e:
214              log.exception(
215                  f"{self.log_identifier} Error during agent card publishing setup: {e}"
216              )
217  
218      def _publish_workflow_agent_card_sync(self) -> None:
219          """
220          Synchronous wrapper for publishing workflow agent card.
221          Called by timer callback.
222          """
223          try:
224              agent_card = self._create_workflow_agent_card()
225              discovery_topic = a2a.get_agent_discovery_topic(self.namespace)
226              self.publish_a2a_message(
227                  payload=agent_card.model_dump(exclude_none=True), topic=discovery_topic
228              )
229              log.debug(
230                  f"{self.log_identifier} Published workflow agent card to {discovery_topic}"
231              )
232          except Exception as e:
233              log.error(
234                  f"{self.log_identifier} Failed to publish workflow agent card: {e}"
235              )
236  
237      def _get_workflow_config_json(self) -> Dict[str, Any]:
238          """Get workflow configuration as a JSON-serializable dict."""
239          nodes_json = []
240          for node in self.workflow_definition.nodes:
241              node_dict = {
242                  "id": node.id,
243                  "type": node.type,
244              }
245              if node.depends_on:
246                  node_dict["depends_on"] = node.depends_on
247  
248              # Add type-specific fields
249              if node.type == "agent":
250                  node_dict["agent_name"] = node.agent_name
251                  if node.input:
252                      node_dict["input"] = node.input
253                  if node.instruction:
254                      node_dict["instruction"] = node.instruction
255                  if node.input_schema_override:
256                      node_dict["input_schema_override"] = node.input_schema_override
257                  if node.output_schema_override:
258                      node_dict["output_schema_override"] = node.output_schema_override
259              elif node.type == "switch":
260                  node_dict["cases"] = [
261                      {"condition": c.condition, "node": c.node}
262                      for c in node.cases
263                  ]
264                  if node.default:
265                      node_dict["default"] = node.default
266              elif node.type == "map":
267                  node_dict["node"] = node.node
268                  node_dict["items"] = node.items
269              elif node.type == "loop":
270                  node_dict["node"] = node.node
271                  if node.condition:
272                      node_dict["condition"] = node.condition
273                  if node.max_iterations:
274                      node_dict["max_iterations"] = node.max_iterations
275                  if node.delay:
276                      node_dict["delay"] = node.delay
277              elif node.type == "workflow":
278                  node_dict["workflow_name"] = node.workflow_name
279                  if node.input:
280                      node_dict["input"] = node.input
281                  if node.instruction:
282                      node_dict["instruction"] = node.instruction
283                  if node.input_schema_override:
284                      node_dict["input_schema_override"] = node.input_schema_override
285                  if node.output_schema_override:
286                      node_dict["output_schema_override"] = node.output_schema_override
287  
288              nodes_json.append(node_dict)
289  
290          config = {
291              "nodes": nodes_json,
292          }
293  
294          if self.workflow_definition.description:
295              config["description"] = self.workflow_definition.description
296          if self.workflow_definition.input_schema:
297              config["input_schema"] = self.workflow_definition.input_schema
298          if self.workflow_definition.output_schema:
299              config["output_schema"] = self.workflow_definition.output_schema
300          if self.workflow_definition.version:
301              config["version"] = self.workflow_definition.version
302          if self.workflow_definition.output_mapping:
303              config["output_mapping"] = self.workflow_definition.output_mapping
304  
305          return config
306  
307      def _create_workflow_agent_card(self) -> AgentCard:
308          """Create the workflow agent card."""
309          # Build extensions list
310          extensions_list = []
311  
312          from a2a.types import AgentExtension
313  
314          # Add agent type extension
315          agent_type_extension = AgentExtension(
316              uri=EXTENSION_URI_AGENT_TYPE,
317              description="Specifies the type of agent (e.g., 'workflow').",
318              params={"type": "workflow"},
319          )
320          extensions_list.append(agent_type_extension)
321  
322          # Add schema extension if schemas are defined
323          input_schema = self.workflow_definition.input_schema
324          output_schema = self.workflow_definition.output_schema
325  
326          if input_schema or output_schema:
327              schema_params = {}
328              if input_schema:
329                  schema_params["input_schema"] = input_schema
330              if output_schema:
331                  schema_params["output_schema"] = output_schema
332  
333              schemas_extension = AgentExtension(
334                  uri=EXTENSION_URI_SCHEMAS,
335                  description="Input and output JSON schemas for the workflow.",
336                  params=schema_params,
337              )
338              extensions_list.append(schemas_extension)
339  
340          # Add workflow configuration extension
341          try:
342              workflow_config = self._get_workflow_config_json()
343              viz_extension = AgentExtension(
344                  uri=EXTENSION_URI_WORKFLOW_VISUALIZATION,
345                  description="JSON configuration of the workflow.",
346                  params={"workflow_config": workflow_config},
347              )
348              extensions_list.append(viz_extension)
349          except Exception as e:
350              log.warning(
351                  f"{self.log_identifier} Failed to generate workflow configuration: {e}"
352              )
353  
354          capabilities = AgentCapabilities(
355              streaming=False,
356              extensions=extensions_list if extensions_list else None,
357          )
358  
359          return AgentCard(
360              name=self.workflow_name,
361              display_name=self.get_config("display_name"),
362              description=self.workflow_definition.description,
363              defaultInputModes=["text"],
364              defaultOutputModes=["text"],
365              skills=self.workflow_definition.skills or [],
366              capabilities=capabilities,
367              version=self.workflow_definition.version,
368              url=f"solace:{a2a.get_agent_request_topic(self.namespace, self.workflow_name)}",
369          )
370  
371      async def handle_cache_expiry_event(self, cache_data: Dict[str, Any]):
372          """Handle agent call timeout via cache expiry."""
373          sub_task_id = cache_data.get("key")
374          workflow_task_id = cache_data.get("expired_data")
375  
376          if not sub_task_id or not workflow_task_id:
377              return
378  
379          # Find workflow context
380          with self.active_workflows_lock:
381              workflow_context = self.active_workflows.get(workflow_task_id)
382  
383          if not workflow_context:
384              log.warning(
385                  f"{self.log_identifier} Timeout for unknown workflow: {workflow_task_id}"
386              )
387              return
388  
389          # Get node ID for this sub-task
390          node_id = workflow_context.get_node_id_for_sub_task(sub_task_id)
391  
392          if not node_id:
393              return
394  
395          timeout_seconds = self.get_config("default_node_timeout_seconds", 300)
396          log.error(
397              f"{self.log_identifier} Agent call timed out for node '{node_id}' "
398              f"(sub-task: {sub_task_id})"
399          )
400  
401          # Create timeout error
402          from ..common.data_parts import StructuredInvocationResult
403  
404          result_data = StructuredInvocationResult(
405              type="structured_invocation_result",
406              status="error",
407              error_message=f"Agent timed out after {timeout_seconds} seconds",
408          )
409  
410          # Handle as node failure
411          await self.dag_executor.handle_node_completion(
412              workflow_context, sub_task_id, result_data
413          )
414  
415      async def publish_workflow_event(
416          self,
417          workflow_context: WorkflowExecutionContext,
418          event_data: Any,
419      ):
420          """Publish a workflow status event."""
421          try:
422              status_update_event = a2a.create_data_signal_event(
423                  task_id=workflow_context.a2a_context["logical_task_id"],
424                  context_id=workflow_context.a2a_context["session_id"],
425                  signal_data=event_data,
426                  agent_name=self.workflow_name,
427              )
428  
429              rpc_response = a2a.create_success_response(
430                  result=status_update_event,
431                  request_id=workflow_context.a2a_context["jsonrpc_request_id"],
432              )
433  
434              target_topic = workflow_context.a2a_context.get(
435                  "statusTopic"
436              ) or a2a.get_gateway_status_topic(
437                  self.namespace,
438                  "gateway",  # Placeholder, usually gateway_id
439                  workflow_context.a2a_context["logical_task_id"],
440              )
441  
442              self.publish_a2a_message(
443                  payload=rpc_response.model_dump(exclude_none=True),
444                  topic=target_topic,
445                  user_properties={
446                      "a2aUserConfig": workflow_context.a2a_context.get(
447                          "a2a_user_config", {}
448                      )
449                  },
450              )
451          except Exception as e:
452              log.error(f"{self.log_identifier} Failed to publish workflow event: {e}")
453  
454      async def _execute_exit_handlers(
455          self,
456          workflow_context: WorkflowExecutionContext,
457          outcome: str,
458          error: Exception = None,
459      ):
460          """
461          Execute exit handlers (onExit) based on workflow outcome.
462  
463          Args:
464              workflow_context: The workflow context
465              outcome: "success" or "failure"
466              error: The error if outcome is "failure"
467          """
468          log_id = f"{self.log_identifier}[ExitHandler:{workflow_context.workflow_task_id}]"
469  
470          on_exit = self.workflow_definition.on_exit
471          if not on_exit:
472              return
473  
474          workflow_state = workflow_context.workflow_state
475  
476          # Inject workflow status and error into node_outputs for template resolution
477          workflow_state.node_outputs["workflow"] = {
478              "status": outcome,
479              "error": {
480                  "message": str(error) if error else None,
481                  "node_id": (
482                      workflow_state.error_state.get("failed_node_id")
483                      if workflow_state.error_state
484                      else None
485                  ),
486              }
487              if error
488              else None,
489          }
490  
491          # Determine which handlers to run
492          nodes_to_run = []
493          if isinstance(on_exit, str):
494              # Simple string reference to a single node
495              nodes_to_run.append(on_exit)
496          else:
497              # ExitHandler object with conditional handlers
498              if on_exit.always:
499                  nodes_to_run.append(on_exit.always)
500              if outcome == "success" and on_exit.on_success:
501                  nodes_to_run.append(on_exit.on_success)
502              if outcome == "failure" and on_exit.on_failure:
503                  nodes_to_run.append(on_exit.on_failure)
504              if outcome == "cancelled" and on_exit.on_cancel:
505                  nodes_to_run.append(on_exit.on_cancel)
506  
507          # Execute each exit handler node
508          for node_id in nodes_to_run:
509              try:
510                  log.info(f"{log_id} Executing exit handler node '{node_id}'")
511                  await self.dag_executor.execute_node(
512                      node_id, workflow_state, workflow_context
513                  )
514              except Exception as e:
515                  # Log but don't fail the workflow - exit handlers shouldn't break finalization
516                  log.error(
517                      f"{log_id} Exit handler node '{node_id}' failed: {e}. "
518                      "Continuing with finalization."
519                  )
520  
521      async def finalize_workflow_success(
522          self, workflow_context: WorkflowExecutionContext
523      ):
524          """Finalize successful workflow execution and publish result."""
525          log_id = f"{self.log_identifier}[Workflow:{workflow_context.workflow_task_id}]"
526          log.info(f"{log_id} Finalizing workflow success")
527  
528          # Execute exit handlers first
529          await self._execute_exit_handlers(workflow_context, "success")
530  
531          # Construct final output based on output mapping
532          final_output = await self._construct_final_output(workflow_context)
533  
534          # Create output artifact with the workflow result
535          # Use unique filename: <workflow_name>_<4-digit-uuid>_result.json
536          unique_suffix = uuid.uuid4().hex[:4]
537          output_artifact_name = f"{self.workflow_name}_{unique_suffix}_result.json"
538  
539          user_id = workflow_context.a2a_context["user_id"]
540          session_id = workflow_context.a2a_context["session_id"]
541  
542          # Prepare artifact content - store just the output data
543          # (not wrapped in {"status": ..., "output": ...} to match agent artifact format)
544          content_bytes = json.dumps(final_output).encode("utf-8")
545  
546          # Get output schema from workflow definition for artifact metadata
547          output_schema = self.workflow_definition.output_schema
548          metadata_dict = {
549              "description": f"Output from workflow '{self.workflow_name}'",
550              "source": "workflow_execution",
551              "workflow_name": self.workflow_name,
552          }
553          if output_schema:
554              metadata_dict["schema"] = output_schema
555  
556          try:
557              save_result = await save_artifact_with_metadata(
558                  artifact_service=self.artifact_service,
559                  app_name=self.workflow_name,
560                  user_id=user_id,
561                  session_id=session_id,
562                  filename=output_artifact_name,
563                  content_bytes=content_bytes,
564                  mime_type="application/json",
565                  metadata_dict=metadata_dict,
566                  timestamp=datetime.now(timezone.utc),
567                  tags=[ARTIFACT_TAG_WORKING],
568              )
569  
570              if save_result["status"] != "success":
571                  log.error(f"{log_id} Failed to save workflow output artifact: {save_result.get('message')}")
572                  artifact_version = None
573              else:
574                  artifact_version = save_result.get("data_version")
575                  log.info(f"{log_id} Created workflow output artifact: {output_artifact_name} v{artifact_version}")
576          except Exception as e:
577              log.exception(f"{log_id} Error saving workflow output artifact: {e}")
578              artifact_version = None
579  
580          # Publish completion event
581          await self.publish_workflow_event(
582              workflow_context,
583              WorkflowExecutionResultData(
584                  type="workflow_execution_result",
585                  status="success",
586                  workflow_output=final_output,
587              ),
588          )
589  
590          # Build produced_artifacts list for the response
591          produced_artifacts = []
592          if artifact_version is not None:
593              produced_artifacts.append({
594                  "filename": output_artifact_name,
595                  "version": artifact_version,
596              })
597  
598          # Create response message text that includes artifact reference
599          if artifact_version is not None:
600              response_text = f"Workflow completed successfully. Output artifact: {output_artifact_name}:v{artifact_version}"
601          else:
602              response_text = "Workflow completed successfully"
603  
604          # Check if this workflow was invoked by another workflow (structured invocation)
605          # Check if this was a structured invocation (from gateway, workflow, or agent)
606          # This is indicated by either:
607          # 1. The is_structured_invocation flag set during request handling, OR
608          # 2. Legacy check: replyToTopic containing '/agent/response/'
609          reply_to_topic = workflow_context.a2a_context.get("replyToTopic", "")
610          is_structured_invocation = workflow_context.a2a_context.get(
611              "is_structured_invocation", False
612          ) or "/agent/response/" in reply_to_topic
613  
614          # Build message parts
615          message_parts = []
616  
617          if is_structured_invocation and artifact_version is not None:
618              # When invoked as a sub-workflow, include StructuredInvocationResult
619              # so the parent workflow can process the response
620              from ..common.data_parts import ArtifactRef, StructuredInvocationResult
621              invocation_result = StructuredInvocationResult(
622                  type="structured_invocation_result",
623                  status="success",
624                  output_artifact_ref=ArtifactRef(
625                      name=output_artifact_name, version=artifact_version
626                  ),
627              )
628              message_parts.append(a2a.create_data_part(data=invocation_result.model_dump()))
629  
630          # Add text part
631          message_parts.append(a2a.create_text_part(text=response_text))
632  
633          # Create final task response
634          final_task = a2a.create_final_task(
635              task_id=workflow_context.a2a_context["logical_task_id"],
636              context_id=workflow_context.a2a_context["session_id"],
637              final_status=a2a.create_task_status(
638                  state=TaskState.completed,
639                  message=a2a.create_agent_parts_message(parts=message_parts),
640              ),
641              metadata={
642                  "workflow_name": self.workflow_name,
643                  "agent_name": self.workflow_name,  # For compatibility with peer agent response handling
644                  "output": final_output,
645                  "produced_artifacts": produced_artifacts,
646              },
647          )
648  
649          # Publish response
650          response_topic = workflow_context.a2a_context.get(
651              "replyToTopic"
652          ) or a2a.get_client_response_topic(
653              self.namespace, workflow_context.a2a_context["client_id"]
654          )
655  
656          response = a2a.create_success_response(
657              result=final_task,
658              request_id=workflow_context.a2a_context["jsonrpc_request_id"],
659          )
660  
661          self.publish_a2a_message(
662              payload=response.model_dump(exclude_none=True),
663              topic=response_topic,
664              user_properties={
665                  "a2aUserConfig": workflow_context.a2a_context.get("a2a_user_config", {})
666              },
667          )
668  
669          # ACK original message
670          original_message = workflow_context.get_original_solace_message()
671          if original_message:
672              original_message.call_acknowledgements()
673  
674          await self._cleanup_workflow_state(workflow_context)
675  
676      async def finalize_workflow_failure(
677          self, workflow_context: WorkflowExecutionContext, error: Exception
678      ):
679          """Finalize failed workflow execution and publish error."""
680          log_id = f"{self.log_identifier}[Workflow:{workflow_context.workflow_task_id}]"
681          log.warning(f"{log_id} Finalizing workflow failure: {error}")
682  
683          # Execute exit handlers first (passing error info)
684          await self._execute_exit_handlers(workflow_context, "failure", error)
685  
686          # Create output artifact with the error information
687          # Use unique filename: <workflow_name>_<4-digit-uuid>_result.json
688          unique_suffix = uuid.uuid4().hex[:4]
689          output_artifact_name = f"{self.workflow_name}_{unique_suffix}_result.json"
690  
691          user_id = workflow_context.a2a_context["user_id"]
692          session_id = workflow_context.a2a_context["session_id"]
693  
694          # Prepare artifact content with status and error
695          artifact_content = {
696              "status": "error",
697              "message": str(error),
698          }
699          content_bytes = json.dumps(artifact_content).encode("utf-8")
700  
701          metadata_dict = {
702              "description": f"Error output from workflow '{self.workflow_name}'",
703              "source": "workflow_execution",
704              "workflow_name": self.workflow_name,
705          }
706  
707          try:
708              save_result = await save_artifact_with_metadata(
709                  artifact_service=self.artifact_service,
710                  app_name=self.workflow_name,
711                  user_id=user_id,
712                  session_id=session_id,
713                  filename=output_artifact_name,
714                  content_bytes=content_bytes,
715                  mime_type="application/json",
716                  metadata_dict=metadata_dict,
717                  timestamp=datetime.now(timezone.utc),
718                  tags=[ARTIFACT_TAG_WORKING],
719              )
720  
721              if save_result["status"] != "success":
722                  log.error(f"{log_id} Failed to save workflow error artifact: {save_result.get('message')}")
723                  artifact_version = None
724              else:
725                  artifact_version = save_result.get("data_version")
726                  log.info(f"{log_id} Created workflow error artifact: {output_artifact_name} v{artifact_version}")
727          except Exception as e:
728              log.exception(f"{log_id} Error saving workflow error artifact: {e}")
729              artifact_version = None
730  
731          # Publish failure event
732          await self.publish_workflow_event(
733              workflow_context,
734              WorkflowExecutionResultData(
735                  type="workflow_execution_result",
736                  status="error",
737                  error_message=str(error),
738              ),
739          )
740  
741          # Build produced_artifacts list for the response
742          produced_artifacts = []
743          if artifact_version is not None:
744              produced_artifacts.append({
745                  "filename": output_artifact_name,
746                  "version": artifact_version,
747              })
748  
749          # Create response message text that includes artifact reference
750          if artifact_version is not None:
751              response_text = f"Workflow failed: {str(error)}. Error artifact: {output_artifact_name}:v{artifact_version}"
752          else:
753              response_text = f"Workflow failed: {str(error)}"
754  
755          # Check if this was a structured invocation (from gateway, workflow, or agent)
756          reply_to_topic = workflow_context.a2a_context.get("replyToTopic", "")
757          is_structured_invocation = workflow_context.a2a_context.get(
758              "is_structured_invocation", False
759          ) or "/agent/response/" in reply_to_topic
760  
761          # Build message parts
762          message_parts = []
763  
764          if is_structured_invocation:
765              # When invoked as structured invocation, include StructuredInvocationResult
766              # so the caller (gateway, workflow, agent) can process the error response
767              from ..common.data_parts import ArtifactRef, StructuredInvocationResult
768              invocation_result = StructuredInvocationResult(
769                  type="structured_invocation_result",
770                  status="error",
771                  error_message=str(error),
772                  output_artifact_ref=ArtifactRef(
773                      name=output_artifact_name, version=artifact_version
774                  ) if artifact_version is not None else None,
775              )
776              message_parts.append(a2a.create_data_part(data=invocation_result.model_dump()))
777  
778          # Add text part
779          message_parts.append(a2a.create_text_part(text=response_text))
780  
781          # Create final task response
782          final_task = a2a.create_final_task(
783              task_id=workflow_context.a2a_context["logical_task_id"],
784              context_id=workflow_context.a2a_context["session_id"],
785              final_status=a2a.create_task_status(
786                  state=TaskState.failed,
787                  message=a2a.create_agent_parts_message(parts=message_parts),
788              ),
789              metadata={
790                  "workflow_name": self.workflow_name,
791                  "agent_name": self.workflow_name,  # For compatibility with peer agent response handling
792                  "produced_artifacts": produced_artifacts,
793              },
794          )
795  
796          # Publish response
797          response_topic = workflow_context.a2a_context.get(
798              "replyToTopic"
799          ) or a2a.get_client_response_topic(
800              self.namespace, workflow_context.a2a_context["client_id"]
801          )
802  
803          response = a2a.create_success_response(
804              result=final_task,
805              request_id=workflow_context.a2a_context["jsonrpc_request_id"],
806          )
807  
808          self.publish_a2a_message(
809              payload=response.model_dump(exclude_none=True),
810              topic=response_topic,
811              user_properties={
812                  "a2aUserConfig": workflow_context.a2a_context.get("a2a_user_config", {})
813              },
814          )
815  
816          # ACK original message (we handled the error gracefully)
817          original_message = workflow_context.get_original_solace_message()
818          if original_message:
819              original_message.call_acknowledgements()
820  
821          await self._cleanup_workflow_state(workflow_context)
822  
823      async def finalize_workflow_cancelled(
824          self, workflow_context: WorkflowExecutionContext
825      ):
826          """Finalize cancelled workflow execution and publish cancellation status."""
827          log_id = f"{self.log_identifier}[Workflow:{workflow_context.workflow_task_id}]"
828          log.info(f"{log_id} Finalizing workflow cancellation")
829  
830          # Execute exit handlers (passing cancellation info)
831          await self._execute_exit_handlers(workflow_context, "cancelled")
832  
833          # Publish cancellation event
834          await self.publish_workflow_event(
835              workflow_context,
836              WorkflowExecutionResultData(
837                  type="workflow_execution_result",
838                  status="cancelled",
839                  error_message="Workflow was cancelled",
840              ),
841          )
842  
843          # Create final task response with cancelled state
844          final_task = a2a.create_final_task(
845              task_id=workflow_context.a2a_context["logical_task_id"],
846              context_id=workflow_context.a2a_context["session_id"],
847              final_status=a2a.create_task_status(
848                  state=TaskState.canceled,
849                  message=a2a.create_agent_text_message(
850                      text="Workflow was cancelled"
851                  ),
852              ),
853              metadata={
854                  "workflow_name": self.workflow_name,
855                  "agent_name": self.workflow_name,
856              },
857          )
858  
859          # Publish response
860          response_topic = workflow_context.a2a_context.get(
861              "replyToTopic"
862          ) or a2a.get_client_response_topic(
863              self.namespace, workflow_context.a2a_context["client_id"]
864          )
865  
866          response = a2a.create_success_response(
867              result=final_task,
868              request_id=workflow_context.a2a_context["jsonrpc_request_id"],
869          )
870  
871          self.publish_a2a_message(
872              payload=response.model_dump(exclude_none=True),
873              topic=response_topic,
874              user_properties={
875                  "a2aUserConfig": workflow_context.a2a_context.get("a2a_user_config", {})
876              },
877          )
878  
879          # ACK original message
880          original_message = workflow_context.get_original_solace_message()
881          if original_message:
882              original_message.call_acknowledgements()
883  
884          await self._cleanup_workflow_state(workflow_context)
885          log.info(f"{log_id} Workflow cancellation finalized")
886  
887      async def _construct_final_output(
888          self, workflow_context: WorkflowExecutionContext
889      ) -> Dict[str, Any]:
890          """Construct final output from workflow state using output mapping."""
891          mapping = self.workflow_definition.output_mapping
892          state = workflow_context.workflow_state
893  
894          final_output = {}
895          for key, value_def in mapping.items():
896              final_output[key] = self.dag_executor.resolve_value(value_def, state)
897  
898          return final_output
899  
900      async def _update_workflow_state(
901          self,
902          workflow_context: WorkflowExecutionContext,
903          workflow_state: WorkflowExecutionState,
904      ):
905          """Persist workflow state to session service."""
906          session = await self._get_workflow_session(workflow_context)
907          session.state["workflow_execution"] = workflow_state.model_dump()
908          # Note: Session state is persisted automatically by the SessionService
909          # when managed through ADK operations (get_session, append_event, etc.)
910  
911      async def _cleanup_workflow_state(self, workflow_context: WorkflowExecutionContext):
912          """Clean up workflow state on completion."""
913          # Set TTL on session state for auto-cleanup
914          session = await self._get_workflow_session(workflow_context)
915  
916          # Mark workflow complete
917          state = workflow_context.workflow_state
918          state.metadata["completion_time"] = datetime.now(timezone.utc).isoformat()
919          state.metadata["status"] = "completed"
920  
921          session.state["workflow_execution"] = state.model_dump()
922          # Note: Session state is persisted automatically by the SessionService
923  
924          # Clean up any remaining cache entries for timeout tracking
925          # These should normally be removed when nodes complete, but this is a safety net
926          for sub_task_id in workflow_context.get_all_sub_task_ids():
927              self.cache_service.remove_data(sub_task_id)
928  
929          # Remove from active workflows
930          with self.active_workflows_lock:
931              self.active_workflows.pop(workflow_context.workflow_task_id, None)
932  
933      async def _get_workflow_session(self, workflow_context: WorkflowExecutionContext):
934          """Retrieve the ADK session for the workflow."""
935          return await self.session_service.get_session(
936              app_name=self.workflow_name,
937              user_id=workflow_context.a2a_context["user_id"],
938              session_id=workflow_context.a2a_context["session_id"],
939          )
940  
941      async def _load_node_output(
942          self,
943          node_id: str,
944          artifact_name: str,
945          artifact_version: int,
946          workflow_context: WorkflowExecutionContext,
947          sub_task_id: Optional[str] = None,
948      ) -> Any:
949          """Load a node's output artifact.
950  
951          Artifacts are namespace-scoped by the ScopedArtifactServiceWrapper,
952          so the app_name parameter is automatically transformed to the namespace
953          when artifact_scope is "namespace". This allows all agents and workflows
954          in the same namespace to access the same artifact store.
955  
956          We use the parent workflow session ID to load artifacts, as agents are expected
957          to save their outputs to the shared parent session scope.
958          """
959          user_id = workflow_context.a2a_context["user_id"]
960          # Use the parent session ID (caller's session) to ensure artifacts are shared/persisted
961          workflow_session_id = workflow_context.a2a_context["session_id"]
962  
963          # If sub_task_id is not provided, look it up from the node_id
964          if not sub_task_id:
965              sub_task_id = workflow_context.get_sub_task_for_node(node_id)
966              if not sub_task_id:
967                  raise ValueError(f"No sub-task ID found for node {node_id}")
968  
969          # The app_name doesn't matter in namespace mode - the ScopedArtifactServiceWrapper
970          # will replace it with self.namespace. But we pass workflow_name for consistency.
971          artifact = await self.artifact_service.load_artifact(
972              app_name=self.workflow_name,
973              user_id=user_id,
974              session_id=workflow_session_id,
975              filename=artifact_name,
976              version=artifact_version,
977          )
978  
979          if not artifact or not artifact.inline_data:
980              raise ValueError(
981                  f"Artifact {artifact_name} v{artifact_version} not found in session {workflow_session_id}"
982              )
983  
984          return json.loads(artifact.inline_data.data.decode("utf-8"))
985  
986      def cleanup(self):
987          """Clean up resources on component shutdown."""
988          log.info(f"{self.log_identifier} Cleaning up workflow executor")
989  
990          # Cancel active workflows
991          with self.active_workflows_lock:
992              for workflow_context in self.active_workflows.values():
993                  workflow_context.cancel()
994              self.active_workflows.clear()
995  
996          # Call base class cleanup (stops async loop)
997          super().cleanup()