db.rs
1 use std::collections::BTreeMap; 2 use std::io::Cursor; 3 use std::time::SystemTime; 4 5 use fedimint_api_client::api::ApiVersionSet; 6 use fedimint_core::config::{ClientConfig, FederationId}; 7 use fedimint_core::core::{ModuleInstanceId, OperationId}; 8 use fedimint_core::db::{ 9 create_database_version, Database, DatabaseTransaction, DatabaseValue, DatabaseVersion, 10 DatabaseVersionKey, IDatabaseTransactionOpsCore, IDatabaseTransactionOpsCoreTyped, 11 MODULE_GLOBAL_PREFIX, 12 }; 13 use fedimint_core::encoding::{Decodable, Encodable}; 14 use fedimint_core::module::registry::ModuleDecoderRegistry; 15 use fedimint_core::util::BoxFuture; 16 use fedimint_core::{impl_db_lookup, impl_db_record}; 17 use fedimint_logging::LOG_CLIENT_DB; 18 use futures::StreamExt; 19 use serde::Serialize; 20 use strum_macros::EnumIter; 21 use tracing::{debug, info, trace, warn}; 22 23 use crate::backup::{ClientBackup, Metadata}; 24 use crate::module::recovery::RecoveryProgress; 25 use crate::oplog::OperationLogEntry; 26 use crate::sm::executor::{ 27 ActiveStateKeyBytes, ActiveStateKeyPrefixBytes, InactiveStateKeyBytes, 28 InactiveStateKeyPrefixBytes, 29 }; 30 use crate::sm::{ActiveStateMeta, InactiveStateMeta}; 31 32 #[repr(u8)] 33 #[derive(Clone, EnumIter, Debug)] 34 pub enum DbKeyPrefix { 35 EncodedClientSecret = 0x28, 36 ClientSecret = 0x29, // Unused 37 OperationLog = 0x2c, 38 ChronologicalOperationLog = 0x2d, 39 CommonApiVersionCache = 0x2e, 40 ClientConfig = 0x2f, 41 ClientInviteCode = 0x30, // Unused; clean out remnant data before re-using! 42 ClientInitState = 0x31, 43 ClientMetadata = 0x32, 44 ClientLastBackup = 0x33, 45 ClientMetaField = 0x34, 46 ClientMetaServiceInfo = 0x35, 47 /// Arbitrary data of the applications integrating Fedimint client and 48 /// wanting to store some Federation-specific data in Fedimint client 49 /// database. 50 /// 51 /// New users are encouraged to use this single prefix only. 52 // 53 // TODO: https://github.com/fedimint/fedimint/issues/4444 54 // in the future, we should make all global access to the db private 55 // and only expose a getter returning isolated database. 56 UserData = 0xb0, 57 /// Prefixes between 0xb1..=0xcf shall all be considered allocated for 58 /// historical and future external use 59 ExternalReservedStart = 0xb1, 60 /// Prefixes between 0xb1..=0xcf shall all be considered allocated for 61 /// historical and future external use 62 ExternalReservedEnd = 0xcf, 63 /// 0xd0.. reserved for Fedimint internal use 64 InternalReservedStart = 0xd0, 65 /// Per-module instance data 66 ModuleGlobalPrefix = 0xff, 67 } 68 69 impl std::fmt::Display for DbKeyPrefix { 70 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { 71 write!(f, "{self:?}") 72 } 73 } 74 75 #[derive(Debug, Encodable, Decodable)] 76 pub struct EncodedClientSecretKey; 77 78 #[derive(Debug, Encodable, Decodable)] 79 pub struct EncodedClientSecretKeyPrefix; 80 81 impl_db_record!( 82 key = EncodedClientSecretKey, 83 value = Vec<u8>, 84 db_prefix = DbKeyPrefix::EncodedClientSecret, 85 ); 86 impl_db_lookup!( 87 key = EncodedClientSecretKey, 88 query_prefix = EncodedClientSecretKeyPrefix 89 ); 90 91 #[derive(Debug, Encodable, Decodable, Serialize)] 92 pub struct OperationLogKey { 93 pub operation_id: OperationId, 94 } 95 96 impl_db_record!( 97 key = OperationLogKey, 98 value = OperationLogEntry, 99 db_prefix = DbKeyPrefix::OperationLog 100 ); 101 102 /// Key used to lookup operation log entries in chronological order 103 #[derive(Debug, Clone, Copy, Encodable, Decodable, Serialize)] 104 pub struct ChronologicalOperationLogKey { 105 pub creation_time: std::time::SystemTime, 106 pub operation_id: OperationId, 107 } 108 109 #[derive(Debug, Encodable)] 110 pub struct ChronologicalOperationLogKeyPrefix; 111 112 impl_db_record!( 113 key = ChronologicalOperationLogKey, 114 value = (), 115 db_prefix = DbKeyPrefix::ChronologicalOperationLog 116 ); 117 118 impl_db_lookup!( 119 key = ChronologicalOperationLogKey, 120 query_prefix = ChronologicalOperationLogKeyPrefix 121 ); 122 123 #[derive(Debug, Encodable, Decodable)] 124 pub struct CachedApiVersionSetKey; 125 126 #[derive(Debug, Encodable, Decodable)] 127 pub struct CachedApiVersionSet(pub ApiVersionSet); 128 129 impl_db_record!( 130 key = CachedApiVersionSetKey, 131 value = CachedApiVersionSet, 132 db_prefix = DbKeyPrefix::CommonApiVersionCache 133 ); 134 135 #[derive(Debug, Encodable, Decodable, Serialize)] 136 pub struct ClientConfigKey { 137 pub id: FederationId, 138 } 139 140 #[derive(Debug, Encodable)] 141 pub struct ClientConfigKeyPrefix; 142 143 impl_db_record!( 144 key = ClientConfigKey, 145 value = ClientConfig, 146 db_prefix = DbKeyPrefix::ClientConfig 147 ); 148 149 impl_db_lookup!(key = ClientConfigKey, query_prefix = ClientConfigKeyPrefix); 150 151 /// Client metadata that will be stored/restored on backup&recovery 152 #[derive(Debug, Encodable, Decodable, Serialize)] 153 pub struct ClientMetadataKey; 154 155 #[derive(Debug, Encodable)] 156 pub struct ClientMetadataPrefix; 157 158 impl_db_record!( 159 key = ClientMetadataKey, 160 value = Metadata, 161 db_prefix = DbKeyPrefix::ClientMetadata 162 ); 163 164 impl_db_lookup!(key = ClientMetadataKey, query_prefix = ClientMetadataPrefix); 165 166 /// Does the client modules need to run recovery before being usable? 167 #[derive(Debug, Encodable, Decodable, Serialize)] 168 pub struct ClientInitStateKey; 169 170 #[derive(Debug, Encodable)] 171 pub struct ClientInitStatePrefix; 172 173 /// Client initialization mode 174 #[derive(Debug, Encodable, Decodable)] 175 pub enum InitMode { 176 /// Should only be used with freshly generated root secret 177 Fresh, 178 /// Should be used with root secrets provided by the user to recover a 179 /// (even if just possibly) already used secret. 180 Recover { snapshot: Option<ClientBackup> }, 181 } 182 183 /// Like `InitMode`, but without no longer required data. 184 /// 185 /// This is distinct from `InitMode` to prevent holding on to `snapshot` 186 /// forever both for user's privacy and space use. In case user get hacked 187 /// or phone gets stolen. 188 #[derive(Debug, Encodable, Decodable)] 189 pub enum InitModeComplete { 190 Fresh, 191 Recover, 192 } 193 194 /// The state of the client initialization 195 #[derive(Debug, Encodable, Decodable)] 196 pub enum InitState { 197 /// Client data initialization might still require some work (e.g. client 198 /// recovery) 199 Pending(InitMode), 200 /// Client initialization was complete 201 Complete(InitModeComplete), 202 } 203 204 impl InitState { 205 pub fn into_complete(self) -> Self { 206 match self { 207 InitState::Pending(p) => InitState::Complete(match p { 208 InitMode::Fresh => InitModeComplete::Fresh, 209 InitMode::Recover { .. } => InitModeComplete::Recover, 210 }), 211 InitState::Complete(t) => InitState::Complete(t), 212 } 213 } 214 215 pub fn does_require_recovery(&self) -> Option<Option<ClientBackup>> { 216 match self { 217 InitState::Pending(p) => match p { 218 InitMode::Fresh => None, 219 InitMode::Recover { snapshot } => Some(snapshot.clone()), 220 }, 221 InitState::Complete(_) => None, 222 } 223 } 224 225 pub fn is_pending(&self) -> bool { 226 match self { 227 InitState::Pending(_) => true, 228 InitState::Complete(_) => false, 229 } 230 } 231 } 232 233 impl_db_record!( 234 key = ClientInitStateKey, 235 value = InitState, 236 db_prefix = DbKeyPrefix::ClientInitState 237 ); 238 239 impl_db_lookup!( 240 key = ClientInitStateKey, 241 query_prefix = ClientInitStatePrefix 242 ); 243 244 #[derive(Debug, Encodable, Decodable, Serialize)] 245 pub struct ClientRecoverySnapshot; 246 247 #[derive(Debug, Encodable, Decodable, Serialize)] 248 pub struct ClientRecoverySnapshotPrefix; 249 250 impl_db_record!( 251 key = ClientRecoverySnapshot, 252 value = Option<ClientBackup>, 253 db_prefix = DbKeyPrefix::ClientInitState 254 ); 255 256 impl_db_lookup!( 257 key = ClientRecoverySnapshot, 258 query_prefix = ClientRecoverySnapshotPrefix 259 ); 260 261 #[derive(Debug, Encodable, Decodable, Serialize)] 262 pub struct ClientModuleRecovery { 263 pub module_instance_id: ModuleInstanceId, 264 } 265 266 #[derive(Debug, Encodable)] 267 pub struct ClientModuleRecoveryPrefix; 268 269 #[derive(Debug, Clone, Encodable, Decodable)] 270 pub struct ClientModuleRecoveryState { 271 pub progress: RecoveryProgress, 272 } 273 274 impl ClientModuleRecoveryState { 275 pub fn is_done(&self) -> bool { 276 self.progress.is_done() 277 } 278 } 279 280 impl_db_record!( 281 key = ClientModuleRecovery, 282 value = ClientModuleRecoveryState, 283 db_prefix = DbKeyPrefix::ClientInitState, 284 ); 285 286 impl_db_lookup!( 287 key = ClientModuleRecovery, 288 query_prefix = ClientModuleRecoveryPrefix 289 ); 290 291 /// Last valid backup the client attempted to make 292 /// 293 /// Can be used to find previous valid versions of 294 /// module backup. 295 #[derive(Debug, Encodable, Decodable)] 296 pub struct LastBackupKey; 297 298 impl_db_record!( 299 key = LastBackupKey, 300 value = ClientBackup, 301 db_prefix = DbKeyPrefix::ClientLastBackup 302 ); 303 304 #[derive(Encodable, Decodable, Debug, PartialEq, Eq, PartialOrd, Ord, Clone)] 305 pub struct MetaFieldKey(pub String); 306 307 #[derive(Encodable, Decodable, Debug, PartialEq, Eq, PartialOrd, Ord, Clone)] 308 pub struct MetaFieldPrefix; 309 310 #[derive(Encodable, Decodable, Debug, Clone)] 311 pub struct MetaFieldValue(pub String); 312 313 #[derive(Encodable, Decodable, Debug)] 314 pub struct MetaServiceInfoKey; 315 316 #[derive(Encodable, Decodable, Debug)] 317 pub struct MetaServiceInfo { 318 pub last_updated: SystemTime, 319 pub revision: u64, 320 } 321 322 impl_db_record!( 323 key = MetaFieldKey, 324 value = MetaFieldValue, 325 db_prefix = DbKeyPrefix::ClientMetaField 326 ); 327 328 impl_db_record!( 329 key = MetaServiceInfoKey, 330 value = MetaServiceInfo, 331 db_prefix = DbKeyPrefix::ClientMetaServiceInfo 332 ); 333 334 impl_db_lookup!(key = MetaFieldKey, query_prefix = MetaFieldPrefix); 335 336 /// `ClientMigrationFn` is a function that modules can implement to "migrate" 337 /// the database to the next database version. 338 pub type ClientMigrationFn = for<'r, 'tx> fn( 339 &'r mut DatabaseTransaction<'tx>, 340 Vec<(Vec<u8>, OperationId)>, // active states 341 Vec<(Vec<u8>, OperationId)>, // inactive states 342 ) -> BoxFuture< 343 'r, 344 anyhow::Result<Option<(Vec<(Vec<u8>, OperationId)>, Vec<(Vec<u8>, OperationId)>)>>, 345 >; 346 347 /// `apply_migrations_client` iterates from the on disk database version for the 348 /// client module up to `target_db_version` and executes all of the migrations 349 /// that exist in the migrations map, including state machine migrations. 350 /// Each migration in the migrations map updates the database to have the 351 /// correct on-disk data structures that the code is expecting. The entire 352 /// process is atomic, (i.e migration from 0->1 and 1->2 happen atomically). 353 /// This function is called before the module is initialized and as long as the 354 /// correct migrations are supplied in the migrations map, the module 355 /// will be able to read and write from the database successfully. 356 pub async fn apply_migrations_client( 357 db: &Database, 358 kind: String, 359 target_version: DatabaseVersion, 360 migrations: BTreeMap<DatabaseVersion, ClientMigrationFn>, 361 module_instance_id: ModuleInstanceId, 362 ) -> Result<(), anyhow::Error> { 363 // Newly created databases will not have any data underneath the 364 // `MODULE_GLOBAL_PREFIX` since they have just been instantiated. 365 let mut dbtx = db.begin_transaction_nc().await; 366 let is_new_db = dbtx 367 .raw_find_by_prefix(&[MODULE_GLOBAL_PREFIX]) 368 .await? 369 .next() 370 .await 371 .is_none(); 372 373 // First write the database version to disk if it does not exist. 374 create_database_version( 375 db, 376 target_version, 377 Some(module_instance_id), 378 kind.clone(), 379 is_new_db, 380 ) 381 .await?; 382 383 let mut global_dbtx = db.begin_transaction().await; 384 let current_version = global_dbtx 385 .get_value(&DatabaseVersionKey(module_instance_id)) 386 .await; 387 388 let db_version = if let Some(mut current_version) = current_version { 389 if current_version == target_version { 390 trace!( 391 target: LOG_CLIENT_DB, 392 %current_version, 393 %target_version, 394 module_instance_id, 395 kind, 396 "Database version up to date" 397 ); 398 global_dbtx.ignore_uncommitted(); 399 return Ok(()); 400 } 401 402 if target_version < current_version { 403 return Err(anyhow::anyhow!(format!( 404 "On disk database version for module {kind} was higher ({}) than the target database version ({}).", 405 current_version, 406 target_version, 407 ))); 408 } 409 410 info!( 411 target: LOG_CLIENT_DB, 412 %current_version, 413 %target_version, 414 module_instance_id, 415 kind, 416 "Migrating client module database" 417 ); 418 let mut active_states = 419 get_active_states(&mut global_dbtx.to_ref_nc(), module_instance_id).await; 420 let mut inactive_states = 421 get_inactive_states(&mut global_dbtx.to_ref_nc(), module_instance_id).await; 422 423 while current_version < target_version { 424 let new_states = if let Some(migration) = migrations.get(¤t_version) { 425 debug!( 426 target: LOG_CLIENT_DB, 427 module_instance_id, 428 %kind, 429 %current_version, 430 %target_version, 431 "Running module db migration"); 432 433 migration( 434 &mut global_dbtx 435 .to_ref_with_prefix_module_id(module_instance_id) 436 .into_nc(), 437 active_states.clone(), 438 inactive_states.clone(), 439 ) 440 .await? 441 } else { 442 warn!( 443 target: LOG_CLIENT_DB, 444 ?current_version, "Missing client db migration"); 445 None 446 }; 447 448 // If the client migration returned new states, a state machine migration has 449 // occurred, and the new states need to be persisted to the database. 450 if let Some((new_active_states, new_inactive_states)) = new_states { 451 remove_old_and_persist_new_active_states( 452 &mut global_dbtx.to_ref_nc(), 453 new_active_states.clone(), 454 active_states.clone(), 455 module_instance_id, 456 ) 457 .await; 458 remove_old_and_persist_new_inactive_states( 459 &mut global_dbtx.to_ref_nc(), 460 new_inactive_states.clone(), 461 inactive_states.clone(), 462 module_instance_id, 463 ) 464 .await; 465 466 // the new states become the old states for the next migration 467 active_states = new_active_states; 468 inactive_states = new_inactive_states; 469 } 470 471 current_version.increment(); 472 global_dbtx 473 .insert_entry(&DatabaseVersionKey(module_instance_id), ¤t_version) 474 .await; 475 } 476 477 current_version 478 } else { 479 target_version 480 }; 481 482 global_dbtx.commit_tx_result().await?; 483 info!( 484 target: LOG_CLIENT_DB, 485 ?kind, ?db_version, "Migration complete"); 486 Ok(()) 487 } 488 489 /// Reads all active states from the database and returns `Vec<DynState>`. 490 /// TODO: It is unfortunate that we can't read states by the module's instance 491 /// id so we are forced to return all active states. Once we do a db migration 492 /// to add `module_instance_id` to `ActiveStateKey`, this can be improved to 493 /// only read the module's relevant states. 494 pub async fn get_active_states( 495 dbtx: &mut DatabaseTransaction<'_>, 496 module_instance_id: ModuleInstanceId, 497 ) -> Vec<(Vec<u8>, OperationId)> { 498 dbtx.find_by_prefix(&ActiveStateKeyPrefixBytes) 499 .await 500 .filter_map(|(state, _)| async move { 501 if module_instance_id == state.module_instance_id { 502 Some((state.state, state.operation_id)) 503 } else { 504 None 505 } 506 }) 507 .collect::<Vec<_>>() 508 .await 509 } 510 511 /// Reads all inactive states from the database and returns `Vec<DynState>`. 512 /// TODO: It is unfortunate that we can't read states by the module's instance 513 /// id so we are forced to return all inactive states. Once we do a db migration 514 /// to add `module_instance_id` to `InactiveStateKey`, this can be improved to 515 /// only read the module's relevant states. 516 pub async fn get_inactive_states( 517 dbtx: &mut DatabaseTransaction<'_>, 518 module_instance_id: ModuleInstanceId, 519 ) -> Vec<(Vec<u8>, OperationId)> { 520 dbtx.find_by_prefix(&InactiveStateKeyPrefixBytes) 521 .await 522 .filter_map(|(state, _)| async move { 523 if module_instance_id == state.module_instance_id { 524 Some((state.state, state.operation_id)) 525 } else { 526 None 527 } 528 }) 529 .collect::<Vec<_>>() 530 .await 531 } 532 533 /// Persists new active states by first removing all current active states, and 534 /// re-writing with the new set of active states. `new_active_states` is 535 /// expected to contain all active states, not just the newly created states. 536 pub async fn remove_old_and_persist_new_active_states( 537 dbtx: &mut DatabaseTransaction<'_>, 538 new_active_states: Vec<(Vec<u8>, OperationId)>, 539 states_to_remove: Vec<(Vec<u8>, OperationId)>, 540 module_instance_id: ModuleInstanceId, 541 ) { 542 // Remove all existing active states 543 for (bytes, operation_id) in states_to_remove { 544 dbtx.remove_entry(&ActiveStateKeyBytes { 545 operation_id, 546 module_instance_id, 547 state: bytes, 548 }) 549 .await 550 .expect("Did not delete anything"); 551 } 552 553 // Insert new "migrated" active states 554 for (bytes, operation_id) in new_active_states { 555 dbtx.insert_new_entry( 556 &ActiveStateKeyBytes { 557 operation_id, 558 module_instance_id, 559 state: bytes, 560 }, 561 &ActiveStateMeta::default(), 562 ) 563 .await; 564 } 565 } 566 567 /// Persists new inactive states by first removing all current inactive states, 568 /// and re-writing with the new set of inactive states. `new_inactive_states` is 569 /// expected to contain all inactive states, not just the newly created states. 570 pub async fn remove_old_and_persist_new_inactive_states( 571 dbtx: &mut DatabaseTransaction<'_>, 572 new_inactive_states: Vec<(Vec<u8>, OperationId)>, 573 states_to_remove: Vec<(Vec<u8>, OperationId)>, 574 module_instance_id: ModuleInstanceId, 575 ) { 576 // Remove all existing active states 577 for (bytes, operation_id) in states_to_remove { 578 dbtx.remove_entry(&InactiveStateKeyBytes { 579 operation_id, 580 module_instance_id, 581 state: bytes, 582 }) 583 .await 584 .expect("Did not delete anything"); 585 } 586 587 // Insert new "migrated" inactive states 588 for (bytes, operation_id) in new_inactive_states { 589 dbtx.insert_new_entry( 590 &InactiveStateKeyBytes { 591 operation_id, 592 module_instance_id, 593 state: bytes, 594 }, 595 &InactiveStateMeta { 596 created_at: fedimint_core::time::now(), 597 exited_at: fedimint_core::time::now(), 598 }, 599 ) 600 .await; 601 } 602 } 603 604 /// Helper function definition for migrating a single state. 605 type MigrateStateFn = 606 fn(OperationId, &mut Cursor<&[u8]>) -> anyhow::Result<Option<(Vec<u8>, OperationId)>>; 607 608 /// Migrates a particular state by looping over all active and inactive states. 609 /// If the `migrate` closure returns `None`, this state was not migrated and 610 /// should be added to the new state machine vectors. 611 pub async fn migrate_state( 612 active_states: Vec<(Vec<u8>, OperationId)>, 613 inactive_states: Vec<(Vec<u8>, OperationId)>, 614 migrate: MigrateStateFn, 615 ) -> anyhow::Result<Option<(Vec<(Vec<u8>, OperationId)>, Vec<(Vec<u8>, OperationId)>)>> { 616 let mut new_active_states = Vec::with_capacity(active_states.len()); 617 for (active_state, operation_id) in active_states { 618 let bytes = active_state.as_slice(); 619 620 let decoders = ModuleDecoderRegistry::default(); 621 let mut cursor = std::io::Cursor::new(bytes); 622 let module_instance_id = 623 fedimint_core::core::ModuleInstanceId::consensus_decode(&mut cursor, &decoders)?; 624 625 let state = match migrate(operation_id, &mut cursor)? { 626 Some((mut state, operation_id)) => { 627 let mut final_state = module_instance_id.to_bytes(); 628 final_state.append(&mut state); 629 (final_state, operation_id) 630 } 631 None => (active_state, operation_id), 632 }; 633 634 new_active_states.push(state); 635 } 636 637 let mut new_inactive_states = Vec::with_capacity(inactive_states.len()); 638 for (inactive_state, operation_id) in inactive_states { 639 let bytes = inactive_state.as_slice(); 640 641 let decoders = ModuleDecoderRegistry::default(); 642 let mut cursor = std::io::Cursor::new(bytes); 643 let module_instance_id = 644 fedimint_core::core::ModuleInstanceId::consensus_decode(&mut cursor, &decoders)?; 645 646 let state = match migrate(operation_id, &mut cursor)? { 647 Some((mut state, operation_id)) => { 648 let mut final_state = module_instance_id.to_bytes(); 649 final_state.append(&mut state); 650 (final_state, operation_id) 651 } 652 None => (inactive_state, operation_id), 653 }; 654 655 new_inactive_states.push(state); 656 } 657 658 Ok(Some((new_active_states, new_inactive_states))) 659 }