test_template_resolution.py
1 """ 2 Unit tests for template resolution in workflow DAG executor. 3 4 Tests the resolve_value() and _resolve_template() methods which are pure functions 5 that transform template strings into resolved values based on workflow state. 6 """ 7 8 import pytest 9 from datetime import datetime, timezone 10 from unittest.mock import Mock 11 12 from solace_agent_mesh.workflow.dag_executor import DAGExecutor 13 from solace_agent_mesh.workflow.workflow_execution_context import WorkflowExecutionState 14 from solace_agent_mesh.workflow.app import WorkflowDefinition, AgentNode 15 16 17 def create_minimal_dag_executor() -> DAGExecutor: 18 """Create a minimal DAGExecutor for testing resolve_value.""" 19 # Minimal workflow definition with one node 20 workflow_def = WorkflowDefinition( 21 description="Test workflow", 22 nodes=[ 23 AgentNode(id="test_node", type="agent", agent_name="TestAgent") 24 ], 25 output_mapping={"result": "{{test_node.output}}"}, 26 ) 27 # Mock the host component - not used by resolve_value 28 mock_host = Mock() 29 return DAGExecutor(workflow_def, mock_host) 30 31 32 def create_workflow_state(node_outputs: dict) -> WorkflowExecutionState: 33 """Create a WorkflowExecutionState with given node outputs.""" 34 return WorkflowExecutionState( 35 workflow_name="test_workflow", 36 execution_id="test_exec_001", 37 start_time=datetime.now(timezone.utc), 38 node_outputs=node_outputs, 39 ) 40 41 42 class TestResolveWorkflowInput: 43 """Tests for resolving {{workflow.input.*}} templates.""" 44 45 def test_resolve_workflow_input_simple(self): 46 """{{workflow.input.x}} resolves to the value from workflow input.""" 47 executor = create_minimal_dag_executor() 48 state = create_workflow_state({ 49 "workflow_input": {"output": {"x": 42}} 50 }) 51 52 result = executor.resolve_value("{{workflow.input.x}}", state) 53 assert result == 42 54 55 def test_resolve_workflow_input_nested(self): 56 """{{workflow.input.a.b.c}} resolves nested paths.""" 57 executor = create_minimal_dag_executor() 58 state = create_workflow_state({ 59 "workflow_input": {"output": {"a": {"b": {"c": "deep_value"}}}} 60 }) 61 62 result = executor.resolve_value("{{workflow.input.a.b.c}}", state) 63 assert result == "deep_value" 64 65 def test_resolve_workflow_input_missing_field_returns_none(self): 66 """Missing workflow input field returns None (for coalesce support).""" 67 executor = create_minimal_dag_executor() 68 state = create_workflow_state({ 69 "workflow_input": {"output": {"x": 1}} 70 }) 71 72 result = executor.resolve_value("{{workflow.input.nonexistent}}", state) 73 assert result is None 74 75 def test_resolve_workflow_input_not_initialized_raises(self): 76 """Referencing workflow input before initialization raises ValueError.""" 77 executor = create_minimal_dag_executor() 78 state = create_workflow_state({}) # No workflow_input 79 80 with pytest.raises(ValueError, match="Workflow input has not been initialized"): 81 executor.resolve_value("{{workflow.input.x}}", state) 82 83 84 class TestResolveNodeOutput: 85 """Tests for resolving {{node.output.*}} templates.""" 86 87 def test_resolve_node_output_simple(self): 88 """{{node.output.field}} resolves to the value from node output.""" 89 executor = create_minimal_dag_executor() 90 state = create_workflow_state({ 91 "step1": {"output": {"result": "success"}} 92 }) 93 94 result = executor.resolve_value("{{step1.output.result}}", state) 95 assert result == "success" 96 97 def test_resolve_node_output_nested(self): 98 """{{node.output.a.b}} resolves nested paths in node output.""" 99 executor = create_minimal_dag_executor() 100 state = create_workflow_state({ 101 "process_node": {"output": {"data": {"items": [1, 2, 3]}}} 102 }) 103 104 result = executor.resolve_value("{{process_node.output.data.items}}", state) 105 assert result == [1, 2, 3] 106 107 def test_resolve_node_output_missing_node_returns_none(self): 108 """Referencing non-existent node returns None (for skipped nodes).""" 109 executor = create_minimal_dag_executor() 110 state = create_workflow_state({ 111 "existing_node": {"output": {"x": 1}} 112 }) 113 114 result = executor.resolve_value("{{nonexistent_node.output.x}}", state) 115 assert result is None 116 117 def test_resolve_node_output_missing_field_raises(self): 118 """Missing field in existing node output raises ValueError.""" 119 executor = create_minimal_dag_executor() 120 state = create_workflow_state({ 121 "step1": {"output": {"x": 1}} 122 }) 123 124 with pytest.raises(ValueError, match="Output field 'nonexistent' not found"): 125 executor.resolve_value("{{step1.output.nonexistent}}", state) 126 127 128 class TestResolveLiteralValues: 129 """Tests for literal value passthrough.""" 130 131 def test_literal_string_passthrough(self): 132 """Non-template strings are returned unchanged.""" 133 executor = create_minimal_dag_executor() 134 state = create_workflow_state({}) 135 136 result = executor.resolve_value("hello world", state) 137 assert result == "hello world" 138 139 def test_literal_number_passthrough(self): 140 """Numbers are returned unchanged.""" 141 executor = create_minimal_dag_executor() 142 state = create_workflow_state({}) 143 144 assert executor.resolve_value(42, state) == 42 145 assert executor.resolve_value(3.14, state) == 3.14 146 147 def test_literal_bool_passthrough(self): 148 """Booleans are returned unchanged.""" 149 executor = create_minimal_dag_executor() 150 state = create_workflow_state({}) 151 152 assert executor.resolve_value(True, state) is True 153 assert executor.resolve_value(False, state) is False 154 155 def test_literal_none_passthrough(self): 156 """None is returned unchanged.""" 157 executor = create_minimal_dag_executor() 158 state = create_workflow_state({}) 159 160 result = executor.resolve_value(None, state) 161 assert result is None 162 163 def test_literal_dict_passthrough(self): 164 """Plain dicts (not operators) are returned unchanged.""" 165 executor = create_minimal_dag_executor() 166 state = create_workflow_state({}) 167 168 # Dict with multiple keys is not an operator, passed through 169 input_dict = {"key1": "value1", "key2": "value2"} 170 result = executor.resolve_value(input_dict, state) 171 assert result == input_dict 172 173 174 class TestCoalesceOperator: 175 """Tests for the coalesce operator.""" 176 177 def test_coalesce_returns_first_non_null(self): 178 """Coalesce returns first non-null value.""" 179 executor = create_minimal_dag_executor() 180 state = create_workflow_state({ 181 "workflow_input": {"output": {}} 182 }) 183 184 result = executor.resolve_value( 185 {"coalesce": [None, "fallback", "ignored"]}, 186 state 187 ) 188 assert result == "fallback" 189 190 def test_coalesce_with_template_resolution(self): 191 """Coalesce resolves templates before checking null.""" 192 executor = create_minimal_dag_executor() 193 state = create_workflow_state({ 194 "workflow_input": {"output": {"optional": None, "default": "default_val"}} 195 }) 196 197 result = executor.resolve_value( 198 {"coalesce": ["{{workflow.input.optional}}", "{{workflow.input.default}}"]}, 199 state 200 ) 201 assert result == "default_val" 202 203 def test_coalesce_all_null_returns_none(self): 204 """Coalesce returns None if all values are null.""" 205 executor = create_minimal_dag_executor() 206 state = create_workflow_state({ 207 "workflow_input": {"output": {}} 208 }) 209 210 result = executor.resolve_value( 211 {"coalesce": [None, None]}, 212 state 213 ) 214 assert result is None 215 216 def test_coalesce_requires_list(self): 217 """Coalesce operator requires a list argument.""" 218 executor = create_minimal_dag_executor() 219 state = create_workflow_state({}) 220 221 with pytest.raises(ValueError, match="'coalesce' operator requires a list"): 222 executor.resolve_value({"coalesce": "not_a_list"}, state) 223 224 225 class TestConcatOperator: 226 """Tests for the concat operator.""" 227 228 def test_concat_joins_strings(self): 229 """Concat joins string values.""" 230 executor = create_minimal_dag_executor() 231 state = create_workflow_state({}) 232 233 result = executor.resolve_value( 234 {"concat": ["hello", " ", "world"]}, 235 state 236 ) 237 assert result == "hello world" 238 239 def test_concat_with_template_resolution(self): 240 """Concat resolves templates before joining.""" 241 executor = create_minimal_dag_executor() 242 state = create_workflow_state({ 243 "workflow_input": {"output": {"name": "Alice"}} 244 }) 245 246 result = executor.resolve_value( 247 {"concat": ["Hello, ", "{{workflow.input.name}}", "!"]}, 248 state 249 ) 250 assert result == "Hello, Alice!" 251 252 def test_concat_converts_numbers_to_string(self): 253 """Concat converts non-string values to strings.""" 254 executor = create_minimal_dag_executor() 255 state = create_workflow_state({}) 256 257 result = executor.resolve_value( 258 {"concat": ["Value: ", 42]}, 259 state 260 ) 261 assert result == "Value: 42" 262 263 def test_concat_skips_none_values(self): 264 """Concat skips None values in the list.""" 265 executor = create_minimal_dag_executor() 266 state = create_workflow_state({}) 267 268 result = executor.resolve_value( 269 {"concat": ["a", None, "b"]}, 270 state 271 ) 272 assert result == "ab" 273 274 def test_concat_requires_list(self): 275 """Concat operator requires a list argument.""" 276 executor = create_minimal_dag_executor() 277 state = create_workflow_state({}) 278 279 with pytest.raises(ValueError, match="'concat' operator requires a list"): 280 executor.resolve_value({"concat": "not_a_list"}, state) 281 282 283 class TestMapLoopVariables: 284 """Tests for map/loop special variables.""" 285 286 def test_resolve_map_item(self): 287 """{{_map_item}} resolves to the current map iteration item.""" 288 executor = create_minimal_dag_executor() 289 state = create_workflow_state({ 290 "_map_item": {"output": {"id": "item_123", "value": 100}} 291 }) 292 293 result = executor.resolve_value("{{_map_item.id}}", state) 294 assert result == "item_123" 295 296 def test_resolve_map_index(self): 297 """{{_map_index}} resolves to the current map iteration index.""" 298 executor = create_minimal_dag_executor() 299 state = create_workflow_state({ 300 "_map_index": {"output": 5} 301 }) 302 303 result = executor.resolve_value("{{_map_index}}", state) 304 assert result == 5 305 306 def test_argo_item_alias(self): 307 """{{item}} is aliased to {{_map_item}} for Argo compatibility.""" 308 executor = create_minimal_dag_executor() 309 state = create_workflow_state({ 310 "_map_item": {"output": "current_item_value"} 311 }) 312 313 result = executor.resolve_value("{{item}}", state) 314 assert result == "current_item_value" 315 316 def test_argo_item_field_alias(self): 317 """{{item.field}} is aliased to {{_map_item.field}}.""" 318 executor = create_minimal_dag_executor() 319 state = create_workflow_state({ 320 "_map_item": {"output": {"name": "test_name"}} 321 }) 322 323 result = executor.resolve_value("{{item.name}}", state) 324 assert result == "test_name" 325 326 327 class TestArgoParametersAlias: 328 """Tests for Argo workflow.parameters alias.""" 329 330 def test_workflow_parameters_alias(self): 331 """{{workflow.parameters.x}} is aliased to {{workflow.input.x}}.""" 332 executor = create_minimal_dag_executor() 333 state = create_workflow_state({ 334 "workflow_input": {"output": {"x": "argo_style_value"}} 335 }) 336 337 result = executor.resolve_value("{{workflow.parameters.x}}", state) 338 assert result == "argo_style_value"