db.rs
  1  use std::collections::BTreeMap;
  2  use std::io::Cursor;
  3  use std::time::SystemTime;
  4  
  5  use fedimint_api_client::api::ApiVersionSet;
  6  use fedimint_core::config::{ClientConfig, FederationId};
  7  use fedimint_core::core::{ModuleInstanceId, OperationId};
  8  use fedimint_core::db::{
  9      create_database_version, Database, DatabaseTransaction, DatabaseValue, DatabaseVersion,
 10      DatabaseVersionKey, IDatabaseTransactionOpsCore, IDatabaseTransactionOpsCoreTyped,
 11      MODULE_GLOBAL_PREFIX,
 12  };
 13  use fedimint_core::encoding::{Decodable, Encodable};
 14  use fedimint_core::module::registry::ModuleDecoderRegistry;
 15  use fedimint_core::util::BoxFuture;
 16  use fedimint_core::{impl_db_lookup, impl_db_record};
 17  use fedimint_logging::LOG_CLIENT_DB;
 18  use futures::StreamExt;
 19  use serde::Serialize;
 20  use strum_macros::EnumIter;
 21  use tracing::{debug, info, trace, warn};
 22  
 23  use crate::backup::{ClientBackup, Metadata};
 24  use crate::module::recovery::RecoveryProgress;
 25  use crate::oplog::OperationLogEntry;
 26  use crate::sm::executor::{
 27      ActiveStateKeyBytes, ActiveStateKeyPrefixBytes, InactiveStateKeyBytes,
 28      InactiveStateKeyPrefixBytes,
 29  };
 30  use crate::sm::{ActiveStateMeta, InactiveStateMeta};
 31  
/// Single-byte key prefixes that namespace all records in the client database.
///
/// Each variant's `u8` discriminant is the first byte of every key stored
/// under that namespace (wired up via the `impl_db_record!` invocations in
/// this file). Values must never be changed once used on disk.
#[repr(u8)]
#[derive(Clone, EnumIter, Debug)]
pub enum DbKeyPrefix {
    EncodedClientSecret = 0x28,
    ClientSecret = 0x29, // Unused
    OperationLog = 0x2c,
    ChronologicalOperationLog = 0x2d,
    CommonApiVersionCache = 0x2e,
    ClientConfig = 0x2f,
    ClientInviteCode = 0x30, // Unused; clean out remnant data before re-using!
    ClientInitState = 0x31,
    ClientMetadata = 0x32,
    ClientLastBackup = 0x33,
    ClientMetaField = 0x34,
    ClientMetaServiceInfo = 0x35,
    /// Arbitrary data of the applications integrating Fedimint client and
    /// wanting to store some Federation-specific data in Fedimint client
    /// database.
    ///
    /// New users are encouraged to use this single prefix only.
    //
    // TODO: https://github.com/fedimint/fedimint/issues/4444
    //       in the future, we should make all global access to the db private
    //       and only expose a getter returning isolated database.
    UserData = 0xb0,
    /// Prefixes between 0xb1..=0xcf shall all be considered allocated for
    /// historical and future external use
    ExternalReservedStart = 0xb1,
    /// Prefixes between 0xb1..=0xcf shall all be considered allocated for
    /// historical and future external use
    ExternalReservedEnd = 0xcf,
    /// 0xd0.. reserved for Fedimint internal use
    InternalReservedStart = 0xd0,
    /// Per-module instance data
    ModuleGlobalPrefix = 0xff,
}
 68  
 69  impl std::fmt::Display for DbKeyPrefix {
 70      fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
 71          write!(f, "{self:?}")
 72      }
 73  }
 74  
/// Key for the encoded client secret, stored as raw bytes (`Vec<u8>`).
#[derive(Debug, Encodable, Decodable)]
pub struct EncodedClientSecretKey;

/// Prefix for scanning [`EncodedClientSecretKey`] entries.
#[derive(Debug, Encodable, Decodable)]
pub struct EncodedClientSecretKeyPrefix;

impl_db_record!(
    key = EncodedClientSecretKey,
    value = Vec<u8>,
    db_prefix = DbKeyPrefix::EncodedClientSecret,
);
impl_db_lookup!(
    key = EncodedClientSecretKey,
    query_prefix = EncodedClientSecretKeyPrefix
);
 90  
/// Key for a single [`OperationLogEntry`], addressed by its operation id.
#[derive(Debug, Encodable, Decodable, Serialize)]
pub struct OperationLogKey {
    pub operation_id: OperationId,
}

impl_db_record!(
    key = OperationLogKey,
    value = OperationLogEntry,
    db_prefix = DbKeyPrefix::OperationLog
);

/// Key used to lookup operation log entries in chronological order
///
/// Stored with a unit value: the key itself (creation time, then operation
/// id) carries all the information; prefix scans yield operations in time
/// order.
#[derive(Debug, Clone, Copy, Encodable, Decodable, Serialize)]
pub struct ChronologicalOperationLogKey {
    pub creation_time: std::time::SystemTime,
    pub operation_id: OperationId,
}

/// Prefix for scanning all [`ChronologicalOperationLogKey`] entries.
#[derive(Debug, Encodable)]
pub struct ChronologicalOperationLogKeyPrefix;

impl_db_record!(
    key = ChronologicalOperationLogKey,
    value = (),
    db_prefix = DbKeyPrefix::ChronologicalOperationLog
);

impl_db_lookup!(
    key = ChronologicalOperationLogKey,
    query_prefix = ChronologicalOperationLogKeyPrefix
);
122  
/// Key for the cached common [`ApiVersionSet`] (singleton record).
#[derive(Debug, Encodable, Decodable)]
pub struct CachedApiVersionSetKey;

/// Newtype wrapper storing the cached [`ApiVersionSet`].
#[derive(Debug, Encodable, Decodable)]
pub struct CachedApiVersionSet(pub ApiVersionSet);

impl_db_record!(
    key = CachedApiVersionSetKey,
    value = CachedApiVersionSet,
    db_prefix = DbKeyPrefix::CommonApiVersionCache
);
134  
/// Key for the federation's [`ClientConfig`], addressed by federation id.
#[derive(Debug, Encodable, Decodable, Serialize)]
pub struct ClientConfigKey {
    pub id: FederationId,
}

/// Prefix for scanning all [`ClientConfigKey`] entries.
#[derive(Debug, Encodable)]
pub struct ClientConfigKeyPrefix;

impl_db_record!(
    key = ClientConfigKey,
    value = ClientConfig,
    db_prefix = DbKeyPrefix::ClientConfig
);

impl_db_lookup!(key = ClientConfigKey, query_prefix = ClientConfigKeyPrefix);
150  
/// Client metadata that will be stored/restored on backup&recovery
#[derive(Debug, Encodable, Decodable, Serialize)]
pub struct ClientMetadataKey;

/// Prefix for scanning the [`ClientMetadataKey`] record.
#[derive(Debug, Encodable)]
pub struct ClientMetadataPrefix;

impl_db_record!(
    key = ClientMetadataKey,
    value = Metadata,
    db_prefix = DbKeyPrefix::ClientMetadata
);

impl_db_lookup!(key = ClientMetadataKey, query_prefix = ClientMetadataPrefix);
165  
/// Does the client modules need to run recovery before being usable?
#[derive(Debug, Encodable, Decodable, Serialize)]
pub struct ClientInitStateKey;

/// Prefix for scanning the [`ClientInitStateKey`] record.
#[derive(Debug, Encodable)]
pub struct ClientInitStatePrefix;

/// Client initialization mode
#[derive(Debug, Encodable, Decodable)]
pub enum InitMode {
    /// Should only be used with freshly generated root secret
    Fresh,
    /// Should be used with root secrets provided by the user to recover a
    /// (even if just possibly) already used secret.
    Recover { snapshot: Option<ClientBackup> },
}

/// Like `InitMode`, but without no longer required data.
///
/// This is distinct from `InitMode` to prevent holding on to `snapshot`
/// forever both for user's privacy and space use. In case user get hacked
/// or phone gets stolen.
#[derive(Debug, Encodable, Decodable)]
pub enum InitModeComplete {
    Fresh,
    Recover,
}

/// The state of the client initialization
#[derive(Debug, Encodable, Decodable)]
pub enum InitState {
    /// Client data initialization might still require some work (e.g. client
    /// recovery)
    Pending(InitMode),
    /// Client initialization was complete
    Complete(InitModeComplete),
}
203  
204  impl InitState {
205      pub fn into_complete(self) -> Self {
206          match self {
207              InitState::Pending(p) => InitState::Complete(match p {
208                  InitMode::Fresh => InitModeComplete::Fresh,
209                  InitMode::Recover { .. } => InitModeComplete::Recover,
210              }),
211              InitState::Complete(t) => InitState::Complete(t),
212          }
213      }
214  
215      pub fn does_require_recovery(&self) -> Option<Option<ClientBackup>> {
216          match self {
217              InitState::Pending(p) => match p {
218                  InitMode::Fresh => None,
219                  InitMode::Recover { snapshot } => Some(snapshot.clone()),
220              },
221              InitState::Complete(_) => None,
222          }
223      }
224  
225      pub fn is_pending(&self) -> bool {
226          match self {
227              InitState::Pending(_) => true,
228              InitState::Complete(_) => false,
229          }
230      }
231  }
232  
impl_db_record!(
    key = ClientInitStateKey,
    value = InitState,
    db_prefix = DbKeyPrefix::ClientInitState
);

impl_db_lookup!(
    key = ClientInitStateKey,
    query_prefix = ClientInitStatePrefix
);

/// Key for an optional [`ClientBackup`] snapshot.
///
/// NOTE: stored under the same `ClientInitState` prefix as
/// [`ClientInitStateKey`] and `ClientModuleRecovery`.
#[derive(Debug, Encodable, Decodable, Serialize)]
pub struct ClientRecoverySnapshot;

/// Prefix for scanning the [`ClientRecoverySnapshot`] record.
#[derive(Debug, Encodable, Decodable, Serialize)]
pub struct ClientRecoverySnapshotPrefix;

impl_db_record!(
    key = ClientRecoverySnapshot,
    value = Option<ClientBackup>,
    db_prefix = DbKeyPrefix::ClientInitState
);

impl_db_lookup!(
    key = ClientRecoverySnapshot,
    query_prefix = ClientRecoverySnapshotPrefix
);
260  
/// Key for the recovery state of a single module instance.
#[derive(Debug, Encodable, Decodable, Serialize)]
pub struct ClientModuleRecovery {
    pub module_instance_id: ModuleInstanceId,
}

/// Prefix for scanning all [`ClientModuleRecovery`] entries.
#[derive(Debug, Encodable)]
pub struct ClientModuleRecoveryPrefix;

/// Persisted progress of a module's recovery.
#[derive(Debug, Clone, Encodable, Decodable)]
pub struct ClientModuleRecoveryState {
    pub progress: RecoveryProgress,
}

impl ClientModuleRecoveryState {
    /// Whether the tracked recovery has finished.
    pub fn is_done(&self) -> bool {
        self.progress.is_done()
    }
}

// NOTE: shares the `ClientInitState` prefix with `ClientInitStateKey` and
// `ClientRecoverySnapshot`.
impl_db_record!(
    key = ClientModuleRecovery,
    value = ClientModuleRecoveryState,
    db_prefix = DbKeyPrefix::ClientInitState,
);

impl_db_lookup!(
    key = ClientModuleRecovery,
    query_prefix = ClientModuleRecoveryPrefix
);
290  
/// Last valid backup the client attempted to make
///
/// Can be used to find previous valid versions of
/// module backup.
#[derive(Debug, Encodable, Decodable)]
pub struct LastBackupKey;

impl_db_record!(
    key = LastBackupKey,
    value = ClientBackup,
    db_prefix = DbKeyPrefix::ClientLastBackup
);

/// Key for a single meta field, addressed by the field's name.
#[derive(Encodable, Decodable, Debug, PartialEq, Eq, PartialOrd, Ord, Clone)]
pub struct MetaFieldKey(pub String);

/// Prefix for scanning all [`MetaFieldKey`] entries.
#[derive(Encodable, Decodable, Debug, PartialEq, Eq, PartialOrd, Ord, Clone)]
pub struct MetaFieldPrefix;

/// String value of a meta field.
#[derive(Encodable, Decodable, Debug, Clone)]
pub struct MetaFieldValue(pub String);

/// Key for the meta-service bookkeeping record (singleton).
#[derive(Encodable, Decodable, Debug)]
pub struct MetaServiceInfoKey;

/// Bookkeeping for fetched meta fields: when they were last updated and at
/// which revision.
#[derive(Encodable, Decodable, Debug)]
pub struct MetaServiceInfo {
    pub last_updated: SystemTime,
    pub revision: u64,
}

impl_db_record!(
    key = MetaFieldKey,
    value = MetaFieldValue,
    db_prefix = DbKeyPrefix::ClientMetaField
);

impl_db_record!(
    key = MetaServiceInfoKey,
    value = MetaServiceInfo,
    db_prefix = DbKeyPrefix::ClientMetaServiceInfo
);

impl_db_lookup!(key = MetaFieldKey, query_prefix = MetaFieldPrefix);
335  
/// `ClientMigrationFn` is a function that modules can implement to "migrate"
/// the database to the next database version.
///
/// It receives the module's database transaction plus the serialized active
/// and inactive state machine states (each paired with its [`OperationId`]).
/// Returning `Ok(Some((active, inactive)))` signals that a state machine
/// migration took place and supplies the replacement state sets; `Ok(None)`
/// means no state machine migration was needed (see
/// `apply_migrations_client`).
pub type ClientMigrationFn = for<'r, 'tx> fn(
    &'r mut DatabaseTransaction<'tx>,
    Vec<(Vec<u8>, OperationId)>, // active states
    Vec<(Vec<u8>, OperationId)>, // inactive states
) -> BoxFuture<
    'r,
    anyhow::Result<Option<(Vec<(Vec<u8>, OperationId)>, Vec<(Vec<u8>, OperationId)>)>>,
>;
346  
/// `apply_migrations_client` iterates from the on disk database version for the
/// client module up to `target_db_version` and executes all of the migrations
/// that exist in the migrations map, including state machine migrations.
/// Each migration in the migrations map updates the database to have the
/// correct on-disk data structures that the code is expecting. The entire
/// process is atomic, (i.e migration from 0->1 and 1->2 happen atomically).
/// This function is called before the module is initialized and as long as the
/// correct migrations are supplied in the migrations map, the module
/// will be able to read and write from the database successfully.
pub async fn apply_migrations_client(
    db: &Database,
    kind: String,
    target_version: DatabaseVersion,
    migrations: BTreeMap<DatabaseVersion, ClientMigrationFn>,
    module_instance_id: ModuleInstanceId,
) -> Result<(), anyhow::Error> {
    // Newly created databases will not have any data underneath the
    // `MODULE_GLOBAL_PREFIX` since they have just been instantiated.
    let mut dbtx = db.begin_transaction_nc().await;
    let is_new_db = dbtx
        .raw_find_by_prefix(&[MODULE_GLOBAL_PREFIX])
        .await?
        .next()
        .await
        .is_none();

    // First write the database version to disk if it does not exist.
    create_database_version(
        db,
        target_version,
        Some(module_instance_id),
        kind.clone(),
        is_new_db,
    )
    .await?;

    // All migration steps and the final version bump happen inside this one
    // committable transaction, making the whole upgrade all-or-nothing.
    let mut global_dbtx = db.begin_transaction().await;
    let current_version = global_dbtx
        .get_value(&DatabaseVersionKey(module_instance_id))
        .await;

    let db_version = if let Some(mut current_version) = current_version {
        if current_version == target_version {
            trace!(
                target: LOG_CLIENT_DB,
                %current_version,
                %target_version,
                module_instance_id,
                kind,
                "Database version up to date"
            );
            // Nothing was modified; discard the transaction without the
            // usual "uncommitted" warning.
            global_dbtx.ignore_uncommitted();
            return Ok(());
        }

        // Downgrades are not supported: the on-disk data was written by a
        // newer version of the code.
        if target_version < current_version {
            return Err(anyhow::anyhow!(format!(
                "On disk database version for module {kind} was higher ({}) than the target database version ({}).",
                current_version,
                target_version,
            )));
        }

        info!(
            target: LOG_CLIENT_DB,
            %current_version,
            %target_version,
            module_instance_id,
            kind,
            "Migrating client module database"
        );
        // State machine states are loaded once up front and threaded through
        // every migration step below.
        let mut active_states =
            get_active_states(&mut global_dbtx.to_ref_nc(), module_instance_id).await;
        let mut inactive_states =
            get_inactive_states(&mut global_dbtx.to_ref_nc(), module_instance_id).await;

        // Apply each version's migration in order until we reach the target.
        while current_version < target_version {
            let new_states = if let Some(migration) = migrations.get(&current_version) {
                debug!(
                     target: LOG_CLIENT_DB,
                     module_instance_id,
                     %kind,
                     %current_version,
                     %target_version,
                     "Running module db migration");

                // The migration only sees the module's isolated database,
                // but the raw states were read from the global namespace.
                migration(
                    &mut global_dbtx
                        .to_ref_with_prefix_module_id(module_instance_id)
                        .into_nc(),
                    active_states.clone(),
                    inactive_states.clone(),
                )
                .await?
            } else {
                warn!(
                    target: LOG_CLIENT_DB,
                    ?current_version, "Missing client db migration");
                None
            };

            // If the client migration returned new states, a state machine migration has
            // occurred, and the new states need to be persisted to the database.
            if let Some((new_active_states, new_inactive_states)) = new_states {
                remove_old_and_persist_new_active_states(
                    &mut global_dbtx.to_ref_nc(),
                    new_active_states.clone(),
                    active_states.clone(),
                    module_instance_id,
                )
                .await;
                remove_old_and_persist_new_inactive_states(
                    &mut global_dbtx.to_ref_nc(),
                    new_inactive_states.clone(),
                    inactive_states.clone(),
                    module_instance_id,
                )
                .await;

                // the new states become the old states for the next migration
                active_states = new_active_states;
                inactive_states = new_inactive_states;
            }

            // Record the version bump in the same transaction as the
            // migration itself, preserving atomicity.
            current_version.increment();
            global_dbtx
                .insert_entry(&DatabaseVersionKey(module_instance_id), &current_version)
                .await;
        }

        current_version
    } else {
        // No version was on disk; `create_database_version` above presumably
        // wrote `target_version` for this fresh db, so there is nothing to
        // migrate.
        target_version
    };

    global_dbtx.commit_tx_result().await?;
    info!(
        target: LOG_CLIENT_DB,
        ?kind, ?db_version, "Migration complete");
    Ok(())
}
488  
489  /// Reads all active states from the database and returns `Vec<DynState>`.
490  /// TODO: It is unfortunate that we can't read states by the module's instance
491  /// id so we are forced to return all active states. Once we do a db migration
492  /// to add `module_instance_id` to `ActiveStateKey`, this can be improved to
493  /// only read the module's relevant states.
494  pub async fn get_active_states(
495      dbtx: &mut DatabaseTransaction<'_>,
496      module_instance_id: ModuleInstanceId,
497  ) -> Vec<(Vec<u8>, OperationId)> {
498      dbtx.find_by_prefix(&ActiveStateKeyPrefixBytes)
499          .await
500          .filter_map(|(state, _)| async move {
501              if module_instance_id == state.module_instance_id {
502                  Some((state.state, state.operation_id))
503              } else {
504                  None
505              }
506          })
507          .collect::<Vec<_>>()
508          .await
509  }
510  
511  /// Reads all inactive states from the database and returns `Vec<DynState>`.
512  /// TODO: It is unfortunate that we can't read states by the module's instance
513  /// id so we are forced to return all inactive states. Once we do a db migration
514  /// to add `module_instance_id` to `InactiveStateKey`, this can be improved to
515  /// only read the module's relevant states.
516  pub async fn get_inactive_states(
517      dbtx: &mut DatabaseTransaction<'_>,
518      module_instance_id: ModuleInstanceId,
519  ) -> Vec<(Vec<u8>, OperationId)> {
520      dbtx.find_by_prefix(&InactiveStateKeyPrefixBytes)
521          .await
522          .filter_map(|(state, _)| async move {
523              if module_instance_id == state.module_instance_id {
524                  Some((state.state, state.operation_id))
525              } else {
526                  None
527              }
528          })
529          .collect::<Vec<_>>()
530          .await
531  }
532  
533  /// Persists new active states by first removing all current active states, and
534  /// re-writing with the new set of active states. `new_active_states` is
535  /// expected to contain all active states, not just the newly created states.
536  pub async fn remove_old_and_persist_new_active_states(
537      dbtx: &mut DatabaseTransaction<'_>,
538      new_active_states: Vec<(Vec<u8>, OperationId)>,
539      states_to_remove: Vec<(Vec<u8>, OperationId)>,
540      module_instance_id: ModuleInstanceId,
541  ) {
542      // Remove all existing active states
543      for (bytes, operation_id) in states_to_remove {
544          dbtx.remove_entry(&ActiveStateKeyBytes {
545              operation_id,
546              module_instance_id,
547              state: bytes,
548          })
549          .await
550          .expect("Did not delete anything");
551      }
552  
553      // Insert new "migrated" active states
554      for (bytes, operation_id) in new_active_states {
555          dbtx.insert_new_entry(
556              &ActiveStateKeyBytes {
557                  operation_id,
558                  module_instance_id,
559                  state: bytes,
560              },
561              &ActiveStateMeta::default(),
562          )
563          .await;
564      }
565  }
566  
/// Persists new inactive states by first removing all current inactive states,
/// and re-writing with the new set of inactive states. `new_inactive_states` is
/// expected to contain all inactive states, not just the newly created states.
pub async fn remove_old_and_persist_new_inactive_states(
    dbtx: &mut DatabaseTransaction<'_>,
    new_inactive_states: Vec<(Vec<u8>, OperationId)>,
    states_to_remove: Vec<(Vec<u8>, OperationId)>,
    module_instance_id: ModuleInstanceId,
) {
    // Remove all existing inactive states
    for (bytes, operation_id) in states_to_remove {
        dbtx.remove_entry(&InactiveStateKeyBytes {
            operation_id,
            module_instance_id,
            state: bytes,
        })
        .await
        // Panics if the entry was not present — a state scheduled for
        // removal is expected to exist.
        .expect("Did not delete anything");
    }

    // Insert new "migrated" inactive states
    for (bytes, operation_id) in new_inactive_states {
        dbtx.insert_new_entry(
            &InactiveStateKeyBytes {
                operation_id,
                module_instance_id,
                state: bytes,
            },
            // Original timestamps are not preserved across migration; both
            // created_at and exited_at are reset to the current time.
            &InactiveStateMeta {
                created_at: fedimint_core::time::now(),
                exited_at: fedimint_core::time::now(),
            },
        )
        .await;
    }
}
603  
604  /// Helper function definition for migrating a single state.
605  type MigrateStateFn =
606      fn(OperationId, &mut Cursor<&[u8]>) -> anyhow::Result<Option<(Vec<u8>, OperationId)>>;
607  
608  /// Migrates a particular state by looping over all active and inactive states.
609  /// If the `migrate` closure returns `None`, this state was not migrated and
610  /// should be added to the new state machine vectors.
611  pub async fn migrate_state(
612      active_states: Vec<(Vec<u8>, OperationId)>,
613      inactive_states: Vec<(Vec<u8>, OperationId)>,
614      migrate: MigrateStateFn,
615  ) -> anyhow::Result<Option<(Vec<(Vec<u8>, OperationId)>, Vec<(Vec<u8>, OperationId)>)>> {
616      let mut new_active_states = Vec::with_capacity(active_states.len());
617      for (active_state, operation_id) in active_states {
618          let bytes = active_state.as_slice();
619  
620          let decoders = ModuleDecoderRegistry::default();
621          let mut cursor = std::io::Cursor::new(bytes);
622          let module_instance_id =
623              fedimint_core::core::ModuleInstanceId::consensus_decode(&mut cursor, &decoders)?;
624  
625          let state = match migrate(operation_id, &mut cursor)? {
626              Some((mut state, operation_id)) => {
627                  let mut final_state = module_instance_id.to_bytes();
628                  final_state.append(&mut state);
629                  (final_state, operation_id)
630              }
631              None => (active_state, operation_id),
632          };
633  
634          new_active_states.push(state);
635      }
636  
637      let mut new_inactive_states = Vec::with_capacity(inactive_states.len());
638      for (inactive_state, operation_id) in inactive_states {
639          let bytes = inactive_state.as_slice();
640  
641          let decoders = ModuleDecoderRegistry::default();
642          let mut cursor = std::io::Cursor::new(bytes);
643          let module_instance_id =
644              fedimint_core::core::ModuleInstanceId::consensus_decode(&mut cursor, &decoders)?;
645  
646          let state = match migrate(operation_id, &mut cursor)? {
647              Some((mut state, operation_id)) => {
648                  let mut final_state = module_instance_id.to_bytes();
649                  final_state.append(&mut state);
650                  (final_state, operation_id)
651              }
652              None => (inactive_state, operation_id),
653          };
654  
655          new_inactive_states.push(state);
656      }
657  
658      Ok(Some((new_active_states, new_inactive_states)))
659  }