README.md
1 # DriftKit Workflow Engine Core 2 3 **Advanced workflow orchestration engine for AI applications with comprehensive support for conversational AI, chatbots, and human-in-the-loop workflows** 4 5 ## 🌟 Overview 6 7 DriftKit Workflow Engine is a powerful framework that unifies multiple workflow paradigms: 8 - **Traditional Workflows** - Step-by-step business process automation 9 - **Conversational Workflows** - Multi-turn chat interactions with automatic message tracking 10 - **Human-in-the-Loop** - Seamless suspension and resumption for human input 11 - **Async Processing** - Long-running operations with progress tracking 12 - **Multi-Agent Systems** - Orchestration of multiple AI agents 13 14 ## 🎯 Key Features 15 16 ### 1. Chat & Conversational AI Support 17 - **Automatic Message Tracking** - All suspend/finish messages are automatically saved to ChatStore 18 - **Session Management** - Conversations linked to chatId for continuity 19 - **Multi-turn Dialogues** - Complex conversational flows with branching 20 - **Context Preservation** - Full state maintained across suspensions 21 - **No Manual Saving** - Framework handles all chat persistence 22 23 ### 2. Human-in-the-Loop Capabilities 24 - **Workflow Suspension** - Use `StepResult.suspend()` to pause for human input 25 - **Type-Safe Resumption** - Specify expected input class for type safety 26 - **Context Preservation** - Full workflow state maintained during suspension 27 - **Multiple Suspension Points** - Workflows can suspend multiple times 28 - **Flexible Data Collection** - Support for any custom data types 29 30 ### 3. Core Workflow Features 31 - **Annotation-Based** - Define workflows with `@Workflow`, `@InitialStep`, `@Step` 32 - **Fluent API** - Programmatic workflow construction with `WorkflowBuilder` 33 - **Async Processing** - `@AsyncStep` for long-running operations 34 - **Conditional Routing** - Branch based on data or conditions 35 - **Error Handling** - Comprehensive exception management with retry policies 36 37 ## 📚 Core Concepts 38 39 ### Workflow Definition 40 41 Workflows can be defined using annotations or the fluent API: 42 43 #### Annotation-Based Approach 44 45 ```java 46 @Workflow( 47 id = "customer-onboarding", 48 version = "1.0", 49 description = "Customer onboarding process" 50 ) 51 public class CustomerOnboardingWorkflow { 52 53 @InitialStep 54 public StepResult<WelcomeMessage> startOnboarding(StartEvent event, WorkflowContext context) { 55 WelcomeMessage welcome = new WelcomeMessage(); 56 welcome.setText("Welcome! Let's get you started."); 57 welcome.setNextSteps(Arrays.asList("Personal Info", "Preferences", "Verification")); 58 59 // Suspend workflow and wait for customer info 60 return StepResult.suspend(welcome, CustomerInfo.class); 61 } 62 63 @Step 64 public StepResult<?> processCustomerInfo(CustomerInfo info, WorkflowContext context) { 65 // Validate customer data 66 if (!isValid(info)) { 67 ValidationError error = new ValidationError(); 68 error.setMessage("Please correct the following issues"); 69 error.setFields(getInvalidFields(info)); 70 71 // Suspend again for corrections 72 return StepResult.suspend(error, CustomerInfo.class); 73 } 74 75 // Continue to next step 76 context.put("customerInfo", info); 77 return StepResult.continueWith(new InfoVerified(info)); 78 } 79 } 80 ``` 81 82 #### Fluent API Approach 83 84 ```java 85 var workflow = WorkflowBuilder 86 .define("customer-onboarding", StartRequest.class, OnboardingResult.class) 87 .then("welcome", (StartRequest req) -> { 88 WelcomeMessage welcome = new WelcomeMessage("Welcome " + req.getName()); 89 return StepResult.suspend(welcome, CustomerInfo.class); 90 }) 91 .then("validate", (CustomerInfo info) -> { 92 if (isValid(info)) { 93 return StepResult.continueWith(new ValidatedInfo(info)); 94 } else { 95 return StepResult.suspend(new ValidationError(), CustomerInfo.class); 96 } 97 }) 98 .then("complete", (ValidatedInfo info) -> { 99 return StepResult.finish(new OnboardingResult(info)); 100 }) 101 .build(); 102 ``` 103 104 ### Step Result Types 105 106 The framework provides several result types for controlling workflow execution: 107 108 #### 1. Continue - Move to Next Step 109 ```java 110 return StepResult.continueWith(data); 111 ``` 112 113 #### 2. Suspend - Wait for Human Input 114 ```java 115 // Suspend and specify expected input type 116 return StepResult.suspend(promptData, ExpectedInputClass.class); 117 118 // With metadata 119 Map<String, String> metadata = Map.of("priority", "high"); 120 return StepResult.suspend(promptData, ExpectedInputClass.class, metadata); 121 ``` 122 123 #### 3. Branch - Conditional Routing 124 ```java 125 return StepResult.branch(new SpecificEvent(data)); 126 ``` 127 128 #### 4. Async - Long-Running Operations 129 ```java 130 return StepResult.async( 131 "taskId", // Unique task identifier 132 30000L, // Timeout in milliseconds 133 taskArgs, // Arguments for async handler 134 immediateData // Data to return immediately 135 ); 136 ``` 137 138 #### 5. Finish - Complete Workflow 139 ```java 140 return StepResult.finish(finalResult); 141 ``` 142 143 #### 6. Fail - Handle Errors 144 ```java 145 return StepResult.fail(exception); 146 // or 147 return StepResult.fail("Error message"); 148 ``` 149 150 ## 🤝 Human-in-the-Loop Patterns 151 152 ### Basic Suspension Pattern 153 154 ```java 155 @Workflow(id = "feedback-collection", version = "1.0") 156 public class FeedbackWorkflow { 157 158 @InitialStep 159 public StepResult<FeedbackRequest> requestFeedback(StartEvent event, WorkflowContext context) { 160 FeedbackRequest request = new FeedbackRequest(); 161 request.setQuestion("How was your experience?"); 162 request.setOptions(Arrays.asList("Excellent", "Good", "Fair", "Poor")); 163 164 // Workflow suspends here until user provides feedback 165 return StepResult.suspend(request, UserFeedback.class); 166 } 167 168 @Step 169 public StepResult<?> processFeedback(UserFeedback feedback, WorkflowContext context) { 170 if ("Poor".equals(feedback.getRating())) { 171 // Ask for more details 172 DetailRequest detailRequest = new DetailRequest(); 173 detailRequest.setMessage("We're sorry to hear that. Can you tell us more?"); 174 175 return StepResult.suspend(detailRequest, DetailedFeedback.class); 176 } 177 178 // Complete workflow 179 return StepResult.finish(new FeedbackResult(feedback)); 180 } 181 } 182 ``` 183 184 ### Multi-Step Data Collection 185 186 ```java 187 @Workflow(id = "multi-step-form", version = "1.0") 188 public class MultiStepFormWorkflow { 189 190 @InitialStep 191 public StepResult<PersonalInfoForm> collectPersonalInfo(StartEvent event, WorkflowContext context) { 192 PersonalInfoForm form = new PersonalInfoForm(); 193 form.setTitle("Step 1: Personal Information"); 194 form.setFields(Arrays.asList("name", "email", "phone")); 195 196 return StepResult.suspend(form, PersonalInfo.class); 197 } 198 199 @Step 200 public StepResult<AddressForm> collectAddress(PersonalInfo info, WorkflowContext context) { 201 context.put("personalInfo", info); 202 203 AddressForm form = new AddressForm(); 204 form.setTitle("Step 2: Address Information"); 205 206 return StepResult.suspend(form, AddressInfo.class); 207 } 208 209 @Step 210 public StepResult<PreferencesForm> collectPreferences(AddressInfo address, WorkflowContext context) { 211 context.put("addressInfo", address); 212 213 PreferencesForm form = new PreferencesForm(); 214 form.setTitle("Step 3: Your Preferences"); 215 216 return StepResult.suspend(form, Preferences.class); 217 } 218 219 @Step 220 public StepResult<RegistrationComplete> completeRegistration(Preferences prefs, WorkflowContext context) { 221 PersonalInfo personal = context.get("personalInfo", PersonalInfo.class); 222 AddressInfo address = context.get("addressInfo", AddressInfo.class); 223 224 // Process complete registration 225 RegistrationComplete result = new RegistrationComplete(); 226 result.setUserId(generateUserId()); 227 result.setWelcomeMessage("Registration complete! Welcome " + personal.getName()); 228 229 return StepResult.finish(result); 230 } 231 } 232 ``` 233 234 ## 💬 Chat Workflow Features 235 236 ### Automatic Message Tracking 237 238 When using ChatStore with WorkflowEngine, all messages are automatically tracked: 239 240 ```java 241 // Configure engine with ChatStore 242 ChatStore chatStore = new InMemoryChatStore(new SimpleTextTokenizer()); 243 WorkflowEngineConfig config = WorkflowEngineConfig.builder() 244 .chatStore(chatStore) 245 .build(); 246 247 WorkflowEngine engine = new WorkflowEngine(config); 248 249 // Define workflow - NO manual chat saving needed! 250 var chatWorkflow = WorkflowBuilder 251 .define("support-chat", ChatInput.class, ChatResult.class) 252 .then("greet", (ChatInput input) -> { 253 // This message is automatically saved to ChatStore 254 Greeting greeting = new Greeting("Hello! How can I help you today?"); 255 return StepResult.suspend(greeting, UserQuery.class); 256 }) 257 .then("respond", (UserQuery query) -> { 258 // User input is automatically saved 259 // Generate response 260 Response response = generateResponse(query); 261 262 // This is also automatically saved 263 return StepResult.finish(response); 264 }) 265 .build(); 266 267 // Execute with chatId - all messages are auto-tracked! 268 engine.execute("support-chat", new ChatInput("Start"), "chat-123"); 269 ``` 270 271 ### Conversation Flow Example 272 273 ```java 274 @Workflow(id = "support-assistant", version = "1.0") 275 public class SupportAssistantWorkflow { 276 277 @InitialStep 278 public StepResult<MenuOptions> presentMenu(StartEvent event, WorkflowContext context) { 279 MenuOptions menu = new MenuOptions(); 280 menu.setGreeting("Welcome to support! What can I help you with?"); 281 menu.setOptions(Arrays.asList( 282 "Check Order Status", 283 "Technical Support", 284 "Billing Question", 285 "Other" 286 )); 287 288 return StepResult.suspend(menu, UserChoice.class); 289 } 290 291 @Step(nextClasses = {OrderQuery.class, TechIssue.class, BillingQuery.class, GeneralQuery.class}) 292 public StepResult<?> routeByChoice(UserChoice choice, WorkflowContext context) { 293 return switch (choice.getSelection()) { 294 case "Check Order Status" -> StepResult.branch(new OrderQuery()); 295 case "Technical Support" -> StepResult.branch(new TechIssue()); 296 case "Billing Question" -> StepResult.branch(new BillingQuery()); 297 default -> StepResult.branch(new GeneralQuery()); 298 }; 299 } 300 301 @Step 302 public StepResult<OrderStatusPrompt> handleOrderQuery(OrderQuery query, WorkflowContext context) { 303 OrderStatusPrompt prompt = new OrderStatusPrompt(); 304 prompt.setMessage("I can help you check your order status."); 305 prompt.setInstruction("Please provide your order number:"); 306 307 return StepResult.suspend(prompt, OrderNumber.class); 308 } 309 310 @Step 311 public StepResult<OrderStatus> checkOrderStatus(OrderNumber orderNum, WorkflowContext context) { 312 // Look up order (could be async) 313 OrderStatus status = orderService.getStatus(orderNum.getValue()); 314 315 // If not found, ask again 316 if (status == null) { 317 OrderNotFound notFound = new OrderNotFound(); 318 notFound.setMessage("I couldn't find order " + orderNum.getValue()); 319 notFound.setSuggestion("Please double-check the order number"); 320 321 return StepResult.suspend(notFound, OrderNumber.class); 322 } 323 324 // Complete with order status 325 return StepResult.finish(status); 326 } 327 } 328 ``` 329 330 ## 🔄 Async Processing 331 332 ### Basic Async Pattern 333 334 ```java 335 @Workflow(id = "document-processing", version = "1.0") 336 public class DocumentProcessingWorkflow { 337 338 @InitialStep 339 public StepResult<ProcessingStatus> startProcessing(DocumentRequest request, WorkflowContext context) { 340 context.put("documentId", request.getDocumentId()); 341 342 // Return async result with immediate status 343 Map<String, Object> taskArgs = Map.of( 344 "documentUrl", request.getDocumentUrl(), 345 "documentType", request.getType() 346 ); 347 348 ProcessingStatus immediateStatus = new ProcessingStatus(); 349 immediateStatus.setStatus("PROCESSING_STARTED"); 350 immediateStatus.setMessage("Document processing has begun"); 351 immediateStatus.setEstimatedTime(30); 352 353 return StepResult.async( 354 "process-doc-" + request.getDocumentId(), 355 30000L, // 30 second timeout 356 taskArgs, 357 immediateStatus 358 ); 359 } 360 361 @AsyncStep("process-doc-*") 362 public StepResult<ProcessingResult> processDocumentAsync( 363 Map<String, Object> taskArgs, 364 WorkflowContext context, 365 TaskProgressReporter progress) { 366 367 String documentUrl = (String) taskArgs.get("documentUrl"); 368 369 try { 370 progress.updateProgress(10, "Downloading document..."); 371 byte[] content = downloadDocument(documentUrl); 372 373 progress.updateProgress(40, "Extracting text..."); 374 String text = extractText(content); 375 376 progress.updateProgress(70, "Analyzing content..."); 377 Analysis analysis = analyzeDocument(text); 378 379 progress.updateProgress(100, "Processing complete"); 380 381 ProcessingResult result = new ProcessingResult(); 382 result.setDocumentId(context.get("documentId", String.class)); 383 result.setText(text); 384 result.setAnalysis(analysis); 385 386 return StepResult.finish(result); 387 388 } catch (Exception e) { 389 return StepResult.fail(e); 390 } 391 } 392 } 393 ``` 394 395 ### Async with Progress Updates 396 397 ```java 398 @AsyncStep("complex-analysis") 399 public StepResult<AnalysisResult> performComplexAnalysis( 400 Map<String, Object> args, 401 WorkflowContext context, 402 TaskProgressReporter progress) { 403 404 List<String> items = (List<String>) args.get("items"); 405 List<ItemResult> results = new ArrayList<>(); 406 407 for (int i = 0; i < items.size(); i++) { 408 // Check if cancelled 409 if (progress.isCancelled()) { 410 return StepResult.fail("Analysis cancelled by user"); 411 } 412 413 // Update progress 414 int percentComplete = (i * 100) / items.size(); 415 progress.updateProgress(percentComplete, "Processing item " + (i + 1) + " of " + items.size()); 416 417 // Process item 418 ItemResult itemResult = processItem(items.get(i)); 419 results.add(itemResult); 420 } 421 422 progress.updateProgress(100, "Analysis complete"); 423 424 return StepResult.finish(new AnalysisResult(results)); 425 } 426 ``` 427 428 ## 📋 Schema Annotations for Data Classes 429 430 Use schema annotations to provide metadata for UI generation: 431 432 ```java 433 @Data 434 @NoArgsConstructor 435 @SchemaName("customerInfo") 436 @SchemaDescription("Customer information form") 437 public class CustomerInfo { 438 439 @SchemaProperty( 440 description = "Full name", 441 required = true, 442 nameId = "customer.name" 443 ) 444 private String name; 445 446 @SchemaProperty( 447 description = "Email address", 448 required = true, 449 pattern = "^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\\.[a-zA-Z]{2,}$" 450 ) 451 private String email; 452 453 @SchemaProperty( 454 description = "Account type", 455 values = {"personal", "business", "enterprise"} 456 ) 457 private String accountType; 458 459 @SchemaProperty( 460 description = "Years of experience", 461 min = 0, 462 max = 50 463 ) 464 private Integer experience; 465 } 466 ``` 467 468 ## 🚀 Quick Start Guide 469 470 ### 1. Define Your Workflow 471 472 ```java 473 @Workflow(id = "simple-chat", version = "1.0") 474 public class SimpleChatWorkflow { 475 476 @InitialStep 477 public StepResult<WelcomeMessage> start(StartEvent event, WorkflowContext context) { 478 WelcomeMessage welcome = new WelcomeMessage(); 479 welcome.setText("Hello! What's your name?"); 480 481 return StepResult.suspend(welcome, UserName.class); 482 } 483 484 @Step 485 public StepResult<PersonalizedGreeting> greetUser(UserName name, WorkflowContext context) { 486 PersonalizedGreeting greeting = new PersonalizedGreeting(); 487 greeting.setMessage("Nice to meet you, " + name.getValue() + "!"); 488 greeting.setNextPrompt("How can I help you today?"); 489 490 return StepResult.suspend(greeting, UserRequest.class); 491 } 492 493 @Step 494 public StepResult<HelpResponse> provideHelp(UserRequest request, WorkflowContext context) { 495 HelpResponse response = processRequest(request); 496 return StepResult.finish(response); 497 } 498 } 499 ``` 500 501 ### 2. Configure and Register 502 503 ```java 504 // Create configuration 505 WorkflowEngineConfig config = WorkflowEngineConfig.builder() 506 .chatStore(new InMemoryChatStore(new SimpleTextTokenizer())) 507 .build(); 508 509 // Create engine 510 WorkflowEngine engine = new WorkflowEngine(config); 511 512 // Register workflow 513 engine.register(new SimpleChatWorkflow()); 514 ``` 515 516 ### 3. Execute Workflow 517 518 ```java 519 // Start workflow 520 String chatId = "chat-123"; 521 WorkflowExecution<HelpResponse> execution = engine.execute("simple-chat", null, chatId); 522 523 // Check if suspended 524 if (execution.isSuspended()) { 525 // Get suspension data (e.g., WelcomeMessage) 526 Object suspensionData = execution.getCurrentResult(); 527 528 // Later, resume with user input 529 UserName userName = new UserName("John"); 530 execution = engine.resume(execution.getRunId(), userName); 531 } 532 533 // Continue resuming until complete 534 while (execution.isSuspended()) { 535 // Get user input based on suspension data 536 Object userInput = getUserInput(execution.getCurrentResult()); 537 execution = engine.resume(execution.getRunId(), userInput); 538 } 539 540 // Get final result 541 HelpResponse result = execution.getResult(); 542 ``` 543 544 ## 🛠️ Advanced Features 545 546 ### Retry Policies 547 548 ```java 549 @Step( 550 name = "processPayment", 551 retryPolicy = @RetryPolicy( 552 maxAttempts = 3, 553 initialDelay = 1000, 554 maxDelay = 10000, 555 multiplier = 2.0 556 ) 557 ) 558 public StepResult<PaymentResult> processPayment(PaymentRequest request, WorkflowContext context) { 559 // Payment processing with automatic retry 560 } 561 ``` 562 563 ### Cross-Workflow Calls 564 565 ```java 566 @Step 567 public StepResult<?> callOtherWorkflow(DataEvent data, WorkflowContext context) { 568 // Prepare input for other workflow 569 OtherWorkflowInput input = new OtherWorkflowInput(data.getValue()); 570 571 // Call external workflow 572 return StepResult.external( 573 "other-workflow-id", 574 input, 575 "processExternalResult" // Next step after external workflow completes 576 ); 577 } 578 ``` 579 580 ### Conditional Routing 581 582 ```java 583 @Step 584 public StepResult<?> routeBasedOnCondition(ProcessedData data, WorkflowContext context) { 585 if (data.getScore() > 0.8) { 586 return StepResult.branch(new HighScoreEvent(data)); 587 } else if (data.getScore() > 0.5) { 588 return StepResult.branch(new MediumScoreEvent(data)); 589 } else { 590 return StepResult.branch(new LowScoreEvent(data)); 591 } 592 } 593 ``` 594 595 ## 📊 Monitoring and Testing 596 597 ### Workflow State Access 598 599 ```java 600 // Get workflow instance 601 Optional<WorkflowInstance> instance = engine.getWorkflowInstance(runId); 602 603 // Check status 604 WorkflowStatus status = instance.get().getStatus(); 605 // CREATED, RUNNING, SUSPENDED, COMPLETED, FAILED 606 607 // Get current step 608 String currentStep = instance.get().getCurrentStepId(); 609 610 // Get execution history 611 List<String> executedSteps = instance.get().getExecutedSteps(); 612 ``` 613 614 ### Testing Workflows 615 616 ```java 617 @Test 618 void testChatWorkflow() { 619 // Setup 620 WorkflowEngine engine = new WorkflowEngine(); 621 engine.register(new SimpleChatWorkflow()); 622 623 // Execute 624 var execution = engine.execute("simple-chat", null); 625 626 // Verify suspension 627 assertTrue(execution.isSuspended()); 628 assertEquals(WelcomeMessage.class, execution.getCurrentResult().getClass()); 629 630 // Resume with input 631 execution = engine.resume(execution.getRunId(), new UserName("Test User")); 632 633 // Continue until complete 634 while (execution.isSuspended()) { 635 execution = engine.resume(execution.getRunId(), createTestInput()); 636 } 637 638 // Verify result 639 assertNotNull(execution.getResult()); 640 } 641 ``` 642 643 ## 🎯 Best Practices 644 645 ### 1. Workflow Design 646 - Keep steps focused and single-purpose 647 - Use meaningful step names and descriptions 648 - Implement proper error handling 649 - Design for resumability 650 651 ### 2. Data Modeling 652 - Create clear, typed data classes for inputs/outputs 653 - Use schema annotations for better documentation 654 - Keep suspension data lightweight 655 - Avoid storing sensitive data in workflow context 656 657 ### 3. Chat Workflows 658 - Let the framework handle message persistence 659 - Use meaningful chat IDs for conversation tracking 660 - Design conversational flows to be natural 661 - Provide clear prompts and options 662 663 ### 4. Async Operations 664 - Use progress reporting for long-running tasks 665 - Implement cancellation checks 666 - Set appropriate timeouts 667 - Handle failures gracefully 668 669 ## 📚 Examples 670 671 Complete working examples can be found in the test directory: 672 - `SimplifiedChatWorkflow` - Basic chat with auto-tracking 673 - `OnboardingWorkflow` - Multi-step onboarding process 674 - `ChatWorkflowExample` - Advanced chat patterns 675 - `AsyncWorkflowExample` - Async processing patterns 676 677 --- 678 679 **DriftKit Workflow Engine** - Building sophisticated AI workflows with human-in-the-loop capabilities has never been easier.