# test_map_loop_workflow_targets.py
"""
Unit tests for map and loop nodes whose targets are agent or workflow nodes.

Each test builds node models (and usually a ``WorkflowDefinition``) and then
checks the resulting fields: the target reference stored on the map/loop node
and the type and name of the node it points at. The tests only construct
models; no workflow is executed here.
"""

import pytest
from pydantic import ValidationError

from solace_agent_mesh.workflow.app import (
    WorkflowDefinition,
    AgentNode,
    MapNode,
    LoopNode,
    WorkflowInvokeNode,
)


class TestMapNodeWithAgentTarget:
    """Map nodes whose ``node`` field names an agent node."""

    def test_map_with_agent_target_valid(self):
        """A definition with a map node pointing at an agent node can be built."""
        prepare = AgentNode(id="prepare", type="agent", agent_name="PrepareAgent")
        fan_out = MapNode(
            id="process_all",
            type="map",
            node="process_item",
            items="{{prepare.output.items}}",
            depends_on=["prepare"],
        )
        per_item = AgentNode(
            id="process_item", type="agent", agent_name="ProcessAgent"
        )

        workflow = WorkflowDefinition(
            description="Map with agent target",
            nodes=[prepare, fan_out, per_item],
            output_mapping={"results": "{{process_all.output}}"},
        )

        # Always inspect the nodes held by the definition, not the locals above.
        assert workflow.nodes[1].node == "process_item"
        assert workflow.nodes[2].agent_name == "ProcessAgent"
        assert len(workflow.nodes) == 3


class TestMapNodeWithWorkflowTarget:
    """Map nodes whose ``node`` field names a workflow-invoke node."""

    def test_map_with_workflow_target_valid(self):
        """A definition with a map node pointing at a workflow node can be built."""
        prepare = AgentNode(id="prepare", type="agent", agent_name="PrepareAgent")
        fan_out = MapNode(
            id="process_all",
            type="map",
            node="process_workflow",
            items="{{prepare.output.items}}",
            depends_on=["prepare"],
        )
        per_item = WorkflowInvokeNode(
            id="process_workflow",
            type="workflow",
            workflow_name="ProcessWorkflow",
        )

        workflow = WorkflowDefinition(
            description="Map with workflow target",
            nodes=[prepare, fan_out, per_item],
            output_mapping={"results": "{{process_all.output}}"},
        )

        assert workflow.nodes[1].node == "process_workflow"
        assert workflow.nodes[2].workflow_name == "ProcessWorkflow"
        assert len(workflow.nodes) == 3

    def test_map_with_workflow_target_has_correct_type(self):
        """The map target reports type "workflow" and is a WorkflowInvokeNode."""
        fan_out = MapNode(
            id="map_workflows",
            type="map",
            node="sub_workflow",
            items="{{workflow.input.items}}",
        )
        invoked = WorkflowInvokeNode(
            id="sub_workflow",
            type="workflow",
            workflow_name="SubWorkflow",
        )

        workflow = WorkflowDefinition(
            description="Map with workflow target",
            nodes=[fan_out, invoked],
            output_mapping={"results": "{{map_workflows.output}}"},
        )

        target = workflow.nodes[1]
        assert target.type == "workflow"
        assert isinstance(target, WorkflowInvokeNode)


class TestLoopNodeWithAgentTarget:
    """Loop nodes whose ``node`` field names an agent node."""

    def test_loop_with_agent_target_valid(self):
        """A definition with a loop node pointing at an agent node can be built."""
        repeater = LoopNode(
            id="retry_agent",
            type="loop",
            node="attempt",
            condition="{{retry_agent.output.retry}} == true",
        )
        body = AgentNode(id="attempt", type="agent", agent_name="AttemptAgent")

        workflow = WorkflowDefinition(
            description="Loop with agent target",
            nodes=[repeater, body],
            output_mapping={"result": "{{attempt.output}}"},
        )

        assert workflow.nodes[0].node == "attempt"
        assert workflow.nodes[1].agent_name == "AttemptAgent"


class TestLoopNodeWithWorkflowTarget:
    """Loop nodes whose ``node`` field names a workflow-invoke node."""

    def test_loop_with_workflow_target_valid(self):
        """A definition with a loop node pointing at a workflow node can be built."""
        repeater = LoopNode(
            id="retry_workflow",
            type="loop",
            node="retry_sub",
            condition="{{retry_workflow.output.should_retry}} == true",
        )
        body = WorkflowInvokeNode(
            id="retry_sub",
            type="workflow",
            workflow_name="RetryWorkflow",
        )

        workflow = WorkflowDefinition(
            description="Loop with workflow target",
            nodes=[repeater, body],
            output_mapping={"result": "{{retry_sub.output}}"},
        )

        assert workflow.nodes[0].node == "retry_sub"
        assert workflow.nodes[1].workflow_name == "RetryWorkflow"

    def test_loop_with_workflow_target_has_correct_type(self):
        """The loop target reports type "workflow" and is a WorkflowInvokeNode."""
        repeater = LoopNode(
            id="loop_workflows",
            type="loop",
            node="sub_workflow",
            condition="{{loop_workflows.output.continue}} == true",
        )
        body = WorkflowInvokeNode(
            id="sub_workflow",
            type="workflow",
            workflow_name="SubWorkflow",
        )

        workflow = WorkflowDefinition(
            description="Loop with workflow target",
            nodes=[repeater, body],
            output_mapping={"result": "{{sub_workflow.output}}"},
        )

        target = workflow.nodes[1]
        assert target.type == "workflow"
        assert isinstance(target, WorkflowInvokeNode)


class TestMixedNodeTypesInMapLoop:
    """One definition holding a map with an agent target and a loop with a workflow target."""

    def test_workflow_with_map_agent_and_loop_workflow(self):
        """Both target kinds coexist in a single definition and keep their fields."""
        node_list = [
            AgentNode(id="start", type="agent", agent_name="StartAgent"),
            MapNode(
                id="process_items",
                type="map",
                node="process_agent",
                items="{{start.output.items}}",
                depends_on=["start"],
            ),
            AgentNode(id="process_agent", type="agent", agent_name="ProcessAgent"),
            LoopNode(
                id="retry_workflow",
                type="loop",
                node="retry_wf",
                condition="{{process_items.output.needs_retry}} == true",
                depends_on=["process_items"],
            ),
            WorkflowInvokeNode(
                id="retry_wf",
                type="workflow",
                workflow_name="RetryWorkflow",
            ),
        ]

        workflow = WorkflowDefinition(
            description="Mixed map and loop with different targets",
            nodes=node_list,
            output_mapping={"final": "{{retry_wf.output}}"},
        )

        # Map (index 1) and its agent target (index 2).
        mapper = workflow.nodes[1]
        mapper_target = workflow.nodes[2]
        assert mapper.type == "map"
        assert mapper_target.type == "agent"
        assert mapper_target.agent_name == "ProcessAgent"

        # Loop (index 3) and its workflow target (index 4).
        looper = workflow.nodes[3]
        looper_target = workflow.nodes[4]
        assert looper.type == "loop"
        assert looper_target.type == "workflow"
        assert looper_target.workflow_name == "RetryWorkflow"


class TestNodeTypeDetection:
    """Telling agent nodes and workflow nodes apart by attribute and ``type``."""

    def test_workflow_node_lacks_agent_name(self):
        """A WorkflowInvokeNode has no agent_name, or has it set to None."""
        invoke = WorkflowInvokeNode(
            id="invoke_wf",
            type="workflow",
            workflow_name="MyWorkflow",
        )

        # A missing attribute and an attribute equal to None are both accepted.
        assert getattr(invoke, "agent_name", None) is None
        assert invoke.workflow_name == "MyWorkflow"

    def test_agent_node_has_agent_name(self):
        """An AgentNode carries the agent_name it was constructed with."""
        agent = AgentNode(
            id="invoke_agent",
            type="agent",
            agent_name="MyAgent",
        )

        assert agent.agent_name == "MyAgent"
        assert hasattr(agent, "agent_name")

    def test_target_node_type_can_be_determined(self):
        """Looking targets up by id yields their ``type`` ("agent" / "workflow")."""
        node_list = [
            MapNode(
                id="map1",
                type="map",
                node="target1",
                items="{{workflow.input.items}}",
            ),
            AgentNode(id="target1", type="agent", agent_name="Agent1"),
            MapNode(
                id="map2",
                type="map",
                node="target2",
                items="{{workflow.input.items}}",
            ),
            WorkflowInvokeNode(
                id="target2",
                type="workflow",
                workflow_name="Workflow1",
            ),
        ]

        workflow = WorkflowDefinition(
            description="Test node type determination",
            nodes=node_list,
            output_mapping={"r1": "{{target1.output}}", "r2": "{{target2.output}}"},
        )

        # Index the definition's nodes by id so targets can be found by name.
        nodes_by_id = {entry.id: entry for entry in workflow.nodes}

        assert nodes_by_id["target1"].type == "agent"
        assert nodes_by_id["target2"].type == "workflow"


class TestWorkflowTargetInMapValidation:
    """Map nodes referencing workflow nodes that declare an ``input`` mapping."""

    def test_map_with_valid_workflow_reference(self):
        """A map node may reference a workflow node present in the same definition."""
        fan_out = MapNode(
            id="map_workflows",
            type="map",
            node="target_workflow",
            items="{{workflow.input.items}}",
        )
        invoked = WorkflowInvokeNode(
            id="target_workflow",
            type="workflow",
            workflow_name="TargetWorkflow",
            input={"item": "{{_map_item.output}}"},
        )

        workflow = WorkflowDefinition(
            description="Map with valid workflow reference",
            nodes=[fan_out, invoked],
            output_mapping={"results": "{{map_workflows.output}}"},
        )

        assert workflow.nodes[0].node == "target_workflow"
        assert len(workflow.nodes) == 2

    def test_workflow_with_input_mapping_in_map(self):
        """The workflow target keeps its ``input`` templates for _map_item/_map_index."""
        fan_out = MapNode(
            id="map_wf",
            type="map",
            node="sub_wf",
            items="{{workflow.input.items}}",
        )
        invoked = WorkflowInvokeNode(
            id="sub_wf",
            type="workflow",
            workflow_name="SubWorkflow",
            input={
                "current_item": "{{_map_item.output}}",
                "index": "{{_map_index.output}}",
            },
        )

        workflow = WorkflowDefinition(
            description="Workflow in map with input mapping",
            nodes=[fan_out, invoked],
            output_mapping={"mapped_results": "{{map_wf.output}}"},
        )

        target = workflow.nodes[1]
        assert target.input is not None
        assert "current_item" in target.input
        assert target.input["index"] == "{{_map_index.output}}"