/ fedimint-client / src / lib.rs
lib.rs
   1  //! # Client library for fedimintd
   2  //!
   3  //! This library provides a client interface to build module clients that can be
   4  //! plugged together into a fedimint client that exposes a high-level interface
   5  //! for application authors to integrate with.
   6  //!
   7  //! ## Module Clients
   8  //! Module clients have to at least implement the [`module::ClientModule`] trait
   9  //! and a factory struct implementing [`module::init::ClientModuleInit`]. The
  10  //! `ClientModule` trait defines the module types (tx inputs, outputs, etc.) as
  11  //! well as the module's [state machines](sm::State).
  12  //!
  13  //! ### State machines
  14  //! State machines are spawned when starting operations and drive them
  15  //! forward in the background. All module state machines are run by a central
  16  //! [`sm::Executor`]. This means typically starting an operation shall return
  17  //! instantly.
  18  //!
  19  //! For example when doing a deposit the function starting it would immediately
  20  //! return a deposit address and a [`OperationId`] (important concept, highly
  21  //! recommended to read the docs) while spawning a state machine checking the
  22  //! blockchain for incoming bitcoin transactions. The progress of these state
  23  //! machines can then be *observed* using the operation id, but no further user
  24  //! interaction is required to drive them forward.
  25  //!
  26  //! ### State Machine Contexts
  27  //! State machines have access to both a [global
  28  //! context](`DynGlobalClientContext`) as well as to a [module-specific
  29  //! context](module::ClientModule::context).
  30  //!
  31  //! The global context provides access to the federation API and allows to claim
  32  //! module outputs (and transferring the value into the client's wallet), which
  33  //! can be used for refunds.
  34  //!
  35  //! The client-specific context can be used for other purposes, such as
  36  //! supplying config to the state transitions or giving access to other APIs
  37  //! (e.g. LN gateway in case of the lightning module).
  38  //!
  39  //! ### Extension traits
  40  //! The modules themselves can only create inputs and outputs that then have to
  41  //! be combined into transactions by the user and submitted via
  42  //! [`Client::finalize_and_submit_transaction`]. To make this easier most module
  43  //! client implementations contain an extension trait which is implemented for
  44  //! [`Client`] and allows to create the most typical fedimint transactions with
  45  //! a single function call.
  46  //!
  47  //! To observe the progress each high level operation function should be
  48  //! accompanied by one returning a stream of high-level operation updates.
  49  //! Internally that stream queries the state machines belonging to the
  50  //! operation to determine the high-level operation state.
  51  //!
  52  //! ### Primary Modules
  53  //! Not all modules have the ability to hold money for long. E.g. the lightning
  54  //! module and its smart contracts are only used to incentivize LN payments, not
  55  //! to hold money. The mint module on the other hand holds e-cash note and can
  56  //! thus be used to fund transactions and to absorb change. Module clients with
  57  //! this ability should implement [`ClientModule::  supports_being_primary`] and
  58  //! related methods.
  59  //!
  60  //! For a example of a client module see [the mint client](https://github.com/fedimint/fedimint/blob/master/modules/fedimint-mint-client/src/lib.rs).
  61  //!
  62  //! ## Client
  63  //! The [`Client`] struct is the main entry point for application authors. It is
  64  //! constructed using its builder which can be obtained via [`Client::builder`].
  65  //! The supported module clients have to be chosen at compile time while the
  66  //! actually available ones will be determined by the config loaded at runtime.
  67  //!
  68  //! For a hacky instantiation of a complete client see the [`ng` subcommand of `fedimint-cli`](https://github.com/fedimint/fedimint/blob/55f9d88e17d914b92a7018de677d16e57ed42bf6/fedimint-cli/src/ng.rs#L56-L73).
  69  
  70  use std::collections::{BTreeMap, HashSet};
  71  use std::fmt::{Debug, Formatter};
  72  use std::ops::{self, Range};
  73  use std::pin::Pin;
  74  use std::sync::{Arc, Weak};
  75  use std::time::Duration;
  76  
  77  use anyhow::{anyhow, bail, ensure, Context};
  78  use async_stream::stream;
  79  use backup::ClientBackup;
  80  use db::{
  81      apply_migrations_client, CachedApiVersionSet, CachedApiVersionSetKey, ClientConfigKey,
  82      ClientConfigKeyPrefix, ClientInitStateKey, ClientModuleRecovery, EncodedClientSecretKey,
  83      InitMode,
  84  };
  85  use envs::get_discover_api_version_timeout;
  86  use fedimint_api_client::api::{ApiVersionSet, DynGlobalApi, DynModuleApi, IGlobalFederationApi};
  87  use fedimint_core::config::{
  88      ClientConfig, ClientModuleConfig, FederationId, JsonClientConfig, JsonWithKind,
  89      ModuleInitRegistry,
  90  };
  91  use fedimint_core::core::{
  92      DynInput, DynOutput, IInput, IOutput, ModuleInstanceId, ModuleKind, OperationId,
  93  };
  94  use fedimint_core::db::{
  95      AutocommitError, Database, DatabaseTransaction, IDatabaseTransactionOpsCoreTyped,
  96  };
  97  use fedimint_core::encoding::{Decodable, Encodable};
  98  use fedimint_core::module::registry::ModuleDecoderRegistry;
  99  use fedimint_core::module::{
 100      ApiAuth, ApiVersion, MultiApiVersion, SupportedApiVersionsSummary, SupportedCoreApiVersions,
 101      SupportedModuleApiVersions,
 102  };
 103  use fedimint_core::task::{MaybeSend, MaybeSync, TaskGroup};
 104  use fedimint_core::transaction::Transaction;
 105  use fedimint_core::util::{BoxStream, NextOrPending};
 106  use fedimint_core::{
 107      apply, async_trait_maybe_send, dyn_newtype_define, fedimint_build_code_version_env,
 108      maybe_add_send, maybe_add_send_sync, runtime, Amount, NumPeers, OutPoint, PeerId,
 109      TransactionId,
 110  };
 111  pub use fedimint_derive_secret as derivable_secret;
 112  use fedimint_derive_secret::DerivableSecret;
 113  use fedimint_logging::{LOG_CLIENT, LOG_CLIENT_NET_API, LOG_CLIENT_RECOVERY};
 114  use futures::{Future, Stream, StreamExt};
 115  use meta::{LegacyMetaSource, MetaService};
 116  use module::recovery::RecoveryProgress;
 117  use module::{DynClientModule, FinalClient};
 118  use rand::thread_rng;
 119  use secp256k1_zkp::{PublicKey, Secp256k1};
 120  use secret::{DeriveableSecretClientExt, PlainRootSecretStrategy, RootSecretStrategy as _};
 121  use thiserror::Error;
 122  #[cfg(not(target_family = "wasm"))]
 123  use tokio::runtime::{Handle as RuntimeHandle, RuntimeFlavor};
 124  use tokio::sync::watch;
 125  use tokio_stream::wrappers::WatchStream;
 126  use tracing::{debug, error, info, warn};
 127  
 128  use crate::backup::Metadata;
 129  use crate::db::{ClientMetadataKey, ClientModuleRecoveryState, InitState, OperationLogKey};
 130  use crate::module::init::{
 131      ClientModuleInit, ClientModuleInitRegistry, DynClientModuleInit, IClientModuleInit,
 132  };
 133  use crate::module::{ClientModule, ClientModuleRegistry, IClientModule, StateGenerator};
 134  use crate::oplog::OperationLog;
 135  use crate::sm::executor::{
 136      ActiveOperationStateKeyPrefix, ContextGen, InactiveOperationStateKeyPrefix,
 137  };
 138  use crate::sm::{
 139      ClientSMDatabaseTransaction, DynState, Executor, IState, Notifier, OperationState, State,
 140  };
 141  use crate::transaction::{
 142      tx_submission_sm_decoder, ClientInput, ClientOutput, TransactionBuilder, TxSubmissionContext,
 143      TxSubmissionStates, TRANSACTION_SUBMISSION_MODULE_INSTANCE,
 144  };
 145  
 146  /// Client backup
 147  pub mod backup;
 148  /// Database keys used by the client
 149  pub mod db;
 150  /// Environment variables
 151  pub mod envs;
 152  /// Module client interface definitions
 153  pub mod module;
 154  /// Operation log subsystem of the client
 155  pub mod oplog;
 156  /// Secret handling & derivation
 157  pub mod secret;
 158  /// Client state machine interfaces and executor implementation
 159  pub mod sm;
 160  /// Structs and interfaces to construct Fedimint transactions
 161  pub mod transaction;
 162  
 163  /// Management of meta fields
 164  pub mod meta;
 165  
 166  pub type InstancelessDynClientInput = ClientInput<
 167      Box<maybe_add_send_sync!(dyn IInput + 'static)>,
 168      Box<maybe_add_send_sync!(dyn IState + 'static)>,
 169  >;
 170  
 171  pub type InstancelessDynClientOutput = ClientOutput<
 172      Box<maybe_add_send_sync!(dyn IOutput + 'static)>,
 173      Box<maybe_add_send_sync!(dyn IState + 'static)>,
 174  >;
 175  
 176  #[derive(Debug, Error)]
 177  pub enum AddStateMachinesError {
 178      #[error("State already exists in database")]
 179      StateAlreadyExists,
 180      #[error("Got {0}")]
 181      Other(#[from] anyhow::Error),
 182  }
 183  
 184  pub enum DiscoverCommonApiVersionMode {
 185      /// Get the response from only a few peers, or until a timeout
 186      Fast,
 187      /// Try to get a response from all peers, or until a timeout
 188      Full,
 189  }
 190  
 191  pub type AddStateMachinesResult = Result<(), AddStateMachinesError>;
 192  
 193  #[apply(async_trait_maybe_send!)]
 194  pub trait IGlobalClientContext: Debug + MaybeSend + MaybeSync + 'static {
 195      /// Returned a reference client's module API client, so that module-specific
 196      /// calls can be made
 197      fn module_api(&self) -> DynModuleApi;
 198  
 199      fn client_config(&self) -> &ClientConfig;
 200  
 201      /// Returns a reference to the client's federation API client. The provided
 202      /// interface [`IGlobalFederationApi`] typically does not provide the
 203      /// necessary functionality, for this extension traits like
 204      /// [`fedimint_api_client::api::IGlobalFederationApi`] have to be used.
 205      // TODO: Could be removed in favor of client() except for testing
 206      fn api(&self) -> &DynGlobalApi;
 207  
 208      fn decoders(&self) -> &ModuleDecoderRegistry;
 209  
 210      /// This function is mostly meant for internal use, you are probably looking
 211      /// for [`DynGlobalClientContext::claim_input`].
 212      /// Returns transaction id of the funding transaction and an optional
 213      /// `OutPoint` that represents change if change was added.
 214      async fn claim_input_dyn(
 215          &self,
 216          dbtx: &mut ClientSMDatabaseTransaction<'_, '_>,
 217          input: InstancelessDynClientInput,
 218      ) -> (TransactionId, Vec<OutPoint>);
 219  
 220      /// This function is mostly meant for internal use, you are probably looking
 221      /// for [`DynGlobalClientContext::fund_output`].
 222      /// Returns transaction id of the funding transaction and an optional
 223      /// `OutPoint` that represents change if change was added.
 224      async fn fund_output_dyn(
 225          &self,
 226          dbtx: &mut ClientSMDatabaseTransaction<'_, '_>,
 227          output: InstancelessDynClientOutput,
 228      ) -> anyhow::Result<(TransactionId, Vec<OutPoint>)>;
 229  
 230      /// Adds a state machine to the executor.
 231      async fn add_state_machine_dyn(
 232          &self,
 233          dbtx: &mut ClientSMDatabaseTransaction<'_, '_>,
 234          sm: Box<maybe_add_send_sync!(dyn IState)>,
 235      ) -> AddStateMachinesResult;
 236  
 237      async fn transaction_update_stream(&self) -> BoxStream<OperationState<TxSubmissionStates>>;
 238  }
 239  
 240  #[apply(async_trait_maybe_send!)]
 241  impl IGlobalClientContext for () {
 242      fn module_api(&self) -> DynModuleApi {
 243          unimplemented!("fake implementation, only for tests");
 244      }
 245  
 246      fn client_config(&self) -> &ClientConfig {
 247          unimplemented!("fake implementation, only for tests");
 248      }
 249  
 250      fn api(&self) -> &DynGlobalApi {
 251          unimplemented!("fake implementation, only for tests");
 252      }
 253  
 254      fn decoders(&self) -> &ModuleDecoderRegistry {
 255          unimplemented!("fake implementation, only for tests");
 256      }
 257  
 258      async fn claim_input_dyn(
 259          &self,
 260          _dbtx: &mut ClientSMDatabaseTransaction<'_, '_>,
 261          _input: InstancelessDynClientInput,
 262      ) -> (TransactionId, Vec<OutPoint>) {
 263          unimplemented!("fake implementation, only for tests");
 264      }
 265  
 266      async fn fund_output_dyn(
 267          &self,
 268          _dbtx: &mut ClientSMDatabaseTransaction<'_, '_>,
 269          _output: InstancelessDynClientOutput,
 270      ) -> anyhow::Result<(TransactionId, Vec<OutPoint>)> {
 271          unimplemented!("fake implementation, only for tests");
 272      }
 273  
 274      async fn add_state_machine_dyn(
 275          &self,
 276          _dbtx: &mut ClientSMDatabaseTransaction<'_, '_>,
 277          _sm: Box<maybe_add_send_sync!(dyn IState)>,
 278      ) -> AddStateMachinesResult {
 279          unimplemented!("fake implementation, only for tests");
 280      }
 281  
 282      async fn transaction_update_stream(&self) -> BoxStream<OperationState<TxSubmissionStates>> {
 283          unimplemented!("fake implementation, only for tests");
 284      }
 285  }
 286  
 287  dyn_newtype_define! {
 288      /// Global state and functionality provided to all state machines running in the
 289      /// client
 290      #[derive(Clone)]
 291      pub DynGlobalClientContext(Arc<IGlobalClientContext>)
 292  }
 293  
 294  impl DynGlobalClientContext {
 295      pub fn new_fake() -> Self {
 296          DynGlobalClientContext::from(())
 297      }
 298  
 299      pub async fn await_tx_accepted(&self, query_txid: TransactionId) -> Result<(), String> {
 300          self.transaction_update_stream()
 301              .await
 302              .filter_map(|tx_update| {
 303                  std::future::ready(match tx_update.state {
 304                      TxSubmissionStates::Accepted(txid) if txid == query_txid => Some(Ok(())),
 305                      TxSubmissionStates::Rejected(txid, submit_error) if txid == query_txid => {
 306                          Some(Err(submit_error))
 307                      }
 308                      _ => None,
 309                  })
 310              })
 311              .next_or_pending()
 312              .await
 313      }
 314  
 315      /// Creates a transaction that with an output of the primary module,
 316      /// claiming the given input and transferring its value into the client's
 317      /// wallet.
 318      ///
 319      /// The transactions submission state machine as well as the state
 320      /// machines responsible for the generated output are generated
 321      /// automatically. The caller is responsible for the input's state machines,
 322      /// should there be any required.
 323      pub async fn claim_input<I, S>(
 324          &self,
 325          dbtx: &mut ClientSMDatabaseTransaction<'_, '_>,
 326          input: ClientInput<I, S>,
 327      ) -> (TransactionId, Vec<OutPoint>)
 328      where
 329          I: IInput + MaybeSend + MaybeSync + 'static,
 330          S: IState + MaybeSend + MaybeSync + 'static,
 331      {
 332          self.claim_input_dyn(
 333              dbtx,
 334              InstancelessDynClientInput {
 335                  input: Box::new(input.input),
 336                  keys: input.keys,
 337                  amount: input.amount,
 338                  state_machines: states_to_instanceless_dyn(input.state_machines),
 339              },
 340          )
 341          .await
 342      }
 343  
 344      /// Creates a transaction with the supplied output and funding added by the
 345      /// primary module if possible. If the primary module does not have the
 346      /// required funds this function fails.
 347      ///
 348      /// The transactions submission state machine as well as the state machines
 349      /// for the funding inputs are generated automatically. The caller is
 350      /// responsible for the output's state machines, should there be any
 351      /// required.
 352      pub async fn fund_output<O, S>(
 353          &self,
 354          dbtx: &mut ClientSMDatabaseTransaction<'_, '_>,
 355          output: ClientOutput<O, S>,
 356      ) -> anyhow::Result<(TransactionId, Vec<OutPoint>)>
 357      where
 358          O: IOutput + MaybeSend + MaybeSync + 'static,
 359          S: IState + MaybeSend + MaybeSync + 'static,
 360      {
 361          self.fund_output_dyn(
 362              dbtx,
 363              InstancelessDynClientOutput {
 364                  output: Box::new(output.output),
 365                  amount: output.amount,
 366                  state_machines: states_to_instanceless_dyn(output.state_machines),
 367              },
 368          )
 369          .await
 370      }
 371  
 372      /// Allows adding state machines from inside a transition to the executor.
 373      /// The added state machine belongs to the same module instance as the state
 374      /// machine from inside which it was spawned.
 375      pub async fn add_state_machine<S>(
 376          &self,
 377          dbtx: &mut ClientSMDatabaseTransaction<'_, '_>,
 378          sm: S,
 379      ) -> AddStateMachinesResult
 380      where
 381          S: State + MaybeSend + MaybeSync + 'static,
 382      {
 383          self.add_state_machine_dyn(dbtx, box_up_state(sm)).await
 384      }
 385  }
 386  
 387  fn states_to_instanceless_dyn<S: IState + MaybeSend + MaybeSync + 'static>(
 388      state_gen: StateGenerator<S>,
 389  ) -> StateGenerator<Box<maybe_add_send_sync!(dyn IState + 'static)>> {
 390      Arc::new(move |txid, out_idx| {
 391          let states: Vec<S> = state_gen(txid, out_idx);
 392          states
 393              .into_iter()
 394              .map(|state| box_up_state(state))
 395              .collect()
 396      })
 397  }
 398  
 399  /// Not sure why I couldn't just directly call `Box::new` ins
 400  /// [`states_to_instanceless_dyn`], but this fixed it.
 401  fn box_up_state(state: impl IState + 'static) -> Box<maybe_add_send_sync!(dyn IState + 'static)> {
 402      Box::new(state)
 403  }
 404  
 405  impl<T> From<Arc<T>> for DynGlobalClientContext
 406  where
 407      T: IGlobalClientContext,
 408  {
 409      fn from(inner: Arc<T>) -> Self {
 410          DynGlobalClientContext { inner }
 411      }
 412  }
 413  
 414  // TODO: impl `Debug` for `Client` and derive here
 415  impl Debug for Client {
 416      fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
 417          write!(f, "Client")
 418      }
 419  }
 420  
 421  /// Global state given to a specific client module and state. It is aware inside
 422  /// which module instance and operation it is used and to avoid module being
 423  /// aware of their instance id etc.
 424  #[derive(Clone, Debug)]
 425  struct ModuleGlobalClientContext {
 426      client: Arc<Client>,
 427      module_instance_id: ModuleInstanceId,
 428      operation: OperationId,
 429  }
 430  
 431  #[apply(async_trait_maybe_send!)]
 432  impl IGlobalClientContext for ModuleGlobalClientContext {
 433      fn module_api(&self) -> DynModuleApi {
 434          self.api().with_module(self.module_instance_id)
 435      }
 436  
 437      fn api(&self) -> &DynGlobalApi {
 438          &self.client.api
 439      }
 440  
 441      fn decoders(&self) -> &ModuleDecoderRegistry {
 442          self.client.decoders()
 443      }
 444  
 445      fn client_config(&self) -> &ClientConfig {
 446          self.client.config()
 447      }
 448  
 449      async fn claim_input_dyn(
 450          &self,
 451          dbtx: &mut ClientSMDatabaseTransaction<'_, '_>,
 452          input: InstancelessDynClientInput,
 453      ) -> (TransactionId, Vec<OutPoint>) {
 454          let instance_input = ClientInput {
 455              input: DynInput::from_parts(self.module_instance_id, input.input),
 456              keys: input.keys,
 457              amount: input.amount,
 458              state_machines: states_add_instance(self.module_instance_id, input.state_machines),
 459          };
 460  
 461          self.client
 462              .finalize_and_submit_transaction_inner(
 463                  &mut dbtx.global_tx().to_ref_nc(),
 464                  self.operation,
 465                  TransactionBuilder::new().with_input(instance_input),
 466              )
 467              .await
 468              .expect("Can only fail if additional funding is needed")
 469      }
 470  
 471      async fn fund_output_dyn(
 472          &self,
 473          dbtx: &mut ClientSMDatabaseTransaction<'_, '_>,
 474          output: InstancelessDynClientOutput,
 475      ) -> anyhow::Result<(TransactionId, Vec<OutPoint>)> {
 476          let instance_output = ClientOutput {
 477              output: DynOutput::from_parts(self.module_instance_id, output.output),
 478              amount: output.amount,
 479              state_machines: states_add_instance(self.module_instance_id, output.state_machines),
 480          };
 481  
 482          self.client
 483              .finalize_and_submit_transaction_inner(
 484                  &mut dbtx.global_tx().to_ref_nc(),
 485                  self.operation,
 486                  TransactionBuilder::new().with_output(instance_output),
 487              )
 488              .await
 489      }
 490  
 491      async fn add_state_machine_dyn(
 492          &self,
 493          dbtx: &mut ClientSMDatabaseTransaction<'_, '_>,
 494          sm: Box<maybe_add_send_sync!(dyn IState)>,
 495      ) -> AddStateMachinesResult {
 496          let state = DynState::from_parts(self.module_instance_id, sm);
 497  
 498          self.client
 499              .executor
 500              .add_state_machines_dbtx(&mut dbtx.global_tx().to_ref_nc(), vec![state])
 501              .await
 502      }
 503  
 504      async fn transaction_update_stream(&self) -> BoxStream<OperationState<TxSubmissionStates>> {
 505          self.client.transaction_update_stream(self.operation).await
 506      }
 507  }
 508  
 509  fn states_add_instance(
 510      module_instance_id: ModuleInstanceId,
 511      state_gen: StateGenerator<Box<maybe_add_send_sync!(dyn IState + 'static)>>,
 512  ) -> StateGenerator<DynState> {
 513      Arc::new(move |txid, out_idx| {
 514          let states = state_gen(txid, out_idx);
 515          Iterator::collect(
 516              states
 517                  .into_iter()
 518                  .map(|state| DynState::from_parts(module_instance_id, state)),
 519          )
 520      })
 521  }
 522  
 523  /// User handle to [`Client`]
 524  ///
 525  /// On the drop of [`ClientHandle`] the client will be shut-down, and resources
 526  /// it used freed.
 527  ///
 528  /// Notably it [`ops::Deref`]s to the [`Client`] where most
 529  /// methods live.
 530  ///
 531  /// Put this in an Arc to clone it.
 532  #[derive(Debug)]
 533  pub struct ClientHandle {
 534      inner: Option<Arc<Client>>,
 535  }
 536  
 537  pub type ClientHandleArc = Arc<ClientHandle>;
 538  
 539  impl ClientHandle {
 540      /// Create
 541      fn new(inner: Arc<Client>) -> Self {
 542          ClientHandle {
 543              inner: inner.into(),
 544          }
 545      }
 546  
 547      fn as_inner(&self) -> &Arc<Client> {
 548          self.inner.as_ref().expect("Inner always set")
 549      }
 550  
 551      pub async fn start_executor(&self) {
 552          self.as_inner().start_executor().await
 553      }
 554  
 555      /// Shutdown the client.
 556      pub async fn shutdown(mut self) {
 557          self.shutdown_inner().await
 558      }
 559  
 560      async fn shutdown_inner(&mut self) {
 561          let Some(inner) = self.inner.take() else {
 562              error!("ClientHandleShared::shutdown called twice");
 563              return;
 564          };
 565          inner.executor.stop_executor();
 566          let db = inner.db.clone();
 567          debug!(target: LOG_CLIENT, "Waiting for client task group to shut down");
 568          if let Err(err) = inner
 569              .task_group
 570              .clone()
 571              .shutdown_join_all(Some(Duration::from_secs(30)))
 572              .await
 573          {
 574              warn!(target: LOG_CLIENT, %err, "Error waiting for client task group to shut down");
 575          }
 576  
 577          let client_strong_count = Arc::strong_count(&inner);
 578          debug!(target: LOG_CLIENT, "Dropping last handle to Client");
 579          // We are sure that no background tasks are running in the client anymore, so we
 580          // can drop the (usually) last inner reference.
 581          drop(inner);
 582  
 583          if client_strong_count != 1 {
 584              debug!(target: LOG_CLIENT, count = client_strong_count - 1, LOG_CLIENT, "External Client references remaining after last handle dropped");
 585          }
 586  
 587          let db_strong_count = db.strong_count();
 588          if db_strong_count != 1 {
 589              debug!(target: LOG_CLIENT, count = db_strong_count - 1, "External DB references remaining after last handle dropped");
 590          }
 591      }
 592  
 593      /// Restart the client
 594      ///
 595      /// Returns false if there are other clones of [`ClientHandle`], or starting
 596      /// the client again failed for some reason.
 597      ///
 598      /// Notably it will re-use the original [`Database`] handle, and not attempt
 599      /// to open it again.
 600      pub async fn restart(self) -> anyhow::Result<ClientHandle> {
 601          let (builder, config, root_secret) = {
 602              let client = self
 603                  .inner
 604                  .as_ref()
 605                  .ok_or_else(|| anyhow::format_err!("Already stopped"))?;
 606              let builder = ClientBuilder::from_existing(client);
 607              let config = client.config.clone();
 608              let root_secret = client.root_secret.clone();
 609  
 610              (builder, config, root_secret)
 611          };
 612          self.shutdown().await;
 613  
 614          builder.build(root_secret, config, false).await
 615      }
 616  }
 617  
 618  impl ops::Deref for ClientHandle {
 619      type Target = Client;
 620  
 621      fn deref(&self) -> &Self::Target {
 622          self.inner.as_ref().expect("Must have inner client set")
 623      }
 624  }
 625  
 626  impl ClientHandle {
 627      pub(crate) fn downgrade(&self) -> ClientWeak {
 628          ClientWeak {
 629              inner: Arc::downgrade(self.inner.as_ref().expect("Inner always set")),
 630          }
 631      }
 632  }
 633  
 634  /// Internal self-reference to [`Client`]
 635  #[derive(Debug, Clone)]
 636  pub(crate) struct ClientStrong {
 637      inner: Arc<Client>,
 638  }
 639  
 640  impl ops::Deref for ClientStrong {
 641      type Target = Client;
 642  
 643      fn deref(&self) -> &Self::Target {
 644          self.inner.deref()
 645      }
 646  }
 647  
 648  /// Like [`ClientStrong`] but using a [`Weak`] handle to [`Client`]
 649  ///
 650  /// This is not meant to be used by external code.
 651  #[derive(Debug, Clone)]
 652  pub(crate) struct ClientWeak {
 653      inner: Weak<Client>,
 654  }
 655  
 656  impl ClientWeak {
 657      pub fn upgrade(&self) -> Option<ClientStrong> {
 658          Weak::upgrade(&self.inner).map(|inner| ClientStrong { inner })
 659      }
 660  }
 661  
 662  /// We need a separate drop implementation for `Client` that triggers
 663  /// `Executor::stop_executor` even though the `Drop` implementation of
 664  /// `ExecutorInner` should already take care of that. The reason is that as long
 665  /// as the executor task is active there may be a cycle in the
 666  /// `Arc<Client>`s such that at least one `Executor` never gets dropped.
 667  impl Drop for ClientHandle {
 668      fn drop(&mut self) {
 669          if self.inner.is_none() {
 670              return;
 671          }
 672  
 673          // We can't use block_on in single-threaded mode or wasm
 674          #[cfg(target_family = "wasm")]
 675          let can_block = false;
 676          #[cfg(not(target_family = "wasm"))]
 677          // nosemgrep: ban-raw-block-on
 678          let can_block = RuntimeHandle::current().runtime_flavor() != RuntimeFlavor::CurrentThread;
 679          if !can_block {
 680              let inner = self.inner.take().expect("Must have inner client set");
 681              inner.executor.stop_executor();
 682              if cfg!(target_family = "wasm") {
 683                  error!(target: LOG_CLIENT, "Automatic client shutdown is not possible on wasm, call ClientHandle::shutdown manually.");
 684              } else {
 685                  error!(target: LOG_CLIENT, "Automatic client shutdown is not possible on current thread runtime, call ClientHandle::shutdown manually.");
 686              }
 687              return;
 688          }
 689  
 690          debug!(target: LOG_CLIENT, "Shutting down the Client on last handle drop");
 691          #[cfg(not(target_family = "wasm"))]
 692          runtime::block_in_place(|| {
 693              runtime::block_on(self.shutdown_inner());
 694          });
 695      }
 696  }
 697  
 698  /// List of core api versions supported by the implementation.
 699  /// Notably `major` version is the one being supported, and corresponding
 700  /// `minor` version is the one required (for given `major` version).
 701  const SUPPORTED_CORE_API_VERSIONS: &[fedimint_core::module::ApiVersion] =
 702      &[ApiVersion { major: 0, minor: 0 }];
 703  
 704  pub type ModuleGlobalContextGen = ContextGen;
 705  
 706  /// Resources particular to a module instance
 707  pub struct ClientModuleInstance<'m, M: ClientModule> {
 708      /// Instance id of the module
 709      pub id: ModuleInstanceId,
 710      /// Module-specific DB
 711      pub db: Database,
 712      /// Module-specific API
 713      pub api: DynModuleApi,
 714  
 715      module: &'m M,
 716  }
 717  
 718  impl<'m, M> ops::Deref for ClientModuleInstance<'m, M>
 719  where
 720      M: ClientModule,
 721  {
 722      type Target = M;
 723  
 724      fn deref(&self) -> &Self::Target {
 725          self.module
 726      }
 727  }
 728  
 729  /// Main client type
 730  ///
 731  /// A handle and API to interacting with a single Federation.
 732  ///
 733  /// Under the hood managing service tasks, state machines,
 734  /// database and other resources required.
 735  ///
 736  /// This type is shared externally and internally, and
 737  /// [`ClientHandle`] is responsible for external lifecycle management
 738  /// and resource freeing of the [`Client`].
 739  pub struct Client {
 740      config: ClientConfig,
 741      decoders: ModuleDecoderRegistry,
 742      db: Database,
 743      federation_id: FederationId,
 744      federation_meta: BTreeMap<String, String>,
 745      primary_module_instance: ModuleInstanceId,
 746      modules: ClientModuleRegistry,
 747      module_inits: ClientModuleInitRegistry,
 748      executor: Executor,
 749      api: DynGlobalApi,
 750      root_secret: DerivableSecret,
 751      operation_log: OperationLog,
 752      secp_ctx: Secp256k1<secp256k1_zkp::All>,
 753      meta_service: Arc<MetaService>,
 754  
 755      task_group: TaskGroup,
 756  
 757      /// Updates about client recovery progress
 758      client_recovery_progress_receiver:
 759          watch::Receiver<BTreeMap<ModuleInstanceId, RecoveryProgress>>,
 760  }
 761  
 762  impl Client {
 763      /// Initialize a client builder that can be configured to create a new
 764      /// client.
 765      pub fn builder(db: Database) -> ClientBuilder {
 766          ClientBuilder::new(db)
 767      }
 768  
 769      pub fn api(&self) -> &(dyn IGlobalFederationApi + 'static) {
 770          self.api.as_ref()
 771      }
 772  
 773      pub fn api_clone(&self) -> DynGlobalApi {
 774          self.api.clone()
 775      }
 776  
 777      /// Get the [`TaskGroup`] that is tied to Client's lifetime.
 778      pub fn task_group(&self) -> &TaskGroup {
 779          &self.task_group
 780      }
 781  
 782      pub async fn get_config_from_db(db: &Database) -> Option<ClientConfig> {
 783          let mut dbtx = db.begin_transaction().await;
 784          #[allow(clippy::let_and_return)]
 785          let config = dbtx
 786              .find_by_prefix(&ClientConfigKeyPrefix)
 787              .await
 788              .next()
 789              .await
 790              .map(|(_, config)| config);
 791          config
 792      }
 793  
 794      pub async fn store_encodable_client_secret<T: Encodable>(
 795          db: &Database,
 796          secret: T,
 797      ) -> anyhow::Result<()> {
 798          let mut dbtx = db.begin_transaction().await;
 799  
 800          // Don't overwrite an existing secret
 801          match dbtx.get_value(&EncodedClientSecretKey).await {
 802              Some(_) => bail!("Encoded client secret already exists, cannot overwrite"),
 803              None => {
 804                  let encoded_secret = T::consensus_encode_to_vec(&secret);
 805                  dbtx.insert_entry(&EncodedClientSecretKey, &encoded_secret)
 806                      .await;
 807                  dbtx.commit_tx().await;
 808                  Ok(())
 809              }
 810          }
 811      }
 812  
 813      pub async fn load_decodable_client_secret<T: Decodable>(db: &Database) -> anyhow::Result<T> {
 814          let Some(secret) = Self::load_decodable_client_secret_opt(db).await? else {
 815              bail!("Encoded client secret not present in DB")
 816          };
 817  
 818          Ok(secret)
 819      }
 820      pub async fn load_decodable_client_secret_opt<T: Decodable>(
 821          db: &Database,
 822      ) -> anyhow::Result<Option<T>> {
 823          let mut dbtx = db.begin_transaction_nc().await;
 824  
 825          let client_secret = dbtx.get_value(&EncodedClientSecretKey).await;
 826  
 827          Ok(match client_secret {
 828              Some(client_secret) => Some(
 829                  T::consensus_decode(&mut client_secret.as_slice(), &Default::default())
 830                      .map_err(|e| anyhow!("Decoding failed: {e}"))?,
 831              ),
 832              None => None,
 833          })
 834      }
 835  
 836      pub async fn load_or_generate_client_secret(db: &Database) -> anyhow::Result<[u8; 64]> {
 837          let client_secret = match Self::load_decodable_client_secret::<[u8; 64]>(db).await {
 838              Ok(secret) => secret,
 839              Err(_) => {
 840                  let secret = PlainRootSecretStrategy::random(&mut thread_rng());
 841                  Self::store_encodable_client_secret(db, secret)
 842                      .await
 843                      .expect("Storing client secret must work");
 844                  secret
 845              }
 846          };
 847          Ok(client_secret)
 848      }
 849  
 850      pub async fn is_initialized(db: &Database) -> bool {
 851          Self::get_config_from_db(db).await.is_some()
 852      }
 853  
 854      pub async fn start_executor(self: &Arc<Self>) {
 855          debug!(
 856              "Starting fedimint client executor (version: {})",
 857              fedimint_build_code_version_env!()
 858          );
 859          self.executor.start_executor(self.context_gen()).await;
 860      }
 861  
 862      pub fn federation_id(&self) -> FederationId {
 863          self.federation_id
 864      }
 865  
 866      fn context_gen(self: &Arc<Self>) -> ModuleGlobalContextGen {
 867          let client_inner = Arc::downgrade(self);
 868          Arc::new(move |module_instance, operation| {
 869              ModuleGlobalClientContext {
 870                  client: client_inner
 871                      .clone()
 872                      .upgrade()
 873                      .expect("ModuleGlobalContextGen called after client was dropped"),
 874                  module_instance_id: module_instance,
 875                  operation,
 876              }
 877              .into()
 878          })
 879      }
 880  
 881      fn config(&self) -> &ClientConfig {
 882          &self.config
 883      }
 884  
 885      pub fn decoders(&self) -> &ModuleDecoderRegistry {
 886          &self.decoders
 887      }
 888  
 889      /// Returns a reference to the module, panics if not found
 890      fn get_module(&self, instance: ModuleInstanceId) -> &maybe_add_send_sync!(dyn IClientModule) {
 891          self.try_get_module(instance)
 892              .expect("Module instance not found")
 893      }
 894  
 895      fn try_get_module(
 896          &self,
 897          instance: ModuleInstanceId,
 898      ) -> Option<&maybe_add_send_sync!(dyn IClientModule)> {
 899          Some(self.modules.get(instance)?.as_ref())
 900      }
 901  
 902      pub fn has_module(&self, instance: ModuleInstanceId) -> bool {
 903          self.modules.get(instance).is_some()
 904      }
 905  
 906      /// Returns the input amount and output amount of a transaction
 907      ///
 908      /// # Panics
 909      /// If any of the input or output versions in the transaction builder are
 910      /// unknown by the respective module.
 911      fn transaction_builder_balance(&self, builder: &TransactionBuilder) -> (Amount, Amount) {
 912          // FIXME: prevent overflows, currently not suitable for untrusted input
 913          let mut in_amount = Amount::ZERO;
 914          let mut out_amount = Amount::ZERO;
 915          let mut fee_amount = Amount::ZERO;
 916  
 917          for input in &builder.inputs {
 918              let module = self.get_module(input.input.module_instance_id());
 919  
 920              let item_fee = module.input_fee(&input.input).expect(
 921                  "We only build transactions with input versions that are supported by the module",
 922              );
 923  
 924              in_amount += input.amount;
 925              fee_amount += item_fee;
 926          }
 927  
 928          for output in &builder.outputs {
 929              let module = self.get_module(output.output.module_instance_id());
 930  
 931              let item_fee = module.output_fee(&output.output).expect(
 932                  "We only build transactions with output versions that are supported by the module",
 933              );
 934  
 935              out_amount += output.amount;
 936              fee_amount += item_fee;
 937          }
 938  
 939          (in_amount, out_amount + fee_amount)
 940      }
 941  
 942      pub fn get_internal_payment_markers(&self) -> anyhow::Result<(PublicKey, u64)> {
 943          Ok((self.federation_id().to_fake_ln_pub_key(&self.secp_ctx)?, 0))
 944      }
 945  
 946      pub fn get_meta(&self, key: &str) -> Option<String> {
 947          self.federation_meta.get(key).cloned()
 948      }
 949  
 950      fn root_secret(&self) -> DerivableSecret {
 951          self.root_secret.clone()
 952      }
 953  
 954      pub async fn add_state_machines(
 955          &self,
 956          dbtx: &mut DatabaseTransaction<'_>,
 957          states: Vec<DynState>,
 958      ) -> AddStateMachinesResult {
 959          self.executor.add_state_machines_dbtx(dbtx, states).await
 960      }
 961  
 962      // TODO: implement as part of [`OperationLog`]
 963      pub async fn get_active_operations(&self) -> HashSet<OperationId> {
 964          let active_states = self.executor.get_active_states().await;
 965          let mut active_operations = HashSet::with_capacity(active_states.len());
 966          let mut dbtx = self.db().begin_transaction_nc().await;
 967          for (state, _) in active_states {
 968              let operation_id = state.operation_id();
 969              if dbtx
 970                  .get_value(&OperationLogKey { operation_id })
 971                  .await
 972                  .is_some()
 973              {
 974                  active_operations.insert(operation_id);
 975              }
 976          }
 977          active_operations
 978      }
 979  
 980      pub fn operation_log(&self) -> &OperationLog {
 981          &self.operation_log
 982      }
 983  
 984      /// Get the meta manager to read meta fields.
 985      pub fn meta_service(&self) -> &Arc<MetaService> {
 986          &self.meta_service
 987      }
 988  
 989      /// Adds funding to a transaction or removes over-funding via change.
 990      async fn finalize_transaction(
 991          &self,
 992          dbtx: &mut DatabaseTransaction<'_>,
 993          operation_id: OperationId,
 994          mut partial_transaction: TransactionBuilder,
 995      ) -> anyhow::Result<(Transaction, Vec<DynState>, Range<u64>)> {
 996          let (input_amount, output_amount) = self.transaction_builder_balance(&partial_transaction);
 997  
 998          let (added_inputs, change_outputs) = self
 999              .primary_module()
1000              .create_final_inputs_and_outputs(
1001                  self.primary_module_instance,
1002                  dbtx,
1003                  operation_id,
1004                  input_amount,
1005                  output_amount,
1006              )
1007              .await?;
1008  
1009          // This is the range of  outputs that will be added to the transaction
1010          // in order to balance it. Notice that it may stay empty in case the transaction
1011          // is already balanced.
1012          let change_range = Range {
1013              start: partial_transaction.outputs.len() as u64,
1014              end: (partial_transaction.outputs.len() + change_outputs.len()) as u64,
1015          };
1016  
1017          partial_transaction.inputs.extend(added_inputs);
1018          partial_transaction.outputs.extend(change_outputs);
1019  
1020          let (input_amount, output_amount) = self.transaction_builder_balance(&partial_transaction);
1021  
1022          assert_eq!(input_amount, output_amount, "Transaction is not balanced");
1023  
1024          let (tx, states) = partial_transaction.build(&self.secp_ctx, thread_rng());
1025  
1026          Ok((tx, states, change_range))
1027      }
1028  
1029      /// Add funding and/or change to the transaction builder as needed, finalize
1030      /// the transaction and submit it to the federation.
1031      ///
1032      /// ## Errors
1033      /// The function will return an error if the operation with given ID already
1034      /// exists.
1035      ///
1036      /// ## Panics
1037      /// The function will panic if the database transaction collides with
1038      /// other and fails with others too often, this should not happen except for
1039      /// excessively concurrent scenarios.
1040      pub async fn finalize_and_submit_transaction<F, M>(
1041          &self,
1042          operation_id: OperationId,
1043          operation_type: &str,
1044          operation_meta: F,
1045          tx_builder: TransactionBuilder,
1046      ) -> anyhow::Result<(TransactionId, Vec<OutPoint>)>
1047      where
1048          F: Fn(TransactionId, Vec<OutPoint>) -> M + Clone + MaybeSend + MaybeSync,
1049          M: serde::Serialize + MaybeSend,
1050      {
1051          let operation_type = operation_type.to_owned();
1052  
1053          let autocommit_res = self
1054              .db
1055              .autocommit(
1056                  |dbtx, _| {
1057                      let operation_type = operation_type.clone();
1058                      let tx_builder = tx_builder.clone();
1059                      let operation_meta = operation_meta.clone();
1060                      Box::pin(async move {
1061                          if Client::operation_exists_dbtx(dbtx, operation_id).await {
1062                              bail!("There already exists an operation with id {operation_id:?}")
1063                          }
1064  
1065                          let (txid, change) = self
1066                              .finalize_and_submit_transaction_inner(dbtx, operation_id, tx_builder)
1067                              .await?;
1068  
1069                          self.operation_log()
1070                              .add_operation_log_entry(
1071                                  dbtx,
1072                                  operation_id,
1073                                  &operation_type,
1074                                  operation_meta(txid, change.clone()),
1075                              )
1076                              .await;
1077  
1078                          Ok((txid, change))
1079                      })
1080                  },
1081                  Some(100), // TODO: handle what happens after 100 retries
1082              )
1083              .await;
1084  
1085          match autocommit_res {
1086              Ok(txid) => Ok(txid),
1087              Err(AutocommitError::ClosureError { error, .. }) => Err(error),
1088              Err(AutocommitError::CommitFailed {
1089                  attempts,
1090                  last_error,
1091              }) => panic!(
1092                  "Failed to commit tx submission dbtx after {attempts} attempts: {last_error}"
1093              ),
1094          }
1095      }
1096  
1097      async fn finalize_and_submit_transaction_inner(
1098          &self,
1099          dbtx: &mut DatabaseTransaction<'_>,
1100          operation_id: OperationId,
1101          tx_builder: TransactionBuilder,
1102      ) -> anyhow::Result<(TransactionId, Vec<OutPoint>)> {
1103          let (transaction, mut states, change_range) = self
1104              .finalize_transaction(&mut dbtx.to_ref_nc(), operation_id, tx_builder)
1105              .await?;
1106  
1107          ensure!(
1108              transaction.consensus_encode_to_vec().len() <= Transaction::MAX_TX_SIZE,
1109              "The generated transaction would be rejected by the federation for being too large."
1110          );
1111  
1112          let txid = transaction.tx_hash();
1113  
1114          debug!(target: LOG_CLIENT_NET_API, %txid, ?transaction,  "Finalized and submitting transaction");
1115  
1116          let change_outpoints = change_range
1117              .into_iter()
1118              .map(|out_idx| OutPoint { txid, out_idx })
1119              .collect();
1120  
1121          let tx_submission_sm = DynState::from_typed(
1122              TRANSACTION_SUBMISSION_MODULE_INSTANCE,
1123              OperationState {
1124                  operation_id,
1125                  state: TxSubmissionStates::Created(transaction),
1126              },
1127          );
1128          states.push(tx_submission_sm);
1129  
1130          self.executor.add_state_machines_dbtx(dbtx, states).await?;
1131  
1132          Ok((txid, change_outpoints))
1133      }
1134  
1135      async fn transaction_update_stream(
1136          &self,
1137          operation_id: OperationId,
1138      ) -> BoxStream<'static, OperationState<TxSubmissionStates>> {
1139          self.executor
1140              .notifier()
1141              .module_notifier::<OperationState<TxSubmissionStates>>(
1142                  TRANSACTION_SUBMISSION_MODULE_INSTANCE,
1143              )
1144              .subscribe(operation_id)
1145              .await
1146      }
1147  
1148      pub async fn operation_exists(&self, operation_id: OperationId) -> bool {
1149          let mut dbtx = self.db().begin_transaction_nc().await;
1150  
1151          Client::operation_exists_dbtx(&mut dbtx, operation_id).await
1152      }
1153  
1154      pub async fn operation_exists_dbtx(
1155          dbtx: &mut DatabaseTransaction<'_>,
1156          operation_id: OperationId,
1157      ) -> bool {
1158          let active_state_exists = dbtx
1159              .find_by_prefix(&ActiveOperationStateKeyPrefix { operation_id })
1160              .await
1161              .next()
1162              .await
1163              .is_some();
1164  
1165          let inactive_state_exists = dbtx
1166              .find_by_prefix(&InactiveOperationStateKeyPrefix { operation_id })
1167              .await
1168              .next()
1169              .await
1170              .is_some();
1171  
1172          active_state_exists || inactive_state_exists
1173      }
1174  
1175      pub async fn has_active_states(&self, operation_id: OperationId) -> bool {
1176          self.db
1177              .begin_transaction()
1178              .await
1179              .find_by_prefix(&ActiveOperationStateKeyPrefix { operation_id })
1180              .await
1181              .next()
1182              .await
1183              .is_some()
1184      }
1185  
1186      /// Waits for an output from the primary module to reach its final
1187      /// state.
1188      pub async fn await_primary_module_output(
1189          &self,
1190          operation_id: OperationId,
1191          out_point: OutPoint,
1192      ) -> anyhow::Result<Amount> {
1193          self.primary_module()
1194              .await_primary_module_output(operation_id, out_point)
1195              .await
1196      }
1197  
1198      /// Returns a reference to a typed module client instance by kind
1199      pub fn get_first_module<M: ClientModule>(&self) -> ClientModuleInstance<M> {
1200          let module_kind = M::kind();
1201          let id = self
1202              .get_first_instance(&module_kind)
1203              .unwrap_or_else(|| panic!("No modules found of kind {module_kind}"));
1204          let module: &M = self
1205              .try_get_module(id)
1206              .unwrap_or_else(|| panic!("Unknown module instance {id}"))
1207              .as_any()
1208              .downcast_ref::<M>()
1209              .unwrap_or_else(|| panic!("Module is not of type {}", std::any::type_name::<M>()));
1210          ClientModuleInstance {
1211              id,
1212              db: self.db().with_prefix_module_id(id),
1213              api: self.api().with_module(id),
1214              module,
1215          }
1216      }
1217  
1218      pub fn get_module_client_dyn(
1219          &self,
1220          instance_id: ModuleInstanceId,
1221      ) -> anyhow::Result<&maybe_add_send_sync!(dyn IClientModule)> {
1222          self.try_get_module(instance_id)
1223              .ok_or(anyhow!("Unknown module instance {}", instance_id))
1224      }
1225  
1226      pub fn db(&self) -> &Database {
1227          &self.db
1228      }
1229  
1230      /// Returns a stream of transaction updates for the given operation id that
1231      /// can later be used to watch for a specific transaction being accepted.
1232      pub async fn transaction_updates(&self, operation_id: OperationId) -> TransactionUpdates {
1233          TransactionUpdates {
1234              update_stream: self.transaction_update_stream(operation_id).await,
1235          }
1236      }
1237  
1238      /// Returns the instance id of the first module of the given kind. The
1239      /// primary module will always be returned before any other modules (which
1240      /// themselves are ordered by their instance ID).
1241      pub fn get_first_instance(&self, module_kind: &ModuleKind) -> Option<ModuleInstanceId> {
1242          if self
1243              .modules
1244              .get_with_kind(self.primary_module_instance)
1245              .map(|(kind, _)| kind == module_kind)
1246              .unwrap_or(false)
1247          {
1248              return Some(self.primary_module_instance);
1249          }
1250  
1251          self.modules
1252              .iter_modules()
1253              .find(|(_, kind, _module)| *kind == module_kind)
1254              .map(|(instance_id, _, _)| instance_id)
1255      }
1256  
1257      /// Returns the data from which the client's root secret is derived (e.g.
1258      /// BIP39 seed phrase struct).
1259      pub async fn root_secret_encoding<T: Decodable>(&self) -> anyhow::Result<T> {
1260          get_decoded_client_secret::<T>(self.db()).await
1261      }
1262  
1263      /// Waits for outputs from the primary module to reach its final
1264      /// state.
1265      pub async fn await_primary_module_outputs(
1266          &self,
1267          operation_id: OperationId,
1268          outputs: Vec<OutPoint>,
1269      ) -> anyhow::Result<Amount> {
1270          let mut amount = Amount::ZERO;
1271  
1272          for out_point in outputs {
1273              amount += self
1274                  .await_primary_module_output(operation_id, out_point)
1275                  .await?;
1276          }
1277  
1278          Ok(amount)
1279      }
1280  
1281      /// Returns the config with which the client was initialized.
1282      pub fn get_config(&self) -> &ClientConfig {
1283          &self.config
1284      }
1285  
1286      /// Returns the config of the client in JSON format.
1287      ///
1288      /// Compared to the consensus module format where module configs are binary
1289      /// encoded this format cannot be cryptographically verified but is easier
1290      /// to consume and to some degree human-readable.
1291      pub fn get_config_json(&self) -> JsonClientConfig {
1292          JsonClientConfig {
1293              global: self.get_config().global.clone(),
1294              modules: self
1295                  .get_config()
1296                  .modules
1297                  .iter()
1298                  .map(|(instance_id, ClientModuleConfig { kind, config, .. })| {
1299                      (
1300                          *instance_id,
1301                          JsonWithKind::new(
1302                              kind.clone(),
1303                              config
1304                                  .clone()
1305                                  .decoded()
1306                                  .map(|decoded| decoded.to_json().into())
1307                                  .unwrap_or(serde_json::Value::Null),
1308                          ),
1309                      )
1310                  })
1311                  .collect(),
1312          }
1313      }
1314  
1315      /// Get the primary module
1316      pub fn primary_module(&self) -> &DynClientModule {
1317          self.modules
1318              .get(self.primary_module_instance)
1319              .expect("primary module must be present")
1320      }
1321  
1322      /// Balance available to the client for spending
1323      pub async fn get_balance(&self) -> Amount {
1324          self.primary_module()
1325              .get_balance(
1326                  self.primary_module_instance,
1327                  &mut self.db().begin_transaction_nc().await,
1328              )
1329              .await
1330      }
1331  
1332      /// Returns a stream that yields the current client balance every time it
1333      /// changes.
1334      pub async fn subscribe_balance_changes(&self) -> BoxStream<'static, Amount> {
1335          let mut balance_changes = self.primary_module().subscribe_balance_changes().await;
1336          let initial_balance = self.get_balance().await;
1337          let db = self.db().clone();
1338          let primary_module = self.primary_module().clone();
1339          let primary_module_instance = self.primary_module_instance;
1340  
1341          Box::pin(stream! {
1342              yield initial_balance;
1343              let mut prev_balance = initial_balance;
1344              while let Some(()) = balance_changes.next().await {
1345                  let mut dbtx = db.begin_transaction_nc().await;
1346                  let balance = primary_module
1347                      .get_balance(primary_module_instance, &mut dbtx)
1348                      .await;
1349  
1350                  // Deduplicate in case modules cannot always tell if the balance actually changed
1351                  if balance != prev_balance {
1352                      prev_balance = balance;
1353                      yield balance;
1354                  }
1355              }
1356          })
1357      }
1358  
1359      pub async fn discover_common_api_version(
1360          &self,
1361          threshold: Option<usize>,
1362      ) -> anyhow::Result<ApiVersionSet> {
1363          Ok(self
1364              .api()
1365              .discover_api_version_set(
1366                  &Self::supported_api_versions_summary_static(self.get_config(), &self.module_inits)
1367                      .await,
1368                  get_discover_api_version_timeout(),
1369                  threshold,
1370              )
1371              .await?)
1372      }
1373  
1374      /// Query the federation for API version support and then calculate
1375      /// the best API version to use (supported by most guardians).
1376      pub async fn discover_common_api_version_static(
1377          config: &ClientConfig,
1378          client_module_init: &ClientModuleInitRegistry,
1379          api: &DynGlobalApi,
1380          mode: DiscoverCommonApiVersionMode,
1381      ) -> anyhow::Result<ApiVersionSet> {
1382          Ok(api
1383              .discover_api_version_set(
1384                  &Self::supported_api_versions_summary_static(config, client_module_init).await,
1385                  get_discover_api_version_timeout(),
1386                  match mode {
1387                      DiscoverCommonApiVersionMode::Fast => {
1388                          Some((config.global.api_endpoints.len() / 2).min(1))
1389                      }
1390                      DiscoverCommonApiVersionMode::Full => None,
1391                  },
1392              )
1393              .await?)
1394      }
1395  
1396      /// [`SupportedApiVersionsSummary`] that the client and its modules support
1397      pub async fn supported_api_versions_summary_static(
1398          config: &ClientConfig,
1399          client_module_init: &ClientModuleInitRegistry,
1400      ) -> SupportedApiVersionsSummary {
1401          SupportedApiVersionsSummary {
1402              core: SupportedCoreApiVersions {
1403                  core_consensus: config.global.consensus_version,
1404                  api: MultiApiVersion::try_from_iter(SUPPORTED_CORE_API_VERSIONS.to_owned())
1405                      .expect("must not have conflicting versions"),
1406              },
1407              modules: config
1408                  .modules
1409                  .iter()
1410                  .filter_map(|(&module_instance_id, module_config)| {
1411                      client_module_init
1412                          .get(module_config.kind())
1413                          .map(|module_init| {
1414                              (
1415                                  module_instance_id,
1416                                  SupportedModuleApiVersions {
1417                                      core_consensus: config.global.consensus_version,
1418                                      module_consensus: module_config.version,
1419                                      api: module_init.supported_api_versions(),
1420                                  },
1421                              )
1422                          })
1423                  })
1424                  .collect(),
1425          }
1426      }
1427  
1428      /// Load the common api versions to use from cache and start a background
1429      /// process to refresh them.
1430      ///
1431      /// This is a compromise, so we not have to wait for version discovery to
1432      /// complete every time a [`Client`] is being built.
1433      async fn load_and_refresh_common_api_version_static(
1434          config: &ClientConfig,
1435          module_inits: &ModuleInitRegistry<DynClientModuleInit>,
1436          api: &DynGlobalApi,
1437          db: &Database,
1438          task_group: &TaskGroup,
1439      ) -> anyhow::Result<ApiVersionSet> {
1440          if let Some(v) = db
1441              .begin_transaction()
1442              .await
1443              .get_value(&CachedApiVersionSetKey)
1444              .await
1445          {
1446              debug!("Found existing cached common api versions");
1447              let config = config.clone();
1448              let module_inits = module_inits.clone();
1449              let api = api.clone();
1450              let db = db.clone();
1451              // Separate task group, because we actually don't want to be waiting for this to
1452              // finish, and it's just best effort.
1453              task_group.spawn_cancellable("refresh_common_api_version_static", async move {
1454                  if let Err(error) = Self::refresh_common_api_version_static(
1455                      &config,
1456                      &module_inits,
1457                      &api,
1458                      &db,
1459                      DiscoverCommonApiVersionMode::Full,
1460                  )
1461                  .await
1462                  {
1463                      warn!(%error, "Failed to discover common api versions");
1464                  }
1465              });
1466  
1467              return Ok(v.0);
1468          }
1469  
1470          debug!("No existing cached common api versions found, waiting for initial discovery");
1471          Self::refresh_common_api_version_static(
1472              config,
1473              module_inits,
1474              api,
1475              db,
1476              DiscoverCommonApiVersionMode::Fast,
1477          )
1478          .await
1479      }
1480  
1481      async fn refresh_common_api_version_static(
1482          config: &ClientConfig,
1483          module_inits: &ModuleInitRegistry<DynClientModuleInit>,
1484          api: &DynGlobalApi,
1485          db: &Database,
1486          mode: DiscoverCommonApiVersionMode,
1487      ) -> anyhow::Result<ApiVersionSet> {
1488          debug!("Refreshing common api versions");
1489  
1490          let common_api_versions =
1491              Client::discover_common_api_version_static(config, module_inits, api, mode).await?;
1492  
1493          debug!(
1494              value = ?common_api_versions,
1495              "Updating the cached common api versions"
1496          );
1497          let mut dbtx = db.begin_transaction().await;
1498          let _ = dbtx
1499              .insert_entry(
1500                  &CachedApiVersionSetKey,
1501                  &CachedApiVersionSet(common_api_versions.clone()),
1502              )
1503              .await;
1504  
1505          dbtx.commit_tx().await;
1506  
1507          Ok(common_api_versions)
1508      }
1509  
1510      /// Get the client [`Metadata`]
1511      pub async fn get_metadata(&self) -> Metadata {
1512          self.db
1513              .begin_transaction_nc()
1514              .await
1515              .get_value(&ClientMetadataKey)
1516              .await
1517              .unwrap_or_else(|| {
1518                  warn!("Missing existing metadata. This key should have been set on Client init");
1519                  Metadata::empty()
1520              })
1521      }
1522  
1523      /// Set the client [`Metadata`]
1524      pub async fn set_metadata(&self, metadata: &Metadata) {
1525          self.db
1526              .autocommit::<_, _, anyhow::Error>(
1527                  move |dbtx, _| {
1528                      Box::pin(async move {
1529                          Self::set_metadata_dbtx(dbtx, metadata).await;
1530                          Ok(())
1531                      })
1532                  },
1533                  None,
1534              )
1535              .await
1536              .expect("Failed to autocommit metadata")
1537      }
1538  
1539      pub async fn has_pending_recoveries(&self) -> bool {
1540          !self
1541              .client_recovery_progress_receiver
1542              .borrow()
1543              .iter()
1544              .all(|(_id, progress)| progress.is_done())
1545      }
1546  
1547      /// Wait for all module recoveries to finish
1548      ///
1549      /// This will block until the recovery task is done with recoveries.
1550      /// Returns success if all recovery tasks are complete (success case),
1551      /// or an error if some modules could not complete the recovery at the time.
1552      ///
1553      /// A bit of a heavy approach.
1554      pub async fn wait_for_all_recoveries(&self) -> anyhow::Result<()> {
1555          let mut recovery_receiver = self.client_recovery_progress_receiver.clone();
1556          recovery_receiver
1557              .wait_for(|in_progress| {
1558                  in_progress
1559                      .iter()
1560                      .all(|(_id, progress)| progress.is_done())
1561              })
1562              .await
1563              .context("Recovery task completed and update receiver disconnected, but some modules failed to recover")?;
1564  
1565          Ok(())
1566      }
1567  
1568      /// Subscribe to recover progress for all the modules.
1569      ///
1570      /// This stream can contain duplicate progress for a module.
1571      /// Don't use this stream for detecting completion of recovery.
1572      pub fn subscribe_to_recovery_progress(
1573          &self,
1574      ) -> impl Stream<Item = (ModuleInstanceId, RecoveryProgress)> {
1575          WatchStream::new(self.client_recovery_progress_receiver.clone())
1576              .flat_map(futures::stream::iter)
1577      }
1578  
1579      pub async fn wait_for_module_kind_recovery(
1580          &self,
1581          module_kind: ModuleKind,
1582      ) -> anyhow::Result<()> {
1583          let mut recovery_receiver = self.client_recovery_progress_receiver.clone();
1584          recovery_receiver
1585              .wait_for(|in_progress| {
1586                  !in_progress
1587                      .iter()
1588                      .filter(|(module_instance_id, _progress)| {
1589                          self.config.modules[module_instance_id].kind == module_kind
1590                      })
1591                      .any(|(_id, progress)| !progress.is_done())
1592              })
1593              .await
1594              .context("Recovery task completed and update receiver disconnected, but the desired modules are still unavailable or failed to recover")?;
1595  
1596          Ok(())
1597      }
1598  
1599      pub async fn wait_for_all_active_state_machines(&self) -> anyhow::Result<()> {
1600          loop {
1601              if self.executor.get_active_states().await.is_empty() {
1602                  break;
1603              }
1604              fedimint_core::runtime::sleep(Duration::from_millis(100)).await;
1605          }
1606          Ok(())
1607      }
1608  
1609      /// Set the client [`Metadata`]
1610      pub async fn set_metadata_dbtx(dbtx: &mut DatabaseTransaction<'_>, metadata: &Metadata) {
1611          dbtx.insert_new_entry(&ClientMetadataKey, metadata).await;
1612      }
1613  
1614      async fn spawn_module_recoveries_task(
1615          &self,
1616          recovery_sender: watch::Sender<BTreeMap<ModuleInstanceId, RecoveryProgress>>,
1617          module_recoveries: BTreeMap<
1618              ModuleInstanceId,
1619              Pin<Box<maybe_add_send!(dyn Future<Output = anyhow::Result<()>>)>>,
1620          >,
1621          module_recovery_progress_receivers: BTreeMap<
1622              ModuleInstanceId,
1623              watch::Receiver<RecoveryProgress>,
1624          >,
1625      ) {
1626          let db = self.db.clone();
1627          self.task_group
1628              .spawn("module recoveries", move |_task_handle| async move {
1629                  Self::run_module_recoveries_task(
1630                      db,
1631                      recovery_sender,
1632                      module_recoveries,
1633                      module_recovery_progress_receivers,
1634                  )
1635                  .await
1636              });
1637      }
1638  
1639      async fn run_module_recoveries_task(
1640          db: Database,
1641          recovery_sender: watch::Sender<BTreeMap<ModuleInstanceId, RecoveryProgress>>,
1642          module_recoveries: BTreeMap<
1643              ModuleInstanceId,
1644              Pin<Box<maybe_add_send!(dyn Future<Output = anyhow::Result<()>>)>>,
1645          >,
1646          module_recovery_progress_receivers: BTreeMap<
1647              ModuleInstanceId,
1648              watch::Receiver<RecoveryProgress>,
1649          >,
1650      ) {
1651          debug!(target:LOG_CLIENT_RECOVERY, num_modules=%module_recovery_progress_receivers.len(), "Staring module recoveries");
1652          let mut completed_stream = Vec::new();
1653          let progress_stream = futures::stream::FuturesUnordered::new();
1654  
1655          for (module_instance_id, f) in module_recoveries.into_iter() {
1656              completed_stream.push(futures::stream::once(Box::pin(async move {
1657                  match f.await {
1658                      Ok(_) => (module_instance_id, None),
1659                      Err(err) => {
1660                          warn!(%err, module_instance_id, "Module recovery failed");
1661                          // a module recovery that failed reports and error and
1662                          // just never finishes, so we don't need a separate state
1663                          // for it
1664                          futures::future::pending::<Option<RecoveryProgress>>().await;
1665                          unreachable!()
1666                      }
1667                  }
1668              })));
1669          }
1670  
1671          for (module_instance_id, rx) in module_recovery_progress_receivers.into_iter() {
1672              progress_stream.push(
1673                  tokio_stream::wrappers::WatchStream::new(rx)
1674                      .fuse()
1675                      .map(move |progress| (module_instance_id, Some(progress))),
1676              );
1677          }
1678  
1679          let mut futures = futures::stream::select(
1680              futures::stream::select_all(progress_stream),
1681              futures::stream::select_all(completed_stream),
1682          );
1683  
1684          while let Some((module_instance_id, progress)) = futures.next().await {
1685              let mut dbtx = db.begin_transaction().await;
1686  
1687              let prev_progress = *recovery_sender
1688                  .borrow()
1689                  .get(&module_instance_id)
1690                  .expect("existing progress must be present");
1691  
1692              let progress = if prev_progress.is_done() {
1693                  // since updates might be out of order, once done, stick with it
1694                  prev_progress
1695              } else if let Some(progress) = progress {
1696                  progress
1697              } else {
1698                  prev_progress.to_complete()
1699              };
1700  
1701              info!(
1702                  module_instance_id,
1703                  progress = format!("{}/{}", progress.complete, progress.total),
1704                  "Recovery progress"
1705              );
1706  
1707              dbtx.insert_entry(
1708                  &ClientModuleRecovery { module_instance_id },
1709                  &ClientModuleRecoveryState { progress },
1710              )
1711              .await;
1712              dbtx.commit_tx().await;
1713  
1714              recovery_sender.send_modify(|v| {
1715                  v.insert(module_instance_id, progress);
1716              });
1717          }
1718          debug!(target: LOG_CLIENT_RECOVERY, "Recovery executor stopped");
1719      }
1720  }
1721  
1722  /// See [`Client::transaction_updates`]
1723  pub struct TransactionUpdates {
1724      update_stream: BoxStream<'static, OperationState<TxSubmissionStates>>,
1725  }
1726  
1727  impl TransactionUpdates {
1728      /// Waits for the transaction to be accepted or rejected as part of the
1729      /// operation to which the `TransactionUpdates` object is subscribed.
1730      pub async fn await_tx_accepted(self, await_txid: TransactionId) -> Result<(), String> {
1731          self.update_stream
1732              .filter_map(|tx_update| {
1733                  std::future::ready(match tx_update.state {
1734                      TxSubmissionStates::Accepted(txid) if txid == await_txid => Some(Ok(())),
1735                      TxSubmissionStates::Rejected(txid, submit_error) if txid == await_txid => {
1736                          Some(Err(submit_error))
1737                      }
1738                      _ => None,
1739                  })
1740              })
1741              .next_or_pending()
1742              .await
1743      }
1744  }
1745  
1746  /// Admin (guardian) identification and authentication
1747  pub struct AdminCreds {
1748      /// Guardian's own `peer_id`
1749      pub peer_id: PeerId,
1750      /// Authentication details
1751      pub auth: ApiAuth,
1752  }
1753  
1754  /// Used to configure, assemble and build [`Client`]
1755  pub struct ClientBuilder {
1756      module_inits: ClientModuleInitRegistry,
1757      primary_module_instance: Option<ModuleInstanceId>,
1758      admin_creds: Option<AdminCreds>,
1759      db_no_decoders: Database,
1760      meta_service: Arc<MetaService>,
1761      stopped: bool,
1762  }
1763  
1764  impl ClientBuilder {
1765      fn new(db: Database) -> Self {
1766          let meta_service = MetaService::new(LegacyMetaSource::default());
1767          ClientBuilder {
1768              module_inits: Default::default(),
1769              primary_module_instance: Default::default(),
1770              admin_creds: None,
1771              db_no_decoders: db,
1772              stopped: false,
1773              meta_service,
1774          }
1775      }
1776  
1777      fn from_existing(client: &Client) -> Self {
1778          ClientBuilder {
1779              module_inits: client.module_inits.clone(),
1780              primary_module_instance: Some(client.primary_module_instance),
1781              admin_creds: None,
1782              db_no_decoders: client.db.with_decoders(Default::default()),
1783              stopped: false,
1784              // non unique
1785              meta_service: client.meta_service.clone(),
1786          }
1787      }
1788  
1789      /// Replace module generator registry entirely
1790      pub fn with_module_inits(&mut self, module_inits: ClientModuleInitRegistry) {
1791          self.module_inits = module_inits;
1792      }
1793  
1794      /// Make module generator available when reading the config
1795      pub fn with_module<M: ClientModuleInit>(&mut self, module_init: M) {
1796          self.module_inits.attach(module_init);
1797      }
1798  
1799      pub fn stopped(&mut self) {
1800          self.stopped = true;
1801      }
1802  
1803      /// Uses this module with the given instance id as the primary module. See
1804      /// [`ClientModule::supports_being_primary`] for more information.
1805      ///
1806      /// ## Panics
1807      /// If there was a primary module specified previously
1808      pub fn with_primary_module(&mut self, primary_module_instance: ModuleInstanceId) {
1809          let was_replaced = self
1810              .primary_module_instance
1811              .replace(primary_module_instance)
1812              .is_some();
1813          assert!(
1814              !was_replaced,
1815              "Only one primary module can be given to the builder."
1816          )
1817      }
1818  
1819      pub fn with_meta_service(&mut self, meta_service: Arc<MetaService>) {
1820          self.meta_service = meta_service;
1821      }
1822  
1823      async fn migrate_database(&self, db: &Database) -> anyhow::Result<()> {
1824          // Only apply the client database migrations if the database has been
1825          // initialized.
1826          if let Ok(client_config) = self.load_existing_config().await {
1827              for (module_id, module_cfg) in client_config.modules {
1828                  let kind = module_cfg.kind.clone();
1829                  let Some(init) = self.module_inits.get(&kind) else {
1830                      // normal, expected and already logged about when building the client
1831                      continue;
1832                  };
1833  
1834                  apply_migrations_client(
1835                      db,
1836                      kind.to_string(),
1837                      init.database_version(),
1838                      init.get_database_migrations(),
1839                      module_id,
1840                  )
1841                  .await?;
1842              }
1843          }
1844  
1845          Ok(())
1846      }
1847  
1848      pub fn db_no_decoders(&self) -> &Database {
1849          &self.db_no_decoders
1850      }
1851  
1852      pub async fn load_existing_config(&self) -> anyhow::Result<ClientConfig> {
1853          let Some(config) = Client::get_config_from_db(&self.db_no_decoders).await else {
1854              bail!("Client database not initialized")
1855          };
1856  
1857          Ok(config)
1858      }
1859  
1860      pub fn set_admin_creds(&mut self, creds: AdminCreds) {
1861          self.admin_creds = Some(creds);
1862      }
1863  
1864      async fn init(
1865          self,
1866          root_secret: DerivableSecret,
1867          config: ClientConfig,
1868          init_mode: InitMode,
1869      ) -> anyhow::Result<ClientHandle> {
1870          if Client::is_initialized(&self.db_no_decoders).await {
1871              bail!("Client database already initialized")
1872          }
1873  
1874          // Note: It's important all client initialization is performed as one big
1875          // transaction to avoid half-initialized client state.
1876          {
1877              debug!(target: LOG_CLIENT, "Initializing client database");
1878              let mut dbtx = self.db_no_decoders.begin_transaction().await;
1879              // Save config to DB
1880              dbtx.insert_new_entry(
1881                  &ClientConfigKey {
1882                      id: config.calculate_federation_id(),
1883                  },
1884                  &config,
1885              )
1886              .await;
1887  
1888              let init_state = InitState::Pending(init_mode);
1889              dbtx.insert_entry(&ClientInitStateKey, &init_state).await;
1890  
1891              let metadata = init_state
1892                  .does_require_recovery()
1893                  .flatten()
1894                  .map(|s| s.metadata)
1895                  .unwrap_or(Metadata::empty());
1896  
1897              dbtx.insert_new_entry(&ClientMetadataKey, &metadata).await;
1898  
1899              dbtx.commit_tx_result().await?;
1900          }
1901  
1902          let stopped = self.stopped;
1903          self.build(root_secret, config, stopped).await
1904      }
1905  
1906      /// Join a new Federation
1907      ///
1908      /// When a user wants to connect to a new federation this function fetches
1909      /// the federation config and initializes the client database. If a user
1910      /// already joined the federation in the past and has a preexisting database
1911      /// use [`ClientBuilder::open`] instead.
1912      ///
1913      /// **Warning**: Calling `join` with a `root_secret` key that was used
1914      /// previous to `join` a Federation will lead to all sorts of malfunctions
1915      /// including likely loss of funds.
1916      ///
1917      /// This should be generally called only if the `root_secret` key is known
1918      /// not to have been used before (e.g. just randomly generated). For keys
1919      /// that might have been previous used (e.g. provided by the user),
1920      /// it's safer to call [`Self::recover`] which will attempt to recover
1921      /// client module states for the Federation.
1922      ///
1923      /// A typical "join federation" flow would look as follows:
1924      /// ```no_run
1925      /// # use std::str::FromStr;
1926      /// # use fedimint_core::invite_code::InviteCode;
1927      /// # use fedimint_core::config::ClientConfig;
1928      /// # use fedimint_derive_secret::DerivableSecret;
1929      /// # use fedimint_client::{Client, ClientBuilder};
1930      /// # use fedimint_core::db::Database;
1931      /// # use fedimint_core::config::META_FEDERATION_NAME_KEY;
1932      /// #
1933      /// # #[tokio::main]
1934      /// # async fn main() {
1935      /// # let root_secret: DerivableSecret = unimplemented!();
1936      /// // Create a root secret, e.g. via fedimint-bip39, see also:
1937      /// // https://github.com/fedimint/fedimint/blob/master/docs/secret_derivation.md
1938      /// // let root_secret = …;
1939      ///
1940      /// // Get invite code from user
1941      /// let invite_code = InviteCode::from_str("fed11qgqpw9thwvaz7te3xgmjuvpwxqhrzw3jxumrvvf0qqqjpetvlg8glnpvzcufhffgzhv8m75f7y34ryk7suamh8x7zetly8h0v9v0rm")
1942      ///     .expect("Invalid invite code");
1943      /// let config = fedimint_api_client::download_from_invite_code(&invite_code).await
1944      ///     .expect("Error downloading config");
1945      ///
1946      /// // Tell the user the federation name, bitcoin network
1947      /// // (e.g. from wallet module config), and other details
1948      /// // that are typically contained in the federation's
1949      /// // meta fields.
1950      ///
1951      /// // let network = config.get_first_module_by_kind::<WalletClientConfig>("wallet")
1952      /// //     .expect("Module not found")
1953      /// //     .network;
1954      ///
1955      /// println!(
1956      ///     "The federation name is: {}",
1957      ///     config.meta::<String>(META_FEDERATION_NAME_KEY)
1958      ///         .expect("Could not decode name field")
1959      ///         .expect("Name isn't set")
1960      /// );
1961      ///
1962      /// // Open the client's database, using the federation ID
1963      /// // as the DB name is a common pattern:
1964      ///
1965      /// // let db_path = format!("./path/to/db/{}", config.federation_id());
1966      /// // let db = RocksDb::open(db_path).expect("error opening DB");
1967      /// # let db: Database = unimplemented!();
1968      ///
1969      /// let client = Client::builder(db)
1970      ///     // Mount the modules the client should support:
1971      ///     // .with_module(LightningClientInit)
1972      ///     // .with_module(MintClientInit)
1973      ///     // .with_module(WalletClientInit::default())
1974      ///     .join(root_secret, config)
1975      ///     .await
1976      ///     .expect("Error joining federation");
1977      /// # }
1978      /// ```
1979      pub async fn join(
1980          self,
1981          root_secret: DerivableSecret,
1982          config: ClientConfig,
1983      ) -> anyhow::Result<ClientHandle> {
1984          self.init(root_secret, config, InitMode::Fresh).await
1985      }
1986  
1987      /// Download most recent valid backup found from the Federation
1988      pub async fn download_backup_from_federation(
1989          &self,
1990          root_secret: &DerivableSecret,
1991          config: &ClientConfig,
1992      ) -> anyhow::Result<Option<ClientBackup>> {
1993          let api = DynGlobalApi::from_config(config);
1994          Client::download_backup_from_federation_static(
1995              &api,
1996              &Self::federation_root_secret(root_secret, config),
1997              &self.decoders(config),
1998          )
1999          .await
2000      }
2001  
2002      /// Join a (possibly) previous joined Federation
2003      ///
2004      /// Unlike [`Self::join`], `recover` will run client module recovery for
2005      /// each client module attempting to recover any previous module state.
2006      ///
2007      /// Recovery process takes time during which each recovering client module
2008      /// will not be available for use.
2009      ///
2010      /// Calling `recovery` with a `root_secret` that was not actually previous
2011      /// used in a given Federation is safe.
2012      pub async fn recover(
2013          self,
2014          root_secret: DerivableSecret,
2015          config: ClientConfig,
2016          backup: Option<ClientBackup>,
2017      ) -> anyhow::Result<ClientHandle> {
2018          let client = self
2019              .init(
2020                  root_secret,
2021                  config,
2022                  InitMode::Recover {
2023                      snapshot: backup.clone(),
2024                  },
2025              )
2026              .await?;
2027  
2028          Ok(client)
2029      }
2030  
2031      pub async fn open(self, root_secret: DerivableSecret) -> anyhow::Result<ClientHandle> {
2032          let Some(config) = Client::get_config_from_db(&self.db_no_decoders).await else {
2033              bail!("Client database not initialized")
2034          };
2035          let stopped = self.stopped;
2036  
2037          let client = self.build_stopped(root_secret, config).await?;
2038          if !stopped {
2039              client.as_inner().start_executor().await;
2040          }
2041          Ok(client)
2042      }
2043  
2044      /// Build a [`Client`] but do not start the executor
2045      async fn build(
2046          self,
2047          root_secret: DerivableSecret,
2048          config: ClientConfig,
2049          stopped: bool,
2050      ) -> anyhow::Result<ClientHandle> {
2051          let client = self.build_stopped(root_secret, config).await?;
2052          if !stopped {
2053              client.as_inner().start_executor().await;
2054          }
2055  
2056          Ok(client)
2057      }
2058  
2059      /// Build a [`Client`] but do not start the executor
2060      async fn build_stopped(
2061          self,
2062          root_secret: DerivableSecret,
2063          config: ClientConfig,
2064      ) -> anyhow::Result<ClientHandle> {
2065          let decoders = self.decoders(&config);
2066          let config = Self::config_decoded(config, &decoders)?;
2067          let fed_id = config.calculate_federation_id();
2068          let db = self.db_no_decoders.with_decoders(decoders.clone());
2069          let api = if let Some(admin_creds) = self.admin_creds.as_ref() {
2070              DynGlobalApi::from_config_admin(&config, admin_creds.peer_id)
2071          } else {
2072              DynGlobalApi::from_config(&config)
2073          };
2074          let task_group = TaskGroup::new();
2075  
2076          // Migrate the database before interacting with it in case any on-disk data
2077          // structures have changed.
2078          self.migrate_database(&db).await?;
2079  
2080          let init_state = Self::load_init_state(&db).await;
2081  
2082          let primary_module_instance = self
2083              .primary_module_instance
2084              .ok_or(anyhow!("No primary module instance id was provided"))?;
2085  
2086          let notifier = Notifier::new(db.clone());
2087  
2088          let common_api_versions = Client::load_and_refresh_common_api_version_static(
2089              &config,
2090              &self.module_inits,
2091              &api,
2092              &db,
2093              &task_group,
2094          )
2095          .await
2096          .inspect_err(|err| {
2097              warn!(target: LOG_CLIENT, %err, "Failed to discover initial API version to use.");
2098          })
2099          .unwrap_or(ApiVersionSet {
2100              core: ApiVersion::new(0, 0),
2101              // This will cause all modules to skip initialization
2102              modules: Default::default(),
2103          });
2104  
2105          debug!(?common_api_versions, "Completed api version negotiation");
2106  
2107          let mut module_recoveries: BTreeMap<
2108              ModuleInstanceId,
2109              Pin<Box<maybe_add_send!(dyn Future<Output = anyhow::Result<()>>)>>,
2110          > = Default::default();
2111          let mut module_recovery_progress_receivers: BTreeMap<
2112              ModuleInstanceId,
2113              watch::Receiver<RecoveryProgress>,
2114          > = Default::default();
2115  
2116          let final_client = FinalClient::default();
2117  
2118          let root_secret = Self::federation_root_secret(&root_secret, &config);
2119  
2120          let modules = {
2121              let mut modules = ClientModuleRegistry::default();
2122              for (module_instance_id, module_config) in config.modules.clone() {
2123                  let kind = module_config.kind().clone();
2124                  let Some(module_init) = self.module_inits.get(&kind).cloned() else {
2125                      debug!("Module kind {kind} of instance {module_instance_id} not found in module gens, skipping");
2126                      continue;
2127                  };
2128  
2129                  let Some(&api_version) = common_api_versions.modules.get(&module_instance_id)
2130                  else {
2131                      warn!("Module kind {kind} of instance {module_instance_id} has not compatible api version, skipping");
2132                      continue;
2133                  };
2134  
2135                  // since the exact logic of when to start recovery is a bit gnarly,
2136                  // the recovery call is extracted here.
2137                  let start_module_recover_fn =
2138                      |snapshot: Option<ClientBackup>, progress: RecoveryProgress| {
2139                          let module_config = module_config.clone();
2140                          let num_peers = NumPeers::from(config.global.api_endpoints.len());
2141                          let db = db.clone();
2142                          let kind = kind.clone();
2143                          let notifier = notifier.clone();
2144                          let api = api.clone();
2145                          let root_secret = root_secret.clone();
2146                          let admin_auth = self.admin_creds.as_ref().map(|creds| creds.auth.clone());
2147                          let final_client = final_client.clone();
2148                          let (progress_tx, progress_rx) = tokio::sync::watch::channel(progress);
2149                          let module_init = module_init.clone();
2150                          (
2151                              Box::pin(async move {
2152                                  module_init
2153                                          .recover(
2154                                              final_client.clone(),
2155                                              fed_id,
2156                                          num_peers,
2157                                              module_config.clone(),
2158                                              db.clone(),
2159                                              module_instance_id,
2160                                              common_api_versions.core,
2161                                              api_version,
2162                                              root_secret.derive_module_secret(module_instance_id),
2163                                              notifier.clone(),
2164                                              api.clone(),
2165                                          admin_auth,
2166                                              snapshot.as_ref().and_then(|s| s.modules.get(&module_instance_id).to_owned()),
2167                                              progress_tx,
2168                                          )
2169                                          .await
2170                                          .map_err(|err| {
2171                                              warn!(
2172                                                  module_id = module_instance_id, %kind, %err, "Module failed to recover"
2173                                              );
2174                                              err
2175                                          })
2176                              }),
2177                              progress_rx,
2178                          )
2179                      };
2180  
2181                  let recovery = if let Some(snapshot) = init_state.does_require_recovery() {
2182                      if let Some(module_recovery_state) = db
2183                          .begin_transaction_nc()
2184                          .await
2185                          .get_value(&ClientModuleRecovery { module_instance_id })
2186                          .await
2187                      {
2188                          if module_recovery_state.is_done() {
2189                              debug!(
2190                                  id = %module_instance_id,
2191                                  %kind, "Module recovery already complete"
2192                              );
2193                              None
2194                          } else {
2195                              debug!(
2196                                  id = %module_instance_id,
2197                                  %kind,
2198                                  progress = %module_recovery_state.progress,
2199                                  "Starting module recovery with an existing progress"
2200                              );
2201                              Some(start_module_recover_fn(
2202                                  snapshot,
2203                                  module_recovery_state.progress,
2204                              ))
2205                          }
2206                      } else {
2207                          debug!(
2208                              id = %module_instance_id,
2209                              %kind, "Starting new module recovery"
2210                          );
2211                          Some(start_module_recover_fn(snapshot, RecoveryProgress::none()))
2212                      }
2213                  } else {
2214                      None
2215                  };
2216  
2217                  if let Some((recovery, recovery_progress_rx)) = recovery {
2218                      module_recoveries.insert(module_instance_id, recovery);
2219                      module_recovery_progress_receivers
2220                          .insert(module_instance_id, recovery_progress_rx);
2221                  } else {
2222                      let module = module_init
2223                          .init(
2224                              final_client.clone(),
2225                              fed_id,
2226                              config.global.api_endpoints.len(),
2227                              module_config,
2228                              db.clone(),
2229                              module_instance_id,
2230                              common_api_versions.core,
2231                              api_version,
2232                              // This is a divergence from the legacy client, where the child secret
2233                              // keys were derived using *module kind*-specific derivation paths.
2234                              // Since the new client has to support multiple, segregated modules of
2235                              // the same kind we have to use the instance id instead.
2236                              root_secret.derive_module_secret(module_instance_id),
2237                              notifier.clone(),
2238                              api.clone(),
2239                              self.admin_creds.as_ref().map(|cred| cred.auth.clone()),
2240                              task_group.clone(),
2241                          )
2242                          .await?;
2243  
2244                      if primary_module_instance == module_instance_id
2245                          && !module.supports_being_primary()
2246                      {
2247                          bail!("Module instance {primary_module_instance} of kind {kind} does not support being a primary module");
2248                      }
2249  
2250                      modules.register_module(module_instance_id, kind, module);
2251                  }
2252              }
2253              modules
2254          };
2255  
2256          if init_state.is_pending() && module_recoveries.is_empty() {
2257              let mut dbtx = db.begin_transaction().await;
2258              dbtx.insert_entry(&ClientInitStateKey, &init_state.into_complete())
2259                  .await;
2260              dbtx.commit_tx().await;
2261          }
2262  
2263          let executor = {
2264              let mut executor_builder = Executor::builder();
2265              executor_builder
2266                  .with_module(TRANSACTION_SUBMISSION_MODULE_INSTANCE, TxSubmissionContext);
2267  
2268              for (module_instance_id, _, module) in modules.iter_modules() {
2269                  executor_builder.with_module_dyn(module.context(module_instance_id));
2270              }
2271  
2272              for (module_instance_id, _) in module_recoveries.iter() {
2273                  executor_builder.with_valid_module_id(*module_instance_id);
2274              }
2275  
2276              executor_builder
2277                  .build(db.clone(), notifier, task_group.clone())
2278                  .await
2279          };
2280  
2281          let recovery_receiver_init_val = BTreeMap::from_iter(
2282              module_recovery_progress_receivers
2283                  .iter()
2284                  .map(|(module_instance_id, rx)| (*module_instance_id, *rx.borrow())),
2285          );
2286          let (client_recovery_progress_sender, client_recovery_progress_receiver) =
2287              watch::channel(recovery_receiver_init_val);
2288  
2289          let client_inner = Arc::new(Client {
2290              config: config.clone(),
2291              decoders,
2292              db: db.clone(),
2293              federation_id: fed_id,
2294              federation_meta: config.global.meta,
2295              primary_module_instance,
2296              modules,
2297              module_inits: self.module_inits.clone(),
2298              executor,
2299              api,
2300              secp_ctx: Secp256k1::new(),
2301              root_secret,
2302              task_group,
2303              operation_log: OperationLog::new(db),
2304              client_recovery_progress_receiver,
2305              meta_service: self.meta_service,
2306          });
2307          client_inner
2308              .task_group
2309              .spawn_cancellable("MetaService::update_continuously", {
2310                  let client_inner = client_inner.clone();
2311                  async move {
2312                      client_inner
2313                          .meta_service
2314                          .update_continuously(&client_inner)
2315                          .await
2316                  }
2317              });
2318  
2319          let client_arc = ClientHandle::new(client_inner);
2320  
2321          final_client.set(client_arc.downgrade());
2322  
2323          if !module_recoveries.is_empty() {
2324              client_arc
2325                  .spawn_module_recoveries_task(
2326                      client_recovery_progress_sender,
2327                      module_recoveries,
2328                      module_recovery_progress_receivers,
2329                  )
2330                  .await;
2331          }
2332  
2333          Ok(client_arc)
2334      }
2335  
2336      async fn load_init_state(db: &Database) -> InitState {
2337          let mut dbtx = db.begin_transaction_nc().await;
2338          dbtx.get_value(&ClientInitStateKey)
2339              .await
2340              .unwrap_or_else(|| {
2341                  // could be turned in a hard error in the future, but for now
2342                  // no need to break backward compat.
2343                  warn!("Client missing ClientRequiresRecovery: assuming complete");
2344                  db::InitState::Complete(db::InitModeComplete::Fresh)
2345              })
2346      }
2347  
2348      fn decoders(&self, config: &ClientConfig) -> ModuleDecoderRegistry {
2349          let mut decoders = client_decoders(
2350              &self.module_inits,
2351              config
2352                  .modules
2353                  .iter()
2354                  .map(|(module_instance, module_config)| (*module_instance, module_config.kind())),
2355          );
2356  
2357          decoders.register_module(
2358              TRANSACTION_SUBMISSION_MODULE_INSTANCE,
2359              ModuleKind::from_static_str("tx_submission"),
2360              tx_submission_sm_decoder(),
2361          );
2362  
2363          decoders
2364      }
2365  
2366      fn config_decoded(
2367          config: ClientConfig,
2368          decoders: &ModuleDecoderRegistry,
2369      ) -> Result<ClientConfig, fedimint_core::encoding::DecodeError> {
2370          config.clone().redecode_raw(decoders)
2371      }
2372  
2373      /// Re-derive client's root_secret using the federation ID. This eliminates
2374      /// the possibility of having the same client root_secret across
2375      /// multiple federations.
2376      fn federation_root_secret(
2377          root_secret: &DerivableSecret,
2378          config: &ClientConfig,
2379      ) -> DerivableSecret {
2380          root_secret.federation_key(&config.global.calculate_federation_id())
2381      }
2382  }
2383  
2384  /// Fetches the encoded client secret from the database and decodes it.
2385  /// If an encoded client secret is not present in the database, or if
2386  /// decoding fails, an error is returned.
2387  pub async fn get_decoded_client_secret<T: Decodable>(db: &Database) -> anyhow::Result<T> {
2388      let mut tx = db.begin_transaction().await;
2389      let client_secret = tx.get_value(&EncodedClientSecretKey).await;
2390      tx.commit_tx().await;
2391  
2392      match client_secret {
2393          Some(client_secret) => {
2394              T::consensus_decode(&mut client_secret.as_slice(), &Default::default())
2395                  .map_err(|e| anyhow!("Decoding failed: {e}"))
2396          }
2397          None => bail!("Encoded client secret not present in DB"),
2398      }
2399  }
2400  
2401  pub fn client_decoders<'a>(
2402      registry: &ModuleInitRegistry<DynClientModuleInit>,
2403      module_kinds: impl Iterator<Item = (ModuleInstanceId, &'a ModuleKind)>,
2404  ) -> ModuleDecoderRegistry {
2405      let mut modules = BTreeMap::new();
2406      for (id, kind) in module_kinds {
2407          let Some(init) = registry.get(kind) else {
2408              debug!("Detected configuration for unsupported module id: {id}, kind: {kind}");
2409              continue;
2410          };
2411  
2412          modules.insert(
2413              id,
2414              (
2415                  kind.clone(),
2416                  IClientModuleInit::decoder(AsRef::<dyn IClientModuleInit + 'static>::as_ref(init)),
2417              ),
2418          );
2419      }
2420      ModuleDecoderRegistry::from(modules)
2421  }