app.py
  1  """
  2  WorkflowApp class and configuration models for Prescriptive Workflows.
  3  
  4  Supports Argo Workflows-compatible syntax with SAM extensions.
  5  """
  6  
import logging
from typing import Any, Dict, List, Literal, Optional, Union

from pydantic import BaseModel, ConfigDict, Field, model_validator

from solace_ai_connector.flow.app import App
from ..common import a2a
from ..agent.sac.app import SamAgentAppConfig, AgentCardConfig, AgentCardPublishingConfig
 14  
 15  log = logging.getLogger(__name__)
 16  
 17  # --- Retry Strategy Models (Argo-compatible) ---
 18  
 19  
 20  class BackoffStrategy(BaseModel):
 21      """Exponential backoff configuration for retries."""
 22  
 23      duration: str = Field(
 24          default="1s",
 25          description="Initial backoff duration. Supports: '5s', '1m', '1h'.",
 26      )
 27      factor: float = Field(
 28          default=2.0,
 29          description="Multiplier for exponential backoff.",
 30      )
 31      max_duration: Optional[str] = Field(
 32          default=None,
 33          description="Maximum backoff duration cap.",
 34          alias="maxDuration",
 35      )
 36  
 37      class Config:
 38          populate_by_name = True
 39  
 40  
 41  class RetryStrategy(BaseModel):
 42      """
 43      Retry configuration for workflow nodes.
 44      Argo-compatible with extensions.
 45      """
 46  
 47      limit: int = Field(
 48          default=3,
 49          description="Maximum number of retry attempts.",
 50      )
 51      retry_policy: Literal["Always", "OnFailure", "OnError"] = Field(
 52          default="OnFailure",
 53          description="When to retry: Always, OnFailure, OnError.",
 54          alias="retryPolicy",
 55      )
 56      backoff: Optional[BackoffStrategy] = Field(
 57          default=None,
 58          description="Exponential backoff configuration.",
 59      )
 60  
 61      class Config:
 62          populate_by_name = True
 63  
 64  
 65  # --- Exit Handler Model ---
 66  
 67  
 68  class ExitHandler(BaseModel):
 69      """
 70      Exit handler configuration for cleanup/notification on workflow completion.
 71  
 72      Supports conditional handlers for different outcomes.
 73      """
 74  
 75      always: Optional[str] = Field(
 76          default=None,
 77          description="Node ID to execute regardless of workflow outcome.",
 78      )
 79      on_success: Optional[str] = Field(
 80          default=None,
 81          description="Node ID to execute only on successful completion.",
 82          alias="onSuccess",
 83      )
 84      on_failure: Optional[str] = Field(
 85          default=None,
 86          description="Node ID to execute only on failure.",
 87          alias="onFailure",
 88      )
 89      on_cancel: Optional[str] = Field(
 90          default=None,
 91          description="Node ID to execute only on cancellation.",
 92          alias="onCancel",
 93      )
 94  
 95      class Config:
 96          populate_by_name = True
 97  
 98  
 99  # --- Workflow Node Models ---
100  
101  
class WorkflowNode(BaseModel):
    """
    Base workflow node.

    Supports both SAM and Argo field names:
    - depends_on / dependencies (Argo alias)
    """

    # Pydantic v2 configuration. This replaces the nested ``class Config``,
    # which v2 only supports as a deprecated shim that warns at class creation.
    model_config = ConfigDict(populate_by_name=True)

    id: str = Field(..., description="Unique node identifier")
    type: str = Field(..., description="Node type")
    depends_on: Optional[List[str]] = Field(
        default=None,
        description="List of node IDs this node depends on.",
        alias="dependencies",
    )
120  
121  
class AgentNode(WorkflowNode):
    """
    Agent invocation node.

    Argo-aligned features:
    - `when`: Conditional execution clause (Argo-style)
    - `retryStrategy`: Retry configuration
    - `timeout`: Node-specific timeout override
    - `instruction`: Optional guidance text sent to the target agent
    """

    # Pydantic v2 configuration. This replaces the nested ``class Config``,
    # which v2 only supports as a deprecated shim that warns at class creation.
    model_config = ConfigDict(populate_by_name=True)

    type: Literal["agent"] = "agent"
    agent_name: str = Field(..., description="Name of agent to invoke")
    input: Optional[Dict[str, Any]] = Field(
        default=None,
        description="Input mapping. If omitted, inferred from dependencies.",
    )
    instruction: Optional[str] = Field(
        default=None,
        description=(
            "Optional instruction/guidance text sent to the target agent. "
            "Supports template expressions like '{{workflow.input.context}}'. "
            "Provides context for how the agent should process the request."
        ),
    )

    # Optional schema overrides
    input_schema_override: Optional[Dict[str, Any]] = None
    output_schema_override: Optional[Dict[str, Any]] = None

    # Argo-aligned fields
    when: Optional[str] = Field(
        default=None,
        description=(
            "Conditional execution expression (Argo-style). "
            "Node only executes if expression evaluates to true."
        ),
    )
    retry_strategy: Optional[RetryStrategy] = Field(
        default=None,
        description="Retry configuration for this node.",
        alias="retryStrategy",
    )
    timeout: Optional[str] = Field(
        default=None,
        description="Node-specific timeout. Format: '30s', '5m', '1h'.",
    )
172  
173  
class SwitchCase(BaseModel):
    """
    A single case in a switch node.

    Pairs a condition expression with the node ID to run when it matches.
    ``condition`` and ``node`` may also be supplied under the aliases
    ``when`` and ``then``.
    """

    # Pydantic v2 configuration. This replaces the nested ``class Config``,
    # which v2 only supports as a deprecated shim that warns at class creation.
    model_config = ConfigDict(populate_by_name=True)

    condition: str = Field(
        ...,
        description="Expression to evaluate for this case.",
        alias="when",
    )
    node: str = Field(
        ...,
        description="Node ID to execute if condition matches.",
        alias="then",
    )
190  
191  
class SwitchNode(WorkflowNode):
    """
    Conditional node that branches to one of several target nodes.

    ``cases`` is an ordered list; the first case that matches wins. When no
    case matches, ``default`` (if set) names the node to execute instead.
    """

    type: Literal["switch"] = "switch"

    # Required: the ordered condition/target pairs.
    cases: List[SwitchCase] = Field(
        ..., description="Ordered list of condition/node pairs. First match wins."
    )

    # Optional fallback target.
    default: Optional[str] = Field(
        None, description="Node ID to execute if no cases match."
    )
208  
209  
class LoopNode(WorkflowNode):
    """
    While-loop node for iterative execution until condition is met.

    Different from MapNode which is for-each iteration.
    LoopNode repeats a node until a condition becomes false.
    ``max_iterations`` may also be supplied as ``maxIterations``.
    """

    # Pydantic v2 configuration. This replaces the nested ``class Config``,
    # which v2 only supports as a deprecated shim that warns at class creation.
    model_config = ConfigDict(populate_by_name=True)

    type: Literal["loop"] = "loop"
    node: str = Field(..., description="Node ID to execute repeatedly")
    condition: str = Field(
        ...,
        description="Continue looping while this expression is true.",
    )
    max_iterations: int = Field(
        default=100,
        description="Safety limit on number of iterations.",
        alias="maxIterations",
    )
    delay: Optional[str] = Field(
        default=None,
        description="Delay between loop iterations. Format: '5s', '1m', '1h', '1d'.",
    )
236  
237  
class MapNode(WorkflowNode):
    """
    Map (parallel iteration) node.

    Supports both SAM syntax (``items``) and Argo-style ``withItems`` /
    ``withParam``. Exactly one of the three item sources must be provided;
    this is enforced by ``validate_items_source``.
    """

    # Pydantic v2 configuration. This replaces the nested ``class Config``,
    # which v2 only supports as a deprecated shim that warns at class creation.
    model_config = ConfigDict(populate_by_name=True)

    type: Literal["map"] = "map"

    # Primary SAM field
    items: Optional[Union[str, Dict[str, Any]]] = Field(
        default=None,
        description="Array template reference or expression to iterate over.",
    )

    # Argo aliases
    with_param: Optional[str] = Field(
        default=None,
        description="Argo-style: JSON array from previous step output.",
        alias="withParam",
    )
    with_items: Optional[List[Any]] = Field(
        default=None,
        description="Argo-style: Static list of items to iterate over.",
        alias="withItems",
    )

    node: str = Field(..., description="Node ID to execute for each item")
    max_items: Optional[int] = Field(
        default=100,
        description="Maximum items to process (safety limit).",
        alias="maxItems",
    )
    concurrency_limit: Optional[int] = Field(
        default=None,
        description="Max concurrent executions. None means unlimited.",
        alias="concurrencyLimit",
    )

    @model_validator(mode="after")
    def validate_items_source(self) -> "MapNode":
        """
        Ensure exactly one items source is provided.

        Raises:
            ValueError: If none, or more than one, of ``items``,
                ``with_param`` and ``with_items`` is set.
        """
        # Count the populated sources once; a source counts as provided when
        # it is not None (so an empty list or empty string still counts).
        provided = sum(
            source is not None
            for source in (self.items, self.with_param, self.with_items)
        )
        if provided == 0:
            raise ValueError(
                "MapNode requires one of: 'items', 'withParam', or 'withItems'"
            )
        if provided > 1:
            raise ValueError(
                "MapNode accepts only one of: 'items', 'withParam', or 'withItems'"
            )
        return self

    def get_items_expression(self) -> Union[str, List[Any], Dict[str, Any]]:
        """
        Return the items source regardless of which field was used.

        Checks ``items``, then ``with_param``, and falls back to
        ``with_items``.
        """
        if self.items is not None:
            return self.items
        if self.with_param is not None:
            return self.with_param
        return self.with_items
305  
306  
class WorkflowInvokeNode(WorkflowNode):
    """
    Workflow invocation node.

    Calls another workflow as a sub-workflow.
    Since workflows register as agents, this reuses the agent calling mechanism.

    Argo-aligned features:
    - `when`: Conditional execution clause (Argo-style)
    - `retryStrategy`: Retry configuration
    - `timeout`: Node-specific timeout override
    - `instruction`: Optional guidance text sent to the target workflow
    """

    # Pydantic v2 configuration. This replaces the nested ``class Config``,
    # which v2 only supports as a deprecated shim that warns at class creation.
    model_config = ConfigDict(populate_by_name=True)

    type: Literal["workflow"] = "workflow"
    workflow_name: str = Field(..., description="Name of workflow to invoke")
    input: Optional[Dict[str, Any]] = Field(
        default=None,
        description="Input mapping. If omitted, inferred from dependencies.",
    )
    instruction: Optional[str] = Field(
        default=None,
        description=(
            "Optional instruction/guidance text sent to the target workflow. "
            "Supports template expressions like '{{workflow.input.context}}'."
        ),
    )

    # Optional schema overrides
    input_schema_override: Optional[Dict[str, Any]] = None
    output_schema_override: Optional[Dict[str, Any]] = None

    # Argo-aligned fields
    when: Optional[str] = Field(
        default=None,
        description=(
            "Conditional execution expression (Argo-style). "
            "Node only executes if expression evaluates to true."
        ),
    )
    retry_strategy: Optional[RetryStrategy] = Field(
        default=None,
        description="Retry configuration for this node.",
        alias="retryStrategy",
    )
    timeout: Optional[str] = Field(
        default=None,
        description="Node-specific timeout. Format: '30s', '5m', '1h'.",
    )
359  
360  
# Every concrete node model accepted in a workflow's node list. Each member
# declares its own Literal `type` value ("agent", "switch", "loop", "map",
# "workflow").
WorkflowNodeUnion = Union[AgentNode, SwitchNode, LoopNode, MapNode, WorkflowInvokeNode]
369  
370  
class WorkflowDefinition(BaseModel):
    """
    Complete workflow definition.

    On construction, ``validate_dag_structure`` checks that node IDs are
    unique and that every node reference (dependencies, switch branches,
    loop/map targets and exit handlers) points at a node declared in
    ``nodes``.

    Argo-aligned features:
    - onExit: Exit handler for cleanup/notification
    - failFast: Control behavior on node failure
    - retryStrategy: Default retry strategy for all nodes
    """

    # Pydantic v2 configuration. This replaces the nested ``class Config``,
    # which v2 only supports as a deprecated shim that warns at class creation.
    model_config = ConfigDict(populate_by_name=True)

    description: str = Field(..., description="Human-readable workflow description")

    version: str = Field(
        default="1.0.0",
        description="User-defined version of the workflow (semantic versioning recommended)",
    )

    input_schema: Optional[Dict[str, Any]] = Field(
        default=None,
        description="JSON Schema for workflow input.",
        alias="inputSchema",
    )

    output_schema: Optional[Dict[str, Any]] = Field(
        default=None,
        description="JSON Schema for workflow output.",
        alias="outputSchema",
    )

    nodes: List[WorkflowNodeUnion] = Field(
        ...,
        description="Workflow nodes (DAG vertices).",
    )

    output_mapping: Dict[str, Any] = Field(
        ...,
        description="Mapping from node outputs to final workflow output.",
        alias="outputMapping",
    )

    skills: Optional[List[Dict[str, Any]]] = Field(
        default=None,
        description="Workflow skills for agent card.",
    )

    # Argo-aligned fields
    on_exit: Optional[Union[str, ExitHandler]] = Field(
        default=None,
        description=(
            "Exit handler configuration. Can be a node ID (string) or "
            "ExitHandler object with on_success/on_failure/on_cancel/always."
        ),
        alias="onExit",
    )

    fail_fast: bool = Field(
        default=True,
        description=(
            "If true, stop scheduling new nodes when one fails. "
            "Running nodes continue to completion."
        ),
        alias="failFast",
    )

    max_call_depth: int = Field(
        default=10,
        ge=1,
        description=(
            "Maximum allowed call depth for sub-workflow/agent invocations. "
            "Prevents infinite recursion."
        ),
        alias="maxCallDepth",
    )

    retry_strategy: Optional[RetryStrategy] = Field(
        default=None,
        description="Default retry strategy for all nodes (can be overridden per-node).",
        alias="retryStrategy",
    )

    @model_validator(mode="after")
    def validate_dag_structure(self) -> "WorkflowDefinition":
        """
        Validate DAG has valid references and consistent control flow.

        Raises:
            ValueError: If two nodes share an ID, if any dependency, switch
                branch, loop/map target or exit handler names a node that is
                not defined, or if a switch branch target does not depend on
                its switch node.
        """
        node_map = self._build_node_map()

        for node in self.nodes:
            # Check dependencies reference valid nodes
            for dep in node.depends_on or []:
                if dep not in node_map:
                    raise ValueError(
                        f"Node '{node.id}' depends on non-existent node '{dep}'"
                    )

            if node.type == "switch":
                # Every branch target must exist and depend on the switch.
                for i, case in enumerate(node.cases):
                    self._validate_branch_dependency(
                        node, case.node, f"cases[{i}].node", node_map
                    )
                if node.default:
                    self._validate_branch_dependency(
                        node, node.default, "default", node_map
                    )
            elif node.type == "loop":
                if node.node not in node_map:
                    raise ValueError(
                        f"LoopNode '{node.id}' references non-existent node '{node.node}'"
                    )
            elif node.type == "map":
                if node.node not in node_map:
                    raise ValueError(
                        f"MapNode '{node.id}' references non-existent node '{node.node}'"
                    )

        self._validate_exit_handlers(node_map)
        return self

    def _build_node_map(self) -> Dict[str, WorkflowNode]:
        """
        Index ``nodes`` by ID, rejecting duplicate IDs.

        A plain dict comprehension would let a later node silently replace an
        earlier one with the same ID, so the shadowed node could never be
        found when references are resolved through the map.

        Raises:
            ValueError: If two nodes share the same ID.
        """
        node_map: Dict[str, WorkflowNode] = {}
        for node in self.nodes:
            if node.id in node_map:
                raise ValueError(
                    f"Duplicate node id '{node.id}': "
                    "node ids must be unique within a workflow"
                )
            node_map[node.id] = node
        return node_map

    def _validate_exit_handlers(self, node_map: Dict[str, WorkflowNode]) -> None:
        """
        Ensure every node ID named by ``on_exit`` exists in ``node_map``.

        Raises:
            ValueError: If ``on_exit`` (string form) or any populated
                ``ExitHandler`` field names a node that is not defined.
        """
        handler = self.on_exit
        if not handler:
            return

        # String form: a single node ID.
        if isinstance(handler, str):
            if handler not in node_map:
                raise ValueError(
                    f"onExit references non-existent node '{handler}'"
                )
            return

        # Object form: check each outcome-specific node ID that is set.
        for field, node_id in [
            ("always", handler.always),
            ("on_success", handler.on_success),
            ("on_failure", handler.on_failure),
            ("on_cancel", handler.on_cancel),
        ]:
            if node_id and node_id not in node_map:
                raise ValueError(
                    f"onExit.{field} references non-existent node '{node_id}'"
                )

    def _validate_branch_dependency(
        self,
        parent: WorkflowNode,
        target_id: str,
        branch_name: str,
        node_map: Dict[str, WorkflowNode],
    ) -> None:
        """
        Ensure target node exists and depends on parent node.

        Args:
            parent: The routing node whose branch is being checked.
            target_id: Node ID the branch routes to.
            branch_name: Label for the branch, used in error messages.
            node_map: All workflow nodes indexed by ID.

        Raises:
            ValueError: If the target does not exist, or does not list the
                parent in its ``depends_on``.
        """
        target = node_map.get(target_id)
        if not target:
            raise ValueError(
                f"Node '{parent.id}' references non-existent {branch_name} '{target_id}'"
            )

        if not target.depends_on or parent.id not in target.depends_on:
            raise ValueError(
                f"Logic Error: Node '{parent.id}' routes to '{target.id}' ({branch_name}), "
                f"but '{target.id}' does not list '{parent.id}' in its 'depends_on' field. "
                f"This would cause '{target.id}' to run immediately. "
                f"Fix: Add 'depends_on: [{parent.id}]' to node '{target.id}'."
            )
535  
536  
class WorkflowAppConfig(SamAgentAppConfig):
    """App-level configuration for a workflow, extending the agent app config."""

    agent_type: Literal["workflow"] = "workflow"

    # Workflows are identified by `name`. The parent's `agent_name` is made
    # optional and excluded from dumps; `populate_agent_name_from_name`
    # fills it in from `name` when it is not supplied.
    name: str = Field(..., description="Unique name for this workflow instance.")
    agent_name: Optional[str] = Field(None, exclude=True)

    # The DAG this app executes.
    workflow: WorkflowDefinition = Field(
        ..., description="The workflow DAG definition"
    )

    # Execution limits and timeouts.
    max_workflow_execution_time_seconds: int = Field(
        1800,  # 30 minutes
        description="Maximum time for entire workflow execution",
    )
    default_node_timeout_seconds: int = Field(
        300,  # 5 minutes
        description="Default timeout for individual nodes",
    )
    node_cancellation_timeout_seconds: int = Field(
        30,
        description="Time to wait for a node to confirm cancellation before force-failing.",
    )
    default_max_map_items: int = Field(
        100,
        description="Default max items for map nodes",
    )

    # SamAgentAppConfig fields re-declared as optional here, since a workflow
    # may not need them or may want different defaults.
    model: Optional[Union[str, Dict[str, Any]]] = None
    instruction: Optional[Any] = None

    # The agent card is optional because it is auto-generated from the
    # workflow definition.
    agent_card: Optional[AgentCardConfig] = Field(
        default_factory=lambda: AgentCardConfig(),
        description="Static definition of this agent's capabilities for discovery.",
    )

    # Card publishing is optional and defaults to a 10 second interval.
    agent_card_publishing: Optional[AgentCardPublishingConfig] = Field(
        default_factory=lambda: AgentCardPublishingConfig(interval_seconds=10),
        description="Settings for publishing the agent card.",
    )

    @model_validator(mode="after")
    def populate_agent_name_from_name(self):
        """Copy ``name`` into ``agent_name`` when ``agent_name`` is unset or empty."""
        if self.agent_name:
            return self
        self.agent_name = self.name
        return self
591  
592  
class WorkflowApp(App):
    """App subclass that turns a validated workflow config into a runnable app."""

    # App schema used for validation; declares no parameters for now.
    app_schema: Dict[str, Any] = {"config_parameters": []}

    def __init__(self, app_info: Dict[str, Any], **kwargs):
        """
        Validate the workflow config and prepare ``app_info`` for the base App.

        ``app_info`` is modified in place: its ``app_config`` is replaced by
        the validated dump, a single workflow component is installed under
        ``components``, and the ``broker`` section is filled in before the
        parent constructor runs.

        Args:
            app_info: Raw app definition; ``app_config`` is read from it.
            **kwargs: Forwarded unchanged to the parent constructor.

        Raises:
            Exception: Whatever config validation raises; it is logged and
                re-raised.
        """
        log.debug("Initializing WorkflowApp...")

        raw_config = app_info.get("app_config", {})
        try:
            app_config = WorkflowAppConfig.model_validate_and_clean(raw_config)
        except Exception as e:
            log.error(f"Workflow configuration validation failed: {e}")
            raise

        namespace = app_config.namespace
        workflow_name = app_config.name

        # Note: AgentCardConfig doesn't have input_schema/output_schema
        # directly. These should be specified in agent_card.skills in the
        # YAML config, or added to the workflow definition's skills field.

        # The single component that runs the workflow; it carries its own
        # subscriptions and its own dump of the validated config.
        workflow_component = {
            "component_name": workflow_name,
            "component_module": "solace_agent_mesh.workflow.component",
            "component_config": {"app_config": app_config.model_dump()},
            "subscriptions": self._generate_subscriptions(namespace, workflow_name),
        }

        # Replace the raw config with the validated one. The key is
        # 'components', not 'component_list'.
        app_info["app_config"] = app_config.model_dump()
        app_info["components"] = [workflow_component]

        # Broker settings for workflow messaging (creates the section if absent).
        broker = app_info.setdefault("broker", {})
        broker["input_enabled"] = True
        broker["output_enabled"] = True
        log.debug("Injected broker.input_enabled=True and broker.output_enabled=True")

        queue_name = f"{namespace.strip('/')}/q/a2a/{workflow_name}"
        broker["queue_name"] = queue_name
        log.debug("Injected generated broker.queue_name: %s", queue_name)

        # Keep a caller-supplied temporary_queue value; default to True.
        broker["temporary_queue"] = broker.get("temporary_queue", True)
        log.debug("Set broker_config.temporary_queue = %s", broker["temporary_queue"])

        super().__init__(app_info, **kwargs)

    def _generate_subscriptions(
        self, namespace: str, workflow_name: str
    ) -> List[Dict[str, str]]:
        """
        Generate Solace topic subscriptions for workflow.

        Returns one ``{"topic": ...}`` entry each for, in order: the
        discovery topic, this workflow's agent request topic, the agent
        response subscription topic and the agent status subscription topic.
        """
        topics = [
            # Discovery topic for agent cards
            a2a.get_discovery_subscription_topic(namespace),
            # Workflow's agent request topic
            a2a.get_agent_request_topic(namespace, workflow_name),
            # Agent response topics (wildcard)
            a2a.get_agent_response_subscription_topic(namespace, workflow_name),
            # Agent status topics (wildcard)
            a2a.get_agent_status_subscription_topic(namespace, workflow_name),
        ]
        return [{"topic": topic} for topic in topics]
687          return subscriptions