conditional.py
1 """ 2 Conditional expression evaluation for workflow flow control. 3 4 Supports Argo Workflows-compatible template syntax with aliases: 5 - {{item}} -> {{_map_item}} (Argo loop variable) 6 - {{workflow.parameters.x}} -> {{workflow.input.x}} (Argo input syntax) 7 """ 8 9 import ast 10 import logging 11 import operator 12 import re 13 from typing import Any 14 15 from ..workflow_execution_context import WorkflowExecutionState 16 17 log = logging.getLogger(__name__) 18 19 20 # Comparison operators supported in conditions 21 _COMPARE_OPS = { 22 ast.Eq: operator.eq, 23 ast.NotEq: operator.ne, 24 ast.Lt: operator.lt, 25 ast.LtE: operator.le, 26 ast.Gt: operator.gt, 27 ast.GtE: operator.ge, 28 ast.In: lambda a, b: a in b, 29 ast.NotIn: lambda a, b: a not in b, 30 } 31 32 33 def _safe_eval_expression(expr: str) -> Any: 34 """ 35 Safely evaluate a simple expression using Python's AST. 36 37 Supports: 38 - Comparisons: ==, !=, <, <=, >, >=, in, not in 39 - Boolean operators: and, or, not 40 - Literals: strings, numbers, booleans (true/false), null 41 - Parentheses for grouping 42 43 Args: 44 expr: The expression string to evaluate 45 46 Returns: 47 The result of evaluating the expression 48 49 Raises: 50 ValueError: If the expression contains unsupported syntax 51 """ 52 try: 53 tree = ast.parse(expr, mode="eval") 54 except SyntaxError as e: 55 raise ValueError(f"Invalid expression syntax: {e}") from e 56 57 return _eval_node(tree.body) 58 59 60 def _eval_node(node: ast.AST) -> Any: 61 """ 62 Recursively evaluate an AST node. 63 64 Args: 65 node: The AST node to evaluate 66 67 Returns: 68 The result of evaluating the node 69 """ 70 if isinstance(node, ast.Compare): 71 # Handle comparisons: x == y, x > y, x in y, etc. 72 left = _eval_node(node.left) 73 for op, comparator in zip(node.ops, node.comparators): 74 right = _eval_node(comparator) 75 op_func = _COMPARE_OPS.get(type(op)) 76 if op_func is None: 77 raise ValueError(f"Unsupported comparison operator: {type(op).__name__}") 78 if not op_func(left, right): 79 return False 80 left = right 81 return True 82 83 elif isinstance(node, ast.BoolOp): 84 # Handle boolean operations: x and y, x or y 85 if isinstance(node.op, ast.And): 86 return all(_eval_node(v) for v in node.values) 87 elif isinstance(node.op, ast.Or): 88 return any(_eval_node(v) for v in node.values) 89 else: 90 raise ValueError(f"Unsupported boolean operator: {type(node.op).__name__}") 91 92 elif isinstance(node, ast.UnaryOp): 93 # Handle unary operations: not x, -x 94 operand = _eval_node(node.operand) 95 if isinstance(node.op, ast.Not): 96 return not operand 97 elif isinstance(node.op, ast.USub): 98 return -operand 99 elif isinstance(node.op, ast.UAdd): 100 return +operand 101 else: 102 raise ValueError(f"Unsupported unary operator: {type(node.op).__name__}") 103 104 elif isinstance(node, ast.Constant): 105 # Handle literals: strings, numbers, booleans, None 106 return node.value 107 108 elif isinstance(node, ast.Name): 109 # Handle identifiers: true, false, null (and Python's True, False, None) 110 name = node.id.lower() 111 if name == "true": 112 return True 113 elif name == "false": 114 return False 115 elif name == "null" or name == "none": 116 return None 117 else: 118 raise ValueError(f"Unknown identifier: {node.id}") 119 120 elif isinstance(node, ast.Expr): 121 # Handle expression wrapper 122 return _eval_node(node.value) 123 124 else: 125 raise ValueError(f"Unsupported expression type: {type(node).__name__}") 126 127 128 class ConditionalEvaluationError(Exception): 129 """Raised when conditional expression evaluation fails.""" 130 131 pass 132 133 134 # Argo-compatible template aliases 135 TEMPLATE_ALIASES = { 136 # Argo uses 'item' for loop variable, SAM uses '_map_item' 137 "{{item}}": "{{_map_item}}", 138 "{{item.": "{{_map_item.", 139 # Argo uses 'workflow.parameters', SAM uses 'workflow.input' 140 "workflow.parameters.": "workflow.input.", 141 } 142 143 144 def _apply_template_aliases(expression: str) -> str: 145 """ 146 Apply Argo-compatible aliases to template expression. 147 148 Transforms: 149 - {{item}} -> {{_map_item}} 150 - {{item.field}} -> {{_map_item.field}} 151 - {{workflow.parameters.x}} -> {{workflow.input.x}} 152 """ 153 result = expression 154 for alias, target in TEMPLATE_ALIASES.items(): 155 result = result.replace(alias, target) 156 return result 157 158 159 def evaluate_condition( 160 condition_expr: str, workflow_state: WorkflowExecutionState 161 ) -> bool: 162 """ 163 Safely evaluate conditional expression. 164 Returns boolean result. 165 166 Supports Argo-style aliases: 167 - {{item}} for map loop variable 168 - {{workflow.parameters.x}} for workflow input 169 """ 170 # Apply template aliases for Argo compatibility 171 condition_expr = _apply_template_aliases(condition_expr) 172 173 try: 174 # Helper to resolve a single match 175 def replace_match(match): 176 path = match.group(1).strip() 177 parts = path.split(".") 178 179 # Navigate path in workflow state (similar to DAGExecutor._resolve_template) 180 if parts[0] == "workflow" and parts[1] == "input": 181 if "workflow_input" not in workflow_state.node_outputs: 182 raise ValueError("Workflow input has not been initialized") 183 data = workflow_state.node_outputs["workflow_input"]["output"] 184 parts = parts[2:] 185 # Handle workflow.status and workflow.error for exit handlers 186 elif parts[0] == "workflow" and len(parts) >= 2: 187 if "workflow" not in workflow_state.node_outputs: 188 raise ValueError("Workflow status has not been initialized") 189 data = workflow_state.node_outputs["workflow"] 190 parts = parts[1:] 191 else: 192 node_id = parts[0] 193 if node_id not in workflow_state.node_outputs: 194 raise ValueError(f"Referenced node '{node_id}' has not completed") 195 data = workflow_state.node_outputs[node_id] 196 parts = parts[1:] 197 198 # Traverse remaining parts 199 for part in parts: 200 if isinstance(data, dict) and part in data: 201 data = data[part] 202 elif data is None: 203 # Allow graceful handling of None values in path 204 return "None" 205 else: 206 raise ValueError(f"Field '{part}' not found in path: {path}") 207 208 return str(data) 209 210 # Replace all {{...}} patterns with their resolved string values 211 clean_expr = re.sub(r"\{\{(.+?)\}\}", replace_match, condition_expr) 212 213 log.debug(f"Evaluated condition: '{condition_expr}' -> '{clean_expr}'") 214 215 result = _safe_eval_expression(clean_expr) 216 return bool(result) 217 except Exception as e: 218 raise ConditionalEvaluationError( 219 f"Failed to evaluate condition '{condition_expr}': {e}" 220 ) from e