ADVANCED_EXAMPLES.md
# DriftKit Workflow Test Framework - Advanced Examples

This document provides advanced examples and patterns for testing complex workflow scenarios using the DriftKit Workflow Test Framework.

## Table of Contents

1. [Stateful Workflow Testing](#stateful-workflow-testing)
2. [Multi-Stage Pipeline Testing](#multi-stage-pipeline-testing)
3. [Event-Driven Workflow Testing](#event-driven-workflow-testing)
4. [Saga Pattern Testing](#saga-pattern-testing)
5. [Dynamic Workflow Testing](#dynamic-workflow-testing)
6. [Integration Test Patterns](#integration-test-patterns)

## Stateful Workflow Testing

### Testing Workflows with Complex State Management

```java
public class StatefulWorkflowTest extends WorkflowTestBase {

    @Test
    void testShoppingCartWorkflow() throws Exception {
        // Setup stateful workflow
        WorkflowBuilder<CartCommand, CartState> builder = WorkflowBuilder
            .define("cart-workflow", CartCommand.class, CartState.class)
            .then("process-command", (cmd, ctx) -> {
                CartState state = (CartState) ctx.get("cart-state", new CartState());

                switch (cmd.type()) {
                    case ADD_ITEM -> {
                        state.addItem(cmd.item());
                        ctx.set("cart-state", state);
                        return StepResult.continueWith(state);
                    }
                    case REMOVE_ITEM -> {
                        state.removeItem(cmd.itemId());
                        ctx.set("cart-state", state);
                        return StepResult.continueWith(state);
                    }
                    case CHECKOUT -> {
                        if (state.isEmpty()) {
                            return StepResult.fail("Cannot checkout empty cart");
                        }
                        return StepResult.continueWith(state);
                    }
                    default -> {
                        return StepResult.fail("Unknown command");
                    }
                }
            })
            .branch(
                ctx -> {
                    CartCommand cmd = (CartCommand) ctx.getTriggerData();
                    return cmd.type() == CommandType.CHECKOUT;
                },
                checkout -> checkout
                    .then("calculate-total", (state, ctx) -> {
                        CartState cart = (CartState) state;
                        double total = cart.calculateTotal();
                        return StepResult.continueWith(new CheckoutData(cart, total));
                    })
                    .then("process-payment", (data, ctx) -> {
                        // Payment processing
                        return StepResult.continueWith(new PaymentResult(true, "PAY-123"));
                    })
                    .then("complete-order", (payment, ctx) -> {
                        CartState cart = (CartState) ctx.get("cart-state");
                        return StepResult.finish(new OrderComplete(cart.getItemCount(), "ORD-123"));
                    }),
                continuation -> continuation
                    .then("update-inventory", (state, ctx) -> {
                        // Update inventory for add/remove
                        return StepResult.finish(state);
                    })
            );

        engine.register(builder);

        // Test sequence of operations
        // Add items
        CartState state1 = executeWorkflow("cart-workflow",
            new CartCommand(CommandType.ADD_ITEM, new Item("PROD-1", 29.99)));
        assertEquals(1, state1.getItemCount());

        // Add more items
        CartState state2 = executeWorkflow("cart-workflow",
            new CartCommand(CommandType.ADD_ITEM, new Item("PROD-2", 49.99)));
        assertEquals(2, state2.getItemCount());

        // Checkout
        OrderComplete order = executeWorkflow("cart-workflow",
            new CartCommand(CommandType.CHECKOUT, null));
        assertNotNull(order);
        assertEquals(2, order.itemCount());

        // Verify execution paths
        assertions.assertStep("cart-workflow", "calculate-total").wasExecuted();
        assertions.assertStep("cart-workflow", "process-payment").wasExecuted();
        assertions.assertStep("cart-workflow", "complete-order").wasExecuted();
    }

    // Domain classes
    record CartCommand(CommandType type, Item item, String itemId) {
        CartCommand(CommandType type, Item item) {
            this(type, item, null);
        }
    }

    enum CommandType { ADD_ITEM, REMOVE_ITEM, CHECKOUT }

    static class CartState {
        private final List<Item> items = new ArrayList<>();

        void addItem(Item item) { items.add(item); }
        void removeItem(String itemId) {
            items.removeIf(item -> item.id().equals(itemId));
        }
        int getItemCount() { return items.size(); }
        boolean isEmpty() { return items.isEmpty(); }
        double calculateTotal() {
            return items.stream().mapToDouble(Item::price).sum();
        }
    }

    record Item(String id, double price) {}
    record CheckoutData(CartState cart, double total) {}
    record PaymentResult(boolean success, String transactionId) {}
    record OrderComplete(int itemCount, String orderId) {}
}
```

## Multi-Stage Pipeline Testing

### Testing Complex Data Processing Pipelines

```java
public class DataPipelineWorkflowTest extends WorkflowTestBase {

    @Test
    void testETLPipeline() throws Exception {
        // Setup ETL pipeline workflow
        WorkflowBuilder<DataSource, ProcessedData> builder = WorkflowBuilder
            .define("etl-pipeline", DataSource.class, ProcessedData.class)

            // Extract stage
            .then("validate-source", (source, ctx) -> {
                if (!source.isValid()) {
                    return StepResult.fail("Invalid data source");
                }
                return StepResult.continueWith(source);
            })
            .then("extract-data", (source, ctx) -> {
                // Mock external data extraction
                return StepResult.continueWith(new RawData(source.id(), generateTestData()));
            })

            // Transform stage
            .then("clean-data", (raw, ctx) -> {
                List<Record> cleaned = raw.records().stream()
                    .filter(r -> r.isValid())
                    .collect(Collectors.toList());
                return StepResult.continueWith(new CleanedData(raw.sourceId(), cleaned));
            })
            .then("enrich-data", (cleaned, ctx) -> {
                List<EnrichedRecord> enriched = cleaned.records().stream()
                    .map(r -> enrichRecord(r))
                    .collect(Collectors.toList());
                return StepResult.continueWith(new EnrichedData(cleaned.sourceId(), enriched));
            })
            .then("aggregate-data", (enriched, ctx) -> {
                Map<String, AggregateMetrics> aggregates = enriched.records().stream()
                    .collect(Collectors.groupingBy(
                        EnrichedRecord::category,
                        Collectors.collectingAndThen(
                            Collectors.toList(),
                            records -> calculateMetrics(records)
                        )
                    ));
                return StepResult.continueWith(new AggregatedData(enriched.sourceId(), aggregates));
            })

            // Load stage
            .then("validate-output", (aggregated, ctx) -> {
                if (aggregated.metrics().isEmpty()) {
                    return StepResult.fail("No data to load");
                }
                return StepResult.continueWith(aggregated);
            })
            .then("load-to-warehouse", (aggregated, ctx) -> {
                // Mock data warehouse loading
                String loadId = "LOAD-" + System.currentTimeMillis();
                return StepResult.finish(new ProcessedData(
                    aggregated.sourceId(),
                    loadId,
                    aggregated.metrics().size()
                ));
            });

        engine.register(builder);

        // Mock external services
        orchestrator.mock()
            .workflow("etl-pipeline")
            .step("extract-data")
            .always()
            .thenReturn(DataSource.class, source -> {
                // Simulate different data volumes
                int recordCount = source.id().contains("LARGE") ? 10000 : 100;
                List<Record> records = IntStream.range(0, recordCount)
                    .mapToObj(i -> new Record("REC-" + i, "Category-" + (i % 5), i * 1.5))
                    .collect(Collectors.toList());
                return StepResult.continueWith(new RawData(source.id(), records));
            });

        // Test small dataset
        ProcessedData result = executeWorkflow("etl-pipeline",
            new DataSource("SMALL-001", true));
        assertEquals(5, result.aggregateCount()); // 5 categories

        // Test large dataset with performance assertion
        long startTime = System.currentTimeMillis();
        ProcessedData largeResult = executeWorkflow("etl-pipeline",
            new DataSource("LARGE-001", true));
        long duration = System.currentTimeMillis() - startTime;

        assertTrue(duration < 5000, "Large dataset processing took too long: " + duration + "ms");
        assertEquals(5, largeResult.aggregateCount());

        // Verify all stages executed
        assertions.assertExecutionOrder()
            .step("validate-source")
            .step("extract-data")
            .step("clean-data")
            .step("enrich-data")
            .step("aggregate-data")
            .step("validate-output")
            .step("load-to-warehouse");
    }

    // Helper methods
    private List<Record> generateTestData() {
        return IntStream.range(0, 100)
            .mapToObj(i -> new Record("REC-" + i, "Cat-" + (i % 5), i * 1.5))
            .collect(Collectors.toList());
    }

    private EnrichedRecord enrichRecord(Record record) {
        return new EnrichedRecord(
            record.id(),
            record.category(),
            record.value(),
            record.value() * 1.1, // Add 10% enrichment
            System.currentTimeMillis()
        );
    }

    private AggregateMetrics calculateMetrics(List<EnrichedRecord> records) {
        double sum = records.stream().mapToDouble(EnrichedRecord::enrichedValue).sum();
        double avg = sum / records.size();
        return new AggregateMetrics(records.size(), sum, avg);
    }

    // Domain classes
    record DataSource(String id, boolean isValid) {}
    record Record(String id, String category, double value) {
        boolean isValid() { return value >= 0; }
    }
    record RawData(String sourceId, List<Record> records) {}
    record CleanedData(String sourceId, List<Record> records) {}
    record EnrichedRecord(String id, String category, double value, double enrichedValue, long timestamp) {}
    record EnrichedData(String sourceId, List<EnrichedRecord> records) {}
    record AggregateMetrics(int count, double sum, double average) {}
    record AggregatedData(String sourceId, Map<String, AggregateMetrics> metrics) {}
    record ProcessedData(String sourceId, String loadId, int aggregateCount) {}
}
```

## Event-Driven Workflow Testing

### Testing Workflows with Event Emissions and Reactions

```java
public class EventDrivenWorkflowTest extends WorkflowTestBase {

    private final List<WorkflowEvent> capturedEvents = new ArrayList<>();

    @BeforeEach
    void setUp() {
        // Setup event listener
        engine.addEventListener(event -> capturedEvents.add(event));
    }

    @Test
    void testOrderFulfillmentWithEvents() throws Exception {
        // Create event-driven order fulfillment workflow
        WorkflowBuilder<Order, FulfillmentResult> builder = WorkflowBuilder
            .define("fulfillment-workflow", Order.class, FulfillmentResult.class)
            .then("reserve-inventory", (order, ctx) -> {
                ctx.emit(new Event("INVENTORY_CHECK", order.orderId()));

                boolean available = checkInventory(order);
                if (!available) {
                    ctx.emit(new Event("INVENTORY_SHORTAGE", order.orderId()));
                    return StepResult.fail("Insufficient inventory");
                }

                ctx.emit(new Event("INVENTORY_RESERVED", order.orderId()));
                return StepResult.continueWith(new ReservedOrder(order, "RES-123"));
            })
            .then("allocate-shipping", (reserved, ctx) -> {
                ctx.emit(new Event("SHIPPING_ALLOCATION_START", reserved.order().orderId()));

                ShippingMethod method = selectShippingMethod(reserved.order());
                ctx.emit(new Event("SHIPPING_METHOD_SELECTED", method.name()));

                return StepResult.continueWith(new AllocatedOrder(reserved, method));
            })
            .then("generate-labels", (allocated, ctx) -> {
                ctx.emit(new Event("LABEL_GENERATION_START", allocated.reserved().order().orderId()));

                String trackingNumber = "TRACK-" + System.currentTimeMillis();
                ctx.emit(new Event("TRACKING_NUMBER_GENERATED", trackingNumber));

                return StepResult.continueWith(new LabeledOrder(allocated, trackingNumber));
            })
            .then("notify-warehouse", (labeled, ctx) -> {
                ctx.emit(new Event("WAREHOUSE_NOTIFICATION", labeled.trackingNumber()));

                return StepResult.finish(new FulfillmentResult(
                    labeled.allocated().reserved().order().orderId(),
                    labeled.trackingNumber(),
                    FulfillmentStatus.READY_TO_SHIP
                ));
            });

        engine.register(builder);

        // Test workflow execution
        Order order = new Order("ORD-789", Arrays.asList(
            new OrderItem("SKU-1", 2),
            new OrderItem("SKU-2", 1)
        ));

        FulfillmentResult result = executeWorkflow("fulfillment-workflow", order);

        // Verify result
        assertEquals("ORD-789", result.orderId());
        assertEquals(FulfillmentStatus.READY_TO_SHIP, result.status());
        assertNotNull(result.trackingNumber());

        // Verify events were emitted in correct order
        List<String> eventTypes = capturedEvents.stream()
            .map(WorkflowEvent::getType)
            .collect(Collectors.toList());

        assertEquals(Arrays.asList(
            "INVENTORY_CHECK",
            "INVENTORY_RESERVED",
            "SHIPPING_ALLOCATION_START",
            "SHIPPING_METHOD_SELECTED",
            "LABEL_GENERATION_START",
            "TRACKING_NUMBER_GENERATED",
            "WAREHOUSE_NOTIFICATION"
        ), eventTypes);

        // Test inventory shortage scenario.
        // An order id containing "OOS" makes checkInventory(...) report a shortage,
        // so the real "reserve-inventory" step emits the shortage event itself.
        // (A mock lambda has no access to the workflow context and cannot emit events.)
        Order outOfStockOrder = new Order("ORD-OOS-790", Arrays.asList(
            new OrderItem("SKU-1", 2)
        ));

        capturedEvents.clear();

        assertThrows(Exception.class, () ->
            executeWorkflow("fulfillment-workflow", outOfStockOrder)
        );

        // Verify shortage event was emitted
        assertTrue(capturedEvents.stream()
            .anyMatch(e -> e.getType().equals("INVENTORY_SHORTAGE")));
    }

    // Helper methods
    private boolean checkInventory(Order order) {
        return !order.orderId().contains("OOS"); // Out of stock
    }

    private ShippingMethod selectShippingMethod(Order order) {
        return order.items().size() > 5 ? ShippingMethod.FREIGHT : ShippingMethod.STANDARD;
    }

    // Domain classes
    record Order(String orderId, List<OrderItem> items) {}
    record OrderItem(String sku, int quantity) {}
    record ReservedOrder(Order order, String reservationId) {}
    enum ShippingMethod { STANDARD, EXPRESS, FREIGHT }
    record AllocatedOrder(ReservedOrder reserved, ShippingMethod method) {}
    record LabeledOrder(AllocatedOrder allocated, String trackingNumber) {}
    enum FulfillmentStatus { READY_TO_SHIP, SHIPPED, DELIVERED }
    record FulfillmentResult(String orderId, String trackingNumber, FulfillmentStatus status) {}
    record Event(String type, Object data) {}
}
```

## Saga Pattern Testing

### Testing Distributed Transaction Workflows

```java
public class SagaWorkflowTest extends WorkflowTestBase {

    // Record which compensations actually ran so the test asserts on real behavior
    private final AtomicBoolean roomCancelled = new AtomicBoolean(false);
    private final AtomicBoolean paymentRefunded = new AtomicBoolean(false);

    @Test
    void testHotelBookingSaga() throws Exception {
        // Create saga workflow with compensation
        WorkflowBuilder<BookingRequest, BookingConfirmation> builder = WorkflowBuilder
            .define("booking-saga", BookingRequest.class, BookingConfirmation.class)

            // Step 1: Reserve hotel room
            .then("reserve-room", (request, ctx) -> {
                String reservationId = "ROOM-" + System.currentTimeMillis();
                ctx.set("room-reservation", reservationId);
                return StepResult.continueWith(new RoomReserved(reservationId, request));
            })

            // Step 2: Charge payment
            .then("charge-payment", (roomReserved, ctx) -> {
                try {
                    String paymentId = processPayment(roomReserved.request().creditCard());
                    ctx.set("payment-id", paymentId);
                    return StepResult.continueWith(new PaymentCharged(paymentId, roomReserved));
                } catch (PaymentException e) {
                    // Trigger compensation
                    ctx.set("compensation-needed", true);
                    return StepResult.fail("Payment failed: " + e.getMessage());
                }
            })

            // Step 3: Send confirmation
            .then("send-confirmation", (paymentCharged, ctx) -> {
                try {
                    String confirmationId = sendEmail(paymentCharged.roomReserved().request().email());
                    return StepResult.finish(new BookingConfirmation(
                        paymentCharged.roomReserved().reservationId(),
                        paymentCharged.paymentId(),
                        confirmationId
                    ));
                } catch (Exception e) {
                    ctx.set("compensation-needed", true);
                    return StepResult.fail("Confirmation failed");
                }
            })

            // Compensation flow
            .onError((error, ctx) -> {
                if (Boolean.TRUE.equals(ctx.get("compensation-needed", false))) {
                    // Compensate in reverse order
                    String paymentId = (String) ctx.get("payment-id");
                    if (paymentId != null) {
                        refundPayment(paymentId);
                    }

                    String roomReservation = (String) ctx.get("room-reservation");
                    if (roomReservation != null) {
                        cancelRoomReservation(roomReservation);
                    }
                }
                return StepResult.fail("Booking failed, compensations executed");
            });

        engine.register(builder);

        // Test successful saga
        BookingRequest request = new BookingRequest(
            "user@example.com",
            "2024-06-01",
            "2024-06-05",
            "VISA-1234"
        );

        BookingConfirmation confirmation = executeWorkflow("booking-saga", request);
        assertNotNull(confirmation.reservationId());
        assertNotNull(confirmation.paymentId());
        assertNotNull(confirmation.confirmationId());

        // Test saga with payment failure.
        // A card containing "FAIL" makes processPayment(...) throw, so the real
        // "charge-payment" step fails and the real compensation flow runs.
        BookingRequest failingRequest = new BookingRequest(
            "user@example.com",
            "2024-06-01",
            "2024-06-05",
            "FAIL-0000"
        );

        // Reset compensation tracking before the failure scenario
        roomCancelled.set(false);
        paymentRefunded.set(false);

        // Execute and expect failure
        assertThrows(Exception.class, () ->
            executeWorkflow("booking-saga", failingRequest)
        );

        // Verify compensations were executed: the room was reserved before the
        // payment failed, but no payment was ever charged, so nothing is refunded.
        assertTrue(roomCancelled.get(), "Room reservation should be cancelled");
        assertFalse(paymentRefunded.get(), "No payment was charged, so nothing should be refunded");
    }

    // Saga helper methods
    private String processPayment(String creditCard) {
        if (creditCard.contains("FAIL")) {
            throw new PaymentException("Payment processing failed");
        }
        return "PAY-" + System.currentTimeMillis();
    }

    private String sendEmail(String email) {
        return "CONF-" + System.currentTimeMillis();
    }

    private void refundPayment(String paymentId) {
        // Compensation logic
        paymentRefunded.set(true);
    }

    private void cancelRoomReservation(String reservationId) {
        // Compensation logic
        roomCancelled.set(true);
    }

    // Domain classes
    record BookingRequest(String email, String checkIn, String checkOut, String creditCard) {}
    record RoomReserved(String reservationId, BookingRequest request) {}
    record PaymentCharged(String paymentId, RoomReserved roomReserved) {}
    record BookingConfirmation(String reservationId, String paymentId, String confirmationId) {}

    static class PaymentException extends RuntimeException {
        PaymentException(String message) { super(message); }
    }
}
```

## Dynamic Workflow Testing

### Testing Workflows with Dynamic Step Generation

```java
public class DynamicWorkflowTest extends WorkflowTestBase {

    @Test
    void testDynamicApprovalWorkflow() throws Exception {
        // Create workflow with dynamic approval steps
        WorkflowBuilder<ApprovalRequest, ApprovalResult> builder = WorkflowBuilder
            .define("dynamic-approval", ApprovalRequest.class, ApprovalResult.class)

            .then("determine-approvers", (request, ctx) -> {
                List<String> approvers = determineApprovers(request);
                ctx.set("approvers", approvers);
                ctx.set("approvals", new HashMap<String, Boolean>());
                return StepResult.continueWith(new ApprovalProcess(request, approvers));
            })

            .dynamicSteps("approval-loop", (process, ctx) -> {
                List<String> approvers = (List<String>) ctx.get("approvers");
                Map<String, Boolean> approvals = (Map<String, Boolean>) ctx.get("approvals");

                // Generate a step for each approver
                return approvers.stream()
                    .filter(approver -> !approvals.containsKey(approver))
                    .findFirst()
                    .map(approver -> {
                        return StepDefinition.create(
                            "approve-" + approver,
                            (ApprovalProcess proc, WorkflowContext c) -> {
                                // Simulate approval
                                boolean approved = !approver.contains("REJECT");
                                approvals.put(approver, approved);
                                c.set("approvals", approvals);

                                if (!approved) {
                                    return StepResult.fail("Rejected by " + approver);
                                }

                                // Check if all approvals are complete
                                if (approvals.size() == approvers.size()) {
                                    return StepResult.continueWith(new AllApproved(proc.request()));
                                }

                                // Continue to next approver
                                return StepResult.continueWith(proc);
                            }
                        );
                    })
                    .orElse(null);
            })

            .then("finalize", (allApproved, ctx) -> {
                Map<String, Boolean> approvals = (Map<String, Boolean>) ctx.get("approvals");
                return StepResult.finish(new ApprovalResult(
                    allApproved.request().id(),
                    true,
                    approvals
                ));
            });

        engine.register(builder);

        // Test with multiple approvers
        ApprovalRequest request = new ApprovalRequest(
            "REQ-123",
            ApprovalType.BUDGET,
            50000.0
        );

        ApprovalResult result = executeWorkflow("dynamic-approval", request);
        assertTrue(result.approved());
        assertEquals(3, result.approverDecisions().size()); // Manager, Director, VP

        // Verify dynamic steps were executed
        assertions.assertStep("dynamic-approval", "approve-MANAGER").wasExecuted();
        assertions.assertStep("dynamic-approval", "approve-DIRECTOR").wasExecuted();
        assertions.assertStep("dynamic-approval", "approve-VP").wasExecuted();

        // Test rejection scenario
        ApprovalRequest rejectRequest = new ApprovalRequest(
            "REQ-456",
            ApprovalType.BUDGET,
            100000.0 // Will require REJECT-CFO
        );

        assertThrows(Exception.class, () ->
            executeWorkflow("dynamic-approval", rejectRequest)
        );
    }

    // Helper method to determine approvers based on request
    private List<String> determineApprovers(ApprovalRequest request) {
        List<String> approvers = new ArrayList<>();

        if (request.amount() < 10000) {
            approvers.add("MANAGER");
        } else if (request.amount() < 50000) {
            approvers.add("MANAGER");
            approvers.add("DIRECTOR");
        } else if (request.amount() < 100000) {
            approvers.add("MANAGER");
            approvers.add("DIRECTOR");
            approvers.add("VP");
        } else {
            approvers.add("MANAGER");
            approvers.add("DIRECTOR");
            approvers.add("VP");
            approvers.add("REJECT-CFO"); // Will reject
        }

        return approvers;
    }

    // Domain classes
    enum ApprovalType { BUDGET, HIRE, PROJECT }
    record ApprovalRequest(String id, ApprovalType type, double amount) {}
    record ApprovalProcess(ApprovalRequest request, List<String> approvers) {}
    record AllApproved(ApprovalRequest request) {}
    record ApprovalResult(String requestId, boolean approved, Map<String, Boolean> approverDecisions) {}
}
```

## Integration Test Patterns

### Testing Workflows with External System Integration

```java
@SpringBootTest
@Testcontainers
public class IntegrationWorkflowTest extends WorkflowTestBase {

    @Container
    static PostgreSQLContainer<?> postgres = new PostgreSQLContainer<>("postgres:14")
        .withDatabaseName("workflow_test")
        .withUsername("test")
        .withPassword("test");

    @Container
    static GenericContainer<?> redis = new GenericContainer<>("redis:7")
        .withExposedPorts(6379);

    @Autowired
    private DataSource dataSource;

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    @DynamicPropertySource
    static void properties(DynamicPropertyRegistry registry) {
        registry.add("spring.datasource.url", postgres::getJdbcUrl);
        registry.add("spring.datasource.username", postgres::getUsername);
        registry.add("spring.datasource.password", postgres::getPassword);
        registry.add("spring.redis.host", redis::getHost);
        registry.add("spring.redis.port", () -> redis.getMappedPort(6379));
    }

    @Test
    void testDataProcessingWithRealDatabases() throws Exception {
        // Create workflow that uses real databases
        WorkflowBuilder<DataImportRequest, DataImportResult> builder = WorkflowBuilder
            .define("data-import", DataImportRequest.class, DataImportResult.class)

            .then("validate-schema", (request, ctx) -> {
                // The table name is concatenated into the INSERT statement below
                // (identifiers cannot be bound as parameters), so only accept
                // plain SQL identifiers here.
                if (!request.tableName().matches("[A-Za-z_][A-Za-z0-9_]*")) {
                    return StepResult.fail("Invalid table name: " + request.tableName());
                }

                // Validate against real database schema
                try (Connection conn = dataSource.getConnection();
                     ResultSet tables = conn.getMetaData()
                         .getTables(null, null, request.tableName(), null)) {
                    if (!tables.next()) {
                        return StepResult.fail("Table does not exist: " + request.tableName());
                    }
                }
                return StepResult.continueWith(request);
            })

            .then("cache-metadata", (request, ctx) -> {
                // Store metadata in Redis
                String cacheKey = "import:" + request.importId();
                ImportMetadata metadata = new ImportMetadata(
                    request.importId(),
                    request.tableName(),
                    System.currentTimeMillis()
                );
                redisTemplate.opsForValue().set(cacheKey, metadata, Duration.ofHours(1));
                return StepResult.continueWith(new ValidatedImport(request, metadata));
            })

            .then("import-data", (validated, ctx) -> {
                // Import data to real database
                try (Connection conn = dataSource.getConnection()) {
                    // Start transaction
                    conn.setAutoCommit(false);

                    String sql = "INSERT INTO " + validated.request().tableName() +
                        " (id, data, created_at) VALUES (?, ?, ?)";

                    try (PreparedStatement stmt = conn.prepareStatement(sql)) {
                        for (Map<String, Object> record : validated.request().records()) {
                            stmt.setString(1, (String) record.get("id"));
                            stmt.setString(2, (String) record.get("data"));
                            stmt.setTimestamp(3, new Timestamp(System.currentTimeMillis()));
                            stmt.addBatch();
                        }

                        int[] results = stmt.executeBatch();
                        conn.commit();

                        return StepResult.continueWith(new ImportStats(
                            validated,
                            results.length,
                            0
                        ));
                    } catch (Exception e) {
                        conn.rollback();
                        throw e;
                    }
                }
            })

            .then("update-cache", (stats, ctx) -> {
                // Update Redis with import results
                String cacheKey = "import:" + stats.validated().request().importId() + ":stats";
                redisTemplate.opsForValue().set(cacheKey, stats, Duration.ofDays(7));

                return StepResult.finish(new DataImportResult(
                    stats.validated().request().importId(),
                    stats.successCount(),
                    stats.errorCount()
                ));
            });

        engine.register(builder);

        // Setup test data
        createTestTable();

        List<Map<String, Object>> testRecords = Arrays.asList(
            Map.of("id", "1", "data", "Test record 1"),
            Map.of("id", "2", "data", "Test record 2"),
            Map.of("id", "3", "data", "Test record 3")
        );

        DataImportRequest request = new DataImportRequest(
            "IMP-001",
            "test_imports",
            testRecords
        );

        // Execute workflow
        DataImportResult result = executeWorkflow("data-import", request);

        // Verify results
        assertEquals(3, result.successCount());
        assertEquals(0, result.errorCount());

        // Verify data in PostgreSQL
        try (Connection conn = dataSource.getConnection();
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SELECT COUNT(*) FROM test_imports")) {
            assertTrue(rs.next());
            assertEquals(3, rs.getInt(1));
        }

        // Verify cache in Redis
        ImportMetadata cached = (ImportMetadata) redisTemplate.opsForValue()
            .get("import:IMP-001");
        assertNotNull(cached);
        assertEquals("test_imports", cached.tableName());

        ImportStats stats = (ImportStats) redisTemplate.opsForValue()
            .get("import:IMP-001:stats");
        assertNotNull(stats);
        assertEquals(3, stats.successCount());
    }

    private void createTestTable() throws SQLException {
        try (Connection conn = dataSource.getConnection();
             Statement stmt = conn.createStatement()) {
            stmt.execute("""
                CREATE TABLE IF NOT EXISTS test_imports (
                    id VARCHAR(50) PRIMARY KEY,
                    data TEXT,
                    created_at TIMESTAMP
                )
                """);
        }
    }

    // Domain classes
    record DataImportRequest(String importId, String tableName, List<Map<String, Object>> records) {}
    record ImportMetadata(String importId, String tableName, long timestamp) implements Serializable {}
    record ValidatedImport(DataImportRequest request, ImportMetadata metadata) {}
    record ImportStats(ValidatedImport validated, int successCount, int errorCount) implements Serializable {}
    record DataImportResult(String importId, int successCount, int errorCount) {}
}
```

## Summary

These advanced examples demonstrate:

1. **Stateful Workflows** - Managing complex state across workflow steps
2. **Multi-Stage Pipelines** - Testing ETL and data processing workflows
3. **Event-Driven Patterns** - Workflows that emit and react to events
4. **Saga Pattern** - Distributed transactions with compensation
5. **Dynamic Workflows** - Workflows that generate steps at runtime
6. **Integration Testing** - Testing with real external systems using TestContainers

Each example shows best practices for:
- Proper mock usage
- Comprehensive assertions
- Performance testing
- Error handling
- Integration with external systems

Use these patterns as templates for testing your own complex workflow scenarios.