lib.rs
1 #![allow(where_clauses_object_safety)] // https://github.com/dtolnay/async-trait/issues/228 2 3 pub mod envs; 4 5 use std::fmt; 6 use std::path::Path; 7 use std::str::FromStr; 8 9 use anyhow::{bail, Context, Result}; 10 use async_trait::async_trait; 11 use fedimint_core::db::{ 12 IDatabaseTransactionOps, IDatabaseTransactionOpsCore, IRawDatabase, IRawDatabaseTransaction, 13 PrefixStream, 14 }; 15 use futures::stream; 16 pub use rocksdb; 17 use rocksdb::{ 18 DBRecoveryMode, OptimisticTransactionDB, OptimisticTransactionOptions, WriteOptions, 19 }; 20 use tracing::debug; 21 22 use crate::envs::FM_ROCKSDB_WRITE_BUFFER_SIZE_ENV; 23 24 #[derive(Debug)] 25 pub struct RocksDb(rocksdb::OptimisticTransactionDB); 26 27 pub struct RocksDbTransaction<'a>(rocksdb::Transaction<'a, rocksdb::OptimisticTransactionDB>); 28 29 impl RocksDb { 30 pub fn open(db_path: impl AsRef<Path>) -> anyhow::Result<RocksDb> { 31 let mut opts = get_default_options()?; 32 // Since we turned synchronous writes one we should never encounter a corrupted 33 // WAL and should rather fail in this case 34 opts.set_wal_recovery_mode(DBRecoveryMode::AbsoluteConsistency); 35 let db: rocksdb::OptimisticTransactionDB = 36 rocksdb::OptimisticTransactionDB::<rocksdb::SingleThreaded>::open(&opts, &db_path)?; 37 Ok(RocksDb(db)) 38 } 39 40 pub fn inner(&self) -> &rocksdb::OptimisticTransactionDB { 41 &self.0 42 } 43 } 44 45 fn is_power_of_two(num: usize) -> bool { 46 num.count_ones() == 1 47 } 48 49 impl<'a> fmt::Debug for RocksDbReadOnlyTransaction<'a> { 50 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 51 f.write_str("RocksDbTransaction") 52 } 53 } 54 55 impl<'a> fmt::Debug for RocksDbTransaction<'a> { 56 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 57 f.write_str("RocksDbTransaction") 58 } 59 } 60 61 #[test] 62 fn is_power_of_two_sanity() { 63 assert!(!is_power_of_two(0)); 64 assert!(is_power_of_two(1)); 65 assert!(is_power_of_two(2)); 66 assert!(!is_power_of_two(3)); 67 assert!(is_power_of_two(4)); 68 assert!(!is_power_of_two(5)); 69 assert!(is_power_of_two(2 << 10)); 70 assert!(!is_power_of_two((2 << 10) + 1)); 71 } 72 73 fn get_default_options() -> anyhow::Result<rocksdb::Options> { 74 let mut opts = rocksdb::Options::default(); 75 if let Ok(var) = std::env::var(FM_ROCKSDB_WRITE_BUFFER_SIZE_ENV) { 76 debug!(var, "Using custom write buffer size"); 77 let size: usize = FromStr::from_str(&var) 78 .with_context(|| format!("Could not parse {FM_ROCKSDB_WRITE_BUFFER_SIZE_ENV}"))?; 79 if !is_power_of_two(size) { 80 bail!("{} is not a power of 2", FM_ROCKSDB_WRITE_BUFFER_SIZE_ENV); 81 } 82 opts.set_write_buffer_size(size); 83 } 84 opts.create_if_missing(true); 85 Ok(opts) 86 } 87 88 #[derive(Debug)] 89 pub struct RocksDbReadOnly(rocksdb::DB); 90 91 pub struct RocksDbReadOnlyTransaction<'a>(&'a rocksdb::DB); 92 93 impl RocksDbReadOnly { 94 pub fn open_read_only(db_path: impl AsRef<Path>) -> anyhow::Result<RocksDbReadOnly> { 95 let opts = get_default_options()?; 96 let db = rocksdb::DB::open_for_read_only(&opts, db_path, false)?; 97 Ok(RocksDbReadOnly(db)) 98 } 99 } 100 101 impl From<rocksdb::OptimisticTransactionDB> for RocksDb { 102 fn from(db: OptimisticTransactionDB) -> Self { 103 RocksDb(db) 104 } 105 } 106 107 impl From<RocksDb> for rocksdb::OptimisticTransactionDB { 108 fn from(db: RocksDb) -> Self { 109 db.0 110 } 111 } 112 113 // When finding by prefix iterating in Reverse order, we need to start from 114 // "prefix+1" instead of "prefix", using lexicographic ordering. See the tests 115 // below. 116 // Will return None if there is no next prefix (i.e prefix is already the last 117 // possible/max one) 118 fn next_prefix(prefix: &[u8]) -> Option<Vec<u8>> { 119 let mut next_prefix = prefix.to_vec(); 120 let mut is_last_prefix = true; 121 for i in (0..next_prefix.len()).rev() { 122 next_prefix[i] = next_prefix[i].wrapping_add(1); 123 if next_prefix[i] > 0 { 124 is_last_prefix = false; 125 break; 126 } 127 } 128 if is_last_prefix { 129 // The given prefix is already the last/max prefix, so there is no next prefix, 130 // return None to represent that 131 None 132 } else { 133 Some(next_prefix) 134 } 135 } 136 137 #[async_trait] 138 impl IRawDatabase for RocksDb { 139 type Transaction<'a> = RocksDbTransaction<'a>; 140 async fn begin_transaction<'a>(&'a self) -> RocksDbTransaction { 141 let mut optimistic_options = OptimisticTransactionOptions::default(); 142 optimistic_options.set_snapshot(true); 143 144 let mut write_options = WriteOptions::default(); 145 // Make sure we never lose data on unclean shutdown 146 write_options.set_sync(true); 147 148 let mut rocksdb_tx = 149 RocksDbTransaction(self.0.transaction_opt(&write_options, &optimistic_options)); 150 rocksdb_tx 151 .set_tx_savepoint() 152 .await 153 .expect("setting tx savepoint failed"); 154 155 rocksdb_tx 156 } 157 } 158 159 #[async_trait] 160 impl IRawDatabase for RocksDbReadOnly { 161 type Transaction<'a> = RocksDbReadOnlyTransaction<'a>; 162 async fn begin_transaction<'a>(&'a self) -> RocksDbReadOnlyTransaction<'a> { 163 RocksDbReadOnlyTransaction(&self.0) 164 } 165 } 166 167 #[async_trait] 168 impl<'a> IDatabaseTransactionOpsCore for RocksDbTransaction<'a> { 169 async fn raw_insert_bytes(&mut self, key: &[u8], value: &[u8]) -> Result<Option<Vec<u8>>> { 170 fedimint_core::runtime::block_in_place(|| { 171 let val = self.0.snapshot().get(key).unwrap(); 172 self.0.put(key, value)?; 173 Ok(val) 174 }) 175 } 176 177 async fn raw_get_bytes(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> { 178 fedimint_core::runtime::block_in_place(|| Ok(self.0.snapshot().get(key)?)) 179 } 180 181 async fn raw_remove_entry(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> { 182 fedimint_core::runtime::block_in_place(|| { 183 let val = self.0.snapshot().get(key).unwrap(); 184 self.0.delete(key)?; 185 Ok(val) 186 }) 187 } 188 189 async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> Result<PrefixStream<'_>> { 190 // turn an `iter` into a `Stream` where every `next` is ran inside 191 // `block_in_place` to offload the blocking calls 192 fn convert_to_async_stream<'i, I>(iter: I) -> impl futures::Stream<Item = I::Item> 193 where 194 I: Iterator + Send + 'i, 195 I::Item: Send, 196 { 197 stream::unfold(iter, |mut iter| async move { 198 fedimint_core::runtime::block_in_place(move || { 199 let item = iter.next(); 200 item.map(move |item| (item, iter)) 201 }) 202 }) 203 } 204 205 Ok(fedimint_core::runtime::block_in_place(|| { 206 let prefix = key_prefix.to_vec(); 207 let mut options = rocksdb::ReadOptions::default(); 208 options.set_iterate_range(rocksdb::PrefixRange(prefix.clone())); 209 let iter = self.0.snapshot().iterator_opt( 210 rocksdb::IteratorMode::From(&prefix, rocksdb::Direction::Forward), 211 options, 212 ); 213 let rocksdb_iter = iter.map_while(move |res| { 214 let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb"); 215 key_bytes 216 .starts_with(&prefix) 217 .then_some((key_bytes.to_vec(), value_bytes.to_vec())) 218 }); 219 Box::pin(convert_to_async_stream(rocksdb_iter)) 220 })) 221 } 222 223 async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> anyhow::Result<()> { 224 fedimint_core::runtime::block_in_place(|| { 225 // Note: delete_range is not supported in Transactions :/ 226 let mut options = rocksdb::ReadOptions::default(); 227 options.set_iterate_range(rocksdb::PrefixRange(key_prefix.to_owned())); 228 let iter = self 229 .0 230 .snapshot() 231 .iterator_opt( 232 rocksdb::IteratorMode::From(key_prefix, rocksdb::Direction::Forward), 233 options, 234 ) 235 .map_while(move |res| { 236 res.map(|(key_bytes, _)| { 237 key_bytes 238 .starts_with(key_prefix) 239 .then_some(key_bytes.to_vec()) 240 }) 241 .transpose() 242 }); 243 244 for item in iter { 245 let key = item?; 246 self.0.delete(key)?; 247 } 248 249 Ok(()) 250 }) 251 } 252 253 async fn raw_find_by_prefix_sorted_descending( 254 &mut self, 255 key_prefix: &[u8], 256 ) -> Result<PrefixStream<'_>> { 257 let prefix = key_prefix.to_vec(); 258 let next_prefix = next_prefix(&prefix); 259 let iterator_mode = if let Some(next_prefix) = &next_prefix { 260 rocksdb::IteratorMode::From(next_prefix, rocksdb::Direction::Reverse) 261 } else { 262 rocksdb::IteratorMode::End 263 }; 264 Ok(fedimint_core::runtime::block_in_place(|| { 265 let mut options = rocksdb::ReadOptions::default(); 266 options.set_iterate_range(rocksdb::PrefixRange(prefix.clone())); 267 let iter = self.0.snapshot().iterator_opt(iterator_mode, options); 268 let rocksdb_iter = iter.map_while(move |res| { 269 let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb"); 270 key_bytes 271 .starts_with(&prefix) 272 .then_some((key_bytes.to_vec(), value_bytes.to_vec())) 273 }); 274 Box::pin(stream::iter(rocksdb_iter)) 275 })) 276 } 277 } 278 279 #[async_trait] 280 impl<'a> IDatabaseTransactionOps for RocksDbTransaction<'a> { 281 async fn rollback_tx_to_savepoint(&mut self) -> Result<()> { 282 Ok(fedimint_core::runtime::block_in_place(|| { 283 self.0.rollback_to_savepoint() 284 })?) 285 } 286 287 async fn set_tx_savepoint(&mut self) -> Result<()> { 288 fedimint_core::runtime::block_in_place(|| self.0.set_savepoint()); 289 290 Ok(()) 291 } 292 } 293 294 #[async_trait] 295 impl<'a> IRawDatabaseTransaction for RocksDbTransaction<'a> { 296 async fn commit_tx(self) -> Result<()> { 297 fedimint_core::runtime::block_in_place(|| { 298 self.0.commit()?; 299 Ok(()) 300 }) 301 } 302 } 303 304 #[async_trait] 305 impl<'a> IDatabaseTransactionOpsCore for RocksDbReadOnlyTransaction<'a> { 306 async fn raw_insert_bytes(&mut self, _key: &[u8], _value: &[u8]) -> Result<Option<Vec<u8>>> { 307 panic!("Cannot insert into a read only transaction"); 308 } 309 310 async fn raw_get_bytes(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> { 311 fedimint_core::runtime::block_in_place(|| Ok(self.0.snapshot().get(key)?)) 312 } 313 314 async fn raw_remove_entry(&mut self, _key: &[u8]) -> Result<Option<Vec<u8>>> { 315 panic!("Cannot remove from a read only transaction"); 316 } 317 318 async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> Result<PrefixStream<'_>> { 319 // turn an `iter` into a `Stream` where every `next` is ran inside 320 // `block_in_place` to offload the blocking calls 321 fn convert_to_async_stream<'i, I>(iter: I) -> impl futures::Stream<Item = I::Item> 322 where 323 I: Iterator + Send + 'i, 324 I::Item: Send, 325 { 326 stream::unfold(iter, |mut iter| async move { 327 fedimint_core::runtime::block_in_place(move || { 328 let item = iter.next(); 329 item.map(move |item| (item, iter)) 330 }) 331 }) 332 } 333 334 Ok(fedimint_core::runtime::block_in_place(|| { 335 let prefix = key_prefix.to_vec(); 336 let mut options = rocksdb::ReadOptions::default(); 337 options.set_iterate_range(rocksdb::PrefixRange(prefix.clone())); 338 let iter = self.0.snapshot().iterator_opt( 339 rocksdb::IteratorMode::From(&prefix, rocksdb::Direction::Forward), 340 options, 341 ); 342 let rocksdb_iter = iter.map_while(move |res| { 343 let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb"); 344 key_bytes 345 .starts_with(&prefix) 346 .then_some((key_bytes.to_vec(), value_bytes.to_vec())) 347 }); 348 Box::pin(convert_to_async_stream(rocksdb_iter)) 349 })) 350 } 351 352 async fn raw_remove_by_prefix(&mut self, _key_prefix: &[u8]) -> anyhow::Result<()> { 353 panic!("Cannot remove from a read only transaction"); 354 } 355 356 async fn raw_find_by_prefix_sorted_descending( 357 &mut self, 358 key_prefix: &[u8], 359 ) -> Result<PrefixStream<'_>> { 360 let prefix = key_prefix.to_vec(); 361 let next_prefix = next_prefix(&prefix); 362 let iterator_mode = if let Some(next_prefix) = &next_prefix { 363 rocksdb::IteratorMode::From(next_prefix, rocksdb::Direction::Reverse) 364 } else { 365 rocksdb::IteratorMode::End 366 }; 367 Ok(fedimint_core::runtime::block_in_place(|| { 368 let mut options = rocksdb::ReadOptions::default(); 369 options.set_iterate_range(rocksdb::PrefixRange(prefix.clone())); 370 let iter = self.0.snapshot().iterator_opt(iterator_mode, options); 371 let rocksdb_iter = iter.map_while(move |res| { 372 let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb"); 373 key_bytes 374 .starts_with(&prefix) 375 .then_some((key_bytes.to_vec(), value_bytes.to_vec())) 376 }); 377 Box::pin(stream::iter(rocksdb_iter)) 378 })) 379 } 380 } 381 382 #[async_trait] 383 impl<'a> IDatabaseTransactionOps for RocksDbReadOnlyTransaction<'a> { 384 async fn rollback_tx_to_savepoint(&mut self) -> Result<()> { 385 panic!("Cannot rollback a read only transaction"); 386 } 387 388 async fn set_tx_savepoint(&mut self) -> Result<()> { 389 panic!("Cannot set a savepoint in a read only transaction"); 390 } 391 } 392 393 #[async_trait] 394 impl<'a> IRawDatabaseTransaction for RocksDbReadOnlyTransaction<'a> { 395 async fn commit_tx(self) -> Result<()> { 396 panic!("Cannot commit a read only transaction"); 397 } 398 } 399 400 #[cfg(test)] 401 mod fedimint_rocksdb_tests { 402 use fedimint_core::db::{Database, IDatabaseTransactionOpsCoreTyped}; 403 use fedimint_core::encoding::{Decodable, Encodable}; 404 use fedimint_core::module::registry::ModuleDecoderRegistry; 405 use fedimint_core::{impl_db_lookup, impl_db_record}; 406 use futures::StreamExt; 407 408 use super::*; 409 410 fn open_temp_db(temp_path: &str) -> Database { 411 let path = tempfile::Builder::new() 412 .prefix(temp_path) 413 .tempdir() 414 .unwrap(); 415 416 Database::new( 417 RocksDb::open(path).unwrap(), 418 ModuleDecoderRegistry::default(), 419 ) 420 } 421 422 #[tokio::test(flavor = "multi_thread")] 423 async fn test_dbtx_insert_elements() { 424 fedimint_core::db::verify_insert_elements(open_temp_db("fcb-rocksdb-test-insert-elements")) 425 .await; 426 } 427 428 #[tokio::test(flavor = "multi_thread")] 429 async fn test_dbtx_remove_nonexisting() { 430 fedimint_core::db::verify_remove_nonexisting(open_temp_db( 431 "fcb-rocksdb-test-remove-nonexisting", 432 )) 433 .await; 434 } 435 436 #[tokio::test(flavor = "multi_thread")] 437 async fn test_dbtx_remove_existing() { 438 fedimint_core::db::verify_remove_existing(open_temp_db("fcb-rocksdb-test-remove-existing")) 439 .await; 440 } 441 442 #[tokio::test(flavor = "multi_thread")] 443 async fn test_dbtx_read_own_writes() { 444 fedimint_core::db::verify_read_own_writes(open_temp_db("fcb-rocksdb-test-read-own-writes")) 445 .await; 446 } 447 448 #[tokio::test(flavor = "multi_thread")] 449 async fn test_dbtx_prevent_dirty_reads() { 450 fedimint_core::db::verify_prevent_dirty_reads(open_temp_db( 451 "fcb-rocksdb-test-prevent-dirty-reads", 452 )) 453 .await; 454 } 455 456 #[tokio::test(flavor = "multi_thread")] 457 async fn test_dbtx_find_by_prefix() { 458 fedimint_core::db::verify_find_by_prefix(open_temp_db("fcb-rocksdb-test-find-by-prefix")) 459 .await; 460 } 461 462 #[tokio::test(flavor = "multi_thread")] 463 async fn test_dbtx_commit() { 464 fedimint_core::db::verify_commit(open_temp_db("fcb-rocksdb-test-commit")).await; 465 } 466 467 #[tokio::test(flavor = "multi_thread")] 468 async fn test_dbtx_prevent_nonrepeatable_reads() { 469 fedimint_core::db::verify_prevent_nonrepeatable_reads(open_temp_db( 470 "fcb-rocksdb-test-prevent-nonrepeatable-reads", 471 )) 472 .await; 473 } 474 475 #[tokio::test(flavor = "multi_thread")] 476 async fn test_dbtx_snapshot_isolation() { 477 fedimint_core::db::verify_snapshot_isolation(open_temp_db( 478 "fcb-rocksdb-test-snapshot-isolation", 479 )) 480 .await; 481 } 482 483 #[tokio::test(flavor = "multi_thread")] 484 async fn test_dbtx_rollback_to_savepoint() { 485 fedimint_core::db::verify_rollback_to_savepoint(open_temp_db( 486 "fcb-rocksdb-test-rollback-to-savepoint", 487 )) 488 .await; 489 } 490 491 #[tokio::test(flavor = "multi_thread")] 492 async fn test_dbtx_phantom_entry() { 493 fedimint_core::db::verify_phantom_entry(open_temp_db("fcb-rocksdb-test-phantom-entry")) 494 .await; 495 } 496 497 #[tokio::test(flavor = "multi_thread")] 498 async fn test_dbtx_write_conflict() { 499 fedimint_core::db::expect_write_conflict(open_temp_db("fcb-rocksdb-test-write-conflict")) 500 .await; 501 } 502 503 #[tokio::test(flavor = "multi_thread")] 504 async fn test_dbtx_remove_by_prefix() { 505 fedimint_core::db::verify_remove_by_prefix(open_temp_db( 506 "fcb-rocksdb-test-remove-by-prefix", 507 )) 508 .await; 509 } 510 511 #[tokio::test(flavor = "multi_thread")] 512 async fn test_module_dbtx() { 513 fedimint_core::db::verify_module_prefix(open_temp_db("fcb-rocksdb-test-module-prefix")) 514 .await; 515 } 516 517 #[tokio::test(flavor = "multi_thread")] 518 async fn test_module_db() { 519 let module_instance_id = 1; 520 let path = tempfile::Builder::new() 521 .prefix("fcb-rocksdb-test-module-db-prefix") 522 .tempdir() 523 .unwrap(); 524 525 let module_db = Database::new( 526 RocksDb::open(path).unwrap(), 527 ModuleDecoderRegistry::default(), 528 ); 529 530 fedimint_core::db::verify_module_db( 531 open_temp_db("fcb-rocksdb-test-module-db"), 532 module_db.with_prefix_module_id(module_instance_id), 533 ) 534 .await; 535 } 536 537 #[test] 538 fn test_next_prefix() { 539 // Note: although we are testing the general case of a vector with N elements, 540 // the prefixes currently use N = 1 541 assert_eq!(next_prefix(&[1, 2, 3]).unwrap(), vec![1, 2, 4]); 542 assert_eq!(next_prefix(&[1, 2, 254]).unwrap(), vec![1, 2, 255]); 543 assert_eq!(next_prefix(&[1, 2, 255]).unwrap(), vec![1, 3, 0]); 544 assert_eq!(next_prefix(&[1, 255, 255]).unwrap(), vec![2, 0, 0]); 545 // this is a "max" prefix 546 assert!(next_prefix(&[255, 255, 255]).is_none()); 547 // these are the common case 548 assert_eq!(next_prefix(&[0]).unwrap(), vec![1]); 549 assert_eq!(next_prefix(&[254]).unwrap(), vec![255]); 550 assert!(next_prefix(&[255]).is_none()); // this is a "max" prefix 551 } 552 553 #[repr(u8)] 554 #[derive(Clone)] 555 pub enum TestDbKeyPrefix { 556 Test = 254, 557 MaxTest = 255, 558 } 559 560 #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Encodable, Decodable)] 561 pub(super) struct TestKey(pub Vec<u8>); 562 563 #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Encodable, Decodable)] 564 pub(super) struct TestVal(pub Vec<u8>); 565 566 #[derive(Debug, Encodable, Decodable)] 567 struct DbPrefixTestPrefix; 568 569 impl_db_record!( 570 key = TestKey, 571 value = TestVal, 572 db_prefix = TestDbKeyPrefix::Test, 573 notify_on_modify = true, 574 ); 575 impl_db_lookup!(key = TestKey, query_prefix = DbPrefixTestPrefix); 576 577 #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Encodable, Decodable)] 578 pub(super) struct TestKey2(pub Vec<u8>); 579 580 #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Encodable, Decodable)] 581 pub(super) struct TestVal2(pub Vec<u8>); 582 583 #[derive(Debug, Encodable, Decodable)] 584 struct DbPrefixTestPrefixMax; 585 586 impl_db_record!( 587 key = TestKey2, 588 value = TestVal2, 589 db_prefix = TestDbKeyPrefix::MaxTest, // max/last prefix 590 notify_on_modify = true, 591 ); 592 impl_db_lookup!(key = TestKey2, query_prefix = DbPrefixTestPrefixMax); 593 594 #[tokio::test(flavor = "multi_thread")] 595 async fn test_retrieve_descending_order() { 596 let path = tempfile::Builder::new() 597 .prefix("fcb-rocksdb-test-descending-order") 598 .tempdir() 599 .unwrap(); 600 { 601 let db = Database::new( 602 RocksDb::open(&path).unwrap(), 603 ModuleDecoderRegistry::default(), 604 ); 605 let mut dbtx = db.begin_transaction().await; 606 dbtx.insert_entry(&TestKey(vec![0]), &TestVal(vec![3])) 607 .await; 608 dbtx.insert_entry(&TestKey(vec![254]), &TestVal(vec![1])) 609 .await; 610 dbtx.insert_entry(&TestKey(vec![255]), &TestVal(vec![2])) 611 .await; 612 dbtx.insert_entry(&TestKey2(vec![0]), &TestVal2(vec![3])) 613 .await; 614 dbtx.insert_entry(&TestKey2(vec![254]), &TestVal2(vec![1])) 615 .await; 616 dbtx.insert_entry(&TestKey2(vec![255]), &TestVal2(vec![2])) 617 .await; 618 let query = dbtx 619 .find_by_prefix_sorted_descending(&DbPrefixTestPrefix) 620 .await 621 .collect::<Vec<_>>() 622 .await; 623 assert_eq!( 624 query, 625 vec![ 626 (TestKey(vec![255]), TestVal(vec![2])), 627 (TestKey(vec![254]), TestVal(vec![1])), 628 (TestKey(vec![0]), TestVal(vec![3])) 629 ] 630 ); 631 let query = dbtx 632 .find_by_prefix_sorted_descending(&DbPrefixTestPrefixMax) 633 .await 634 .collect::<Vec<_>>() 635 .await; 636 assert_eq!( 637 query, 638 vec![ 639 (TestKey2(vec![255]), TestVal2(vec![2])), 640 (TestKey2(vec![254]), TestVal2(vec![1])), 641 (TestKey2(vec![0]), TestVal2(vec![3])) 642 ] 643 ); 644 dbtx.commit_tx().await; 645 } 646 // Test readonly implementation 647 let db_readonly = RocksDbReadOnly::open_read_only(path).unwrap(); 648 let db_readonly = Database::new(db_readonly, Default::default()); 649 let mut dbtx = db_readonly.begin_transaction().await; 650 let query = dbtx 651 .find_by_prefix_sorted_descending(&DbPrefixTestPrefix) 652 .await 653 .collect::<Vec<_>>() 654 .await; 655 assert_eq!( 656 query, 657 vec![ 658 (TestKey(vec![255]), TestVal(vec![2])), 659 (TestKey(vec![254]), TestVal(vec![1])), 660 (TestKey(vec![0]), TestVal(vec![3])) 661 ] 662 ); 663 let query = dbtx 664 .find_by_prefix_sorted_descending(&DbPrefixTestPrefixMax) 665 .await 666 .collect::<Vec<_>>() 667 .await; 668 assert_eq!( 669 query, 670 vec![ 671 (TestKey2(vec![255]), TestVal2(vec![2])), 672 (TestKey2(vec![254]), TestVal2(vec![1])), 673 (TestKey2(vec![0]), TestVal2(vec![3])) 674 ] 675 ); 676 } 677 }