mod.rs
1 //! Core Fedimint database traits and types 2 //! 3 //! # Isolation of database transactions 4 //! 5 //! Fedimint requires that the database implementation implement Snapshot 6 //! Isolation. Snapshot Isolation is a database isolation level that guarantees 7 //! consistent reads from the time that the snapshot was created (at transaction 8 //! creation time). Transactions with Snapshot Isolation level will only commit 9 //! if there has been no write to the modified keys since the snapshot (i.e. 10 //! write-write conflicts are prevented). 11 //! 12 //! Specifically, Fedimint expects the database implementation to prevent the 13 //! following anomalies: 14 //! 15 //! Non-Readable Write: TX1 writes (K1, V1) at time t but cannot read (K1, V1) 16 //! at time (t + i) 17 //! 18 //! Dirty Read: TX1 is able to read TX2's uncommitted writes. 19 //! 20 //! Non-Repeatable Read: TX1 reads (K1, V1) at time t and retrieves (K1, V2) at 21 //! time (t + i) where V1 != V2. 22 //! 23 //! Phantom Record: TX1 retrieves X number of records for a prefix at time t and 24 //! retrieves Y number of records for the same prefix at time (t + i). 25 //! 26 //! Lost Writes: TX1 writes (K1, V1) at the same time as TX2 writes (K1, V2). V2 27 //! overwrites V1 as the value for K1 (write-write conflict). 28 //! 29 //! | Type | Non-Readable Write | Dirty Read | Non-Repeatable Read | Phantom 30 //! Record | Lost Writes | | -------- | ------------------ | ---------- | 31 //! ------------------- | -------------- | ----------- | | MemoryDB | Prevented 32 //! | Prevented | Prevented | Prevented | Possible | 33 //! | RocksDB | Prevented | Prevented | Prevented | 34 //! Prevented | Prevented | | Sqlite | Prevented | Prevented 35 //! 
| Prevented | Prevented | Prevented | 36 37 use std::any; 38 use std::collections::BTreeMap; 39 use std::error::Error; 40 use std::fmt::{self, Debug}; 41 use std::marker::{self, PhantomData}; 42 use std::ops::{self, DerefMut}; 43 use std::pin::Pin; 44 use std::sync::Arc; 45 use std::time::Duration; 46 47 use anyhow::{bail, Context, Result}; 48 use fedimint_core::util::BoxFuture; 49 use fedimint_logging::LOG_DB; 50 use futures::{Stream, StreamExt}; 51 use macro_rules_attribute::apply; 52 use rand::Rng; 53 use serde::Serialize; 54 use strum_macros::EnumIter; 55 use thiserror::Error; 56 use tracing::{debug, error, info, instrument, trace, warn}; 57 58 use crate::core::ModuleInstanceId; 59 use crate::encoding::{Decodable, Encodable}; 60 use crate::fmt_utils::AbbreviateHexBytes; 61 use crate::task::{MaybeSend, MaybeSync}; 62 use crate::{async_trait_maybe_send, maybe_add_send, timing}; 63 64 pub mod mem_impl; 65 pub mod notifications; 66 67 pub use test_utils::*; 68 69 use self::notifications::{Notifications, NotifyQueue}; 70 use crate::module::registry::ModuleDecoderRegistry; 71 72 pub const MODULE_GLOBAL_PREFIX: u8 = 0xff; 73 74 pub trait DatabaseKeyPrefix: Debug { 75 fn to_bytes(&self) -> Vec<u8>; 76 } 77 78 /// A key + value pair in the database with a unique prefix 79 /// Extends `DatabaseKeyPrefix` to prepend the key's prefix. 80 pub trait DatabaseRecord: DatabaseKeyPrefix { 81 const DB_PREFIX: u8; 82 const NOTIFY_ON_MODIFY: bool = false; 83 type Key: DatabaseKey + Debug; 84 type Value: DatabaseValue + Debug; 85 } 86 87 /// A key that can be used to query one or more `DatabaseRecord` 88 /// Extends `DatabaseKeyPrefix` to prepend the key's prefix. 
89 pub trait DatabaseLookup: DatabaseKeyPrefix { 90 type Record: DatabaseRecord; 91 } 92 93 // Every `DatabaseRecord` is automatically a `DatabaseLookup` 94 impl<Record> DatabaseLookup for Record 95 where 96 Record: DatabaseRecord + Debug + Decodable + Encodable, 97 { 98 type Record = Record; 99 } 100 101 /// `DatabaseKey` that represents the lookup structure for retrieving key/value 102 /// pairs from the database. 103 pub trait DatabaseKey: Sized { 104 /// Send a notification to tasks waiting to be notified if the value of 105 /// `DatabaseKey` is modified 106 /// 107 /// For instance, this can be used to be notified when a key in the 108 /// database is created. It is also possible to run a closure with the 109 /// value of the `DatabaseKey` as parameter to verify some changes to 110 /// that value. 111 const NOTIFY_ON_MODIFY: bool = false; 112 fn from_bytes(data: &[u8], modules: &ModuleDecoderRegistry) -> Result<Self, DecodingError>; 113 } 114 115 /// Marker trait for `DatabaseKey`s where `NOTIFY` is true 116 pub trait DatabaseKeyWithNotify {} 117 118 /// `DatabaseValue` that represents the value structure of database records. 119 pub trait DatabaseValue: Sized + Debug { 120 fn from_bytes(data: &[u8], modules: &ModuleDecoderRegistry) -> Result<Self, DecodingError>; 121 fn to_bytes(&self) -> Vec<u8>; 122 } 123 124 pub type PrefixStream<'a> = Pin<Box<maybe_add_send!(dyn Stream<Item = (Vec<u8>, Vec<u8>)> + 'a)>>; 125 126 /// Just ignore this type, it's only there to make compiler happy 127 /// 128 /// See <https://users.rust-lang.org/t/argument-requires-that-is-borrowed-for-static/66503/2?u=yandros> for details. 
129 pub type PhantomBound<'big, 'small> = PhantomData<&'small &'big ()>; 130 131 /// Error returned when the autocommit function fails 132 #[derive(Debug, Error)] 133 pub enum AutocommitError<E> { 134 /// Committing the transaction failed too many times, giving up 135 CommitFailed { 136 /// Number of attempts 137 attempts: usize, 138 /// Last error on commit 139 last_error: anyhow::Error, 140 }, 141 /// Error returned by the closure provided to `autocommit`. If returned no 142 /// commit was attempted in that round 143 ClosureError { 144 /// The attempt on which the closure returned an error 145 /// 146 /// Values other than 0 typically indicate a logic error since the 147 /// closure given to `autocommit` should not have side effects 148 /// and thus keep succeeding if it succeeded once. 149 attempts: usize, 150 /// Error returned by the closure 151 error: E, 152 }, 153 } 154 155 /// Raw database implementation 156 /// 157 /// This and [`IRawDatabaseTransaction`] are meant to be implemented 158 /// by crates like `fedimint-rocksdb` to provide a concrete implementation 159 /// of a database to be used by Fedimint. 160 /// 161 /// This is in contrast of [`IDatabase`] which includes extra 162 /// functionality that Fedimint needs (and adds) on top of it. 
163 #[apply(async_trait_maybe_send!)] 164 pub trait IRawDatabase: Debug + MaybeSend + MaybeSync + 'static { 165 /// A raw database transaction type 166 type Transaction<'a>: IRawDatabaseTransaction + Debug; 167 168 /// Start a database transaction 169 async fn begin_transaction<'a>(&'a self) -> Self::Transaction<'a>; 170 } 171 172 #[apply(async_trait_maybe_send!)] 173 impl<T> IRawDatabase for Box<T> 174 where 175 T: IRawDatabase, 176 { 177 type Transaction<'a> = <T as IRawDatabase>::Transaction<'a>; 178 179 async fn begin_transaction<'a>(&'a self) -> Self::Transaction<'a> { 180 (**self).begin_transaction().await 181 } 182 } 183 184 /// An extension trait with convenience operations on [`IRawDatabase`] 185 pub trait IRawDatabaseExt: IRawDatabase + Sized { 186 /// Convert to type implementing [`IRawDatabase`] into [`Database`]. 187 /// 188 /// When type inference is not an issue, [`Into::into`] can be used instead. 189 fn into_database(self) -> Database { 190 Database::new(self, Default::default()) 191 } 192 } 193 194 impl<T> IRawDatabaseExt for T where T: IRawDatabase {} 195 196 impl<T> From<T> for Database 197 where 198 T: IRawDatabase, 199 { 200 fn from(raw: T) -> Self { 201 Database::new(raw, Default::default()) 202 } 203 } 204 205 /// A database that on top of a raw database operation, implements 206 /// key notification system. 
207 #[apply(async_trait_maybe_send!)] 208 pub trait IDatabase: Debug + MaybeSend + MaybeSync + 'static { 209 /// Start a database transaction 210 async fn begin_transaction<'a>(&'a self) -> Box<dyn IDatabaseTransaction + 'a>; 211 /// Register (and wait) for `key` updates 212 async fn register(&self, key: &[u8]); 213 /// Notify about `key` update (creation, modification, deletion) 214 async fn notify(&self, key: &[u8]); 215 216 /// The prefix len of this database instance 217 fn prefix_len(&self) -> usize; 218 } 219 220 #[apply(async_trait_maybe_send!)] 221 impl<T> IDatabase for Arc<T> 222 where 223 T: IDatabase + ?Sized, 224 { 225 async fn begin_transaction<'a>(&'a self) -> Box<dyn IDatabaseTransaction + 'a> { 226 (**self).begin_transaction().await 227 } 228 async fn register(&self, key: &[u8]) { 229 (**self).register(key).await 230 } 231 async fn notify(&self, key: &[u8]) { 232 (**self).notify(key).await 233 } 234 235 fn prefix_len(&self) -> usize { 236 (**self).prefix_len() 237 } 238 } 239 240 /// Base functionality around [`IRawDatabase`] to make it a [`IDatabase`] 241 /// 242 /// Mostly notification system, but also run-time single-commit handling. 
243 struct BaseDatabase<RawDatabase> { 244 notifications: Arc<Notifications>, 245 raw: RawDatabase, 246 } 247 248 impl<RawDatabase> fmt::Debug for BaseDatabase<RawDatabase> { 249 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 250 f.write_str("BaseDatabase") 251 } 252 } 253 254 #[apply(async_trait_maybe_send!)] 255 impl<RawDatabase: IRawDatabase + MaybeSend + 'static> IDatabase for BaseDatabase<RawDatabase> { 256 async fn begin_transaction<'a>(&'a self) -> Box<dyn IDatabaseTransaction + 'a> { 257 Box::new(BaseDatabaseTransaction::new( 258 self.raw.begin_transaction().await, 259 self.notifications.clone(), 260 )) 261 } 262 async fn register(&self, key: &[u8]) { 263 self.notifications.register(key).await 264 } 265 async fn notify(&self, key: &[u8]) { 266 self.notifications.notify(key).await 267 } 268 269 fn prefix_len(&self) -> usize { 270 0 271 } 272 } 273 274 /// A public-facing newtype over `IDatabase` 275 /// 276 /// Notably carries set of module decoders (`ModuleDecoderRegistry`) 277 /// and implements common utility function for auto-commits, db isolation, 278 /// and other. 279 #[derive(Clone, Debug)] 280 pub struct Database { 281 inner: Arc<dyn IDatabase + 'static>, 282 module_decoders: ModuleDecoderRegistry, 283 } 284 285 impl Database { 286 pub fn strong_count(&self) -> usize { 287 Arc::strong_count(&self.inner) 288 } 289 290 pub fn into_inner(self) -> Arc<dyn IDatabase + 'static> { 291 self.inner 292 } 293 } 294 295 impl Database { 296 /// Creates a new Fedimint database from any object implementing 297 /// [`IDatabase`]. 298 /// 299 /// See also [`Database::new_from_arc`]. 
300 pub fn new(raw: impl IRawDatabase + 'static, module_decoders: ModuleDecoderRegistry) -> Self { 301 let inner = BaseDatabase { 302 raw, 303 notifications: Arc::new(Notifications::new()), 304 }; 305 Self::new_from_arc( 306 Arc::new(inner) as Arc<dyn IDatabase + 'static>, 307 module_decoders, 308 ) 309 } 310 311 /// Create [`Database`] from an already typed-erased `IDatabase`. 312 pub fn new_from_arc( 313 inner: Arc<dyn IDatabase + 'static>, 314 module_decoders: ModuleDecoderRegistry, 315 ) -> Self { 316 Self { 317 inner, 318 module_decoders, 319 } 320 } 321 322 /// Create [`Database`] isolated to a partition with a given `prefix` 323 pub fn with_prefix(&self, prefix: Vec<u8>) -> Self { 324 Self { 325 inner: Arc::new(PrefixDatabase { 326 inner: self.inner.clone(), 327 prefix, 328 }), 329 module_decoders: self.module_decoders.clone(), 330 } 331 } 332 333 /// Create [`Database`] isolated to a partition with a prefix for a given 334 /// `module_instance_id` 335 pub fn with_prefix_module_id(&self, module_instance_id: ModuleInstanceId) -> Self { 336 let prefix = module_instance_id_to_byte_prefix(module_instance_id); 337 self.with_prefix(prefix) 338 } 339 340 pub fn with_decoders(&self, module_decoders: ModuleDecoderRegistry) -> Self { 341 Self { 342 inner: self.inner.clone(), 343 module_decoders, 344 } 345 } 346 347 /// Is this `Database` a global, unpartitioned `Database` 348 pub fn is_global(&self) -> bool { 349 self.inner.prefix_len() == 0 350 } 351 352 /// `Err` if [`Self::is_global`] is not true 353 pub fn ensure_global(&self) -> Result<()> { 354 if !self.is_global() { 355 bail!("Database instance not global"); 356 } 357 358 Ok(()) 359 } 360 361 /// `Err` if [`Self::is_global`] is true 362 pub fn ensure_isolated(&self) -> Result<()> { 363 if self.is_global() { 364 bail!("Database instance not isolated"); 365 } 366 367 Ok(()) 368 } 369 370 /// Begin a new committable database transaction 371 pub async fn begin_transaction<'s, 'tx>(&'s self) -> 
DatabaseTransaction<'tx, Committable> 372 where 373 's: 'tx, 374 { 375 DatabaseTransaction::<Committable>::new( 376 self.inner.begin_transaction().await, 377 self.module_decoders.clone(), 378 ) 379 } 380 381 /// Begin a new non-committable database transaction 382 pub async fn begin_transaction_nc<'s, 'tx>(&'s self) -> DatabaseTransaction<'tx, NonCommittable> 383 where 384 's: 'tx, 385 { 386 self.begin_transaction().await.into_nc() 387 } 388 389 /// Runs a closure with a reference to a database transaction and tries to 390 /// commit the transaction if the closure returns `Ok` and rolls it back 391 /// otherwise. If committing fails the closure is run for up to 392 /// `max_attempts` times. If `max_attempts` is `None` it will run 393 /// `usize::MAX` times which is close enough to infinite times. 394 /// 395 /// The closure `tx_fn` provided should not have side effects outside of the 396 /// database transaction provided, or if it does these should be 397 /// idempotent, since the closure might be run multiple times. 398 /// 399 /// # Lifetime Parameters 400 /// 401 /// The higher rank trait bound (HRTB) `'a` that is applied to the the 402 /// mutable reference to the database transaction ensures that the 403 /// reference lives as least as long as the returned future of the 404 /// closure. 405 /// 406 /// Further, the reference to self (`'s`) must outlive the 407 /// `DatabaseTransaction<'dt>`. In other words, the 408 /// `DatabaseTransaction` must live as least as long as `self` and that is 409 /// true as the `DatabaseTransaction` is only dropped at the end of the 410 /// `loop{}`. 411 /// 412 /// # Panics 413 /// 414 /// This function panics when the given number of maximum attempts is zero. 415 /// `max_attempts` must be greater or equal to one. 
416 pub async fn autocommit<'s, 'dbtx, F, T, E>( 417 &'s self, 418 tx_fn: F, 419 max_attempts: Option<usize>, 420 ) -> Result<T, AutocommitError<E>> 421 where 422 's: 'dbtx, 423 for<'r, 'o> F: Fn( 424 &'r mut DatabaseTransaction<'o>, 425 PhantomBound<'dbtx, 'o>, 426 ) -> BoxFuture<'r, Result<T, E>>, 427 { 428 assert_ne!(max_attempts, Some(0)); 429 let mut curr_attempts: usize = 0; 430 431 loop { 432 // The `checked_add()` function is used to catch the `usize` overflow. 433 // With `usize=32bit` and an assumed time of 1ms per iteration, this would crash 434 // after ~50 days. But if that's the case, something else must be wrong. 435 // With `usize=64bit` it would take much longer, obviously. 436 curr_attempts = curr_attempts 437 .checked_add(1) 438 .expect("db autocommit attempt counter overflowed"); 439 440 let mut dbtx = self.begin_transaction().await; 441 442 let tx_fn_res = tx_fn(&mut dbtx.to_ref_nc(), PhantomData).await; 443 let val = match tx_fn_res { 444 Ok(val) => val, 445 Err(err) => { 446 dbtx.ignore_uncommitted(); 447 return Err(AutocommitError::ClosureError { 448 attempts: curr_attempts, 449 error: err, 450 }); 451 } 452 }; 453 454 let _timing /* logs on drop */ = timing::TimeReporter::new("autocommit - commit_tx"); 455 456 match dbtx.commit_tx_result().await { 457 Ok(()) => { 458 return Ok(val); 459 } 460 Err(err) => { 461 if max_attempts 462 .map(|max_att| max_att <= curr_attempts) 463 .unwrap_or(false) 464 { 465 warn!( 466 target: LOG_DB, 467 curr_attempts, 468 "Database commit failed in an autocommit block - terminating" 469 ); 470 return Err(AutocommitError::CommitFailed { 471 attempts: curr_attempts, 472 last_error: err, 473 }); 474 } 475 476 let delay = (2u64.pow(curr_attempts.min(7) as u32) * 10).min(1000); 477 let delay = rand::thread_rng().gen_range(delay..(2 * delay)); 478 warn!( 479 target: LOG_DB, 480 curr_attempts, 481 delay_ms = %delay, 482 "Database commit failed in an autocommit block - retrying" 483 ); 484 
crate::runtime::sleep(Duration::from_millis(delay)).await; 485 } 486 } 487 } 488 } 489 490 /// Waits for key to be notified. 491 /// 492 /// Calls the `checker` when value of the key may have changed. 493 /// Returns the value when `checker` returns a `Some(T)`. 494 pub async fn wait_key_check<'a, K, T>( 495 &'a self, 496 key: &K, 497 checker: impl Fn(Option<K::Value>) -> Option<T>, 498 ) -> (T, DatabaseTransaction<'a, Committable>) 499 where 500 K: DatabaseKey + DatabaseRecord + DatabaseKeyWithNotify, 501 { 502 let key_bytes = key.to_bytes(); 503 loop { 504 // register for notification 505 let notify = self.inner.register(&key_bytes); 506 507 // check for value in db 508 let mut tx = self.inner.begin_transaction().await; 509 510 let maybe_value_bytes = tx 511 .raw_get_bytes(&key_bytes) 512 .await 513 .expect("Unrecoverable error when reading from database") 514 .map(|value_bytes| { 515 decode_value_expect(&value_bytes, &self.module_decoders, &key_bytes) 516 }); 517 518 if let Some(value) = checker(maybe_value_bytes) { 519 return ( 520 value, 521 DatabaseTransaction::new(tx, self.module_decoders.clone()), 522 ); 523 } else { 524 // key not found, try again 525 notify.await; 526 // if miss a notification between await and next register, it is 527 // fine. because we are going check the database 528 } 529 } 530 } 531 532 /// Waits for key to be present in database. 
pub async fn wait_key_exists<K>(&self, key: &K) -> K::Value
where
    K: DatabaseKey + DatabaseRecord + DatabaseKeyWithNotify,
{
    self.wait_key_check(key, std::convert::identity).await.0
}
} // end of `impl Database`

/// Builds the key prefix of a module instance partition:
/// [`MODULE_GLOBAL_PREFIX`] followed by the consensus encoding of
/// `module_instance_id`.
fn module_instance_id_to_byte_prefix(module_instance_id: u16) -> Vec<u8> {
    let mut prefix = vec![MODULE_GLOBAL_PREFIX];
    module_instance_id
        .consensus_encode(&mut prefix)
        .expect("Error encoding module instance id as prefix");
    prefix
}

/// A database that wraps an `inner` one and adds a prefix to all operations,
/// effectively creating an isolated partition.
#[derive(Clone, Debug)]
struct PrefixDatabase<Inner>
where
    Inner: Debug,
{
    prefix: Vec<u8>,
    inner: Inner,
}

impl<Inner> PrefixDatabase<Inner>
where
    Inner: Debug,
{
    // TODO: we should optimize these concatenations, maybe by having an internal
    // `key: &[&[u8]]` that we flatten once, when passing to lowest layer, or
    // something
    /// Returns `prefix ++ key` in a single allocation.
    fn get_full_key(&self, key: &[u8]) -> Vec<u8> {
        [self.prefix.as_slice(), key].concat()
    }
}

#[apply(async_trait_maybe_send!)]
impl<Inner> IDatabase for PrefixDatabase<Inner>
where
    Inner: Debug + MaybeSend + MaybeSync + 'static + IDatabase,
{
    /// Starts a transaction on the inner database and wraps it so that all of
    /// its keys are prefixed.
    async fn begin_transaction<'a>(&'a self) -> Box<dyn IDatabaseTransaction + 'a> {
        Box::new(PrefixDatabaseTransaction {
            inner: self.inner.begin_transaction().await,
            prefix: self.prefix.clone(),
        })
    }
    async fn register(&self, key: &[u8]) {
        self.inner.register(&self.get_full_key(key)).await
    }

    async fn notify(&self, key: &[u8]) {
        self.inner.notify(&self.get_full_key(key)).await
    }

    /// Prefix length of the inner database plus the length of our own prefix.
    fn prefix_len(&self) -> usize {
        self.inner.prefix_len() + self.prefix.len()
    }
}

/// A database transaction that wraps an `inner` one and adds a prefix to all
/// operations, effectively creating an isolated partition.
///
/// Produced by [`PrefixDatabase`].
#[derive(Debug)]
struct PrefixDatabaseTransaction<Inner> {
    inner: Inner,
    prefix: Vec<u8>,
}

impl<Inner> PrefixDatabaseTransaction<Inner> {
    // TODO: we should optimize these concatenations, maybe by having an internal
    // `key: &[&[u8]]` that we flatten once, when passing to lowest layer, or
    // something
    /// Returns `prefix ++ key` in a single allocation.
    fn get_full_key(&self, key: &[u8]) -> Vec<u8> {
        [self.prefix.as_slice(), key].concat()
    }

    /// Strips the first `prefix_len` bytes from every key yielded by `stream`,
    /// so callers see keys relative to their partition.
    fn adapt_prefix_stream(stream: PrefixStream<'_>, prefix_len: usize) -> PrefixStream<'_> {
        Box::pin(stream.map(move |(k, v)| (k[prefix_len..].to_owned(), v)))
    }
}

#[apply(async_trait_maybe_send!)]
impl<Inner> IDatabaseTransaction for PrefixDatabaseTransaction<Inner>
where
    Inner: IDatabaseTransaction,
{
    async fn commit_tx(&mut self) -> Result<()> {
        self.inner.commit_tx().await
    }

    fn prefix_len(&self) -> usize {
        self.inner.prefix_len() + self.prefix.len()
    }
}

// Every raw operation is forwarded to `inner` with the partition prefix
// prepended to the key (or key prefix); streamed keys get the prefix removed
// again.
#[apply(async_trait_maybe_send!)]
impl<Inner> IDatabaseTransactionOpsCore for PrefixDatabaseTransaction<Inner>
where
    Inner: IDatabaseTransactionOpsCore,
{
    async fn raw_insert_bytes(&mut self, key: &[u8], value: &[u8]) -> Result<Option<Vec<u8>>> {
        let key = self.get_full_key(key);
        self.inner.raw_insert_bytes(&key, value).await
    }

    async fn raw_get_bytes(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
        let key = self.get_full_key(key);
        self.inner.raw_get_bytes(&key).await
    }

    async fn raw_remove_entry(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
        let key = self.get_full_key(key);
        self.inner.raw_remove_entry(&key).await
    }

    async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> Result<PrefixStream<'_>> {
        let key = self.get_full_key(key_prefix);
        let stream = self.inner.raw_find_by_prefix(&key).await?;
        Ok(Self::adapt_prefix_stream(stream, self.prefix.len()))
    }

    async fn raw_find_by_prefix_sorted_descending(
        &mut self,
        key_prefix: &[u8],
    ) -> Result<PrefixStream<'_>> {
        let key = self.get_full_key(key_prefix);
        let stream = self
            .inner
            .raw_find_by_prefix_sorted_descending(&key)
            .await?;
        Ok(Self::adapt_prefix_stream(stream, self.prefix.len()))
    }

    async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> Result<()> {
        let key = self.get_full_key(key_prefix);
        self.inner.raw_remove_by_prefix(&key).await
    }
}

#[apply(async_trait_maybe_send!)]
impl<Inner> IDatabaseTransactionOps for PrefixDatabaseTransaction<Inner>
where
    Inner: IDatabaseTransactionOps,
{
    async fn rollback_tx_to_savepoint(&mut self) -> Result<()> {
        self.inner.rollback_tx_to_savepoint().await
    }

    async fn set_tx_savepoint(&mut self) -> Result<()> {
        // Must go through `inner`: calling `self.set_tx_savepoint()` here
        // would recurse into this very method without ever terminating.
        self.inner.set_tx_savepoint().await
    }
}

/// Core raw operations a database transaction supports
///
/// Used to enforce the same signature on all types supporting it
#[apply(async_trait_maybe_send!)]
pub trait IDatabaseTransactionOpsCore: MaybeSend {
    /// Store `value` under `key`.
    // NOTE(review): the returned `Option` is presumably the value previously
    // stored under `key` (the typed `insert_new_entry` logs it as the
    // previous value) - confirm against the implementations.
    async fn raw_insert_bytes(&mut self, key: &[u8], value: &[u8]) -> Result<Option<Vec<u8>>>;

    /// Read the raw value stored under `key`, if any.
    async fn raw_get_bytes(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>>;

    /// Remove the entry stored under `key`.
    // NOTE(review): the returned `Option` is presumably the removed value -
    // confirm against the implementations.
    async fn raw_remove_entry(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>>;

    /// Returns a stream of key-value pairs with keys that start with
    /// `key_prefix`. No particular ordering is guaranteed.
    async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> Result<PrefixStream<'_>>;

    /// Same as [`Self::raw_find_by_prefix`] but the order is descending by key.
async fn raw_find_by_prefix_sorted_descending(
    &mut self,
    key_prefix: &[u8],
) -> Result<PrefixStream<'_>>;

/// Remove every entry whose key starts with `key_prefix`
async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> Result<()>;
} // end of `trait IDatabaseTransactionOpsCore`

// A boxed transaction forwards every raw operation to the transaction it owns.
#[apply(async_trait_maybe_send!)]
impl<T> IDatabaseTransactionOpsCore for Box<T>
where
    T: IDatabaseTransactionOpsCore + ?Sized,
{
    async fn raw_insert_bytes(&mut self, key: &[u8], value: &[u8]) -> Result<Option<Vec<u8>>> {
        T::raw_insert_bytes(&mut **self, key, value).await
    }

    async fn raw_get_bytes(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
        T::raw_get_bytes(&mut **self, key).await
    }

    async fn raw_remove_entry(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
        T::raw_remove_entry(&mut **self, key).await
    }

    async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> Result<PrefixStream<'_>> {
        T::raw_find_by_prefix(&mut **self, key_prefix).await
    }

    async fn raw_find_by_prefix_sorted_descending(
        &mut self,
        key_prefix: &[u8],
    ) -> Result<PrefixStream<'_>> {
        T::raw_find_by_prefix_sorted_descending(&mut **self, key_prefix).await
    }

    async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> Result<()> {
        T::raw_remove_by_prefix(&mut **self, key_prefix).await
    }
}

// A mutable reference to a transaction forwards every raw operation to the
// referenced transaction.
#[apply(async_trait_maybe_send!)]
impl<T> IDatabaseTransactionOpsCore for &mut T
where
    T: IDatabaseTransactionOpsCore + ?Sized,
{
    async fn raw_insert_bytes(&mut self, key: &[u8], value: &[u8]) -> Result<Option<Vec<u8>>> {
        T::raw_insert_bytes(&mut **self, key, value).await
    }

    async fn raw_get_bytes(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
        T::raw_get_bytes(&mut **self, key).await
    }

    async fn raw_remove_entry(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
        T::raw_remove_entry(&mut **self, key).await
    }

    async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> Result<PrefixStream<'_>> {
        T::raw_find_by_prefix(&mut **self, key_prefix).await
    }

    async fn raw_find_by_prefix_sorted_descending(
        &mut self,
        key_prefix: &[u8],
    ) -> Result<PrefixStream<'_>> {
        T::raw_find_by_prefix_sorted_descending(&mut **self, key_prefix).await
    }

    async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> Result<()> {
        T::raw_remove_by_prefix(&mut **self, key_prefix).await
    }
}

/// Extra operations that only some database transactions expose, on top of
/// [`IDatabaseTransactionOpsCore`]
///
/// Exposing these operations would be a problem in certain contexts, which is
/// why they live in a separate trait.
#[apply(async_trait_maybe_send!)]
pub trait IDatabaseTransactionOps: IDatabaseTransactionOpsCore + MaybeSend {
    /// Create a savepoint during the transaction that can be rolled back to
    /// using rollback_tx_to_savepoint. Rolling back to the savepoint will
    /// atomically remove the writes that were applied since the savepoint
    /// was created.
    ///
    /// Warning: Avoid using this in fedimint client code as not all database
    /// transaction implementations will support setting a savepoint during
    /// a transaction.
async fn set_tx_savepoint(&mut self) -> Result<()>;

async fn rollback_tx_to_savepoint(&mut self) -> Result<()>;
} // end of `trait IDatabaseTransactionOps`

// A boxed transaction forwards savepoint handling to the transaction it owns.
#[apply(async_trait_maybe_send!)]
impl<T> IDatabaseTransactionOps for Box<T>
where
    T: IDatabaseTransactionOps + ?Sized,
{
    async fn set_tx_savepoint(&mut self) -> Result<()> {
        T::set_tx_savepoint(&mut **self).await
    }

    async fn rollback_tx_to_savepoint(&mut self) -> Result<()> {
        T::rollback_tx_to_savepoint(&mut **self).await
    }
}

// A mutable reference to a transaction forwards savepoint handling to the
// referenced transaction.
#[apply(async_trait_maybe_send!)]
impl<T> IDatabaseTransactionOps for &mut T
where
    T: IDatabaseTransactionOps + ?Sized,
{
    async fn set_tx_savepoint(&mut self) -> Result<()> {
        T::set_tx_savepoint(&mut **self).await
    }

    async fn rollback_tx_to_savepoint(&mut self) -> Result<()> {
        T::rollback_tx_to_savepoint(&mut **self).await
    }
}

/// Typed counterpart of [`IDatabaseTransactionOpsCore`]
///
/// Provided through a blanket impl for every type that implements
/// [`IDatabaseTransactionOpsCore`] and has decoders (implements
/// [`WithDecoders`]).
844 #[apply(async_trait_maybe_send!)] 845 pub trait IDatabaseTransactionOpsCoreTyped<'a> { 846 async fn get_value<K>(&mut self, key: &K) -> Option<K::Value> 847 where 848 K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync; 849 850 async fn insert_entry<K>(&mut self, key: &K, value: &K::Value) -> Option<K::Value> 851 where 852 K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync, 853 K::Value: MaybeSend + MaybeSync; 854 855 async fn insert_new_entry<K>(&mut self, key: &K, value: &K::Value) 856 where 857 K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync, 858 K::Value: MaybeSend + MaybeSync; 859 860 async fn find_by_prefix<KP>( 861 &mut self, 862 key_prefix: &KP, 863 ) -> Pin< 864 Box< 865 maybe_add_send!( 866 dyn Stream< 867 Item = ( 868 KP::Record, 869 <<KP as DatabaseLookup>::Record as DatabaseRecord>::Value, 870 ), 871 > + '_ 872 ), 873 >, 874 > 875 where 876 KP: DatabaseLookup + MaybeSend + MaybeSync, 877 KP::Record: DatabaseKey; 878 879 async fn find_by_prefix_sorted_descending<KP>( 880 &mut self, 881 key_prefix: &KP, 882 ) -> Pin< 883 Box< 884 maybe_add_send!( 885 dyn Stream< 886 Item = ( 887 KP::Record, 888 <<KP as DatabaseLookup>::Record as DatabaseRecord>::Value, 889 ), 890 > + '_ 891 ), 892 >, 893 > 894 where 895 KP: DatabaseLookup + MaybeSend + MaybeSync, 896 KP::Record: DatabaseKey; 897 898 async fn remove_entry<K>(&mut self, key: &K) -> Option<K::Value> 899 where 900 K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync; 901 902 async fn remove_by_prefix<KP>(&mut self, key_prefix: &KP) 903 where 904 KP: DatabaseLookup + MaybeSend + MaybeSync; 905 } 906 907 // blanket implementation of typed ops for anything that implements raw ops and 908 // has decoders 909 #[apply(async_trait_maybe_send!)] 910 impl<'a, T> IDatabaseTransactionOpsCoreTyped<'a> for T 911 where 912 T: IDatabaseTransactionOpsCore + WithDecoders, 913 { 914 async fn get_value<K>(&mut self, key: &K) -> Option<K::Value> 915 where 916 K: DatabaseKey + DatabaseRecord + MaybeSend + 
MaybeSync,
    {
        let key_bytes = key.to_bytes();
        let raw = self
            .raw_get_bytes(&key_bytes)
            .await
            .expect("Unrecoverable error occurred while reading and entry from the database");
        raw.map(|value_bytes| {
            decode_value_expect::<K::Value>(&value_bytes, self.decoders(), &key_bytes)
        })
    }

    /// Writes `value` under `key` and returns the decoded value that
    /// `raw_insert_bytes` reported for that key, if any.
    ///
    /// # Panics
    ///
    /// Panics if the raw insert fails or if the returned bytes cannot be
    /// decoded as `K::Value`.
    async fn insert_entry<K>(&mut self, key: &K, value: &K::Value) -> Option<K::Value>
    where
        K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
        K::Value: MaybeSend + MaybeSync,
    {
        let key_bytes = key.to_bytes();
        self.raw_insert_bytes(&key_bytes, &value.to_bytes())
            .await
            .expect("Unrecoverable error occurred while inserting entry into the database")
            .map(|value_bytes| {
                decode_value_expect::<K::Value>(&value_bytes, self.decoders(), &key_bytes)
            })
    }

    /// Like [`Self::insert_entry`], but for keys that are expected to be
    /// absent: if a previous value comes back, it is overwritten anyway and a
    /// warning with the key and the previous value is logged.
    async fn insert_new_entry<K>(&mut self, key: &K, value: &K::Value)
    where
        K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
        K::Value: MaybeSend + MaybeSync,
    {
        if let Some(prev) = self.insert_entry(key, value).await {
            warn!(
                target: LOG_DB,
                "Database overwriting element when expecting insertion of new entry. Key: {:?} Prev Value: {:?}",
                key,
                prev,
            );
        }
    }

    /// Streams the decoded `(key, value)` pairs that `raw_find_by_prefix`
    /// yields for the encoded `key_prefix`.
    ///
    /// # Panics
    ///
    /// Panics if the raw lookup fails; the returned stream panics when an
    /// entry cannot be decoded.
    async fn find_by_prefix<KP>(
        &mut self,
        key_prefix: &KP,
    ) -> Pin<
        Box<
            maybe_add_send!(
                dyn Stream<
                        Item = (
                            KP::Record,
                            <<KP as DatabaseLookup>::Record as DatabaseRecord>::Value,
                        ),
                    > + '_
            ),
        >,
    >
    where
        KP: DatabaseLookup + MaybeSend + MaybeSync,
        KP::Record: DatabaseKey,
    {
        // Cloned so the returned stream owns its decoders instead of
        // borrowing them from `self`.
        let decoders = self.decoders().clone();
        Box::pin(
            self.raw_find_by_prefix(&key_prefix.to_bytes())
                .await
                .expect("Unrecoverable error occurred while listing entries from the database")
                .map(move |(key_bytes, value_bytes)| {
                    let key = decode_key_expect(&key_bytes, &decoders);
                    let value = decode_value_expect(&value_bytes, &decoders, &key_bytes);
                    (key, value)
                }),
        )
    }

    /// Same as [`Self::find_by_prefix`], but backed by
    /// `raw_find_by_prefix_sorted_descending`, so entries arrive in whatever
    /// order that raw call produces.
    ///
    /// # Panics
    ///
    /// Panics if the raw lookup fails; the returned stream panics when an
    /// entry cannot be decoded.
    async fn find_by_prefix_sorted_descending<KP>(
        &mut self,
        key_prefix: &KP,
    ) -> Pin<
        Box<
            maybe_add_send!(
                dyn Stream<
                        Item = (
                            KP::Record,
                            <<KP as DatabaseLookup>::Record as DatabaseRecord>::Value,
                        ),
                    > + '_
            ),
        >,
    >
    where
        KP: DatabaseLookup + MaybeSend + MaybeSync,
        KP::Record: DatabaseKey,
    {
        // Cloned so the returned stream owns its decoders instead of
        // borrowing them from `self`.
        let decoders = self.decoders().clone();
        Box::pin(
            self.raw_find_by_prefix_sorted_descending(&key_prefix.to_bytes())
                .await
                .expect("Unrecoverable error occurred while listing entries from the database")
                .map(move |(key_bytes, value_bytes)| {
                    let key = decode_key_expect(&key_bytes, &decoders);
                    let value = decode_value_expect(&value_bytes, &decoders, &key_bytes);
                    (key, value)
                }),
        )
    }

    /// Removes the entry stored under `key` and returns the decoded value
    /// that `raw_remove_entry` reported for it, if any.
    ///
    /// # Panics
    ///
    /// Panics if the raw removal fails or if the returned bytes cannot be
    /// decoded as `K::Value`.
    async fn remove_entry<K>(&mut self, key: &K) -> Option<K::Value>
    where
        K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
    {
        let key_bytes = key.to_bytes();
        self.raw_remove_entry(&key_bytes)
            .await
            .expect("Unrecoverable error occurred while removing entry from the database")
            .map(|value_bytes| {
                decode_value_expect::<K::Value>(&value_bytes, self.decoders(), &key_bytes)
            })
    }

    /// Passes the encoded `key_prefix` to `raw_remove_by_prefix`.
    ///
    /// # Panics
    ///
    /// Panics if the raw removal fails.
    async fn remove_by_prefix<KP>(&mut self, key_prefix: &KP)
    where
        KP: DatabaseLookup + MaybeSend + MaybeSync,
    {
        self.raw_remove_by_prefix(&key_prefix.to_bytes())
            .await
            .expect("Unrecoverable error when removing entries from the database")
    }
}

/// A database type that has decoders, which allows it to implement
/// [`IDatabaseTransactionOpsCoreTyped`]
pub trait WithDecoders {
    /// The decoder registry used to decode keys and values.
    fn decoders(&self) -> &ModuleDecoderRegistry;
}

/// Raw database transaction (e.g. rocksdb implementation)
#[apply(async_trait_maybe_send!)]
pub trait IRawDatabaseTransaction: MaybeSend + IDatabaseTransactionOps {
    /// Commit the transaction, consuming it.
    async fn commit_tx(self) -> Result<()>;
}

/// Fedimint database transaction
///
/// See [`IDatabase`] for more info.
#[apply(async_trait_maybe_send!)]
pub trait IDatabaseTransaction: MaybeSend + IDatabaseTransactionOps + fmt::Debug {
    /// Commit the transaction
    async fn commit_tx(&mut self) -> Result<()>;

    /// The prefix len of this database instance
    fn prefix_len(&self) -> usize;
}

// Forwarding impl: a boxed transaction behaves like the transaction it holds.
#[apply(async_trait_maybe_send!)]
impl<T> IDatabaseTransaction for Box<T>
where
    T: IDatabaseTransaction + ?Sized,
{
    async fn commit_tx(&mut self) -> Result<()> {
        (**self).commit_tx().await
    }
    fn prefix_len(&self) -> usize {
        (**self).prefix_len()
    }
}

// Forwarding impl: a mutable borrow of a transaction behaves like the
// transaction itself.
#[apply(async_trait_maybe_send!)]
impl<'a, T> IDatabaseTransaction for &'a mut T
where
    T: IDatabaseTransaction + ?Sized,
{
    async fn commit_tx(&mut self) -> Result<()> {
        (**self).commit_tx().await
    }
    fn prefix_len(&self) -> usize {
        (**self).prefix_len()
    }
}

/// Struct that wraps an `IRawDatabaseTransaction` and can be wrapped
/// easier in other
structs since it does not consumed `self` by move. 1094 struct BaseDatabaseTransaction<Tx> { 1095 // TODO: merge options 1096 raw: Option<Tx>, 1097 notify_queue: Option<NotifyQueue>, 1098 notifications: Arc<Notifications>, 1099 } 1100 1101 impl<Tx> fmt::Debug for BaseDatabaseTransaction<Tx> 1102 where 1103 Tx: fmt::Debug, 1104 { 1105 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 1106 f.write_fmt(format_args!( 1107 "BaseDatabaseTransaction{{ raw={:?} }}", 1108 self.raw 1109 )) 1110 } 1111 } 1112 impl<Tx> BaseDatabaseTransaction<Tx> 1113 where 1114 Tx: IRawDatabaseTransaction, 1115 { 1116 fn new(dbtx: Tx, notifications: Arc<Notifications>) -> BaseDatabaseTransaction<Tx> { 1117 BaseDatabaseTransaction { 1118 raw: Some(dbtx), 1119 notifications, 1120 notify_queue: Some(NotifyQueue::new()), 1121 } 1122 } 1123 1124 fn add_notification_key(&mut self, key: &[u8]) -> Result<()> { 1125 self.notify_queue 1126 .as_mut() 1127 .context("can not call add_notification_key after commit")? 1128 .add(&key); 1129 Ok(()) 1130 } 1131 } 1132 1133 #[apply(async_trait_maybe_send!)] 1134 impl<Tx: IRawDatabaseTransaction> IDatabaseTransactionOpsCore for BaseDatabaseTransaction<Tx> { 1135 async fn raw_insert_bytes(&mut self, key: &[u8], value: &[u8]) -> Result<Option<Vec<u8>>> { 1136 self.add_notification_key(key)?; 1137 self.raw 1138 .as_mut() 1139 .context("Cannot insert into already consumed transaction")? 1140 .raw_insert_bytes(key, value) 1141 .await 1142 } 1143 1144 async fn raw_get_bytes(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> { 1145 self.raw 1146 .as_mut() 1147 .context("Cannot retrieve from already consumed transaction")? 1148 .raw_get_bytes(key) 1149 .await 1150 } 1151 1152 async fn raw_remove_entry(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> { 1153 self.add_notification_key(key)?; 1154 self.raw 1155 .as_mut() 1156 .context("Cannot remove from already consumed transaction")? 
1157 .raw_remove_entry(key) 1158 .await 1159 } 1160 1161 async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> Result<PrefixStream<'_>> { 1162 self.raw 1163 .as_mut() 1164 .context("Cannot retrieve from already consumed transaction")? 1165 .raw_find_by_prefix(key_prefix) 1166 .await 1167 } 1168 1169 async fn raw_find_by_prefix_sorted_descending( 1170 &mut self, 1171 key_prefix: &[u8], 1172 ) -> Result<PrefixStream<'_>> { 1173 self.raw 1174 .as_mut() 1175 .context("Cannot retrieve from already consumed transaction")? 1176 .raw_find_by_prefix_sorted_descending(key_prefix) 1177 .await 1178 } 1179 1180 async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> Result<()> { 1181 self.raw 1182 .as_mut() 1183 .context("Cannot remove from already consumed transaction")? 1184 .raw_remove_by_prefix(key_prefix) 1185 .await 1186 } 1187 } 1188 1189 #[apply(async_trait_maybe_send!)] 1190 impl<Tx: IRawDatabaseTransaction> IDatabaseTransactionOps for BaseDatabaseTransaction<Tx> { 1191 async fn rollback_tx_to_savepoint(&mut self) -> Result<()> { 1192 self.raw 1193 .as_mut() 1194 .context("Cannot rollback to a savepoint on an already consumed transaction")? 1195 .rollback_tx_to_savepoint() 1196 .await?; 1197 Ok(()) 1198 } 1199 1200 async fn set_tx_savepoint(&mut self) -> Result<()> { 1201 self.raw 1202 .as_mut() 1203 .context("Cannot set a tx savepoint on an already consumed transaction")? 1204 .set_tx_savepoint() 1205 .await?; 1206 Ok(()) 1207 } 1208 } 1209 1210 #[apply(async_trait_maybe_send!)] 1211 impl<Tx: IRawDatabaseTransaction> IDatabaseTransaction for BaseDatabaseTransaction<Tx> 1212 where 1213 Tx: fmt::Debug, 1214 { 1215 async fn commit_tx(&mut self) -> Result<()> { 1216 self.raw 1217 .take() 1218 .context("Cannot commit an already committed transaction")? 
1219 .commit_tx() 1220 .await?; 1221 self.notifications.submit_queue( 1222 self.notify_queue 1223 .take() 1224 .expect("commit must be called only once"), 1225 ); 1226 Ok(()) 1227 } 1228 1229 fn prefix_len(&self) -> usize { 1230 0 1231 } 1232 } 1233 1234 /// A helper for tracking and logging on `Drop` any instances of uncommitted 1235 /// writes 1236 #[derive(Clone)] 1237 struct CommitTracker { 1238 /// Is the dbtx committed 1239 is_committed: bool, 1240 /// Does the dbtx have any writes 1241 has_writes: bool, 1242 /// Don't warn-log uncommitted writes 1243 ignore_uncommitted: bool, 1244 } 1245 1246 impl Drop for CommitTracker { 1247 fn drop(&mut self) { 1248 if self.has_writes && !self.is_committed { 1249 if self.ignore_uncommitted { 1250 trace!( 1251 target: LOG_DB, 1252 "DatabaseTransaction has writes and has not called commit, but that's expected." 1253 ); 1254 } else { 1255 warn!( 1256 target: LOG_DB, 1257 location = ?backtrace::Backtrace::new(), 1258 "DatabaseTransaction has writes and has not called commit." 1259 ); 1260 } 1261 } 1262 } 1263 } 1264 1265 enum MaybeRef<'a, T> { 1266 Owned(T), 1267 Borrowed(&'a mut T), 1268 } 1269 1270 impl<'a, T> ops::Deref for MaybeRef<'a, T> { 1271 type Target = T; 1272 1273 fn deref(&self) -> &Self::Target { 1274 match self { 1275 MaybeRef::Owned(o) => o, 1276 MaybeRef::Borrowed(r) => r, 1277 } 1278 } 1279 } 1280 1281 impl<'a, T> ops::DerefMut for MaybeRef<'a, T> { 1282 fn deref_mut(&mut self) -> &mut Self::Target { 1283 match self { 1284 MaybeRef::Owned(o) => o, 1285 MaybeRef::Borrowed(r) => r, 1286 } 1287 } 1288 } 1289 1290 /// Session type for [`DatabaseTransaction`] that is allowed to commit 1291 /// 1292 /// Opposite of [`NonCommittable`]. 1293 pub struct Committable; 1294 1295 /// Session type for a [`DatabaseTransaction`] that is not allowed to commit 1296 /// 1297 /// Opposite of [`Committable`]. 
1298 1299 pub struct NonCommittable; 1300 /// A high level database transaction handle 1301 /// 1302 /// `Cap` is a session type 1303 pub struct DatabaseTransaction<'tx, Cap = NonCommittable> { 1304 tx: Box<dyn IDatabaseTransaction + 'tx>, 1305 decoders: ModuleDecoderRegistry, 1306 commit_tracker: MaybeRef<'tx, CommitTracker>, 1307 on_commit_hooks: MaybeRef<'tx, Vec<Box<maybe_add_send!(dyn FnOnce())>>>, 1308 capability: marker::PhantomData<Cap>, 1309 } 1310 1311 impl<'tx, Cap> fmt::Debug for DatabaseTransaction<'tx, Cap> { 1312 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 1313 f.write_fmt(format_args!( 1314 "DatabaseTransaction {{ tx: {:?}, decoders={:?} }}", 1315 self.tx, self.decoders 1316 )) 1317 } 1318 } 1319 1320 impl<'tx, Cap> WithDecoders for DatabaseTransaction<'tx, Cap> { 1321 fn decoders(&self) -> &ModuleDecoderRegistry { 1322 &self.decoders 1323 } 1324 } 1325 1326 #[instrument(level = "trace", skip_all, fields(value_type = std::any::type_name::<V>()), err)] 1327 fn decode_value<V: DatabaseValue>( 1328 value_bytes: &[u8], 1329 decoders: &ModuleDecoderRegistry, 1330 ) -> Result<V, DecodingError> { 1331 trace!( 1332 bytes = %AbbreviateHexBytes(value_bytes), 1333 "decoding value", 1334 ); 1335 V::from_bytes(value_bytes, decoders) 1336 } 1337 1338 fn decode_value_expect<V: DatabaseValue>( 1339 value_bytes: &[u8], 1340 decoders: &ModuleDecoderRegistry, 1341 key_bytes: &[u8], 1342 ) -> V { 1343 decode_value(value_bytes, decoders).unwrap_or_else(|err| { 1344 panic!( 1345 "Unrecoverable decoding DatabaseValue as {}; err={}, bytes={}; key_bytes={}", 1346 any::type_name::<V>(), 1347 err, 1348 AbbreviateHexBytes(value_bytes), 1349 AbbreviateHexBytes(key_bytes), 1350 ) 1351 }) 1352 } 1353 1354 fn decode_key_expect<K: DatabaseKey>(key_bytes: &[u8], decoders: &ModuleDecoderRegistry) -> K { 1355 trace!( 1356 bytes = %AbbreviateHexBytes(key_bytes), 1357 "decoding key", 1358 ); 1359 K::from_bytes(key_bytes, decoders).unwrap_or_else(|err| { 1360 panic!( 1361 
"Unrecoverable decoding DatabaseKey as {}; err={}; bytes={}", 1362 any::type_name::<K>(), 1363 err, 1364 AbbreviateHexBytes(key_bytes) 1365 ) 1366 }) 1367 } 1368 1369 impl<'tx, Cap> DatabaseTransaction<'tx, Cap> { 1370 /// Convert into a non-committable version 1371 pub fn into_nc(self) -> DatabaseTransaction<'tx, NonCommittable> { 1372 DatabaseTransaction { 1373 tx: self.tx, 1374 decoders: self.decoders, 1375 commit_tracker: self.commit_tracker, 1376 on_commit_hooks: self.on_commit_hooks, 1377 capability: PhantomData::<NonCommittable>, 1378 } 1379 } 1380 1381 /// Get a reference to a non-committeable version 1382 pub fn to_ref_nc<'s, 'a>(&'s mut self) -> DatabaseTransaction<'a, NonCommittable> 1383 where 1384 's: 'a, 1385 { 1386 self.to_ref().into_nc() 1387 } 1388 1389 /// Get [`DatabaseTransaction`] isolated to a `prefix` 1390 pub fn with_prefix<'a: 'tx>(self, prefix: Vec<u8>) -> DatabaseTransaction<'a, Cap> 1391 where 1392 'tx: 'a, 1393 { 1394 DatabaseTransaction { 1395 tx: Box::new(PrefixDatabaseTransaction { 1396 inner: self.tx, 1397 prefix, 1398 }), 1399 decoders: self.decoders, 1400 commit_tracker: self.commit_tracker, 1401 on_commit_hooks: self.on_commit_hooks, 1402 capability: self.capability, 1403 } 1404 } 1405 1406 /// Get [`DatabaseTransaction`] isolated to a prefix of a given 1407 /// `module_instance_id` 1408 pub fn with_prefix_module_id<'a: 'tx>( 1409 self, 1410 module_instance_id: ModuleInstanceId, 1411 ) -> DatabaseTransaction<'a, Cap> 1412 where 1413 'tx: 'a, 1414 { 1415 let prefix = module_instance_id_to_byte_prefix(module_instance_id); 1416 self.with_prefix(prefix) 1417 } 1418 1419 /// Get [`DatabaseTransaction`] to `self` 1420 pub fn to_ref<'s, 'a>(&'s mut self) -> DatabaseTransaction<'a, Cap> 1421 where 1422 's: 'a, 1423 { 1424 let decoders = self.decoders.clone(); 1425 1426 DatabaseTransaction { 1427 tx: Box::new(&mut self.tx), 1428 decoders, 1429 commit_tracker: match self.commit_tracker { 1430 MaybeRef::Owned(ref mut o) => 
MaybeRef::Borrowed(o), 1431 MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b), 1432 }, 1433 on_commit_hooks: match self.on_commit_hooks { 1434 MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o), 1435 MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b), 1436 }, 1437 capability: self.capability, 1438 } 1439 } 1440 1441 /// Get [`DatabaseTransaction`] isolated to a `prefix` of `self` 1442 pub fn to_ref_with_prefix<'a>(&'a mut self, prefix: Vec<u8>) -> DatabaseTransaction<'a, Cap> 1443 where 1444 'tx: 'a, 1445 { 1446 DatabaseTransaction { 1447 tx: Box::new(PrefixDatabaseTransaction { 1448 inner: &mut self.tx, 1449 prefix, 1450 }), 1451 decoders: self.decoders.clone(), 1452 commit_tracker: match self.commit_tracker { 1453 MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o), 1454 MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b), 1455 }, 1456 on_commit_hooks: match self.on_commit_hooks { 1457 MaybeRef::Owned(ref mut o) => MaybeRef::Borrowed(o), 1458 MaybeRef::Borrowed(ref mut b) => MaybeRef::Borrowed(b), 1459 }, 1460 capability: self.capability, 1461 } 1462 } 1463 1464 pub fn to_ref_with_prefix_module_id<'a>( 1465 &'a mut self, 1466 module_instance_id: ModuleInstanceId, 1467 ) -> DatabaseTransaction<'a, Cap> 1468 where 1469 'tx: 'a, 1470 { 1471 let prefix = module_instance_id_to_byte_prefix(module_instance_id); 1472 self.to_ref_with_prefix(prefix) 1473 } 1474 1475 /// Is this `Database` a global, unpartitioned `Database` 1476 pub fn is_global(&self) -> bool { 1477 self.tx.prefix_len() == 0 1478 } 1479 1480 /// `Err` if [`Self::is_global`] is not true 1481 pub fn ensure_global(&self) -> Result<()> { 1482 if !self.is_global() { 1483 bail!("Database instance not global"); 1484 } 1485 1486 Ok(()) 1487 } 1488 1489 /// `Err` if [`Self::is_global`] is true 1490 pub fn ensure_isolated(&self) -> Result<()> { 1491 if self.is_global() { 1492 bail!("Database instance not isolated"); 1493 } 1494 1495 Ok(()) 1496 } 1497 1498 /// Cancel the tx to avoid debugging warnings 
about uncommitted writes 1499 pub fn ignore_uncommitted(&mut self) -> &mut Self { 1500 self.commit_tracker.ignore_uncommitted = true; 1501 self 1502 } 1503 1504 /// Create warnings about uncommitted writes 1505 pub fn warn_uncommitted(&mut self) -> &mut Self { 1506 self.commit_tracker.ignore_uncommitted = false; 1507 self 1508 } 1509 1510 /// Register a hook that will be run after commit succeeds. 1511 #[instrument(level = "debug", skip_all, ret)] 1512 pub fn on_commit(&mut self, f: maybe_add_send!(impl FnOnce() + 'static)) { 1513 self.on_commit_hooks.push(Box::new(f)); 1514 } 1515 } 1516 1517 impl<'tx> DatabaseTransaction<'tx, Committable> { 1518 pub fn new( 1519 dbtx: Box<dyn IDatabaseTransaction + 'tx>, 1520 decoders: ModuleDecoderRegistry, 1521 ) -> DatabaseTransaction<'tx, Committable> { 1522 DatabaseTransaction { 1523 tx: dbtx, 1524 decoders, 1525 commit_tracker: MaybeRef::Owned(CommitTracker { 1526 is_committed: false, 1527 has_writes: false, 1528 ignore_uncommitted: false, 1529 }), 1530 on_commit_hooks: MaybeRef::Owned(vec![]), 1531 capability: PhantomData, 1532 } 1533 } 1534 1535 pub async fn commit_tx_result(mut self) -> Result<()> { 1536 self.commit_tracker.is_committed = true; 1537 let commit_result = self.tx.commit_tx().await; 1538 1539 // Run commit hooks in case commit was successful 1540 if commit_result.is_ok() { 1541 for hook in self.on_commit_hooks.deref_mut().drain(..) 
{ 1542 hook(); 1543 } 1544 } 1545 1546 commit_result 1547 } 1548 1549 pub async fn commit_tx(mut self) { 1550 self.commit_tracker.is_committed = true; 1551 self.commit_tx_result() 1552 .await 1553 .expect("Unrecoverable error occurred while committing to the database."); 1554 } 1555 } 1556 1557 #[apply(async_trait_maybe_send!)] 1558 impl<'a, Cap> IDatabaseTransactionOpsCore for DatabaseTransaction<'a, Cap> 1559 where 1560 Cap: Send, 1561 { 1562 async fn raw_insert_bytes(&mut self, key: &[u8], value: &[u8]) -> Result<Option<Vec<u8>>> { 1563 self.commit_tracker.has_writes = true; 1564 self.tx.raw_insert_bytes(key, value).await 1565 } 1566 1567 async fn raw_get_bytes(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> { 1568 self.tx.raw_get_bytes(key).await 1569 } 1570 1571 async fn raw_remove_entry(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> { 1572 self.tx.raw_remove_entry(key).await 1573 } 1574 1575 async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> Result<PrefixStream<'_>> { 1576 self.tx.raw_find_by_prefix(key_prefix).await 1577 } 1578 1579 async fn raw_find_by_prefix_sorted_descending( 1580 &mut self, 1581 key_prefix: &[u8], 1582 ) -> Result<PrefixStream<'_>> { 1583 self.tx 1584 .raw_find_by_prefix_sorted_descending(key_prefix) 1585 .await 1586 } 1587 1588 async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> Result<()> { 1589 self.commit_tracker.has_writes = true; 1590 self.tx.raw_remove_by_prefix(key_prefix).await 1591 } 1592 } 1593 #[apply(async_trait_maybe_send!)] 1594 impl<'a> IDatabaseTransactionOps for DatabaseTransaction<'a, Committable> { 1595 async fn set_tx_savepoint(&mut self) -> Result<()> { 1596 self.tx.set_tx_savepoint().await 1597 } 1598 1599 async fn rollback_tx_to_savepoint(&mut self) -> Result<()> { 1600 self.tx.rollback_tx_to_savepoint().await 1601 } 1602 } 1603 1604 impl<T> DatabaseKeyPrefix for T 1605 where 1606 T: DatabaseLookup + crate::encoding::Encodable + Debug, 1607 { 1608 fn to_bytes(&self) -> Vec<u8> { 1609 let 
mut data = vec![<Self as DatabaseLookup>::Record::DB_PREFIX]; 1610 self.consensus_encode(&mut data) 1611 .expect("Writing to vec is infallible"); 1612 data 1613 } 1614 } 1615 1616 impl<T> DatabaseKey for T 1617 where 1618 // Note: key can only be `T` that can be decoded without modules (even if 1619 // module type is `()`) 1620 T: DatabaseRecord + crate::encoding::Decodable + Sized, 1621 { 1622 const NOTIFY_ON_MODIFY: bool = <T as DatabaseRecord>::NOTIFY_ON_MODIFY; 1623 fn from_bytes(data: &[u8], modules: &ModuleDecoderRegistry) -> Result<Self, DecodingError> { 1624 if data.is_empty() { 1625 // TODO: build better coding errors, pretty useless right now 1626 return Err(DecodingError::wrong_length(1, 0)); 1627 } 1628 1629 if data[0] != Self::DB_PREFIX { 1630 return Err(DecodingError::wrong_prefix(Self::DB_PREFIX, data[0])); 1631 } 1632 1633 <Self as crate::encoding::Decodable>::consensus_decode( 1634 &mut std::io::Cursor::new(&data[1..]), 1635 modules, 1636 ) 1637 .map_err(|decode_error| DecodingError::Other(decode_error.0)) 1638 } 1639 } 1640 1641 impl<T> DatabaseValue for T 1642 where 1643 T: Debug + Encodable + Decodable, 1644 { 1645 fn from_bytes(data: &[u8], modules: &ModuleDecoderRegistry) -> Result<Self, DecodingError> { 1646 T::consensus_decode(&mut std::io::Cursor::new(data), modules) 1647 .map_err(|e| DecodingError::Other(e.0)) 1648 } 1649 1650 fn to_bytes(&self) -> Vec<u8> { 1651 let mut bytes = Vec::new(); 1652 self.consensus_encode(&mut bytes) 1653 .expect("writing to vec can't fail"); 1654 bytes 1655 } 1656 } 1657 1658 /// This is a helper macro that generates the implementations of 1659 /// `DatabaseRecord` necessary for reading/writing to the 1660 /// database and fetching by prefix. 
///
/// - `key`: This is the type of struct that will be used as the key into the
///   database
/// - `value`: This is the type of struct that will be used as the value into
///   the database
/// - `db_prefix`: Required enum expression that is represented as a `u8` and is
///   prepended to this key
/// - `notify_on_modify`: Optional `true`/`false`. When given, it sets
///   `DatabaseRecord::NOTIFY_ON_MODIFY`; `true` additionally implements
///   `DatabaseKeyWithNotify` for the key
///
/// Query prefixes are not declared here: use `impl_db_lookup!` with one or
/// more `query_prefix` types to make the key queryable via `find_by_prefix`.
///
/// # Examples
///
/// ```
/// use fedimint_core::encoding::{Decodable, Encodable};
/// use fedimint_core::impl_db_record;
///
/// #[derive(Debug, Encodable, Decodable)]
/// struct MyKey;
///
/// #[derive(Debug, Encodable, Decodable)]
/// struct MyValue;
///
/// #[repr(u8)]
/// #[derive(Clone, Debug)]
/// pub enum DbKeyPrefix {
///     MyKey = 0x50,
/// }
///
/// impl_db_record!(key = MyKey, value = MyValue, db_prefix = DbKeyPrefix::MyKey);
/// ```
///
/// Use the required parameters and specify one `query_prefix`
///
/// ```
/// use fedimint_core::encoding::{Decodable, Encodable};
/// use fedimint_core::{impl_db_lookup, impl_db_record};
///
/// #[derive(Debug, Encodable, Decodable)]
/// struct MyKey;
///
/// #[derive(Debug, Encodable, Decodable)]
/// struct MyValue;
///
/// #[repr(u8)]
/// #[derive(Clone, Debug)]
/// pub enum DbKeyPrefix {
///     MyKey = 0x50,
/// }
///
/// #[derive(Debug, Encodable, Decodable)]
/// struct MyKeyPrefix;
///
/// impl_db_record!(key = MyKey, value = MyValue, db_prefix = DbKeyPrefix::MyKey,);
///
/// impl_db_lookup!(key = MyKey, query_prefix = MyKeyPrefix);
/// ```
#[macro_export]
macro_rules! impl_db_record {
    // Main entry point; `notify_on_modify` and the trailing comma are optional.
    (key = $key:ty, value = $val:ty, db_prefix = $db_prefix:expr $(, notify_on_modify = $notify:tt)? $(,)?) => {
        impl $crate::db::DatabaseRecord for $key {
            const DB_PREFIX: u8 = $db_prefix as u8;
            // Only emitted when `notify_on_modify` was given; otherwise the
            // trait's default value applies.
            $(const NOTIFY_ON_MODIFY: bool = $notify;)?
            type Key = Self;
            type Value = $val;
        }
        // Dispatch on the literal `true`/`false` token via the internal
        // `@impl_notify_marker` rules below.
        $(
            impl_db_record! {
                @impl_notify_marker key = $key, notify_on_modify = $notify
            }
        )?
    };
    // if notify is set to true
    (@impl_notify_marker key = $key:ty, notify_on_modify = true) => {
        impl $crate::db::DatabaseKeyWithNotify for $key {}
    };
    // if notify is set to false
    (@impl_notify_marker key = $key:ty, notify_on_modify = false) => {};
}

/// Implements `DatabaseLookup` for every given `query_prefix` type, with
/// `Record` set to `key`.
///
/// `query_prefix` may be passed zero or more times; a trailing comma is
/// accepted.
#[macro_export]
macro_rules! impl_db_lookup{
    (key = $key:ty $(, query_prefix = $query_prefix:ty)* $(,)?) => {
        $(
            impl $crate::db::DatabaseLookup for $query_prefix {
                type Record = $key;
            }
        )*
    };
}

/// Deprecated: Use `DatabaseVersionKey(ModuleInstanceId)` instead.
1753 #[derive(Debug, Encodable, Decodable, Serialize)] 1754 pub struct DatabaseVersionKeyV0; 1755 1756 #[derive(Debug, Encodable, Decodable, Serialize)] 1757 pub struct DatabaseVersionKey(pub ModuleInstanceId); 1758 1759 #[derive(Debug, Encodable, Decodable, Serialize, Clone, PartialOrd, Ord, PartialEq, Eq, Copy)] 1760 pub struct DatabaseVersion(pub u64); 1761 1762 impl_db_record!( 1763 key = DatabaseVersionKeyV0, 1764 value = DatabaseVersion, 1765 db_prefix = DbKeyPrefix::DatabaseVersion 1766 ); 1767 1768 impl_db_record!( 1769 key = DatabaseVersionKey, 1770 value = DatabaseVersion, 1771 db_prefix = DbKeyPrefix::DatabaseVersion 1772 ); 1773 1774 impl std::fmt::Display for DatabaseVersion { 1775 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { 1776 write!(f, "{}", self.0) 1777 } 1778 } 1779 1780 impl DatabaseVersion { 1781 pub fn increment(&mut self) { 1782 self.0 += 1; 1783 } 1784 } 1785 1786 impl std::fmt::Display for DbKeyPrefix { 1787 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { 1788 write!(f, "{self:?}") 1789 } 1790 } 1791 1792 #[repr(u8)] 1793 #[derive(Clone, EnumIter, Debug)] 1794 pub enum DbKeyPrefix { 1795 DatabaseVersion = 0x50, 1796 ClientBackup = 0x51, 1797 } 1798 1799 #[derive(Debug, Error)] 1800 pub enum DecodingError { 1801 #[error("Key had a wrong prefix, expected {expected} but got {found}")] 1802 WrongPrefix { expected: u8, found: u8 }, 1803 #[error("Key had a wrong length, expected {expected} but got {found}")] 1804 WrongLength { expected: usize, found: usize }, 1805 #[error("Other decoding error: {0}")] 1806 Other(anyhow::Error), 1807 } 1808 1809 impl DecodingError { 1810 pub fn other<E: Error + Send + Sync + 'static>(error: E) -> DecodingError { 1811 DecodingError::Other(anyhow::Error::from(error)) 1812 } 1813 1814 pub fn wrong_prefix(expected: u8, found: u8) -> DecodingError { 1815 DecodingError::WrongPrefix { expected, found } 1816 } 1817 1818 pub fn wrong_length(expected: usize, found: usize) -> 
DecodingError { 1819 DecodingError::WrongLength { expected, found } 1820 } 1821 } 1822 1823 #[macro_export] 1824 macro_rules! push_db_pair_items { 1825 ($dbtx:ident, $prefix_type:expr, $key_type:ty, $value_type:ty, $map:ident, $key_literal:literal) => { 1826 let db_items = 1827 $crate::db::IDatabaseTransactionOpsCoreTyped::find_by_prefix($dbtx, &$prefix_type) 1828 .await 1829 .map(|(key, val)| { 1830 ( 1831 $crate::encoding::Encodable::consensus_encode_to_hex(&key), 1832 val, 1833 ) 1834 }) 1835 .collect::<BTreeMap<String, $value_type>>() 1836 .await; 1837 1838 $map.insert($key_literal.to_string(), Box::new(db_items)); 1839 }; 1840 } 1841 1842 #[macro_export] 1843 macro_rules! push_db_pair_items_no_serde { 1844 ($dbtx:ident, $prefix_type:expr, $key_type:ty, $value_type:ty, $map:ident, $key_literal:literal) => { 1845 let db_items = 1846 $crate::db::IDatabaseTransactionOpsCoreTyped::find_by_prefix($dbtx, &$prefix_type) 1847 .await 1848 .map(|(key, val)| { 1849 ( 1850 $crate::encoding::Encodable::consensus_encode_to_hex(&key), 1851 SerdeWrapper::from_encodable(val), 1852 ) 1853 }) 1854 .collect::<BTreeMap<_, _>>() 1855 .await; 1856 1857 $map.insert($key_literal.to_string(), Box::new(db_items)); 1858 }; 1859 } 1860 1861 #[macro_export] 1862 macro_rules! push_db_key_items { 1863 ($dbtx:ident, $prefix_type:expr, $key_type:ty, $map:ident, $key_literal:literal) => { 1864 let db_items = 1865 $crate::db::IDatabaseTransactionOpsCoreTyped::find_by_prefix($dbtx, &$prefix_type) 1866 .await 1867 .map(|(key, _)| key) 1868 .collect::<Vec<$key_type>>() 1869 .await; 1870 1871 $map.insert($key_literal.to_string(), Box::new(db_items)); 1872 }; 1873 } 1874 1875 /// `ServerMigrationFn` that modules can implement to "migrate" the database 1876 /// to the next database version. 
1877 pub type ServerMigrationFn = 1878 for<'r, 'tx> fn( 1879 &'r mut DatabaseTransaction<'tx>, 1880 ) -> Pin<Box<dyn futures::Future<Output = anyhow::Result<()>> + Send + 'r>>; 1881 1882 /// Applies the database migrations to a non-isolated database. 1883 pub async fn apply_migrations_server( 1884 db: &Database, 1885 kind: String, 1886 target_db_version: DatabaseVersion, 1887 migrations: BTreeMap<DatabaseVersion, ServerMigrationFn>, 1888 ) -> Result<(), anyhow::Error> { 1889 apply_migrations(db, kind, target_db_version, migrations, None).await 1890 } 1891 1892 /// `apply_migrations` iterates from the on disk database version for the module 1893 /// up to `target_db_version` and executes all of the migrations that exist in 1894 /// the migrations map. Each migration in migrations map updates the 1895 /// database to have the correct on-disk structures that the code is expecting. 1896 /// The entire migration process is atomic (i.e migration from 0->1 and 1->2 1897 /// happen atomically). This function is called before the module is initialized 1898 /// and as long as the correct migrations are supplied in the migrations map, 1899 /// the module will be able to read and write from the database successfully. 1900 pub async fn apply_migrations( 1901 db: &Database, 1902 kind: String, 1903 target_db_version: DatabaseVersion, 1904 migrations: BTreeMap<DatabaseVersion, ServerMigrationFn>, 1905 module_instance_id: Option<ModuleInstanceId>, 1906 ) -> Result<(), anyhow::Error> { 1907 // Newly created databases will not have any data since they have just been 1908 // instantiated. 1909 let mut dbtx = db.begin_transaction_nc().await; 1910 let is_new_db = dbtx.raw_find_by_prefix(&[]).await?.next().await.is_none(); 1911 1912 // First write the database version to disk if it does not exist. 
1913 create_database_version( 1914 db, 1915 target_db_version, 1916 module_instance_id, 1917 kind.clone(), 1918 is_new_db, 1919 ) 1920 .await?; 1921 1922 let mut global_dbtx = db.begin_transaction().await; 1923 let module_instance_id_key = module_instance_id_or_global(module_instance_id); 1924 1925 let disk_version = global_dbtx 1926 .get_value(&DatabaseVersionKey(module_instance_id_key)) 1927 .await; 1928 1929 let db_version = if let Some(disk_version) = disk_version { 1930 let mut current_db_version = disk_version; 1931 1932 if current_db_version > target_db_version { 1933 return Err(anyhow::anyhow!(format!( 1934 "On disk database version for module {kind} was higher than the code database version." 1935 ))); 1936 } 1937 1938 while current_db_version < target_db_version { 1939 if let Some(migration) = migrations.get(¤t_db_version) { 1940 info!(target: LOG_DB, ?kind, ?current_db_version, ?target_db_version, "Migrating module..."); 1941 if let Some(module_instance_id) = module_instance_id { 1942 migration( 1943 &mut global_dbtx 1944 .to_ref_with_prefix_module_id(module_instance_id) 1945 .into_nc(), 1946 ) 1947 .await?; 1948 } else { 1949 migration(&mut global_dbtx.to_ref_nc()).await?; 1950 } 1951 } else { 1952 warn!(target: LOG_DB, ?current_db_version, "Missing server db migration"); 1953 } 1954 1955 current_db_version.increment(); 1956 global_dbtx 1957 .insert_entry( 1958 &DatabaseVersionKey(module_instance_id_key), 1959 ¤t_db_version, 1960 ) 1961 .await; 1962 } 1963 1964 current_db_version 1965 } else { 1966 target_db_version 1967 }; 1968 1969 global_dbtx.commit_tx_result().await?; 1970 info!(target: LOG_DB, ?kind, ?db_version, "Migration complete"); 1971 Ok(()) 1972 } 1973 1974 /// Creates the `DatabaseVersion` inside the database if it does not exist. If 1975 /// necessary, this function will migrate the legacy database version to the 1976 /// expected `DatabaseVersionKey`. 
1977 pub async fn create_database_version( 1978 db: &Database, 1979 target_db_version: DatabaseVersion, 1980 module_instance_id: Option<ModuleInstanceId>, 1981 kind: String, 1982 is_new_db: bool, 1983 ) -> Result<(), anyhow::Error> { 1984 let key_module_instance_id = module_instance_id_or_global(module_instance_id); 1985 1986 // First check if the module has a `DatabaseVersion` written to 1987 // `DatabaseVersionKey`. If `DatabaseVersion` already exists, there is 1988 // nothing to do. 1989 let mut global_dbtx = db.begin_transaction().await; 1990 if global_dbtx 1991 .get_value(&DatabaseVersionKey(key_module_instance_id)) 1992 .await 1993 .is_none() 1994 { 1995 // If it exists, read and remove the legacy `DatabaseVersion`, which used to be 1996 // in the module's isolated namespace (but not for fedimint-server). 1997 // 1998 // Otherwise, if the previous database contains data and no legacy database 1999 // version, use `DatabaseVersion(0)` so that all database migrations are 2000 // run. Otherwise, this database can assumed to be new and can use 2001 // `target_db_version` to skip the database migrations. 
2002 let current_version_in_module = if let Some(module_instance_id) = module_instance_id { 2003 remove_current_db_version_if_exists( 2004 &mut global_dbtx 2005 .to_ref_with_prefix_module_id(module_instance_id) 2006 .into_nc(), 2007 is_new_db, 2008 target_db_version, 2009 ) 2010 .await 2011 } else { 2012 remove_current_db_version_if_exists( 2013 &mut global_dbtx.to_ref().into_nc(), 2014 is_new_db, 2015 target_db_version, 2016 ) 2017 .await 2018 }; 2019 2020 // Write the previous `DatabaseVersion` to the new `DatabaseVersionKey` 2021 info!(target: LOG_DB, ?kind, ?current_version_in_module, ?target_db_version, ?is_new_db, "Creating DatabaseVersionKey..."); 2022 global_dbtx 2023 .insert_new_entry( 2024 &DatabaseVersionKey(key_module_instance_id), 2025 ¤t_version_in_module, 2026 ) 2027 .await; 2028 global_dbtx.commit_tx_result().await?; 2029 } 2030 2031 Ok(()) 2032 } 2033 2034 /// Removes `DatabaseVersion` from `DatabaseVersionKeyV0` if it exists and 2035 /// returns the current database version. If the current version does not 2036 /// exist, use `target_db_version` if the database is new. Otherwise, return 2037 /// `DatabaseVersion(0)` to ensure all migrations are run. 2038 async fn remove_current_db_version_if_exists( 2039 version_dbtx: &mut DatabaseTransaction<'_>, 2040 is_new_db: bool, 2041 target_db_version: DatabaseVersion, 2042 ) -> DatabaseVersion { 2043 // Remove the previous `DatabaseVersion` in the isolated database. If it doesn't 2044 // exist, just use the 0 for the version so that all of the migrations are 2045 // executed. 2046 let current_version_in_module = version_dbtx.remove_entry(&DatabaseVersionKeyV0).await; 2047 match current_version_in_module { 2048 Some(database_version) => database_version, 2049 None if is_new_db => target_db_version, 2050 None => DatabaseVersion(0), 2051 } 2052 } 2053 2054 /// Helper function to retrieve the `module_instance_id` for modules, otherwise 2055 /// return 0xff for the global namespace. 
2056 fn module_instance_id_or_global(module_instance_id: Option<ModuleInstanceId>) -> ModuleInstanceId { 2057 // Use 0xff for fedimint-server and the `module_instance_id` for each module 2058 if let Some(module_instance_id) = module_instance_id { 2059 module_instance_id 2060 } else { 2061 MODULE_GLOBAL_PREFIX.into() 2062 } 2063 } 2064 2065 #[allow(unused_imports)] 2066 mod test_utils { 2067 use std::collections::BTreeMap; 2068 use std::time::Duration; 2069 2070 use futures::future::ready; 2071 use futures::{Future, FutureExt, StreamExt}; 2072 use rand::Rng; 2073 use tokio::join; 2074 2075 use super::{ 2076 apply_migrations, Database, DatabaseTransaction, DatabaseVersion, DatabaseVersionKey, 2077 DatabaseVersionKeyV0, ServerMigrationFn, 2078 }; 2079 use crate::core::ModuleKind; 2080 use crate::db::mem_impl::MemDatabase; 2081 use crate::db::{ 2082 IDatabaseTransactionOps, IDatabaseTransactionOpsCoreTyped, MODULE_GLOBAL_PREFIX, 2083 }; 2084 use crate::encoding::{Decodable, Encodable}; 2085 use crate::module::registry::ModuleDecoderRegistry; 2086 2087 pub async fn future_returns_shortly<F: Future>(fut: F) -> Option<F::Output> { 2088 crate::runtime::timeout(Duration::from_millis(10), fut) 2089 .await 2090 .ok() 2091 } 2092 2093 #[repr(u8)] 2094 #[derive(Clone)] 2095 pub enum TestDbKeyPrefix { 2096 Test = 0x42, 2097 AltTest = 0x43, 2098 PercentTestKey = 0x25, 2099 } 2100 2101 #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Encodable, Decodable)] 2102 pub(super) struct TestKey(pub u64); 2103 2104 #[derive(Debug, Encodable, Decodable)] 2105 struct DbPrefixTestPrefix; 2106 2107 impl_db_record!( 2108 key = TestKey, 2109 value = TestVal, 2110 db_prefix = TestDbKeyPrefix::Test, 2111 notify_on_modify = true, 2112 ); 2113 impl_db_lookup!(key = TestKey, query_prefix = DbPrefixTestPrefix); 2114 2115 #[derive(Debug, Encodable, Decodable)] 2116 struct TestKeyV0(u64, u64); 2117 2118 #[derive(Debug, Encodable, Decodable)] 2119 struct DbPrefixTestPrefixV0; 2120 2121 impl_db_record!( 
2122 key = TestKeyV0, 2123 value = TestVal, 2124 db_prefix = TestDbKeyPrefix::Test, 2125 ); 2126 impl_db_lookup!(key = TestKeyV0, query_prefix = DbPrefixTestPrefixV0); 2127 2128 #[derive(Debug, Eq, PartialEq, PartialOrd, Ord, Encodable, Decodable)] 2129 struct AltTestKey(u64); 2130 2131 #[derive(Debug, Encodable, Decodable)] 2132 struct AltDbPrefixTestPrefix; 2133 2134 impl_db_record!( 2135 key = AltTestKey, 2136 value = TestVal, 2137 db_prefix = TestDbKeyPrefix::AltTest, 2138 ); 2139 impl_db_lookup!(key = AltTestKey, query_prefix = AltDbPrefixTestPrefix); 2140 2141 #[derive(Debug, Encodable, Decodable)] 2142 struct PercentTestKey(u64); 2143 2144 #[derive(Debug, Encodable, Decodable)] 2145 struct PercentPrefixTestPrefix; 2146 2147 impl_db_record!( 2148 key = PercentTestKey, 2149 value = TestVal, 2150 db_prefix = TestDbKeyPrefix::PercentTestKey, 2151 ); 2152 2153 impl_db_lookup!(key = PercentTestKey, query_prefix = PercentPrefixTestPrefix); 2154 #[derive(Debug, Encodable, Decodable, Eq, PartialEq, PartialOrd, Ord)] 2155 pub(super) struct TestVal(pub u64); 2156 2157 const TEST_MODULE_PREFIX: u16 = 1; 2158 const ALT_MODULE_PREFIX: u16 = 2; 2159 2160 pub async fn verify_insert_elements(db: Database) { 2161 let mut dbtx = db.begin_transaction().await; 2162 assert!(dbtx.insert_entry(&TestKey(1), &TestVal(2)).await.is_none()); 2163 assert!(dbtx.insert_entry(&TestKey(2), &TestVal(3)).await.is_none()); 2164 dbtx.commit_tx().await; 2165 2166 // Test values were persisted 2167 let mut dbtx = db.begin_transaction().await; 2168 assert_eq!(dbtx.get_value(&TestKey(1)).await, Some(TestVal(2))); 2169 assert_eq!(dbtx.get_value(&TestKey(2)).await, Some(TestVal(3))); 2170 dbtx.commit_tx().await; 2171 2172 // Test overwrites work as expected 2173 let mut dbtx = db.begin_transaction().await; 2174 assert_eq!( 2175 dbtx.insert_entry(&TestKey(1), &TestVal(4)).await, 2176 Some(TestVal(2)) 2177 ); 2178 assert_eq!( 2179 dbtx.insert_entry(&TestKey(2), &TestVal(5)).await, 2180 Some(TestVal(3)) 
2181 ); 2182 dbtx.commit_tx().await; 2183 2184 let mut dbtx = db.begin_transaction().await; 2185 assert_eq!(dbtx.get_value(&TestKey(1)).await, Some(TestVal(4))); 2186 assert_eq!(dbtx.get_value(&TestKey(2)).await, Some(TestVal(5))); 2187 dbtx.commit_tx().await; 2188 } 2189 2190 pub async fn verify_remove_nonexisting(db: Database) { 2191 let mut dbtx = db.begin_transaction().await; 2192 assert_eq!(dbtx.get_value(&TestKey(1)).await, None); 2193 let removed = dbtx.remove_entry(&TestKey(1)).await; 2194 assert!(removed.is_none()); 2195 2196 // Commit to suppress the warning message 2197 dbtx.commit_tx().await; 2198 } 2199 2200 pub async fn verify_remove_existing(db: Database) { 2201 let mut dbtx = db.begin_transaction().await; 2202 2203 assert!(dbtx.insert_entry(&TestKey(1), &TestVal(2)).await.is_none()); 2204 2205 assert_eq!(dbtx.get_value(&TestKey(1)).await, Some(TestVal(2))); 2206 2207 let removed = dbtx.remove_entry(&TestKey(1)).await; 2208 assert_eq!(removed, Some(TestVal(2))); 2209 assert_eq!(dbtx.get_value(&TestKey(1)).await, None); 2210 2211 // Commit to suppress the warning message 2212 dbtx.commit_tx().await; 2213 } 2214 2215 pub async fn verify_read_own_writes(db: Database) { 2216 let mut dbtx = db.begin_transaction().await; 2217 2218 assert!(dbtx.insert_entry(&TestKey(1), &TestVal(2)).await.is_none()); 2219 2220 assert_eq!(dbtx.get_value(&TestKey(1)).await, Some(TestVal(2))); 2221 2222 // Commit to suppress the warning message 2223 dbtx.commit_tx().await; 2224 } 2225 2226 pub async fn verify_prevent_dirty_reads(db: Database) { 2227 let mut dbtx = db.begin_transaction().await; 2228 2229 assert!(dbtx.insert_entry(&TestKey(1), &TestVal(2)).await.is_none()); 2230 2231 // dbtx2 should not be able to see uncommitted changes 2232 let mut dbtx2 = db.begin_transaction().await; 2233 assert_eq!(dbtx2.get_value(&TestKey(1)).await, None); 2234 2235 // Commit to suppress the warning message 2236 dbtx.commit_tx().await; 2237 } 2238 2239 pub async fn 
verify_find_by_prefix(db: Database) { 2240 let mut dbtx = db.begin_transaction().await; 2241 dbtx.insert_entry(&TestKey(55), &TestVal(9999)).await; 2242 dbtx.insert_entry(&TestKey(54), &TestVal(8888)).await; 2243 2244 dbtx.insert_entry(&AltTestKey(55), &TestVal(7777)).await; 2245 dbtx.insert_entry(&AltTestKey(54), &TestVal(6666)).await; 2246 dbtx.commit_tx().await; 2247 2248 // Verify finding by prefix returns the correct set of key pairs 2249 let mut dbtx = db.begin_transaction().await; 2250 2251 let mut returned_keys = dbtx 2252 .find_by_prefix(&DbPrefixTestPrefix) 2253 .await 2254 .collect::<Vec<_>>() 2255 .await; 2256 returned_keys.sort(); 2257 let expected = vec![(TestKey(54), TestVal(8888)), (TestKey(55), TestVal(9999))]; 2258 assert_eq!(returned_keys, expected); 2259 2260 let reversed = dbtx 2261 .find_by_prefix_sorted_descending(&DbPrefixTestPrefix) 2262 .await 2263 .collect::<Vec<_>>() 2264 .await; 2265 let mut reversed_expected = expected; 2266 reversed_expected.reverse(); 2267 assert_eq!(reversed, reversed_expected); 2268 2269 let mut returned_keys = dbtx 2270 .find_by_prefix(&AltDbPrefixTestPrefix) 2271 .await 2272 .collect::<Vec<_>>() 2273 .await; 2274 returned_keys.sort(); 2275 let expected = vec![ 2276 (AltTestKey(54), TestVal(6666)), 2277 (AltTestKey(55), TestVal(7777)), 2278 ]; 2279 assert_eq!(returned_keys, expected); 2280 2281 let reversed = dbtx 2282 .find_by_prefix_sorted_descending(&AltDbPrefixTestPrefix) 2283 .await 2284 .collect::<Vec<_>>() 2285 .await; 2286 let mut reversed_expected = expected; 2287 reversed_expected.reverse(); 2288 assert_eq!(reversed, reversed_expected); 2289 } 2290 2291 pub async fn verify_commit(db: Database) { 2292 let mut dbtx = db.begin_transaction().await; 2293 2294 assert!(dbtx.insert_entry(&TestKey(1), &TestVal(2)).await.is_none()); 2295 dbtx.commit_tx().await; 2296 2297 // Verify dbtx2 can see committed transactions 2298 let mut dbtx2 = db.begin_transaction().await; 2299 
assert_eq!(dbtx2.get_value(&TestKey(1)).await, Some(TestVal(2))); 2300 } 2301 2302 pub async fn verify_rollback_to_savepoint(db: Database) { 2303 let mut dbtx_rollback = db.begin_transaction().await; 2304 2305 dbtx_rollback 2306 .insert_entry(&TestKey(20), &TestVal(2000)) 2307 .await; 2308 2309 dbtx_rollback 2310 .set_tx_savepoint() 2311 .await 2312 .expect("Error setting transaction savepoint"); 2313 2314 dbtx_rollback 2315 .insert_entry(&TestKey(21), &TestVal(2001)) 2316 .await; 2317 2318 assert_eq!( 2319 dbtx_rollback.get_value(&TestKey(20)).await, 2320 Some(TestVal(2000)) 2321 ); 2322 assert_eq!( 2323 dbtx_rollback.get_value(&TestKey(21)).await, 2324 Some(TestVal(2001)) 2325 ); 2326 2327 dbtx_rollback 2328 .rollback_tx_to_savepoint() 2329 .await 2330 .expect("Error setting transaction savepoint"); 2331 2332 assert_eq!( 2333 dbtx_rollback.get_value(&TestKey(20)).await, 2334 Some(TestVal(2000)) 2335 ); 2336 2337 assert_eq!(dbtx_rollback.get_value(&TestKey(21)).await, None); 2338 2339 // Commit to suppress the warning message 2340 dbtx_rollback.commit_tx().await; 2341 } 2342 2343 pub async fn verify_prevent_nonrepeatable_reads(db: Database) { 2344 let mut dbtx = db.begin_transaction().await; 2345 assert_eq!(dbtx.get_value(&TestKey(100)).await, None); 2346 2347 let mut dbtx2 = db.begin_transaction().await; 2348 2349 dbtx2.insert_entry(&TestKey(100), &TestVal(101)).await; 2350 2351 assert_eq!(dbtx.get_value(&TestKey(100)).await, None); 2352 2353 dbtx2.commit_tx().await; 2354 2355 // dbtx should still read None because it is operating over a snapshot 2356 // of the data when the transaction started 2357 assert_eq!(dbtx.get_value(&TestKey(100)).await, None); 2358 2359 let expected_keys = 0; 2360 let returned_keys = dbtx 2361 .find_by_prefix(&DbPrefixTestPrefix) 2362 .await 2363 .fold(0, |returned_keys, (key, value)| async move { 2364 if let TestKey(100) = key { 2365 assert!(value.eq(&TestVal(101))); 2366 } 2367 returned_keys + 1 2368 }) 2369 .await; 2370 2371 
assert_eq!(returned_keys, expected_keys); 2372 } 2373 2374 pub async fn verify_snapshot_isolation(db: Database) { 2375 // This scenario is taken straight out of https://github.com/fedimint/fedimint/issues/5195 bug 2376 for i in 0..1000 { 2377 let base_key = i * 2; 2378 let tx_accepted_key = base_key; 2379 let spent_input_key = base_key + 1; 2380 2381 async fn random_yield() { 2382 let times = if rand::thread_rng().gen_bool(0.5) { 2383 0 2384 } else { 2385 10 2386 }; 2387 for _ in 0..times { 2388 tokio::task::yield_now().await 2389 } 2390 } 2391 2392 join!( 2393 async { 2394 random_yield().await; 2395 let mut dbtx = db.begin_transaction().await; 2396 2397 random_yield().await; 2398 let a = dbtx.get_value(&TestKey(tx_accepted_key)).await; 2399 random_yield().await; 2400 // we have 4 operations that can give you the db key, 2401 // try all of them 2402 let s = match i % 5 { 2403 0 => dbtx.get_value(&TestKey(spent_input_key)).await, 2404 1 => dbtx.remove_entry(&TestKey(spent_input_key)).await, 2405 2 => { 2406 dbtx.insert_entry(&TestKey(spent_input_key), &TestVal(200)) 2407 .await 2408 } 2409 3 => { 2410 dbtx.find_by_prefix(&DbPrefixTestPrefix) 2411 .await 2412 .filter(|(k, _v)| ready(k == &TestKey(spent_input_key))) 2413 .map(|(_k, v)| v) 2414 .next() 2415 .await 2416 } 2417 4 => { 2418 dbtx.find_by_prefix_sorted_descending(&DbPrefixTestPrefix) 2419 .await 2420 .filter(|(k, _v)| ready(k == &TestKey(spent_input_key))) 2421 .map(|(_k, v)| v) 2422 .next() 2423 .await 2424 } 2425 _ => { 2426 panic!("woot?"); 2427 } 2428 }; 2429 2430 match (a, s) { 2431 (None, None) | (Some(_), Some(_)) => {} 2432 (None, Some(_)) => panic!("none some?! {}", i), 2433 (Some(_), None) => panic!("some none?! 
{}", i), 2434 } 2435 }, 2436 async { 2437 random_yield().await; 2438 2439 let mut dbtx = db.begin_transaction().await; 2440 random_yield().await; 2441 assert_eq!(dbtx.get_value(&TestKey(tx_accepted_key)).await, None); 2442 2443 random_yield().await; 2444 assert_eq!( 2445 dbtx.insert_entry(&TestKey(spent_input_key), &TestVal(100)) 2446 .await, 2447 None 2448 ); 2449 2450 random_yield().await; 2451 assert_eq!( 2452 dbtx.insert_entry(&TestKey(tx_accepted_key), &TestVal(100)) 2453 .await, 2454 None 2455 ); 2456 random_yield().await; 2457 dbtx.commit_tx().await; 2458 } 2459 ); 2460 } 2461 } 2462 2463 pub async fn verify_phantom_entry(db: Database) { 2464 let mut dbtx = db.begin_transaction().await; 2465 2466 dbtx.insert_entry(&TestKey(100), &TestVal(101)).await; 2467 2468 dbtx.insert_entry(&TestKey(101), &TestVal(102)).await; 2469 2470 dbtx.commit_tx().await; 2471 2472 let mut dbtx = db.begin_transaction().await; 2473 let expected_keys = 2; 2474 let returned_keys = dbtx 2475 .find_by_prefix(&DbPrefixTestPrefix) 2476 .await 2477 .fold(0, |returned_keys, (key, value)| async move { 2478 match key { 2479 TestKey(100) => { 2480 assert!(value.eq(&TestVal(101))); 2481 } 2482 TestKey(101) => { 2483 assert!(value.eq(&TestVal(102))); 2484 } 2485 _ => {} 2486 }; 2487 returned_keys + 1 2488 }) 2489 .await; 2490 2491 assert_eq!(returned_keys, expected_keys); 2492 2493 let mut dbtx2 = db.begin_transaction().await; 2494 2495 dbtx2.insert_entry(&TestKey(102), &TestVal(103)).await; 2496 2497 dbtx2.commit_tx().await; 2498 2499 let returned_keys = dbtx 2500 .find_by_prefix(&DbPrefixTestPrefix) 2501 .await 2502 .fold(0, |returned_keys, (key, value)| async move { 2503 match key { 2504 TestKey(100) => { 2505 assert!(value.eq(&TestVal(101))); 2506 } 2507 TestKey(101) => { 2508 assert!(value.eq(&TestVal(102))); 2509 } 2510 _ => {} 2511 }; 2512 returned_keys + 1 2513 }) 2514 .await; 2515 2516 assert_eq!(returned_keys, expected_keys); 2517 } 2518 2519 pub async fn expect_write_conflict(db: 
Database) { 2520 let mut dbtx = db.begin_transaction().await; 2521 dbtx.insert_entry(&TestKey(100), &TestVal(101)).await; 2522 dbtx.commit_tx().await; 2523 2524 let mut dbtx2 = db.begin_transaction().await; 2525 let mut dbtx3 = db.begin_transaction().await; 2526 2527 dbtx2.insert_entry(&TestKey(100), &TestVal(102)).await; 2528 2529 // Depending on if the database implementation supports optimistic or 2530 // pessimistic transactions, this test should generate an error here 2531 // (pessimistic) or at commit time (optimistic) 2532 dbtx3.insert_entry(&TestKey(100), &TestVal(103)).await; 2533 2534 dbtx2.commit_tx().await; 2535 dbtx3.commit_tx_result().await.expect_err("Expecting an error to be returned because this transaction is in a write-write conflict with dbtx"); 2536 } 2537 2538 pub async fn verify_string_prefix(db: Database) { 2539 let mut dbtx = db.begin_transaction().await; 2540 dbtx.insert_entry(&PercentTestKey(100), &TestVal(101)).await; 2541 2542 assert_eq!( 2543 dbtx.get_value(&PercentTestKey(100)).await, 2544 Some(TestVal(101)) 2545 ); 2546 2547 dbtx.insert_entry(&PercentTestKey(101), &TestVal(100)).await; 2548 2549 dbtx.insert_entry(&PercentTestKey(101), &TestVal(100)).await; 2550 2551 dbtx.insert_entry(&PercentTestKey(101), &TestVal(100)).await; 2552 2553 // If the wildcard character ('%') is not handled properly, this will make 2554 // find_by_prefix return 5 results instead of 4 2555 dbtx.insert_entry(&TestKey(101), &TestVal(100)).await; 2556 2557 let expected_keys = 4; 2558 let returned_keys = dbtx 2559 .find_by_prefix(&PercentPrefixTestPrefix) 2560 .await 2561 .fold(0, |returned_keys, (key, value)| async move { 2562 if let PercentTestKey(101) = key { 2563 assert!(value.eq(&TestVal(100))); 2564 } 2565 returned_keys + 1 2566 }) 2567 .await; 2568 2569 assert_eq!(returned_keys, expected_keys); 2570 } 2571 2572 pub async fn verify_remove_by_prefix(db: Database) { 2573 let mut dbtx = db.begin_transaction().await; 2574 2575 
dbtx.insert_entry(&TestKey(100), &TestVal(101)).await; 2576 2577 dbtx.insert_entry(&TestKey(101), &TestVal(102)).await; 2578 2579 dbtx.commit_tx().await; 2580 2581 let mut remove_dbtx = db.begin_transaction().await; 2582 remove_dbtx.remove_by_prefix(&DbPrefixTestPrefix).await; 2583 remove_dbtx.commit_tx().await; 2584 2585 let mut dbtx = db.begin_transaction().await; 2586 let expected_keys = 0; 2587 let returned_keys = dbtx 2588 .find_by_prefix(&DbPrefixTestPrefix) 2589 .await 2590 .fold(0, |returned_keys, (key, value)| async move { 2591 match key { 2592 TestKey(100) => { 2593 assert!(value.eq(&TestVal(101))); 2594 } 2595 TestKey(101) => { 2596 assert!(value.eq(&TestVal(102))); 2597 } 2598 _ => {} 2599 }; 2600 returned_keys + 1 2601 }) 2602 .await; 2603 2604 assert_eq!(returned_keys, expected_keys); 2605 } 2606 2607 pub async fn verify_module_db(db: Database, module_db: Database) { 2608 let mut dbtx = db.begin_transaction().await; 2609 2610 dbtx.insert_entry(&TestKey(100), &TestVal(101)).await; 2611 2612 dbtx.insert_entry(&TestKey(101), &TestVal(102)).await; 2613 2614 dbtx.commit_tx().await; 2615 2616 // verify module_dbtx can only read key/value pairs from its own module 2617 let mut module_dbtx = module_db.begin_transaction().await; 2618 assert_eq!(module_dbtx.get_value(&TestKey(100)).await, None); 2619 2620 assert_eq!(module_dbtx.get_value(&TestKey(101)).await, None); 2621 2622 // verify module_dbtx can read key/value pairs that it wrote 2623 let mut dbtx = db.begin_transaction().await; 2624 assert_eq!(dbtx.get_value(&TestKey(100)).await, Some(TestVal(101))); 2625 2626 assert_eq!(dbtx.get_value(&TestKey(101)).await, Some(TestVal(102))); 2627 2628 let mut module_dbtx = module_db.begin_transaction().await; 2629 2630 module_dbtx.insert_entry(&TestKey(100), &TestVal(103)).await; 2631 2632 module_dbtx.insert_entry(&TestKey(101), &TestVal(104)).await; 2633 2634 module_dbtx.commit_tx().await; 2635 2636 let expected_keys = 2; 2637 let mut dbtx = 
db.begin_transaction().await; 2638 let returned_keys = dbtx 2639 .find_by_prefix(&DbPrefixTestPrefix) 2640 .await 2641 .fold(0, |returned_keys, (key, value)| async move { 2642 match key { 2643 TestKey(100) => { 2644 assert!(value.eq(&TestVal(101))); 2645 } 2646 TestKey(101) => { 2647 assert!(value.eq(&TestVal(102))); 2648 } 2649 _ => {} 2650 }; 2651 returned_keys + 1 2652 }) 2653 .await; 2654 2655 assert_eq!(returned_keys, expected_keys); 2656 2657 let removed = dbtx.remove_entry(&TestKey(100)).await; 2658 assert_eq!(removed, Some(TestVal(101))); 2659 assert_eq!(dbtx.get_value(&TestKey(100)).await, None); 2660 2661 let mut module_dbtx = module_db.begin_transaction().await; 2662 assert_eq!( 2663 module_dbtx.get_value(&TestKey(100)).await, 2664 Some(TestVal(103)) 2665 ); 2666 } 2667 2668 pub async fn verify_module_prefix(db: Database) { 2669 let mut test_dbtx = db.begin_transaction().await; 2670 { 2671 let mut test_module_dbtx = test_dbtx.to_ref_with_prefix_module_id(TEST_MODULE_PREFIX); 2672 2673 test_module_dbtx 2674 .insert_entry(&TestKey(100), &TestVal(101)) 2675 .await; 2676 2677 test_module_dbtx 2678 .insert_entry(&TestKey(101), &TestVal(102)) 2679 .await; 2680 } 2681 2682 test_dbtx.commit_tx().await; 2683 2684 let mut alt_dbtx = db.begin_transaction().await; 2685 { 2686 let mut alt_module_dbtx = alt_dbtx.to_ref_with_prefix_module_id(ALT_MODULE_PREFIX); 2687 2688 alt_module_dbtx 2689 .insert_entry(&TestKey(100), &TestVal(103)) 2690 .await; 2691 2692 alt_module_dbtx 2693 .insert_entry(&TestKey(101), &TestVal(104)) 2694 .await; 2695 } 2696 2697 alt_dbtx.commit_tx().await; 2698 2699 // verify test_module_dbtx can only see key/value pairs from its own module 2700 let mut test_dbtx = db.begin_transaction().await; 2701 let mut test_module_dbtx = test_dbtx.to_ref_with_prefix_module_id(TEST_MODULE_PREFIX); 2702 assert_eq!( 2703 test_module_dbtx.get_value(&TestKey(100)).await, 2704 Some(TestVal(101)) 2705 ); 2706 2707 assert_eq!( 2708 
test_module_dbtx.get_value(&TestKey(101)).await, 2709 Some(TestVal(102)) 2710 ); 2711 2712 let expected_keys = 2; 2713 let returned_keys = test_module_dbtx 2714 .find_by_prefix(&DbPrefixTestPrefix) 2715 .await 2716 .fold(0, |returned_keys, (key, value)| async move { 2717 match key { 2718 TestKey(100) => { 2719 assert!(value.eq(&TestVal(101))); 2720 } 2721 TestKey(101) => { 2722 assert!(value.eq(&TestVal(102))); 2723 } 2724 _ => {} 2725 }; 2726 returned_keys + 1 2727 }) 2728 .await; 2729 2730 assert_eq!(returned_keys, expected_keys); 2731 2732 let removed = test_module_dbtx.remove_entry(&TestKey(100)).await; 2733 assert_eq!(removed, Some(TestVal(101))); 2734 assert_eq!(test_module_dbtx.get_value(&TestKey(100)).await, None); 2735 2736 // test_dbtx on its own wont find the key because it does not use a module 2737 // prefix 2738 let mut test_dbtx = db.begin_transaction().await; 2739 assert_eq!(test_dbtx.get_value(&TestKey(101)).await, None); 2740 2741 test_dbtx.commit_tx().await; 2742 } 2743 2744 #[cfg(test)] 2745 #[tokio::test] 2746 pub async fn verify_test_migration() { 2747 // Insert a bunch of old dummy data that needs to be migrated to a new version 2748 let db = Database::new(MemDatabase::new(), ModuleDecoderRegistry::default()); 2749 let expected_test_keys_size: usize = 100; 2750 let mut dbtx = db.begin_transaction().await; 2751 for i in 0..expected_test_keys_size { 2752 dbtx.insert_new_entry(&TestKeyV0(i as u64, (i + 1) as u64), &TestVal(i as u64)) 2753 .await; 2754 } 2755 2756 // Will also be migrated to `DatabaseVersionKey` 2757 dbtx.insert_new_entry(&DatabaseVersionKeyV0, &DatabaseVersion(0)) 2758 .await; 2759 dbtx.commit_tx().await; 2760 2761 let mut migrations: BTreeMap<DatabaseVersion, ServerMigrationFn> = BTreeMap::new(); 2762 2763 migrations.insert(DatabaseVersion(0), move |dbtx| { 2764 migrate_test_db_version_0(dbtx).boxed() 2765 }); 2766 2767 apply_migrations( 2768 &db, 2769 "TestModule".to_string(), 2770 DatabaseVersion(1), 2771 migrations, 2772 
None, 2773 ) 2774 .await 2775 .expect("Error applying migrations for TestModule"); 2776 2777 // Verify that the migrations completed successfully 2778 let mut dbtx = db.begin_transaction().await; 2779 2780 // Verify that the old `DatabaseVersion` under `DatabaseVersionKeyV0` migrated 2781 // to `DatabaseVersionKey` 2782 assert!(dbtx 2783 .get_value(&DatabaseVersionKey(MODULE_GLOBAL_PREFIX.into())) 2784 .await 2785 .is_some()); 2786 2787 // Verify Dummy module migration 2788 let test_keys = dbtx 2789 .find_by_prefix(&DbPrefixTestPrefix) 2790 .await 2791 .collect::<Vec<_>>() 2792 .await; 2793 let test_keys_size = test_keys.len(); 2794 assert_eq!(test_keys_size, expected_test_keys_size); 2795 for (key, val) in test_keys { 2796 assert_eq!(key.0, val.0 + 1); 2797 } 2798 } 2799 2800 #[allow(dead_code)] 2801 async fn migrate_test_db_version_0<'a, 'b>( 2802 dbtx: &'b mut DatabaseTransaction<'a>, 2803 ) -> Result<(), anyhow::Error> { 2804 let example_keys_v0 = dbtx 2805 .find_by_prefix(&DbPrefixTestPrefixV0) 2806 .await 2807 .collect::<Vec<_>>() 2808 .await; 2809 dbtx.remove_by_prefix(&DbPrefixTestPrefixV0).await; 2810 for (key, val) in example_keys_v0 { 2811 let key_v2 = TestKey(key.1); 2812 dbtx.insert_new_entry(&key_v2, &val).await; 2813 } 2814 Ok(()) 2815 } 2816 2817 #[cfg(test)] 2818 #[tokio::test] 2819 async fn test_autocommit() { 2820 use std::marker::PhantomData; 2821 2822 use anyhow::anyhow; 2823 use async_trait::async_trait; 2824 2825 use crate::db::{ 2826 AutocommitError, BaseDatabaseTransaction, IDatabaseTransaction, 2827 IDatabaseTransactionOps, IDatabaseTransactionOpsCore, IRawDatabase, 2828 IRawDatabaseTransaction, 2829 }; 2830 use crate::ModuleDecoderRegistry; 2831 2832 #[derive(Debug)] 2833 struct FakeDatabase; 2834 2835 #[async_trait] 2836 impl IRawDatabase for FakeDatabase { 2837 type Transaction<'a> = FakeTransaction<'a>; 2838 async fn begin_transaction(&self) -> FakeTransaction { 2839 FakeTransaction(PhantomData) 2840 } 2841 } 2842 2843 #[derive(Debug)] 
2844 struct FakeTransaction<'a>(PhantomData<&'a ()>); 2845 2846 #[async_trait] 2847 impl<'a> IDatabaseTransactionOpsCore for FakeTransaction<'a> { 2848 async fn raw_insert_bytes( 2849 &mut self, 2850 _key: &[u8], 2851 _value: &[u8], 2852 ) -> anyhow::Result<Option<Vec<u8>>> { 2853 unimplemented!() 2854 } 2855 2856 async fn raw_get_bytes(&mut self, _key: &[u8]) -> anyhow::Result<Option<Vec<u8>>> { 2857 unimplemented!() 2858 } 2859 2860 async fn raw_remove_entry(&mut self, _key: &[u8]) -> anyhow::Result<Option<Vec<u8>>> { 2861 unimplemented!() 2862 } 2863 2864 async fn raw_find_by_prefix( 2865 &mut self, 2866 _key_prefix: &[u8], 2867 ) -> anyhow::Result<crate::db::PrefixStream<'_>> { 2868 unimplemented!() 2869 } 2870 2871 async fn raw_remove_by_prefix(&mut self, _key_prefix: &[u8]) -> anyhow::Result<()> { 2872 unimplemented!() 2873 } 2874 2875 async fn raw_find_by_prefix_sorted_descending( 2876 &mut self, 2877 _key_prefix: &[u8], 2878 ) -> anyhow::Result<crate::db::PrefixStream<'_>> { 2879 unimplemented!() 2880 } 2881 } 2882 2883 #[async_trait] 2884 impl<'a> IDatabaseTransactionOps for FakeTransaction<'a> { 2885 async fn rollback_tx_to_savepoint(&mut self) -> anyhow::Result<()> { 2886 unimplemented!() 2887 } 2888 2889 async fn set_tx_savepoint(&mut self) -> anyhow::Result<()> { 2890 unimplemented!() 2891 } 2892 } 2893 2894 #[async_trait] 2895 impl<'a> IRawDatabaseTransaction for FakeTransaction<'a> { 2896 async fn commit_tx(self) -> anyhow::Result<()> { 2897 Err(anyhow!("Can't commit!")) 2898 } 2899 } 2900 2901 let db = Database::new(FakeDatabase, ModuleDecoderRegistry::default()); 2902 let err = db 2903 .autocommit::<_, _, ()>(|_dbtx, _| Box::pin(async { Ok(()) }), Some(5)) 2904 .await 2905 .unwrap_err(); 2906 2907 match err { 2908 AutocommitError::CommitFailed { 2909 attempts: failed_attempts, 2910 .. 2911 } => { 2912 assert_eq!(failed_attempts, 5) 2913 } 2914 AutocommitError::ClosureError { .. 
} => panic!("Closure did not return error"), 2915 } 2916 } 2917 } 2918 2919 pub async fn find_by_prefix_sorted_descending<'r, 'inner, KP>( 2920 tx: &'r mut (dyn IDatabaseTransaction + 'inner), 2921 decoders: ModuleDecoderRegistry, 2922 key_prefix: &KP, 2923 ) -> impl Stream< 2924 Item = ( 2925 KP::Record, 2926 <<KP as DatabaseLookup>::Record as DatabaseRecord>::Value, 2927 ), 2928 > + 'r 2929 where 2930 'inner: 'r, 2931 KP: DatabaseLookup, 2932 KP::Record: DatabaseKey, 2933 { 2934 debug!("find by prefix sorted descending"); 2935 let prefix_bytes = key_prefix.to_bytes(); 2936 tx.raw_find_by_prefix_sorted_descending(&prefix_bytes) 2937 .await 2938 .expect("Error doing prefix search in database") 2939 .map(move |(key_bytes, value_bytes)| { 2940 let key = decode_key_expect(&key_bytes, &decoders); 2941 let value = decode_value_expect(&value_bytes, &decoders, &key_bytes); 2942 (key, value) 2943 }) 2944 } 2945 2946 #[cfg(test)] 2947 mod tests { 2948 use tokio::sync::oneshot; 2949 2950 use super::mem_impl::MemDatabase; 2951 use super::*; 2952 use crate::runtime::spawn; 2953 2954 async fn waiter(db: &Database, key: TestKey) -> tokio::task::JoinHandle<TestVal> { 2955 let db = db.clone(); 2956 let (tx, rx) = oneshot::channel::<()>(); 2957 let join_handle = spawn("wait key exists", async move { 2958 let sub = db.wait_key_exists(&key); 2959 tx.send(()).unwrap(); 2960 sub.await 2961 }); 2962 rx.await.unwrap(); 2963 join_handle 2964 } 2965 2966 #[tokio::test] 2967 async fn test_wait_key_before_transaction() { 2968 let key = TestKey(1); 2969 let val = TestVal(2); 2970 let db = MemDatabase::new().into_database(); 2971 2972 let key_task = waiter(&db, TestKey(1)).await; 2973 2974 let mut tx = db.begin_transaction().await; 2975 tx.insert_new_entry(&key, &val).await; 2976 tx.commit_tx().await; 2977 2978 assert_eq!( 2979 future_returns_shortly(async { key_task.await.unwrap() }).await, 2980 Some(TestVal(2)), 2981 "should notify" 2982 ); 2983 } 2984 2985 #[tokio::test] 2986 async fn 
test_wait_key_before_insert() { 2987 let key = TestKey(1); 2988 let val = TestVal(2); 2989 let db = MemDatabase::new().into_database(); 2990 2991 let mut tx = db.begin_transaction().await; 2992 let key_task = waiter(&db, TestKey(1)).await; 2993 tx.insert_new_entry(&key, &val).await; 2994 tx.commit_tx().await; 2995 2996 assert_eq!( 2997 future_returns_shortly(async { key_task.await.unwrap() }).await, 2998 Some(TestVal(2)), 2999 "should notify" 3000 ); 3001 } 3002 3003 #[tokio::test] 3004 async fn test_wait_key_after_insert() { 3005 let key = TestKey(1); 3006 let val = TestVal(2); 3007 let db = MemDatabase::new().into_database(); 3008 3009 let mut tx = db.begin_transaction().await; 3010 tx.insert_new_entry(&key, &val).await; 3011 3012 let key_task = waiter(&db, TestKey(1)).await; 3013 3014 tx.commit_tx().await; 3015 3016 assert_eq!( 3017 future_returns_shortly(async { key_task.await.unwrap() }).await, 3018 Some(TestVal(2)), 3019 "should notify" 3020 ); 3021 } 3022 3023 #[tokio::test] 3024 async fn test_wait_key_after_commit() { 3025 let key = TestKey(1); 3026 let val = TestVal(2); 3027 let db = MemDatabase::new().into_database(); 3028 3029 let mut tx = db.begin_transaction().await; 3030 tx.insert_new_entry(&key, &val).await; 3031 tx.commit_tx().await; 3032 3033 let key_task = waiter(&db, TestKey(1)).await; 3034 assert_eq!( 3035 future_returns_shortly(async { key_task.await.unwrap() }).await, 3036 Some(TestVal(2)), 3037 "should notify" 3038 ); 3039 } 3040 3041 #[tokio::test] 3042 async fn test_wait_key_isolated_db() { 3043 let module_instance_id = 10; 3044 let key = TestKey(1); 3045 let val = TestVal(2); 3046 let db = MemDatabase::new().into_database(); 3047 let db = db.with_prefix_module_id(module_instance_id); 3048 3049 let key_task = waiter(&db, TestKey(1)).await; 3050 3051 let mut tx = db.begin_transaction().await; 3052 tx.insert_new_entry(&key, &val).await; 3053 tx.commit_tx().await; 3054 3055 assert_eq!( 3056 future_returns_shortly(async { 
key_task.await.unwrap() }).await, 3057 Some(TestVal(2)), 3058 "should notify" 3059 ); 3060 } 3061 3062 #[tokio::test] 3063 async fn test_wait_key_isolated_tx() { 3064 let module_instance_id = 10; 3065 let key = TestKey(1); 3066 let val = TestVal(2); 3067 let db = MemDatabase::new().into_database(); 3068 3069 let key_task = waiter(&db.with_prefix_module_id(module_instance_id), TestKey(1)).await; 3070 3071 let mut tx = db.begin_transaction().await; 3072 let mut tx_mod = tx.to_ref_with_prefix_module_id(module_instance_id); 3073 tx_mod.insert_new_entry(&key, &val).await; 3074 drop(tx_mod); 3075 tx.commit_tx().await; 3076 3077 assert_eq!( 3078 future_returns_shortly(async { key_task.await.unwrap() }).await, 3079 Some(TestVal(2)), 3080 "should notify" 3081 ); 3082 } 3083 3084 #[tokio::test] 3085 async fn test_wait_key_no_transaction() { 3086 let db = MemDatabase::new().into_database(); 3087 3088 let key_task = waiter(&db, TestKey(1)).await; 3089 assert_eq!( 3090 future_returns_shortly(async { key_task.await.unwrap() }).await, 3091 None, 3092 "should not notify" 3093 ); 3094 } 3095 }