lib.rs
//! # Client library for fedimintd
//!
//! This library provides a client interface to build module clients that can be
//! plugged together into a fedimint client that exposes a high-level interface
//! for application authors to integrate with.
//!
//! ## Module Clients
//! Module clients have to at least implement the [`module::ClientModule`] trait
//! and a factory struct implementing [`module::init::ClientModuleInit`]. The
//! `ClientModule` trait defines the module types (tx inputs, outputs, etc.) as
//! well as the module's [state machines](sm::State).
//!
//! ### State machines
//! State machines are spawned when starting operations and drive them
//! forward in the background. All module state machines are run by a central
//! [`sm::Executor`]. This means typically starting an operation shall return
//! instantly.
//!
//! For example when doing a deposit the function starting it would immediately
//! return a deposit address and an [`OperationId`] (important concept, highly
//! recommended to read the docs) while spawning a state machine checking the
//! blockchain for incoming bitcoin transactions. The progress of these state
//! machines can then be *observed* using the operation id, but no further user
//! interaction is required to drive them forward.
//!
//! ### State Machine Contexts
//! State machines have access to both a [global
//! context](`DynGlobalClientContext`) as well as to a [module-specific
//! context](module::ClientModule::context).
//!
//! The global context provides access to the federation API and allows
//! claiming module outputs (and transferring the value into the client's
//! wallet), which can be used for refunds.
//!
//! The module-specific context can be used for other purposes, such as
//! supplying config to the state transitions or giving access to other APIs
//! (e.g. LN gateway in case of the lightning module).
//!
//! ### Extension traits
//! The modules themselves can only create inputs and outputs that then have to
//! be combined into transactions by the user and submitted via
//! [`Client::finalize_and_submit_transaction`]. To make this easier most module
//! client implementations contain an extension trait which is implemented for
//! [`Client`] and allows creating the most typical fedimint transactions with
//! a single function call.
//!
//! To observe the progress each high level operation function should be
//! accompanied by one returning a stream of high-level operation updates.
//! Internally that stream queries the state machines belonging to the
//! operation to determine the high-level operation state.
//!
//! ### Primary Modules
//! Not all modules have the ability to hold money for long. E.g. the lightning
//! module and its smart contracts are only used to incentivize LN payments, not
//! to hold money. The mint module on the other hand holds e-cash notes and can
//! thus be used to fund transactions and to absorb change. Module clients with
//! this ability should implement [`ClientModule::supports_being_primary`] and
//! related methods.
//!
//! For an example of a client module see [the mint client](https://github.com/fedimint/fedimint/blob/master/modules/fedimint-mint-client/src/lib.rs).
//!
//! ## Client
//! The [`Client`] struct is the main entry point for application authors. It is
//! constructed using its builder which can be obtained via [`Client::builder`].
//! The supported module clients have to be chosen at compile time while the
//! actually available ones will be determined by the config loaded at runtime.
//!
//! For a hacky instantiation of a complete client see the [`ng` subcommand of `fedimint-cli`](https://github.com/fedimint/fedimint/blob/55f9d88e17d914b92a7018de677d16e57ed42bf6/fedimint-cli/src/ng.rs#L56-L73).

use std::collections::{BTreeMap, HashSet};
use std::fmt::{Debug, Formatter};
use std::ops::{self, Range};
use std::pin::Pin;
use std::sync::{Arc, Weak};
use std::time::Duration;

use anyhow::{anyhow, bail, ensure, Context};
use async_stream::stream;
use backup::ClientBackup;
use db::{
    apply_migrations_client, CachedApiVersionSet, CachedApiVersionSetKey, ClientConfigKey,
    ClientConfigKeyPrefix, ClientInitStateKey, ClientModuleRecovery, EncodedClientSecretKey,
    InitMode,
};
use envs::get_discover_api_version_timeout;
use fedimint_api_client::api::{ApiVersionSet, DynGlobalApi, DynModuleApi, IGlobalFederationApi};
use fedimint_core::config::{
    ClientConfig, ClientModuleConfig, FederationId, JsonClientConfig, JsonWithKind,
    ModuleInitRegistry,
};
use fedimint_core::core::{
    DynInput, DynOutput, IInput, IOutput, ModuleInstanceId, ModuleKind, OperationId,
};
use fedimint_core::db::{
    AutocommitError, Database, DatabaseTransaction, IDatabaseTransactionOpsCoreTyped,
};
use fedimint_core::encoding::{Decodable, Encodable};
use fedimint_core::module::registry::ModuleDecoderRegistry;
use fedimint_core::module::{
    ApiAuth, ApiVersion, MultiApiVersion, SupportedApiVersionsSummary, SupportedCoreApiVersions,
    SupportedModuleApiVersions,
};
use fedimint_core::task::{MaybeSend, MaybeSync, TaskGroup};
use fedimint_core::transaction::Transaction;
use fedimint_core::util::{BoxStream, NextOrPending};
use fedimint_core::{
    apply, async_trait_maybe_send, dyn_newtype_define, fedimint_build_code_version_env,
    maybe_add_send, maybe_add_send_sync, runtime, Amount, NumPeers, OutPoint, PeerId,
    TransactionId,
};
pub use fedimint_derive_secret as derivable_secret;
use fedimint_derive_secret::DerivableSecret;
use fedimint_logging::{LOG_CLIENT, LOG_CLIENT_NET_API, LOG_CLIENT_RECOVERY};
use futures::{Future, Stream, StreamExt};
use meta::{LegacyMetaSource, MetaService};
use module::recovery::RecoveryProgress;
use module::{DynClientModule, FinalClient};
use rand::thread_rng;
use secp256k1_zkp::{PublicKey, Secp256k1};
use secret::{DeriveableSecretClientExt, PlainRootSecretStrategy, RootSecretStrategy as _};
use thiserror::Error;
#[cfg(not(target_family = "wasm"))]
use tokio::runtime::{Handle as RuntimeHandle, RuntimeFlavor};
use tokio::sync::watch;
use tokio_stream::wrappers::WatchStream;
use tracing::{debug, error, info, warn};

use crate::backup::Metadata;
use crate::db::{ClientMetadataKey, ClientModuleRecoveryState, InitState, OperationLogKey};
use crate::module::init::{
    ClientModuleInit, ClientModuleInitRegistry, DynClientModuleInit, IClientModuleInit,
};
use crate::module::{ClientModule, ClientModuleRegistry, IClientModule, StateGenerator};
use crate::oplog::OperationLog;
use crate::sm::executor::{
    ActiveOperationStateKeyPrefix, ContextGen, InactiveOperationStateKeyPrefix,
};
use crate::sm::{
    ClientSMDatabaseTransaction, DynState, Executor, IState, Notifier, OperationState, State,
};
use crate::transaction::{
    tx_submission_sm_decoder, ClientInput, ClientOutput, TransactionBuilder, TxSubmissionContext,
    TxSubmissionStates, TRANSACTION_SUBMISSION_MODULE_INSTANCE,
};

/// Client backup
pub mod backup;
/// Database keys used by the client
pub mod db;
/// Environment variables
pub mod envs;
/// Module client interface definitions
pub mod module;
/// Operation log subsystem of the client
pub mod oplog;
/// Secret handling & derivation
pub mod secret;
/// Client state machine interfaces and executor implementation
pub mod sm;
/// Structs and interfaces to construct Fedimint transactions
pub mod transaction;

/// Management of meta fields
pub mod meta;

/// A [`ClientInput`] whose input and state machines are boxed trait objects
/// that do not yet carry a module instance id.
pub type InstancelessDynClientInput = ClientInput<
    Box<maybe_add_send_sync!(dyn IInput + 'static)>,
    Box<maybe_add_send_sync!(dyn IState + 'static)>,
>;

/// A [`ClientOutput`] whose output and state machines are boxed trait objects
/// that do not yet carry a module instance id.
pub type InstancelessDynClientOutput = ClientOutput<
    Box<maybe_add_send_sync!(dyn IOutput + 'static)>,
    Box<maybe_add_send_sync!(dyn IState + 'static)>,
>;

/// Error returned when adding state machines to the executor fails.
#[derive(Debug, Error)]
pub enum AddStateMachinesError {
    /// The state to be added is already present in the database
    #[error("State already exists in database")]
    StateAlreadyExists,
    /// Any other failure, wrapped as an [`anyhow::Error`]
    #[error("Got {0}")]
    Other(#[from] anyhow::Error),
}

/// How thoroughly peers are queried when discovering the common API version.
pub enum DiscoverCommonApiVersionMode {
    /// Get the response from only a few peers, or until a timeout
    Fast,
    /// Try to get a response from all peers, or until a timeout
    Full,
}

/// Result type of operations that add state machines to the executor.
pub type AddStateMachinesResult = Result<(), AddStateMachinesError>;

/// Object-safe interface of the global context handed to state machines.
///
/// The `*_dyn` methods take type-erased arguments; typed convenience wrappers
/// live on [`DynGlobalClientContext`].
#[apply(async_trait_maybe_send!)]
pub trait IGlobalClientContext: Debug + MaybeSend + MaybeSync + 'static {
    /// Returns the client's module API client, so that module-specific
    /// calls can be made
    fn module_api(&self) -> DynModuleApi;

    /// Returns a reference to the client's config.
    fn client_config(&self) -> &ClientConfig;

    /// Returns a reference to the client's federation API client. The provided
    /// interface [`IGlobalFederationApi`] typically does not provide the
    /// necessary functionality; for this, extension traits like
    /// [`fedimint_api_client::api::IGlobalFederationApi`] have to be used.
    // TODO: Could be removed in favor of client() except for testing
    fn api(&self) -> &DynGlobalApi;

    /// Returns a reference to the module decoder registry.
    fn decoders(&self) -> &ModuleDecoderRegistry;

    /// This function is mostly meant for internal use, you are probably looking
    /// for [`DynGlobalClientContext::claim_input`].
    /// Returns the transaction id of the claiming transaction and the
    /// `OutPoint`s that represent change, if change was added.
    async fn claim_input_dyn(
        &self,
        dbtx: &mut ClientSMDatabaseTransaction<'_, '_>,
        input: InstancelessDynClientInput,
    ) -> (TransactionId, Vec<OutPoint>);

    /// This function is mostly meant for internal use, you are probably looking
    /// for [`DynGlobalClientContext::fund_output`].
    /// Returns the transaction id of the funding transaction and the
    /// `OutPoint`s that represent change, if change was added.
    async fn fund_output_dyn(
        &self,
        dbtx: &mut ClientSMDatabaseTransaction<'_, '_>,
        output: InstancelessDynClientOutput,
    ) -> anyhow::Result<(TransactionId, Vec<OutPoint>)>;

    /// Adds a state machine to the executor.
    async fn add_state_machine_dyn(
        &self,
        dbtx: &mut ClientSMDatabaseTransaction<'_, '_>,
        sm: Box<maybe_add_send_sync!(dyn IState)>,
    ) -> AddStateMachinesResult;

    /// Returns a stream of transaction submission state updates.
    async fn transaction_update_stream(&self) -> BoxStream<OperationState<TxSubmissionStates>>;
}

/// Fake context: every method is unimplemented and panics when called. Only
/// meant to be used in tests (see [`DynGlobalClientContext::new_fake`]).
#[apply(async_trait_maybe_send!)]
impl IGlobalClientContext for () {
    fn module_api(&self) -> DynModuleApi {
        unimplemented!("fake implementation, only for tests");
    }

    fn client_config(&self) -> &ClientConfig {
        unimplemented!("fake implementation, only for tests");
    }

    fn api(&self) -> &DynGlobalApi {
        unimplemented!("fake implementation, only for tests");
    }

    fn decoders(&self) -> &ModuleDecoderRegistry {
        unimplemented!("fake implementation, only for tests");
    }

    async fn claim_input_dyn(
        &self,
        _dbtx: &mut ClientSMDatabaseTransaction<'_, '_>,
        _input: InstancelessDynClientInput,
    ) -> (TransactionId, Vec<OutPoint>) {
        unimplemented!("fake implementation, only for tests");
    }

    async fn fund_output_dyn(
        &self,
        _dbtx: &mut ClientSMDatabaseTransaction<'_, '_>,
        _output: InstancelessDynClientOutput,
    ) -> anyhow::Result<(TransactionId, Vec<OutPoint>)> {
        unimplemented!("fake implementation, only for tests");
    }

    async fn add_state_machine_dyn(
        &self,
        _dbtx: &mut ClientSMDatabaseTransaction<'_, '_>,
        _sm: Box<maybe_add_send_sync!(dyn IState)>,
    ) -> AddStateMachinesResult {
        unimplemented!("fake implementation, only for tests");
    }

    async fn transaction_update_stream(&self) -> BoxStream<OperationState<TxSubmissionStates>> {
        unimplemented!("fake implementation, only for tests");
    }
}

dyn_newtype_define! {
    /// Global state and functionality provided to all state machines running in the
    /// client
    #[derive(Clone)]
    pub DynGlobalClientContext(Arc<IGlobalClientContext>)
}

impl DynGlobalClientContext {
    /// Creates a context backed by the `()` fake implementation, whose methods
    /// all panic when called.
    pub fn new_fake() -> Self {
        DynGlobalClientContext::from(())
    }

    /// Waits until the transaction update stream reports `query_txid` as
    /// accepted (`Ok(())`) or rejected (`Err` carrying the submit error).
    /// Updates about other transactions are skipped.
    pub async fn await_tx_accepted(&self, query_txid: TransactionId) -> Result<(), String> {
        self.transaction_update_stream()
            .await
            .filter_map(|tx_update| {
                std::future::ready(match tx_update.state {
                    TxSubmissionStates::Accepted(txid) if txid == query_txid => Some(Ok(())),
                    TxSubmissionStates::Rejected(txid, submit_error) if txid == query_txid => {
                        Some(Err(submit_error))
                    }
                    _ => None,
                })
            })
            .next_or_pending()
            .await
    }

    /// Creates a transaction with an output of the primary module,
    /// claiming the given input and transferring its value into the client's
    /// wallet.
    ///
    /// The transaction's submission state machine as well as the state
    /// machines responsible for the generated output are generated
    /// automatically. The caller is responsible for the input's state machines,
    /// should there be any required.
323 pub async fn claim_input<I, S>( 324 &self, 325 dbtx: &mut ClientSMDatabaseTransaction<'_, '_>, 326 input: ClientInput<I, S>, 327 ) -> (TransactionId, Vec<OutPoint>) 328 where 329 I: IInput + MaybeSend + MaybeSync + 'static, 330 S: IState + MaybeSend + MaybeSync + 'static, 331 { 332 self.claim_input_dyn( 333 dbtx, 334 InstancelessDynClientInput { 335 input: Box::new(input.input), 336 keys: input.keys, 337 amount: input.amount, 338 state_machines: states_to_instanceless_dyn(input.state_machines), 339 }, 340 ) 341 .await 342 } 343 344 /// Creates a transaction with the supplied output and funding added by the 345 /// primary module if possible. If the primary module does not have the 346 /// required funds this function fails. 347 /// 348 /// The transactions submission state machine as well as the state machines 349 /// for the funding inputs are generated automatically. The caller is 350 /// responsible for the output's state machines, should there be any 351 /// required. 352 pub async fn fund_output<O, S>( 353 &self, 354 dbtx: &mut ClientSMDatabaseTransaction<'_, '_>, 355 output: ClientOutput<O, S>, 356 ) -> anyhow::Result<(TransactionId, Vec<OutPoint>)> 357 where 358 O: IOutput + MaybeSend + MaybeSync + 'static, 359 S: IState + MaybeSend + MaybeSync + 'static, 360 { 361 self.fund_output_dyn( 362 dbtx, 363 InstancelessDynClientOutput { 364 output: Box::new(output.output), 365 amount: output.amount, 366 state_machines: states_to_instanceless_dyn(output.state_machines), 367 }, 368 ) 369 .await 370 } 371 372 /// Allows adding state machines from inside a transition to the executor. 373 /// The added state machine belongs to the same module instance as the state 374 /// machine from inside which it was spawned. 
375 pub async fn add_state_machine<S>( 376 &self, 377 dbtx: &mut ClientSMDatabaseTransaction<'_, '_>, 378 sm: S, 379 ) -> AddStateMachinesResult 380 where 381 S: State + MaybeSend + MaybeSync + 'static, 382 { 383 self.add_state_machine_dyn(dbtx, box_up_state(sm)).await 384 } 385 } 386 387 fn states_to_instanceless_dyn<S: IState + MaybeSend + MaybeSync + 'static>( 388 state_gen: StateGenerator<S>, 389 ) -> StateGenerator<Box<maybe_add_send_sync!(dyn IState + 'static)>> { 390 Arc::new(move |txid, out_idx| { 391 let states: Vec<S> = state_gen(txid, out_idx); 392 states 393 .into_iter() 394 .map(|state| box_up_state(state)) 395 .collect() 396 }) 397 } 398 399 /// Not sure why I couldn't just directly call `Box::new` ins 400 /// [`states_to_instanceless_dyn`], but this fixed it. 401 fn box_up_state(state: impl IState + 'static) -> Box<maybe_add_send_sync!(dyn IState + 'static)> { 402 Box::new(state) 403 } 404 405 impl<T> From<Arc<T>> for DynGlobalClientContext 406 where 407 T: IGlobalClientContext, 408 { 409 fn from(inner: Arc<T>) -> Self { 410 DynGlobalClientContext { inner } 411 } 412 } 413 414 // TODO: impl `Debug` for `Client` and derive here 415 impl Debug for Client { 416 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { 417 write!(f, "Client") 418 } 419 } 420 421 /// Global state given to a specific client module and state. It is aware inside 422 /// which module instance and operation it is used and to avoid module being 423 /// aware of their instance id etc. 
#[derive(Clone, Debug)]
struct ModuleGlobalClientContext {
    /// The client this context operates on
    client: Arc<Client>,
    /// Module instance that inputs, outputs and states get tagged with
    module_instance_id: ModuleInstanceId,
    /// Operation under which transactions are submitted and observed
    operation: OperationId,
}

#[apply(async_trait_maybe_send!)]
impl IGlobalClientContext for ModuleGlobalClientContext {
    fn module_api(&self) -> DynModuleApi {
        self.api().with_module(self.module_instance_id)
    }

    fn api(&self) -> &DynGlobalApi {
        &self.client.api
    }

    fn decoders(&self) -> &ModuleDecoderRegistry {
        self.client.decoders()
    }

    fn client_config(&self) -> &ClientConfig {
        self.client.config()
    }

    async fn claim_input_dyn(
        &self,
        dbtx: &mut ClientSMDatabaseTransaction<'_, '_>,
        input: InstancelessDynClientInput,
    ) -> (TransactionId, Vec<OutPoint>) {
        // Attach this context's module instance id to the input and to all
        // state machines it generates.
        let instance_input = ClientInput {
            input: DynInput::from_parts(self.module_instance_id, input.input),
            keys: input.keys,
            amount: input.amount,
            state_machines: states_add_instance(self.module_instance_id, input.state_machines),
        };

        // Panics (via `expect`) if finalizing/submitting fails; the message
        // states the assumption that only a lack of funding can cause that.
        self.client
            .finalize_and_submit_transaction_inner(
                &mut dbtx.global_tx().to_ref_nc(),
                self.operation,
                TransactionBuilder::new().with_input(instance_input),
            )
            .await
            .expect("Can only fail if additional funding is needed")
    }

    async fn fund_output_dyn(
        &self,
        dbtx: &mut ClientSMDatabaseTransaction<'_, '_>,
        output: InstancelessDynClientOutput,
    ) -> anyhow::Result<(TransactionId, Vec<OutPoint>)> {
        // Attach this context's module instance id to the output and to all
        // state machines it generates.
        let instance_output = ClientOutput {
            output: DynOutput::from_parts(self.module_instance_id, output.output),
            amount: output.amount,
            state_machines: states_add_instance(self.module_instance_id, output.state_machines),
        };

        // Unlike `claim_input_dyn`, a failure is returned to the caller.
        self.client
            .finalize_and_submit_transaction_inner(
                &mut dbtx.global_tx().to_ref_nc(),
                self.operation,
                TransactionBuilder::new().with_output(instance_output),
            )
            .await
    }

    async fn add_state_machine_dyn(
        &self,
        dbtx: &mut ClientSMDatabaseTransaction<'_, '_>,
        sm: Box<maybe_add_send_sync!(dyn IState)>,
    ) -> AddStateMachinesResult {
        // The new state machine is tagged with this context's module instance.
        let state = DynState::from_parts(self.module_instance_id, sm);

        self.client
            .executor
            .add_state_machines_dbtx(&mut dbtx.global_tx().to_ref_nc(), vec![state])
            .await
    }

    async fn transaction_update_stream(&self) -> BoxStream<OperationState<TxSubmissionStates>> {
        self.client.transaction_update_stream(self.operation).await
    }
}

/// Wraps an instance-less state generator into one that tags every generated
/// state with `module_instance_id`, yielding [`DynState`]s.
fn states_add_instance(
    module_instance_id: ModuleInstanceId,
    state_gen: StateGenerator<Box<maybe_add_send_sync!(dyn IState + 'static)>>,
) -> StateGenerator<DynState> {
    Arc::new(move |txid, out_idx| {
        let states = state_gen(txid, out_idx);
        Iterator::collect(
            states
                .into_iter()
                .map(|state| DynState::from_parts(module_instance_id, state)),
        )
    })
}

/// User handle to [`Client`]
///
/// On the drop of [`ClientHandle`] the client will be shut-down, and resources
/// it used freed.
///
/// Notably it [`ops::Deref`]s to the [`Client`] where most
/// methods live.
///
/// Put this in an Arc to clone it.
#[derive(Debug)]
pub struct ClientHandle {
    /// `Some` while the client is running; taken (set to `None`) on shutdown
    inner: Option<Arc<Client>>,
}

/// Reference-counted [`ClientHandle`]
pub type ClientHandleArc = Arc<ClientHandle>;

impl ClientHandle {
    /// Create a handle owning the given client
    fn new(inner: Arc<Client>) -> Self {
        ClientHandle {
            inner: inner.into(),
        }
    }

    /// Returns the wrapped client; panics if the handle was already shut down
    fn as_inner(&self) -> &Arc<Client> {
        self.inner.as_ref().expect("Inner always set")
    }

    /// Starts the client's state machine executor
    pub async fn start_executor(&self) {
        self.as_inner().start_executor().await
    }

    /// Shutdown the client.
556 pub async fn shutdown(mut self) { 557 self.shutdown_inner().await 558 } 559 560 async fn shutdown_inner(&mut self) { 561 let Some(inner) = self.inner.take() else { 562 error!("ClientHandleShared::shutdown called twice"); 563 return; 564 }; 565 inner.executor.stop_executor(); 566 let db = inner.db.clone(); 567 debug!(target: LOG_CLIENT, "Waiting for client task group to shut down"); 568 if let Err(err) = inner 569 .task_group 570 .clone() 571 .shutdown_join_all(Some(Duration::from_secs(30))) 572 .await 573 { 574 warn!(target: LOG_CLIENT, %err, "Error waiting for client task group to shut down"); 575 } 576 577 let client_strong_count = Arc::strong_count(&inner); 578 debug!(target: LOG_CLIENT, "Dropping last handle to Client"); 579 // We are sure that no background tasks are running in the client anymore, so we 580 // can drop the (usually) last inner reference. 581 drop(inner); 582 583 if client_strong_count != 1 { 584 debug!(target: LOG_CLIENT, count = client_strong_count - 1, LOG_CLIENT, "External Client references remaining after last handle dropped"); 585 } 586 587 let db_strong_count = db.strong_count(); 588 if db_strong_count != 1 { 589 debug!(target: LOG_CLIENT, count = db_strong_count - 1, "External DB references remaining after last handle dropped"); 590 } 591 } 592 593 /// Restart the client 594 /// 595 /// Returns false if there are other clones of [`ClientHandle`], or starting 596 /// the client again failed for some reason. 597 /// 598 /// Notably it will re-use the original [`Database`] handle, and not attempt 599 /// to open it again. 
600 pub async fn restart(self) -> anyhow::Result<ClientHandle> { 601 let (builder, config, root_secret) = { 602 let client = self 603 .inner 604 .as_ref() 605 .ok_or_else(|| anyhow::format_err!("Already stopped"))?; 606 let builder = ClientBuilder::from_existing(client); 607 let config = client.config.clone(); 608 let root_secret = client.root_secret.clone(); 609 610 (builder, config, root_secret) 611 }; 612 self.shutdown().await; 613 614 builder.build(root_secret, config, false).await 615 } 616 } 617 618 impl ops::Deref for ClientHandle { 619 type Target = Client; 620 621 fn deref(&self) -> &Self::Target { 622 self.inner.as_ref().expect("Must have inner client set") 623 } 624 } 625 626 impl ClientHandle { 627 pub(crate) fn downgrade(&self) -> ClientWeak { 628 ClientWeak { 629 inner: Arc::downgrade(self.inner.as_ref().expect("Inner always set")), 630 } 631 } 632 } 633 634 /// Internal self-reference to [`Client`] 635 #[derive(Debug, Clone)] 636 pub(crate) struct ClientStrong { 637 inner: Arc<Client>, 638 } 639 640 impl ops::Deref for ClientStrong { 641 type Target = Client; 642 643 fn deref(&self) -> &Self::Target { 644 self.inner.deref() 645 } 646 } 647 648 /// Like [`ClientStrong`] but using a [`Weak`] handle to [`Client`] 649 /// 650 /// This is not meant to be used by external code. 651 #[derive(Debug, Clone)] 652 pub(crate) struct ClientWeak { 653 inner: Weak<Client>, 654 } 655 656 impl ClientWeak { 657 pub fn upgrade(&self) -> Option<ClientStrong> { 658 Weak::upgrade(&self.inner).map(|inner| ClientStrong { inner }) 659 } 660 } 661 662 /// We need a separate drop implementation for `Client` that triggers 663 /// `Executor::stop_executor` even though the `Drop` implementation of 664 /// `ExecutorInner` should already take care of that. The reason is that as long 665 /// as the executor task is active there may be a cycle in the 666 /// `Arc<Client>`s such that at least one `Executor` never gets dropped. 
impl Drop for ClientHandle {
    fn drop(&mut self) {
        // Already shut down explicitly: nothing left to do.
        if self.inner.is_none() {
            return;
        }

        // We can't use block_on in single-threaded mode or wasm
        #[cfg(target_family = "wasm")]
        let can_block = false;
        // NOTE(review): tokio documents `Handle::current()` as panicking when
        // called outside of a runtime context; confirm handles are never
        // dropped there.
        #[cfg(not(target_family = "wasm"))]
        // nosemgrep: ban-raw-block-on
        let can_block = RuntimeHandle::current().runtime_flavor() != RuntimeFlavor::CurrentThread;
        if !can_block {
            // Best effort: stop the executor and drop our reference, but do
            // not wait for the task group like `shutdown_inner` does.
            let inner = self.inner.take().expect("Must have inner client set");
            inner.executor.stop_executor();
            if cfg!(target_family = "wasm") {
                error!(target: LOG_CLIENT, "Automatic client shutdown is not possible on wasm, call ClientHandle::shutdown manually.");
            } else {
                error!(target: LOG_CLIENT, "Automatic client shutdown is not possible on current thread runtime, call ClientHandle::shutdown manually.");
            }
            return;
        }

        debug!(target: LOG_CLIENT, "Shutting down the Client on last handle drop");
        // Run the full async shutdown synchronously from within `drop`.
        #[cfg(not(target_family = "wasm"))]
        runtime::block_in_place(|| {
            runtime::block_on(self.shutdown_inner());
        });
    }
}

/// List of core api versions supported by the implementation.
/// Notably `major` version is the one being supported, and corresponding
/// `minor` version is the one required (for given `major` version).
const SUPPORTED_CORE_API_VERSIONS: &[fedimint_core::module::ApiVersion] =
    &[ApiVersion { major: 0, minor: 0 }];

/// Generator of per-module, per-operation global contexts (alias of
/// [`ContextGen`])
pub type ModuleGlobalContextGen = ContextGen;

/// Resources particular to a module instance
pub struct ClientModuleInstance<'m, M: ClientModule> {
    /// Instance id of the module
    pub id: ModuleInstanceId,
    /// Module-specific DB
    pub db: Database,
    /// Module-specific API
    pub api: DynModuleApi,

    // Borrowed module; exposed through the `Deref` impl
    module: &'m M,
}

impl<'m, M> ops::Deref for ClientModuleInstance<'m, M>
where
    M: ClientModule,
{
    type Target = M;

    /// Gives direct access to the wrapped module
    fn deref(&self) -> &Self::Target {
        self.module
    }
}

/// Main client type
///
/// A handle and API to interacting with a single Federation.
///
/// Under the hood managing service tasks, state machines,
/// database and other resources required.
///
/// This type is shared externally and internally, and
/// [`ClientHandle`] is responsible for external lifecycle management
/// and resource freeing of the [`Client`].
pub struct Client {
    config: ClientConfig,
    decoders: ModuleDecoderRegistry,
    db: Database,
    federation_id: FederationId,
    // Key/value meta fields, readable via `get_meta`
    federation_meta: BTreeMap<String, String>,
    // Instance id passed to the primary module when funding/balancing
    // transactions
    primary_module_instance: ModuleInstanceId,
    modules: ClientModuleRegistry,
    module_inits: ClientModuleInitRegistry,
    // Runs the state machines; stopped on shutdown
    executor: Executor,
    api: DynGlobalApi,
    root_secret: DerivableSecret,
    operation_log: OperationLog,
    // Used when building transactions and deriving the fake LN public key
    secp_ctx: Secp256k1<secp256k1_zkp::All>,
    meta_service: Arc<MetaService>,

    // Joined (with a timeout) when the owning `ClientHandle` shuts down
    task_group: TaskGroup,

    /// Updates about client recovery progress
    client_recovery_progress_receiver:
        watch::Receiver<BTreeMap<ModuleInstanceId, RecoveryProgress>>,
}

impl Client {
    /// Initialize a client builder that can be configured to create a new
    /// client.
765 pub fn builder(db: Database) -> ClientBuilder { 766 ClientBuilder::new(db) 767 } 768 769 pub fn api(&self) -> &(dyn IGlobalFederationApi + 'static) { 770 self.api.as_ref() 771 } 772 773 pub fn api_clone(&self) -> DynGlobalApi { 774 self.api.clone() 775 } 776 777 /// Get the [`TaskGroup`] that is tied to Client's lifetime. 778 pub fn task_group(&self) -> &TaskGroup { 779 &self.task_group 780 } 781 782 pub async fn get_config_from_db(db: &Database) -> Option<ClientConfig> { 783 let mut dbtx = db.begin_transaction().await; 784 #[allow(clippy::let_and_return)] 785 let config = dbtx 786 .find_by_prefix(&ClientConfigKeyPrefix) 787 .await 788 .next() 789 .await 790 .map(|(_, config)| config); 791 config 792 } 793 794 pub async fn store_encodable_client_secret<T: Encodable>( 795 db: &Database, 796 secret: T, 797 ) -> anyhow::Result<()> { 798 let mut dbtx = db.begin_transaction().await; 799 800 // Don't overwrite an existing secret 801 match dbtx.get_value(&EncodedClientSecretKey).await { 802 Some(_) => bail!("Encoded client secret already exists, cannot overwrite"), 803 None => { 804 let encoded_secret = T::consensus_encode_to_vec(&secret); 805 dbtx.insert_entry(&EncodedClientSecretKey, &encoded_secret) 806 .await; 807 dbtx.commit_tx().await; 808 Ok(()) 809 } 810 } 811 } 812 813 pub async fn load_decodable_client_secret<T: Decodable>(db: &Database) -> anyhow::Result<T> { 814 let Some(secret) = Self::load_decodable_client_secret_opt(db).await? 
else {
            bail!("Encoded client secret not present in DB")
        };

        Ok(secret)
    }

    /// Reads the value stored under [`EncodedClientSecretKey`] and decodes it
    /// as `T`.
    ///
    /// Returns `Ok(None)` if nothing is stored under the key.
    ///
    /// # Errors
    /// Returns an error if a value is stored but cannot be decoded as `T`.
    pub async fn load_decodable_client_secret_opt<T: Decodable>(
        db: &Database,
    ) -> anyhow::Result<Option<T>> {
        let mut dbtx = db.begin_transaction_nc().await;

        let client_secret = dbtx.get_value(&EncodedClientSecretKey).await;

        Ok(match client_secret {
            Some(client_secret) => Some(
                T::consensus_decode(&mut client_secret.as_slice(), &Default::default())
                    .map_err(|e| anyhow!("Decoding failed: {e}"))?,
            ),
            None => None,
        })
    }

    /// Returns the 64-byte client secret stored in `db`, generating and
    /// storing a fresh random one if none is stored yet.
    ///
    /// A new secret is generated *only* when no secret is stored at all.
    ///
    /// # Errors
    /// Returns an error if a stored secret exists but cannot be decoded (it
    /// is never silently replaced), or if storing a newly generated secret
    /// fails.
    pub async fn load_or_generate_client_secret(db: &Database) -> anyhow::Result<[u8; 64]> {
        if let Some(secret) = Self::load_decodable_client_secret_opt::<[u8; 64]>(db).await? {
            return Ok(secret);
        }

        let secret = PlainRootSecretStrategy::random(&mut thread_rng());
        Self::store_encodable_client_secret(db, secret)
            .await
            .context("Storing client secret must work")?;
        Ok(secret)
    }

    /// Returns `true` if a client config can be read from `db`.
    pub async fn is_initialized(db: &Database) -> bool {
        Self::get_config_from_db(db).await.is_some()
    }

    /// Starts the state machine executor, handing it the global context
    /// generator of this client.
    pub async fn start_executor(self: &Arc<Self>) {
        debug!(
            "Starting fedimint client executor (version: {})",
            fedimint_build_code_version_env!()
        );
        self.executor.start_executor(self.context_gen()).await;
    }

    /// Returns the id of the federation this client belongs to.
    pub fn federation_id(&self) -> FederationId {
        self.federation_id
    }

    /// Builds the closure that creates a [`ModuleGlobalClientContext`] for a
    /// given module instance and operation.
    ///
    /// The closure only holds a weak reference to the client.
    ///
    /// # Panics
    /// The returned closure panics if it is called after the client has been
    /// dropped.
    fn context_gen(self: &Arc<Self>) -> ModuleGlobalContextGen {
        let client_inner = Arc::downgrade(self);
        Arc::new(move |module_instance, operation| {
            ModuleGlobalClientContext {
                client: client_inner
                    .clone()
                    .upgrade()
                    .expect("ModuleGlobalContextGen called after client was dropped"),
                module_instance_id: module_instance,
                operation,
            }
            .into()
        })
    }

    /// Returns the client config.
    fn config(&self) -> &ClientConfig {
        &self.config
    }

    /// Returns the module decoder registry of this client.
    pub fn decoders(&self) -> &ModuleDecoderRegistry {
        &self.decoders
    }

    /// Returns a reference to the module, panics if not found
    fn get_module(&self, instance: ModuleInstanceId) -> &maybe_add_send_sync!(dyn IClientModule) {
        self.try_get_module(instance)
            .expect("Module instance not found")
    }

    /// Returns a reference to the module, or `None` if no module with the
    /// given instance id is registered.
    fn try_get_module(
        &self,
        instance: ModuleInstanceId,
    ) -> Option<&maybe_add_send_sync!(dyn IClientModule)> {
        Some(self.modules.get(instance)?.as_ref())
    }

    /// Returns `true` if a module with the given instance id is registered.
    pub fn has_module(&self, instance: ModuleInstanceId) -> bool {
        self.modules.get(instance).is_some()
    }

    /// Returns the input amount and output amount of a transaction
    ///
    /// The second element of the returned tuple is the sum of all output
    /// amounts *plus* the fees of all inputs and outputs.
    ///
    /// # Panics
    /// If any of the input or output versions in the transaction builder are
    /// unknown by the respective module.
    fn transaction_builder_balance(&self, builder: &TransactionBuilder) -> (Amount, Amount) {
        // FIXME: prevent overflows, currently not suitable for untrusted input
        let mut in_amount = Amount::ZERO;
        let mut out_amount = Amount::ZERO;
        let mut fee_amount = Amount::ZERO;

        for input in &builder.inputs {
            let module = self.get_module(input.input.module_instance_id());

            let item_fee = module.input_fee(&input.input).expect(
                "We only build transactions with input versions that are supported by the module",
            );

            in_amount += input.amount;
            fee_amount += item_fee;
        }

        for output in &builder.outputs {
            let module = self.get_module(output.output.module_instance_id());

            let item_fee = module.output_fee(&output.output).expect(
                "We only build transactions with output versions that are supported by the module",
            );

            out_amount += output.amount;
            fee_amount += item_fee;
        }

        (in_amount, out_amount + fee_amount)
    }

    /// Returns the fake lightning public key derived from the federation id,
    /// paired with the constant `0`.
    pub fn get_internal_payment_markers(&self) -> anyhow::Result<(PublicKey, u64)> {
        Ok((self.federation_id().to_fake_ln_pub_key(&self.secp_ctx)?, 0))
    }

    /// Returns a copy of the federation meta value stored under `key`, if any.
    pub fn get_meta(&self, key: &str) -> Option<String> {
        self.federation_meta.get(key).cloned()
    }

    /// Returns a clone of the client's root secret.
    fn root_secret(&self) -> DerivableSecret {
        self.root_secret.clone()
    }

    /// Adds the given state machines to the executor as part of `dbtx`.
    pub async fn add_state_machines(
        &self,
        dbtx: &mut DatabaseTransaction<'_>,
        states: Vec<DynState>,
    ) -> AddStateMachinesResult {
        self.executor.add_state_machines_dbtx(dbtx, states).await
    }

    /// Returns the ids of all operations that have at least one active state
    /// and an entry under [`OperationLogKey`] in the database.
    // TODO: implement as part of [`OperationLog`]
    pub async fn get_active_operations(&self) -> HashSet<OperationId> {
        let active_states = self.executor.get_active_states().await;
        let mut active_operations = HashSet::with_capacity(active_states.len());
        let mut dbtx = self.db().begin_transaction_nc().await;
        for (state, _) in active_states {
            let operation_id = state.operation_id();
            if dbtx
                .get_value(&OperationLogKey { operation_id })
                .await
                .is_some()
            {
                active_operations.insert(operation_id);
            }
        }
        active_operations
    }

    /// Returns the operation log of this client.
    pub fn operation_log(&self) -> &OperationLog {
        &self.operation_log
    }

    /// Get the meta manager to read meta fields.
    pub fn meta_service(&self) -> &Arc<MetaService> {
        &self.meta_service
    }

    /// Adds funding to a transaction or removes over-funding via change.
    ///
    /// Returns the built transaction, the state machines produced while
    /// building it, and the index range of the outputs that were added to
    /// balance the transaction.
    ///
    /// # Panics
    /// If the transaction is not balanced after the primary module added its
    /// inputs and outputs.
    async fn finalize_transaction(
        &self,
        dbtx: &mut DatabaseTransaction<'_>,
        operation_id: OperationId,
        mut partial_transaction: TransactionBuilder,
    ) -> anyhow::Result<(Transaction, Vec<DynState>, Range<u64>)> {
        let (input_amount, output_amount) = self.transaction_builder_balance(&partial_transaction);

        let (added_inputs, change_outputs) = self
            .primary_module()
            .create_final_inputs_and_outputs(
                self.primary_module_instance,
                dbtx,
                operation_id,
                input_amount,
                output_amount,
            )
            .await?;

        // This is the range of outputs that will be added to the transaction
        // in order to balance it. Notice that it may stay empty in case the transaction
        // is already balanced.
        let change_range = Range {
            start: partial_transaction.outputs.len() as u64,
            end: (partial_transaction.outputs.len() + change_outputs.len()) as u64,
        };

        partial_transaction.inputs.extend(added_inputs);
        partial_transaction.outputs.extend(change_outputs);

        let (input_amount, output_amount) = self.transaction_builder_balance(&partial_transaction);

        assert_eq!(input_amount, output_amount, "Transaction is not balanced");

        let (tx, states) = partial_transaction.build(&self.secp_ctx, thread_rng());

        Ok((tx, states, change_range))
    }

    /// Add funding and/or change to the transaction builder as needed, finalize
    /// the transaction and submit it to the federation.
    ///
    /// ## Errors
    /// The function will return an error if the operation with given ID already
    /// exists.
    ///
    /// ## Panics
    /// The function will panic if committing the database transaction keeps
    /// failing too often, this should not happen except for
    /// excessively concurrent scenarios.
1040 pub async fn finalize_and_submit_transaction<F, M>( 1041 &self, 1042 operation_id: OperationId, 1043 operation_type: &str, 1044 operation_meta: F, 1045 tx_builder: TransactionBuilder, 1046 ) -> anyhow::Result<(TransactionId, Vec<OutPoint>)> 1047 where 1048 F: Fn(TransactionId, Vec<OutPoint>) -> M + Clone + MaybeSend + MaybeSync, 1049 M: serde::Serialize + MaybeSend, 1050 { 1051 let operation_type = operation_type.to_owned(); 1052 1053 let autocommit_res = self 1054 .db 1055 .autocommit( 1056 |dbtx, _| { 1057 let operation_type = operation_type.clone(); 1058 let tx_builder = tx_builder.clone(); 1059 let operation_meta = operation_meta.clone(); 1060 Box::pin(async move { 1061 if Client::operation_exists_dbtx(dbtx, operation_id).await { 1062 bail!("There already exists an operation with id {operation_id:?}") 1063 } 1064 1065 let (txid, change) = self 1066 .finalize_and_submit_transaction_inner(dbtx, operation_id, tx_builder) 1067 .await?; 1068 1069 self.operation_log() 1070 .add_operation_log_entry( 1071 dbtx, 1072 operation_id, 1073 &operation_type, 1074 operation_meta(txid, change.clone()), 1075 ) 1076 .await; 1077 1078 Ok((txid, change)) 1079 }) 1080 }, 1081 Some(100), // TODO: handle what happens after 100 retries 1082 ) 1083 .await; 1084 1085 match autocommit_res { 1086 Ok(txid) => Ok(txid), 1087 Err(AutocommitError::ClosureError { error, .. 
}) => Err(error), 1088 Err(AutocommitError::CommitFailed { 1089 attempts, 1090 last_error, 1091 }) => panic!( 1092 "Failed to commit tx submission dbtx after {attempts} attempts: {last_error}" 1093 ), 1094 } 1095 } 1096 1097 async fn finalize_and_submit_transaction_inner( 1098 &self, 1099 dbtx: &mut DatabaseTransaction<'_>, 1100 operation_id: OperationId, 1101 tx_builder: TransactionBuilder, 1102 ) -> anyhow::Result<(TransactionId, Vec<OutPoint>)> { 1103 let (transaction, mut states, change_range) = self 1104 .finalize_transaction(&mut dbtx.to_ref_nc(), operation_id, tx_builder) 1105 .await?; 1106 1107 ensure!( 1108 transaction.consensus_encode_to_vec().len() <= Transaction::MAX_TX_SIZE, 1109 "The generated transaction would be rejected by the federation for being too large." 1110 ); 1111 1112 let txid = transaction.tx_hash(); 1113 1114 debug!(target: LOG_CLIENT_NET_API, %txid, ?transaction, "Finalized and submitting transaction"); 1115 1116 let change_outpoints = change_range 1117 .into_iter() 1118 .map(|out_idx| OutPoint { txid, out_idx }) 1119 .collect(); 1120 1121 let tx_submission_sm = DynState::from_typed( 1122 TRANSACTION_SUBMISSION_MODULE_INSTANCE, 1123 OperationState { 1124 operation_id, 1125 state: TxSubmissionStates::Created(transaction), 1126 }, 1127 ); 1128 states.push(tx_submission_sm); 1129 1130 self.executor.add_state_machines_dbtx(dbtx, states).await?; 1131 1132 Ok((txid, change_outpoints)) 1133 } 1134 1135 async fn transaction_update_stream( 1136 &self, 1137 operation_id: OperationId, 1138 ) -> BoxStream<'static, OperationState<TxSubmissionStates>> { 1139 self.executor 1140 .notifier() 1141 .module_notifier::<OperationState<TxSubmissionStates>>( 1142 TRANSACTION_SUBMISSION_MODULE_INSTANCE, 1143 ) 1144 .subscribe(operation_id) 1145 .await 1146 } 1147 1148 pub async fn operation_exists(&self, operation_id: OperationId) -> bool { 1149 let mut dbtx = self.db().begin_transaction_nc().await; 1150 1151 Client::operation_exists_dbtx(&mut dbtx, 
operation_id).await 1152 } 1153 1154 pub async fn operation_exists_dbtx( 1155 dbtx: &mut DatabaseTransaction<'_>, 1156 operation_id: OperationId, 1157 ) -> bool { 1158 let active_state_exists = dbtx 1159 .find_by_prefix(&ActiveOperationStateKeyPrefix { operation_id }) 1160 .await 1161 .next() 1162 .await 1163 .is_some(); 1164 1165 let inactive_state_exists = dbtx 1166 .find_by_prefix(&InactiveOperationStateKeyPrefix { operation_id }) 1167 .await 1168 .next() 1169 .await 1170 .is_some(); 1171 1172 active_state_exists || inactive_state_exists 1173 } 1174 1175 pub async fn has_active_states(&self, operation_id: OperationId) -> bool { 1176 self.db 1177 .begin_transaction() 1178 .await 1179 .find_by_prefix(&ActiveOperationStateKeyPrefix { operation_id }) 1180 .await 1181 .next() 1182 .await 1183 .is_some() 1184 } 1185 1186 /// Waits for an output from the primary module to reach its final 1187 /// state. 1188 pub async fn await_primary_module_output( 1189 &self, 1190 operation_id: OperationId, 1191 out_point: OutPoint, 1192 ) -> anyhow::Result<Amount> { 1193 self.primary_module() 1194 .await_primary_module_output(operation_id, out_point) 1195 .await 1196 } 1197 1198 /// Returns a reference to a typed module client instance by kind 1199 pub fn get_first_module<M: ClientModule>(&self) -> ClientModuleInstance<M> { 1200 let module_kind = M::kind(); 1201 let id = self 1202 .get_first_instance(&module_kind) 1203 .unwrap_or_else(|| panic!("No modules found of kind {module_kind}")); 1204 let module: &M = self 1205 .try_get_module(id) 1206 .unwrap_or_else(|| panic!("Unknown module instance {id}")) 1207 .as_any() 1208 .downcast_ref::<M>() 1209 .unwrap_or_else(|| panic!("Module is not of type {}", std::any::type_name::<M>())); 1210 ClientModuleInstance { 1211 id, 1212 db: self.db().with_prefix_module_id(id), 1213 api: self.api().with_module(id), 1214 module, 1215 } 1216 } 1217 1218 pub fn get_module_client_dyn( 1219 &self, 1220 instance_id: ModuleInstanceId, 1221 ) -> 
anyhow::Result<&maybe_add_send_sync!(dyn IClientModule)> { 1222 self.try_get_module(instance_id) 1223 .ok_or(anyhow!("Unknown module instance {}", instance_id)) 1224 } 1225 1226 pub fn db(&self) -> &Database { 1227 &self.db 1228 } 1229 1230 /// Returns a stream of transaction updates for the given operation id that 1231 /// can later be used to watch for a specific transaction being accepted. 1232 pub async fn transaction_updates(&self, operation_id: OperationId) -> TransactionUpdates { 1233 TransactionUpdates { 1234 update_stream: self.transaction_update_stream(operation_id).await, 1235 } 1236 } 1237 1238 /// Returns the instance id of the first module of the given kind. The 1239 /// primary module will always be returned before any other modules (which 1240 /// themselves are ordered by their instance ID). 1241 pub fn get_first_instance(&self, module_kind: &ModuleKind) -> Option<ModuleInstanceId> { 1242 if self 1243 .modules 1244 .get_with_kind(self.primary_module_instance) 1245 .map(|(kind, _)| kind == module_kind) 1246 .unwrap_or(false) 1247 { 1248 return Some(self.primary_module_instance); 1249 } 1250 1251 self.modules 1252 .iter_modules() 1253 .find(|(_, kind, _module)| *kind == module_kind) 1254 .map(|(instance_id, _, _)| instance_id) 1255 } 1256 1257 /// Returns the data from which the client's root secret is derived (e.g. 1258 /// BIP39 seed phrase struct). 1259 pub async fn root_secret_encoding<T: Decodable>(&self) -> anyhow::Result<T> { 1260 get_decoded_client_secret::<T>(self.db()).await 1261 } 1262 1263 /// Waits for outputs from the primary module to reach its final 1264 /// state. 
1265 pub async fn await_primary_module_outputs( 1266 &self, 1267 operation_id: OperationId, 1268 outputs: Vec<OutPoint>, 1269 ) -> anyhow::Result<Amount> { 1270 let mut amount = Amount::ZERO; 1271 1272 for out_point in outputs { 1273 amount += self 1274 .await_primary_module_output(operation_id, out_point) 1275 .await?; 1276 } 1277 1278 Ok(amount) 1279 } 1280 1281 /// Returns the config with which the client was initialized. 1282 pub fn get_config(&self) -> &ClientConfig { 1283 &self.config 1284 } 1285 1286 /// Returns the config of the client in JSON format. 1287 /// 1288 /// Compared to the consensus module format where module configs are binary 1289 /// encoded this format cannot be cryptographically verified but is easier 1290 /// to consume and to some degree human-readable. 1291 pub fn get_config_json(&self) -> JsonClientConfig { 1292 JsonClientConfig { 1293 global: self.get_config().global.clone(), 1294 modules: self 1295 .get_config() 1296 .modules 1297 .iter() 1298 .map(|(instance_id, ClientModuleConfig { kind, config, .. })| { 1299 ( 1300 *instance_id, 1301 JsonWithKind::new( 1302 kind.clone(), 1303 config 1304 .clone() 1305 .decoded() 1306 .map(|decoded| decoded.to_json().into()) 1307 .unwrap_or(serde_json::Value::Null), 1308 ), 1309 ) 1310 }) 1311 .collect(), 1312 } 1313 } 1314 1315 /// Get the primary module 1316 pub fn primary_module(&self) -> &DynClientModule { 1317 self.modules 1318 .get(self.primary_module_instance) 1319 .expect("primary module must be present") 1320 } 1321 1322 /// Balance available to the client for spending 1323 pub async fn get_balance(&self) -> Amount { 1324 self.primary_module() 1325 .get_balance( 1326 self.primary_module_instance, 1327 &mut self.db().begin_transaction_nc().await, 1328 ) 1329 .await 1330 } 1331 1332 /// Returns a stream that yields the current client balance every time it 1333 /// changes. 
1334 pub async fn subscribe_balance_changes(&self) -> BoxStream<'static, Amount> { 1335 let mut balance_changes = self.primary_module().subscribe_balance_changes().await; 1336 let initial_balance = self.get_balance().await; 1337 let db = self.db().clone(); 1338 let primary_module = self.primary_module().clone(); 1339 let primary_module_instance = self.primary_module_instance; 1340 1341 Box::pin(stream! { 1342 yield initial_balance; 1343 let mut prev_balance = initial_balance; 1344 while let Some(()) = balance_changes.next().await { 1345 let mut dbtx = db.begin_transaction_nc().await; 1346 let balance = primary_module 1347 .get_balance(primary_module_instance, &mut dbtx) 1348 .await; 1349 1350 // Deduplicate in case modules cannot always tell if the balance actually changed 1351 if balance != prev_balance { 1352 prev_balance = balance; 1353 yield balance; 1354 } 1355 } 1356 }) 1357 } 1358 1359 pub async fn discover_common_api_version( 1360 &self, 1361 threshold: Option<usize>, 1362 ) -> anyhow::Result<ApiVersionSet> { 1363 Ok(self 1364 .api() 1365 .discover_api_version_set( 1366 &Self::supported_api_versions_summary_static(self.get_config(), &self.module_inits) 1367 .await, 1368 get_discover_api_version_timeout(), 1369 threshold, 1370 ) 1371 .await?) 1372 } 1373 1374 /// Query the federation for API version support and then calculate 1375 /// the best API version to use (supported by most guardians). 
1376 pub async fn discover_common_api_version_static( 1377 config: &ClientConfig, 1378 client_module_init: &ClientModuleInitRegistry, 1379 api: &DynGlobalApi, 1380 mode: DiscoverCommonApiVersionMode, 1381 ) -> anyhow::Result<ApiVersionSet> { 1382 Ok(api 1383 .discover_api_version_set( 1384 &Self::supported_api_versions_summary_static(config, client_module_init).await, 1385 get_discover_api_version_timeout(), 1386 match mode { 1387 DiscoverCommonApiVersionMode::Fast => { 1388 Some((config.global.api_endpoints.len() / 2).min(1)) 1389 } 1390 DiscoverCommonApiVersionMode::Full => None, 1391 }, 1392 ) 1393 .await?) 1394 } 1395 1396 /// [`SupportedApiVersionsSummary`] that the client and its modules support 1397 pub async fn supported_api_versions_summary_static( 1398 config: &ClientConfig, 1399 client_module_init: &ClientModuleInitRegistry, 1400 ) -> SupportedApiVersionsSummary { 1401 SupportedApiVersionsSummary { 1402 core: SupportedCoreApiVersions { 1403 core_consensus: config.global.consensus_version, 1404 api: MultiApiVersion::try_from_iter(SUPPORTED_CORE_API_VERSIONS.to_owned()) 1405 .expect("must not have conflicting versions"), 1406 }, 1407 modules: config 1408 .modules 1409 .iter() 1410 .filter_map(|(&module_instance_id, module_config)| { 1411 client_module_init 1412 .get(module_config.kind()) 1413 .map(|module_init| { 1414 ( 1415 module_instance_id, 1416 SupportedModuleApiVersions { 1417 core_consensus: config.global.consensus_version, 1418 module_consensus: module_config.version, 1419 api: module_init.supported_api_versions(), 1420 }, 1421 ) 1422 }) 1423 }) 1424 .collect(), 1425 } 1426 } 1427 1428 /// Load the common api versions to use from cache and start a background 1429 /// process to refresh them. 1430 /// 1431 /// This is a compromise, so we not have to wait for version discovery to 1432 /// complete every time a [`Client`] is being built. 
1433 async fn load_and_refresh_common_api_version_static( 1434 config: &ClientConfig, 1435 module_inits: &ModuleInitRegistry<DynClientModuleInit>, 1436 api: &DynGlobalApi, 1437 db: &Database, 1438 task_group: &TaskGroup, 1439 ) -> anyhow::Result<ApiVersionSet> { 1440 if let Some(v) = db 1441 .begin_transaction() 1442 .await 1443 .get_value(&CachedApiVersionSetKey) 1444 .await 1445 { 1446 debug!("Found existing cached common api versions"); 1447 let config = config.clone(); 1448 let module_inits = module_inits.clone(); 1449 let api = api.clone(); 1450 let db = db.clone(); 1451 // Separate task group, because we actually don't want to be waiting for this to 1452 // finish, and it's just best effort. 1453 task_group.spawn_cancellable("refresh_common_api_version_static", async move { 1454 if let Err(error) = Self::refresh_common_api_version_static( 1455 &config, 1456 &module_inits, 1457 &api, 1458 &db, 1459 DiscoverCommonApiVersionMode::Full, 1460 ) 1461 .await 1462 { 1463 warn!(%error, "Failed to discover common api versions"); 1464 } 1465 }); 1466 1467 return Ok(v.0); 1468 } 1469 1470 debug!("No existing cached common api versions found, waiting for initial discovery"); 1471 Self::refresh_common_api_version_static( 1472 config, 1473 module_inits, 1474 api, 1475 db, 1476 DiscoverCommonApiVersionMode::Fast, 1477 ) 1478 .await 1479 } 1480 1481 async fn refresh_common_api_version_static( 1482 config: &ClientConfig, 1483 module_inits: &ModuleInitRegistry<DynClientModuleInit>, 1484 api: &DynGlobalApi, 1485 db: &Database, 1486 mode: DiscoverCommonApiVersionMode, 1487 ) -> anyhow::Result<ApiVersionSet> { 1488 debug!("Refreshing common api versions"); 1489 1490 let common_api_versions = 1491 Client::discover_common_api_version_static(config, module_inits, api, mode).await?; 1492 1493 debug!( 1494 value = ?common_api_versions, 1495 "Updating the cached common api versions" 1496 ); 1497 let mut dbtx = db.begin_transaction().await; 1498 let _ = dbtx 1499 .insert_entry( 1500 
&CachedApiVersionSetKey, 1501 &CachedApiVersionSet(common_api_versions.clone()), 1502 ) 1503 .await; 1504 1505 dbtx.commit_tx().await; 1506 1507 Ok(common_api_versions) 1508 } 1509 1510 /// Get the client [`Metadata`] 1511 pub async fn get_metadata(&self) -> Metadata { 1512 self.db 1513 .begin_transaction_nc() 1514 .await 1515 .get_value(&ClientMetadataKey) 1516 .await 1517 .unwrap_or_else(|| { 1518 warn!("Missing existing metadata. This key should have been set on Client init"); 1519 Metadata::empty() 1520 }) 1521 } 1522 1523 /// Set the client [`Metadata`] 1524 pub async fn set_metadata(&self, metadata: &Metadata) { 1525 self.db 1526 .autocommit::<_, _, anyhow::Error>( 1527 move |dbtx, _| { 1528 Box::pin(async move { 1529 Self::set_metadata_dbtx(dbtx, metadata).await; 1530 Ok(()) 1531 }) 1532 }, 1533 None, 1534 ) 1535 .await 1536 .expect("Failed to autocommit metadata") 1537 } 1538 1539 pub async fn has_pending_recoveries(&self) -> bool { 1540 !self 1541 .client_recovery_progress_receiver 1542 .borrow() 1543 .iter() 1544 .all(|(_id, progress)| progress.is_done()) 1545 } 1546 1547 /// Wait for all module recoveries to finish 1548 /// 1549 /// This will block until the recovery task is done with recoveries. 1550 /// Returns success if all recovery tasks are complete (success case), 1551 /// or an error if some modules could not complete the recovery at the time. 1552 /// 1553 /// A bit of a heavy approach. 1554 pub async fn wait_for_all_recoveries(&self) -> anyhow::Result<()> { 1555 let mut recovery_receiver = self.client_recovery_progress_receiver.clone(); 1556 recovery_receiver 1557 .wait_for(|in_progress| { 1558 in_progress 1559 .iter() 1560 .all(|(_id, progress)| progress.is_done()) 1561 }) 1562 .await 1563 .context("Recovery task completed and update receiver disconnected, but some modules failed to recover")?; 1564 1565 Ok(()) 1566 } 1567 1568 /// Subscribe to recover progress for all the modules. 
///
    /// This stream can contain duplicate progress for a module.
    /// Don't use this stream for detecting completion of recovery.
    pub fn subscribe_to_recovery_progress(
        &self,
    ) -> impl Stream<Item = (ModuleInstanceId, RecoveryProgress)> {
        WatchStream::new(self.client_recovery_progress_receiver.clone())
            .flat_map(futures::stream::iter)
    }

    /// Waits until the recovery of every module of kind `module_kind` is
    /// done.
    ///
    /// # Errors
    /// Returns an error if the progress channel is closed before all modules
    /// of that kind are done.
    pub async fn wait_for_module_kind_recovery(
        &self,
        module_kind: ModuleKind,
    ) -> anyhow::Result<()> {
        let mut recovery_receiver = self.client_recovery_progress_receiver.clone();
        recovery_receiver
            .wait_for(|in_progress| {
                !in_progress
                    .iter()
                    .filter(|(module_instance_id, _progress)| {
                        // Checked lookup: an instance id that is missing from
                        // the config is treated as "not of this kind"
                        // instead of panicking.
                        self.config
                            .modules
                            .get(*module_instance_id)
                            .map_or(false, |module_config| module_config.kind == module_kind)
                    })
                    .any(|(_id, progress)| !progress.is_done())
            })
            .await
            .context("Recovery task completed and update receiver disconnected, but the desired modules are still unavailable or failed to recover")?;

        Ok(())
    }

    /// Waits until the executor reports no active states, polling every
    /// 100 milliseconds.
    pub async fn wait_for_all_active_state_machines(&self) -> anyhow::Result<()> {
        while !self.executor.get_active_states().await.is_empty() {
            fedimint_core::runtime::sleep(Duration::from_millis(100)).await;
        }
        Ok(())
    }

    /// Set the client [`Metadata`]
    // NOTE(review): this uses `insert_new_entry`, while client init also
    // writes `ClientMetadataKey` — confirm that overwriting an existing entry
    // through this call is intended.
    pub async fn set_metadata_dbtx(dbtx: &mut DatabaseTransaction<'_>, metadata: &Metadata) {
        dbtx.insert_new_entry(&ClientMetadataKey, metadata).await;
    }

    /// Spawns [`Self::run_module_recoveries_task`] on the client's task
    /// group.
    async fn spawn_module_recoveries_task(
        &self,
        recovery_sender: watch::Sender<BTreeMap<ModuleInstanceId, RecoveryProgress>>,
        module_recoveries: BTreeMap<
            ModuleInstanceId,
            Pin<Box<maybe_add_send!(dyn Future<Output = anyhow::Result<()>>)>>,
        >,
        module_recovery_progress_receivers: BTreeMap<
            ModuleInstanceId,
            watch::Receiver<RecoveryProgress>,
        >,
    ) {
        let db = self.db.clone();
        self.task_group
            .spawn("module recoveries", move |_task_handle| async move {
                Self::run_module_recoveries_task(
                    db,
                    recovery_sender,
                    module_recoveries,
                    module_recovery_progress_receivers,
                )
                .await
            });
    }

    /// Drives all module recoveries and records their progress.
    ///
    /// Every progress update and every successful completion is written to
    /// the database under [`ClientModuleRecovery`] and then published through
    /// `recovery_sender`.
    ///
    /// # Panics
    /// If `recovery_sender` holds no previous progress for a module that
    /// reports an update.
    async fn run_module_recoveries_task(
        db: Database,
        recovery_sender: watch::Sender<BTreeMap<ModuleInstanceId, RecoveryProgress>>,
        module_recoveries: BTreeMap<
            ModuleInstanceId,
            Pin<Box<maybe_add_send!(dyn Future<Output = anyhow::Result<()>>)>>,
        >,
        module_recovery_progress_receivers: BTreeMap<
            ModuleInstanceId,
            watch::Receiver<RecoveryProgress>,
        >,
    ) {
        debug!(target:LOG_CLIENT_RECOVERY, num_modules=%module_recovery_progress_receivers.len(), "Starting module recoveries");
        let mut completed_stream = Vec::new();
        let progress_stream = futures::stream::FuturesUnordered::new();

        // One single-item stream per recovery future: it yields
        // `(id, None)` once the recovery finished successfully.
        for (module_instance_id, f) in module_recoveries.into_iter() {
            completed_stream.push(futures::stream::once(Box::pin(async move {
                match f.await {
                    Ok(_) => (module_instance_id, None),
                    Err(err) => {
                        warn!(%err, module_instance_id, "Module recovery failed");
                        // a module recovery that failed reports an error and
                        // just never finishes, so we don't need a separate state
                        // for it
                        futures::future::pending::<Option<RecoveryProgress>>().await;
                        unreachable!()
                    }
                }
            })));
        }

        // One stream per progress receiver: it yields `(id, Some(progress))`
        // for every progress value.
        for (module_instance_id, rx) in module_recovery_progress_receivers.into_iter() {
            progress_stream.push(
                tokio_stream::wrappers::WatchStream::new(rx)
                    .fuse()
                    .map(move |progress| (module_instance_id, Some(progress))),
            );
        }

        let mut futures = futures::stream::select(
            futures::stream::select_all(progress_stream),
            futures::stream::select_all(completed_stream),
        );

        while let Some((module_instance_id, progress)) = futures.next().await {
            let mut dbtx = db.begin_transaction().await;

            let prev_progress = *recovery_sender
                .borrow()
                .get(&module_instance_id)
                .expect("existing progress must be present");

            let progress = if prev_progress.is_done() {
                // since updates might be out of order, once done, stick with it
                prev_progress
            } else if let Some(progress) = progress {
                progress
            } else {
                // `None` means the recovery future completed successfully.
                prev_progress.to_complete()
            };

            info!(
                module_instance_id,
                progress = format!("{}/{}", progress.complete, progress.total),
                "Recovery progress"
            );

            dbtx.insert_entry(
                &ClientModuleRecovery { module_instance_id },
                &ClientModuleRecoveryState { progress },
            )
            .await;
            dbtx.commit_tx().await;

            recovery_sender.send_modify(|v| {
                v.insert(module_instance_id, progress);
            });
        }
        debug!(target: LOG_CLIENT_RECOVERY, "Recovery executor stopped");
    }
}

/// See [`Client::transaction_updates`]
pub struct TransactionUpdates {
    update_stream: BoxStream<'static, OperationState<TxSubmissionStates>>,
}

impl TransactionUpdates {
    /// Waits for the transaction to be accepted or rejected as part of the
    /// operation to which the `TransactionUpdates` object is subscribed.
pub async fn await_tx_accepted(self, await_txid: TransactionId) -> Result<(), String> {
        self.update_stream
            .filter_map(|update| {
                // Only updates about `await_txid` produce a result; all other
                // updates are skipped.
                let outcome = match update.state {
                    TxSubmissionStates::Accepted(txid) if txid == await_txid => Some(Ok(())),
                    TxSubmissionStates::Rejected(txid, submit_error) if txid == await_txid => {
                        Some(Err(submit_error))
                    }
                    _ => None,
                };
                std::future::ready(outcome)
            })
            .next_or_pending()
            .await
    }
}

/// Admin (guardian) identification and authentication
pub struct AdminCreds {
    /// Guardian's own `peer_id`
    pub peer_id: PeerId,
    /// Authentication details
    pub auth: ApiAuth,
}

/// Used to configure, assemble and build [`Client`]
pub struct ClientBuilder {
    module_inits: ClientModuleInitRegistry,
    primary_module_instance: Option<ModuleInstanceId>,
    admin_creds: Option<AdminCreds>,
    db_no_decoders: Database,
    meta_service: Arc<MetaService>,
    stopped: bool,
}

impl ClientBuilder {
    /// Creates a builder on top of `db` with no module initializers, no
    /// primary module, no admin credentials and a meta service backed by the
    /// default [`LegacyMetaSource`].
    fn new(db: Database) -> Self {
        let meta_service = MetaService::new(LegacyMetaSource::default());
        Self {
            module_inits: ClientModuleInitRegistry::default(),
            primary_module_instance: None,
            admin_creds: None,
            db_no_decoders: db,
            stopped: false,
            meta_service,
        }
    }

    /// Creates a builder that reuses the module initializers, primary module
    /// instance, database and meta service of an existing client. Admin
    /// credentials are not carried over.
    fn from_existing(client: &Client) -> Self {
        let db_no_decoders = client.db.with_decoders(Default::default());
        Self {
            module_inits: client.module_inits.clone(),
            primary_module_instance: Some(client.primary_module_instance),
            admin_creds: None,
            db_no_decoders,
            stopped: false,
            // non unique
            meta_service: client.meta_service.clone(),
        }
    }

    /// Replace module generator registry entirely
    pub fn with_module_inits(&mut self, module_inits: ClientModuleInitRegistry) {
        self.module_inits = module_inits;
    }

    /// Make module generator available when reading
the config 1795 pub fn with_module<M: ClientModuleInit>(&mut self, module_init: M) { 1796 self.module_inits.attach(module_init); 1797 } 1798 1799 pub fn stopped(&mut self) { 1800 self.stopped = true; 1801 } 1802 1803 /// Uses this module with the given instance id as the primary module. See 1804 /// [`ClientModule::supports_being_primary`] for more information. 1805 /// 1806 /// ## Panics 1807 /// If there was a primary module specified previously 1808 pub fn with_primary_module(&mut self, primary_module_instance: ModuleInstanceId) { 1809 let was_replaced = self 1810 .primary_module_instance 1811 .replace(primary_module_instance) 1812 .is_some(); 1813 assert!( 1814 !was_replaced, 1815 "Only one primary module can be given to the builder." 1816 ) 1817 } 1818 1819 pub fn with_meta_service(&mut self, meta_service: Arc<MetaService>) { 1820 self.meta_service = meta_service; 1821 } 1822 1823 async fn migrate_database(&self, db: &Database) -> anyhow::Result<()> { 1824 // Only apply the client database migrations if the database has been 1825 // initialized. 
1826 if let Ok(client_config) = self.load_existing_config().await { 1827 for (module_id, module_cfg) in client_config.modules { 1828 let kind = module_cfg.kind.clone(); 1829 let Some(init) = self.module_inits.get(&kind) else { 1830 // normal, expected and already logged about when building the client 1831 continue; 1832 }; 1833 1834 apply_migrations_client( 1835 db, 1836 kind.to_string(), 1837 init.database_version(), 1838 init.get_database_migrations(), 1839 module_id, 1840 ) 1841 .await?; 1842 } 1843 } 1844 1845 Ok(()) 1846 } 1847 1848 pub fn db_no_decoders(&self) -> &Database { 1849 &self.db_no_decoders 1850 } 1851 1852 pub async fn load_existing_config(&self) -> anyhow::Result<ClientConfig> { 1853 let Some(config) = Client::get_config_from_db(&self.db_no_decoders).await else { 1854 bail!("Client database not initialized") 1855 }; 1856 1857 Ok(config) 1858 } 1859 1860 pub fn set_admin_creds(&mut self, creds: AdminCreds) { 1861 self.admin_creds = Some(creds); 1862 } 1863 1864 async fn init( 1865 self, 1866 root_secret: DerivableSecret, 1867 config: ClientConfig, 1868 init_mode: InitMode, 1869 ) -> anyhow::Result<ClientHandle> { 1870 if Client::is_initialized(&self.db_no_decoders).await { 1871 bail!("Client database already initialized") 1872 } 1873 1874 // Note: It's important all client initialization is performed as one big 1875 // transaction to avoid half-initialized client state. 
1876 { 1877 debug!(target: LOG_CLIENT, "Initializing client database"); 1878 let mut dbtx = self.db_no_decoders.begin_transaction().await; 1879 // Save config to DB 1880 dbtx.insert_new_entry( 1881 &ClientConfigKey { 1882 id: config.calculate_federation_id(), 1883 }, 1884 &config, 1885 ) 1886 .await; 1887 1888 let init_state = InitState::Pending(init_mode); 1889 dbtx.insert_entry(&ClientInitStateKey, &init_state).await; 1890 1891 let metadata = init_state 1892 .does_require_recovery() 1893 .flatten() 1894 .map(|s| s.metadata) 1895 .unwrap_or(Metadata::empty()); 1896 1897 dbtx.insert_new_entry(&ClientMetadataKey, &metadata).await; 1898 1899 dbtx.commit_tx_result().await?; 1900 } 1901 1902 let stopped = self.stopped; 1903 self.build(root_secret, config, stopped).await 1904 } 1905 1906 /// Join a new Federation 1907 /// 1908 /// When a user wants to connect to a new federation this function fetches 1909 /// the federation config and initializes the client database. If a user 1910 /// already joined the federation in the past and has a preexisting database 1911 /// use [`ClientBuilder::open`] instead. 1912 /// 1913 /// **Warning**: Calling `join` with a `root_secret` key that was used 1914 /// previous to `join` a Federation will lead to all sorts of malfunctions 1915 /// including likely loss of funds. 1916 /// 1917 /// This should be generally called only if the `root_secret` key is known 1918 /// not to have been used before (e.g. just randomly generated). For keys 1919 /// that might have been previous used (e.g. provided by the user), 1920 /// it's safer to call [`Self::recover`] which will attempt to recover 1921 /// client module states for the Federation. 
1922 /// 1923 /// A typical "join federation" flow would look as follows: 1924 /// ```no_run 1925 /// # use std::str::FromStr; 1926 /// # use fedimint_core::invite_code::InviteCode; 1927 /// # use fedimint_core::config::ClientConfig; 1928 /// # use fedimint_derive_secret::DerivableSecret; 1929 /// # use fedimint_client::{Client, ClientBuilder}; 1930 /// # use fedimint_core::db::Database; 1931 /// # use fedimint_core::config::META_FEDERATION_NAME_KEY; 1932 /// # 1933 /// # #[tokio::main] 1934 /// # async fn main() { 1935 /// # let root_secret: DerivableSecret = unimplemented!(); 1936 /// // Create a root secret, e.g. via fedimint-bip39, see also: 1937 /// // https://github.com/fedimint/fedimint/blob/master/docs/secret_derivation.md 1938 /// // let root_secret = …; 1939 /// 1940 /// // Get invite code from user 1941 /// let invite_code = InviteCode::from_str("fed11qgqpw9thwvaz7te3xgmjuvpwxqhrzw3jxumrvvf0qqqjpetvlg8glnpvzcufhffgzhv8m75f7y34ryk7suamh8x7zetly8h0v9v0rm") 1942 /// .expect("Invalid invite code"); 1943 /// let config = fedimint_api_client::download_from_invite_code(&invite_code).await 1944 /// .expect("Error downloading config"); 1945 /// 1946 /// // Tell the user the federation name, bitcoin network 1947 /// // (e.g. from wallet module config), and other details 1948 /// // that are typically contained in the federation's 1949 /// // meta fields. 
1950 /// 1951 /// // let network = config.get_first_module_by_kind::<WalletClientConfig>("wallet") 1952 /// // .expect("Module not found") 1953 /// // .network; 1954 /// 1955 /// println!( 1956 /// "The federation name is: {}", 1957 /// config.meta::<String>(META_FEDERATION_NAME_KEY) 1958 /// .expect("Could not decode name field") 1959 /// .expect("Name isn't set") 1960 /// ); 1961 /// 1962 /// // Open the client's database, using the federation ID 1963 /// // as the DB name is a common pattern: 1964 /// 1965 /// // let db_path = format!("./path/to/db/{}", config.federation_id()); 1966 /// // let db = RocksDb::open(db_path).expect("error opening DB"); 1967 /// # let db: Database = unimplemented!(); 1968 /// 1969 /// let client = Client::builder(db) 1970 /// // Mount the modules the client should support: 1971 /// // .with_module(LightningClientInit) 1972 /// // .with_module(MintClientInit) 1973 /// // .with_module(WalletClientInit::default()) 1974 /// .join(root_secret, config) 1975 /// .await 1976 /// .expect("Error joining federation"); 1977 /// # } 1978 /// ``` 1979 pub async fn join( 1980 self, 1981 root_secret: DerivableSecret, 1982 config: ClientConfig, 1983 ) -> anyhow::Result<ClientHandle> { 1984 self.init(root_secret, config, InitMode::Fresh).await 1985 } 1986 1987 /// Download most recent valid backup found from the Federation 1988 pub async fn download_backup_from_federation( 1989 &self, 1990 root_secret: &DerivableSecret, 1991 config: &ClientConfig, 1992 ) -> anyhow::Result<Option<ClientBackup>> { 1993 let api = DynGlobalApi::from_config(config); 1994 Client::download_backup_from_federation_static( 1995 &api, 1996 &Self::federation_root_secret(root_secret, config), 1997 &self.decoders(config), 1998 ) 1999 .await 2000 } 2001 2002 /// Join a (possibly) previous joined Federation 2003 /// 2004 /// Unlike [`Self::join`], `recover` will run client module recovery for 2005 /// each client module attempting to recover any previous module state. 
2006 /// 2007 /// Recovery process takes time during which each recovering client module 2008 /// will not be available for use. 2009 /// 2010 /// Calling `recovery` with a `root_secret` that was not actually previous 2011 /// used in a given Federation is safe. 2012 pub async fn recover( 2013 self, 2014 root_secret: DerivableSecret, 2015 config: ClientConfig, 2016 backup: Option<ClientBackup>, 2017 ) -> anyhow::Result<ClientHandle> { 2018 let client = self 2019 .init( 2020 root_secret, 2021 config, 2022 InitMode::Recover { 2023 snapshot: backup.clone(), 2024 }, 2025 ) 2026 .await?; 2027 2028 Ok(client) 2029 } 2030 2031 pub async fn open(self, root_secret: DerivableSecret) -> anyhow::Result<ClientHandle> { 2032 let Some(config) = Client::get_config_from_db(&self.db_no_decoders).await else { 2033 bail!("Client database not initialized") 2034 }; 2035 let stopped = self.stopped; 2036 2037 let client = self.build_stopped(root_secret, config).await?; 2038 if !stopped { 2039 client.as_inner().start_executor().await; 2040 } 2041 Ok(client) 2042 } 2043 2044 /// Build a [`Client`] but do not start the executor 2045 async fn build( 2046 self, 2047 root_secret: DerivableSecret, 2048 config: ClientConfig, 2049 stopped: bool, 2050 ) -> anyhow::Result<ClientHandle> { 2051 let client = self.build_stopped(root_secret, config).await?; 2052 if !stopped { 2053 client.as_inner().start_executor().await; 2054 } 2055 2056 Ok(client) 2057 } 2058 2059 /// Build a [`Client`] but do not start the executor 2060 async fn build_stopped( 2061 self, 2062 root_secret: DerivableSecret, 2063 config: ClientConfig, 2064 ) -> anyhow::Result<ClientHandle> { 2065 let decoders = self.decoders(&config); 2066 let config = Self::config_decoded(config, &decoders)?; 2067 let fed_id = config.calculate_federation_id(); 2068 let db = self.db_no_decoders.with_decoders(decoders.clone()); 2069 let api = if let Some(admin_creds) = self.admin_creds.as_ref() { 2070 DynGlobalApi::from_config_admin(&config, 
admin_creds.peer_id) 2071 } else { 2072 DynGlobalApi::from_config(&config) 2073 }; 2074 let task_group = TaskGroup::new(); 2075 2076 // Migrate the database before interacting with it in case any on-disk data 2077 // structures have changed. 2078 self.migrate_database(&db).await?; 2079 2080 let init_state = Self::load_init_state(&db).await; 2081 2082 let primary_module_instance = self 2083 .primary_module_instance 2084 .ok_or(anyhow!("No primary module instance id was provided"))?; 2085 2086 let notifier = Notifier::new(db.clone()); 2087 2088 let common_api_versions = Client::load_and_refresh_common_api_version_static( 2089 &config, 2090 &self.module_inits, 2091 &api, 2092 &db, 2093 &task_group, 2094 ) 2095 .await 2096 .inspect_err(|err| { 2097 warn!(target: LOG_CLIENT, %err, "Failed to discover initial API version to use."); 2098 }) 2099 .unwrap_or(ApiVersionSet { 2100 core: ApiVersion::new(0, 0), 2101 // This will cause all modules to skip initialization 2102 modules: Default::default(), 2103 }); 2104 2105 debug!(?common_api_versions, "Completed api version negotiation"); 2106 2107 let mut module_recoveries: BTreeMap< 2108 ModuleInstanceId, 2109 Pin<Box<maybe_add_send!(dyn Future<Output = anyhow::Result<()>>)>>, 2110 > = Default::default(); 2111 let mut module_recovery_progress_receivers: BTreeMap< 2112 ModuleInstanceId, 2113 watch::Receiver<RecoveryProgress>, 2114 > = Default::default(); 2115 2116 let final_client = FinalClient::default(); 2117 2118 let root_secret = Self::federation_root_secret(&root_secret, &config); 2119 2120 let modules = { 2121 let mut modules = ClientModuleRegistry::default(); 2122 for (module_instance_id, module_config) in config.modules.clone() { 2123 let kind = module_config.kind().clone(); 2124 let Some(module_init) = self.module_inits.get(&kind).cloned() else { 2125 debug!("Module kind {kind} of instance {module_instance_id} not found in module gens, skipping"); 2126 continue; 2127 }; 2128 2129 let Some(&api_version) = 
common_api_versions.modules.get(&module_instance_id) 2130 else { 2131 warn!("Module kind {kind} of instance {module_instance_id} has not compatible api version, skipping"); 2132 continue; 2133 }; 2134 2135 // since the exact logic of when to start recovery is a bit gnarly, 2136 // the recovery call is extracted here. 2137 let start_module_recover_fn = 2138 |snapshot: Option<ClientBackup>, progress: RecoveryProgress| { 2139 let module_config = module_config.clone(); 2140 let num_peers = NumPeers::from(config.global.api_endpoints.len()); 2141 let db = db.clone(); 2142 let kind = kind.clone(); 2143 let notifier = notifier.clone(); 2144 let api = api.clone(); 2145 let root_secret = root_secret.clone(); 2146 let admin_auth = self.admin_creds.as_ref().map(|creds| creds.auth.clone()); 2147 let final_client = final_client.clone(); 2148 let (progress_tx, progress_rx) = tokio::sync::watch::channel(progress); 2149 let module_init = module_init.clone(); 2150 ( 2151 Box::pin(async move { 2152 module_init 2153 .recover( 2154 final_client.clone(), 2155 fed_id, 2156 num_peers, 2157 module_config.clone(), 2158 db.clone(), 2159 module_instance_id, 2160 common_api_versions.core, 2161 api_version, 2162 root_secret.derive_module_secret(module_instance_id), 2163 notifier.clone(), 2164 api.clone(), 2165 admin_auth, 2166 snapshot.as_ref().and_then(|s| s.modules.get(&module_instance_id).to_owned()), 2167 progress_tx, 2168 ) 2169 .await 2170 .map_err(|err| { 2171 warn!( 2172 module_id = module_instance_id, %kind, %err, "Module failed to recover" 2173 ); 2174 err 2175 }) 2176 }), 2177 progress_rx, 2178 ) 2179 }; 2180 2181 let recovery = if let Some(snapshot) = init_state.does_require_recovery() { 2182 if let Some(module_recovery_state) = db 2183 .begin_transaction_nc() 2184 .await 2185 .get_value(&ClientModuleRecovery { module_instance_id }) 2186 .await 2187 { 2188 if module_recovery_state.is_done() { 2189 debug!( 2190 id = %module_instance_id, 2191 %kind, "Module recovery already complete" 
2192 ); 2193 None 2194 } else { 2195 debug!( 2196 id = %module_instance_id, 2197 %kind, 2198 progress = %module_recovery_state.progress, 2199 "Starting module recovery with an existing progress" 2200 ); 2201 Some(start_module_recover_fn( 2202 snapshot, 2203 module_recovery_state.progress, 2204 )) 2205 } 2206 } else { 2207 debug!( 2208 id = %module_instance_id, 2209 %kind, "Starting new module recovery" 2210 ); 2211 Some(start_module_recover_fn(snapshot, RecoveryProgress::none())) 2212 } 2213 } else { 2214 None 2215 }; 2216 2217 if let Some((recovery, recovery_progress_rx)) = recovery { 2218 module_recoveries.insert(module_instance_id, recovery); 2219 module_recovery_progress_receivers 2220 .insert(module_instance_id, recovery_progress_rx); 2221 } else { 2222 let module = module_init 2223 .init( 2224 final_client.clone(), 2225 fed_id, 2226 config.global.api_endpoints.len(), 2227 module_config, 2228 db.clone(), 2229 module_instance_id, 2230 common_api_versions.core, 2231 api_version, 2232 // This is a divergence from the legacy client, where the child secret 2233 // keys were derived using *module kind*-specific derivation paths. 2234 // Since the new client has to support multiple, segregated modules of 2235 // the same kind we have to use the instance id instead. 
2236 root_secret.derive_module_secret(module_instance_id), 2237 notifier.clone(), 2238 api.clone(), 2239 self.admin_creds.as_ref().map(|cred| cred.auth.clone()), 2240 task_group.clone(), 2241 ) 2242 .await?; 2243 2244 if primary_module_instance == module_instance_id 2245 && !module.supports_being_primary() 2246 { 2247 bail!("Module instance {primary_module_instance} of kind {kind} does not support being a primary module"); 2248 } 2249 2250 modules.register_module(module_instance_id, kind, module); 2251 } 2252 } 2253 modules 2254 }; 2255 2256 if init_state.is_pending() && module_recoveries.is_empty() { 2257 let mut dbtx = db.begin_transaction().await; 2258 dbtx.insert_entry(&ClientInitStateKey, &init_state.into_complete()) 2259 .await; 2260 dbtx.commit_tx().await; 2261 } 2262 2263 let executor = { 2264 let mut executor_builder = Executor::builder(); 2265 executor_builder 2266 .with_module(TRANSACTION_SUBMISSION_MODULE_INSTANCE, TxSubmissionContext); 2267 2268 for (module_instance_id, _, module) in modules.iter_modules() { 2269 executor_builder.with_module_dyn(module.context(module_instance_id)); 2270 } 2271 2272 for (module_instance_id, _) in module_recoveries.iter() { 2273 executor_builder.with_valid_module_id(*module_instance_id); 2274 } 2275 2276 executor_builder 2277 .build(db.clone(), notifier, task_group.clone()) 2278 .await 2279 }; 2280 2281 let recovery_receiver_init_val = BTreeMap::from_iter( 2282 module_recovery_progress_receivers 2283 .iter() 2284 .map(|(module_instance_id, rx)| (*module_instance_id, *rx.borrow())), 2285 ); 2286 let (client_recovery_progress_sender, client_recovery_progress_receiver) = 2287 watch::channel(recovery_receiver_init_val); 2288 2289 let client_inner = Arc::new(Client { 2290 config: config.clone(), 2291 decoders, 2292 db: db.clone(), 2293 federation_id: fed_id, 2294 federation_meta: config.global.meta, 2295 primary_module_instance, 2296 modules, 2297 module_inits: self.module_inits.clone(), 2298 executor, 2299 api, 2300 
secp_ctx: Secp256k1::new(), 2301 root_secret, 2302 task_group, 2303 operation_log: OperationLog::new(db), 2304 client_recovery_progress_receiver, 2305 meta_service: self.meta_service, 2306 }); 2307 client_inner 2308 .task_group 2309 .spawn_cancellable("MetaService::update_continuously", { 2310 let client_inner = client_inner.clone(); 2311 async move { 2312 client_inner 2313 .meta_service 2314 .update_continuously(&client_inner) 2315 .await 2316 } 2317 }); 2318 2319 let client_arc = ClientHandle::new(client_inner); 2320 2321 final_client.set(client_arc.downgrade()); 2322 2323 if !module_recoveries.is_empty() { 2324 client_arc 2325 .spawn_module_recoveries_task( 2326 client_recovery_progress_sender, 2327 module_recoveries, 2328 module_recovery_progress_receivers, 2329 ) 2330 .await; 2331 } 2332 2333 Ok(client_arc) 2334 } 2335 2336 async fn load_init_state(db: &Database) -> InitState { 2337 let mut dbtx = db.begin_transaction_nc().await; 2338 dbtx.get_value(&ClientInitStateKey) 2339 .await 2340 .unwrap_or_else(|| { 2341 // could be turned in a hard error in the future, but for now 2342 // no need to break backward compat. 
2343 warn!("Client missing ClientRequiresRecovery: assuming complete"); 2344 db::InitState::Complete(db::InitModeComplete::Fresh) 2345 }) 2346 } 2347 2348 fn decoders(&self, config: &ClientConfig) -> ModuleDecoderRegistry { 2349 let mut decoders = client_decoders( 2350 &self.module_inits, 2351 config 2352 .modules 2353 .iter() 2354 .map(|(module_instance, module_config)| (*module_instance, module_config.kind())), 2355 ); 2356 2357 decoders.register_module( 2358 TRANSACTION_SUBMISSION_MODULE_INSTANCE, 2359 ModuleKind::from_static_str("tx_submission"), 2360 tx_submission_sm_decoder(), 2361 ); 2362 2363 decoders 2364 } 2365 2366 fn config_decoded( 2367 config: ClientConfig, 2368 decoders: &ModuleDecoderRegistry, 2369 ) -> Result<ClientConfig, fedimint_core::encoding::DecodeError> { 2370 config.clone().redecode_raw(decoders) 2371 } 2372 2373 /// Re-derive client's root_secret using the federation ID. This eliminates 2374 /// the possibility of having the same client root_secret across 2375 /// multiple federations. 2376 fn federation_root_secret( 2377 root_secret: &DerivableSecret, 2378 config: &ClientConfig, 2379 ) -> DerivableSecret { 2380 root_secret.federation_key(&config.global.calculate_federation_id()) 2381 } 2382 } 2383 2384 /// Fetches the encoded client secret from the database and decodes it. 2385 /// If an encoded client secret is not present in the database, or if 2386 /// decoding fails, an error is returned. 
2387 pub async fn get_decoded_client_secret<T: Decodable>(db: &Database) -> anyhow::Result<T> { 2388 let mut tx = db.begin_transaction().await; 2389 let client_secret = tx.get_value(&EncodedClientSecretKey).await; 2390 tx.commit_tx().await; 2391 2392 match client_secret { 2393 Some(client_secret) => { 2394 T::consensus_decode(&mut client_secret.as_slice(), &Default::default()) 2395 .map_err(|e| anyhow!("Decoding failed: {e}")) 2396 } 2397 None => bail!("Encoded client secret not present in DB"), 2398 } 2399 } 2400 2401 pub fn client_decoders<'a>( 2402 registry: &ModuleInitRegistry<DynClientModuleInit>, 2403 module_kinds: impl Iterator<Item = (ModuleInstanceId, &'a ModuleKind)>, 2404 ) -> ModuleDecoderRegistry { 2405 let mut modules = BTreeMap::new(); 2406 for (id, kind) in module_kinds { 2407 let Some(init) = registry.get(kind) else { 2408 debug!("Detected configuration for unsupported module id: {id}, kind: {kind}"); 2409 continue; 2410 }; 2411 2412 modules.insert( 2413 id, 2414 ( 2415 kind.clone(), 2416 IClientModuleInit::decoder(AsRef::<dyn IClientModuleInit + 'static>::as_ref(init)), 2417 ), 2418 ); 2419 } 2420 ModuleDecoderRegistry::from(modules) 2421 }