executor.rs
1 use std::collections::{BTreeMap, BTreeSet, HashSet}; 2 use std::convert::Infallible; 3 use std::fmt::{Debug, Formatter}; 4 use std::io::{Error, Read, Write}; 5 use std::sync::Arc; 6 use std::time::SystemTime; 7 8 use anyhow::anyhow; 9 use fedimint_core::core::{IntoDynInstance, ModuleInstanceId, OperationId}; 10 use fedimint_core::db::{ 11 AutocommitError, Database, DatabaseKeyWithNotify, DatabaseTransaction, 12 IDatabaseTransactionOpsCoreTyped, 13 }; 14 use fedimint_core::encoding::{Decodable, DecodeError, Encodable}; 15 use fedimint_core::fmt_utils::AbbreviateJson; 16 use fedimint_core::maybe_add_send_sync; 17 use fedimint_core::module::registry::ModuleDecoderRegistry; 18 use fedimint_core::task::TaskGroup; 19 use fedimint_core::util::BoxFuture; 20 use fedimint_logging::LOG_CLIENT_REACTOR; 21 use futures::future::{self, select_all}; 22 use futures::stream::{FuturesUnordered, StreamExt}; 23 use tokio::select; 24 use tokio::sync::{mpsc, oneshot, Mutex}; 25 use tracing::{debug, error, info, trace, warn, Instrument}; 26 27 use super::state::StateTransitionFunction; 28 use crate::sm::notifier::Notifier; 29 use crate::sm::state::{DynContext, DynState}; 30 use crate::sm::{ClientSMDatabaseTransaction, State, StateTransition}; 31 use crate::{AddStateMachinesError, AddStateMachinesResult, DynGlobalClientContext}; 32 33 /// After how many attempts a DB transaction is aborted with an error 34 const MAX_DB_ATTEMPTS: Option<usize> = Some(100); 35 36 pub type ContextGen = 37 Arc<maybe_add_send_sync!(dyn Fn(ModuleInstanceId, OperationId) -> DynGlobalClientContext)>; 38 39 /// Prefixes for executor DB entries 40 enum ExecutorDbPrefixes { 41 /// See [`ActiveStateKey`] 42 ActiveStates = 0xa1, 43 /// See [`InactiveStateKey`] 44 InactiveStates = 0xa2, 45 } 46 47 /// Executor that drives forward state machines under its management. 
48 /// 49 /// Each state transition is atomic and supposed to be idempotent such that a 50 /// stop/crash of the executor at any point can be recovered from on restart. 51 /// The executor is aware of the concept of Fedimint modules and can give state 52 /// machines a different [execution context](super::state::Context) depending on 53 /// the owning module, making it very flexible. 54 #[derive(Clone, Debug)] 55 pub struct Executor { 56 inner: Arc<ExecutorInner>, 57 } 58 59 struct ExecutorInner { 60 db: Database, 61 context: Mutex<Option<ContextGen>>, 62 module_contexts: BTreeMap<ModuleInstanceId, DynContext>, 63 valid_module_ids: BTreeSet<ModuleInstanceId>, 64 notifier: Notifier, 65 shutdown_executor: Mutex<Option<oneshot::Sender<()>>>, 66 /// Any time executor should notice state machine update (e.g. because it 67 /// was created), it's must be sent through this channel for it to notice. 68 sm_update_tx: mpsc::UnboundedSender<DynState>, 69 sm_update_rx: Mutex<Option<mpsc::UnboundedReceiver<DynState>>>, 70 client_task_group: TaskGroup, 71 } 72 73 /// Builder to which module clients can be attached and used to build an 74 /// [`Executor`] supporting these. 75 #[derive(Debug, Default)] 76 pub struct ExecutorBuilder { 77 module_contexts: BTreeMap<ModuleInstanceId, DynContext>, 78 valid_module_ids: BTreeSet<ModuleInstanceId>, 79 } 80 81 impl Executor { 82 /// Creates an [`ExecutorBuilder`] 83 pub fn builder() -> ExecutorBuilder { 84 ExecutorBuilder::default() 85 } 86 87 pub async fn get_active_states(&self) -> Vec<(DynState, ActiveStateMeta)> { 88 self.inner.get_active_states().await 89 } 90 91 /// Adds a number of state machines to the executor atomically. They will be 92 /// driven to completion automatically in the background. 93 /// 94 /// **Attention**: do not use before background task is started! 
    // TODO: remove warning once finality is an inherent state attribute
    pub async fn add_state_machines(&self, states: Vec<DynState>) -> anyhow::Result<()> {
        self.inner
            .db
            .autocommit(
                |dbtx, _| Box::pin(self.add_state_machines_dbtx(dbtx, states.clone())),
                MAX_DB_ATTEMPTS,
            )
            .await
            .map_err(|e| match e {
                AutocommitError::CommitFailed {
                    last_error,
                    attempts,
                } => last_error.context(format!("Failed to commit after {attempts} attempts")),
                AutocommitError::ClosureError { error, .. } => anyhow!("{error:?}"),
            })?;

        // TODO: notify subscribers to state changes?

        Ok(())
    }

    /// Adds a number of state machines to the executor atomically with other DB
    /// changes in `dbtx`. See [`Executor::add_state_machines`] for more
    /// details.
    ///
    /// ## Panics
    /// If called before background task is started using
    /// [`Executor::start_executor`]!
    // TODO: remove warning once finality is an inherent state attribute
    pub async fn add_state_machines_dbtx(
        &self,
        dbtx: &mut DatabaseTransaction<'_>,
        states: Vec<DynState>,
    ) -> AddStateMachinesResult {
        for state in states {
            // Reject states for module ids the executor was not built for.
            if !self
                .inner
                .valid_module_ids
                .contains(&state.module_instance_id())
            {
                return Err(AddStateMachinesError::Other(anyhow!("Unknown module")));
            }

            // A state may neither be currently active nor already finished.
            let is_active_state = dbtx
                .get_value(&ActiveStateKey::from_state(state.clone()))
                .await
                .is_some();
            let is_inactive_state = dbtx
                .get_value(&InactiveStateKey::from_state(state.clone()))
                .await
                .is_some();

            if is_active_state || is_inactive_state {
                return Err(AddStateMachinesError::StateAlreadyExists);
            }

            // In case of recovery functions, the module itself is not yet initialized,
            // so we can't check if the state is terminal. However the
            // [`Self::get_transition_for`] function will double check and
            // deactivate any terminal states that would slip past this check.
            if let Some(module_context) =
                self.inner.module_contexts.get(&state.module_instance_id())
            {
                let context = {
                    let context_gen_guard = self.inner.context.lock().await;
                    let context_gen = context_gen_guard
                        .as_ref()
                        .expect("should be initialized at this point");
                    context_gen(state.module_instance_id(), state.operation_id())
                };

                if state.is_terminal(module_context, &context) {
                    return Err(AddStateMachinesError::Other(anyhow!(
                        "State is already terminal, adding it to the executor doesn't make sense."
                    )));
                }
            }

            dbtx.insert_new_entry(
                &ActiveStateKey::from_state(state.clone()),
                &ActiveStateMeta::default(),
            )
            .await;
            // Only notify subscribers and wake the executor once the DB
            // transaction actually commits.
            let notify_sender = self.inner.notifier.sender();
            let sm_updates_tx = self.inner.sm_update_tx.clone();
            dbtx.on_commit(move || {
                notify_sender.notify(state.clone());
                let _ = sm_updates_tx.send(state);
            });
        }

        Ok(())
    }

    /// **Mostly used for testing**
    ///
    /// Check if state exists in the database as part of an actively running
    /// state machine.
    pub async fn contains_active_state<S: State>(
        &self,
        instance: ModuleInstanceId,
        state: S,
    ) -> bool {
        let state = DynState::from_typed(instance, state);
        self.inner
            .get_active_states()
            .await
            .into_iter()
            .any(|(s, _)| s == state)
    }

    // TODO: unify querying fns
    /// **Mostly used for testing**
    ///
    /// Check if state exists in the database as inactive. If the state is
    /// terminal it means the corresponding state machine finished its
    /// execution. If the state is non-terminal it means the state machine was
    /// in that state at some point but moved on since then.
    pub async fn contains_inactive_state<S: State>(
        &self,
        instance: ModuleInstanceId,
        state: S,
    ) -> bool {
        let state = DynState::from_typed(instance, state);
        self.inner
            .get_inactive_states()
            .await
            .into_iter()
            .any(|(s, _)| s == state)
    }

    /// Waits until the given state appears in the DB as inactive and returns
    /// its metadata.
    pub async fn await_inactive_state(&self, state: DynState) -> InactiveStateMeta {
        self.inner
            .db
            .wait_key_exists(&InactiveStateKey::from_state(state))
            .await
    }

    /// Waits until the given state appears in the DB as active and returns
    /// its metadata.
    pub async fn await_active_state(&self, state: DynState) -> ActiveStateMeta {
        self.inner
            .db
            .wait_key_exists(&ActiveStateKey::from_state(state))
            .await
    }

    /// Starts the background thread that runs the state machines. This cannot
    /// be done when building the executor since some global contexts in turn
    /// may depend on the executor, forming a cyclic dependency.
    ///
    /// ## Panics
    /// If called more than once.
    pub async fn start_executor(&self, context_gen: ContextGen) {
        // Installing the context generator doubles as the "already started"
        // flag; a second call would replace an existing value and panic below.
        let replaced_old_context_gen = self
            .inner
            .context
            .lock()
            .await
            .replace(context_gen.clone())
            .is_some();
        assert!(
            !replaced_old_context_gen,
            "start_executor was called previously"
        );
        // The receiver can only be taken once; a second call would find `None`.
        let sm_update_rx = self
            .inner
            .sm_update_rx
            .lock()
            .await
            .take()
            .expect("start_executor was called previously: no sm_update_rx available");

        let (shutdown_sender, shutdown_receiver) = tokio::sync::oneshot::channel::<()>();

        let replaced_old_shutdown_sender = self
            .inner
            .shutdown_executor
            .lock()
            .await
            .replace(shutdown_sender)
            .is_some();
        assert!(
            !replaced_old_shutdown_sender,
            "start_executor was called previously"
        );

        let task_runner_inner = self.inner.clone();
        let _handle = self.inner.client_task_group.spawn("sm-executor", |task_handle| async move {
            let executor_runner = task_runner_inner.run(context_gen, sm_update_rx);
            let task_group_shutdown_rx = task_handle.make_shutdown_rx().await;
            // Run until either the task group shuts down, an explicit shutdown
            // signal arrives (or its sender is dropped), or the runner exits.
            select! {
                _ = task_group_shutdown_rx => {
                    debug!("Shutting down state machine executor runner due to task group shutdown signal");
                },
                shutdown_happened_sender = shutdown_receiver => {
                    match shutdown_happened_sender {
                        Ok(()) => {
                            debug!("Shutting down state machine executor runner due to explicit shutdown signal");
                        },
                        Err(_) => {
                            warn!("Shutting down state machine executor runner because the shutdown signal channel was closed (the executor object was dropped)");
                        }
                    }
                },
                _ = executor_runner => {
                    error!("State machine executor runner exited unexpectedly!");
                },
            };
        });
    }

    /// Stops the background task that runs the state machines.
    ///
    /// If a shutdown signal was sent it returns `Some(())`.
    ///
    /// If no shutdown signal was sent it returns `None`. This can happen if
    /// `stop_executor` is called multiple times.
    ///
    /// ## Panics
    /// If called in parallel with [`start_executor`](Self::start_executor).
319 pub fn stop_executor(&self) -> Option<()> { 320 self.inner.stop_executor() 321 } 322 323 /// Returns a reference to the [`Notifier`] that can be used to subscribe to 324 /// state transitions 325 pub fn notifier(&self) -> &Notifier { 326 &self.inner.notifier 327 } 328 } 329 330 impl Drop for ExecutorInner { 331 fn drop(&mut self) { 332 self.stop_executor(); 333 } 334 } 335 336 struct TransitionForActiveState { 337 outcome: serde_json::Value, 338 state: DynState, 339 meta: ActiveStateMeta, 340 transition_fn: StateTransitionFunction<DynState>, 341 } 342 343 impl ExecutorInner { 344 async fn run( 345 &self, 346 global_context_gen: ContextGen, 347 sm_update_rx: tokio::sync::mpsc::UnboundedReceiver<DynState>, 348 ) { 349 debug!(target: LOG_CLIENT_REACTOR, "Starting state machine executor task"); 350 if let Err(err) = self 351 .run_state_machines_executor_inner(global_context_gen, sm_update_rx) 352 .await 353 { 354 warn!( 355 %err, 356 "An unexpected error occurred during a state transition" 357 ); 358 } 359 } 360 361 async fn get_transition_for( 362 &self, 363 state: &DynState, 364 meta: ActiveStateMeta, 365 global_context_gen: &ContextGen, 366 ) -> Vec<BoxFuture<'static, TransitionForActiveState>> { 367 let module_instance = state.module_instance_id(); 368 let context = &self 369 .module_contexts 370 .get(&module_instance) 371 .expect("Unknown module"); 372 let transitions = state 373 .transitions( 374 context, 375 &global_context_gen(module_instance, state.operation_id()), 376 ) 377 .into_iter() 378 .map(|transition| { 379 let state = state.clone(); 380 let f: BoxFuture<TransitionForActiveState> = Box::pin(async move { 381 let StateTransition { 382 trigger, 383 transition, 384 } = transition; 385 TransitionForActiveState { 386 outcome: trigger.await, 387 state, 388 transition_fn: transition, 389 meta, 390 } 391 }); 392 f 393 }) 394 .collect::<Vec<_>>(); 395 if transitions.is_empty() { 396 // In certain cases a terminal (no transitions) state could get here due to 
397 // module bug. Inactivate it to prevent accumulation of such states. 398 // See [`Self::add_state_machines_dbtx`]. 399 warn!(module_id = module_instance, "A terminal state where only active states are expected. Please report this bug upstream."); 400 self.db 401 .autocommit::<_, _, anyhow::Error>( 402 |dbtx, _| { 403 Box::pin(async { 404 let k = InactiveStateKey::from_state(state.clone()); 405 let v = ActiveStateMeta::default().into_inactive(); 406 dbtx.remove_entry(&ActiveStateKey::from_state(state.clone())) 407 .await; 408 dbtx.insert_entry(&k, &v).await; 409 Ok(()) 410 }) 411 }, 412 None, 413 ) 414 .await 415 .expect("Autocommit here can't fail"); 416 } 417 418 transitions 419 } 420 421 async fn run_state_machines_executor_inner( 422 &self, 423 global_context_gen: ContextGen, 424 mut sm_update_rx: tokio::sync::mpsc::UnboundedReceiver<DynState>, 425 ) -> anyhow::Result<()> { 426 let active_states = self.get_active_states().await; 427 trace!(target: LOG_CLIENT_REACTOR, "Starting active states: {:?}", active_states); 428 for (state, _meta) in active_states { 429 self.sm_update_tx 430 .send(state) 431 .expect("Must be able to send state machine to own opened channel"); 432 } 433 434 /// All futures in the executor resolve to this type, so the handling 435 /// code can tell them apart. 436 enum ExecutorLoopEvent { 437 /// Notification about `DynState` arrived and should be handled, 438 /// usually added to the list of pending futures. 439 New { state: DynState }, 440 /// One of trigger futures of a state machine finished and 441 /// returned transition function to run 442 Triggered(TransitionForActiveState), 443 /// Transition function and all the accounting around it are done 444 Completed { 445 state: DynState, 446 outcome: ActiveOrInactiveState, 447 }, 448 /// New job receiver disconnected, that can only mean termination 449 Disconnected, 450 } 451 452 // Keeps track of things already running, so we can deduplicate, just 453 // in case. 
        let mut currently_running_sms = HashSet::<DynState>::new();
        // All things happening in parallel go into here
        let mut futures: FuturesUnordered<BoxFuture<'_, ExecutorLoopEvent>> =
            FuturesUnordered::new();

        loop {
            let event = tokio::select! {
                new = sm_update_rx.recv() => {
                    if let Some(new) = new {
                        ExecutorLoopEvent::New {
                            state: new,
                        }
                    } else {
                        // Sender side gone: executor is shutting down.
                        ExecutorLoopEvent::Disconnected
                    }
                },

                event = futures.next(), if !futures.is_empty() => event.expect("we only .next() if there are pending futures"),
            };

            // main reactor loop: wait for next thing that completed, react (possibly adding
            // more things to `futures`)
            match event {
                ExecutorLoopEvent::New { state } => {
                    if currently_running_sms.contains(&state) {
                        warn!(target: LOG_CLIENT_REACTOR, operation_id = %state.operation_id(), "Received a state machine that is already running. Ignoring");
                        continue;
                    }
                    let Some(meta) = self.get_active_state(&state).await else {
                        warn!(target: LOG_CLIENT_REACTOR, operation_id = %state.operation_id(), "Couldn't look up received state machine. Ignoring.");
                        continue;
                    };

                    let transitions = self
                        .get_transition_for(&state, meta, &global_context_gen)
                        .await;
                    if transitions.is_empty() {
                        warn!(target: LOG_CLIENT_REACTOR, operation_id = %state.operation_id(), "Received an active state that doesn't produce any transitions. Ignoring.");
                        continue;
                    }

                    let transitions_num = transitions.len();
                    currently_running_sms.insert(state.clone());
                    // Race all triggers of this state machine; the first to
                    // complete decides the transition that will run.
                    futures.push(Box::pin(async move {
                        let (first_completed_result, _index, _unused_transitions) =
                            select_all(transitions).await;
                        ExecutorLoopEvent::Triggered(first_completed_result)
                    }));

                    debug!(target: LOG_CLIENT_REACTOR, operation_id = %state.operation_id(), total = futures.len(), transitions_num, "New active state machine.");
                }
                ExecutorLoopEvent::Triggered(TransitionForActiveState {
                    outcome,
                    state,
                    meta,
                    transition_fn,
                }) => {
                    debug!(
                        target: LOG_CLIENT_REACTOR,
                        operation_id = %state.operation_id(),
                        "Triggered state transition",
                    );
                    let span = tracing::debug_span!(
                        target: LOG_CLIENT_REACTOR,
                        "sm_transition",
                        operation_id = %state.operation_id()
                    );
                    // Perform the transition as another future, so transitions can happen in
                    // parallel.
                    // Database write conflicts might be happening quite often here,
                    // but transaction functions are supposed to be idempotent anyway,
                    // so it seems like a good stress-test in the worst case.
                    futures.push({
                        let sm_update_tx = self.sm_update_tx.clone();
                        let db = self.db.clone();
                        let notifier = self.notifier.clone();
                        let module_contexts = self.module_contexts.clone();
                        let global_context_gen = global_context_gen.clone();
                        Box::pin(
                            async move {
                                debug!(
                                    target: LOG_CLIENT_REACTOR,
                                    "Executing state transition",
                                );
                                trace!(
                                    target: LOG_CLIENT_REACTOR,
                                    ?state,
                                    outcome = ?AbbreviateJson(&outcome),
                                    "Executing state transition (details)",
                                );

                                let module_contexts = &module_contexts;
                                let global_context_gen = &global_context_gen;

                                // Run the transition function and the state
                                // bookkeeping in one atomic DB transaction,
                                // retrying forever on write conflicts.
                                let outcome = db
                                    .autocommit::<'_, '_, _, _, Infallible>(
                                        |dbtx, _| {
                                            let state = state.clone();
                                            let transition_fn = transition_fn.clone();
                                            let transition_outcome = outcome.clone();
                                            Box::pin(async move {
                                                let new_state = transition_fn(
                                                    &mut ClientSMDatabaseTransaction::new(
                                                        &mut dbtx.to_ref(),
                                                        state.module_instance_id(),
                                                    ),
                                                    transition_outcome,
                                                    state.clone(),
                                                )
                                                .await;
                                                // The old state becomes inactive …
                                                dbtx.remove_entry(&ActiveStateKey::from_state(
                                                    state.clone(),
                                                ))
                                                .await;
                                                dbtx.insert_entry(
                                                    &InactiveStateKey::from_state(state.clone()),
                                                    &meta.into_inactive(),
                                                )
                                                .await;

                                                let context = &module_contexts
                                                    .get(&state.module_instance_id())
                                                    .expect("Unknown module");

                                                let global_context = global_context_gen(
                                                    state.module_instance_id(),
                                                    state.operation_id(),
                                                );
                                                // … and the new one is recorded as
                                                // inactive (terminal) or active.
                                                if new_state.is_terminal(context, &global_context) {
                                                    let k = InactiveStateKey::from_state(
                                                        new_state.clone(),
                                                    );
                                                    let v = ActiveStateMeta::default().into_inactive();
                                                    dbtx.insert_entry(&k, &v).await;
                                                    Ok(ActiveOrInactiveState::Inactive {
                                                        dyn_state: new_state,
                                                    })
                                                } else {
                                                    let k = ActiveStateKey::from_state(
                                                        new_state.clone(),
                                                    );
                                                    let v = ActiveStateMeta::default();
                                                    dbtx.insert_entry(&k, &v).await;
                                                    Ok(ActiveOrInactiveState::Active {
                                                        dyn_state: new_state,
                                                        meta: v,
                                                    })
                                                }
                                            })
                                        },
                                        None,
                                    )
                                    .await
                                    .expect("autocommit should keep trying to commit (max_attempt: None) and body doesn't return errors");

                                debug!(
                                    target: LOG_CLIENT_REACTOR,
                                    terminal = !outcome.is_active(),
                                    ?outcome,
                                    "State transition complete",
                                );

                                match &outcome {
                                    ActiveOrInactiveState::Active { dyn_state, meta: _ } => {
                                        // Re-queue the new active state so the
                                        // reactor starts driving it too.
                                        sm_update_tx
                                            .send(dyn_state.clone())
                                            .expect("can't fail: we are the receiving end");
                                        notifier.notify(dyn_state.clone());
                                    }
                                    ActiveOrInactiveState::Inactive { dyn_state } => {
                                        notifier.notify(dyn_state.clone());
                                    }
                                }
                                ExecutorLoopEvent::Completed { state, outcome }
                            }
                            .instrument(span),
                        )
                    });
                }
                ExecutorLoopEvent::Completed { state, outcome } => {
                    assert!(
                        currently_running_sms.remove(&state),
                        "State must have been recorded"
                    );
                    debug!(
                        target: LOG_CLIENT_REACTOR,
                        operation_id = %state.operation_id(),
                        outcome_active = outcome.is_active(),
                        total = futures.len(),
                        "State transition complete"
                    );
                    trace!(
                        target: LOG_CLIENT_REACTOR,
                        ?outcome,
                        operation_id = %state.operation_id(), total = futures.len(),
                        "State transition complete"
                    );
                }
                ExecutorLoopEvent::Disconnected => {
                    break;
                }
            }
        }

        info!(target: LOG_CLIENT_REACTOR, "Terminated.");
        Ok(())
    }

    /// Reads all active states from the DB, skipping states of modules that
    /// are not initialized yet.
    async fn get_active_states(&self) -> Vec<(DynState, ActiveStateMeta)> {
        self.db
            .begin_transaction()
            .await
            .find_by_prefix(&ActiveStateKeyPrefix)
            .await
            // ignore states from modules that are not initialized yet
            .filter(|(state, _)| {
                future::ready(
                    self.module_contexts
                        .contains_key(&state.state.module_instance_id()),
                )
            })
            .map(|(state, meta)| (state.state, meta))
            .collect::<Vec<_>>()
            .await
    }

    /// Looks up the metadata of a single active state, if present.
    async fn get_active_state(&self, state: &DynState) -> Option<ActiveStateMeta> {
        // ignore
states from modules that are not initialized yet 682 if !self 683 .module_contexts 684 .contains_key(&state.module_instance_id()) 685 { 686 return None; 687 } 688 self.db 689 .begin_transaction() 690 .await 691 .get_value(&ActiveStateKey::from_state(state.clone())) 692 .await 693 } 694 695 async fn get_inactive_states(&self) -> Vec<(DynState, InactiveStateMeta)> { 696 self.db 697 .begin_transaction() 698 .await 699 .find_by_prefix(&InactiveStateKeyPrefix) 700 .await 701 // ignore states from modules that are not initialized yet 702 .filter(|(state, _)| { 703 future::ready( 704 self.module_contexts 705 .contains_key(&state.state.module_instance_id()), 706 ) 707 }) 708 .map(|(state, meta)| (state.state, meta)) 709 .collect::<Vec<_>>() 710 .await 711 } 712 } 713 714 impl ExecutorInner { 715 /// See [`Executor::stop_executor`]. 716 fn stop_executor(&self) -> Option<()> { 717 let Some(shutdown_sender) = self 718 .shutdown_executor 719 .try_lock() 720 .expect("Only locked during startup, no collisions should be possible") 721 .take() 722 else { 723 debug!("Executor already stopped, ignoring stop request"); 724 return None; 725 }; 726 727 if shutdown_sender.send(()).is_err() { 728 warn!("Failed to send shutdown signal to executor, already dead?"); 729 } 730 731 Some(()) 732 } 733 } 734 735 impl Debug for ExecutorInner { 736 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { 737 writeln!(f, "ExecutorInner {{}}") 738 } 739 } 740 741 impl ExecutorBuilder { 742 /// Allow executor being built to run state machines associated with the 743 /// supplied module 744 pub fn with_module<C>(&mut self, instance_id: ModuleInstanceId, context: C) 745 where 746 C: IntoDynInstance<DynType = DynContext>, 747 { 748 self.with_module_dyn(context.into_dyn(instance_id)); 749 } 750 751 /// Allow executor being built to run state machines associated with the 752 /// supplied module 753 pub fn with_module_dyn(&mut self, context: DynContext) { 754 
self.valid_module_ids.insert(context.module_instance_id()); 755 756 if self 757 .module_contexts 758 .insert(context.module_instance_id(), context) 759 .is_some() 760 { 761 panic!("Tried to add two modules with the same instance id!"); 762 } 763 } 764 765 /// Allow executor to build state machines associated with the module id, 766 /// for which the module itself might not be available yet (otherwise it 767 /// would be registered with `[Self::with_module_dyn]`). 768 pub fn with_valid_module_id(&mut self, module_id: ModuleInstanceId) { 769 self.valid_module_ids.insert(module_id); 770 } 771 772 /// Build [`Executor`] and spawn background task in `tasks` executing active 773 /// state machines. The supplied database `db` must support isolation, so 774 /// cannot be an isolated DB instance itself. 775 pub async fn build( 776 self, 777 db: Database, 778 notifier: Notifier, 779 client_task_group: TaskGroup, 780 ) -> Executor { 781 let (sm_update_tx, sm_update_rx) = tokio::sync::mpsc::unbounded_channel(); 782 783 let inner = Arc::new(ExecutorInner { 784 db, 785 context: Mutex::new(None), 786 module_contexts: self.module_contexts, 787 valid_module_ids: self.valid_module_ids, 788 notifier, 789 shutdown_executor: Default::default(), 790 sm_update_tx, 791 sm_update_rx: Mutex::new(Some(sm_update_rx)), 792 client_task_group, 793 }); 794 795 debug!( 796 instances = ?inner.module_contexts.keys().copied().collect::<Vec<_>>(), 797 "Initialized state machine executor with module instances" 798 ); 799 Executor { inner } 800 } 801 } 802 803 /// A state that is able to make progress eventually 804 #[derive(Debug)] 805 pub struct ActiveStateKey { 806 // TODO: remove redundant operation id from state trait 807 pub operation_id: OperationId, 808 // TODO: state being a key... seems ... risky? 
809 pub state: DynState, 810 } 811 812 impl ActiveStateKey { 813 pub fn from_state(state: DynState) -> ActiveStateKey { 814 ActiveStateKey { 815 operation_id: state.operation_id(), 816 state, 817 } 818 } 819 } 820 821 impl Encodable for ActiveStateKey { 822 fn consensus_encode<W: Write>(&self, writer: &mut W) -> Result<usize, Error> { 823 let mut len = 0; 824 len += self.operation_id.consensus_encode(writer)?; 825 len += self.state.consensus_encode(writer)?; 826 Ok(len) 827 } 828 } 829 830 impl Decodable for ActiveStateKey { 831 fn consensus_decode<R: Read>( 832 reader: &mut R, 833 modules: &ModuleDecoderRegistry, 834 ) -> Result<Self, DecodeError> { 835 let operation_id = OperationId::consensus_decode(reader, modules)?; 836 let state = DynState::consensus_decode(reader, modules)?; 837 838 Ok(ActiveStateKey { 839 operation_id, 840 state, 841 }) 842 } 843 } 844 845 #[derive(Debug)] 846 pub struct ActiveStateKeyBytes { 847 pub operation_id: OperationId, 848 pub module_instance_id: ModuleInstanceId, 849 pub state: Vec<u8>, 850 } 851 852 impl Encodable for ActiveStateKeyBytes { 853 fn consensus_encode<W: std::io::Write>(&self, writer: &mut W) -> Result<usize, std::io::Error> { 854 let mut len = 0; 855 len += self.operation_id.consensus_encode(writer)?; 856 len += writer.write(self.state.as_slice())?; 857 Ok(len) 858 } 859 } 860 861 impl Decodable for ActiveStateKeyBytes { 862 fn consensus_decode<R: std::io::Read>( 863 reader: &mut R, 864 modules: &ModuleDecoderRegistry, 865 ) -> Result<Self, DecodeError> { 866 let operation_id = OperationId::consensus_decode(reader, modules)?; 867 let module_instance_id = ModuleInstanceId::consensus_decode(reader, modules)?; 868 let mut bytes = Vec::new(); 869 reader 870 .read_to_end(&mut bytes) 871 .map_err(DecodeError::from_err)?; 872 873 let mut instance_bytes = ModuleInstanceId::consensus_encode_to_vec(&module_instance_id); 874 instance_bytes.append(&mut bytes); 875 876 Ok(ActiveStateKeyBytes { 877 operation_id, 878 
            module_instance_id,
            state: instance_bytes,
        })
    }
}

/// Prefix selecting all active states of one operation.
#[derive(Debug)]
pub(crate) struct ActiveOperationStateKeyPrefix {
    pub operation_id: OperationId,
}

impl Encodable for ActiveOperationStateKeyPrefix {
    fn consensus_encode<W: Write>(&self, writer: &mut W) -> Result<usize, Error> {
        self.operation_id.consensus_encode(writer)
    }
}

impl ::fedimint_core::db::DatabaseLookup for ActiveOperationStateKeyPrefix {
    type Record = ActiveStateKey;
}

/// Prefix selecting all active states of one operation within one module.
#[derive(Debug)]
pub(crate) struct ActiveModuleOperationStateKeyPrefix {
    pub operation_id: OperationId,
    pub module_instance: ModuleInstanceId,
}

impl Encodable for ActiveModuleOperationStateKeyPrefix {
    fn consensus_encode<W: Write>(&self, writer: &mut W) -> Result<usize, Error> {
        let mut len = 0;
        len += self.operation_id.consensus_encode(writer)?;
        len += self.module_instance.consensus_encode(writer)?;
        Ok(len)
    }
}

impl ::fedimint_core::db::DatabaseLookup for ActiveModuleOperationStateKeyPrefix {
    type Record = ActiveStateKey;
}

/// Prefix selecting every active state.
#[derive(Debug)]
pub struct ActiveStateKeyPrefix;

impl Encodable for ActiveStateKeyPrefix {
    fn consensus_encode<W: Write>(&self, _writer: &mut W) -> Result<usize, Error> {
        Ok(0)
    }
}

/// Metadata stored alongside every active state.
#[derive(Debug, Copy, Clone, Encodable, Decodable)]
pub struct ActiveStateMeta {
    /// When the state was first added to the executor.
    pub created_at: SystemTime,
}

impl ::fedimint_core::db::DatabaseRecord for ActiveStateKey {
    const DB_PREFIX: u8 = ExecutorDbPrefixes::ActiveStates as u8;
    const NOTIFY_ON_MODIFY: bool = true;
    type Key = Self;
    type Value = ActiveStateMeta;
}

impl DatabaseKeyWithNotify for ActiveStateKey {}

impl ::fedimint_core::db::DatabaseLookup for ActiveStateKeyPrefix {
    type Record = ActiveStateKey;
}

#[derive(Debug, Encodable, Decodable)]
pub(crate) struct ActiveStateKeyPrefixBytes;

impl ::fedimint_core::db::DatabaseRecord for ActiveStateKeyBytes {
    // Same prefix as `ActiveStateKey` — this is a raw-bytes view of the same
    // records.
    const DB_PREFIX: u8 = ExecutorDbPrefixes::ActiveStates as u8;
    const NOTIFY_ON_MODIFY: bool = false;
    type Key = Self;
    type Value = ActiveStateMeta;
}

impl ::fedimint_core::db::DatabaseLookup for ActiveStateKeyPrefixBytes {
    type Record = ActiveStateKeyBytes;
}

impl Default for ActiveStateMeta {
    fn default() -> Self {
        Self {
            created_at: fedimint_core::time::now(),
        }
    }
}

impl ActiveStateMeta {
    /// Converts active-state metadata to inactive-state metadata, stamping the
    /// exit time with the current time.
    fn into_inactive(self) -> InactiveStateMeta {
        InactiveStateMeta {
            created_at: self.created_at,
            exited_at: fedimint_core::time::now(),
        }
    }
}

/// A past or final state of a state machine
#[derive(Debug, Clone)]
pub struct InactiveStateKey {
    // TODO: remove redundant operation id from state trait
    pub operation_id: OperationId,
    pub state: DynState,
}

impl InactiveStateKey {
    /// Builds the DB key for `state`, deriving the operation id from it.
    pub fn from_state(state: DynState) -> InactiveStateKey {
        InactiveStateKey {
            operation_id: state.operation_id(),
            state,
        }
    }
}

impl Encodable for InactiveStateKey {
    fn consensus_encode<W: Write>(&self, writer: &mut W) -> Result<usize, Error> {
        let mut len = 0;
        len += self.operation_id.consensus_encode(writer)?;
        len += self.state.consensus_encode(writer)?;
        Ok(len)
    }
}

impl Decodable for InactiveStateKey {
    fn consensus_decode<R: Read>(
        reader: &mut R,
        modules: &ModuleDecoderRegistry,
    ) -> Result<Self, DecodeError> {
        let operation_id = OperationId::consensus_decode(reader, modules)?;
        let state = DynState::consensus_decode(reader, modules)?;

        Ok(InactiveStateKey {
            operation_id,
            state,
        })
    }
}

/// Raw-bytes variant of [`InactiveStateKey`] where the state is kept as its
/// undecoded encoding (module instance id prefix included in `state`).
#[derive(Debug)]
pub struct InactiveStateKeyBytes {
    pub operation_id: OperationId,
    pub module_instance_id: ModuleInstanceId,
    pub state: Vec<u8>,
}
impl Encodable for InactiveStateKeyBytes { 1025 fn consensus_encode<W: std::io::Write>(&self, writer: &mut W) -> Result<usize, std::io::Error> { 1026 let mut len = 0; 1027 len += self.operation_id.consensus_encode(writer)?; 1028 len += writer.write(self.state.as_slice())?; 1029 Ok(len) 1030 } 1031 } 1032 1033 impl Decodable for InactiveStateKeyBytes { 1034 fn consensus_decode<R: std::io::Read>( 1035 reader: &mut R, 1036 modules: &ModuleDecoderRegistry, 1037 ) -> Result<Self, DecodeError> { 1038 let operation_id = OperationId::consensus_decode(reader, modules)?; 1039 let module_instance_id = ModuleInstanceId::consensus_decode(reader, modules)?; 1040 let mut bytes = Vec::new(); 1041 reader 1042 .read_to_end(&mut bytes) 1043 .map_err(DecodeError::from_err)?; 1044 1045 let mut instance_bytes = ModuleInstanceId::consensus_encode_to_vec(&module_instance_id); 1046 instance_bytes.append(&mut bytes); 1047 1048 Ok(InactiveStateKeyBytes { 1049 operation_id, 1050 module_instance_id, 1051 state: instance_bytes, 1052 }) 1053 } 1054 } 1055 1056 #[derive(Debug)] 1057 pub(crate) struct InactiveOperationStateKeyPrefix { 1058 pub operation_id: OperationId, 1059 } 1060 1061 impl Encodable for InactiveOperationStateKeyPrefix { 1062 fn consensus_encode<W: Write>(&self, writer: &mut W) -> Result<usize, Error> { 1063 self.operation_id.consensus_encode(writer) 1064 } 1065 } 1066 1067 impl ::fedimint_core::db::DatabaseLookup for InactiveOperationStateKeyPrefix { 1068 type Record = InactiveStateKey; 1069 } 1070 1071 #[derive(Debug)] 1072 pub(crate) struct InactiveModuleOperationStateKeyPrefix { 1073 pub operation_id: OperationId, 1074 pub module_instance: ModuleInstanceId, 1075 } 1076 1077 impl Encodable for InactiveModuleOperationStateKeyPrefix { 1078 fn consensus_encode<W: Write>(&self, writer: &mut W) -> Result<usize, Error> { 1079 let mut len = 0; 1080 len += self.operation_id.consensus_encode(writer)?; 1081 len += self.module_instance.consensus_encode(writer)?; 1082 Ok(len) 1083 } 1084 
} 1085 1086 impl ::fedimint_core::db::DatabaseLookup for InactiveModuleOperationStateKeyPrefix { 1087 type Record = InactiveStateKey; 1088 } 1089 1090 #[derive(Debug, Clone)] 1091 pub struct InactiveStateKeyPrefix; 1092 1093 impl Encodable for InactiveStateKeyPrefix { 1094 fn consensus_encode<W: Write>(&self, _writer: &mut W) -> Result<usize, Error> { 1095 Ok(0) 1096 } 1097 } 1098 1099 #[derive(Debug, Encodable, Decodable)] 1100 pub(crate) struct InactiveStateKeyPrefixBytes; 1101 1102 impl ::fedimint_core::db::DatabaseRecord for InactiveStateKeyBytes { 1103 const DB_PREFIX: u8 = ExecutorDbPrefixes::InactiveStates as u8; 1104 const NOTIFY_ON_MODIFY: bool = false; 1105 type Key = Self; 1106 type Value = InactiveStateMeta; 1107 } 1108 1109 impl ::fedimint_core::db::DatabaseLookup for InactiveStateKeyPrefixBytes { 1110 type Record = InactiveStateKeyBytes; 1111 } 1112 1113 #[derive(Debug, Copy, Clone, Decodable, Encodable)] 1114 pub struct InactiveStateMeta { 1115 pub created_at: SystemTime, 1116 pub exited_at: SystemTime, 1117 } 1118 1119 impl ::fedimint_core::db::DatabaseRecord for InactiveStateKey { 1120 const DB_PREFIX: u8 = ExecutorDbPrefixes::InactiveStates as u8; 1121 const NOTIFY_ON_MODIFY: bool = true; 1122 type Key = Self; 1123 type Value = InactiveStateMeta; 1124 } 1125 1126 impl DatabaseKeyWithNotify for InactiveStateKey {} 1127 1128 impl ::fedimint_core::db::DatabaseLookup for InactiveStateKeyPrefix { 1129 type Record = InactiveStateKey; 1130 } 1131 1132 #[derive(Debug)] 1133 enum ActiveOrInactiveState { 1134 Active { 1135 dyn_state: DynState, 1136 #[allow(dead_code)] // currently not printed anywhere, but useful in the db 1137 meta: ActiveStateMeta, 1138 }, 1139 Inactive { 1140 dyn_state: DynState, 1141 }, 1142 } 1143 1144 impl ActiveOrInactiveState { 1145 fn is_active(&self) -> bool { 1146 match self { 1147 ActiveOrInactiveState::Active { .. } => true, 1148 ActiveOrInactiveState::Inactive { .. 
} => false, 1149 } 1150 } 1151 } 1152 1153 #[cfg(test)] 1154 mod tests { 1155 use std::fmt::Debug; 1156 use std::sync::Arc; 1157 use std::time::Duration; 1158 1159 use fedimint_core::core::{ 1160 Decoder, IntoDynInstance, ModuleInstanceId, ModuleKind, OperationId, 1161 }; 1162 use fedimint_core::db::mem_impl::MemDatabase; 1163 use fedimint_core::db::Database; 1164 use fedimint_core::encoding::{Decodable, Encodable}; 1165 use fedimint_core::module::registry::ModuleDecoderRegistry; 1166 use fedimint_core::runtime; 1167 use fedimint_core::task::TaskGroup; 1168 use tokio::sync::broadcast::Sender; 1169 use tracing::{info, trace}; 1170 1171 use crate::sm::state::{Context, DynContext, DynState}; 1172 use crate::sm::{Executor, Notifier, State, StateTransition}; 1173 use crate::DynGlobalClientContext; 1174 1175 #[derive(Debug, Clone, Eq, PartialEq, Decodable, Encodable, Hash)] 1176 enum MockStateMachine { 1177 Start, 1178 ReceivedNonNull(u64), 1179 Final, 1180 } 1181 1182 impl State for MockStateMachine { 1183 type ModuleContext = MockContext; 1184 1185 fn transitions( 1186 &self, 1187 context: &Self::ModuleContext, 1188 _global_context: &DynGlobalClientContext, 1189 ) -> Vec<StateTransition<Self>> { 1190 match self { 1191 MockStateMachine::Start => { 1192 let mut receiver1 = context.broadcast.subscribe(); 1193 let mut receiver2 = context.broadcast.subscribe(); 1194 vec![ 1195 StateTransition::new( 1196 async move { 1197 loop { 1198 let val = receiver1.recv().await.unwrap(); 1199 if val == 0 { 1200 trace!("State transition Start->Final"); 1201 break; 1202 } 1203 } 1204 }, 1205 |_dbtx, (), _state| Box::pin(async move { MockStateMachine::Final }), 1206 ), 1207 StateTransition::new( 1208 async move { 1209 loop { 1210 let val = receiver2.recv().await.unwrap(); 1211 if val != 0 { 1212 trace!("State transition Start->ReceivedNonNull"); 1213 break val; 1214 } 1215 } 1216 }, 1217 |_dbtx, value, _state| { 1218 Box::pin(async move { MockStateMachine::ReceivedNonNull(value) }) 1219 }, 
1220 ), 1221 ] 1222 } 1223 MockStateMachine::ReceivedNonNull(prev_val) => { 1224 let prev_val = *prev_val; 1225 let mut receiver = context.broadcast.subscribe(); 1226 vec![StateTransition::new( 1227 async move { 1228 loop { 1229 let val = receiver.recv().await.unwrap(); 1230 if val == prev_val { 1231 trace!("State transition ReceivedNonNull->Final"); 1232 break; 1233 } 1234 } 1235 }, 1236 |_dbtx, (), _state| Box::pin(async move { MockStateMachine::Final }), 1237 )] 1238 } 1239 MockStateMachine::Final => { 1240 vec![] 1241 } 1242 } 1243 } 1244 1245 fn operation_id(&self) -> OperationId { 1246 OperationId([0u8; 32]) 1247 } 1248 } 1249 1250 impl IntoDynInstance for MockStateMachine { 1251 type DynType = DynState; 1252 1253 fn into_dyn(self, instance_id: ModuleInstanceId) -> Self::DynType { 1254 DynState::from_typed(instance_id, self) 1255 } 1256 } 1257 1258 #[derive(Debug, Clone)] 1259 struct MockContext { 1260 broadcast: tokio::sync::broadcast::Sender<u64>, 1261 } 1262 1263 impl IntoDynInstance for MockContext { 1264 type DynType = DynContext; 1265 1266 fn into_dyn(self, instance_id: ModuleInstanceId) -> Self::DynType { 1267 DynContext::from_typed(instance_id, self) 1268 } 1269 } 1270 1271 impl Context for MockContext {} 1272 1273 async fn get_executor() -> (Executor, Sender<u64>, Database) { 1274 let (broadcast, _) = tokio::sync::broadcast::channel(10); 1275 1276 let mut decoder_builder = Decoder::builder(); 1277 decoder_builder.with_decodable_type::<MockStateMachine>(); 1278 let decoder = decoder_builder.build(); 1279 1280 let decoders = 1281 ModuleDecoderRegistry::new(vec![(42, ModuleKind::from_static_str("test"), decoder)]); 1282 let db = Database::new(MemDatabase::new(), decoders); 1283 1284 let mut executor_builder = Executor::builder(); 1285 executor_builder.with_module( 1286 42, 1287 MockContext { 1288 broadcast: broadcast.clone(), 1289 }, 1290 ); 1291 let executor = executor_builder 1292 .build(db.clone(), Notifier::new(db.clone()), TaskGroup::new()) 1293 
.await; 1294 executor 1295 .start_executor(Arc::new(|_, _| DynGlobalClientContext::new_fake())) 1296 .await; 1297 1298 info!("Initialized test executor"); 1299 (executor, broadcast, db) 1300 } 1301 1302 #[tokio::test] 1303 #[tracing_test::traced_test] 1304 async fn test_executor() { 1305 const MOCK_INSTANCE_1: ModuleInstanceId = 42; 1306 const MOCK_INSTANCE_2: ModuleInstanceId = 21; 1307 1308 let (executor, sender, _db) = get_executor().await; 1309 executor 1310 .add_state_machines(vec![DynState::from_typed( 1311 MOCK_INSTANCE_1, 1312 MockStateMachine::Start, 1313 )]) 1314 .await 1315 .unwrap(); 1316 1317 assert!( 1318 executor 1319 .add_state_machines(vec![DynState::from_typed( 1320 MOCK_INSTANCE_1, 1321 MockStateMachine::Start 1322 )]) 1323 .await 1324 .is_err(), 1325 "Running the same state machine a second time should fail" 1326 ); 1327 1328 assert!( 1329 executor 1330 .contains_active_state(MOCK_INSTANCE_1, MockStateMachine::Start) 1331 .await, 1332 "State was written to DB and waits for broadcast" 1333 ); 1334 assert!( 1335 !executor 1336 .contains_active_state(MOCK_INSTANCE_2, MockStateMachine::Start) 1337 .await, 1338 "Instance separation works" 1339 ); 1340 1341 // TODO build await fn+timeout or allow manual driving of executor 1342 runtime::sleep(Duration::from_secs(1)).await; 1343 sender.send(0).unwrap(); 1344 runtime::sleep(Duration::from_secs(2)).await; 1345 1346 assert!( 1347 executor 1348 .contains_inactive_state(MOCK_INSTANCE_1, MockStateMachine::Final) 1349 .await, 1350 "State was written to DB and waits for broadcast" 1351 ); 1352 } 1353 }