/ tests / unit / workflow / test_map_loop_workflow_targets.py
test_map_loop_workflow_targets.py
  1  """
  2  Unit tests for map and loop nodes with workflow targets.
  3  
  4  Tests that map and loop nodes correctly handle both agent nodes and workflow nodes
  5  as their target nodes, ensuring proper node type detection and execution.
  6  """
  7  
  8  import pytest
  9  from pydantic import ValidationError
 10  
 11  from solace_agent_mesh.workflow.app import (
 12      WorkflowDefinition,
 13      AgentNode,
 14      MapNode,
 15      LoopNode,
 16      WorkflowInvokeNode,
 17  )
 18  
 19  
 20  class TestMapNodeWithAgentTarget:
 21      """Tests for map nodes with agent targets."""
 22  
 23      def test_map_with_agent_target_valid(self):
 24          """Map node with agent target should parse correctly."""
 25          workflow = WorkflowDefinition(
 26              description="Map with agent target",
 27              nodes=[
 28                  AgentNode(id="prepare", type="agent", agent_name="PrepareAgent"),
 29                  MapNode(
 30                      id="process_all",
 31                      type="map",
 32                      node="process_item",
 33                      items="{{prepare.output.items}}",
 34                      depends_on=["prepare"],
 35                  ),
 36                  AgentNode(id="process_item", type="agent", agent_name="ProcessAgent"),
 37              ],
 38              output_mapping={"results": "{{process_all.output}}"},
 39          )
 40  
 41          assert workflow.nodes[1].node == "process_item"
 42          assert workflow.nodes[2].agent_name == "ProcessAgent"
 43          assert len(workflow.nodes) == 3
 44  
 45  
 46  class TestMapNodeWithWorkflowTarget:
 47      """Tests for map nodes with workflow targets."""
 48  
 49      def test_map_with_workflow_target_valid(self):
 50          """Map node with workflow target should parse correctly."""
 51          workflow = WorkflowDefinition(
 52              description="Map with workflow target",
 53              nodes=[
 54                  AgentNode(id="prepare", type="agent", agent_name="PrepareAgent"),
 55                  MapNode(
 56                      id="process_all",
 57                      type="map",
 58                      node="process_workflow",
 59                      items="{{prepare.output.items}}",
 60                      depends_on=["prepare"],
 61                  ),
 62                  WorkflowInvokeNode(
 63                      id="process_workflow",
 64                      type="workflow",
 65                      workflow_name="ProcessWorkflow",
 66                  ),
 67              ],
 68              output_mapping={"results": "{{process_all.output}}"},
 69          )
 70  
 71          assert workflow.nodes[1].node == "process_workflow"
 72          assert workflow.nodes[2].workflow_name == "ProcessWorkflow"
 73          assert len(workflow.nodes) == 3
 74  
 75      def test_map_with_workflow_target_has_correct_type(self):
 76          """Map node target can be identified as workflow type."""
 77          workflow = WorkflowDefinition(
 78              description="Map with workflow target",
 79              nodes=[
 80                  MapNode(
 81                      id="map_workflows",
 82                      type="map",
 83                      node="sub_workflow",
 84                      items="{{workflow.input.items}}",
 85                  ),
 86                  WorkflowInvokeNode(
 87                      id="sub_workflow",
 88                      type="workflow",
 89                      workflow_name="SubWorkflow",
 90                  ),
 91              ],
 92              output_mapping={"results": "{{map_workflows.output}}"},
 93          )
 94  
 95          target_node = workflow.nodes[1]
 96          assert target_node.type == "workflow"
 97          assert isinstance(target_node, WorkflowInvokeNode)
 98  
 99  
class TestLoopNodeWithAgentTarget:
    """Loop nodes whose ``node`` reference points at an agent node."""

    def test_loop_with_agent_target_valid(self):
        """A workflow containing a loop node that targets an agent constructs cleanly."""
        retry_loop = LoopNode(
            id="retry_agent",
            type="loop",
            node="attempt",
            condition="{{retry_agent.output.retry}} == true",
        )
        attempt_agent = AgentNode(id="attempt", type="agent", agent_name="AttemptAgent")

        definition = WorkflowDefinition(
            description="Loop with agent target",
            nodes=[retry_loop, attempt_agent],
            output_mapping={"result": "{{attempt.output}}"},
        )

        # Read everything back from the constructed definition, not the locals.
        built_nodes = definition.nodes
        assert built_nodes[0].node == "attempt"
        assert built_nodes[1].agent_name == "AttemptAgent"
121  
122  
class TestLoopNodeWithWorkflowTarget:
    """Tests for loop nodes with workflow targets."""

    def test_loop_with_workflow_target_valid(self):
        """Loop node with workflow target should parse correctly."""
        workflow = WorkflowDefinition(
            description="Loop with workflow target",
            nodes=[
                LoopNode(
                    id="retry_workflow",
                    type="loop",
                    node="retry_sub",
                    condition="{{retry_workflow.output.should_retry}} == true",
                ),
                WorkflowInvokeNode(
                    id="retry_sub",
                    type="workflow",
                    workflow_name="RetryWorkflow",
                ),
            ],
            output_mapping={"result": "{{retry_sub.output}}"},
        )

        assert workflow.nodes[0].node == "retry_sub"
        assert workflow.nodes[1].workflow_name == "RetryWorkflow"

    def test_loop_with_workflow_target_has_correct_type(self):
        """Loop node target can be identified as workflow type."""
        workflow = WorkflowDefinition(
            description="Loop with workflow target",
            nodes=[
                LoopNode(
                    id="loop_workflows",
                    type="loop",
                    node="sub_workflow",
                    condition="{{loop_workflows.output.continue}} == true",
                ),
                WorkflowInvokeNode(
                    id="sub_workflow",
                    type="workflow",
                    workflow_name="SubWorkflow",
                ),
            ],
            output_mapping={"result": "{{sub_workflow.output}}"},
        )

        # Follow the loop node's ``node`` reference to find its target instead
        # of assuming a list position, so the test checks the node the loop
        # actually points at.
        nodes_by_id = {node.id: node for node in workflow.nodes}
        loop_node = nodes_by_id["loop_workflows"]
        target_node = nodes_by_id[loop_node.node]

        assert target_node.id == "sub_workflow"
        assert target_node.type == "workflow"
        assert isinstance(target_node, WorkflowInvokeNode)
172  
173  
class TestMixedNodeTypesInMapLoop:
    """Tests for workflows mixing map/loop nodes with both agent and workflow targets."""

    def test_workflow_with_map_agent_and_loop_workflow(self):
        """Workflow can have both map with agent target and loop with workflow target."""
        workflow = WorkflowDefinition(
            description="Mixed map and loop with different targets",
            nodes=[
                AgentNode(id="start", type="agent", agent_name="StartAgent"),
                MapNode(
                    id="process_items",
                    type="map",
                    node="process_agent",
                    items="{{start.output.items}}",
                    depends_on=["start"],
                ),
                AgentNode(id="process_agent", type="agent", agent_name="ProcessAgent"),
                LoopNode(
                    id="retry_workflow",
                    type="loop",
                    node="retry_wf",
                    condition="{{process_items.output.needs_retry}} == true",
                    depends_on=["process_items"],
                ),
                WorkflowInvokeNode(
                    id="retry_wf",
                    type="workflow",
                    workflow_name="RetryWorkflow",
                ),
            ],
            output_mapping={"final": "{{retry_wf.output}}"},
        )

        assert len(workflow.nodes) == 5

        # Look nodes up by id and follow each controller's ``node`` reference,
        # so the checks below hold for the node each controller points at
        # rather than for whichever node occupies a given list position.
        nodes_by_id = {node.id: node for node in workflow.nodes}

        # Verify map target is agent
        map_node = nodes_by_id["process_items"]
        map_target = nodes_by_id[map_node.node]
        assert map_node.type == "map"
        assert map_target.type == "agent"
        assert map_target.agent_name == "ProcessAgent"

        # Verify loop target is workflow
        loop_node = nodes_by_id["retry_workflow"]
        loop_target = nodes_by_id[loop_node.node]
        assert loop_node.type == "loop"
        assert loop_target.type == "workflow"
        assert loop_target.workflow_name == "RetryWorkflow"
220  
221  
class TestNodeTypeDetection:
    """Tests for correctly detecting node types in map/loop contexts."""

    def test_workflow_node_lacks_agent_name(self):
        """WorkflowInvokeNode should not expose a usable agent_name."""
        workflow_node = WorkflowInvokeNode(
            id="invoke_wf",
            type="workflow",
            workflow_name="MyWorkflow",
        )

        # Either the attribute is absent or it is present but unset (None);
        # ``getattr`` with a default covers both cases in one check.
        assert getattr(workflow_node, "agent_name", None) is None
        assert workflow_node.workflow_name == "MyWorkflow"

    def test_agent_node_has_agent_name(self):
        """AgentNode should have agent_name attribute."""
        agent_node = AgentNode(
            id="invoke_agent",
            type="agent",
            agent_name="MyAgent",
        )

        # Check presence before reading the value: once the attribute has been
        # read successfully, a ``hasattr`` assertion can no longer fail.
        assert hasattr(agent_node, "agent_name")
        assert agent_node.agent_name == "MyAgent"

    def test_target_node_type_can_be_determined(self):
        """Target node type can be determined from node.type."""
        workflow = WorkflowDefinition(
            description="Test node type determination",
            nodes=[
                MapNode(
                    id="map1",
                    type="map",
                    node="target1",
                    items="{{workflow.input.items}}",
                ),
                AgentNode(id="target1", type="agent", agent_name="Agent1"),
                MapNode(
                    id="map2",
                    type="map",
                    node="target2",
                    items="{{workflow.input.items}}",
                ),
                WorkflowInvokeNode(
                    id="target2",
                    type="workflow",
                    workflow_name="Workflow1",
                ),
            ],
            output_mapping={"r1": "{{target1.output}}", "r2": "{{target2.output}}"},
        )

        # Get nodes by their IDs
        node_map = {node.id: node for node in workflow.nodes}

        # Follow each map node's ``node`` reference to its target so the type
        # check is tied to what the map actually points at.
        first_target = node_map[node_map["map1"].node]
        second_target = node_map[node_map["map2"].node]

        # Verify we can determine types
        assert first_target.id == "target1"
        assert first_target.type == "agent"
        assert second_target.id == "target2"
        assert second_target.type == "workflow"
281  
282  
class TestWorkflowTargetInMapValidation:
    """Map nodes that reference workflow-invoke nodes, with and without input mappings."""

    def test_map_with_valid_workflow_reference(self):
        """A map node may name an existing workflow node as its target."""
        mapper = MapNode(
            id="map_workflows",
            type="map",
            node="target_workflow",
            items="{{workflow.input.items}}",
        )
        invoked = WorkflowInvokeNode(
            id="target_workflow",
            type="workflow",
            workflow_name="TargetWorkflow",
            input={"item": "{{_map_item.output}}"},
        )

        definition = WorkflowDefinition(
            description="Map with valid workflow reference",
            nodes=[mapper, invoked],
            output_mapping={"results": "{{map_workflows.output}}"},
        )

        built_nodes = definition.nodes
        assert built_nodes[0].node == "target_workflow"
        assert len(built_nodes) == 2

    def test_workflow_with_input_mapping_in_map(self):
        """A workflow target inside a map keeps its ``input`` mapping entries."""
        mapper = MapNode(
            id="map_wf",
            type="map",
            node="sub_wf",
            items="{{workflow.input.items}}",
        )
        invoked = WorkflowInvokeNode(
            id="sub_wf",
            type="workflow",
            workflow_name="SubWorkflow",
            input={
                "current_item": "{{_map_item.output}}",
                "index": "{{_map_index.output}}",
            },
        )

        definition = WorkflowDefinition(
            description="Workflow in map with input mapping",
            nodes=[mapper, invoked],
            output_mapping={"mapped_results": "{{map_wf.output}}"},
        )

        # Inspect the mapping as stored on the constructed definition.
        input_mapping = definition.nodes[1].input
        assert input_mapping is not None
        assert "current_item" in input_mapping
        assert input_mapping["index"] == "{{_map_index.output}}"