ADVANCED_EXAMPLES.md
  1  # DriftKit Workflow Test Framework - Advanced Examples
  2  
  3  This document provides advanced examples and patterns for testing complex workflow scenarios using the DriftKit Workflow Test Framework.
  4  
  5  ## Table of Contents
  6  
  7  1. [Stateful Workflow Testing](#stateful-workflow-testing)
  8  2. [Multi-Stage Pipeline Testing](#multi-stage-pipeline-testing)
  9  3. [Event-Driven Workflow Testing](#event-driven-workflow-testing)
 10  4. [Saga Pattern Testing](#saga-pattern-testing)
 11  5. [Dynamic Workflow Testing](#dynamic-workflow-testing)
 12  6. [Integration Test Patterns](#integration-test-patterns)
 13  
 14  ## Stateful Workflow Testing
 15  
 16  ### Testing Workflows with Complex State Management
 17  
 18  ```java
 19  public class StatefulWorkflowTest extends WorkflowTestBase {
 20      
 21      @Test
 22      void testShoppingCartWorkflow() throws Exception {
 23          // Define a workflow that keeps the cart in the workflow context under "cart-state"
 24          WorkflowBuilder<CartCommand, CartState> builder = WorkflowBuilder
 25              .define("cart-workflow", CartCommand.class, CartState.class)
 26              .then("process-command", (cmd, ctx) -> {
 27                  CartState state = (CartState) ctx.get("cart-state", new CartState()); // presumably the 2nd arg is the fallback when the key is absent
 28                  
 29                  switch (cmd.type()) {
 30                      case ADD_ITEM -> {
 31                          state.addItem(cmd.item());
 32                          ctx.set("cart-state", state);
 33                          return StepResult.continueWith(state);
 34                      }
 35                      case REMOVE_ITEM -> {
 36                          state.removeItem(cmd.itemId());
 37                          ctx.set("cart-state", state);
 38                          return StepResult.continueWith(state);
 39                      }
 40                      case CHECKOUT -> { // validates only: the cart is neither changed nor written back
 41                          if (state.isEmpty()) {
 42                              return StepResult.fail("Cannot checkout empty cart");
 43                          }
 44                          return StepResult.continueWith(state);
 45                      }
 46                      default -> { // defensive: every CommandType constant is handled above
 47                          return StepResult.fail("Unknown command");
 48                      }
 49                  }
 50              })
 51              .branch( // route on the command that triggered this run
 52                  ctx -> {
 53                      CartCommand cmd = (CartCommand) ctx.getTriggerData();
 54                      return cmd.type() == CommandType.CHECKOUT;
 55                  },
 56                  checkout -> checkout
 57                      .then("calculate-total", (state, ctx) -> {
 58                          CartState cart = (CartState) state;
 59                          double total = cart.calculateTotal();
 60                          return StepResult.continueWith(new CheckoutData(cart, total));
 61                      })
 62                      .then("process-payment", (data, ctx) -> {
 63                          // Payment is stubbed: always reports success with a fixed transaction id
 64                          return StepResult.continueWith(new PaymentResult(true, "PAY-123"));
 65                      })
 66                      .then("complete-order", (payment, ctx) -> {
 67                          CartState cart = (CartState) ctx.get("cart-state");
 68                          return StepResult.finish(new OrderComplete(cart.getItemCount(), "ORD-123"));
 69                      }),
 70                  continuation -> continuation
 71                      .then("update-inventory", (state, ctx) -> {
 72                          // Placeholder for an inventory update: the state is passed through unchanged
 73                          return StepResult.finish(state);
 74                      })
 75              );
 76          
 77          engine.register(builder);
 78          
 79          // Drive the cart through three separate workflow runs.
 80          // NOTE(review): the growing counts below assume "cart-state" survives between runs - confirm.
 81          CartState state1 = executeWorkflow("cart-workflow", 
 82              new CartCommand(CommandType.ADD_ITEM, new Item("PROD-1", 29.99)));
 83          assertEquals(1, state1.getItemCount());
 84          
 85          // Second add: the same cart is expected to hold two items now
 86          CartState state2 = executeWorkflow("cart-workflow", 
 87              new CartCommand(CommandType.ADD_ITEM, new Item("PROD-2", 49.99)));
 88          assertEquals(2, state2.getItemCount());
 89          
 90          // Checkout: the command carries no item, only the CHECKOUT type
 91          OrderComplete order = executeWorkflow("cart-workflow", 
 92              new CartCommand(CommandType.CHECKOUT, null));
 93          assertNotNull(order);
 94          assertEquals(2, order.itemCount());
 95          
 96          // Verify the steps of the checkout branch were executed
 97          assertions.assertStep("cart-workflow", "calculate-total").wasExecuted();
 98          assertions.assertStep("cart-workflow", "process-payment").wasExecuted();
 99          assertions.assertStep("cart-workflow", "complete-order").wasExecuted();
100      }
101      
102      // Domain classes
103      record CartCommand(CommandType type, Item item, String itemId) { // itemId is read only when handling REMOVE_ITEM
104          CartCommand(CommandType type, Item item) { // convenience form: leaves itemId null
105              this(type, item, null);
106          }
107      }
108      
109      enum CommandType { ADD_ITEM, REMOVE_ITEM, CHECKOUT }
110      
111      static class CartState {
112          private final List<Item> items = new ArrayList<>(); // cart contents, in insertion order
113          /** Appends {@code item} to the cart. */
114          void addItem(Item item) { this.items.add(item); }
115          /** Removes every item whose id equals {@code itemId}. */
116          void removeItem(String itemId) { this.items.removeIf(entry -> entry.id().equals(itemId)); }
117          /** Returns the number of items currently in the cart. */
118          int getItemCount() { return this.items.size(); }
119          /** Returns true when the cart holds no items. */
120          boolean isEmpty() { return getItemCount() == 0; }
121          /** Returns the sum of the prices of all items in the cart. */
122          double calculateTotal() { return this.items.stream().mapToDouble(entry -> entry.price()).sum(); }
123      }
124      
125      record Item(String id, double price) {} // a product placed in the cart
126      record CheckoutData(CartState cart, double total) {} // output of "calculate-total"
127      record PaymentResult(boolean success, String transactionId) {} // output of "process-payment"
128      record OrderComplete(int itemCount, String orderId) {} // final result of the checkout branch
129  }
130  ```
131  
132  ## Multi-Stage Pipeline Testing
133  
134  ### Testing Complex Data Processing Pipelines
135  
136  ```java
137  public class DataPipelineWorkflowTest extends WorkflowTestBase {
138      
139      @Test
140      void testETLPipeline() throws Exception {
141          // Build a seven-step extract / transform / load pipeline
142          WorkflowBuilder<DataSource, ProcessedData> builder = WorkflowBuilder
143              .define("etl-pipeline", DataSource.class, ProcessedData.class)
144              
145              // Extract stage
146              .then("validate-source", (source, ctx) -> {
147                  if (!source.isValid()) {
148                      return StepResult.fail("Invalid data source");
149                  }
150                  return StepResult.continueWith(source);
151              })
152              .then("extract-data", (source, ctx) -> {
153                  // Default extraction: the 100 records produced by generateTestData()
154                  return StepResult.continueWith(new RawData(source.id(), generateTestData()));
155              })
156              
157              // Transform stage
158              .then("clean-data", (raw, ctx) -> {
159                  List<Record> cleaned = raw.records().stream()
160                      .filter(r -> r.isValid()) // Record.isValid(): keeps records whose value is >= 0
161                      .collect(Collectors.toList());
162                  return StepResult.continueWith(new CleanedData(raw.sourceId(), cleaned));
163              })
164              .then("enrich-data", (cleaned, ctx) -> {
165                  List<EnrichedRecord> enriched = cleaned.records().stream()
166                      .map(r -> enrichRecord(r))
167                      .collect(Collectors.toList());
168                  return StepResult.continueWith(new EnrichedData(cleaned.sourceId(), enriched));
169              })
170              .then("aggregate-data", (enriched, ctx) -> {
171                  Map<String, AggregateMetrics> aggregates = enriched.records().stream()
172                      .collect(Collectors.groupingBy( // one AggregateMetrics per category
173                          EnrichedRecord::category,
174                          Collectors.collectingAndThen(
175                              Collectors.toList(),
176                              records -> calculateMetrics(records)
177                          )
178                      ));
179                  return StepResult.continueWith(new AggregatedData(enriched.sourceId(), aggregates));
180              })
181              
182              // Load stage
183              .then("validate-output", (aggregated, ctx) -> {
184                  if (aggregated.metrics().isEmpty()) {
185                      return StepResult.fail("No data to load");
186                  }
187                  return StepResult.continueWith(aggregated);
188              })
189              .then("load-to-warehouse", (aggregated, ctx) -> {
190                  // No real warehouse is involved: only a timestamp-based load id is produced
191                  String loadId = "LOAD-" + System.currentTimeMillis();
192                  return StepResult.finish(new ProcessedData(
193                      aggregated.sourceId(), 
194                      loadId, 
195                      aggregated.metrics().size()
196                  ));
197              });
198          
199          engine.register(builder);
200          
201          // Mock "extract-data" so the number of records depends on the source id
202          orchestrator.mock()
203              .workflow("etl-pipeline")
204              .step("extract-data")
205              .always()
206              .thenReturn(DataSource.class, source -> {
207                  // Ids containing "LARGE" get 10000 records, every other id gets 100
208                  int recordCount = source.id().contains("LARGE") ? 10000 : 100;
209                  List<Record> records = IntStream.range(0, recordCount)
210                      .mapToObj(i -> new Record("REC-" + i, "Category-" + (i % 5), i * 1.5))
211                      .collect(Collectors.toList());
212                  return StepResult.continueWith(new RawData(source.id(), records));
213              });
214          
215          // Test small dataset
216          ProcessedData result = executeWorkflow("etl-pipeline", 
217              new DataSource("SMALL-001", true));
218          assertEquals(5, result.aggregateCount()); // 5 categories: "Category-0" .. "Category-4" (i % 5)
219          
220          // Test large dataset against a 5 second wall-clock budget (may be flaky on slow machines)
221          long startTime = System.currentTimeMillis();
222          ProcessedData largeResult = executeWorkflow("etl-pipeline", 
223              new DataSource("LARGE-001", true));
224          long duration = System.currentTimeMillis() - startTime;
225          
226          assertTrue(duration < 5000, "Large dataset processing took too long: " + duration + "ms");
227          assertEquals(5, largeResult.aggregateCount());
228          
229          // Verify the stages ran in declaration order. NOTE(review): no terminal call is visible on this chain - confirm it asserts eagerly
230          assertions.assertExecutionOrder()
231              .step("validate-source")
232              .step("extract-data")
233              .step("clean-data")
234              .step("enrich-data")
235              .step("aggregate-data")
236              .step("validate-output")
237              .step("load-to-warehouse");
238      }
239      
240      // Helper methods
241      private List<Record> generateTestData() {
242          return IntStream.range(0, 100)
243              .mapToObj(i -> new Record("REC-" + i, "Cat-" + (i % 5), i * 1.5))
244              .collect(Collectors.toList());
245      }
246      
247      /**
248       * Builds the enriched form of {@code record}: its original fields plus the value
249       * raised by 10% and the current wall-clock time in milliseconds.
250       */
251      private EnrichedRecord enrichRecord(Record record) {
252          double original = record.value();
253          double uplifted = original * 1.1; // Add 10% enrichment
254          return new EnrichedRecord(record.id(), record.category(), original, uplifted, System.currentTimeMillis());
255      }
256      
257      private AggregateMetrics calculateMetrics(List<EnrichedRecord> records) { // count, sum and mean of enrichedValue
258          double sum = records.stream().mapToDouble(EnrichedRecord::enrichedValue).sum();
259          double avg = records.isEmpty() ? 0.0 : sum / records.size(); // empty input: report 0.0 instead of NaN (0.0 / 0)
260          return new AggregateMetrics(records.size(), sum, avg);
261      }
262      
263      // Domain classes
264      record DataSource(String id, boolean isValid) {} // workflow input; isValid() is checked by "validate-source"
265      record Record(String id, String category, double value) {
266          boolean isValid() { return value >= 0; } // records with a negative value are dropped by "clean-data"
267      }
268      record RawData(String sourceId, List<Record> records) {} // output of "extract-data"
269      record CleanedData(String sourceId, List<Record> records) {} // output of "clean-data"
270      record EnrichedRecord(String id, String category, double value, double enrichedValue, long timestamp) {}
271      record EnrichedData(String sourceId, List<EnrichedRecord> records) {} // output of "enrich-data"
272      record AggregateMetrics(int count, double sum, double average) {} // totals for one category
273      record AggregatedData(String sourceId, Map<String, AggregateMetrics> metrics) {} // metrics keyed by category
274      record ProcessedData(String sourceId, String loadId, int aggregateCount) {} // final workflow result
275  }
276  ```
277  
278  ## Event-Driven Workflow Testing
279  
280  ### Testing Workflows with Event Emissions and Reactions
281  
282  ```java
283  public class EventDrivenWorkflowTest extends WorkflowTestBase {
284      
285      private final List<WorkflowEvent> capturedEvents = new ArrayList<>();
286      
287      @BeforeEach
288      void setUp() {
289          // Record every event the engine delivers so the tests can assert on them
290          engine.addEventListener(capturedEvents::add);
291      }
292      
293      @Test
294      void testOrderFulfillmentWithEvents() throws Exception {
295          // Four-step fulfillment workflow in which every step announces its progress via ctx.emit
296          WorkflowBuilder<Order, FulfillmentResult> builder = WorkflowBuilder
297              .define("fulfillment-workflow", Order.class, FulfillmentResult.class)
298              .then("reserve-inventory", (order, ctx) -> {
299                  ctx.emit(new Event("INVENTORY_CHECK", order.orderId()));
300                  
301                  boolean available = checkInventory(order);
302                  if (!available) {
303                      ctx.emit(new Event("INVENTORY_SHORTAGE", order.orderId()));
304                      return StepResult.fail("Insufficient inventory");
305                  }
306                  
307                  ctx.emit(new Event("INVENTORY_RESERVED", order.orderId()));
308                  return StepResult.continueWith(new ReservedOrder(order, "RES-123"));
309              })
310              .then("allocate-shipping", (reserved, ctx) -> {
311                  ctx.emit(new Event("SHIPPING_ALLOCATION_START", reserved.order().orderId()));
312                  
313                  ShippingMethod method = selectShippingMethod(reserved.order());
314                  ctx.emit(new Event("SHIPPING_METHOD_SELECTED", method.name()));
315                  
316                  return StepResult.continueWith(new AllocatedOrder(reserved, method));
317              })
318              .then("generate-labels", (allocated, ctx) -> {
319                  ctx.emit(new Event("LABEL_GENERATION_START", allocated.reserved().order().orderId()));
320                  
321                  String trackingNumber = "TRACK-" + System.currentTimeMillis();
322                  ctx.emit(new Event("TRACKING_NUMBER_GENERATED", trackingNumber));
323                  
324                  return StepResult.continueWith(new LabeledOrder(allocated, trackingNumber));
325              })
326              .then("notify-warehouse", (labeled, ctx) -> {
327                  ctx.emit(new Event("WAREHOUSE_NOTIFICATION", labeled.trackingNumber()));
328                  
329                  return StepResult.finish(new FulfillmentResult(
330                      labeled.allocated().reserved().order().orderId(),
331                      labeled.trackingNumber(),
332                      FulfillmentStatus.READY_TO_SHIP
333                  ));
334              });
335          
336          engine.register(builder);
337          
338          // Happy path: the order id does not contain "OOS", so inventory is available
339          Order order = new Order("ORD-789", Arrays.asList(
340              new OrderItem("SKU-1", 2),
341              new OrderItem("SKU-2", 1)
342          ));
343          
344          FulfillmentResult result = executeWorkflow("fulfillment-workflow", order);
345          
346          // Verify result
347          assertEquals("ORD-789", result.orderId());
348          assertEquals(FulfillmentStatus.READY_TO_SHIP, result.status());
349          assertNotNull(result.trackingNumber());
350          
351          // Verify events were emitted in correct order
352          List<String> eventTypes = capturedEvents.stream()
353              .map(WorkflowEvent::getType)
354              .collect(Collectors.toList());
355          
356          assertEquals(Arrays.asList(
357              "INVENTORY_CHECK",
358              "INVENTORY_RESERVED",
359              "SHIPPING_ALLOCATION_START",
360              "SHIPPING_METHOD_SELECTED",
361              "LABEL_GENERATION_START",
362              "TRACKING_NUMBER_GENERATED",
363              "WAREHOUSE_NOTIFICATION"
364          ), eventTypes);
365          
366          // Test inventory shortage scenario.
367          // The failure is driven through the real "reserve-inventory" step rather than a
368          // mock: a mock lambda receives only the Order, so it has no WorkflowContext to
369          // emit events with. checkInventory() reports a shortage for any order id that
370          // contains "OOS", which makes the step emit INVENTORY_SHORTAGE and fail.
371          Order outOfStockOrder = new Order("ORD-OOS-1", Arrays.asList(
372              new OrderItem("SKU-1", 2)
373          ));
374          
375          capturedEvents.clear();
376          
377          assertThrows(Exception.class, () -> 
378              executeWorkflow("fulfillment-workflow", outOfStockOrder)
379          );
380          
381          // Verify shortage event was emitted and no reservation was made
382          assertTrue(capturedEvents.stream()
383              .anyMatch(e -> e.getType().equals("INVENTORY_SHORTAGE")));
384          assertTrue(capturedEvents.stream()
385              .noneMatch(e -> e.getType().equals("INVENTORY_RESERVED")));
386      }
387      
388      // Helper methods
389      private boolean checkInventory(Order order) { // in stock unless the order id carries the "OOS" marker
390          return order.orderId().indexOf("OOS") < 0; // Out of stock
391      }
392      
393      private ShippingMethod selectShippingMethod(Order order) { // more than 5 order lines ship as freight
394          return order.items().size() <= 5 ? ShippingMethod.STANDARD : ShippingMethod.FREIGHT;
395      }
396      
397      // Domain classes
398      record Order(String orderId, List<OrderItem> items) {} // workflow input
399      record OrderItem(String sku, int quantity) {}
400      record ReservedOrder(Order order, String reservationId) {} // output of "reserve-inventory"
401      enum ShippingMethod { STANDARD, EXPRESS, FREIGHT }
402      record AllocatedOrder(ReservedOrder reserved, ShippingMethod method) {} // output of "allocate-shipping"
403      record LabeledOrder(AllocatedOrder allocated, String trackingNumber) {} // output of "generate-labels"
404      enum FulfillmentStatus { READY_TO_SHIP, SHIPPED, DELIVERED }
405      record FulfillmentResult(String orderId, String trackingNumber, FulfillmentStatus status) {} // final result
406      record Event(String type, Object data) {} // payload the steps hand to ctx.emit(...)
407  }
408  ```
409  
410  ## Saga Pattern Testing
411  
412  ### Testing Distributed Transaction Workflows
413  
414  ```java
415  public class SagaWorkflowTest extends WorkflowTestBase {
416      
417      @Test
418      void testHotelBookingSaga() throws Exception {
419          // Flags flipped by the compensation handler below, so the final assertions
420          // observe what the saga really undid instead of what a mock pretends happened.
421          // Declared first because the workflow lambdas capture them.
422          AtomicBoolean roomCancelled = new AtomicBoolean(false);
423          AtomicBoolean paymentRefunded = new AtomicBoolean(false);
424          
425          // Create saga workflow with compensation
426          WorkflowBuilder<BookingRequest, BookingConfirmation> builder = WorkflowBuilder
427              .define("booking-saga", BookingRequest.class, BookingConfirmation.class)
428              
429              // Step 1: Reserve hotel room
430              .then("reserve-room", (request, ctx) -> {
431                  String reservationId = "ROOM-" + System.currentTimeMillis();
432                  ctx.set("room-reservation", reservationId);
433                  return StepResult.continueWith(new RoomReserved(reservationId, request));
434              })
435              
436              // Step 2: Charge payment
437              .then("charge-payment", (roomReserved, ctx) -> {
438                  try {
439                      String paymentId = processPayment(roomReserved.request().creditCard());
440                      ctx.set("payment-id", paymentId);
441                      return StepResult.continueWith(new PaymentCharged(paymentId, roomReserved));
442                  } catch (PaymentException e) {
443                      // Trigger compensation
444                      ctx.set("compensation-needed", true);
445                      return StepResult.fail("Payment failed: " + e.getMessage());
446                  }
447              })
448              
449              // Step 3: Send confirmation
450              .then("send-confirmation", (paymentCharged, ctx) -> {
451                  try {
452                      String confirmationId = sendEmail(paymentCharged.roomReserved().request().email());
453                      return StepResult.finish(new BookingConfirmation(
454                          paymentCharged.roomReserved().reservationId(),
455                          paymentCharged.paymentId(),
456                          confirmationId
457                      ));
458                  } catch (Exception e) {
459                      ctx.set("compensation-needed", true);
460                      return StepResult.fail("Confirmation failed");
461                  }
462              })
463              
464              // Compensation flow: acts only when a step flagged "compensation-needed",
465              // and undoes just the steps that left an id behind in the context
466              .onError((error, ctx) -> {
467                  if (ctx.get("compensation-needed", false)) {
468                      // Compensate in reverse order
469                      String paymentId = (String) ctx.get("payment-id");
470                      if (paymentId != null) {
471                          refundPayment(paymentId);
472                          paymentRefunded.set(true);
473                      }
474                      
475                      String roomReservation = (String) ctx.get("room-reservation");
476                      if (roomReservation != null) {
477                          cancelRoomReservation(roomReservation);
478                          roomCancelled.set(true);
479                      }
480                  }
481                  return StepResult.fail("Booking failed, compensations executed");
482              });
483          
484          engine.register(builder);
485          
486          // Test successful saga
487          BookingRequest request = new BookingRequest(
488              "user@example.com",
489              "2024-06-01",
490              "2024-06-05",
491              "VISA-1234"
492          );
493          
494          BookingConfirmation confirmation = executeWorkflow("booking-saga", request);
495          assertNotNull(confirmation.reservationId());
496          assertNotNull(confirmation.paymentId());
497          assertNotNull(confirmation.confirmationId());
498          // A successful booking must not run any compensation
499          assertTrue(!roomCancelled.get() && !paymentRefunded.get(), "No compensation expected on success");
500          
501          // Test saga with payment failure. processPayment() throws for any card containing
502          // "FAIL", so the real "charge-payment" step fails and requests compensation.
503          BookingRequest decliningRequest = new BookingRequest(
504              "user@example.com",
505              "2024-06-01",
506              "2024-06-05",
507              "FAIL-0000"
508          );
509          
510          // Execute and expect failure
511          assertThrows(Exception.class, () -> 
512              executeWorkflow("booking-saga", decliningRequest)
513          );
514          
515          // Verify compensations: the room was reserved before the failure, so it must be
516          // cancelled; the charge never went through, so there is nothing to refund.
517          assertTrue(roomCancelled.get(), "Room reservation should be cancelled");
518          assertTrue(!paymentRefunded.get(), "Payment was never charged and must not be refunded");
519      }
520      
521      // Saga helper methods
522      private String processPayment(String creditCard) { // simulated charge: cards containing "FAIL" are declined
523          if (creditCard.indexOf("FAIL") < 0) {
524              return "PAY-" + System.currentTimeMillis(); // timestamp-based payment id
525          }
526          throw new PaymentException("Payment processing failed");
527      }
528      
529      private String sendEmail(String email) { // stub: nothing is sent and the email argument is not used
530          return "CONF-" + System.currentTimeMillis(); // timestamp-based confirmation id
531      }
532      
533      private void refundPayment(String paymentId) { // called by the onError handler when a payment id was stored
534          // Compensation logic would go here; this example leaves it as a no-op
535      }
536      
537      private void cancelRoomReservation(String reservationId) { // called by the onError handler when a reservation id was stored
538          // Compensation logic would go here; this example leaves it as a no-op
539      }
540      
541      // Domain classes
542      record BookingRequest(String email, String checkIn, String checkOut, String creditCard) {} // workflow input
543      record RoomReserved(String reservationId, BookingRequest request) {} // output of "reserve-room"
544      record PaymentCharged(String paymentId, RoomReserved roomReserved) {} // output of "charge-payment"
545      record BookingConfirmation(String reservationId, String paymentId, String confirmationId) {} // final result
546      
547      static class PaymentException extends RuntimeException { // unchecked; thrown by processPayment for declined cards
548          PaymentException(String message) { super(message); }
549      }
550  }
551  ```
552  
553  ## Dynamic Workflow Testing
554  
555  ### Testing Workflows with Dynamic Step Generation
556  
557  ```java
558  public class DynamicWorkflowTest extends WorkflowTestBase {
559      
560      @Test
561      void testDynamicApprovalWorkflow() throws Exception {
562          // Workflow whose approval steps are generated at run time, one per approver
563          WorkflowBuilder<ApprovalRequest, ApprovalResult> builder = WorkflowBuilder
564              .define("dynamic-approval", ApprovalRequest.class, ApprovalResult.class)
565              
566              .then("determine-approvers", (request, ctx) -> {
567                  List<String> approvers = determineApprovers(request);
568                  ctx.set("approvers", approvers);
569                  ctx.set("approvals", new HashMap<String, Boolean>()); // approver name -> decision
570                  return StepResult.continueWith(new ApprovalProcess(request, approvers));
571              })
572              
573              .dynamicSteps("approval-loop", (process, ctx) -> {
574                  List<String> approvers = (List<String>) ctx.get("approvers"); // unchecked cast: stored by "determine-approvers"
575                  Map<String, Boolean> approvals = (Map<String, Boolean>) ctx.get("approvals");
576                  
577                  // Build a step for the first approver that has no recorded decision yet
578                  return approvers.stream()
579                      .filter(approver -> !approvals.containsKey(approver))
580                      .findFirst()
581                      .map(approver -> {
582                          return StepDefinition.create(
583                              "approve-" + approver,
584                              (ApprovalProcess proc, WorkflowContext c) -> {
585                                  // Simulated decision: any approver whose name contains "REJECT" declines
586                                  boolean approved = !approver.contains("REJECT");
587                                  approvals.put(approver, approved);
588                                  c.set("approvals", approvals);
589                                  
590                                  if (!approved) {
591                                      return StepResult.fail("Rejected by " + approver);
592                                  }
593                                  
594                                  // Check if all approvals are complete
595                                  if (approvals.size() == approvers.size()) {
596                                      return StepResult.continueWith(new AllApproved(proc.request()));
597                                  }
598                                  
599                                  // Continue to next approver
600                                  return StepResult.continueWith(proc);
601                              }
602                          );
603                      })
604                      .orElse(null); // NOTE(review): null presumably means "no more dynamic steps" - confirm
605              })
606              
607              .then("finalize", (allApproved, ctx) -> {
608                  Map<String, Boolean> approvals = (Map<String, Boolean>) ctx.get("approvals");
609                  return StepResult.finish(new ApprovalResult(
610                      allApproved.request().id(),
611                      true,
612                      approvals
613                  ));
614              });
615          
616          engine.register(builder);
617          
618          // Test with multiple approvers: 50000.0 falls in the MANAGER + DIRECTOR + VP tier
619          ApprovalRequest request = new ApprovalRequest(
620              "REQ-123",
621              ApprovalType.BUDGET,
622              50000.0
623          );
624          
625          ApprovalResult result = executeWorkflow("dynamic-approval", request);
626          assertTrue(result.approved());
627          assertEquals(3, result.approverDecisions().size()); // Manager, Director, VP
628          
629          // Verify dynamic steps were executed
630          assertions.assertStep("dynamic-approval", "approve-MANAGER").wasExecuted();
631          assertions.assertStep("dynamic-approval", "approve-DIRECTOR").wasExecuted();
632          assertions.assertStep("dynamic-approval", "approve-VP").wasExecuted();
633          
634          // Test rejection scenario
635          ApprovalRequest rejectRequest = new ApprovalRequest(
636              "REQ-456",
637              ApprovalType.BUDGET,
638              100000.0 // not below 100000, so determineApprovers adds REJECT-CFO
639          );
640          
641          assertThrows(Exception.class, () -> 
642              executeWorkflow("dynamic-approval", rejectRequest)
643          );
644      }
645      
646      // Helper method to determine approvers based on request
647      private List<String> determineApprovers(ApprovalRequest request) {
648          // The approval chain grows with the amount; each tier adds one approver:
649          //   amount <  10000  -> MANAGER
650          //   amount <  50000  -> MANAGER, DIRECTOR
651          //   amount < 100000  -> MANAGER, DIRECTOR, VP
652          //   anything else    -> MANAGER, DIRECTOR, VP, REJECT-CFO
653          double amount = request.amount();
654          List<String> chain = new ArrayList<>();
655          
656          chain.add("MANAGER");
657          if (!(amount < 10000)) {
658              chain.add("DIRECTOR");
659          }
660          if (!(amount < 50000)) {
661              chain.add("VP");
662          }
663          if (!(amount < 100000)) {
664              chain.add("REJECT-CFO"); // Will reject
665          }
666          return chain;
667      }
668      
669      // Domain classes
670      enum ApprovalType { BUDGET, HIRE, PROJECT }
671      record ApprovalRequest(String id, ApprovalType type, double amount) {} // workflow input; amount selects the approver chain
672      record ApprovalProcess(ApprovalRequest request, List<String> approvers) {} // passed between approval steps
673      record AllApproved(ApprovalRequest request) {} // produced once every approver has a recorded decision
674      record ApprovalResult(String requestId, boolean approved, Map<String, Boolean> approverDecisions) {} // final result
675  }
676  ```
677  
678  ## Integration Test Patterns
679  
680  ### Testing Workflows with External System Integration
681  
682  ```java
683  @SpringBootTest
684  @TestContainers
685  public class IntegrationWorkflowTest extends WorkflowTestBase {
686      
687      @Container
688      static PostgreSQLContainer<?> postgres = new PostgreSQLContainer<>("postgres:14")
689              .withDatabaseName("workflow_test")
690              .withUsername("test")
691              .withPassword("test");
692      
693      @Container
694      static GenericContainer<?> redis = new GenericContainer<>("redis:7")
695              .withExposedPorts(6379);
696      
697      @Autowired
698      private DataSource dataSource;
699      
700      @Autowired
701      private RedisTemplate<String, Object> redisTemplate;
702      
703      @DynamicPropertySource
704      static void properties(DynamicPropertyRegistry registry) { // points Spring at the two containers declared above
705          registry.add("spring.datasource.url", postgres::getJdbcUrl);
706          registry.add("spring.datasource.username", postgres::getUsername);
707          registry.add("spring.datasource.password", postgres::getPassword);
708          registry.add("spring.redis.host", redis::getHost); // NOTE(review): Spring Boot 3 reads spring.data.redis.* - confirm the Boot version
709          registry.add("spring.redis.port", () -> redis.getMappedPort(6379)); // host port mapped to the exposed container port 6379
710      }
711      
712      @Test
713      void testDataProcessingWithRealDatabases() throws Exception {
714          // Create workflow that uses real databases
715          WorkflowBuilder<DataImportRequest, DataImportResult> builder = WorkflowBuilder
716              .define("data-import", DataImportRequest.class, DataImportResult.class)
717              
718              .then("validate-schema", (request, ctx) -> {
719                  // Validate against real database schema
720                  try (Connection conn = dataSource.getConnection()) {
721                      DatabaseMetaData meta = conn.getMetaData();
722                      ResultSet tables = meta.getTables(null, null, request.tableName(), null);
723                      if (!tables.next()) {
724                          return StepResult.fail("Table does not exist: " + request.tableName());
725                      }
726                  }
727                  return StepResult.continueWith(request);
728              })
729              
730              .then("cache-metadata", (request, ctx) -> {
731                  // Store metadata in Redis
732                  String cacheKey = "import:" + request.importId();
733                  ImportMetadata metadata = new ImportMetadata(
734                      request.importId(),
735                      request.tableName(),
736                      System.currentTimeMillis()
737                  );
738                  redisTemplate.opsForValue().set(cacheKey, metadata, Duration.ofHours(1));
739                  return StepResult.continueWith(new ValidatedImport(request, metadata));
740              })
741              
742              .then("import-data", (validated, ctx) -> {
743                  // Import data to real database
744                  try (Connection conn = dataSource.getConnection()) {
745                      // Start transaction
746                      conn.setAutoCommit(false);
747                      
748                      String sql = "INSERT INTO " + validated.request().tableName() + 
749                                 " (id, data, created_at) VALUES (?, ?, ?)";
750                      
751                      try (PreparedStatement stmt = conn.prepareStatement(sql)) {
752                          for (Map<String, Object> record : validated.request().records()) {
753                              stmt.setString(1, (String) record.get("id"));
754                              stmt.setString(2, (String) record.get("data"));
755                              stmt.setTimestamp(3, new Timestamp(System.currentTimeMillis()));
756                              stmt.addBatch();
757                          }
758                          
759                          int[] results = stmt.executeBatch();
760                          conn.commit();
761                          
762                          return StepResult.continueWith(new ImportStats(
763                              validated,
764                              results.length,
765                              0
766                          ));
767                      } catch (Exception e) {
768                          conn.rollback();
769                          throw e;
770                      }
771                  }
772              })
773              
774              .then("update-cache", (stats, ctx) -> {
775                  // Update Redis with import results
776                  String cacheKey = "import:" + stats.validated().request().importId() + ":stats";
777                  redisTemplate.opsForValue().set(cacheKey, stats, Duration.ofDays(7));
778                  
779                  return StepResult.finish(new DataImportResult(
780                      stats.validated().request().importId(),
781                      stats.successCount(),
782                      stats.errorCount()
783                  ));
784              });
785          
786          engine.register(builder);
787          
788          // Setup test data
789          createTestTable();
790          
791          List<Map<String, Object>> testRecords = Arrays.asList(
792              Map.of("id", "1", "data", "Test record 1"),
793              Map.of("id", "2", "data", "Test record 2"),
794              Map.of("id", "3", "data", "Test record 3")
795          );
796          
797          DataImportRequest request = new DataImportRequest(
798              "IMP-001",
799              "test_imports",
800              testRecords
801          );
802          
803          // Execute workflow
804          DataImportResult result = executeWorkflow("data-import", request);
805          
806          // Verify results
807          assertEquals(3, result.successCount());
808          assertEquals(0, result.errorCount());
809          
810          // Verify data in PostgreSQL
811          try (Connection conn = dataSource.getConnection();
812               Statement stmt = conn.createStatement();
813               ResultSet rs = stmt.executeQuery("SELECT COUNT(*) FROM test_imports")) {
814              assertTrue(rs.next());
815              assertEquals(3, rs.getInt(1));
816          }
817          
818          // Verify cache in Redis
819          ImportMetadata cached = (ImportMetadata) redisTemplate.opsForValue()
820              .get("import:IMP-001");
821          assertNotNull(cached);
822          assertEquals("test_imports", cached.tableName());
823          
824          ImportStats stats = (ImportStats) redisTemplate.opsForValue()
825              .get("import:IMP-001:stats");
826          assertNotNull(stats);
827          assertEquals(3, stats.successCount());
828      }
829      
830      private void createTestTable() throws SQLException {
831          try (Connection conn = dataSource.getConnection();
832               Statement stmt = conn.createStatement()) {
833              stmt.execute("""
834                  CREATE TABLE IF NOT EXISTS test_imports (
835                      id VARCHAR(50) PRIMARY KEY,
836                      data TEXT,
837                      created_at TIMESTAMP
838                  )
839              """);
840          }
841      }
842      
843      // Domain classes
844      record DataImportRequest(String importId, String tableName, List<Map<String, Object>> records) {}
845      record ImportMetadata(String importId, String tableName, long timestamp) implements Serializable {}
846      record ValidatedImport(DataImportRequest request, ImportMetadata metadata) {}
847      record ImportStats(ValidatedImport validated, int successCount, int errorCount) implements Serializable {}
848      record DataImportResult(String importId, int successCount, int errorCount) {}
849  }
850  ```
851  
852  ## Summary
853  
854  These advanced examples demonstrate:
855  
856  1. **Stateful Workflows** - Managing complex state across workflow steps
857  2. **Multi-Stage Pipelines** - Testing ETL and data processing workflows
858  3. **Event-Driven Patterns** - Workflows that emit and react to events
859  4. **Saga Pattern** - Distributed transactions with compensation
860  5. **Dynamic Workflows** - Workflows that generate steps at runtime
861  6. **Integration Testing** - Testing with real external systems using Testcontainers
862  
863  Taken together, the examples show best practices for:
864  - Proper mock usage
865  - Comprehensive assertions
866  - Performance testing
867  - Error handling
868  - Integration with external systems
869  
870  Use these patterns as templates for testing your own complex workflow scenarios.