conditional.py
  1  """
  2  Conditional expression evaluation for workflow flow control.
  3  
  4  Supports Argo Workflows-compatible template syntax with aliases:
  5  - {{item}} -> {{_map_item}} (Argo loop variable)
  6  - {{workflow.parameters.x}} -> {{workflow.input.x}} (Argo input syntax)
  7  """
  8  
  9  import ast
 10  import logging
 11  import operator
 12  import re
 13  from typing import Any
 14  
 15  from ..workflow_execution_context import WorkflowExecutionState
 16  
# Module-level logger, named after this module via ``__name__``.
log = logging.getLogger(__name__)
 18  
 19  
 20  # Comparison operators supported in conditions
 21  _COMPARE_OPS = {
 22      ast.Eq: operator.eq,
 23      ast.NotEq: operator.ne,
 24      ast.Lt: operator.lt,
 25      ast.LtE: operator.le,
 26      ast.Gt: operator.gt,
 27      ast.GtE: operator.ge,
 28      ast.In: lambda a, b: a in b,
 29      ast.NotIn: lambda a, b: a not in b,
 30  }
 31  
 32  
 33  def _safe_eval_expression(expr: str) -> Any:
 34      """
 35      Safely evaluate a simple expression using Python's AST.
 36  
 37      Supports:
 38      - Comparisons: ==, !=, <, <=, >, >=, in, not in
 39      - Boolean operators: and, or, not
 40      - Literals: strings, numbers, booleans (true/false), null
 41      - Parentheses for grouping
 42  
 43      Args:
 44          expr: The expression string to evaluate
 45  
 46      Returns:
 47          The result of evaluating the expression
 48  
 49      Raises:
 50          ValueError: If the expression contains unsupported syntax
 51      """
 52      try:
 53          tree = ast.parse(expr, mode="eval")
 54      except SyntaxError as e:
 55          raise ValueError(f"Invalid expression syntax: {e}") from e
 56  
 57      return _eval_node(tree.body)
 58  
 59  
 60  def _eval_node(node: ast.AST) -> Any:
 61      """
 62      Recursively evaluate an AST node.
 63  
 64      Args:
 65          node: The AST node to evaluate
 66  
 67      Returns:
 68          The result of evaluating the node
 69      """
 70      if isinstance(node, ast.Compare):
 71          # Handle comparisons: x == y, x > y, x in y, etc.
 72          left = _eval_node(node.left)
 73          for op, comparator in zip(node.ops, node.comparators):
 74              right = _eval_node(comparator)
 75              op_func = _COMPARE_OPS.get(type(op))
 76              if op_func is None:
 77                  raise ValueError(f"Unsupported comparison operator: {type(op).__name__}")
 78              if not op_func(left, right):
 79                  return False
 80              left = right
 81          return True
 82  
 83      elif isinstance(node, ast.BoolOp):
 84          # Handle boolean operations: x and y, x or y
 85          if isinstance(node.op, ast.And):
 86              return all(_eval_node(v) for v in node.values)
 87          elif isinstance(node.op, ast.Or):
 88              return any(_eval_node(v) for v in node.values)
 89          else:
 90              raise ValueError(f"Unsupported boolean operator: {type(node.op).__name__}")
 91  
 92      elif isinstance(node, ast.UnaryOp):
 93          # Handle unary operations: not x, -x
 94          operand = _eval_node(node.operand)
 95          if isinstance(node.op, ast.Not):
 96              return not operand
 97          elif isinstance(node.op, ast.USub):
 98              return -operand
 99          elif isinstance(node.op, ast.UAdd):
100              return +operand
101          else:
102              raise ValueError(f"Unsupported unary operator: {type(node.op).__name__}")
103  
104      elif isinstance(node, ast.Constant):
105          # Handle literals: strings, numbers, booleans, None
106          return node.value
107  
108      elif isinstance(node, ast.Name):
109          # Handle identifiers: true, false, null (and Python's True, False, None)
110          name = node.id.lower()
111          if name == "true":
112              return True
113          elif name == "false":
114              return False
115          elif name == "null" or name == "none":
116              return None
117          else:
118              raise ValueError(f"Unknown identifier: {node.id}")
119  
120      elif isinstance(node, ast.Expr):
121          # Handle expression wrapper
122          return _eval_node(node.value)
123  
124      else:
125          raise ValueError(f"Unsupported expression type: {type(node).__name__}")
126  
127  
class ConditionalEvaluationError(Exception):
    """Signals that a conditional expression could not be evaluated."""
132  
133  
# Argo-compatible template aliases (Argo spelling -> SAM spelling).
# Entries may be written with or without the "{{" / "}}" delimiters; an entry
# that ends with "}}" aliases a whole placeholder, any other entry aliases the
# leading part of a placeholder path.
TEMPLATE_ALIASES = {
    # Argo uses 'item' for loop variable, SAM uses '_map_item'
    "{{item}}": "{{_map_item}}",
    "{{item.": "{{_map_item.",
    # Argo uses 'workflow.parameters', SAM uses 'workflow.input'
    "workflow.parameters.": "workflow.input.",
}


def _strip_template_delimiters(fragment: str) -> str:
    """Return *fragment* without a leading ``{{`` and/or trailing ``}}``."""
    if fragment.startswith("{{"):
        fragment = fragment[2:]
    if fragment.endswith("}}"):
        fragment = fragment[:-2]
    return fragment


def _apply_template_aliases(expression: str) -> str:
    """
    Apply Argo-compatible aliases to template expression.

    Transforms:
    - {{item}} -> {{_map_item}}
    - {{item.field}} -> {{_map_item.field}}
    - {{workflow.parameters.x}} -> {{workflow.input.x}}

    Each alias from ``TEMPLATE_ALIASES`` is matched only at the start of a
    ``{{...}}`` placeholder (whitespace after ``{{`` is allowed and kept).
    Text outside placeholders, and paths that merely contain an alias
    somewhere in the middle (e.g. ``{{subworkflow.parameters.x}}``), are left
    untouched.

    Args:
        expression: The raw condition expression.

    Returns:
        The expression with aliased placeholders rewritten.
    """
    result = expression
    for alias, target in TEMPLATE_ALIASES.items():
        alias_path = _strip_template_delimiters(alias)
        target_path = _strip_template_delimiters(target)

        # Group 1 keeps the opening braces and any whitespace the author wrote.
        pattern = r"(\{\{\s*)" + re.escape(alias_path)
        if alias.endswith("}}"):
            # Whole-placeholder alias: the path must end right here, otherwise
            # "{{item}}" would also rewrite "{{items}}".
            pattern += r"(?=\s*\}\})"

        # A function replacement avoids backslash/group interpretation of the
        # target text.
        result = re.sub(
            pattern,
            lambda match, path=target_path: match.group(1) + path,
            result,
        )
    return result
157  
158  
def evaluate_condition(
    condition_expr: str, workflow_state: WorkflowExecutionState
) -> bool:
    """
    Safely evaluate conditional expression.
    Returns boolean result.

    Every ``{{path}}`` placeholder is replaced by ``str()`` of the value found
    in ``workflow_state.node_outputs``; the resulting text is then evaluated
    with ``_safe_eval_expression``. Because values are inserted as plain text,
    a placeholder that resolves to a string must be quoted in the expression
    (e.g. ``'{{node.output.status}}' == 'ok'``).

    Path resolution:
    - ``workflow.input.<...>`` reads ``node_outputs["workflow_input"]["output"]``
    - ``workflow.<...>`` (e.g. status/error for exit handlers) reads
      ``node_outputs["workflow"]``
    - ``<node_id>.<...>`` reads ``node_outputs[<node_id>]``
    A ``None`` encountered while descending the path resolves to ``None``.

    Supports Argo-style aliases:
    - {{item}} for map loop variable
    - {{workflow.parameters.x}} for workflow input

    Args:
        condition_expr: The condition, possibly containing ``{{...}}``
            placeholders.
        workflow_state: Execution state whose ``node_outputs`` mapping is used
            to resolve placeholders.

    Returns:
        The truthiness of the evaluated expression.

    Raises:
        ConditionalEvaluationError: If a placeholder cannot be resolved or the
            expression cannot be evaluated; the original error is chained.
    """
    # Apply template aliases for Argo compatibility
    condition_expr = _apply_template_aliases(condition_expr)

    try:
        # Helper to resolve a single match
        def replace_match(match):
            path = match.group(1).strip()
            parts = path.split(".")

            # "workflow" is only special as the head of a multi-segment path.
            # Checking the length first keeps a bare "{{workflow}}" from
            # failing with an opaque IndexError on parts[1].
            is_workflow_path = len(parts) >= 2 and parts[0] == "workflow"

            # Navigate path in workflow state (similar to DAGExecutor._resolve_template)
            if is_workflow_path and parts[1] == "input":
                if "workflow_input" not in workflow_state.node_outputs:
                    raise ValueError("Workflow input has not been initialized")
                data = workflow_state.node_outputs["workflow_input"]["output"]
                parts = parts[2:]
            # Handle workflow.status and workflow.error for exit handlers
            elif is_workflow_path:
                if "workflow" not in workflow_state.node_outputs:
                    raise ValueError("Workflow status has not been initialized")
                data = workflow_state.node_outputs["workflow"]
                parts = parts[1:]
            else:
                node_id = parts[0]
                if node_id not in workflow_state.node_outputs:
                    raise ValueError(f"Referenced node '{node_id}' has not completed")
                data = workflow_state.node_outputs[node_id]
                parts = parts[1:]

            # Traverse remaining parts
            for part in parts:
                if isinstance(data, dict) and part in data:
                    data = data[part]
                elif data is None:
                    # Allow graceful handling of None values in path
                    return "None"
                else:
                    raise ValueError(f"Field '{part}' not found in path: {path}")

            return str(data)

        # Replace all {{...}} patterns with their resolved string values
        clean_expr = re.sub(r"\{\{(.+?)\}\}", replace_match, condition_expr)

        # Lazy %-formatting: the message is only built when DEBUG is enabled.
        log.debug("Evaluated condition: '%s' -> '%s'", condition_expr, clean_expr)

        result = _safe_eval_expression(clean_expr)
        return bool(result)
    except Exception as e:
        # Single boundary for this module: callers only need to handle
        # ConditionalEvaluationError; the cause stays attached via "from e".
        raise ConditionalEvaluationError(
            f"Failed to evaluate condition '{condition_expr}': {e}"
        ) from e