/ fedimint-core / src / db / mod.rs
mod.rs
   1  //! Core Fedimint database traits and types
   2  //!
   3  //! # Isolation of database transactions
   4  //!
   5  //! Fedimint requires that the database implementation implement Snapshot
   6  //! Isolation. Snapshot Isolation is a database isolation level that guarantees
   7  //! consistent reads from the time that the snapshot was created (at transaction
   8  //! creation time). Transactions with Snapshot Isolation level will only commit
   9  //! if there has been no write to the modified keys since the snapshot (i.e.
  10  //! write-write conflicts are prevented).
  11  //!
  12  //! Specifically, Fedimint expects the database implementation to prevent the
  13  //! following anomalies:
  14  //!
  15  //! Non-Readable Write: TX1 writes (K1, V1) at time t but cannot read (K1, V1)
  16  //! at time (t + i)
  17  //!
  18  //! Dirty Read: TX1 is able to read TX2's uncommitted writes.
  19  //!
  20  //! Non-Repeatable Read: TX1 reads (K1, V1) at time t and retrieves (K1, V2) at
  21  //! time (t + i) where V1 != V2.
  22  //!
  23  //! Phantom Record: TX1 retrieves X number of records for a prefix at time t and
  24  //! retrieves Y number of records for the same prefix at time (t + i).
  25  //!
  26  //! Lost Writes: TX1 writes (K1, V1) at the same time as TX2 writes (K1, V2). V2
  27  //! overwrites V1 as the value for K1 (write-write conflict).
  28  //!
//! | Type     | Non-Readable Write | Dirty Read | Non-Repeatable Read | Phantom Record | Lost Writes |
//! | -------- | ------------------ | ---------- | ------------------- | -------------- | ----------- |
//! | MemoryDB | Prevented          | Prevented  | Prevented           | Prevented      | Possible    |
//! | RocksDB  | Prevented          | Prevented  | Prevented           | Prevented      | Prevented   |
//! | Sqlite   | Prevented          | Prevented  | Prevented           | Prevented      | Prevented   |
  36  
  37  use std::any;
  38  use std::collections::BTreeMap;
  39  use std::error::Error;
  40  use std::fmt::{self, Debug};
  41  use std::marker::{self, PhantomData};
  42  use std::ops::{self, DerefMut};
  43  use std::pin::Pin;
  44  use std::sync::Arc;
  45  use std::time::Duration;
  46  
  47  use anyhow::{bail, Context, Result};
  48  use fedimint_core::util::BoxFuture;
  49  use fedimint_logging::LOG_DB;
  50  use futures::{Stream, StreamExt};
  51  use macro_rules_attribute::apply;
  52  use rand::Rng;
  53  use serde::Serialize;
  54  use strum_macros::EnumIter;
  55  use thiserror::Error;
  56  use tracing::{debug, error, info, instrument, trace, warn};
  57  
  58  use crate::core::ModuleInstanceId;
  59  use crate::encoding::{Decodable, Encodable};
  60  use crate::fmt_utils::AbbreviateHexBytes;
  61  use crate::task::{MaybeSend, MaybeSync};
  62  use crate::{async_trait_maybe_send, maybe_add_send, timing};
  63  
  64  pub mod mem_impl;
  65  pub mod notifications;
  66  
  67  pub use test_utils::*;
  68  
  69  use self::notifications::{Notifications, NotifyQueue};
  70  use crate::module::registry::ModuleDecoderRegistry;
  71  
/// First byte of every key prefix derived from a module instance id (see
/// `module_instance_id_to_byte_prefix`).
pub const MODULE_GLOBAL_PREFIX: u8 = 0xff;
  73  
/// Something that can be turned into the raw bytes of a database key or key
/// prefix.
pub trait DatabaseKeyPrefix: Debug {
    /// Returns the raw byte representation used to address the database.
    fn to_bytes(&self) -> Vec<u8>;
}
  77  
/// A key + value pair in the database with a unique prefix
/// Extends `DatabaseKeyPrefix` to prepend the key's prefix.
pub trait DatabaseRecord: DatabaseKeyPrefix {
    /// One-byte constant identifying this record type (presumably the leading
    /// byte of its serialized keys; confirm against implementors).
    const DB_PREFIX: u8;
    /// Opt-in notification flag; `false` unless an implementor overrides it.
    const NOTIFY_ON_MODIFY: bool = false;
    /// Typed key of the record.
    type Key: DatabaseKey + Debug;
    /// Typed value stored under [`Self::Key`].
    type Value: DatabaseValue + Debug;
}
  86  
/// A key that can be used to query one or more `DatabaseRecord`
/// Extends `DatabaseKeyPrefix` to prepend the key's prefix.
pub trait DatabaseLookup: DatabaseKeyPrefix {
    /// The record type this lookup resolves to.
    type Record: DatabaseRecord;
}
  92  
// Every `DatabaseRecord` that is also `Debug + Decodable + Encodable` is
// automatically a `DatabaseLookup` for itself.
impl<Record> DatabaseLookup for Record
where
    Record: DatabaseRecord + Debug + Decodable + Encodable,
{
    type Record = Record;
}
 100  
/// `DatabaseKey` that represents the lookup structure for retrieving key/value
/// pairs from the database.
pub trait DatabaseKey: Sized {
    /// Send a notification to tasks waiting to be notified if the value of
    /// `DatabaseKey` is modified
    ///
    /// For instance, this can be used to be notified when a key in the
    /// database is created. It is also possible to run a closure with the
    /// value of the `DatabaseKey` as parameter to verify some changes to
    /// that value.
    ///
    /// Defaults to `false`.
    const NOTIFY_ON_MODIFY: bool = false;
    /// Decodes a key from its raw bytes, using `modules` as the decoder
    /// registry; fails with a [`DecodingError`] if the bytes cannot be decoded.
    fn from_bytes(data: &[u8], modules: &ModuleDecoderRegistry) -> Result<Self, DecodingError>;
}
 114  
/// Marker trait for `DatabaseKey`s where `NOTIFY_ON_MODIFY` is true
///
/// Required by [`Database::wait_key_check`] and [`Database::wait_key_exists`].
pub trait DatabaseKeyWithNotify {}
 117  
/// `DatabaseValue` that represents the value structure of database records.
pub trait DatabaseValue: Sized + Debug {
    /// Decodes a value from its raw bytes, using `modules` as the decoder
    /// registry; fails with a [`DecodingError`] if the bytes cannot be decoded.
    fn from_bytes(data: &[u8], modules: &ModuleDecoderRegistry) -> Result<Self, DecodingError>;
    /// Returns the raw byte representation of this value.
    fn to_bytes(&self) -> Vec<u8>;
}
 123  
/// Pinned, boxed stream of raw `(key, value)` byte pairs, as returned by the
/// prefix queries of [`IDatabaseTransactionOpsCore`].
pub type PrefixStream<'a> = Pin<Box<maybe_add_send!(dyn Stream<Item = (Vec<u8>, Vec<u8>)> + 'a)>>;
 125  
/// Just ignore this type, it's only there to make compiler happy
///
/// It is passed as the second argument of the closure given to
/// [`Database::autocommit`] and carries no data; it only relates the two
/// lifetimes `'big` and `'small`.
///
/// See <https://users.rust-lang.org/t/argument-requires-that-is-borrowed-for-static/66503/2?u=yandros> for details.
pub type PhantomBound<'big, 'small> = PhantomData<&'small &'big ()>;
 130  
/// Error returned when the autocommit function fails
#[derive(Debug, Error)]
pub enum AutocommitError<E> {
    /// Committing the transaction failed too many times, giving up
    CommitFailed {
        /// Number of attempts
        attempts: usize,
        /// Last error on commit
        last_error: anyhow::Error,
    },
    /// Error returned by the closure provided to `autocommit`. If returned no
    /// commit was attempted in that round
    ClosureError {
        /// The attempt on which the closure returned an error (the first
        /// attempt is numbered 1)
        ///
        /// Values other than 1 typically indicate a logic error since the
        /// closure given to `autocommit` should not have side effects
        /// and thus keep succeeding if it succeeded once.
        attempts: usize,
        /// Error returned by the closure
        error: E,
    },
}
 154  
/// Raw database implementation
///
/// This and [`IRawDatabaseTransaction`] are meant to be implemented
/// by crates like `fedimint-rocksdb` to provide a concrete implementation
/// of a database to be used by Fedimint.
///
/// This is in contrast to [`IDatabase`], which includes extra
/// functionality that Fedimint needs (and adds) on top of it.
#[apply(async_trait_maybe_send!)]
pub trait IRawDatabase: Debug + MaybeSend + MaybeSync + 'static {
    /// A raw database transaction type
    type Transaction<'a>: IRawDatabaseTransaction + Debug;

    /// Start a database transaction
    ///
    /// The returned transaction may borrow from `self` for `'a`.
    async fn begin_transaction<'a>(&'a self) -> Self::Transaction<'a>;
}
 171  
 172  #[apply(async_trait_maybe_send!)]
 173  impl<T> IRawDatabase for Box<T>
 174  where
 175      T: IRawDatabase,
 176  {
 177      type Transaction<'a> = <T as IRawDatabase>::Transaction<'a>;
 178  
 179      async fn begin_transaction<'a>(&'a self) -> Self::Transaction<'a> {
 180          (**self).begin_transaction().await
 181      }
 182  }
 183  
 184  /// An extension trait with convenience operations on [`IRawDatabase`]
 185  pub trait IRawDatabaseExt: IRawDatabase + Sized {
 186      /// Convert to type implementing [`IRawDatabase`] into [`Database`].
 187      ///
 188      /// When type inference is not an issue, [`Into::into`] can be used instead.
 189      fn into_database(self) -> Database {
 190          Database::new(self, Default::default())
 191      }
 192  }
 193  
 194  impl<T> IRawDatabaseExt for T where T: IRawDatabase {}
 195  
 196  impl<T> From<T> for Database
 197  where
 198      T: IRawDatabase,
 199  {
 200      fn from(raw: T) -> Self {
 201          Database::new(raw, Default::default())
 202      }
 203  }
 204  
/// A database that, on top of the raw database operations, implements a
/// key notification system.
#[apply(async_trait_maybe_send!)]
pub trait IDatabase: Debug + MaybeSend + MaybeSync + 'static {
    /// Start a database transaction
    async fn begin_transaction<'a>(&'a self) -> Box<dyn IDatabaseTransaction + 'a>;
    /// Register (and wait) for `key` updates
    async fn register(&self, key: &[u8]);
    /// Notify about `key` update (creation, modification, deletion)
    async fn notify(&self, key: &[u8]);

    /// The prefix len of this database instance
    ///
    /// `0` for an unpartitioned database; each prefixing wrapper adds the
    /// length of its own prefix.
    fn prefix_len(&self) -> usize;
}
 219  
 220  #[apply(async_trait_maybe_send!)]
 221  impl<T> IDatabase for Arc<T>
 222  where
 223      T: IDatabase + ?Sized,
 224  {
 225      async fn begin_transaction<'a>(&'a self) -> Box<dyn IDatabaseTransaction + 'a> {
 226          (**self).begin_transaction().await
 227      }
 228      async fn register(&self, key: &[u8]) {
 229          (**self).register(key).await
 230      }
 231      async fn notify(&self, key: &[u8]) {
 232          (**self).notify(key).await
 233      }
 234  
 235      fn prefix_len(&self) -> usize {
 236          (**self).prefix_len()
 237      }
 238  }
 239  
 240  /// Base functionality around [`IRawDatabase`] to make it a [`IDatabase`]
 241  ///
 242  /// Mostly notification system, but also run-time single-commit handling.
 243  struct BaseDatabase<RawDatabase> {
 244      notifications: Arc<Notifications>,
 245      raw: RawDatabase,
 246  }
 247  
 248  impl<RawDatabase> fmt::Debug for BaseDatabase<RawDatabase> {
 249      fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
 250          f.write_str("BaseDatabase")
 251      }
 252  }
 253  
 254  #[apply(async_trait_maybe_send!)]
 255  impl<RawDatabase: IRawDatabase + MaybeSend + 'static> IDatabase for BaseDatabase<RawDatabase> {
 256      async fn begin_transaction<'a>(&'a self) -> Box<dyn IDatabaseTransaction + 'a> {
 257          Box::new(BaseDatabaseTransaction::new(
 258              self.raw.begin_transaction().await,
 259              self.notifications.clone(),
 260          ))
 261      }
 262      async fn register(&self, key: &[u8]) {
 263          self.notifications.register(key).await
 264      }
 265      async fn notify(&self, key: &[u8]) {
 266          self.notifications.notify(key).await
 267      }
 268  
 269      fn prefix_len(&self) -> usize {
 270          0
 271      }
 272  }
 273  
/// A public-facing newtype over `IDatabase`
///
/// Notably carries set of module decoders (`ModuleDecoderRegistry`)
/// and implements common utility function for auto-commits, db isolation,
/// and other.
///
/// Cloning is cheap with respect to the database itself: the inner
/// `IDatabase` is shared through an `Arc`.
#[derive(Clone, Debug)]
pub struct Database {
    /// Type-erased, shared database implementation
    inner: Arc<dyn IDatabase + 'static>,
    /// Decoders handed to every transaction created from this instance
    module_decoders: ModuleDecoderRegistry,
}
 284  
 285  impl Database {
 286      pub fn strong_count(&self) -> usize {
 287          Arc::strong_count(&self.inner)
 288      }
 289  
 290      pub fn into_inner(self) -> Arc<dyn IDatabase + 'static> {
 291          self.inner
 292      }
 293  }
 294  
 295  impl Database {
 296      /// Creates a new Fedimint database from any object implementing
 297      /// [`IDatabase`].
 298      ///
 299      /// See also [`Database::new_from_arc`].
 300      pub fn new(raw: impl IRawDatabase + 'static, module_decoders: ModuleDecoderRegistry) -> Self {
 301          let inner = BaseDatabase {
 302              raw,
 303              notifications: Arc::new(Notifications::new()),
 304          };
 305          Self::new_from_arc(
 306              Arc::new(inner) as Arc<dyn IDatabase + 'static>,
 307              module_decoders,
 308          )
 309      }
 310  
    /// Create [`Database`] from an already type-erased `IDatabase`.
    ///
    /// `inner` is stored as-is; `module_decoders` is the decoder registry the
    /// resulting instance carries.
    pub fn new_from_arc(
        inner: Arc<dyn IDatabase + 'static>,
        module_decoders: ModuleDecoderRegistry,
    ) -> Self {
        Self {
            inner,
            module_decoders,
        }
    }
 321  
 322      /// Create [`Database`] isolated to a partition with a given `prefix`
 323      pub fn with_prefix(&self, prefix: Vec<u8>) -> Self {
 324          Self {
 325              inner: Arc::new(PrefixDatabase {
 326                  inner: self.inner.clone(),
 327                  prefix,
 328              }),
 329              module_decoders: self.module_decoders.clone(),
 330          }
 331      }
 332  
 333      /// Create [`Database`] isolated to a partition with a prefix for a given
 334      /// `module_instance_id`
 335      pub fn with_prefix_module_id(&self, module_instance_id: ModuleInstanceId) -> Self {
 336          let prefix = module_instance_id_to_byte_prefix(module_instance_id);
 337          self.with_prefix(prefix)
 338      }
 339  
 340      pub fn with_decoders(&self, module_decoders: ModuleDecoderRegistry) -> Self {
 341          Self {
 342              inner: self.inner.clone(),
 343              module_decoders,
 344          }
 345      }
 346  
 347      /// Is this `Database` a global, unpartitioned `Database`
 348      pub fn is_global(&self) -> bool {
 349          self.inner.prefix_len() == 0
 350      }
 351  
 352      /// `Err` if [`Self::is_global`] is not true
 353      pub fn ensure_global(&self) -> Result<()> {
 354          if !self.is_global() {
 355              bail!("Database instance not global");
 356          }
 357  
 358          Ok(())
 359      }
 360  
 361      /// `Err` if [`Self::is_global`] is true
 362      pub fn ensure_isolated(&self) -> Result<()> {
 363          if self.is_global() {
 364              bail!("Database instance not isolated");
 365          }
 366  
 367          Ok(())
 368      }
 369  
 370      /// Begin a new committable database transaction
 371      pub async fn begin_transaction<'s, 'tx>(&'s self) -> DatabaseTransaction<'tx, Committable>
 372      where
 373          's: 'tx,
 374      {
 375          DatabaseTransaction::<Committable>::new(
 376              self.inner.begin_transaction().await,
 377              self.module_decoders.clone(),
 378          )
 379      }
 380  
 381      /// Begin a new non-committable database transaction
 382      pub async fn begin_transaction_nc<'s, 'tx>(&'s self) -> DatabaseTransaction<'tx, NonCommittable>
 383      where
 384          's: 'tx,
 385      {
 386          self.begin_transaction().await.into_nc()
 387      }
 388  
 389      /// Runs a closure with a reference to a database transaction and tries to
 390      /// commit the transaction if the closure returns `Ok` and rolls it back
 391      /// otherwise. If committing fails the closure is run for up to
 392      /// `max_attempts` times. If `max_attempts` is `None` it will run
 393      /// `usize::MAX` times which is close enough to infinite times.
 394      ///
 395      /// The closure `tx_fn` provided should not have side effects outside of the
 396      /// database transaction provided, or if it does these should be
 397      /// idempotent, since the closure might be run multiple times.
 398      ///
 399      /// # Lifetime Parameters
 400      ///
 401      /// The higher rank trait bound (HRTB) `'a` that is applied to the the
 402      /// mutable reference to the database transaction ensures that the
 403      /// reference lives as least as long as the returned future of the
 404      /// closure.
 405      ///
 406      /// Further, the reference to self (`'s`) must outlive the
 407      /// `DatabaseTransaction<'dt>`. In other words, the
 408      /// `DatabaseTransaction` must live as least as long as `self` and that is
 409      /// true as the `DatabaseTransaction` is only dropped at the end of the
 410      /// `loop{}`.
 411      ///
 412      /// # Panics
 413      ///
 414      /// This function panics when the given number of maximum attempts is zero.
 415      /// `max_attempts` must be greater or equal to one.
 416      pub async fn autocommit<'s, 'dbtx, F, T, E>(
 417          &'s self,
 418          tx_fn: F,
 419          max_attempts: Option<usize>,
 420      ) -> Result<T, AutocommitError<E>>
 421      where
 422          's: 'dbtx,
 423          for<'r, 'o> F: Fn(
 424              &'r mut DatabaseTransaction<'o>,
 425              PhantomBound<'dbtx, 'o>,
 426          ) -> BoxFuture<'r, Result<T, E>>,
 427      {
 428          assert_ne!(max_attempts, Some(0));
 429          let mut curr_attempts: usize = 0;
 430  
 431          loop {
 432              // The `checked_add()` function is used to catch the `usize` overflow.
 433              // With `usize=32bit` and an assumed time of 1ms per iteration, this would crash
 434              // after ~50 days. But if that's the case, something else must be wrong.
 435              // With `usize=64bit` it would take much longer, obviously.
 436              curr_attempts = curr_attempts
 437                  .checked_add(1)
 438                  .expect("db autocommit attempt counter overflowed");
 439  
 440              let mut dbtx = self.begin_transaction().await;
 441  
 442              let tx_fn_res = tx_fn(&mut dbtx.to_ref_nc(), PhantomData).await;
 443              let val = match tx_fn_res {
 444                  Ok(val) => val,
 445                  Err(err) => {
 446                      dbtx.ignore_uncommitted();
 447                      return Err(AutocommitError::ClosureError {
 448                          attempts: curr_attempts,
 449                          error: err,
 450                      });
 451                  }
 452              };
 453  
 454              let _timing /* logs on drop */ = timing::TimeReporter::new("autocommit - commit_tx");
 455  
 456              match dbtx.commit_tx_result().await {
 457                  Ok(()) => {
 458                      return Ok(val);
 459                  }
 460                  Err(err) => {
 461                      if max_attempts
 462                          .map(|max_att| max_att <= curr_attempts)
 463                          .unwrap_or(false)
 464                      {
 465                          warn!(
 466                              target: LOG_DB,
 467                              curr_attempts,
 468                              "Database commit failed in an autocommit block - terminating"
 469                          );
 470                          return Err(AutocommitError::CommitFailed {
 471                              attempts: curr_attempts,
 472                              last_error: err,
 473                          });
 474                      }
 475  
 476                      let delay = (2u64.pow(curr_attempts.min(7) as u32) * 10).min(1000);
 477                      let delay = rand::thread_rng().gen_range(delay..(2 * delay));
 478                      warn!(
 479                          target: LOG_DB,
 480                          curr_attempts,
 481                          delay_ms = %delay,
 482                          "Database commit failed in an autocommit block - retrying"
 483                      );
 484                      crate::runtime::sleep(Duration::from_millis(delay)).await;
 485                  }
 486              }
 487          }
 488      }
 489  
 490      /// Waits for key to be notified.
 491      ///
 492      /// Calls the `checker` when value of the key may have changed.
 493      /// Returns the value when `checker` returns a `Some(T)`.
 494      pub async fn wait_key_check<'a, K, T>(
 495          &'a self,
 496          key: &K,
 497          checker: impl Fn(Option<K::Value>) -> Option<T>,
 498      ) -> (T, DatabaseTransaction<'a, Committable>)
 499      where
 500          K: DatabaseKey + DatabaseRecord + DatabaseKeyWithNotify,
 501      {
 502          let key_bytes = key.to_bytes();
 503          loop {
 504              // register for notification
 505              let notify = self.inner.register(&key_bytes);
 506  
 507              // check for value in db
 508              let mut tx = self.inner.begin_transaction().await;
 509  
 510              let maybe_value_bytes = tx
 511                  .raw_get_bytes(&key_bytes)
 512                  .await
 513                  .expect("Unrecoverable error when reading from database")
 514                  .map(|value_bytes| {
 515                      decode_value_expect(&value_bytes, &self.module_decoders, &key_bytes)
 516                  });
 517  
 518              if let Some(value) = checker(maybe_value_bytes) {
 519                  return (
 520                      value,
 521                      DatabaseTransaction::new(tx, self.module_decoders.clone()),
 522                  );
 523              } else {
 524                  // key not found, try again
 525                  notify.await;
 526                  // if miss a notification between await and next register, it is
 527                  // fine. because we are going check the database
 528              }
 529          }
 530      }
 531  
 532      /// Waits for key to be present in database.
 533      pub async fn wait_key_exists<K>(&self, key: &K) -> K::Value
 534      where
 535          K: DatabaseKey + DatabaseRecord + DatabaseKeyWithNotify,
 536      {
 537          self.wait_key_check(key, std::convert::identity).await.0
 538      }
 539  }
 540  
 541  fn module_instance_id_to_byte_prefix(module_instance_id: u16) -> Vec<u8> {
 542      let mut prefix = vec![MODULE_GLOBAL_PREFIX];
 543      module_instance_id
 544          .consensus_encode(&mut prefix)
 545          .expect("Error encoding module instance id as prefix");
 546      prefix
 547  }
 548  
 549  /// A database that wraps an `inner` one and adds a prefix to all operations,
 550  /// effectively creating an isolated partition.
 551  #[derive(Clone, Debug)]
 552  struct PrefixDatabase<Inner>
 553  where
 554      Inner: Debug,
 555  {
 556      prefix: Vec<u8>,
 557      inner: Inner,
 558  }
 559  
 560  impl<Inner> PrefixDatabase<Inner>
 561  where
 562      Inner: Debug,
 563  {
 564      // TODO: we should optimize these concatenations, maybe by having an internal
 565      // `key: &[&[u8]]` that we flatten once, when passing to lowest layer, or
 566      // something
 567      fn get_full_key(&self, key: &[u8]) -> Vec<u8> {
 568          let mut full_key = self.prefix.clone();
 569          full_key.extend_from_slice(key);
 570          full_key
 571      }
 572  }
 573  
 574  #[apply(async_trait_maybe_send!)]
 575  impl<Inner> IDatabase for PrefixDatabase<Inner>
 576  where
 577      Inner: Debug + MaybeSend + MaybeSync + 'static + IDatabase,
 578  {
 579      async fn begin_transaction<'a>(&'a self) -> Box<dyn IDatabaseTransaction + 'a> {
 580          Box::new(PrefixDatabaseTransaction {
 581              inner: self.inner.begin_transaction().await,
 582              prefix: self.prefix.clone(),
 583          })
 584      }
 585      async fn register(&self, key: &[u8]) {
 586          self.inner.register(&self.get_full_key(key)).await
 587      }
 588  
 589      async fn notify(&self, key: &[u8]) {
 590          self.inner.notify(&self.get_full_key(key)).await
 591      }
 592  
 593      fn prefix_len(&self) -> usize {
 594          self.inner.prefix_len() + self.prefix.len()
 595      }
 596  }
 597  
 598  /// A database transactions that wraps an `inner` one and adds a prefix to all
 599  /// operations, effectively creating an isolated partition.
 600  ///
 601  /// Produced by [`PrefixDatabase`].
 602  #[derive(Debug)]
 603  struct PrefixDatabaseTransaction<Inner> {
 604      inner: Inner,
 605      prefix: Vec<u8>,
 606  }
 607  
 608  impl<Inner> PrefixDatabaseTransaction<Inner> {
 609      // TODO: we should optimize these concatenations, maybe by having an internal
 610      // `key: &[&[u8]]` that we flatten once, when passing to lowest layer, or
 611      // something
 612      fn get_full_key(&self, key: &[u8]) -> Vec<u8> {
 613          let mut full_key = self.prefix.clone();
 614          full_key.extend_from_slice(key);
 615          full_key
 616      }
 617  
 618      fn adapt_prefix_stream(stream: PrefixStream<'_>, prefix_len: usize) -> PrefixStream<'_> {
 619          Box::pin(stream.map(move |(k, v)| (k[prefix_len..].to_owned(), v))) /* as Pin<Box<dyn Stream<Item =
 620                                                                               * _>>> */
 621      }
 622  }
 623  
 624  #[apply(async_trait_maybe_send!)]
 625  impl<Inner> IDatabaseTransaction for PrefixDatabaseTransaction<Inner>
 626  where
 627      Inner: IDatabaseTransaction,
 628  {
 629      async fn commit_tx(&mut self) -> Result<()> {
 630          self.inner.commit_tx().await
 631      }
 632  
 633      fn prefix_len(&self) -> usize {
 634          self.inner.prefix_len() + self.prefix.len()
 635      }
 636  }
 637  
 638  #[apply(async_trait_maybe_send!)]
 639  impl<Inner> IDatabaseTransactionOpsCore for PrefixDatabaseTransaction<Inner>
 640  where
 641      Inner: IDatabaseTransactionOpsCore,
 642  {
 643      async fn raw_insert_bytes(&mut self, key: &[u8], value: &[u8]) -> Result<Option<Vec<u8>>> {
 644          let key = self.get_full_key(key);
 645          self.inner.raw_insert_bytes(&key, value).await
 646      }
 647  
 648      async fn raw_get_bytes(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
 649          let key = self.get_full_key(key);
 650          self.inner.raw_get_bytes(&key).await
 651      }
 652  
 653      async fn raw_remove_entry(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
 654          let key = self.get_full_key(key);
 655          self.inner.raw_remove_entry(&key).await
 656      }
 657  
 658      async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> Result<PrefixStream<'_>> {
 659          let key = self.get_full_key(key_prefix);
 660          let stream = self.inner.raw_find_by_prefix(&key).await?;
 661          Ok(Self::adapt_prefix_stream(stream, self.prefix.len()))
 662      }
 663  
 664      async fn raw_find_by_prefix_sorted_descending(
 665          &mut self,
 666          key_prefix: &[u8],
 667      ) -> Result<PrefixStream<'_>> {
 668          let key = self.get_full_key(key_prefix);
 669          let stream = self
 670              .inner
 671              .raw_find_by_prefix_sorted_descending(&key)
 672              .await?;
 673          Ok(Self::adapt_prefix_stream(stream, self.prefix.len()))
 674      }
 675  
 676      async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> Result<()> {
 677          let key = self.get_full_key(key_prefix);
 678          self.inner.raw_remove_by_prefix(&key).await
 679      }
 680  }
 681  
 682  #[apply(async_trait_maybe_send!)]
 683  impl<Inner> IDatabaseTransactionOps for PrefixDatabaseTransaction<Inner>
 684  where
 685      Inner: IDatabaseTransactionOps,
 686  {
 687      async fn rollback_tx_to_savepoint(&mut self) -> Result<()> {
 688          self.inner.rollback_tx_to_savepoint().await
 689      }
 690  
 691      async fn set_tx_savepoint(&mut self) -> Result<()> {
 692          self.set_tx_savepoint().await
 693      }
 694  }
 695  
/// Core raw operations that database transactions support
///
/// Used to enforce the same signature on all types supporting it
#[apply(async_trait_maybe_send!)]
pub trait IDatabaseTransactionOpsCore: MaybeSend {
    /// Stores `value` under `key`.
    ///
    /// NOTE(review): the returned `Option` is presumably the value previously
    /// stored under `key` - confirm against the implementations.
    async fn raw_insert_bytes(&mut self, key: &[u8], value: &[u8]) -> Result<Option<Vec<u8>>>;

    /// Reads the raw value stored under `key`, if any.
    async fn raw_get_bytes(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>>;

    /// Removes the entry stored under `key`.
    ///
    /// NOTE(review): the returned `Option` is presumably the removed value -
    /// confirm against the implementations.
    async fn raw_remove_entry(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>>;

    /// Returns a stream of key-value pairs with keys that start with
    /// `key_prefix`. No particular ordering is guaranteed.
    async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> Result<PrefixStream<'_>>;

    /// Same as [`Self::raw_find_by_prefix`] but the order is descending by key.
    async fn raw_find_by_prefix_sorted_descending(
        &mut self,
        key_prefix: &[u8],
    ) -> Result<PrefixStream<'_>>;

    /// Delete keys matching prefix
    async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> Result<()>;
}
 720  
 721  #[apply(async_trait_maybe_send!)]
 722  impl<T> IDatabaseTransactionOpsCore for Box<T>
 723  where
 724      T: IDatabaseTransactionOpsCore + ?Sized,
 725  {
 726      async fn raw_insert_bytes(&mut self, key: &[u8], value: &[u8]) -> Result<Option<Vec<u8>>> {
 727          (**self).raw_insert_bytes(key, value).await
 728      }
 729  
 730      async fn raw_get_bytes(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
 731          (**self).raw_get_bytes(key).await
 732      }
 733  
 734      async fn raw_remove_entry(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
 735          (**self).raw_remove_entry(key).await
 736      }
 737  
 738      async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> Result<PrefixStream<'_>> {
 739          (**self).raw_find_by_prefix(key_prefix).await
 740      }
 741  
 742      async fn raw_find_by_prefix_sorted_descending(
 743          &mut self,
 744          key_prefix: &[u8],
 745      ) -> Result<PrefixStream<'_>> {
 746          (**self)
 747              .raw_find_by_prefix_sorted_descending(key_prefix)
 748              .await
 749      }
 750  
 751      async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> Result<()> {
 752          (**self).raw_remove_by_prefix(key_prefix).await
 753      }
 754  }
 755  
 756  #[apply(async_trait_maybe_send!)]
 757  impl<T> IDatabaseTransactionOpsCore for &mut T
 758  where
 759      T: IDatabaseTransactionOpsCore + ?Sized,
 760  {
 761      async fn raw_insert_bytes(&mut self, key: &[u8], value: &[u8]) -> Result<Option<Vec<u8>>> {
 762          (**self).raw_insert_bytes(key, value).await
 763      }
 764  
 765      async fn raw_get_bytes(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
 766          (**self).raw_get_bytes(key).await
 767      }
 768  
 769      async fn raw_remove_entry(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
 770          (**self).raw_remove_entry(key).await
 771      }
 772  
 773      async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> Result<PrefixStream<'_>> {
 774          (**self).raw_find_by_prefix(key_prefix).await
 775      }
 776  
 777      async fn raw_find_by_prefix_sorted_descending(
 778          &mut self,
 779          key_prefix: &[u8],
 780      ) -> Result<PrefixStream<'_>> {
 781          (**self)
 782              .raw_find_by_prefix_sorted_descending(key_prefix)
 783              .await
 784      }
 785  
 786      async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> Result<()> {
 787          (**self).raw_remove_by_prefix(key_prefix).await
 788      }
 789  }
 790  
/// Additional operations (only some) database transactions expose, on top of
/// [`IDatabaseTransactionOpsCore`]
///
/// In certain contexts exposing these operations would be a problem, so they
/// are moved to a separate trait.
#[apply(async_trait_maybe_send!)]
pub trait IDatabaseTransactionOps: IDatabaseTransactionOpsCore + MaybeSend {
    /// Create a savepoint during the transaction that can be rolled back to
    /// using rollback_tx_to_savepoint. Rolling back to the savepoint will
    /// atomically remove the writes that were applied since the savepoint
    /// was created.
    ///
    /// Warning: Avoid using this in fedimint client code as not all database
    /// transaction implementations will support setting a savepoint during
    /// a transaction.
    async fn set_tx_savepoint(&mut self) -> Result<()>;

    /// Roll the transaction back to the savepoint created with
    /// [`Self::set_tx_savepoint`], discarding the writes applied since then.
    async fn rollback_tx_to_savepoint(&mut self) -> Result<()>;
}
 810  
 811  #[apply(async_trait_maybe_send!)]
 812  impl<T> IDatabaseTransactionOps for Box<T>
 813  where
 814      T: IDatabaseTransactionOps + ?Sized,
 815  {
 816      async fn set_tx_savepoint(&mut self) -> Result<()> {
 817          (**self).set_tx_savepoint().await
 818      }
 819  
 820      async fn rollback_tx_to_savepoint(&mut self) -> Result<()> {
 821          (**self).rollback_tx_to_savepoint().await
 822      }
 823  }
 824  
 825  #[apply(async_trait_maybe_send!)]
 826  impl<T> IDatabaseTransactionOps for &mut T
 827  where
 828      T: IDatabaseTransactionOps + ?Sized,
 829  {
 830      async fn set_tx_savepoint(&mut self) -> Result<()> {
 831          (**self).set_tx_savepoint().await
 832      }
 833  
 834      async fn rollback_tx_to_savepoint(&mut self) -> Result<()> {
 835          (**self).rollback_tx_to_savepoint().await
 836      }
 837  }
 838  
/// Like [`IDatabaseTransactionOpsCore`], but typed
///
/// Implemented via blanket impl for everything that implements
/// [`IDatabaseTransactionOpsCore`] that has decoders (implements
/// [`WithDecoders`]).
///
/// Note that none of the methods return a `Result`: the blanket
/// implementation panics when the underlying raw operation fails or when
/// stored bytes cannot be decoded.
#[apply(async_trait_maybe_send!)]
pub trait IDatabaseTransactionOpsCoreTyped<'a> {
    /// Look up `key` and return its decoded value, or `None` if there is no
    /// entry for it.
    async fn get_value<K>(&mut self, key: &K) -> Option<K::Value>
    where
        K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync;

    /// Store `value` under `key`, returning the decoded value that was
    /// previously stored under that key, if any.
    async fn insert_entry<K>(&mut self, key: &K, value: &K::Value) -> Option<K::Value>
    where
        K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
        K::Value: MaybeSend + MaybeSync;

    /// Store `value` under `key`, expecting that no entry exists yet.
    ///
    /// In the blanket implementation an existing entry is still overwritten;
    /// the only difference to [`Self::insert_entry`] is that a warning is
    /// logged in that case and nothing is returned.
    async fn insert_new_entry<K>(&mut self, key: &K, value: &K::Value)
    where
        K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
        K::Value: MaybeSend + MaybeSync;

    /// Stream the decoded `(key, value)` pairs of all entries matching
    /// `key_prefix`.
    async fn find_by_prefix<KP>(
        &mut self,
        key_prefix: &KP,
    ) -> Pin<
        Box<
            maybe_add_send!(
                dyn Stream<
                        Item = (
                            KP::Record,
                            <<KP as DatabaseLookup>::Record as DatabaseRecord>::Value,
                        ),
                    > + '_
            ),
        >,
    >
    where
        KP: DatabaseLookup + MaybeSend + MaybeSync,
        KP::Record: DatabaseKey;

    /// Like [`Self::find_by_prefix`], but backed by
    /// [`IDatabaseTransactionOpsCore::raw_find_by_prefix_sorted_descending`].
    ///
    /// NOTE(review): presumably yields entries in descending key order; the
    /// actual ordering is defined by the raw implementation.
    async fn find_by_prefix_sorted_descending<KP>(
        &mut self,
        key_prefix: &KP,
    ) -> Pin<
        Box<
            maybe_add_send!(
                dyn Stream<
                        Item = (
                            KP::Record,
                            <<KP as DatabaseLookup>::Record as DatabaseRecord>::Value,
                        ),
                    > + '_
            ),
        >,
    >
    where
        KP: DatabaseLookup + MaybeSend + MaybeSync,
        KP::Record: DatabaseKey;

    /// Remove the entry stored under `key`, returning its decoded value if
    /// there was one.
    async fn remove_entry<K>(&mut self, key: &K) -> Option<K::Value>
    where
        K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync;

    /// Remove all entries matching `key_prefix`.
    async fn remove_by_prefix<KP>(&mut self, key_prefix: &KP)
    where
        KP: DatabaseLookup + MaybeSend + MaybeSync;
}
 906  
 907  // blanket implementation of typed ops for anything that implements raw ops and
 908  // has decoders
 909  #[apply(async_trait_maybe_send!)]
 910  impl<'a, T> IDatabaseTransactionOpsCoreTyped<'a> for T
 911  where
 912      T: IDatabaseTransactionOpsCore + WithDecoders,
 913  {
 914      async fn get_value<K>(&mut self, key: &K) -> Option<K::Value>
 915      where
 916          K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
 917      {
 918          let key_bytes = key.to_bytes();
 919          let raw = self
 920              .raw_get_bytes(&key_bytes)
 921              .await
 922              .expect("Unrecoverable error occurred while reading and entry from the database");
 923          raw.map(|value_bytes| {
 924              decode_value_expect::<K::Value>(&value_bytes, self.decoders(), &key_bytes)
 925          })
 926      }
 927  
 928      async fn insert_entry<K>(&mut self, key: &K, value: &K::Value) -> Option<K::Value>
 929      where
 930          K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
 931          K::Value: MaybeSend + MaybeSync,
 932      {
 933          let key_bytes = key.to_bytes();
 934          self.raw_insert_bytes(&key_bytes, &value.to_bytes())
 935              .await
 936              .expect("Unrecoverable error occurred while inserting entry into the database")
 937              .map(|value_bytes| {
 938                  decode_value_expect::<K::Value>(&value_bytes, self.decoders(), &key_bytes)
 939              })
 940      }
 941  
 942      async fn insert_new_entry<K>(&mut self, key: &K, value: &K::Value)
 943      where
 944          K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
 945          K::Value: MaybeSend + MaybeSync,
 946      {
 947          if let Some(prev) = self.insert_entry(key, value).await {
 948              warn!(
 949                  target: LOG_DB,
 950                  "Database overwriting element when expecting insertion of new
 951              entry. Key: {:?} Prev Value: {:?}",             key,
 952                  prev,
 953              );
 954          }
 955      }
 956  
 957      async fn find_by_prefix<KP>(
 958          &mut self,
 959          key_prefix: &KP,
 960      ) -> Pin<
 961          Box<
 962              maybe_add_send!(
 963                  dyn Stream<
 964                          Item = (
 965                              KP::Record,
 966                              <<KP as DatabaseLookup>::Record as DatabaseRecord>::Value,
 967                          ),
 968                      > + '_
 969              ),
 970          >,
 971      >
 972      where
 973          KP: DatabaseLookup + MaybeSend + MaybeSync,
 974          KP::Record: DatabaseKey,
 975      {
 976          let decoders = self.decoders().clone();
 977          Box::pin(
 978              self.raw_find_by_prefix(&key_prefix.to_bytes())
 979                  .await
 980                  .expect("Unrecoverable error occurred while listing entries from the database")
 981                  .map(move |(key_bytes, value_bytes)| {
 982                      let key = decode_key_expect(&key_bytes, &decoders);
 983                      let value = decode_value_expect(&value_bytes, &decoders, &key_bytes);
 984                      (key, value)
 985                  }),
 986          )
 987      }
 988  
 989      async fn find_by_prefix_sorted_descending<KP>(
 990          &mut self,
 991          key_prefix: &KP,
 992      ) -> Pin<
 993          Box<
 994              maybe_add_send!(
 995                  dyn Stream<
 996                          Item = (
 997                              KP::Record,
 998                              <<KP as DatabaseLookup>::Record as DatabaseRecord>::Value,
 999                          ),
1000                      > + '_
1001              ),
1002          >,
1003      >
1004      where
1005          KP: DatabaseLookup + MaybeSend + MaybeSync,
1006          KP::Record: DatabaseKey,
1007      {
1008          let decoders = self.decoders().clone();
1009          Box::pin(
1010              self.raw_find_by_prefix_sorted_descending(&key_prefix.to_bytes())
1011                  .await
1012                  .expect("Unrecoverable error occurred while listing entries from the database")
1013                  .map(move |(key_bytes, value_bytes)| {
1014                      let key = decode_key_expect(&key_bytes, &decoders);
1015                      let value = decode_value_expect(&value_bytes, &decoders, &key_bytes);
1016                      (key, value)
1017                  }),
1018          )
1019      }
1020      async fn remove_entry<K>(&mut self, key: &K) -> Option<K::Value>
1021      where
1022          K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1023      {
1024          let key_bytes = key.to_bytes();
1025          self.raw_remove_entry(&key_bytes)
1026              .await
1027              .expect("Unrecoverable error occurred while inserting removing entry from the database")
1028              .map(|value_bytes| {
1029                  decode_value_expect::<K::Value>(&value_bytes, self.decoders(), &key_bytes)
1030              })
1031      }
1032      async fn remove_by_prefix<KP>(&mut self, key_prefix: &KP)
1033      where
1034          KP: DatabaseLookup + MaybeSend + MaybeSync,
1035      {
1036          self.raw_remove_by_prefix(&key_prefix.to_bytes())
1037              .await
1038              .expect("Unrecoverable error when removing entries from the database")
1039      }
1040  }
1041  
/// A database type that has decoders, which allows it to implement
/// [`IDatabaseTransactionOpsCoreTyped`]
pub trait WithDecoders {
    /// The decoder registry that the typed operations pass to key and value
    /// decoding
    fn decoders(&self) -> &ModuleDecoderRegistry;
}
1047  
/// Raw database transaction (e.g. rocksdb implementation)
#[apply(async_trait_maybe_send!)]
pub trait IRawDatabaseTransaction: MaybeSend + IDatabaseTransactionOps {
    /// Commit the transaction, consuming it (`self` is taken by value, so
    /// the transaction cannot be used afterwards).
    async fn commit_tx(self) -> Result<()>;
}
1053  
/// Fedimint database transaction
///
/// See [`IDatabase`] for more info.
#[apply(async_trait_maybe_send!)]
pub trait IDatabaseTransaction: MaybeSend + IDatabaseTransactionOps + fmt::Debug {
    /// Commit the transaction
    ///
    /// Unlike [`IRawDatabaseTransaction::commit_tx`] this takes `&mut self`
    /// rather than consuming the transaction.
    async fn commit_tx(&mut self) -> Result<()>;

    /// The prefix len of this database instance
    ///
    /// A length of `0` is what `DatabaseTransaction::is_global` treats as a
    /// global (unpartitioned) transaction.
    fn prefix_len(&self) -> usize;
}
1065  
1066  #[apply(async_trait_maybe_send!)]
1067  impl<T> IDatabaseTransaction for Box<T>
1068  where
1069      T: IDatabaseTransaction + ?Sized,
1070  {
1071      async fn commit_tx(&mut self) -> Result<()> {
1072          (**self).commit_tx().await
1073      }
1074      fn prefix_len(&self) -> usize {
1075          (**self).prefix_len()
1076      }
1077  }
1078  
1079  #[apply(async_trait_maybe_send!)]
1080  impl<'a, T> IDatabaseTransaction for &'a mut T
1081  where
1082      T: IDatabaseTransaction + ?Sized,
1083  {
1084      async fn commit_tx(&mut self) -> Result<()> {
1085          (**self).commit_tx().await
1086      }
1087      fn prefix_len(&self) -> usize {
1088          (**self).prefix_len()
1089      }
1090  }
1091  
/// Wrapper around an `IRawDatabaseTransaction` that implements
/// [`IDatabaseTransaction`]. It can be wrapped easier in other structs since
/// committing it does not consume `self` by move.
struct BaseDatabaseTransaction<Tx> {
    // TODO: merge options
    /// The raw transaction; `Some` until `commit_tx` takes it out
    raw: Option<Tx>,
    /// Keys recorded for notification by the write operations; taken out and
    /// submitted to `notifications` once the raw commit succeeded
    notify_queue: Option<NotifyQueue>,
    /// Receiver of the notification queue after a successful commit
    notifications: Arc<Notifications>,
}
1100  
1101  impl<Tx> fmt::Debug for BaseDatabaseTransaction<Tx>
1102  where
1103      Tx: fmt::Debug,
1104  {
1105      fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1106          f.write_fmt(format_args!(
1107              "BaseDatabaseTransaction{{ raw={:?} }}",
1108              self.raw
1109          ))
1110      }
1111  }
1112  impl<Tx> BaseDatabaseTransaction<Tx>
1113  where
1114      Tx: IRawDatabaseTransaction,
1115  {
1116      fn new(dbtx: Tx, notifications: Arc<Notifications>) -> BaseDatabaseTransaction<Tx> {
1117          BaseDatabaseTransaction {
1118              raw: Some(dbtx),
1119              notifications,
1120              notify_queue: Some(NotifyQueue::new()),
1121          }
1122      }
1123  
1124      fn add_notification_key(&mut self, key: &[u8]) -> Result<()> {
1125          self.notify_queue
1126              .as_mut()
1127              .context("can not call add_notification_key after commit")?
1128              .add(&key);
1129          Ok(())
1130      }
1131  }
1132  
1133  #[apply(async_trait_maybe_send!)]
1134  impl<Tx: IRawDatabaseTransaction> IDatabaseTransactionOpsCore for BaseDatabaseTransaction<Tx> {
1135      async fn raw_insert_bytes(&mut self, key: &[u8], value: &[u8]) -> Result<Option<Vec<u8>>> {
1136          self.add_notification_key(key)?;
1137          self.raw
1138              .as_mut()
1139              .context("Cannot insert into already consumed transaction")?
1140              .raw_insert_bytes(key, value)
1141              .await
1142      }
1143  
1144      async fn raw_get_bytes(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1145          self.raw
1146              .as_mut()
1147              .context("Cannot retrieve from already consumed transaction")?
1148              .raw_get_bytes(key)
1149              .await
1150      }
1151  
1152      async fn raw_remove_entry(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1153          self.add_notification_key(key)?;
1154          self.raw
1155              .as_mut()
1156              .context("Cannot remove from already consumed transaction")?
1157              .raw_remove_entry(key)
1158              .await
1159      }
1160  
1161      async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> Result<PrefixStream<'_>> {
1162          self.raw
1163              .as_mut()
1164              .context("Cannot retrieve from already consumed transaction")?
1165              .raw_find_by_prefix(key_prefix)
1166              .await
1167      }
1168  
1169      async fn raw_find_by_prefix_sorted_descending(
1170          &mut self,
1171          key_prefix: &[u8],
1172      ) -> Result<PrefixStream<'_>> {
1173          self.raw
1174              .as_mut()
1175              .context("Cannot retrieve from already consumed transaction")?
1176              .raw_find_by_prefix_sorted_descending(key_prefix)
1177              .await
1178      }
1179  
1180      async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> Result<()> {
1181          self.raw
1182              .as_mut()
1183              .context("Cannot remove from already consumed transaction")?
1184              .raw_remove_by_prefix(key_prefix)
1185              .await
1186      }
1187  }
1188  
1189  #[apply(async_trait_maybe_send!)]
1190  impl<Tx: IRawDatabaseTransaction> IDatabaseTransactionOps for BaseDatabaseTransaction<Tx> {
1191      async fn rollback_tx_to_savepoint(&mut self) -> Result<()> {
1192          self.raw
1193              .as_mut()
1194              .context("Cannot rollback to a savepoint on an already consumed transaction")?
1195              .rollback_tx_to_savepoint()
1196              .await?;
1197          Ok(())
1198      }
1199  
1200      async fn set_tx_savepoint(&mut self) -> Result<()> {
1201          self.raw
1202              .as_mut()
1203              .context("Cannot set a tx savepoint on an already consumed transaction")?
1204              .set_tx_savepoint()
1205              .await?;
1206          Ok(())
1207      }
1208  }
1209  
1210  #[apply(async_trait_maybe_send!)]
1211  impl<Tx: IRawDatabaseTransaction> IDatabaseTransaction for BaseDatabaseTransaction<Tx>
1212  where
1213      Tx: fmt::Debug,
1214  {
1215      async fn commit_tx(&mut self) -> Result<()> {
1216          self.raw
1217              .take()
1218              .context("Cannot commit an already committed transaction")?
1219              .commit_tx()
1220              .await?;
1221          self.notifications.submit_queue(
1222              self.notify_queue
1223                  .take()
1224                  .expect("commit must be called only once"),
1225          );
1226          Ok(())
1227      }
1228  
1229      fn prefix_len(&self) -> usize {
1230          0
1231      }
1232  }
1233  
/// A helper for tracking and logging on `Drop` any instances of uncommitted
/// writes
///
/// When dropped with `has_writes` set and `is_committed` unset, it logs a
/// warning (or only a trace message if `ignore_uncommitted` is set).
#[derive(Clone)]
struct CommitTracker {
    /// Is the dbtx committed
    is_committed: bool,
    /// Does the dbtx have any writes
    has_writes: bool,
    /// Don't warn-log uncommitted writes
    ignore_uncommitted: bool,
}
1245  
1246  impl Drop for CommitTracker {
1247      fn drop(&mut self) {
1248          if self.has_writes && !self.is_committed {
1249              if self.ignore_uncommitted {
1250                  trace!(
1251                      target: LOG_DB,
1252                      "DatabaseTransaction has writes and has not called commit, but that's expected."
1253                  );
1254              } else {
1255                  warn!(
1256                      target: LOG_DB,
1257                      location = ?backtrace::Backtrace::new(),
1258                      "DatabaseTransaction has writes and has not called commit."
1259                  );
1260              }
1261          }
1262      }
1263  }
1264  
/// Either an owned `T` or a mutable borrow of one
///
/// Dereferences (also mutably) to the `T` in both cases, so users do not need
/// to care which variant they hold.
enum MaybeRef<'a, T> {
    /// The value is stored inline
    Owned(T),
    /// The value lives elsewhere and is mutably borrowed for `'a`
    Borrowed(&'a mut T),
}
1269  
1270  impl<'a, T> ops::Deref for MaybeRef<'a, T> {
1271      type Target = T;
1272  
1273      fn deref(&self) -> &Self::Target {
1274          match self {
1275              MaybeRef::Owned(o) => o,
1276              MaybeRef::Borrowed(r) => r,
1277          }
1278      }
1279  }
1280  
1281  impl<'a, T> ops::DerefMut for MaybeRef<'a, T> {
1282      fn deref_mut(&mut self) -> &mut Self::Target {
1283          match self {
1284              MaybeRef::Owned(o) => o,
1285              MaybeRef::Borrowed(r) => r,
1286          }
1287      }
1288  }
1289  
/// Session type for [`DatabaseTransaction`] that is allowed to commit
///
/// Opposite of [`NonCommittable`].
///
/// This is a unit struct used purely as a type-level marker: the commit
/// methods are only defined for `DatabaseTransaction<'_, Committable>`.
pub struct Committable;
1294  
/// Session type for a [`DatabaseTransaction`] that is not allowed to commit
///
/// Opposite of [`Committable`].
///
/// This is the default capability of [`DatabaseTransaction`].
pub struct NonCommittable;
/// A high level database transaction handle
///
/// `Cap` is a session type
pub struct DatabaseTransaction<'tx, Cap = NonCommittable> {
    /// The underlying transaction all operations are forwarded to
    tx: Box<dyn IDatabaseTransaction + 'tx>,
    /// Decoders used by the typed operations
    decoders: ModuleDecoderRegistry,
    /// Tracks writes and commit status; owned by the handle created with
    /// `new`, borrowed by handles derived from it via the `to_ref*` methods
    commit_tracker: MaybeRef<'tx, CommitTracker>,
    /// Hooks that are run after a successful commit; owned or borrowed like
    /// `commit_tracker`
    on_commit_hooks: MaybeRef<'tx, Vec<Box<maybe_add_send!(dyn FnOnce())>>>,
    /// Type-level marker for `Cap`, carries no data
    capability: marker::PhantomData<Cap>,
}
1310  
1311  impl<'tx, Cap> fmt::Debug for DatabaseTransaction<'tx, Cap> {
1312      fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1313          f.write_fmt(format_args!(
1314              "DatabaseTransaction {{ tx: {:?}, decoders={:?} }}",
1315              self.tx, self.decoders
1316          ))
1317      }
1318  }
1319  
// Exposing the decoders is what makes the typed operations
// (`IDatabaseTransactionOpsCoreTyped`) available on `DatabaseTransaction`
// through the blanket implementation.
impl<'tx, Cap> WithDecoders for DatabaseTransaction<'tx, Cap> {
    fn decoders(&self) -> &ModuleDecoderRegistry {
        &self.decoders
    }
}
1325  
1326  #[instrument(level = "trace", skip_all, fields(value_type = std::any::type_name::<V>()), err)]
1327  fn decode_value<V: DatabaseValue>(
1328      value_bytes: &[u8],
1329      decoders: &ModuleDecoderRegistry,
1330  ) -> Result<V, DecodingError> {
1331      trace!(
1332          bytes = %AbbreviateHexBytes(value_bytes),
1333          "decoding value",
1334      );
1335      V::from_bytes(value_bytes, decoders)
1336  }
1337  
1338  fn decode_value_expect<V: DatabaseValue>(
1339      value_bytes: &[u8],
1340      decoders: &ModuleDecoderRegistry,
1341      key_bytes: &[u8],
1342  ) -> V {
1343      decode_value(value_bytes, decoders).unwrap_or_else(|err| {
1344          panic!(
1345              "Unrecoverable decoding DatabaseValue as {}; err={}, bytes={}; key_bytes={}",
1346              any::type_name::<V>(),
1347              err,
1348              AbbreviateHexBytes(value_bytes),
1349              AbbreviateHexBytes(key_bytes),
1350          )
1351      })
1352  }
1353  
1354  fn decode_key_expect<K: DatabaseKey>(key_bytes: &[u8], decoders: &ModuleDecoderRegistry) -> K {
1355      trace!(
1356          bytes = %AbbreviateHexBytes(key_bytes),
1357          "decoding key",
1358      );
1359      K::from_bytes(key_bytes, decoders).unwrap_or_else(|err| {
1360          panic!(
1361              "Unrecoverable decoding DatabaseKey as {}; err={}; bytes={}",
1362              any::type_name::<K>(),
1363              err,
1364              AbbreviateHexBytes(key_bytes)
1365          )
1366      })
1367  }
1368  
1369  impl<'tx, Cap> DatabaseTransaction<'tx, Cap> {
1370      /// Convert into a non-committable version
1371      pub fn into_nc(self) -> DatabaseTransaction<'tx, NonCommittable> {
1372          DatabaseTransaction {
1373              tx: self.tx,
1374              decoders: self.decoders,
1375              commit_tracker: self.commit_tracker,
1376              on_commit_hooks: self.on_commit_hooks,
1377              capability: PhantomData::<NonCommittable>,
1378          }
1379      }
1380  
1381      /// Get a reference to a non-committeable version
1382      pub fn to_ref_nc<'s, 'a>(&'s mut self) -> DatabaseTransaction<'a, NonCommittable>
1383      where
1384          's: 'a,
1385      {
1386          self.to_ref().into_nc()
1387      }
1388  
1389      /// Get [`DatabaseTransaction`] isolated to a `prefix`
1390      pub fn with_prefix<'a: 'tx>(self, prefix: Vec<u8>) -> DatabaseTransaction<'a, Cap>
1391      where
1392          'tx: 'a,
1393      {
1394          DatabaseTransaction {
1395              tx: Box::new(PrefixDatabaseTransaction {
1396                  inner: self.tx,
1397                  prefix,
1398              }),
1399              decoders: self.decoders,
1400              commit_tracker: self.commit_tracker,
1401              on_commit_hooks: self.on_commit_hooks,
1402              capability: self.capability,
1403          }
1404      }
1405  
1406      /// Get [`DatabaseTransaction`] isolated to a prefix of a given
1407      /// `module_instance_id`
1408      pub fn with_prefix_module_id<'a: 'tx>(
1409          self,
1410          module_instance_id: ModuleInstanceId,
1411      ) -> DatabaseTransaction<'a, Cap>
1412      where
1413          'tx: 'a,
1414      {
1415          let prefix = module_instance_id_to_byte_prefix(module_instance_id);
1416          self.with_prefix(prefix)
1417      }
1418  
1419      /// Get [`DatabaseTransaction`] to `self`
1420      pub fn to_ref<'s, 'a>(&'s mut self) -> DatabaseTransaction<'a, Cap>
1421      where
1422          's: 'a,
1423      {
1424          let decoders = self.decoders.clone();
1425  
1426          DatabaseTransaction {
1427              tx: Box::new(&mut self.tx),
1428              decoders,
1429              commit_tracker: match self.commit_tracker {
1430                  MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1431                  MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1432              },
1433              on_commit_hooks: match self.on_commit_hooks {
1434                  MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1435                  MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1436              },
1437              capability: self.capability,
1438          }
1439      }
1440  
1441      /// Get [`DatabaseTransaction`] isolated to a `prefix` of `self`
1442      pub fn to_ref_with_prefix<'a>(&'a mut self, prefix: Vec<u8>) -> DatabaseTransaction<'a, Cap>
1443      where
1444          'tx: 'a,
1445      {
1446          DatabaseTransaction {
1447              tx: Box::new(PrefixDatabaseTransaction {
1448                  inner: &mut self.tx,
1449                  prefix,
1450              }),
1451              decoders: self.decoders.clone(),
1452              commit_tracker: match self.commit_tracker {
1453                  MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1454                  MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1455              },
1456              on_commit_hooks: match self.on_commit_hooks {
1457                  MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o),
1458                  MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b),
1459              },
1460              capability: self.capability,
1461          }
1462      }
1463  
1464      pub fn to_ref_with_prefix_module_id<'a>(
1465          &'a mut self,
1466          module_instance_id: ModuleInstanceId,
1467      ) -> DatabaseTransaction<'a, Cap>
1468      where
1469          'tx: 'a,
1470      {
1471          let prefix = module_instance_id_to_byte_prefix(module_instance_id);
1472          self.to_ref_with_prefix(prefix)
1473      }
1474  
1475      /// Is this `Database` a global, unpartitioned `Database`
1476      pub fn is_global(&self) -> bool {
1477          self.tx.prefix_len() == 0
1478      }
1479  
1480      /// `Err` if [`Self::is_global`] is not true
1481      pub fn ensure_global(&self) -> Result<()> {
1482          if !self.is_global() {
1483              bail!("Database instance not global");
1484          }
1485  
1486          Ok(())
1487      }
1488  
1489      /// `Err` if [`Self::is_global`] is true
1490      pub fn ensure_isolated(&self) -> Result<()> {
1491          if self.is_global() {
1492              bail!("Database instance not isolated");
1493          }
1494  
1495          Ok(())
1496      }
1497  
1498      /// Cancel the tx to avoid debugging warnings about uncommitted writes
1499      pub fn ignore_uncommitted(&mut self) -> &mut Self {
1500          self.commit_tracker.ignore_uncommitted = true;
1501          self
1502      }
1503  
1504      /// Create warnings about uncommitted writes
1505      pub fn warn_uncommitted(&mut self) -> &mut Self {
1506          self.commit_tracker.ignore_uncommitted = false;
1507          self
1508      }
1509  
1510      /// Register a hook that will be run after commit succeeds.
1511      #[instrument(level = "debug", skip_all, ret)]
1512      pub fn on_commit(&mut self, f: maybe_add_send!(impl FnOnce() + 'static)) {
1513          self.on_commit_hooks.push(Box::new(f));
1514      }
1515  }
1516  
1517  impl<'tx> DatabaseTransaction<'tx, Committable> {
1518      pub fn new(
1519          dbtx: Box<dyn IDatabaseTransaction + 'tx>,
1520          decoders: ModuleDecoderRegistry,
1521      ) -> DatabaseTransaction<'tx, Committable> {
1522          DatabaseTransaction {
1523              tx: dbtx,
1524              decoders,
1525              commit_tracker: MaybeRef::Owned(CommitTracker {
1526                  is_committed: false,
1527                  has_writes: false,
1528                  ignore_uncommitted: false,
1529              }),
1530              on_commit_hooks: MaybeRef::Owned(vec![]),
1531              capability: PhantomData,
1532          }
1533      }
1534  
1535      pub async fn commit_tx_result(mut self) -> Result<()> {
1536          self.commit_tracker.is_committed = true;
1537          let commit_result = self.tx.commit_tx().await;
1538  
1539          // Run commit hooks in case commit was successful
1540          if commit_result.is_ok() {
1541              for hook in self.on_commit_hooks.deref_mut().drain(..) {
1542                  hook();
1543              }
1544          }
1545  
1546          commit_result
1547      }
1548  
1549      pub async fn commit_tx(mut self) {
1550          self.commit_tracker.is_committed = true;
1551          self.commit_tx_result()
1552              .await
1553              .expect("Unrecoverable error occurred while committing to the database.");
1554      }
1555  }
1556  
1557  #[apply(async_trait_maybe_send!)]
1558  impl<'a, Cap> IDatabaseTransactionOpsCore for DatabaseTransaction<'a, Cap>
1559  where
1560      Cap: Send,
1561  {
1562      async fn raw_insert_bytes(&mut self, key: &[u8], value: &[u8]) -> Result<Option<Vec<u8>>> {
1563          self.commit_tracker.has_writes = true;
1564          self.tx.raw_insert_bytes(key, value).await
1565      }
1566  
1567      async fn raw_get_bytes(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1568          self.tx.raw_get_bytes(key).await
1569      }
1570  
1571      async fn raw_remove_entry(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1572          self.tx.raw_remove_entry(key).await
1573      }
1574  
1575      async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> Result<PrefixStream<'_>> {
1576          self.tx.raw_find_by_prefix(key_prefix).await
1577      }
1578  
1579      async fn raw_find_by_prefix_sorted_descending(
1580          &mut self,
1581          key_prefix: &[u8],
1582      ) -> Result<PrefixStream<'_>> {
1583          self.tx
1584              .raw_find_by_prefix_sorted_descending(key_prefix)
1585              .await
1586      }
1587  
1588      async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> Result<()> {
1589          self.commit_tracker.has_writes = true;
1590          self.tx.raw_remove_by_prefix(key_prefix).await
1591      }
1592  }
1593  #[apply(async_trait_maybe_send!)]
1594  impl<'a> IDatabaseTransactionOps for DatabaseTransaction<'a, Committable> {
1595      async fn set_tx_savepoint(&mut self) -> Result<()> {
1596          self.tx.set_tx_savepoint().await
1597      }
1598  
1599      async fn rollback_tx_to_savepoint(&mut self) -> Result<()> {
1600          self.tx.rollback_tx_to_savepoint().await
1601      }
1602  }
1603  
1604  impl<T> DatabaseKeyPrefix for T
1605  where
1606      T: DatabaseLookup + crate::encoding::Encodable + Debug,
1607  {
1608      fn to_bytes(&self) -> Vec<u8> {
1609          let mut data = vec![<Self as DatabaseLookup>::Record::DB_PREFIX];
1610          self.consensus_encode(&mut data)
1611              .expect("Writing to vec is infallible");
1612          data
1613      }
1614  }
1615  
1616  impl<T> DatabaseKey for T
1617  where
1618      // Note: key can only be `T` that can be decoded without modules (even if
1619      // module type is `()`)
1620      T: DatabaseRecord + crate::encoding::Decodable + Sized,
1621  {
1622      const NOTIFY_ON_MODIFY: bool = <T as DatabaseRecord>::NOTIFY_ON_MODIFY;
1623      fn from_bytes(data: &[u8], modules: &ModuleDecoderRegistry) -> Result<Self, DecodingError> {
1624          if data.is_empty() {
1625              // TODO: build better coding errors, pretty useless right now
1626              return Err(DecodingError::wrong_length(1, 0));
1627          }
1628  
1629          if data[0] != Self::DB_PREFIX {
1630              return Err(DecodingError::wrong_prefix(Self::DB_PREFIX, data[0]));
1631          }
1632  
1633          <Self as crate::encoding::Decodable>::consensus_decode(
1634              &mut std::io::Cursor::new(&data[1..]),
1635              modules,
1636          )
1637          .map_err(|decode_error| DecodingError::Other(decode_error.0))
1638      }
1639  }
1640  
1641  impl<T> DatabaseValue for T
1642  where
1643      T: Debug + Encodable + Decodable,
1644  {
1645      fn from_bytes(data: &[u8], modules: &ModuleDecoderRegistry) -> Result<Self, DecodingError> {
1646          T::consensus_decode(&mut std::io::Cursor::new(data), modules)
1647              .map_err(|e| DecodingError::Other(e.0))
1648      }
1649  
1650      fn to_bytes(&self) -> Vec<u8> {
1651          let mut bytes = Vec::new();
1652          self.consensus_encode(&mut bytes)
1653              .expect("writing to vec can't fail");
1654          bytes
1655      }
1656  }
1657  
/// This is a helper macro that generates the implementation of
/// `DatabaseRecord` necessary for reading/writing a key/value pair to the
/// database. Prefix lookups are declared separately with `impl_db_lookup!`.
///
/// - `key`: This is the type of struct that will be used as the key into the
///   database
/// - `value`: This is the type of struct that will be used as the value into
///   the database
/// - `db_prefix`: Required enum expression that is represented as a `u8` and is
///   prepended to this key
/// - `notify_on_modify`: Optional literal `true` or `false`. When given, it is
///   used as the record's `NOTIFY_ON_MODIFY` constant, and `true` additionally
///   implements the `DatabaseKeyWithNotify` marker trait for the key.
///
/// A trailing comma is accepted.
///
/// # Examples
///
/// ```
/// use fedimint_core::encoding::{Decodable, Encodable};
/// use fedimint_core::impl_db_record;
///
/// #[derive(Debug, Encodable, Decodable)]
/// struct MyKey;
///
/// #[derive(Debug, Encodable, Decodable)]
/// struct MyValue;
///
/// #[repr(u8)]
/// #[derive(Clone, Debug)]
/// pub enum DbKeyPrefix {
///     MyKey = 0x50,
/// }
///
/// impl_db_record!(key = MyKey, value = MyValue, db_prefix = DbKeyPrefix::MyKey);
/// ```
///
/// Use the required parameters and specify one `query_prefix` through
/// `impl_db_lookup!`
///
/// ```
/// use fedimint_core::encoding::{Decodable, Encodable};
/// use fedimint_core::{impl_db_lookup, impl_db_record};
///
/// #[derive(Debug, Encodable, Decodable)]
/// struct MyKey;
///
/// #[derive(Debug, Encodable, Decodable)]
/// struct MyValue;
///
/// #[repr(u8)]
/// #[derive(Clone, Debug)]
/// pub enum DbKeyPrefix {
///     MyKey = 0x50,
/// }
///
/// #[derive(Debug, Encodable, Decodable)]
/// struct MyKeyPrefix;
///
/// impl_db_record!(key = MyKey, value = MyValue, db_prefix = DbKeyPrefix::MyKey,);
///
/// impl_db_lookup!(key = MyKey, query_prefix = MyKeyPrefix);
/// ```
#[macro_export]
macro_rules! impl_db_record {
    (key = $key:ty, value = $val:ty, db_prefix = $db_prefix:expr $(, notify_on_modify = $notify:tt)? $(,)?) => {
        impl $crate::db::DatabaseRecord for $key {
            const DB_PREFIX: u8 = $db_prefix as u8;
            $(const NOTIFY_ON_MODIFY: bool = $notify;)?
            type Key = Self;
            type Value = $val;
        }
        $(
            // Recurse through `$crate::` so the helper arm resolves even when
            // the caller invoked the macro by path without importing it.
            $crate::impl_db_record! {
                @impl_notify_marker key = $key, notify_on_modify = $notify
            }
        )?
    };
    // if notify is set to true
    (@impl_notify_marker key = $key:ty, notify_on_modify = true) => {
        impl $crate::db::DatabaseKeyWithNotify for $key {}
    };
    // if notify is set to false
    (@impl_notify_marker key = $key:ty, notify_on_modify = false) => {};
}
1740  
/// Declares zero or more prefix types that can be used to look up records of
/// the given key type.
///
/// - `key`: The record type that each lookup resolves to
/// - `query_prefix`: May be repeated; each listed type gets a
///   `DatabaseLookup` implementation whose `Record` is `key`
///
/// A trailing comma is accepted.
#[macro_export]
macro_rules! impl_db_lookup {
    (key = $record:ty $(, query_prefix = $lookup:ty)* $(,)?) => {
        $(
            impl $crate::db::DatabaseLookup for $lookup {
                type Record = $record;
            }
        )*
    };
}
1751  
/// Deprecated: Use `DatabaseVersionKey(ModuleInstanceId)` instead.
///
/// Legacy, field-less key for the database version. It is only removed (never
/// written) by the migration helpers in this file.
#[derive(Debug, Encodable, Decodable, Serialize)]
pub struct DatabaseVersionKeyV0;

/// Key under which the `DatabaseVersion` for a given `ModuleInstanceId` is
/// stored.
#[derive(Debug, Encodable, Decodable, Serialize)]
pub struct DatabaseVersionKey(pub ModuleInstanceId);

/// Monotonically increasing version number of the on-disk database layout.
#[derive(Debug, Encodable, Decodable, Serialize, Clone, PartialOrd, Ord, PartialEq, Eq, Copy)]
pub struct DatabaseVersion(pub u64);

// Both the legacy and the current key intentionally share the
// `DbKeyPrefix::DatabaseVersion` prefix; they differ only in the encoded key
// body.
impl_db_record!(
    key = DatabaseVersionKeyV0,
    value = DatabaseVersion,
    db_prefix = DbKeyPrefix::DatabaseVersion
);

impl_db_record!(
    key = DatabaseVersionKey,
    value = DatabaseVersion,
    db_prefix = DbKeyPrefix::DatabaseVersion
);
1773  
1774  impl std::fmt::Display for DatabaseVersion {
1775      fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
1776          write!(f, "{}", self.0)
1777      }
1778  }
1779  
1780  impl DatabaseVersion {
1781      pub fn increment(&mut self) {
1782          self.0 += 1;
1783      }
1784  }
1785  
1786  impl std::fmt::Display for DbKeyPrefix {
1787      fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
1788          write!(f, "{self:?}")
1789      }
1790  }
1791  
/// One-byte key prefixes; each variant's discriminant is used as the leading
/// byte of the keys registered with it via `impl_db_record!`.
#[repr(u8)]
#[derive(Clone, EnumIter, Debug)]
pub enum DbKeyPrefix {
    /// Prefix of `DatabaseVersionKeyV0` and `DatabaseVersionKey`.
    DatabaseVersion = 0x50,
    // NOTE(review): no record using this prefix is visible in this part of the
    // file; presumably it is registered elsewhere.
    ClientBackup = 0x51,
}
1798  
/// Errors that can occur while decoding database keys or values from raw
/// bytes.
#[derive(Debug, Error)]
pub enum DecodingError {
    /// The leading byte of the key did not match the expected record prefix.
    #[error("Key had a wrong prefix, expected {expected} but got {found}")]
    WrongPrefix { expected: u8, found: u8 },
    /// The encoded data did not have the expected length.
    #[error("Key had a wrong length, expected {expected} but got {found}")]
    WrongLength { expected: usize, found: usize },
    /// Any other decoding failure, carrying the underlying error.
    #[error("Other decoding error: {0}")]
    Other(anyhow::Error),
}
1808  
1809  impl DecodingError {
1810      pub fn other<E: Error + Send + Sync + 'static>(error: E) -> DecodingError {
1811          DecodingError::Other(anyhow::Error::from(error))
1812      }
1813  
1814      pub fn wrong_prefix(expected: u8, found: u8) -> DecodingError {
1815          DecodingError::WrongPrefix { expected, found }
1816      }
1817  
1818      pub fn wrong_length(expected: usize, found: usize) -> DecodingError {
1819          DecodingError::WrongLength { expected, found }
1820      }
1821  }
1822  
/// Collects every key/value pair found under a prefix and inserts the result
/// into a map under a string label.
///
/// Each key is rendered with `consensus_encode_to_hex`; values are stored
/// as-is. The pairs are gathered into a `BTreeMap<String, $value_type>` which
/// is boxed and inserted into `$map` under `$key_literal.to_string()`.
///
/// - `$dbtx`: identifier of the transaction passed to `find_by_prefix`
/// - `$prefix_type`: expression for the lookup prefix (borrowed by the macro)
/// - `$key_type`: accepted for signature symmetry; not used by the expansion
/// - `$value_type`: type of the values stored in the resulting map
/// - `$map`: identifier of the map that receives the boxed result
/// - `$key_literal`: literal used as the label in `$map`
///
/// The expansion uses `.await` and stream combinators (`map`, `collect`), so
/// it must be invoked in an async context with a suitable stream extension
/// trait in scope.
#[macro_export]
macro_rules! push_db_pair_items {
    ($dbtx:ident, $prefix_type:expr, $key_type:ty, $value_type:ty, $map:ident, $key_literal:literal) => {
        let db_items =
            $crate::db::IDatabaseTransactionOpsCoreTyped::find_by_prefix($dbtx, &$prefix_type)
                .await
                .map(|(key, val)| {
                    (
                        $crate::encoding::Encodable::consensus_encode_to_hex(&key),
                        val,
                    )
                })
                // Fully qualified so callers need not import `BTreeMap`.
                .collect::<::std::collections::BTreeMap<String, $value_type>>()
                .await;

        $map.insert($key_literal.to_string(), Box::new(db_items));
    };
}
1841  
/// Variant of `push_db_pair_items!` that wraps every value with
/// `SerdeWrapper::from_encodable` before storing it.
///
/// Each key is rendered with `consensus_encode_to_hex`. The pairs are gathered
/// into a `BTreeMap`, boxed, and inserted into `$map` under
/// `$key_literal.to_string()`.
///
/// - `$dbtx`: identifier of the transaction passed to `find_by_prefix`
/// - `$prefix_type`: expression for the lookup prefix (borrowed by the macro)
/// - `$key_type`, `$value_type`: accepted for signature symmetry; not used by
///   the expansion
/// - `$map`: identifier of the map that receives the boxed result
/// - `$key_literal`: literal used as the label in `$map`
///
/// `SerdeWrapper` is referenced unqualified, so it must be in scope at the
/// call site. The expansion uses `.await` and stream combinators, so it must
/// be invoked in an async context with a suitable stream extension trait in
/// scope.
#[macro_export]
macro_rules! push_db_pair_items_no_serde {
    ($dbtx:ident, $prefix_type:expr, $key_type:ty, $value_type:ty, $map:ident, $key_literal:literal) => {
        let db_items =
            $crate::db::IDatabaseTransactionOpsCoreTyped::find_by_prefix($dbtx, &$prefix_type)
                .await
                .map(|(key, val)| {
                    (
                        $crate::encoding::Encodable::consensus_encode_to_hex(&key),
                        SerdeWrapper::from_encodable(val),
                    )
                })
                // Fully qualified so callers need not import `BTreeMap`.
                .collect::<::std::collections::BTreeMap<_, _>>()
                .await;

        $map.insert($key_literal.to_string(), Box::new(db_items));
    };
}
1860  
/// Collects every key found under a prefix (discarding the values) into a
/// `Vec<$key_type>`, boxes it, and inserts it into `$map` under
/// `$key_literal.to_string()`.
///
/// The expansion uses `.await` and stream combinators, so it must be invoked
/// in an async context with a suitable stream extension trait in scope.
#[macro_export]
macro_rules! push_db_key_items {
    ($dbtx:ident, $prefix_type:expr, $key_type:ty, $map:ident, $key_literal:literal) => {
        let found_keys =
            $crate::db::IDatabaseTransactionOpsCoreTyped::find_by_prefix($dbtx, &$prefix_type)
                .await
                .map(|(found_key, _value)| found_key)
                .collect::<Vec<$key_type>>()
                .await;

        $map.insert($key_literal.to_string(), Box::new(found_keys));
    };
}
1874  
/// `ServerMigrationFn` that modules can implement to "migrate" the database
/// to the next database version.
///
/// A migration is a plain function pointer that receives a mutable reference
/// to a `DatabaseTransaction` and returns a boxed, pinned, `Send` future
/// resolving to `anyhow::Result<()>`. The future may borrow the transaction
/// for the lifetime of that reference.
pub type ServerMigrationFn =
    for<'r, 'tx> fn(
        &'r mut DatabaseTransaction<'tx>,
    ) -> Pin<Box<dyn futures::Future<Output = anyhow::Result<()>> + Send + 'r>>;
1881  
1882  /// Applies the database migrations to a non-isolated database.
1883  pub async fn apply_migrations_server(
1884      db: &Database,
1885      kind: String,
1886      target_db_version: DatabaseVersion,
1887      migrations: BTreeMap<DatabaseVersion, ServerMigrationFn>,
1888  ) -> Result<(), anyhow::Error> {
1889      apply_migrations(db, kind, target_db_version, migrations, None).await
1890  }
1891  
1892  /// `apply_migrations` iterates from the on disk database version for the module
1893  /// up to `target_db_version` and executes all of the migrations that exist in
1894  /// the migrations map. Each migration in migrations map updates the
1895  /// database to have the correct on-disk structures that the code is expecting.
1896  /// The entire migration process is atomic (i.e migration from 0->1 and 1->2
1897  /// happen atomically). This function is called before the module is initialized
1898  /// and as long as the correct migrations are supplied in the migrations map,
1899  /// the module will be able to read and write from the database successfully.
1900  pub async fn apply_migrations(
1901      db: &Database,
1902      kind: String,
1903      target_db_version: DatabaseVersion,
1904      migrations: BTreeMap<DatabaseVersion, ServerMigrationFn>,
1905      module_instance_id: Option<ModuleInstanceId>,
1906  ) -> Result<(), anyhow::Error> {
1907      // Newly created databases will not have any data since they have just been
1908      // instantiated.
1909      let mut dbtx = db.begin_transaction_nc().await;
1910      let is_new_db = dbtx.raw_find_by_prefix(&[]).await?.next().await.is_none();
1911  
1912      // First write the database version to disk if it does not exist.
1913      create_database_version(
1914          db,
1915          target_db_version,
1916          module_instance_id,
1917          kind.clone(),
1918          is_new_db,
1919      )
1920      .await?;
1921  
1922      let mut global_dbtx = db.begin_transaction().await;
1923      let module_instance_id_key = module_instance_id_or_global(module_instance_id);
1924  
1925      let disk_version = global_dbtx
1926          .get_value(&DatabaseVersionKey(module_instance_id_key))
1927          .await;
1928  
1929      let db_version = if let Some(disk_version) = disk_version {
1930          let mut current_db_version = disk_version;
1931  
1932          if current_db_version > target_db_version {
1933              return Err(anyhow::anyhow!(format!(
1934                  "On disk database version for module {kind} was higher than the code database version."
1935              )));
1936          }
1937  
1938          while current_db_version < target_db_version {
1939              if let Some(migration) = migrations.get(&current_db_version) {
1940                  info!(target: LOG_DB, ?kind, ?current_db_version, ?target_db_version, "Migrating module...");
1941                  if let Some(module_instance_id) = module_instance_id {
1942                      migration(
1943                          &mut global_dbtx
1944                              .to_ref_with_prefix_module_id(module_instance_id)
1945                              .into_nc(),
1946                      )
1947                      .await?;
1948                  } else {
1949                      migration(&mut global_dbtx.to_ref_nc()).await?;
1950                  }
1951              } else {
1952                  warn!(target: LOG_DB, ?current_db_version, "Missing server db migration");
1953              }
1954  
1955              current_db_version.increment();
1956              global_dbtx
1957                  .insert_entry(
1958                      &DatabaseVersionKey(module_instance_id_key),
1959                      &current_db_version,
1960                  )
1961                  .await;
1962          }
1963  
1964          current_db_version
1965      } else {
1966          target_db_version
1967      };
1968  
1969      global_dbtx.commit_tx_result().await?;
1970      info!(target: LOG_DB, ?kind, ?db_version, "Migration complete");
1971      Ok(())
1972  }
1973  
1974  /// Creates the `DatabaseVersion` inside the database if it does not exist. If
1975  /// necessary, this function will migrate the legacy database version to the
1976  /// expected `DatabaseVersionKey`.
1977  pub async fn create_database_version(
1978      db: &Database,
1979      target_db_version: DatabaseVersion,
1980      module_instance_id: Option<ModuleInstanceId>,
1981      kind: String,
1982      is_new_db: bool,
1983  ) -> Result<(), anyhow::Error> {
1984      let key_module_instance_id = module_instance_id_or_global(module_instance_id);
1985  
1986      // First check if the module has a `DatabaseVersion` written to
1987      // `DatabaseVersionKey`. If `DatabaseVersion` already exists, there is
1988      // nothing to do.
1989      let mut global_dbtx = db.begin_transaction().await;
1990      if global_dbtx
1991          .get_value(&DatabaseVersionKey(key_module_instance_id))
1992          .await
1993          .is_none()
1994      {
1995          // If it exists, read and remove the legacy `DatabaseVersion`, which used to be
1996          // in the module's isolated namespace (but not for fedimint-server).
1997          //
1998          // Otherwise, if the previous database contains data and no legacy database
1999          // version, use `DatabaseVersion(0)` so that all database migrations are
2000          // run. Otherwise, this database can assumed to be new and can use
2001          // `target_db_version` to skip the database migrations.
2002          let current_version_in_module = if let Some(module_instance_id) = module_instance_id {
2003              remove_current_db_version_if_exists(
2004                  &mut global_dbtx
2005                      .to_ref_with_prefix_module_id(module_instance_id)
2006                      .into_nc(),
2007                  is_new_db,
2008                  target_db_version,
2009              )
2010              .await
2011          } else {
2012              remove_current_db_version_if_exists(
2013                  &mut global_dbtx.to_ref().into_nc(),
2014                  is_new_db,
2015                  target_db_version,
2016              )
2017              .await
2018          };
2019  
2020          // Write the previous `DatabaseVersion` to the new `DatabaseVersionKey`
2021          info!(target: LOG_DB, ?kind, ?current_version_in_module, ?target_db_version, ?is_new_db, "Creating DatabaseVersionKey...");
2022          global_dbtx
2023              .insert_new_entry(
2024                  &DatabaseVersionKey(key_module_instance_id),
2025                  &current_version_in_module,
2026              )
2027              .await;
2028          global_dbtx.commit_tx_result().await?;
2029      }
2030  
2031      Ok(())
2032  }
2033  
2034  /// Removes `DatabaseVersion` from `DatabaseVersionKeyV0` if it exists and
2035  /// returns the current database version. If the current version does not
2036  /// exist, use `target_db_version` if the database is new. Otherwise, return
2037  /// `DatabaseVersion(0)` to ensure all migrations are run.
2038  async fn remove_current_db_version_if_exists(
2039      version_dbtx: &mut DatabaseTransaction<'_>,
2040      is_new_db: bool,
2041      target_db_version: DatabaseVersion,
2042  ) -> DatabaseVersion {
2043      // Remove the previous `DatabaseVersion` in the isolated database. If it doesn't
2044      // exist, just use the 0 for the version so that all of the migrations are
2045      // executed.
2046      let current_version_in_module = version_dbtx.remove_entry(&DatabaseVersionKeyV0).await;
2047      match current_version_in_module {
2048          Some(database_version) => database_version,
2049          None if is_new_db => target_db_version,
2050          None => DatabaseVersion(0),
2051      }
2052  }
2053  
2054  /// Helper function to retrieve the `module_instance_id` for modules, otherwise
2055  /// return 0xff for the global namespace.
2056  fn module_instance_id_or_global(module_instance_id: Option<ModuleInstanceId>) -> ModuleInstanceId {
2057      // Use 0xff for fedimint-server and the `module_instance_id` for each module
2058      if let Some(module_instance_id) = module_instance_id {
2059          module_instance_id
2060      } else {
2061          MODULE_GLOBAL_PREFIX.into()
2062      }
2063  }
2064  
2065  #[allow(unused_imports)]
2066  mod test_utils {
2067      use std::collections::BTreeMap;
2068      use std::time::Duration;
2069  
2070      use futures::future::ready;
2071      use futures::{Future, FutureExt, StreamExt};
2072      use rand::Rng;
2073      use tokio::join;
2074  
2075      use super::{
2076          apply_migrations, Database, DatabaseTransaction, DatabaseVersion, DatabaseVersionKey,
2077          DatabaseVersionKeyV0, ServerMigrationFn,
2078      };
2079      use crate::core::ModuleKind;
2080      use crate::db::mem_impl::MemDatabase;
2081      use crate::db::{
2082          IDatabaseTransactionOps, IDatabaseTransactionOpsCoreTyped, MODULE_GLOBAL_PREFIX,
2083      };
2084      use crate::encoding::{Decodable, Encodable};
2085      use crate::module::registry::ModuleDecoderRegistry;
2086  
2087      pub async fn future_returns_shortly<F: Future>(fut: F) -> Option<F::Output> {
2088          crate::runtime::timeout(Duration::from_millis(10), fut)
2089              .await
2090              .ok()
2091      }
2092  
    /// Key prefixes used by the test records defined in this module.
    #[repr(u8)]
    #[derive(Clone)]
    pub enum TestDbKeyPrefix {
        /// Prefix shared by `TestKey` and `TestKeyV0`.
        Test = 0x42,
        /// Prefix of `AltTestKey`.
        AltTest = 0x43,
        /// Prefix of `PercentTestKey`; 0x25 is the ASCII code of `%`.
        PercentTestKey = 0x25,
    }
2100  
    /// Primary test key, stored under `TestDbKeyPrefix::Test`.
    #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Encodable, Decodable)]
    pub(super) struct TestKey(pub u64);

    /// Lookup prefix matching every `TestKey` record.
    #[derive(Debug, Encodable, Decodable)]
    struct DbPrefixTestPrefix;

    // `TestKey` is the only test record registered with modification
    // notifications enabled.
    impl_db_record!(
        key = TestKey,
        value = TestVal,
        db_prefix = TestDbKeyPrefix::Test,
        notify_on_modify = true,
    );
    impl_db_lookup!(key = TestKey, query_prefix = DbPrefixTestPrefix);

    /// Two-field key registered under the same prefix as `TestKey`.
    // NOTE(review): presumably models an older key layout for migration
    // tests; its usage is not visible in this part of the file.
    #[derive(Debug, Encodable, Decodable)]
    struct TestKeyV0(u64, u64);

    /// Lookup prefix matching every `TestKeyV0` record.
    #[derive(Debug, Encodable, Decodable)]
    struct DbPrefixTestPrefixV0;

    impl_db_record!(
        key = TestKeyV0,
        value = TestVal,
        db_prefix = TestDbKeyPrefix::Test,
    );
    impl_db_lookup!(key = TestKeyV0, query_prefix = DbPrefixTestPrefixV0);

    /// Second test key, stored under `TestDbKeyPrefix::AltTest`.
    #[derive(Debug, Eq, PartialEq, PartialOrd, Ord, Encodable, Decodable)]
    struct AltTestKey(u64);

    /// Lookup prefix matching every `AltTestKey` record.
    #[derive(Debug, Encodable, Decodable)]
    struct AltDbPrefixTestPrefix;

    impl_db_record!(
        key = AltTestKey,
        value = TestVal,
        db_prefix = TestDbKeyPrefix::AltTest,
    );
    impl_db_lookup!(key = AltTestKey, query_prefix = AltDbPrefixTestPrefix);

    /// Test key stored under `TestDbKeyPrefix::PercentTestKey`.
    #[derive(Debug, Encodable, Decodable)]
    struct PercentTestKey(u64);

    /// Lookup prefix matching every `PercentTestKey` record.
    #[derive(Debug, Encodable, Decodable)]
    struct PercentPrefixTestPrefix;

    impl_db_record!(
        key = PercentTestKey,
        value = TestVal,
        db_prefix = TestDbKeyPrefix::PercentTestKey,
    );

    impl_db_lookup!(key = PercentTestKey, query_prefix = PercentPrefixTestPrefix);
    /// Value type shared by all test records.
    #[derive(Debug, Encodable, Decodable, Eq, PartialEq, PartialOrd, Ord)]
    pub(super) struct TestVal(pub u64);

    // NOTE(review): presumably module instance prefixes for isolation tests;
    // their usage is not visible in this part of the file.
    const TEST_MODULE_PREFIX: u16 = 1;
    const ALT_MODULE_PREFIX: u16 = 2;
2159  
2160      pub async fn verify_insert_elements(db: Database) {
2161          let mut dbtx = db.begin_transaction().await;
2162          assert!(dbtx.insert_entry(&TestKey(1), &TestVal(2)).await.is_none());
2163          assert!(dbtx.insert_entry(&TestKey(2), &TestVal(3)).await.is_none());
2164          dbtx.commit_tx().await;
2165  
2166          // Test values were persisted
2167          let mut dbtx = db.begin_transaction().await;
2168          assert_eq!(dbtx.get_value(&TestKey(1)).await, Some(TestVal(2)));
2169          assert_eq!(dbtx.get_value(&TestKey(2)).await, Some(TestVal(3)));
2170          dbtx.commit_tx().await;
2171  
2172          // Test overwrites work as expected
2173          let mut dbtx = db.begin_transaction().await;
2174          assert_eq!(
2175              dbtx.insert_entry(&TestKey(1), &TestVal(4)).await,
2176              Some(TestVal(2))
2177          );
2178          assert_eq!(
2179              dbtx.insert_entry(&TestKey(2), &TestVal(5)).await,
2180              Some(TestVal(3))
2181          );
2182          dbtx.commit_tx().await;
2183  
2184          let mut dbtx = db.begin_transaction().await;
2185          assert_eq!(dbtx.get_value(&TestKey(1)).await, Some(TestVal(4)));
2186          assert_eq!(dbtx.get_value(&TestKey(2)).await, Some(TestVal(5)));
2187          dbtx.commit_tx().await;
2188      }
2189  
2190      pub async fn verify_remove_nonexisting(db: Database) {
2191          let mut dbtx = db.begin_transaction().await;
2192          assert_eq!(dbtx.get_value(&TestKey(1)).await, None);
2193          let removed = dbtx.remove_entry(&TestKey(1)).await;
2194          assert!(removed.is_none());
2195  
2196          // Commit to suppress the warning message
2197          dbtx.commit_tx().await;
2198      }
2199  
2200      pub async fn verify_remove_existing(db: Database) {
2201          let mut dbtx = db.begin_transaction().await;
2202  
2203          assert!(dbtx.insert_entry(&TestKey(1), &TestVal(2)).await.is_none());
2204  
2205          assert_eq!(dbtx.get_value(&TestKey(1)).await, Some(TestVal(2)));
2206  
2207          let removed = dbtx.remove_entry(&TestKey(1)).await;
2208          assert_eq!(removed, Some(TestVal(2)));
2209          assert_eq!(dbtx.get_value(&TestKey(1)).await, None);
2210  
2211          // Commit to suppress the warning message
2212          dbtx.commit_tx().await;
2213      }
2214  
2215      pub async fn verify_read_own_writes(db: Database) {
2216          let mut dbtx = db.begin_transaction().await;
2217  
2218          assert!(dbtx.insert_entry(&TestKey(1), &TestVal(2)).await.is_none());
2219  
2220          assert_eq!(dbtx.get_value(&TestKey(1)).await, Some(TestVal(2)));
2221  
2222          // Commit to suppress the warning message
2223          dbtx.commit_tx().await;
2224      }
2225  
2226      pub async fn verify_prevent_dirty_reads(db: Database) {
2227          let mut dbtx = db.begin_transaction().await;
2228  
2229          assert!(dbtx.insert_entry(&TestKey(1), &TestVal(2)).await.is_none());
2230  
2231          // dbtx2 should not be able to see uncommitted changes
2232          let mut dbtx2 = db.begin_transaction().await;
2233          assert_eq!(dbtx2.get_value(&TestKey(1)).await, None);
2234  
2235          // Commit to suppress the warning message
2236          dbtx.commit_tx().await;
2237      }
2238  
2239      pub async fn verify_find_by_prefix(db: Database) {
2240          let mut dbtx = db.begin_transaction().await;
2241          dbtx.insert_entry(&TestKey(55), &TestVal(9999)).await;
2242          dbtx.insert_entry(&TestKey(54), &TestVal(8888)).await;
2243  
2244          dbtx.insert_entry(&AltTestKey(55), &TestVal(7777)).await;
2245          dbtx.insert_entry(&AltTestKey(54), &TestVal(6666)).await;
2246          dbtx.commit_tx().await;
2247  
2248          // Verify finding by prefix returns the correct set of key pairs
2249          let mut dbtx = db.begin_transaction().await;
2250  
2251          let mut returned_keys = dbtx
2252              .find_by_prefix(&DbPrefixTestPrefix)
2253              .await
2254              .collect::<Vec<_>>()
2255              .await;
2256          returned_keys.sort();
2257          let expected = vec![(TestKey(54), TestVal(8888)), (TestKey(55), TestVal(9999))];
2258          assert_eq!(returned_keys, expected);
2259  
2260          let reversed = dbtx
2261              .find_by_prefix_sorted_descending(&DbPrefixTestPrefix)
2262              .await
2263              .collect::<Vec<_>>()
2264              .await;
2265          let mut reversed_expected = expected;
2266          reversed_expected.reverse();
2267          assert_eq!(reversed, reversed_expected);
2268  
2269          let mut returned_keys = dbtx
2270              .find_by_prefix(&AltDbPrefixTestPrefix)
2271              .await
2272              .collect::<Vec<_>>()
2273              .await;
2274          returned_keys.sort();
2275          let expected = vec![
2276              (AltTestKey(54), TestVal(6666)),
2277              (AltTestKey(55), TestVal(7777)),
2278          ];
2279          assert_eq!(returned_keys, expected);
2280  
2281          let reversed = dbtx
2282              .find_by_prefix_sorted_descending(&AltDbPrefixTestPrefix)
2283              .await
2284              .collect::<Vec<_>>()
2285              .await;
2286          let mut reversed_expected = expected;
2287          reversed_expected.reverse();
2288          assert_eq!(reversed, reversed_expected);
2289      }
2290  
2291      pub async fn verify_commit(db: Database) {
2292          let mut dbtx = db.begin_transaction().await;
2293  
2294          assert!(dbtx.insert_entry(&TestKey(1), &TestVal(2)).await.is_none());
2295          dbtx.commit_tx().await;
2296  
2297          // Verify dbtx2 can see committed transactions
2298          let mut dbtx2 = db.begin_transaction().await;
2299          assert_eq!(dbtx2.get_value(&TestKey(1)).await, Some(TestVal(2)));
2300      }
2301  
2302      pub async fn verify_rollback_to_savepoint(db: Database) {
2303          let mut dbtx_rollback = db.begin_transaction().await;
2304  
2305          dbtx_rollback
2306              .insert_entry(&TestKey(20), &TestVal(2000))
2307              .await;
2308  
2309          dbtx_rollback
2310              .set_tx_savepoint()
2311              .await
2312              .expect("Error setting transaction savepoint");
2313  
2314          dbtx_rollback
2315              .insert_entry(&TestKey(21), &TestVal(2001))
2316              .await;
2317  
2318          assert_eq!(
2319              dbtx_rollback.get_value(&TestKey(20)).await,
2320              Some(TestVal(2000))
2321          );
2322          assert_eq!(
2323              dbtx_rollback.get_value(&TestKey(21)).await,
2324              Some(TestVal(2001))
2325          );
2326  
2327          dbtx_rollback
2328              .rollback_tx_to_savepoint()
2329              .await
2330              .expect("Error setting transaction savepoint");
2331  
2332          assert_eq!(
2333              dbtx_rollback.get_value(&TestKey(20)).await,
2334              Some(TestVal(2000))
2335          );
2336  
2337          assert_eq!(dbtx_rollback.get_value(&TestKey(21)).await, None);
2338  
2339          // Commit to suppress the warning message
2340          dbtx_rollback.commit_tx().await;
2341      }
2342  
2343      pub async fn verify_prevent_nonrepeatable_reads(db: Database) {
2344          let mut dbtx = db.begin_transaction().await;
2345          assert_eq!(dbtx.get_value(&TestKey(100)).await, None);
2346  
2347          let mut dbtx2 = db.begin_transaction().await;
2348  
2349          dbtx2.insert_entry(&TestKey(100), &TestVal(101)).await;
2350  
2351          assert_eq!(dbtx.get_value(&TestKey(100)).await, None);
2352  
2353          dbtx2.commit_tx().await;
2354  
2355          // dbtx should still read None because it is operating over a snapshot
2356          // of the data when the transaction started
2357          assert_eq!(dbtx.get_value(&TestKey(100)).await, None);
2358  
2359          let expected_keys = 0;
2360          let returned_keys = dbtx
2361              .find_by_prefix(&DbPrefixTestPrefix)
2362              .await
2363              .fold(0, |returned_keys, (key, value)| async move {
2364                  if let TestKey(100) = key {
2365                      assert!(value.eq(&TestVal(101)));
2366                  }
2367                  returned_keys + 1
2368              })
2369              .await;
2370  
2371          assert_eq!(returned_keys, expected_keys);
2372      }
2373  
    /// Runs 1000 rounds of a reader transaction racing a writer transaction
    /// and checks that the reader sees a consistent snapshot.
    ///
    /// In each round the writer inserts the "spent input" key and then the
    /// "tx accepted" key before committing. The reader first reads the
    /// "tx accepted" key and then accesses the "spent input" key through one of
    /// several operations; it panics if one of the two keys is observed
    /// without the other.
    pub async fn verify_snapshot_isolation(db: Database) {
        // This scenario is taken straight out of https://github.com/fedimint/fedimint/issues/5195 bug
        for i in 0..1000 {
            // Each round uses its own pair of keys so rounds do not interfere.
            let base_key = i * 2;
            let tx_accepted_key = base_key;
            let spent_input_key = base_key + 1;

            // Yields to the scheduler either 0 or 10 times (chosen at random)
            // to vary how the two tasks interleave.
            async fn random_yield() {
                let times = if rand::thread_rng().gen_bool(0.5) {
                    0
                } else {
                    10
                };
                for _ in 0..times {
                    tokio::task::yield_now().await
                }
            }

            join!(
                // Reader
                async {
                    random_yield().await;
                    let mut dbtx = db.begin_transaction().await;

                    random_yield().await;
                    let a = dbtx.get_value(&TestKey(tx_accepted_key)).await;
                    random_yield().await;
                    // we have 5 operations that can give you the db key,
                    // try all of them
                    let s = match i % 5 {
                        0 => dbtx.get_value(&TestKey(spent_input_key)).await,
                        1 => dbtx.remove_entry(&TestKey(spent_input_key)).await,
                        2 => {
                            dbtx.insert_entry(&TestKey(spent_input_key), &TestVal(200))
                                .await
                        }
                        3 => {
                            dbtx.find_by_prefix(&DbPrefixTestPrefix)
                                .await
                                .filter(|(k, _v)| ready(k == &TestKey(spent_input_key)))
                                .map(|(_k, v)| v)
                                .next()
                                .await
                        }
                        4 => {
                            dbtx.find_by_prefix_sorted_descending(&DbPrefixTestPrefix)
                                .await
                                .filter(|(k, _v)| ready(k == &TestKey(spent_input_key)))
                                .map(|(_k, v)| v)
                                .next()
                                .await
                        }
                        _ => {
                            panic!("woot?");
                        }
                    };

                    // Either both keys are visible or neither is; anything
                    // else means the reader saw a partial write.
                    match (a, s) {
                        (None, None) | (Some(_), Some(_)) => {}
                        (None, Some(_)) => panic!("none some?! {}", i),
                        (Some(_), None) => panic!("some none?! {}", i),
                    }
                },
                // Writer
                async {
                    random_yield().await;

                    let mut dbtx = db.begin_transaction().await;
                    random_yield().await;
                    assert_eq!(dbtx.get_value(&TestKey(tx_accepted_key)).await, None);

                    random_yield().await;
                    assert_eq!(
                        dbtx.insert_entry(&TestKey(spent_input_key), &TestVal(100))
                            .await,
                        None
                    );

                    random_yield().await;
                    assert_eq!(
                        dbtx.insert_entry(&TestKey(tx_accepted_key), &TestVal(100))
                            .await,
                        None
                    );
                    random_yield().await;
                    dbtx.commit_tx().await;
                }
            );
        }
    }
2462  
2463      pub async fn verify_phantom_entry(db: Database) {
2464          let mut dbtx = db.begin_transaction().await;
2465  
2466          dbtx.insert_entry(&TestKey(100), &TestVal(101)).await;
2467  
2468          dbtx.insert_entry(&TestKey(101), &TestVal(102)).await;
2469  
2470          dbtx.commit_tx().await;
2471  
2472          let mut dbtx = db.begin_transaction().await;
2473          let expected_keys = 2;
2474          let returned_keys = dbtx
2475              .find_by_prefix(&DbPrefixTestPrefix)
2476              .await
2477              .fold(0, |returned_keys, (key, value)| async move {
2478                  match key {
2479                      TestKey(100) => {
2480                          assert!(value.eq(&TestVal(101)));
2481                      }
2482                      TestKey(101) => {
2483                          assert!(value.eq(&TestVal(102)));
2484                      }
2485                      _ => {}
2486                  };
2487                  returned_keys + 1
2488              })
2489              .await;
2490  
2491          assert_eq!(returned_keys, expected_keys);
2492  
2493          let mut dbtx2 = db.begin_transaction().await;
2494  
2495          dbtx2.insert_entry(&TestKey(102), &TestVal(103)).await;
2496  
2497          dbtx2.commit_tx().await;
2498  
2499          let returned_keys = dbtx
2500              .find_by_prefix(&DbPrefixTestPrefix)
2501              .await
2502              .fold(0, |returned_keys, (key, value)| async move {
2503                  match key {
2504                      TestKey(100) => {
2505                          assert!(value.eq(&TestVal(101)));
2506                      }
2507                      TestKey(101) => {
2508                          assert!(value.eq(&TestVal(102)));
2509                      }
2510                      _ => {}
2511                  };
2512                  returned_keys + 1
2513              })
2514              .await;
2515  
2516          assert_eq!(returned_keys, expected_keys);
2517      }
2518  
2519      pub async fn expect_write_conflict(db: Database) {
2520          let mut dbtx = db.begin_transaction().await;
2521          dbtx.insert_entry(&TestKey(100), &TestVal(101)).await;
2522          dbtx.commit_tx().await;
2523  
2524          let mut dbtx2 = db.begin_transaction().await;
2525          let mut dbtx3 = db.begin_transaction().await;
2526  
2527          dbtx2.insert_entry(&TestKey(100), &TestVal(102)).await;
2528  
2529          // Depending on if the database implementation supports optimistic or
2530          // pessimistic transactions, this test should generate an error here
2531          // (pessimistic) or at commit time (optimistic)
2532          dbtx3.insert_entry(&TestKey(100), &TestVal(103)).await;
2533  
2534          dbtx2.commit_tx().await;
2535          dbtx3.commit_tx_result().await.expect_err("Expecting an error to be returned because this transaction is in a write-write conflict with dbtx");
2536      }
2537  
2538      pub async fn verify_string_prefix(db: Database) {
2539          let mut dbtx = db.begin_transaction().await;
2540          dbtx.insert_entry(&PercentTestKey(100), &TestVal(101)).await;
2541  
2542          assert_eq!(
2543              dbtx.get_value(&PercentTestKey(100)).await,
2544              Some(TestVal(101))
2545          );
2546  
2547          dbtx.insert_entry(&PercentTestKey(101), &TestVal(100)).await;
2548  
2549          dbtx.insert_entry(&PercentTestKey(101), &TestVal(100)).await;
2550  
2551          dbtx.insert_entry(&PercentTestKey(101), &TestVal(100)).await;
2552  
2553          // If the wildcard character ('%') is not handled properly, this will make
2554          // find_by_prefix return 5 results instead of 4
2555          dbtx.insert_entry(&TestKey(101), &TestVal(100)).await;
2556  
2557          let expected_keys = 4;
2558          let returned_keys = dbtx
2559              .find_by_prefix(&PercentPrefixTestPrefix)
2560              .await
2561              .fold(0, |returned_keys, (key, value)| async move {
2562                  if let PercentTestKey(101) = key {
2563                      assert!(value.eq(&TestVal(100)));
2564                  }
2565                  returned_keys + 1
2566              })
2567              .await;
2568  
2569          assert_eq!(returned_keys, expected_keys);
2570      }
2571  
2572      pub async fn verify_remove_by_prefix(db: Database) {
2573          let mut dbtx = db.begin_transaction().await;
2574  
2575          dbtx.insert_entry(&TestKey(100), &TestVal(101)).await;
2576  
2577          dbtx.insert_entry(&TestKey(101), &TestVal(102)).await;
2578  
2579          dbtx.commit_tx().await;
2580  
2581          let mut remove_dbtx = db.begin_transaction().await;
2582          remove_dbtx.remove_by_prefix(&DbPrefixTestPrefix).await;
2583          remove_dbtx.commit_tx().await;
2584  
2585          let mut dbtx = db.begin_transaction().await;
2586          let expected_keys = 0;
2587          let returned_keys = dbtx
2588              .find_by_prefix(&DbPrefixTestPrefix)
2589              .await
2590              .fold(0, |returned_keys, (key, value)| async move {
2591                  match key {
2592                      TestKey(100) => {
2593                          assert!(value.eq(&TestVal(101)));
2594                      }
2595                      TestKey(101) => {
2596                          assert!(value.eq(&TestVal(102)));
2597                      }
2598                      _ => {}
2599                  };
2600                  returned_keys + 1
2601              })
2602              .await;
2603  
2604          assert_eq!(returned_keys, expected_keys);
2605      }
2606  
2607      pub async fn verify_module_db(db: Database, module_db: Database) {
2608          let mut dbtx = db.begin_transaction().await;
2609  
2610          dbtx.insert_entry(&TestKey(100), &TestVal(101)).await;
2611  
2612          dbtx.insert_entry(&TestKey(101), &TestVal(102)).await;
2613  
2614          dbtx.commit_tx().await;
2615  
2616          // verify module_dbtx can only read key/value pairs from its own module
2617          let mut module_dbtx = module_db.begin_transaction().await;
2618          assert_eq!(module_dbtx.get_value(&TestKey(100)).await, None);
2619  
2620          assert_eq!(module_dbtx.get_value(&TestKey(101)).await, None);
2621  
2622          // verify module_dbtx can read key/value pairs that it wrote
2623          let mut dbtx = db.begin_transaction().await;
2624          assert_eq!(dbtx.get_value(&TestKey(100)).await, Some(TestVal(101)));
2625  
2626          assert_eq!(dbtx.get_value(&TestKey(101)).await, Some(TestVal(102)));
2627  
2628          let mut module_dbtx = module_db.begin_transaction().await;
2629  
2630          module_dbtx.insert_entry(&TestKey(100), &TestVal(103)).await;
2631  
2632          module_dbtx.insert_entry(&TestKey(101), &TestVal(104)).await;
2633  
2634          module_dbtx.commit_tx().await;
2635  
2636          let expected_keys = 2;
2637          let mut dbtx = db.begin_transaction().await;
2638          let returned_keys = dbtx
2639              .find_by_prefix(&DbPrefixTestPrefix)
2640              .await
2641              .fold(0, |returned_keys, (key, value)| async move {
2642                  match key {
2643                      TestKey(100) => {
2644                          assert!(value.eq(&TestVal(101)));
2645                      }
2646                      TestKey(101) => {
2647                          assert!(value.eq(&TestVal(102)));
2648                      }
2649                      _ => {}
2650                  };
2651                  returned_keys + 1
2652              })
2653              .await;
2654  
2655          assert_eq!(returned_keys, expected_keys);
2656  
2657          let removed = dbtx.remove_entry(&TestKey(100)).await;
2658          assert_eq!(removed, Some(TestVal(101)));
2659          assert_eq!(dbtx.get_value(&TestKey(100)).await, None);
2660  
2661          let mut module_dbtx = module_db.begin_transaction().await;
2662          assert_eq!(
2663              module_dbtx.get_value(&TestKey(100)).await,
2664              Some(TestVal(103))
2665          );
2666      }
2667  
2668      pub async fn verify_module_prefix(db: Database) {
2669          let mut test_dbtx = db.begin_transaction().await;
2670          {
2671              let mut test_module_dbtx = test_dbtx.to_ref_with_prefix_module_id(TEST_MODULE_PREFIX);
2672  
2673              test_module_dbtx
2674                  .insert_entry(&TestKey(100), &TestVal(101))
2675                  .await;
2676  
2677              test_module_dbtx
2678                  .insert_entry(&TestKey(101), &TestVal(102))
2679                  .await;
2680          }
2681  
2682          test_dbtx.commit_tx().await;
2683  
2684          let mut alt_dbtx = db.begin_transaction().await;
2685          {
2686              let mut alt_module_dbtx = alt_dbtx.to_ref_with_prefix_module_id(ALT_MODULE_PREFIX);
2687  
2688              alt_module_dbtx
2689                  .insert_entry(&TestKey(100), &TestVal(103))
2690                  .await;
2691  
2692              alt_module_dbtx
2693                  .insert_entry(&TestKey(101), &TestVal(104))
2694                  .await;
2695          }
2696  
2697          alt_dbtx.commit_tx().await;
2698  
2699          // verify test_module_dbtx can only see key/value pairs from its own module
2700          let mut test_dbtx = db.begin_transaction().await;
2701          let mut test_module_dbtx = test_dbtx.to_ref_with_prefix_module_id(TEST_MODULE_PREFIX);
2702          assert_eq!(
2703              test_module_dbtx.get_value(&TestKey(100)).await,
2704              Some(TestVal(101))
2705          );
2706  
2707          assert_eq!(
2708              test_module_dbtx.get_value(&TestKey(101)).await,
2709              Some(TestVal(102))
2710          );
2711  
2712          let expected_keys = 2;
2713          let returned_keys = test_module_dbtx
2714              .find_by_prefix(&DbPrefixTestPrefix)
2715              .await
2716              .fold(0, |returned_keys, (key, value)| async move {
2717                  match key {
2718                      TestKey(100) => {
2719                          assert!(value.eq(&TestVal(101)));
2720                      }
2721                      TestKey(101) => {
2722                          assert!(value.eq(&TestVal(102)));
2723                      }
2724                      _ => {}
2725                  };
2726                  returned_keys + 1
2727              })
2728              .await;
2729  
2730          assert_eq!(returned_keys, expected_keys);
2731  
2732          let removed = test_module_dbtx.remove_entry(&TestKey(100)).await;
2733          assert_eq!(removed, Some(TestVal(101)));
2734          assert_eq!(test_module_dbtx.get_value(&TestKey(100)).await, None);
2735  
2736          // test_dbtx on its own wont find the key because it does not use a module
2737          // prefix
2738          let mut test_dbtx = db.begin_transaction().await;
2739          assert_eq!(test_dbtx.get_value(&TestKey(101)).await, None);
2740  
2741          test_dbtx.commit_tx().await;
2742      }
2743  
2744      #[cfg(test)]
2745      #[tokio::test]
2746      pub async fn verify_test_migration() {
2747          // Insert a bunch of old dummy data that needs to be migrated to a new version
2748          let db = Database::new(MemDatabase::new(), ModuleDecoderRegistry::default());
2749          let expected_test_keys_size: usize = 100;
2750          let mut dbtx = db.begin_transaction().await;
2751          for i in 0..expected_test_keys_size {
2752              dbtx.insert_new_entry(&TestKeyV0(i as u64, (i + 1) as u64), &TestVal(i as u64))
2753                  .await;
2754          }
2755  
2756          // Will also be migrated to `DatabaseVersionKey`
2757          dbtx.insert_new_entry(&DatabaseVersionKeyV0, &DatabaseVersion(0))
2758              .await;
2759          dbtx.commit_tx().await;
2760  
2761          let mut migrations: BTreeMap<DatabaseVersion, ServerMigrationFn> = BTreeMap::new();
2762  
2763          migrations.insert(DatabaseVersion(0), move |dbtx| {
2764              migrate_test_db_version_0(dbtx).boxed()
2765          });
2766  
2767          apply_migrations(
2768              &db,
2769              "TestModule".to_string(),
2770              DatabaseVersion(1),
2771              migrations,
2772              None,
2773          )
2774          .await
2775          .expect("Error applying migrations for TestModule");
2776  
2777          // Verify that the migrations completed successfully
2778          let mut dbtx = db.begin_transaction().await;
2779  
2780          // Verify that the old `DatabaseVersion` under `DatabaseVersionKeyV0` migrated
2781          // to `DatabaseVersionKey`
2782          assert!(dbtx
2783              .get_value(&DatabaseVersionKey(MODULE_GLOBAL_PREFIX.into()))
2784              .await
2785              .is_some());
2786  
2787          // Verify Dummy module migration
2788          let test_keys = dbtx
2789              .find_by_prefix(&DbPrefixTestPrefix)
2790              .await
2791              .collect::<Vec<_>>()
2792              .await;
2793          let test_keys_size = test_keys.len();
2794          assert_eq!(test_keys_size, expected_test_keys_size);
2795          for (key, val) in test_keys {
2796              assert_eq!(key.0, val.0 + 1);
2797          }
2798      }
2799  
2800      #[allow(dead_code)]
2801      async fn migrate_test_db_version_0<'a, 'b>(
2802          dbtx: &'b mut DatabaseTransaction<'a>,
2803      ) -> Result<(), anyhow::Error> {
2804          let example_keys_v0 = dbtx
2805              .find_by_prefix(&DbPrefixTestPrefixV0)
2806              .await
2807              .collect::<Vec<_>>()
2808              .await;
2809          dbtx.remove_by_prefix(&DbPrefixTestPrefixV0).await;
2810          for (key, val) in example_keys_v0 {
2811              let key_v2 = TestKey(key.1);
2812              dbtx.insert_new_entry(&key_v2, &val).await;
2813          }
2814          Ok(())
2815      }
2816  
2817      #[cfg(test)]
2818      #[tokio::test]
2819      async fn test_autocommit() {
2820          use std::marker::PhantomData;
2821  
2822          use anyhow::anyhow;
2823          use async_trait::async_trait;
2824  
2825          use crate::db::{
2826              AutocommitError, BaseDatabaseTransaction, IDatabaseTransaction,
2827              IDatabaseTransactionOps, IDatabaseTransactionOpsCore, IRawDatabase,
2828              IRawDatabaseTransaction,
2829          };
2830          use crate::ModuleDecoderRegistry;
2831  
2832          #[derive(Debug)]
2833          struct FakeDatabase;
2834  
2835          #[async_trait]
2836          impl IRawDatabase for FakeDatabase {
2837              type Transaction<'a> = FakeTransaction<'a>;
2838              async fn begin_transaction(&self) -> FakeTransaction {
2839                  FakeTransaction(PhantomData)
2840              }
2841          }
2842  
2843          #[derive(Debug)]
2844          struct FakeTransaction<'a>(PhantomData<&'a ()>);
2845  
2846          #[async_trait]
2847          impl<'a> IDatabaseTransactionOpsCore for FakeTransaction<'a> {
2848              async fn raw_insert_bytes(
2849                  &mut self,
2850                  _key: &[u8],
2851                  _value: &[u8],
2852              ) -> anyhow::Result<Option<Vec<u8>>> {
2853                  unimplemented!()
2854              }
2855  
2856              async fn raw_get_bytes(&mut self, _key: &[u8]) -> anyhow::Result<Option<Vec<u8>>> {
2857                  unimplemented!()
2858              }
2859  
2860              async fn raw_remove_entry(&mut self, _key: &[u8]) -> anyhow::Result<Option<Vec<u8>>> {
2861                  unimplemented!()
2862              }
2863  
2864              async fn raw_find_by_prefix(
2865                  &mut self,
2866                  _key_prefix: &[u8],
2867              ) -> anyhow::Result<crate::db::PrefixStream<'_>> {
2868                  unimplemented!()
2869              }
2870  
2871              async fn raw_remove_by_prefix(&mut self, _key_prefix: &[u8]) -> anyhow::Result<()> {
2872                  unimplemented!()
2873              }
2874  
2875              async fn raw_find_by_prefix_sorted_descending(
2876                  &mut self,
2877                  _key_prefix: &[u8],
2878              ) -> anyhow::Result<crate::db::PrefixStream<'_>> {
2879                  unimplemented!()
2880              }
2881          }
2882  
2883          #[async_trait]
2884          impl<'a> IDatabaseTransactionOps for FakeTransaction<'a> {
2885              async fn rollback_tx_to_savepoint(&mut self) -> anyhow::Result<()> {
2886                  unimplemented!()
2887              }
2888  
2889              async fn set_tx_savepoint(&mut self) -> anyhow::Result<()> {
2890                  unimplemented!()
2891              }
2892          }
2893  
2894          #[async_trait]
2895          impl<'a> IRawDatabaseTransaction for FakeTransaction<'a> {
2896              async fn commit_tx(self) -> anyhow::Result<()> {
2897                  Err(anyhow!("Can't commit!"))
2898              }
2899          }
2900  
2901          let db = Database::new(FakeDatabase, ModuleDecoderRegistry::default());
2902          let err = db
2903              .autocommit::<_, _, ()>(|_dbtx, _| Box::pin(async { Ok(()) }), Some(5))
2904              .await
2905              .unwrap_err();
2906  
2907          match err {
2908              AutocommitError::CommitFailed {
2909                  attempts: failed_attempts,
2910                  ..
2911              } => {
2912                  assert_eq!(failed_attempts, 5)
2913              }
2914              AutocommitError::ClosureError { .. } => panic!("Closure did not return error"),
2915          }
2916      }
2917  }
2918  
2919  pub async fn find_by_prefix_sorted_descending<'r, 'inner, KP>(
2920      tx: &'r mut (dyn IDatabaseTransaction + 'inner),
2921      decoders: ModuleDecoderRegistry,
2922      key_prefix: &KP,
2923  ) -> impl Stream<
2924      Item = (
2925          KP::Record,
2926          <<KP as DatabaseLookup>::Record as DatabaseRecord>::Value,
2927      ),
2928  > + 'r
2929  where
2930      'inner: 'r,
2931      KP: DatabaseLookup,
2932      KP::Record: DatabaseKey,
2933  {
2934      debug!("find by prefix sorted descending");
2935      let prefix_bytes = key_prefix.to_bytes();
2936      tx.raw_find_by_prefix_sorted_descending(&prefix_bytes)
2937          .await
2938          .expect("Error doing prefix search in database")
2939          .map(move |(key_bytes, value_bytes)| {
2940              let key = decode_key_expect(&key_bytes, &decoders);
2941              let value = decode_value_expect(&value_bytes, &decoders, &key_bytes);
2942              (key, value)
2943          })
2944  }
2945  
/// Tests for `Database::wait_key_exists`, covering different orderings of the
/// waiter relative to the transaction that writes the awaited key.
#[cfg(test)]
mod tests {
    use tokio::sync::oneshot;

    use super::mem_impl::MemDatabase;
    use super::*;
    use crate::runtime::spawn;

    /// Spawns a task that waits for `key` to exist in `db`.
    ///
    /// The handle is returned only after the task has created its
    /// `wait_key_exists` future and signalled so over a oneshot channel, which
    /// lets callers order their writes relative to that point.
    async fn waiter(db: &Database, key: TestKey) -> tokio::task::JoinHandle<TestVal> {
        let (ready_tx, ready_rx) = oneshot::channel::<()>();
        let db = db.clone();
        let handle = spawn("wait key exists", async move {
            let wait_fut = db.wait_key_exists(&key);
            ready_tx.send(()).unwrap();
            wait_fut.await
        });
        ready_rx.await.unwrap();
        handle
    }

    /// Resolves the waiter task through `future_returns_shortly`; the tests
    /// below compare the outcome against `Some(value)` or `None`.
    async fn waiter_outcome(key_task: tokio::task::JoinHandle<TestVal>) -> Option<TestVal> {
        future_returns_shortly(async { key_task.await.unwrap() }).await
    }

    /// Waiter is started before the writing transaction is opened.
    #[tokio::test]
    async fn test_wait_key_before_transaction() {
        let db = MemDatabase::new().into_database();
        let key = TestKey(1);
        let val = TestVal(2);

        let key_task = waiter(&db, TestKey(1)).await;

        let mut dbtx = db.begin_transaction().await;
        dbtx.insert_new_entry(&key, &val).await;
        dbtx.commit_tx().await;

        assert_eq!(
            waiter_outcome(key_task).await,
            Some(TestVal(2)),
            "should notify"
        );
    }

    /// Waiter is started after the transaction is opened but before the insert.
    #[tokio::test]
    async fn test_wait_key_before_insert() {
        let db = MemDatabase::new().into_database();
        let key = TestKey(1);
        let val = TestVal(2);

        let mut dbtx = db.begin_transaction().await;
        let key_task = waiter(&db, TestKey(1)).await;
        dbtx.insert_new_entry(&key, &val).await;
        dbtx.commit_tx().await;

        assert_eq!(
            waiter_outcome(key_task).await,
            Some(TestVal(2)),
            "should notify"
        );
    }

    /// Waiter is started after the insert but before the commit.
    #[tokio::test]
    async fn test_wait_key_after_insert() {
        let db = MemDatabase::new().into_database();
        let key = TestKey(1);
        let val = TestVal(2);

        let mut dbtx = db.begin_transaction().await;
        dbtx.insert_new_entry(&key, &val).await;

        let key_task = waiter(&db, TestKey(1)).await;

        dbtx.commit_tx().await;

        assert_eq!(
            waiter_outcome(key_task).await,
            Some(TestVal(2)),
            "should notify"
        );
    }

    /// Waiter is started only after the key has already been committed.
    #[tokio::test]
    async fn test_wait_key_after_commit() {
        let db = MemDatabase::new().into_database();
        let key = TestKey(1);
        let val = TestVal(2);

        let mut dbtx = db.begin_transaction().await;
        dbtx.insert_new_entry(&key, &val).await;
        dbtx.commit_tx().await;

        let key_task = waiter(&db, TestKey(1)).await;
        assert_eq!(
            waiter_outcome(key_task).await,
            Some(TestVal(2)),
            "should notify"
        );
    }

    /// Waiter and writer both use the same module-prefixed database.
    #[tokio::test]
    async fn test_wait_key_isolated_db() {
        let module_instance_id = 10;
        let key = TestKey(1);
        let val = TestVal(2);
        let db = MemDatabase::new()
            .into_database()
            .with_prefix_module_id(module_instance_id);

        let key_task = waiter(&db, TestKey(1)).await;

        let mut dbtx = db.begin_transaction().await;
        dbtx.insert_new_entry(&key, &val).await;
        dbtx.commit_tx().await;

        assert_eq!(
            waiter_outcome(key_task).await,
            Some(TestVal(2)),
            "should notify"
        );
    }

    /// Waiter uses a module-prefixed database while the writer goes through a
    /// module-prefixed view of a transaction on the unprefixed database.
    #[tokio::test]
    async fn test_wait_key_isolated_tx() {
        let module_instance_id = 10;
        let key = TestKey(1);
        let val = TestVal(2);
        let db = MemDatabase::new().into_database();

        let key_task = waiter(&db.with_prefix_module_id(module_instance_id), TestKey(1)).await;

        let mut dbtx = db.begin_transaction().await;
        let mut module_dbtx = dbtx.to_ref_with_prefix_module_id(module_instance_id);
        module_dbtx.insert_new_entry(&key, &val).await;
        // Release the prefixed view before committing the parent transaction.
        drop(module_dbtx);
        dbtx.commit_tx().await;

        assert_eq!(
            waiter_outcome(key_task).await,
            Some(TestVal(2)),
            "should notify"
        );
    }

    /// Without any write the waiter must not complete.
    #[tokio::test]
    async fn test_wait_key_no_transaction() {
        let db = MemDatabase::new().into_database();

        let key_task = waiter(&db, TestKey(1)).await;
        assert_eq!(waiter_outcome(key_task).await, None, "should not notify");
    }
}