# app.py
"""
WorkflowApp class and configuration models for Prescriptive Workflows.

Supports Argo Workflows-compatible syntax with SAM extensions.
"""

import logging
from typing import Any, Dict, List, Literal, Optional, Union

from pydantic import BaseModel, ConfigDict, Field, model_validator

from solace_ai_connector.flow.app import App
from ..common import a2a
from ..agent.sac.app import (
    AgentCardConfig,
    AgentCardPublishingConfig,
    SamAgentAppConfig,
)

log = logging.getLogger(__name__)

# --- Retry Strategy Models (Argo-compatible) ---


class BackoffStrategy(BaseModel):
    """Exponential backoff configuration for retries."""

    # Accept both the Python field names and their camelCase (Argo) aliases.
    model_config = ConfigDict(populate_by_name=True)

    duration: str = Field(
        default="1s",
        description="Initial backoff duration. Supports: '5s', '1m', '1h'.",
    )
    factor: float = Field(
        default=2.0,
        description="Multiplier for exponential backoff.",
    )
    max_duration: Optional[str] = Field(
        default=None,
        description="Maximum backoff duration cap.",
        alias="maxDuration",
    )


class RetryStrategy(BaseModel):
    """
    Retry configuration for workflow nodes.
    Argo-compatible with extensions.
    """

    model_config = ConfigDict(populate_by_name=True)

    limit: int = Field(
        default=3,
        description="Maximum number of retry attempts.",
    )
    retry_policy: Literal["Always", "OnFailure", "OnError"] = Field(
        default="OnFailure",
        description="When to retry: Always, OnFailure, OnError.",
        alias="retryPolicy",
    )
    backoff: Optional[BackoffStrategy] = Field(
        default=None,
        description="Exponential backoff configuration.",
    )


# --- Exit Handler Model ---


class ExitHandler(BaseModel):
    """
    Exit handler configuration for cleanup/notification on workflow completion.

    Supports conditional handlers for different outcomes. Each field holds a
    node ID; the references are checked by
    ``WorkflowDefinition.validate_dag_structure``.
    """

    model_config = ConfigDict(populate_by_name=True)

    always: Optional[str] = Field(
        default=None,
        description="Node ID to execute regardless of workflow outcome.",
    )
    on_success: Optional[str] = Field(
        default=None,
        description="Node ID to execute only on successful completion.",
        alias="onSuccess",
    )
    on_failure: Optional[str] = Field(
        default=None,
        description="Node ID to execute only on failure.",
        alias="onFailure",
    )
    on_cancel: Optional[str] = Field(
        default=None,
        description="Node ID to execute only on cancellation.",
        alias="onCancel",
    )


# --- Workflow Node Models ---


class WorkflowNode(BaseModel):
    """
    Base workflow node.

    Supports both SAM and Argo field names:
    - depends_on / dependencies (Argo alias)
    """

    model_config = ConfigDict(populate_by_name=True)

    id: str = Field(..., description="Unique node identifier")
    type: str = Field(..., description="Node type")
    depends_on: Optional[List[str]] = Field(
        default=None,
        description="List of node IDs this node depends on.",
        alias="dependencies",
    )


class AgentNode(WorkflowNode):
    """
    Agent invocation node.

    Argo-aligned features:
    - `when`: Conditional execution clause (Argo-style)
    - `retryStrategy`: Retry configuration
    - `timeout`: Node-specific timeout override
    - `instruction`: Optional guidance text sent to the target agent
    """

    model_config = ConfigDict(populate_by_name=True)

    type: Literal["agent"] = "agent"
    agent_name: str = Field(..., description="Name of agent to invoke")
    input: Optional[Dict[str, Any]] = Field(
        default=None,
        description="Input mapping. If omitted, inferred from dependencies.",
    )
    instruction: Optional[str] = Field(
        default=None,
        description=(
            "Optional instruction/guidance text sent to the target agent. "
            "Supports template expressions like '{{workflow.input.context}}'. "
            "Provides context for how the agent should process the request."
        ),
    )

    # Optional schema overrides
    input_schema_override: Optional[Dict[str, Any]] = None
    output_schema_override: Optional[Dict[str, Any]] = None

    # Argo-aligned fields
    when: Optional[str] = Field(
        default=None,
        description=(
            "Conditional execution expression (Argo-style). "
            "Node only executes if expression evaluates to true."
        ),
    )
    retry_strategy: Optional[RetryStrategy] = Field(
        default=None,
        description="Retry configuration for this node.",
        alias="retryStrategy",
    )
    timeout: Optional[str] = Field(
        default=None,
        description="Node-specific timeout. Format: '30s', '5m', '1h'.",
    )


class SwitchCase(BaseModel):
    """A single case in a switch node (Argo-style ``when``/``then`` aliases)."""

    model_config = ConfigDict(populate_by_name=True)

    condition: str = Field(
        ...,
        description="Expression to evaluate for this case.",
        alias="when",
    )
    node: str = Field(
        ...,
        description="Node ID to execute if condition matches.",
        alias="then",
    )


class SwitchNode(WorkflowNode):
    """
    Multi-way conditional branching node.

    Cases are evaluated in order; first match wins.
    """

    type: Literal["switch"] = "switch"
    cases: List[SwitchCase] = Field(
        ...,
        description="Ordered list of condition/node pairs. First match wins.",
    )
    default: Optional[str] = Field(
        default=None,
        description="Node ID to execute if no cases match.",
    )


class LoopNode(WorkflowNode):
    """
    While-loop node for iterative execution until condition is met.

    Different from MapNode which is for-each iteration.
    LoopNode repeats a node until a condition becomes false.
    """

    model_config = ConfigDict(populate_by_name=True)

    type: Literal["loop"] = "loop"
    node: str = Field(..., description="Node ID to execute repeatedly")
    condition: str = Field(
        ...,
        description="Continue looping while this expression is true.",
    )
    max_iterations: int = Field(
        default=100,
        description="Safety limit on number of iterations.",
        alias="maxIterations",
    )
    delay: Optional[str] = Field(
        default=None,
        description="Delay between loop iterations. Format: '5s', '1m', '1h', '1d'.",
    )


class MapNode(WorkflowNode):
    """
    Map (parallel iteration) node.

    Supports both SAM syntax and Argo-style withItems/withParam. Exactly one
    of ``items``, ``withParam`` or ``withItems`` must be given.
    """

    model_config = ConfigDict(populate_by_name=True)

    type: Literal["map"] = "map"

    # Primary SAM field
    items: Optional[Union[str, Dict[str, Any]]] = Field(
        default=None,
        description="Array template reference or expression to iterate over.",
    )

    # Argo aliases
    with_param: Optional[str] = Field(
        default=None,
        description="Argo-style: JSON array from previous step output.",
        alias="withParam",
    )
    with_items: Optional[List[Any]] = Field(
        default=None,
        description="Argo-style: Static list of items to iterate over.",
        alias="withItems",
    )

    node: str = Field(..., description="Node ID to execute for each item")
    max_items: Optional[int] = Field(
        default=100,
        description="Maximum items to process (safety limit).",
        alias="maxItems",
    )
    concurrency_limit: Optional[int] = Field(
        default=None,
        description="Max concurrent executions. None means unlimited.",
        alias="concurrencyLimit",
    )

    @model_validator(mode="after")
    def validate_items_source(self) -> "MapNode":
        """
        Ensure exactly one items source is provided.

        Raises:
            ValueError: If none, or more than one, of ``items`` /
                ``withParam`` / ``withItems`` is set.
        """
        provided = sum(
            source is not None
            for source in (self.items, self.with_param, self.with_items)
        )
        if provided == 0:
            raise ValueError(
                "MapNode requires one of: 'items', 'withParam', or 'withItems'"
            )
        if provided > 1:
            raise ValueError(
                "MapNode accepts only one of: 'items', 'withParam', or 'withItems'"
            )
        return self

    def get_items_expression(self) -> Union[str, List[Any], Dict[str, Any]]:
        """Return the items source regardless of which field was used."""
        if self.items is not None:
            return self.items
        if self.with_param is not None:
            return self.with_param
        return self.with_items


class WorkflowInvokeNode(WorkflowNode):
    """
    Workflow invocation node.

    Calls another workflow as a sub-workflow.
    Since workflows register as agents, this reuses the agent calling mechanism.

    Argo-aligned features:
    - `when`: Conditional execution clause (Argo-style)
    - `retryStrategy`: Retry configuration
    - `timeout`: Node-specific timeout override
    - `instruction`: Optional guidance text sent to the target workflow
    """

    model_config = ConfigDict(populate_by_name=True)

    type: Literal["workflow"] = "workflow"
    workflow_name: str = Field(..., description="Name of workflow to invoke")
    input: Optional[Dict[str, Any]] = Field(
        default=None,
        description="Input mapping. If omitted, inferred from dependencies.",
    )
    instruction: Optional[str] = Field(
        default=None,
        description=(
            "Optional instruction/guidance text sent to the target workflow. "
            "Supports template expressions like '{{workflow.input.context}}'."
        ),
    )

    # Optional schema overrides
    input_schema_override: Optional[Dict[str, Any]] = None
    output_schema_override: Optional[Dict[str, Any]] = None

    # Argo-aligned fields
    when: Optional[str] = Field(
        default=None,
        description=(
            "Conditional execution expression (Argo-style). "
            "Node only executes if expression evaluates to true."
        ),
    )
    retry_strategy: Optional[RetryStrategy] = Field(
        default=None,
        description="Retry configuration for this node.",
        alias="retryStrategy",
    )
    timeout: Optional[str] = Field(
        default=None,
        description="Node-specific timeout. Format: '30s', '5m', '1h'.",
    )


# Union type for polymorphic node list
WorkflowNodeUnion = Union[
    AgentNode,
    SwitchNode,
    LoopNode,
    MapNode,
    WorkflowInvokeNode,
]


class WorkflowDefinition(BaseModel):
    """
    Complete workflow definition.

    Argo-aligned features:
    - onExit: Exit handler for cleanup/notification
    - failFast: Control behavior on node failure
    - retryStrategy: Default retry strategy for all nodes
    """

    model_config = ConfigDict(populate_by_name=True)

    description: str = Field(..., description="Human-readable workflow description")

    version: str = Field(
        default="1.0.0",
        description="User-defined version of the workflow (semantic versioning recommended)",
    )

    input_schema: Optional[Dict[str, Any]] = Field(
        default=None,
        description="JSON Schema for workflow input.",
        alias="inputSchema",
    )

    output_schema: Optional[Dict[str, Any]] = Field(
        default=None,
        description="JSON Schema for workflow output.",
        alias="outputSchema",
    )

    nodes: List[WorkflowNodeUnion] = Field(
        ...,
        description="Workflow nodes (DAG vertices).",
    )

    output_mapping: Dict[str, Any] = Field(
        ...,
        description="Mapping from node outputs to final workflow output.",
        alias="outputMapping",
    )

    skills: Optional[List[Dict[str, Any]]] = Field(
        default=None,
        description="Workflow skills for agent card.",
    )

    # Argo-aligned fields
    on_exit: Optional[Union[str, ExitHandler]] = Field(
        default=None,
        description=(
            "Exit handler configuration. Can be a node ID (string) or "
            "ExitHandler object with on_success/on_failure/on_cancel/always."
        ),
        alias="onExit",
    )

    fail_fast: bool = Field(
        default=True,
        description=(
            "If true, stop scheduling new nodes when one fails. "
            "Running nodes continue to completion."
        ),
        alias="failFast",
    )

    max_call_depth: int = Field(
        default=10,
        ge=1,
        description=(
            "Maximum allowed call depth for sub-workflow/agent invocations. "
            "Prevents infinite recursion."
        ),
        alias="maxCallDepth",
    )

    retry_strategy: Optional[RetryStrategy] = Field(
        default=None,
        description="Default retry strategy for all nodes (can be overridden per-node).",
        alias="retryStrategy",
    )

    @model_validator(mode="after")
    def validate_dag_structure(self) -> "WorkflowDefinition":
        """
        Validate DAG has valid references and consistent control flow.

        Checks, in order:
        - node IDs are unique;
        - every ``depends_on`` entry names an existing node;
        - switch case/default targets exist and list the switch in their
          ``depends_on``;
        - loop/map targets exist and are not the loop/map node itself;
        - the ``depends_on`` edges contain no cycle;
        - exit-handler node IDs exist.

        Raises:
            ValueError: On the first failed check.
        """
        # Build the id -> node index explicitly so duplicates are reported
        # instead of silently replacing an earlier node.
        node_map: Dict[str, WorkflowNode] = {}
        for node in self.nodes:
            if node.id in node_map:
                raise ValueError(f"Duplicate node id '{node.id}'")
            node_map[node.id] = node

        for node in self.nodes:
            # Check dependencies reference valid nodes
            for dep in node.depends_on or []:
                if dep not in node_map:
                    raise ValueError(
                        f"Node '{node.id}' depends on non-existent node '{dep}'"
                    )

            # Validate Switch Node Consistency
            if isinstance(node, SwitchNode):
                for i, case in enumerate(node.cases):
                    self._validate_branch_dependency(
                        node, case.node, f"cases[{i}].node", node_map
                    )
                if node.default:
                    self._validate_branch_dependency(
                        node, node.default, "default", node_map
                    )

            # Validate LoopNode target reference
            elif isinstance(node, LoopNode):
                if node.node not in node_map:
                    raise ValueError(
                        f"LoopNode '{node.id}' references non-existent node '{node.node}'"
                    )
                if node.node == node.id:
                    raise ValueError(f"LoopNode '{node.id}' cannot target itself")

            # Validate MapNode target reference
            elif isinstance(node, MapNode):
                if node.node not in node_map:
                    raise ValueError(
                        f"MapNode '{node.id}' references non-existent node '{node.node}'"
                    )
                if node.node == node.id:
                    raise ValueError(f"MapNode '{node.id}' cannot target itself")

        # All dependency references are known to exist at this point.
        self._validate_acyclic(node_map)

        # Validate exit handler references
        if self.on_exit:
            if isinstance(self.on_exit, str):
                if self.on_exit not in node_map:
                    raise ValueError(
                        f"onExit references non-existent node '{self.on_exit}'"
                    )
            else:
                for handler_field, node_id in [
                    ("always", self.on_exit.always),
                    ("on_success", self.on_exit.on_success),
                    ("on_failure", self.on_exit.on_failure),
                    ("on_cancel", self.on_exit.on_cancel),
                ]:
                    if node_id and node_id not in node_map:
                        raise ValueError(
                            f"onExit.{handler_field} references non-existent node '{node_id}'"
                        )

        return self

    @staticmethod
    def _validate_acyclic(node_map: Dict[str, WorkflowNode]) -> None:
        """
        Raise ValueError if the ``depends_on`` edges contain a cycle.

        Uses Kahn's algorithm. Every ``depends_on`` entry must already be a
        key of ``node_map``. A node that depends on itself counts as a cycle.
        """
        # Outstanding (deduplicated) dependencies per node.
        pending: Dict[str, set] = {
            node_id: set(node.depends_on or []) for node_id, node in node_map.items()
        }
        # Reverse edges: dependency -> nodes waiting on it.
        dependents: Dict[str, List[str]] = {node_id: [] for node_id in node_map}
        for node_id, deps in pending.items():
            for dep in deps:
                dependents[dep].append(node_id)

        ready = [node_id for node_id, deps in pending.items() if not deps]
        resolved = 0
        while ready:
            current = ready.pop()
            resolved += 1
            for child in dependents[current]:
                child_deps = pending[child]
                child_deps.discard(current)
                if not child_deps:
                    ready.append(child)

        if resolved != len(node_map):
            # Nodes on a cycle, plus any nodes downstream of one.
            blocked = sorted(node_id for node_id, deps in pending.items() if deps)
            raise ValueError(
                "Workflow dependencies contain a cycle; these nodes can never "
                f"become ready: {', '.join(blocked)}"
            )

    def _validate_branch_dependency(
        self,
        parent: WorkflowNode,
        target_id: str,
        branch_name: str,
        node_map: Dict[str, WorkflowNode],
    ) -> None:
        """
        Ensure a branch target exists and depends on its branching node.

        Without the dependency, the target would be scheduled immediately
        instead of only when the parent routes to it.

        Raises:
            ValueError: If ``target_id`` is unknown or does not list
                ``parent.id`` in its ``depends_on``.
        """
        target = node_map.get(target_id)
        if not target:
            raise ValueError(
                f"Node '{parent.id}' references non-existent {branch_name} '{target_id}'"
            )

        if not target.depends_on or parent.id not in target.depends_on:
            raise ValueError(
                f"Logic Error: Node '{parent.id}' routes to '{target.id}' ({branch_name}), "
                f"but '{target.id}' does not list '{parent.id}' in its 'depends_on' field. "
                f"This would cause '{target.id}' to run immediately. "
                f"Fix: Add 'depends_on: [{parent.id}]' to node '{target.id}'."
            )


class WorkflowAppConfig(SamAgentAppConfig):
    """Workflow app configuration extends agent config."""

    agent_type: Literal["workflow"] = "workflow"

    # Rename agent_name to name for clarity in workflow context
    name: str = Field(..., description="Unique name for this workflow instance.")

    # The parent's agent_name is optional here and filled from `name` by
    # populate_agent_name_from_name. NOTE(review): exclude=True drops it from
    # model_dump(), so the dumped config handed to the component omits it —
    # confirm consumers read `name` instead.
    agent_name: Optional[str] = Field(default=None, exclude=True)

    # Workflow definition
    workflow: WorkflowDefinition = Field(..., description="The workflow DAG definition")

    # Workflow execution settings
    max_workflow_execution_time_seconds: int = Field(
        default=1800,  # 30 minutes
        description="Maximum time for entire workflow execution",
    )
    default_node_timeout_seconds: int = Field(
        default=300,  # 5 minutes
        description="Default timeout for individual nodes",
    )
    node_cancellation_timeout_seconds: int = Field(
        default=30,
        description="Time to wait for a node to confirm cancellation before force-failing.",
    )
    default_max_map_items: int = Field(
        default=100, description="Default max items for map nodes"
    )

    # Override optional fields from SamAgentAppConfig that might not be needed
    # or have different defaults
    model: Optional[Union[str, Dict[str, Any]]] = None
    instruction: Optional[Any] = None

    # Make agent_card optional as it is auto-generated from workflow definition
    agent_card: Optional[AgentCardConfig] = Field(
        default_factory=AgentCardConfig,
        description="Static definition of this agent's capabilities for discovery.",
    )

    # Make agent_card_publishing optional with defaults
    agent_card_publishing: Optional[AgentCardPublishingConfig] = Field(
        default_factory=lambda: AgentCardPublishingConfig(interval_seconds=10),
        description="Settings for publishing the agent card.",
    )

    @model_validator(mode="after")
    def populate_agent_name_from_name(self) -> "WorkflowAppConfig":
        """Populate agent_name from name for compatibility with parent class."""
        if not self.agent_name:
            self.agent_name = self.name
        return self


class WorkflowApp(App):
    """Custom App class for workflow orchestration."""

    # Define app schema for validation (empty for now, could be extended)
    app_schema: Dict[str, Any] = {"config_parameters": []}

    def __init__(self, app_info: Dict[str, Any], **kwargs):
        """
        Validate the workflow configuration and build the app definition.

        Args:
            app_info: App definition whose ``app_config`` holds the workflow
                configuration. Mutated in place: ``app_config`` is replaced
                by the validated dump, ``components`` is set to the single
                workflow component, and ``broker`` settings are injected.
            **kwargs: Forwarded to ``App.__init__``.

        Raises:
            Exception: Whatever ``WorkflowAppConfig.model_validate_and_clean``
                raises for an invalid configuration (logged, then re-raised).
        """
        log.debug("Initializing WorkflowApp...")

        app_config_dict = app_info.get("app_config", {})

        try:
            # Validate configuration
            app_config = WorkflowAppConfig.model_validate_and_clean(app_config_dict)
        except Exception as e:
            log.error("Workflow configuration validation failed: %s", e)
            raise

        # Extract workflow-specific settings
        namespace = app_config.namespace
        workflow_name = app_config.name

        # NOTE: AgentCardConfig has no input_schema/output_schema fields, so the
        # agent card is not auto-populated here. Schemas belong in
        # agent_card.skills in the YAML config, or in the workflow
        # definition's skills field.

        subscriptions = self._generate_subscriptions(namespace, workflow_name)

        # Build component configuration
        component_info = {
            "component_name": workflow_name,
            "component_module": "solace_agent_mesh.workflow.component",
            "component_config": {"app_config": app_config.model_dump()},
            "subscriptions": subscriptions,  # Include subscriptions in component
        }

        # Update app_info with validated config (a separate dump, so the
        # component's copy and app_info's copy are independent dicts).
        app_info["app_config"] = app_config.model_dump()
        app_info["components"] = [component_info]  # Use 'components' not 'component_list'

        # Configure broker for workflow messaging
        broker_config = app_info.setdefault("broker", {})
        broker_config["input_enabled"] = True
        broker_config["output_enabled"] = True
        log.debug("Injected broker.input_enabled=True and broker.output_enabled=True")

        generated_queue_name = f"{namespace.strip('/')}/q/a2a/{workflow_name}"
        broker_config["queue_name"] = generated_queue_name
        log.debug("Injected generated broker.queue_name: %s", generated_queue_name)

        # broker_config *is* app_info["broker"], so keep a user-supplied value
        # and default to a temporary queue otherwise.
        broker_config.setdefault("temporary_queue", True)
        log.debug(
            "Set broker_config.temporary_queue = %s", broker_config["temporary_queue"]
        )

        # Call parent App constructor
        super().__init__(app_info, **kwargs)

    def _generate_subscriptions(
        self, namespace: str, workflow_name: str
    ) -> List[Dict[str, str]]:
        """
        Generate Solace topic subscriptions for workflow.

        Returns, in order: the discovery topic, this workflow's request topic,
        and the response and status subscription topics for this workflow.
        """
        return [
            # Discovery topic for agent cards
            {"topic": a2a.get_discovery_subscription_topic(namespace)},
            # Workflow's agent request topic
            {"topic": a2a.get_agent_request_topic(namespace, workflow_name)},
            # Agent response topics (wildcard)
            {
                "topic": a2a.get_agent_response_subscription_topic(
                    namespace, workflow_name
                )
            },
            # Agent status topics (wildcard)
            {
                "topic": a2a.get_agent_status_subscription_topic(
                    namespace, workflow_name
                )
            },
        ]