/ fedimint-client / src / sm / executor.rs
executor.rs
   1  use std::collections::{BTreeMap, BTreeSet, HashSet};
   2  use std::convert::Infallible;
   3  use std::fmt::{Debug, Formatter};
   4  use std::io::{Error, Read, Write};
   5  use std::sync::Arc;
   6  use std::time::SystemTime;
   7  
   8  use anyhow::anyhow;
   9  use fedimint_core::core::{IntoDynInstance, ModuleInstanceId, OperationId};
  10  use fedimint_core::db::{
  11      AutocommitError, Database, DatabaseKeyWithNotify, DatabaseTransaction,
  12      IDatabaseTransactionOpsCoreTyped,
  13  };
  14  use fedimint_core::encoding::{Decodable, DecodeError, Encodable};
  15  use fedimint_core::fmt_utils::AbbreviateJson;
  16  use fedimint_core::maybe_add_send_sync;
  17  use fedimint_core::module::registry::ModuleDecoderRegistry;
  18  use fedimint_core::task::TaskGroup;
  19  use fedimint_core::util::BoxFuture;
  20  use fedimint_logging::LOG_CLIENT_REACTOR;
  21  use futures::future::{self, select_all};
  22  use futures::stream::{FuturesUnordered, StreamExt};
  23  use tokio::select;
  24  use tokio::sync::{mpsc, oneshot, Mutex};
  25  use tracing::{debug, error, info, trace, warn, Instrument};
  26  
  27  use super::state::StateTransitionFunction;
  28  use crate::sm::notifier::Notifier;
  29  use crate::sm::state::{DynContext, DynState};
  30  use crate::sm::{ClientSMDatabaseTransaction, State, StateTransition};
  31  use crate::{AddStateMachinesError, AddStateMachinesResult, DynGlobalClientContext};
  32  
/// After how many attempts a DB transaction is aborted with an error.
///
/// Passed as the attempt limit to `Database::autocommit` by
/// [`Executor::add_state_machines`].
const MAX_DB_ATTEMPTS: Option<usize> = Some(100);
  35  
/// Shared factory closure that builds the [`DynGlobalClientContext`] for a
/// given module instance and operation.
///
/// The executor calls it whenever it needs a global context for a state, e.g.
/// to query the state's transitions or to check whether it is terminal.
pub type ContextGen =
    Arc<maybe_add_send_sync!(dyn Fn(ModuleInstanceId, OperationId) -> DynGlobalClientContext)>;
  38  
/// Prefixes for executor DB entries
enum ExecutorDbPrefixes {
    /// Prefix for entries of active states. See [`ActiveStateKey`]
    ActiveStates = 0xa1,
    /// Prefix for entries of inactive states. See [`InactiveStateKey`]
    InactiveStates = 0xa2,
}
  46  
/// Executor that drives forward state machines under its management.
///
/// Each state transition is atomic and supposed to be idempotent such that a
/// stop/crash of the executor at any point can be recovered from on restart.
/// The executor is aware of the concept of Fedimint modules and can give state
/// machines a different [execution context](super::state::Context) depending on
/// the owning module, making it very flexible.
#[derive(Clone, Debug)]
pub struct Executor {
    /// Shared executor state; clones of an `Executor` share the same
    /// `ExecutorInner` through this `Arc`.
    inner: Arc<ExecutorInner>,
}
  58  
/// Shared state behind an [`Executor`] handle.
struct ExecutorInner {
    /// Database in which active and inactive states are stored.
    db: Database,
    /// Global context generator; `None` until [`Executor::start_executor`]
    /// sets it.
    context: Mutex<Option<ContextGen>>,
    /// Per-module contexts. States whose module has no entry here are ignored
    /// when querying states from the database.
    module_contexts: BTreeMap<ModuleInstanceId, DynContext>,
    /// Module instance IDs for which new state machines are accepted by
    /// [`Executor::add_state_machines_dbtx`].
    valid_module_ids: BTreeSet<ModuleInstanceId>,
    /// Used to notify subscribers about new states.
    notifier: Notifier,
    /// Sender of the explicit shutdown signal; set by
    /// [`Executor::start_executor`].
    shutdown_executor: Mutex<Option<oneshot::Sender<()>>>,
    /// Any time the executor should notice a state machine update (e.g.
    /// because it was created), it must be sent through this channel for it
    /// to notice.
    sm_update_tx: mpsc::UnboundedSender<DynState>,
    /// Receiving end of `sm_update_tx`; taken out (leaving `None`) by
    /// [`Executor::start_executor`] and handed to the background task.
    sm_update_rx: Mutex<Option<mpsc::UnboundedReceiver<DynState>>>,
    /// Task group on which the `sm-executor` background task is spawned.
    client_task_group: TaskGroup,
}
  72  
/// Builder to which module clients can be attached and used to build an
/// [`Executor`] supporting these.
#[derive(Debug, Default)]
pub struct ExecutorBuilder {
    /// Contexts of the attached modules, keyed by module instance ID.
    // NOTE(review): presumably moved into `ExecutorInner::module_contexts` when
    // the executor is built; the build step is not visible here — confirm.
    module_contexts: BTreeMap<ModuleInstanceId, DynContext>,
    /// Module instance IDs considered valid.
    // NOTE(review): presumably becomes `ExecutorInner::valid_module_ids` —
    // confirm against the build step.
    valid_module_ids: BTreeSet<ModuleInstanceId>,
}
  80  
  81  impl Executor {
  82      /// Creates an [`ExecutorBuilder`]
  83      pub fn builder() -> ExecutorBuilder {
  84          ExecutorBuilder::default()
  85      }
  86  
    /// Returns all active states together with their metadata by delegating
    /// to `ExecutorInner::get_active_states`, which ignores states of modules
    /// that have no registered module context.
    pub async fn get_active_states(&self) -> Vec<(DynState, ActiveStateMeta)> {
        self.inner.get_active_states().await
    }
  90  
  91      /// Adds a number of state machines to the executor atomically. They will be
  92      /// driven to completion automatically in the background.
  93      ///
  94      /// **Attention**: do not use before background task is started!
  95      // TODO: remove warning once finality is an inherent state attribute
  96      pub async fn add_state_machines(&self, states: Vec<DynState>) -> anyhow::Result<()> {
  97          self.inner
  98              .db
  99              .autocommit(
 100                  |dbtx, _| Box::pin(self.add_state_machines_dbtx(dbtx, states.clone())),
 101                  MAX_DB_ATTEMPTS,
 102              )
 103              .await
 104              .map_err(|e| match e {
 105                  AutocommitError::CommitFailed {
 106                      last_error,
 107                      attempts,
 108                  } => last_error.context(format!("Failed to commit after {attempts} attempts")),
 109                  AutocommitError::ClosureError { error, .. } => anyhow!("{error:?}"),
 110              })?;
 111  
 112          // TODO: notify subscribers to state changes?
 113  
 114          Ok(())
 115      }
 116  
 117      /// Adds a number of state machines to the executor atomically with other DB
 118      /// changes is `dbtx`. See [`Executor::add_state_machines`] for more
 119      /// details.
 120      ///
 121      /// ## Panics
 122      /// If called before background task is started using
 123      /// [`Executor::start_executor`]!
 124      // TODO: remove warning once finality is an inherent state attribute
 125      pub async fn add_state_machines_dbtx(
 126          &self,
 127          dbtx: &mut DatabaseTransaction<'_>,
 128          states: Vec<DynState>,
 129      ) -> AddStateMachinesResult {
 130          for state in states {
 131              if !self
 132                  .inner
 133                  .valid_module_ids
 134                  .contains(&state.module_instance_id())
 135              {
 136                  return Err(AddStateMachinesError::Other(anyhow!("Unknown module")));
 137              }
 138  
 139              let is_active_state = dbtx
 140                  .get_value(&ActiveStateKey::from_state(state.clone()))
 141                  .await
 142                  .is_some();
 143              let is_inactive_state = dbtx
 144                  .get_value(&InactiveStateKey::from_state(state.clone()))
 145                  .await
 146                  .is_some();
 147  
 148              if is_active_state || is_inactive_state {
 149                  return Err(AddStateMachinesError::StateAlreadyExists);
 150              }
 151  
 152              // In case of recovery functions, the module itself is not yet initialized,
 153              // so we can't check if the state is terminal. However the
 154              // [`Self::get_transitions_for`] function will double check and
 155              // deactivate any terminal states that would slip past this check.
 156              if let Some(module_context) =
 157                  self.inner.module_contexts.get(&state.module_instance_id())
 158              {
 159                  let context = {
 160                      let context_gen_guard = self.inner.context.lock().await;
 161                      let context_gen = context_gen_guard
 162                          .as_ref()
 163                          .expect("should be initialized at this point");
 164                      context_gen(state.module_instance_id(), state.operation_id())
 165                  };
 166  
 167                  if state.is_terminal(module_context, &context) {
 168                      return Err(AddStateMachinesError::Other(anyhow!(
 169                          "State is already terminal, adding it to the executor doesn't make sense."
 170                      )));
 171                  }
 172              }
 173  
 174              dbtx.insert_new_entry(
 175                  &ActiveStateKey::from_state(state.clone()),
 176                  &ActiveStateMeta::default(),
 177              )
 178              .await;
 179              let notify_sender = self.inner.notifier.sender();
 180              let sm_updates_tx = self.inner.sm_update_tx.clone();
 181              dbtx.on_commit(move || {
 182                  notify_sender.notify(state.clone());
 183                  let _ = sm_updates_tx.send(state);
 184              });
 185          }
 186  
 187          Ok(())
 188      }
 189  
 190      /// **Mostly used for testing**
 191      ///
 192      /// Check if state exists in the database as part of an actively running
 193      /// state machine.
 194      pub async fn contains_active_state<S: State>(
 195          &self,
 196          instance: ModuleInstanceId,
 197          state: S,
 198      ) -> bool {
 199          let state = DynState::from_typed(instance, state);
 200          self.inner
 201              .get_active_states()
 202              .await
 203              .into_iter()
 204              .any(|(s, _)| s == state)
 205      }
 206  
 207      // TODO: unify querying fns
 208      /// **Mostly used for testing**
 209      ///
 210      /// Check if state exists in the database as inactive. If the state is
 211      /// terminal it means the corresponding state machine finished its
 212      /// execution. If the state is non-terminal it means the state machine was
 213      /// in that state at some point but moved on since then.
 214      pub async fn contains_inactive_state<S: State>(
 215          &self,
 216          instance: ModuleInstanceId,
 217          state: S,
 218      ) -> bool {
 219          let state = DynState::from_typed(instance, state);
 220          self.inner
 221              .get_inactive_states()
 222              .await
 223              .into_iter()
 224              .any(|(s, _)| s == state)
 225      }
 226  
    /// Waits on the database key of `state` in the inactive-state table via
    /// `Database::wait_key_exists` and returns the associated
    /// [`InactiveStateMeta`].
    // NOTE(review): presumably resolves immediately if the entry already
    // exists and otherwise blocks until it is written — confirm against
    // `Database::wait_key_exists`.
    pub async fn await_inactive_state(&self, state: DynState) -> InactiveStateMeta {
        self.inner
            .db
            .wait_key_exists(&InactiveStateKey::from_state(state))
            .await
    }
 233  
    /// Waits on the database key of `state` in the active-state table via
    /// `Database::wait_key_exists` and returns the associated
    /// [`ActiveStateMeta`].
    // NOTE(review): presumably resolves immediately if the entry already
    // exists and otherwise blocks until it is written — confirm against
    // `Database::wait_key_exists`.
    pub async fn await_active_state(&self, state: DynState) -> ActiveStateMeta {
        self.inner
            .db
            .wait_key_exists(&ActiveStateKey::from_state(state))
            .await
    }
 240  
 241      /// Starts the background thread that runs the state machines. This cannot
 242      /// be done when building the executor since some global contexts in turn
 243      /// may depend on the executor, forming a cyclic dependency.
 244      ///
 245      /// ## Panics
 246      /// If called more than once.
 247      pub async fn start_executor(&self, context_gen: ContextGen) {
 248          let replaced_old_context_gen = self
 249              .inner
 250              .context
 251              .lock()
 252              .await
 253              .replace(context_gen.clone())
 254              .is_some();
 255          assert!(
 256              !replaced_old_context_gen,
 257              "start_executor was called previously"
 258          );
 259          let sm_update_rx = self
 260              .inner
 261              .sm_update_rx
 262              .lock()
 263              .await
 264              .take()
 265              .expect("start_executor was called previously: no sm_update_rx available");
 266  
 267          let (shutdown_sender, shutdown_receiver) = tokio::sync::oneshot::channel::<()>();
 268  
 269          let replaced_old_shutdown_sender = self
 270              .inner
 271              .shutdown_executor
 272              .lock()
 273              .await
 274              .replace(shutdown_sender)
 275              .is_some();
 276          assert!(
 277              !replaced_old_shutdown_sender,
 278              "start_executor was called previously"
 279          );
 280  
 281          let task_runner_inner = self.inner.clone();
 282          let _handle = self.inner.client_task_group.spawn("sm-executor", |task_handle| async move {
 283              let executor_runner = task_runner_inner.run(context_gen, sm_update_rx);
 284              let task_group_shutdown_rx = task_handle.make_shutdown_rx().await;
 285              select! {
 286                  _ = task_group_shutdown_rx => {
 287                      debug!("Shutting down state machine executor runner due to task group shutdown signal");
 288                  },
 289                  shutdown_happened_sender = shutdown_receiver => {
 290                      match shutdown_happened_sender {
 291                          Ok(()) => {
 292                              debug!("Shutting down state machine executor runner due to explicit shutdown signal");
 293                          },
 294                          Err(_) => {
 295                              warn!("Shutting down state machine executor runner because the shutdown signal channel was closed (the executor object was dropped)");
 296                          }
 297                      }
 298                  },
 299                  _ = executor_runner => {
 300                      error!("State machine executor runner exited unexpectedly!");
 301                  },
 302              };
 303          });
 304      }
 305  
    /// Stops the background task that runs the state machines by delegating
    /// to `ExecutorInner::stop_executor`.
    ///
    /// NOTE(review): earlier documentation described a [`oneshot::Receiver`]
    /// return value, but the signature returns `Option<()>`. Presumably
    /// `Some(())` means a shutdown signal was sent and `None` means no signal
    /// was sent (e.g. because `stop_executor` was already called) — confirm
    /// against `ExecutorInner::stop_executor`.
    ///
    /// ## Panics
    /// If called in parallel with [`start_executor`](Self::start_executor).
    pub fn stop_executor(&self) -> Option<()> {
        self.inner.stop_executor()
    }
 322  
    /// Returns a reference to the executor's [`Notifier`], which can be used
    /// to subscribe to state transitions.
    pub fn notifier(&self) -> &Notifier {
        &self.inner.notifier
    }
 328  }
 329  
/// Requests executor shutdown when the shared executor state is dropped.
impl Drop for ExecutorInner {
    fn drop(&mut self) {
        // The result is deliberately ignored: there is nothing left to do on
        // drop regardless of what `stop_executor` reports.
        self.stop_executor();
    }
}
 335  
/// Result of a trigger future completing for an active state: everything
/// needed to execute the corresponding state transition.
struct TransitionForActiveState {
    /// Value the transition's trigger future resolved to; passed to
    /// `transition_fn` when the transition is executed.
    outcome: serde_json::Value,
    /// The active state the transition starts from.
    state: DynState,
    /// Metadata of `state` as it was looked up when its transitions were set
    /// up.
    meta: ActiveStateMeta,
    /// Function that computes the new state from the old state and `outcome`.
    transition_fn: StateTransitionFunction<DynState>,
}
 342  
 343  impl ExecutorInner {
 344      async fn run(
 345          &self,
 346          global_context_gen: ContextGen,
 347          sm_update_rx: tokio::sync::mpsc::UnboundedReceiver<DynState>,
 348      ) {
 349          debug!(target: LOG_CLIENT_REACTOR, "Starting state machine executor task");
 350          if let Err(err) = self
 351              .run_state_machines_executor_inner(global_context_gen, sm_update_rx)
 352              .await
 353          {
 354              warn!(
 355                  %err,
 356                  "An unexpected error occurred during a state transition"
 357              );
 358          }
 359      }
 360  
 361      async fn get_transition_for(
 362          &self,
 363          state: &DynState,
 364          meta: ActiveStateMeta,
 365          global_context_gen: &ContextGen,
 366      ) -> Vec<BoxFuture<'static, TransitionForActiveState>> {
 367          let module_instance = state.module_instance_id();
 368          let context = &self
 369              .module_contexts
 370              .get(&module_instance)
 371              .expect("Unknown module");
 372          let transitions = state
 373              .transitions(
 374                  context,
 375                  &global_context_gen(module_instance, state.operation_id()),
 376              )
 377              .into_iter()
 378              .map(|transition| {
 379                  let state = state.clone();
 380                  let f: BoxFuture<TransitionForActiveState> = Box::pin(async move {
 381                      let StateTransition {
 382                          trigger,
 383                          transition,
 384                      } = transition;
 385                      TransitionForActiveState {
 386                          outcome: trigger.await,
 387                          state,
 388                          transition_fn: transition,
 389                          meta,
 390                      }
 391                  });
 392                  f
 393              })
 394              .collect::<Vec<_>>();
 395          if transitions.is_empty() {
 396              // In certain cases a terminal (no transitions) state could get here due to
 397              // module bug. Inactivate it to prevent accumulation of such states.
 398              // See [`Self::add_state_machines_dbtx`].
 399              warn!(module_id = module_instance, "A terminal state where only active states are expected. Please report this bug upstream.");
 400              self.db
 401                  .autocommit::<_, _, anyhow::Error>(
 402                      |dbtx, _| {
 403                          Box::pin(async {
 404                              let k = InactiveStateKey::from_state(state.clone());
 405                              let v = ActiveStateMeta::default().into_inactive();
 406                              dbtx.remove_entry(&ActiveStateKey::from_state(state.clone()))
 407                                  .await;
 408                              dbtx.insert_entry(&k, &v).await;
 409                              Ok(())
 410                          })
 411                      },
 412                      None,
 413                  )
 414                  .await
 415                  .expect("Autocommit here can't fail");
 416          }
 417  
 418          transitions
 419      }
 420  
    /// Main reactor loop of the executor.
    ///
    /// On start, all active states found in the database are fed into the
    /// update channel. The loop then waits for either a state arriving on
    /// `sm_update_rx` or one of the pending futures completing, and reacts:
    ///
    /// * a new state gets its transition triggers set up and raced against
    ///   each other,
    /// * a fired trigger causes the transition function to be executed in an
    ///   auto-committed DB transaction, which moves the old state to the
    ///   inactive states and stores the new state as active or inactive
    ///   depending on whether it is terminal,
    /// * a completed transition removes the old state from the set of running
    ///   state machines.
    ///
    /// The loop ends when the update channel is disconnected; the function
    /// then returns `Ok(())`.
    async fn run_state_machines_executor_inner(
        &self,
        global_context_gen: ContextGen,
        mut sm_update_rx: tokio::sync::mpsc::UnboundedReceiver<DynState>,
    ) -> anyhow::Result<()> {
        // Re-submit every active state from the database through the same
        // channel that is used for newly added states.
        let active_states = self.get_active_states().await;
        trace!(target: LOG_CLIENT_REACTOR, "Starting active states: {:?}", active_states);
        for (state, _meta) in active_states {
            self.sm_update_tx
                .send(state)
                .expect("Must be able to send state machine to own opened channel");
        }

        /// All futures in the executor resolve to this type, so the handling
        /// code can tell them apart.
        enum ExecutorLoopEvent {
            /// Notification about `DynState` arrived and should be handled,
            /// usually added to the list of pending futures.
            New { state: DynState },
            /// One of trigger futures of a state machine finished and
            /// returned transition function to run
            Triggered(TransitionForActiveState),
            /// Transition function and all the accounting around it are done
            Completed {
                state: DynState,
                outcome: ActiveOrInactiveState,
            },
            /// New job receiver disconnected, that can only mean termination
            Disconnected,
        }

        // Keeps track of things already running, so we can deduplicate, just
        // in case.
        let mut currently_running_sms = HashSet::<DynState>::new();
        // All things happening in parallel go into here
        let mut futures: FuturesUnordered<BoxFuture<'_, ExecutorLoopEvent>> =
            FuturesUnordered::new();

        loop {
            let event = tokio::select! {
                new = sm_update_rx.recv() => {
                    if let Some(new) = new {
                        ExecutorLoopEvent::New {
                            state: new,
                        }
                    } else {
                        ExecutorLoopEvent::Disconnected
                    }
                },

                // Only polled while there are pending futures, so `next()`
                // cannot yield `None` here.
                event = futures.next(), if !futures.is_empty() => event.expect("we only .next() if there are pending futures"),
            };

            // main reactor loop: wait for next thing that completed, react (possibly adding
            // more things to `futures`)
            match event {
                ExecutorLoopEvent::New { state } => {
                    if currently_running_sms.contains(&state) {
                        warn!(target: LOG_CLIENT_REACTOR, operation_id = %state.operation_id(), "Received a state machine that is already running. Ignoring");
                        continue;
                    }
                    // `None` means the state is not stored as active or its
                    // module has no registered context.
                    let Some(meta) = self.get_active_state(&state).await else {
                        warn!(target: LOG_CLIENT_REACTOR, operation_id = %state.operation_id(), "Couldn't look up received state machine. Ignoring.");
                        continue;
                    };

                    let transitions = self
                        .get_transition_for(&state, meta, &global_context_gen)
                        .await;
                    if transitions.is_empty() {
                        warn!(target: LOG_CLIENT_REACTOR, operation_id = %state.operation_id(), "Received an active state that doesn't produce any transitions. Ignoring.");
                        continue;
                    }

                    let transitions_num = transitions.len();
                    currently_running_sms.insert(state.clone());
                    // Race all triggers of this state; only the first one to
                    // complete is acted upon, the rest are dropped.
                    futures.push(Box::pin(async move {
                        let (first_completed_result, _index, _unused_transitions) =
                            select_all(transitions).await;
                        ExecutorLoopEvent::Triggered(first_completed_result)
                    }));

                    debug!(target: LOG_CLIENT_REACTOR, operation_id = %state.operation_id(), total = futures.len(), transitions_num, "New active state machine.");
                }
                ExecutorLoopEvent::Triggered(TransitionForActiveState {
                    outcome,
                    state,
                    meta,
                    transition_fn,
                }) => {
                    debug!(
                        target: LOG_CLIENT_REACTOR,
                        operation_id = %state.operation_id(),
                        "Triggered state transition",
                    );
                    let span = tracing::debug_span!(
                        target: LOG_CLIENT_REACTOR,
                        "sm_transition",
                        operation_id = %state.operation_id()
                    );
                    // Perform the transition as another future, so transitions can happen in
                    // parallel.
                    // Database write conflicts might be happening quite often here,
                    // but transaction functions are supposed to be idempotent anyway,
                    // so it seems like a good stress-test in the worst case.
                    futures.push({
                        let sm_update_tx = self.sm_update_tx.clone();
                        let db = self.db.clone();
                        let notifier = self.notifier.clone();
                        let module_contexts = self.module_contexts.clone();
                        let global_context_gen = global_context_gen.clone();
                        Box::pin(
                            async move {
                                debug!(
                                    target: LOG_CLIENT_REACTOR,
                                    "Executing state transition",
                                );
                                trace!(
                                    target: LOG_CLIENT_REACTOR,
                                    ?state,
                                    outcome = ?AbbreviateJson(&outcome),
                                    "Executing state transition (details)",
                                );

                                let module_contexts = &module_contexts;
                                let global_context_gen = &global_context_gen;

                                let outcome = db
                                    .autocommit::<'_, '_, _, _, Infallible>(
                                        |dbtx, _| {
                                            // The closure may be invoked once per
                                            // commit attempt, so it works on fresh
                                            // clones each time.
                                            let state = state.clone();
                                            let transition_fn = transition_fn.clone();
                                            let transition_outcome = outcome.clone();
                                            Box::pin(async move {
                                                let new_state = transition_fn(
                                                    &mut ClientSMDatabaseTransaction::new(
                                                        &mut dbtx.to_ref(),
                                                        state.module_instance_id(),
                                                    ),
                                                    transition_outcome,
                                                    state.clone(),
                                                )
                                                .await;
                                                // Move the old state from active to
                                                // inactive, keeping its metadata.
                                                dbtx.remove_entry(&ActiveStateKey::from_state(
                                                    state.clone(),
                                                ))
                                                .await;
                                                dbtx.insert_entry(
                                                    &InactiveStateKey::from_state(state.clone()),
                                                    &meta.into_inactive(),
                                                )
                                                .await;

                                                let context = &module_contexts
                                                    .get(&state.module_instance_id())
                                                    .expect("Unknown module");

                                                let global_context = global_context_gen(
                                                    state.module_instance_id(),
                                                    state.operation_id(),
                                                );
                                                // A terminal new state is stored
                                                // directly as inactive; otherwise it
                                                // becomes the new active state with
                                                // default metadata.
                                                if new_state.is_terminal(context, &global_context) {
                                                    let k = InactiveStateKey::from_state(
                                                        new_state.clone(),
                                                    );
                                                    let v = ActiveStateMeta::default().into_inactive();
                                                    dbtx.insert_entry(&k, &v).await;
                                                    Ok(ActiveOrInactiveState::Inactive {
                                                        dyn_state: new_state,
                                                    })
                                                } else {
                                                    let k = ActiveStateKey::from_state(
                                                        new_state.clone(),
                                                    );
                                                    let v = ActiveStateMeta::default();
                                                    dbtx.insert_entry(&k, &v).await;
                                                    Ok(ActiveOrInactiveState::Active {
                                                        dyn_state: new_state,
                                                        meta: v,
                                                    })
                                                }
                                            })
                                        },
                                        None,
                                    )
                                    .await
                                    .expect("autocommit should keep trying to commit (max_attempt: None) and body doesn't return errors");

                                debug!(
                                    target: LOG_CLIENT_REACTOR,
                                    terminal = !outcome.is_active(),
                                    ?outcome,
                                    "State transition complete",
                                );

                                // Only after the commit: hand an active new state
                                // back to the loop and notify subscribers.
                                match &outcome {
                                    ActiveOrInactiveState::Active { dyn_state, meta: _ } => {
                                        sm_update_tx
                                            .send(dyn_state.clone())
                                            .expect("can't fail: we are the receiving end");
                                        notifier.notify(dyn_state.clone());
                                    }
                                    ActiveOrInactiveState::Inactive { dyn_state } => {
                                        notifier.notify(dyn_state.clone());
                                    }
                                }
                                ExecutorLoopEvent::Completed { state, outcome }
                            }
                            .instrument(span),
                        )
                    });
                }
                ExecutorLoopEvent::Completed { state, outcome } => {
                    // `state` is the old state that was registered when its
                    // triggers were set up.
                    assert!(
                        currently_running_sms.remove(&state),
                        "State must have been recorded"
                    );
                    debug!(
                        target: LOG_CLIENT_REACTOR,
                        operation_id = %state.operation_id(),
                        outcome_active = outcome.is_active(),
                        total = futures.len(),
                        "State transition complete"
                    );
                    trace!(
                        target: LOG_CLIENT_REACTOR,
                        ?outcome,
                        operation_id = %state.operation_id(), total = futures.len(),
                        "State transition complete"
                    );
                }
                ExecutorLoopEvent::Disconnected => {
                    break;
                }
            }
        }

        info!(target: LOG_CLIENT_REACTOR, "Terminated.");
        Ok(())
    }
 661  
 662      async fn get_active_states(&self) -> Vec<(DynState, ActiveStateMeta)> {
 663          self.db
 664              .begin_transaction()
 665              .await
 666              .find_by_prefix(&ActiveStateKeyPrefix)
 667              .await
 668              // ignore states from modules that are not initialized yet
 669              .filter(|(state, _)| {
 670                  future::ready(
 671                      self.module_contexts
 672                          .contains_key(&state.state.module_instance_id()),
 673                  )
 674              })
 675              .map(|(state, meta)| (state.state, meta))
 676              .collect::<Vec<_>>()
 677              .await
 678      }
 679  
 680      async fn get_active_state(&self, state: &DynState) -> Option<ActiveStateMeta> {
 681          // ignore states from modules that are not initialized yet
 682          if !self
 683              .module_contexts
 684              .contains_key(&state.module_instance_id())
 685          {
 686              return None;
 687          }
 688          self.db
 689              .begin_transaction()
 690              .await
 691              .get_value(&ActiveStateKey::from_state(state.clone()))
 692              .await
 693      }
 694  
 695      async fn get_inactive_states(&self) -> Vec<(DynState, InactiveStateMeta)> {
 696          self.db
 697              .begin_transaction()
 698              .await
 699              .find_by_prefix(&InactiveStateKeyPrefix)
 700              .await
 701              // ignore states from modules that are not initialized yet
 702              .filter(|(state, _)| {
 703                  future::ready(
 704                      self.module_contexts
 705                          .contains_key(&state.state.module_instance_id()),
 706                  )
 707              })
 708              .map(|(state, meta)| (state.state, meta))
 709              .collect::<Vec<_>>()
 710              .await
 711      }
 712  }
 713  
 714  impl ExecutorInner {
 715      /// See [`Executor::stop_executor`].
 716      fn stop_executor(&self) -> Option<()> {
 717          let Some(shutdown_sender) = self
 718              .shutdown_executor
 719              .try_lock()
 720              .expect("Only locked during startup, no collisions should be possible")
 721              .take()
 722          else {
 723              debug!("Executor already stopped, ignoring stop request");
 724              return None;
 725          };
 726  
 727          if shutdown_sender.send(()).is_err() {
 728              warn!("Failed to send shutdown signal to executor, already dead?");
 729          }
 730  
 731          Some(())
 732      }
 733  }
 734  
 735  impl Debug for ExecutorInner {
 736      fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
 737          writeln!(f, "ExecutorInner {{}}")
 738      }
 739  }
 740  
 741  impl ExecutorBuilder {
 742      /// Allow executor being built to run state machines associated with the
 743      /// supplied module
 744      pub fn with_module<C>(&mut self, instance_id: ModuleInstanceId, context: C)
 745      where
 746          C: IntoDynInstance<DynType = DynContext>,
 747      {
 748          self.with_module_dyn(context.into_dyn(instance_id));
 749      }
 750  
 751      /// Allow executor being built to run state machines associated with the
 752      /// supplied module
 753      pub fn with_module_dyn(&mut self, context: DynContext) {
 754          self.valid_module_ids.insert(context.module_instance_id());
 755  
 756          if self
 757              .module_contexts
 758              .insert(context.module_instance_id(), context)
 759              .is_some()
 760          {
 761              panic!("Tried to add two modules with the same instance id!");
 762          }
 763      }
 764  
 765      /// Allow executor to build state machines associated with the module id,
 766      /// for which the module itself might not be available yet (otherwise it
 767      /// would be registered with `[Self::with_module_dyn]`).
 768      pub fn with_valid_module_id(&mut self, module_id: ModuleInstanceId) {
 769          self.valid_module_ids.insert(module_id);
 770      }
 771  
 772      /// Build [`Executor`] and spawn background task in `tasks` executing active
 773      /// state machines. The supplied database `db` must support isolation, so
 774      /// cannot be an isolated DB instance itself.
 775      pub async fn build(
 776          self,
 777          db: Database,
 778          notifier: Notifier,
 779          client_task_group: TaskGroup,
 780      ) -> Executor {
 781          let (sm_update_tx, sm_update_rx) = tokio::sync::mpsc::unbounded_channel();
 782  
 783          let inner = Arc::new(ExecutorInner {
 784              db,
 785              context: Mutex::new(None),
 786              module_contexts: self.module_contexts,
 787              valid_module_ids: self.valid_module_ids,
 788              notifier,
 789              shutdown_executor: Default::default(),
 790              sm_update_tx,
 791              sm_update_rx: Mutex::new(Some(sm_update_rx)),
 792              client_task_group,
 793          });
 794  
 795          debug!(
 796              instances = ?inner.module_contexts.keys().copied().collect::<Vec<_>>(),
 797              "Initialized state machine executor with module instances"
 798          );
 799          Executor { inner }
 800      }
 801  }
 802  
/// A state that is able to make progress eventually
///
/// Serves as the database key of an active state record; the associated value
/// is an [`ActiveStateMeta`].
#[derive(Debug)]
pub struct ActiveStateKey {
    // TODO: remove redundant operation id from state trait
    /// Operation the state belongs to. [`ActiveStateKey::from_state`] fills it
    /// from `state.operation_id()`.
    pub operation_id: OperationId,
    // TODO: state being a key... seems ... risky?
    /// The state machine state itself; it is encoded as part of the key.
    pub state: DynState,
}
 811  
 812  impl ActiveStateKey {
 813      pub fn from_state(state: DynState) -> ActiveStateKey {
 814          ActiveStateKey {
 815              operation_id: state.operation_id(),
 816              state,
 817          }
 818      }
 819  }
 820  
 821  impl Encodable for ActiveStateKey {
 822      fn consensus_encode<W: Write>(&self, writer: &mut W) -> Result<usize, Error> {
 823          let mut len = 0;
 824          len += self.operation_id.consensus_encode(writer)?;
 825          len += self.state.consensus_encode(writer)?;
 826          Ok(len)
 827      }
 828  }
 829  
 830  impl Decodable for ActiveStateKey {
 831      fn consensus_decode<R: Read>(
 832          reader: &mut R,
 833          modules: &ModuleDecoderRegistry,
 834      ) -> Result<Self, DecodeError> {
 835          let operation_id = OperationId::consensus_decode(reader, modules)?;
 836          let state = DynState::consensus_decode(reader, modules)?;
 837  
 838          Ok(ActiveStateKey {
 839              operation_id,
 840              state,
 841          })
 842      }
 843  }
 844  
/// Active state key that keeps the state as raw bytes instead of a decoded
/// [`DynState`]. It is stored under the same DB prefix as [`ActiveStateKey`].
#[derive(Debug)]
pub struct ActiveStateKeyBytes {
    /// Operation the state belongs to.
    pub operation_id: OperationId,
    /// Module instance id read from the key when decoding. It is not written
    /// on its own when encoding.
    pub module_instance_id: ModuleInstanceId,
    /// Raw state bytes. When produced by `consensus_decode`, this is the
    /// encoded module instance id followed by all remaining bytes of the key.
    pub state: Vec<u8>,
}
 851  
 852  impl Encodable for ActiveStateKeyBytes {
 853      fn consensus_encode<W: std::io::Write>(&self, writer: &mut W) -> Result<usize, std::io::Error> {
 854          let mut len = 0;
 855          len += self.operation_id.consensus_encode(writer)?;
 856          len += writer.write(self.state.as_slice())?;
 857          Ok(len)
 858      }
 859  }
 860  
 861  impl Decodable for ActiveStateKeyBytes {
 862      fn consensus_decode<R: std::io::Read>(
 863          reader: &mut R,
 864          modules: &ModuleDecoderRegistry,
 865      ) -> Result<Self, DecodeError> {
 866          let operation_id = OperationId::consensus_decode(reader, modules)?;
 867          let module_instance_id = ModuleInstanceId::consensus_decode(reader, modules)?;
 868          let mut bytes = Vec::new();
 869          reader
 870              .read_to_end(&mut bytes)
 871              .map_err(DecodeError::from_err)?;
 872  
 873          let mut instance_bytes = ModuleInstanceId::consensus_encode_to_vec(&module_instance_id);
 874          instance_bytes.append(&mut bytes);
 875  
 876          Ok(ActiveStateKeyBytes {
 877              operation_id,
 878              module_instance_id,
 879              state: instance_bytes,
 880          })
 881      }
 882  }
 883  
/// Lookup prefix for the [`ActiveStateKey`] records of a single operation.
#[derive(Debug)]
pub(crate) struct ActiveOperationStateKeyPrefix {
    /// Operation id that makes up the encoded prefix.
    pub operation_id: OperationId,
}

impl Encodable for ActiveOperationStateKeyPrefix {
    /// Encodes only the operation id, which is also the first thing an
    /// [`ActiveStateKey`] encodes.
    fn consensus_encode<W: Write>(&self, writer: &mut W) -> Result<usize, Error> {
        self.operation_id.consensus_encode(writer)
    }
}

// Lookups with this prefix yield `ActiveStateKey` records.
impl ::fedimint_core::db::DatabaseLookup for ActiveOperationStateKeyPrefix {
    type Record = ActiveStateKey;
}
 898  
 899  #[derive(Debug)]
 900  pub(crate) struct ActiveModuleOperationStateKeyPrefix {
 901      pub operation_id: OperationId,
 902      pub module_instance: ModuleInstanceId,
 903  }
 904  
 905  impl Encodable for ActiveModuleOperationStateKeyPrefix {
 906      fn consensus_encode<W: Write>(&self, writer: &mut W) -> Result<usize, Error> {
 907          let mut len = 0;
 908          len += self.operation_id.consensus_encode(writer)?;
 909          len += self.module_instance.consensus_encode(writer)?;
 910          Ok(len)
 911      }
 912  }
 913  
 914  impl ::fedimint_core::db::DatabaseLookup for ActiveModuleOperationStateKeyPrefix {
 915      type Record = ActiveStateKey;
 916  }
 917  
/// Empty lookup prefix for active states: it encodes to zero bytes, so it
/// adds no restriction beyond the record's DB prefix.
#[derive(Debug)]
pub struct ActiveStateKeyPrefix;

impl Encodable for ActiveStateKeyPrefix {
    /// Writes nothing and reports zero bytes written.
    fn consensus_encode<W: Write>(&self, _writer: &mut W) -> Result<usize, Error> {
        Ok(0)
    }
}
 926  
/// Metadata stored as the database value of an active state record.
#[derive(Debug, Copy, Clone, Encodable, Decodable)]
pub struct ActiveStateMeta {
    /// Creation time; the `Default` impl sets it to
    /// `fedimint_core::time::now()`.
    pub created_at: SystemTime,
}
 931  
// Active states live under the `ActiveStates` DB prefix: the whole
// `ActiveStateKey` is the key and `ActiveStateMeta` is the value.
impl ::fedimint_core::db::DatabaseRecord for ActiveStateKey {
    const DB_PREFIX: u8 = ExecutorDbPrefixes::ActiveStates as u8;
    const NOTIFY_ON_MODIFY: bool = true;
    type Key = Self;
    type Value = ActiveStateMeta;
}

// Marker impl that goes together with `NOTIFY_ON_MODIFY = true` above.
impl DatabaseKeyWithNotify for ActiveStateKey {}

// Lookups with the empty prefix yield `ActiveStateKey` records.
impl ::fedimint_core::db::DatabaseLookup for ActiveStateKeyPrefix {
    type Record = ActiveStateKey;
}
 944  
/// Lookup prefix for active state records read as [`ActiveStateKeyBytes`],
/// i.e. with the state left as raw bytes.
#[derive(Debug, Encodable, Decodable)]
pub(crate) struct ActiveStateKeyPrefixBytes;

// Same `ActiveStates` DB prefix and value type as `ActiveStateKey`, but with
// `NOTIFY_ON_MODIFY` turned off.
impl ::fedimint_core::db::DatabaseRecord for ActiveStateKeyBytes {
    const DB_PREFIX: u8 = ExecutorDbPrefixes::ActiveStates as u8;
    const NOTIFY_ON_MODIFY: bool = false;
    type Key = Self;
    type Value = ActiveStateMeta;
}

// Lookups with this prefix yield `ActiveStateKeyBytes` records.
impl ::fedimint_core::db::DatabaseLookup for ActiveStateKeyPrefixBytes {
    type Record = ActiveStateKeyBytes;
}
 958  
 959  impl Default for ActiveStateMeta {
 960      fn default() -> Self {
 961          Self {
 962              created_at: fedimint_core::time::now(),
 963          }
 964      }
 965  }
 966  
 967  impl ActiveStateMeta {
 968      fn into_inactive(self) -> InactiveStateMeta {
 969          InactiveStateMeta {
 970              created_at: self.created_at,
 971              exited_at: fedimint_core::time::now(),
 972          }
 973      }
 974  }
 975  
/// A past or final state of a state machine
///
/// Serves as the database key of an inactive state record; the associated
/// value is an [`InactiveStateMeta`].
#[derive(Debug, Clone)]
pub struct InactiveStateKey {
    // TODO: remove redundant operation id from state trait
    /// Operation the state belongs to. [`InactiveStateKey::from_state`] fills
    /// it from `state.operation_id()`.
    pub operation_id: OperationId,
    /// The state machine state itself; it is encoded as part of the key.
    pub state: DynState,
}
 983  
 984  impl InactiveStateKey {
 985      pub fn from_state(state: DynState) -> InactiveStateKey {
 986          InactiveStateKey {
 987              operation_id: state.operation_id(),
 988              state,
 989          }
 990      }
 991  }
 992  
 993  impl Encodable for InactiveStateKey {
 994      fn consensus_encode<W: Write>(&self, writer: &mut W) -> Result<usize, Error> {
 995          let mut len = 0;
 996          len += self.operation_id.consensus_encode(writer)?;
 997          len += self.state.consensus_encode(writer)?;
 998          Ok(len)
 999      }
1000  }
1001  
1002  impl Decodable for InactiveStateKey {
1003      fn consensus_decode<R: Read>(
1004          reader: &mut R,
1005          modules: &ModuleDecoderRegistry,
1006      ) -> Result<Self, DecodeError> {
1007          let operation_id = OperationId::consensus_decode(reader, modules)?;
1008          let state = DynState::consensus_decode(reader, modules)?;
1009  
1010          Ok(InactiveStateKey {
1011              operation_id,
1012              state,
1013          })
1014      }
1015  }
1016  
/// Inactive state key that keeps the state as raw bytes instead of a decoded
/// [`DynState`]. It is stored under the same DB prefix as
/// [`InactiveStateKey`].
#[derive(Debug)]
pub struct InactiveStateKeyBytes {
    /// Operation the state belongs to.
    pub operation_id: OperationId,
    /// Module instance id read from the key when decoding. It is not written
    /// on its own when encoding.
    pub module_instance_id: ModuleInstanceId,
    /// Raw state bytes. When produced by `consensus_decode`, this is the
    /// encoded module instance id followed by all remaining bytes of the key.
    pub state: Vec<u8>,
}
1023  
1024  impl Encodable for InactiveStateKeyBytes {
1025      fn consensus_encode<W: std::io::Write>(&self, writer: &mut W) -> Result<usize, std::io::Error> {
1026          let mut len = 0;
1027          len += self.operation_id.consensus_encode(writer)?;
1028          len += writer.write(self.state.as_slice())?;
1029          Ok(len)
1030      }
1031  }
1032  
1033  impl Decodable for InactiveStateKeyBytes {
1034      fn consensus_decode<R: std::io::Read>(
1035          reader: &mut R,
1036          modules: &ModuleDecoderRegistry,
1037      ) -> Result<Self, DecodeError> {
1038          let operation_id = OperationId::consensus_decode(reader, modules)?;
1039          let module_instance_id = ModuleInstanceId::consensus_decode(reader, modules)?;
1040          let mut bytes = Vec::new();
1041          reader
1042              .read_to_end(&mut bytes)
1043              .map_err(DecodeError::from_err)?;
1044  
1045          let mut instance_bytes = ModuleInstanceId::consensus_encode_to_vec(&module_instance_id);
1046          instance_bytes.append(&mut bytes);
1047  
1048          Ok(InactiveStateKeyBytes {
1049              operation_id,
1050              module_instance_id,
1051              state: instance_bytes,
1052          })
1053      }
1054  }
1055  
/// Lookup prefix for the [`InactiveStateKey`] records of a single operation.
#[derive(Debug)]
pub(crate) struct InactiveOperationStateKeyPrefix {
    /// Operation id that makes up the encoded prefix.
    pub operation_id: OperationId,
}

impl Encodable for InactiveOperationStateKeyPrefix {
    /// Encodes only the operation id, which is also the first thing an
    /// [`InactiveStateKey`] encodes.
    fn consensus_encode<W: Write>(&self, writer: &mut W) -> Result<usize, Error> {
        self.operation_id.consensus_encode(writer)
    }
}

// Lookups with this prefix yield `InactiveStateKey` records.
impl ::fedimint_core::db::DatabaseLookup for InactiveOperationStateKeyPrefix {
    type Record = InactiveStateKey;
}
1070  
1071  #[derive(Debug)]
1072  pub(crate) struct InactiveModuleOperationStateKeyPrefix {
1073      pub operation_id: OperationId,
1074      pub module_instance: ModuleInstanceId,
1075  }
1076  
1077  impl Encodable for InactiveModuleOperationStateKeyPrefix {
1078      fn consensus_encode<W: Write>(&self, writer: &mut W) -> Result<usize, Error> {
1079          let mut len = 0;
1080          len += self.operation_id.consensus_encode(writer)?;
1081          len += self.module_instance.consensus_encode(writer)?;
1082          Ok(len)
1083      }
1084  }
1085  
1086  impl ::fedimint_core::db::DatabaseLookup for InactiveModuleOperationStateKeyPrefix {
1087      type Record = InactiveStateKey;
1088  }
1089  
/// Empty lookup prefix for inactive states: it encodes to zero bytes, so it
/// adds no restriction beyond the record's DB prefix.
#[derive(Debug, Clone)]
pub struct InactiveStateKeyPrefix;

impl Encodable for InactiveStateKeyPrefix {
    /// Writes nothing and reports zero bytes written.
    fn consensus_encode<W: Write>(&self, _writer: &mut W) -> Result<usize, Error> {
        Ok(0)
    }
}
1098  
/// Lookup prefix for inactive state records read as
/// [`InactiveStateKeyBytes`], i.e. with the state left as raw bytes.
#[derive(Debug, Encodable, Decodable)]
pub(crate) struct InactiveStateKeyPrefixBytes;

// Same `InactiveStates` DB prefix and value type as `InactiveStateKey`, but
// with `NOTIFY_ON_MODIFY` turned off.
impl ::fedimint_core::db::DatabaseRecord for InactiveStateKeyBytes {
    const DB_PREFIX: u8 = ExecutorDbPrefixes::InactiveStates as u8;
    const NOTIFY_ON_MODIFY: bool = false;
    type Key = Self;
    type Value = InactiveStateMeta;
}

// Lookups with this prefix yield `InactiveStateKeyBytes` records.
impl ::fedimint_core::db::DatabaseLookup for InactiveStateKeyPrefixBytes {
    type Record = InactiveStateKeyBytes;
}
1112  
/// Metadata stored as the database value of an inactive state record.
#[derive(Debug, Copy, Clone, Decodable, Encodable)]
pub struct InactiveStateMeta {
    /// Creation time; `ActiveStateMeta::into_inactive` copies it from the
    /// active metadata.
    pub created_at: SystemTime,
    /// Time of leaving the active set; `ActiveStateMeta::into_inactive` sets
    /// it to `fedimint_core::time::now()`.
    pub exited_at: SystemTime,
}
1118  
// Inactive states live under the `InactiveStates` DB prefix: the whole
// `InactiveStateKey` is the key and `InactiveStateMeta` is the value.
impl ::fedimint_core::db::DatabaseRecord for InactiveStateKey {
    const DB_PREFIX: u8 = ExecutorDbPrefixes::InactiveStates as u8;
    const NOTIFY_ON_MODIFY: bool = true;
    type Key = Self;
    type Value = InactiveStateMeta;
}

// Marker impl that goes together with `NOTIFY_ON_MODIFY = true` above.
impl DatabaseKeyWithNotify for InactiveStateKey {}

// Lookups with the empty prefix yield `InactiveStateKey` records.
impl ::fedimint_core::db::DatabaseLookup for InactiveStateKeyPrefix {
    type Record = InactiveStateKey;
}
1131  
/// A state tagged with whether it is active or inactive.
///
/// The executor loop receives it as the `outcome` of a completed state
/// transition.
#[derive(Debug)]
enum ActiveOrInactiveState {
    /// The state is active; carries the state and its metadata.
    Active {
        dyn_state: DynState,
        #[allow(dead_code)] // currently not printed anywhere, but useful in the db
        meta: ActiveStateMeta,
    },
    /// The state is inactive; carries only the state.
    Inactive {
        dyn_state: DynState,
    },
}
1143  
1144  impl ActiveOrInactiveState {
1145      fn is_active(&self) -> bool {
1146          match self {
1147              ActiveOrInactiveState::Active { .. } => true,
1148              ActiveOrInactiveState::Inactive { .. } => false,
1149          }
1150      }
1151  }
1152  
/// Executor tests driving a small mock state machine through broadcast
/// messages.
#[cfg(test)]
mod tests {
    use std::fmt::Debug;
    use std::sync::Arc;
    use std::time::Duration;

    use fedimint_core::core::{
        Decoder, IntoDynInstance, ModuleInstanceId, ModuleKind, OperationId,
    };
    use fedimint_core::db::mem_impl::MemDatabase;
    use fedimint_core::db::Database;
    use fedimint_core::encoding::{Decodable, Encodable};
    use fedimint_core::module::registry::ModuleDecoderRegistry;
    use fedimint_core::runtime;
    use fedimint_core::task::TaskGroup;
    use tokio::sync::broadcast::Sender;
    use tracing::{info, trace};

    use crate::sm::state::{Context, DynContext, DynState};
    use crate::sm::{Executor, Notifier, State, StateTransition};
    use crate::DynGlobalClientContext;

    /// Mock state machine whose transitions are triggered by `u64` values
    /// sent over the broadcast channel in [`MockContext`].
    #[derive(Debug, Clone, Eq, PartialEq, Decodable, Encodable, Hash)]
    enum MockStateMachine {
        /// Initial state: moves to `Final` on a `0` and to
        /// `ReceivedNonNull(v)` on any non-zero `v`.
        Start,
        /// Remembers a non-zero value; moves to `Final` once the same value
        /// is received again.
        ReceivedNonNull(u64),
        /// Terminal state without transitions.
        Final,
    }

    impl State for MockStateMachine {
        type ModuleContext = MockContext;

        fn transitions(
            &self,
            context: &Self::ModuleContext,
            _global_context: &DynGlobalClientContext,
        ) -> Vec<StateTransition<Self>> {
            match self {
                MockStateMachine::Start => {
                    // Each transition gets its own subscription to the
                    // broadcast channel.
                    let mut receiver1 = context.broadcast.subscribe();
                    let mut receiver2 = context.broadcast.subscribe();
                    vec![
                        StateTransition::new(
                            async move {
                                loop {
                                    let val = receiver1.recv().await.unwrap();
                                    if val == 0 {
                                        trace!("State transition Start->Final");
                                        break;
                                    }
                                }
                            },
                            |_dbtx, (), _state| Box::pin(async move { MockStateMachine::Final }),
                        ),
                        StateTransition::new(
                            async move {
                                loop {
                                    let val = receiver2.recv().await.unwrap();
                                    if val != 0 {
                                        trace!("State transition Start->ReceivedNonNull");
                                        break val;
                                    }
                                }
                            },
                            |_dbtx, value, _state| {
                                Box::pin(async move { MockStateMachine::ReceivedNonNull(value) })
                            },
                        ),
                    ]
                }
                MockStateMachine::ReceivedNonNull(prev_val) => {
                    let prev_val = *prev_val;
                    let mut receiver = context.broadcast.subscribe();
                    vec![StateTransition::new(
                        async move {
                            loop {
                                let val = receiver.recv().await.unwrap();
                                if val == prev_val {
                                    trace!("State transition ReceivedNonNull->Final");
                                    break;
                                }
                            }
                        },
                        |_dbtx, (), _state| Box::pin(async move { MockStateMachine::Final }),
                    )]
                }
                MockStateMachine::Final => {
                    vec![]
                }
            }
        }

        // Every mock state reports the same all-zero operation id.
        fn operation_id(&self) -> OperationId {
            OperationId([0u8; 32])
        }
    }

    impl IntoDynInstance for MockStateMachine {
        type DynType = DynState;

        fn into_dyn(self, instance_id: ModuleInstanceId) -> Self::DynType {
            DynState::from_typed(instance_id, self)
        }
    }

    /// Module context handing the mock state machine its broadcast channel.
    #[derive(Debug, Clone)]
    struct MockContext {
        broadcast: tokio::sync::broadcast::Sender<u64>,
    }

    impl IntoDynInstance for MockContext {
        type DynType = DynContext;

        fn into_dyn(self, instance_id: ModuleInstanceId) -> Self::DynType {
            DynContext::from_typed(instance_id, self)
        }
    }

    impl Context for MockContext {}

    /// Builds and starts an executor backed by an in-memory database with the
    /// mock module registered under instance id `42`.
    ///
    /// Returns the executor, the broadcast sender that triggers the mock
    /// transitions, and the database.
    async fn get_executor() -> (Executor, Sender<u64>, Database) {
        let (broadcast, _) = tokio::sync::broadcast::channel(10);

        let mut decoder_builder = Decoder::builder();
        decoder_builder.with_decodable_type::<MockStateMachine>();
        let decoder = decoder_builder.build();

        let decoders =
            ModuleDecoderRegistry::new(vec![(42, ModuleKind::from_static_str("test"), decoder)]);
        let db = Database::new(MemDatabase::new(), decoders);

        let mut executor_builder = Executor::builder();
        executor_builder.with_module(
            42,
            MockContext {
                broadcast: broadcast.clone(),
            },
        );
        let executor = executor_builder
            .build(db.clone(), Notifier::new(db.clone()), TaskGroup::new())
            .await;
        executor
            .start_executor(Arc::new(|_, _| DynGlobalClientContext::new_fake()))
            .await;

        info!("Initialized test executor");
        (executor, broadcast, db)
    }

    /// Adds a `Start` state machine, checks it is active and cannot be added
    /// twice, then broadcasts `0` and expects `Final` among the inactive
    /// states.
    #[tokio::test]
    #[tracing_test::traced_test]
    async fn test_executor() {
        // Instance id registered by `get_executor`.
        const MOCK_INSTANCE_1: ModuleInstanceId = 42;
        // Instance id that is never registered.
        const MOCK_INSTANCE_2: ModuleInstanceId = 21;

        let (executor, sender, _db) = get_executor().await;
        executor
            .add_state_machines(vec![DynState::from_typed(
                MOCK_INSTANCE_1,
                MockStateMachine::Start,
            )])
            .await
            .unwrap();

        assert!(
            executor
                .add_state_machines(vec![DynState::from_typed(
                    MOCK_INSTANCE_1,
                    MockStateMachine::Start
                )])
                .await
                .is_err(),
            "Running the same state machine a second time should fail"
        );

        assert!(
            executor
                .contains_active_state(MOCK_INSTANCE_1, MockStateMachine::Start)
                .await,
            "State was written to DB and waits for broadcast"
        );
        assert!(
            !executor
                .contains_active_state(MOCK_INSTANCE_2, MockStateMachine::Start)
                .await,
            "Instance separation works"
        );

        // TODO build await fn+timeout or allow manual driving of executor
        runtime::sleep(Duration::from_secs(1)).await;
        sender.send(0).unwrap();
        runtime::sleep(Duration::from_secs(2)).await;

        assert!(
            executor
                .contains_inactive_state(MOCK_INSTANCE_1, MockStateMachine::Final)
                .await,
            "Final state was written to DB as inactive after the broadcast"
        );
    }
}