/ tests / unit / workflow / test_template_resolution.py
test_template_resolution.py
  1  """
  2  Unit tests for template resolution in workflow DAG executor.
  3  
  4  Tests the resolve_value() and _resolve_template() methods which are pure functions
  5  that transform template strings into resolved values based on workflow state.
  6  """
  7  
  8  import pytest
  9  from datetime import datetime, timezone
 10  from unittest.mock import Mock
 11  
 12  from solace_agent_mesh.workflow.dag_executor import DAGExecutor
 13  from solace_agent_mesh.workflow.workflow_execution_context import WorkflowExecutionState
 14  from solace_agent_mesh.workflow.app import WorkflowDefinition, AgentNode
 15  
 16  
 17  def create_minimal_dag_executor() -> DAGExecutor:
 18      """Create a minimal DAGExecutor for testing resolve_value."""
 19      # Minimal workflow definition with one node
 20      workflow_def = WorkflowDefinition(
 21          description="Test workflow",
 22          nodes=[
 23              AgentNode(id="test_node", type="agent", agent_name="TestAgent")
 24          ],
 25          output_mapping={"result": "{{test_node.output}}"},
 26      )
 27      # Mock the host component - not used by resolve_value
 28      mock_host = Mock()
 29      return DAGExecutor(workflow_def, mock_host)
 30  
 31  
 32  def create_workflow_state(node_outputs: dict) -> WorkflowExecutionState:
 33      """Create a WorkflowExecutionState with given node outputs."""
 34      return WorkflowExecutionState(
 35          workflow_name="test_workflow",
 36          execution_id="test_exec_001",
 37          start_time=datetime.now(timezone.utc),
 38          node_outputs=node_outputs,
 39      )
 40  
 41  
 42  class TestResolveWorkflowInput:
 43      """Tests for resolving {{workflow.input.*}} templates."""
 44  
 45      def test_resolve_workflow_input_simple(self):
 46          """{{workflow.input.x}} resolves to the value from workflow input."""
 47          executor = create_minimal_dag_executor()
 48          state = create_workflow_state({
 49              "workflow_input": {"output": {"x": 42}}
 50          })
 51  
 52          result = executor.resolve_value("{{workflow.input.x}}", state)
 53          assert result == 42
 54  
 55      def test_resolve_workflow_input_nested(self):
 56          """{{workflow.input.a.b.c}} resolves nested paths."""
 57          executor = create_minimal_dag_executor()
 58          state = create_workflow_state({
 59              "workflow_input": {"output": {"a": {"b": {"c": "deep_value"}}}}
 60          })
 61  
 62          result = executor.resolve_value("{{workflow.input.a.b.c}}", state)
 63          assert result == "deep_value"
 64  
 65      def test_resolve_workflow_input_missing_field_returns_none(self):
 66          """Missing workflow input field returns None (for coalesce support)."""
 67          executor = create_minimal_dag_executor()
 68          state = create_workflow_state({
 69              "workflow_input": {"output": {"x": 1}}
 70          })
 71  
 72          result = executor.resolve_value("{{workflow.input.nonexistent}}", state)
 73          assert result is None
 74  
 75      def test_resolve_workflow_input_not_initialized_raises(self):
 76          """Referencing workflow input before initialization raises ValueError."""
 77          executor = create_minimal_dag_executor()
 78          state = create_workflow_state({})  # No workflow_input
 79  
 80          with pytest.raises(ValueError, match="Workflow input has not been initialized"):
 81              executor.resolve_value("{{workflow.input.x}}", state)
 82  
 83  
 84  class TestResolveNodeOutput:
 85      """Tests for resolving {{node.output.*}} templates."""
 86  
 87      def test_resolve_node_output_simple(self):
 88          """{{node.output.field}} resolves to the value from node output."""
 89          executor = create_minimal_dag_executor()
 90          state = create_workflow_state({
 91              "step1": {"output": {"result": "success"}}
 92          })
 93  
 94          result = executor.resolve_value("{{step1.output.result}}", state)
 95          assert result == "success"
 96  
 97      def test_resolve_node_output_nested(self):
 98          """{{node.output.a.b}} resolves nested paths in node output."""
 99          executor = create_minimal_dag_executor()
100          state = create_workflow_state({
101              "process_node": {"output": {"data": {"items": [1, 2, 3]}}}
102          })
103  
104          result = executor.resolve_value("{{process_node.output.data.items}}", state)
105          assert result == [1, 2, 3]
106  
107      def test_resolve_node_output_missing_node_returns_none(self):
108          """Referencing non-existent node returns None (for skipped nodes)."""
109          executor = create_minimal_dag_executor()
110          state = create_workflow_state({
111              "existing_node": {"output": {"x": 1}}
112          })
113  
114          result = executor.resolve_value("{{nonexistent_node.output.x}}", state)
115          assert result is None
116  
117      def test_resolve_node_output_missing_field_raises(self):
118          """Missing field in existing node output raises ValueError."""
119          executor = create_minimal_dag_executor()
120          state = create_workflow_state({
121              "step1": {"output": {"x": 1}}
122          })
123  
124          with pytest.raises(ValueError, match="Output field 'nonexistent' not found"):
125              executor.resolve_value("{{step1.output.nonexistent}}", state)
126  
127  
128  class TestResolveLiteralValues:
129      """Tests for literal value passthrough."""
130  
131      def test_literal_string_passthrough(self):
132          """Non-template strings are returned unchanged."""
133          executor = create_minimal_dag_executor()
134          state = create_workflow_state({})
135  
136          result = executor.resolve_value("hello world", state)
137          assert result == "hello world"
138  
139      def test_literal_number_passthrough(self):
140          """Numbers are returned unchanged."""
141          executor = create_minimal_dag_executor()
142          state = create_workflow_state({})
143  
144          assert executor.resolve_value(42, state) == 42
145          assert executor.resolve_value(3.14, state) == 3.14
146  
147      def test_literal_bool_passthrough(self):
148          """Booleans are returned unchanged."""
149          executor = create_minimal_dag_executor()
150          state = create_workflow_state({})
151  
152          assert executor.resolve_value(True, state) is True
153          assert executor.resolve_value(False, state) is False
154  
155      def test_literal_none_passthrough(self):
156          """None is returned unchanged."""
157          executor = create_minimal_dag_executor()
158          state = create_workflow_state({})
159  
160          result = executor.resolve_value(None, state)
161          assert result is None
162  
163      def test_literal_dict_passthrough(self):
164          """Plain dicts (not operators) are returned unchanged."""
165          executor = create_minimal_dag_executor()
166          state = create_workflow_state({})
167  
168          # Dict with multiple keys is not an operator, passed through
169          input_dict = {"key1": "value1", "key2": "value2"}
170          result = executor.resolve_value(input_dict, state)
171          assert result == input_dict
172  
173  
174  class TestCoalesceOperator:
175      """Tests for the coalesce operator."""
176  
177      def test_coalesce_returns_first_non_null(self):
178          """Coalesce returns first non-null value."""
179          executor = create_minimal_dag_executor()
180          state = create_workflow_state({
181              "workflow_input": {"output": {}}
182          })
183  
184          result = executor.resolve_value(
185              {"coalesce": [None, "fallback", "ignored"]},
186              state
187          )
188          assert result == "fallback"
189  
190      def test_coalesce_with_template_resolution(self):
191          """Coalesce resolves templates before checking null."""
192          executor = create_minimal_dag_executor()
193          state = create_workflow_state({
194              "workflow_input": {"output": {"optional": None, "default": "default_val"}}
195          })
196  
197          result = executor.resolve_value(
198              {"coalesce": ["{{workflow.input.optional}}", "{{workflow.input.default}}"]},
199              state
200          )
201          assert result == "default_val"
202  
203      def test_coalesce_all_null_returns_none(self):
204          """Coalesce returns None if all values are null."""
205          executor = create_minimal_dag_executor()
206          state = create_workflow_state({
207              "workflow_input": {"output": {}}
208          })
209  
210          result = executor.resolve_value(
211              {"coalesce": [None, None]},
212              state
213          )
214          assert result is None
215  
216      def test_coalesce_requires_list(self):
217          """Coalesce operator requires a list argument."""
218          executor = create_minimal_dag_executor()
219          state = create_workflow_state({})
220  
221          with pytest.raises(ValueError, match="'coalesce' operator requires a list"):
222              executor.resolve_value({"coalesce": "not_a_list"}, state)
223  
224  
225  class TestConcatOperator:
226      """Tests for the concat operator."""
227  
228      def test_concat_joins_strings(self):
229          """Concat joins string values."""
230          executor = create_minimal_dag_executor()
231          state = create_workflow_state({})
232  
233          result = executor.resolve_value(
234              {"concat": ["hello", " ", "world"]},
235              state
236          )
237          assert result == "hello world"
238  
239      def test_concat_with_template_resolution(self):
240          """Concat resolves templates before joining."""
241          executor = create_minimal_dag_executor()
242          state = create_workflow_state({
243              "workflow_input": {"output": {"name": "Alice"}}
244          })
245  
246          result = executor.resolve_value(
247              {"concat": ["Hello, ", "{{workflow.input.name}}", "!"]},
248              state
249          )
250          assert result == "Hello, Alice!"
251  
252      def test_concat_converts_numbers_to_string(self):
253          """Concat converts non-string values to strings."""
254          executor = create_minimal_dag_executor()
255          state = create_workflow_state({})
256  
257          result = executor.resolve_value(
258              {"concat": ["Value: ", 42]},
259              state
260          )
261          assert result == "Value: 42"
262  
263      def test_concat_skips_none_values(self):
264          """Concat skips None values in the list."""
265          executor = create_minimal_dag_executor()
266          state = create_workflow_state({})
267  
268          result = executor.resolve_value(
269              {"concat": ["a", None, "b"]},
270              state
271          )
272          assert result == "ab"
273  
274      def test_concat_requires_list(self):
275          """Concat operator requires a list argument."""
276          executor = create_minimal_dag_executor()
277          state = create_workflow_state({})
278  
279          with pytest.raises(ValueError, match="'concat' operator requires a list"):
280              executor.resolve_value({"concat": "not_a_list"}, state)
281  
282  
283  class TestMapLoopVariables:
284      """Tests for map/loop special variables."""
285  
286      def test_resolve_map_item(self):
287          """{{_map_item}} resolves to the current map iteration item."""
288          executor = create_minimal_dag_executor()
289          state = create_workflow_state({
290              "_map_item": {"output": {"id": "item_123", "value": 100}}
291          })
292  
293          result = executor.resolve_value("{{_map_item.id}}", state)
294          assert result == "item_123"
295  
296      def test_resolve_map_index(self):
297          """{{_map_index}} resolves to the current map iteration index."""
298          executor = create_minimal_dag_executor()
299          state = create_workflow_state({
300              "_map_index": {"output": 5}
301          })
302  
303          result = executor.resolve_value("{{_map_index}}", state)
304          assert result == 5
305  
306      def test_argo_item_alias(self):
307          """{{item}} is aliased to {{_map_item}} for Argo compatibility."""
308          executor = create_minimal_dag_executor()
309          state = create_workflow_state({
310              "_map_item": {"output": "current_item_value"}
311          })
312  
313          result = executor.resolve_value("{{item}}", state)
314          assert result == "current_item_value"
315  
316      def test_argo_item_field_alias(self):
317          """{{item.field}} is aliased to {{_map_item.field}}."""
318          executor = create_minimal_dag_executor()
319          state = create_workflow_state({
320              "_map_item": {"output": {"name": "test_name"}}
321          })
322  
323          result = executor.resolve_value("{{item.name}}", state)
324          assert result == "test_name"
325  
326  
327  class TestArgoParametersAlias:
328      """Tests for Argo workflow.parameters alias."""
329  
330      def test_workflow_parameters_alias(self):
331          """{{workflow.parameters.x}} is aliased to {{workflow.input.x}}."""
332          executor = create_minimal_dag_executor()
333          state = create_workflow_state({
334              "workflow_input": {"output": {"x": "argo_style_value"}}
335          })
336  
337          result = executor.resolve_value("{{workflow.parameters.x}}", state)
338          assert result == "argo_style_value"