lib.rs
  1  #![allow(where_clauses_object_safety)] // https://github.com/dtolnay/async-trait/issues/228
  2  
  3  pub mod envs;
  4  
  5  use std::fmt;
  6  use std::path::Path;
  7  use std::str::FromStr;
  8  
  9  use anyhow::{bail, Context, Result};
 10  use async_trait::async_trait;
 11  use fedimint_core::db::{
 12      IDatabaseTransactionOps, IDatabaseTransactionOpsCore, IRawDatabase, IRawDatabaseTransaction,
 13      PrefixStream,
 14  };
 15  use futures::stream;
 16  pub use rocksdb;
 17  use rocksdb::{
 18      DBRecoveryMode, OptimisticTransactionDB, OptimisticTransactionOptions, WriteOptions,
 19  };
 20  use tracing::debug;
 21  
 22  use crate::envs::FM_ROCKSDB_WRITE_BUFFER_SIZE_ENV;
 23  
 24  #[derive(Debug)]
 25  pub struct RocksDb(rocksdb::OptimisticTransactionDB);
 26  
 27  pub struct RocksDbTransaction<'a>(rocksdb::Transaction<'a, rocksdb::OptimisticTransactionDB>);
 28  
 29  impl RocksDb {
 30      pub fn open(db_path: impl AsRef<Path>) -> anyhow::Result<RocksDb> {
 31          let mut opts = get_default_options()?;
 32          // Since we turned synchronous writes one we should never encounter a corrupted
 33          // WAL and should rather fail in this case
 34          opts.set_wal_recovery_mode(DBRecoveryMode::AbsoluteConsistency);
 35          let db: rocksdb::OptimisticTransactionDB =
 36              rocksdb::OptimisticTransactionDB::<rocksdb::SingleThreaded>::open(&opts, &db_path)?;
 37          Ok(RocksDb(db))
 38      }
 39  
 40      pub fn inner(&self) -> &rocksdb::OptimisticTransactionDB {
 41          &self.0
 42      }
 43  }
 44  
 45  fn is_power_of_two(num: usize) -> bool {
 46      num.count_ones() == 1
 47  }
 48  
 49  impl<'a> fmt::Debug for RocksDbReadOnlyTransaction<'a> {
 50      fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
 51          f.write_str("RocksDbTransaction")
 52      }
 53  }
 54  
 55  impl<'a> fmt::Debug for RocksDbTransaction<'a> {
 56      fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
 57          f.write_str("RocksDbTransaction")
 58      }
 59  }
 60  
 61  #[test]
 62  fn is_power_of_two_sanity() {
 63      assert!(!is_power_of_two(0));
 64      assert!(is_power_of_two(1));
 65      assert!(is_power_of_two(2));
 66      assert!(!is_power_of_two(3));
 67      assert!(is_power_of_two(4));
 68      assert!(!is_power_of_two(5));
 69      assert!(is_power_of_two(2 << 10));
 70      assert!(!is_power_of_two((2 << 10) + 1));
 71  }
 72  
 73  fn get_default_options() -> anyhow::Result<rocksdb::Options> {
 74      let mut opts = rocksdb::Options::default();
 75      if let Ok(var) = std::env::var(FM_ROCKSDB_WRITE_BUFFER_SIZE_ENV) {
 76          debug!(var, "Using custom write buffer size");
 77          let size: usize = FromStr::from_str(&var)
 78              .with_context(|| format!("Could not parse {FM_ROCKSDB_WRITE_BUFFER_SIZE_ENV}"))?;
 79          if !is_power_of_two(size) {
 80              bail!("{} is not a power of 2", FM_ROCKSDB_WRITE_BUFFER_SIZE_ENV);
 81          }
 82          opts.set_write_buffer_size(size);
 83      }
 84      opts.create_if_missing(true);
 85      Ok(opts)
 86  }
 87  
 88  #[derive(Debug)]
 89  pub struct RocksDbReadOnly(rocksdb::DB);
 90  
 91  pub struct RocksDbReadOnlyTransaction<'a>(&'a rocksdb::DB);
 92  
 93  impl RocksDbReadOnly {
 94      pub fn open_read_only(db_path: impl AsRef<Path>) -> anyhow::Result<RocksDbReadOnly> {
 95          let opts = get_default_options()?;
 96          let db = rocksdb::DB::open_for_read_only(&opts, db_path, false)?;
 97          Ok(RocksDbReadOnly(db))
 98      }
 99  }
100  
101  impl From<rocksdb::OptimisticTransactionDB> for RocksDb {
102      fn from(db: OptimisticTransactionDB) -> Self {
103          RocksDb(db)
104      }
105  }
106  
107  impl From<RocksDb> for rocksdb::OptimisticTransactionDB {
108      fn from(db: RocksDb) -> Self {
109          db.0
110      }
111  }
112  
// Reverse prefix scans have to start from "prefix + 1" rather than "prefix"
// (in lexicographic order); this computes that successor. See the tests
// below.
//
// The prefix is treated as a big-endian counter: the last byte is incremented
// and every byte that overflows to 0 carries into the byte before it.
//
// Returns None when there is no successor, i.e. every byte was already 255
// (this includes the empty prefix).
fn next_prefix(prefix: &[u8]) -> Option<Vec<u8>> {
    let mut successor = prefix.to_vec();
    // `all` stops at the first byte that did not wrap around to 0, which is
    // exactly where the carry ends. It only runs to completion when every
    // byte wrapped, meaning `prefix` was already the maximum.
    let carried_out = successor.iter_mut().rev().all(|byte| {
        *byte = byte.wrapping_add(1);
        *byte == 0
    });
    (!carried_out).then_some(successor)
}
136  
137  #[async_trait]
138  impl IRawDatabase for RocksDb {
139      type Transaction<'a> = RocksDbTransaction<'a>;
140      async fn begin_transaction<'a>(&'a self) -> RocksDbTransaction {
141          let mut optimistic_options = OptimisticTransactionOptions::default();
142          optimistic_options.set_snapshot(true);
143  
144          let mut write_options = WriteOptions::default();
145          // Make sure we never lose data on unclean shutdown
146          write_options.set_sync(true);
147  
148          let mut rocksdb_tx =
149              RocksDbTransaction(self.0.transaction_opt(&write_options, &optimistic_options));
150          rocksdb_tx
151              .set_tx_savepoint()
152              .await
153              .expect("setting tx savepoint failed");
154  
155          rocksdb_tx
156      }
157  }
158  
159  #[async_trait]
160  impl IRawDatabase for RocksDbReadOnly {
161      type Transaction<'a> = RocksDbReadOnlyTransaction<'a>;
162      async fn begin_transaction<'a>(&'a self) -> RocksDbReadOnlyTransaction<'a> {
163          RocksDbReadOnlyTransaction(&self.0)
164      }
165  }
166  
167  #[async_trait]
168  impl<'a> IDatabaseTransactionOpsCore for RocksDbTransaction<'a> {
169      async fn raw_insert_bytes(&mut self, key: &[u8], value: &[u8]) -> Result<Option<Vec<u8>>> {
170          fedimint_core::runtime::block_in_place(|| {
171              let val = self.0.snapshot().get(key).unwrap();
172              self.0.put(key, value)?;
173              Ok(val)
174          })
175      }
176  
177      async fn raw_get_bytes(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
178          fedimint_core::runtime::block_in_place(|| Ok(self.0.snapshot().get(key)?))
179      }
180  
181      async fn raw_remove_entry(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
182          fedimint_core::runtime::block_in_place(|| {
183              let val = self.0.snapshot().get(key).unwrap();
184              self.0.delete(key)?;
185              Ok(val)
186          })
187      }
188  
189      async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> Result<PrefixStream<'_>> {
190          // turn an `iter` into a `Stream` where every `next` is ran inside
191          // `block_in_place` to offload the blocking calls
192          fn convert_to_async_stream<'i, I>(iter: I) -> impl futures::Stream<Item = I::Item>
193          where
194              I: Iterator + Send + 'i,
195              I::Item: Send,
196          {
197              stream::unfold(iter, |mut iter| async move {
198                  fedimint_core::runtime::block_in_place(move || {
199                      let item = iter.next();
200                      item.map(move |item| (item, iter))
201                  })
202              })
203          }
204  
205          Ok(fedimint_core::runtime::block_in_place(|| {
206              let prefix = key_prefix.to_vec();
207              let mut options = rocksdb::ReadOptions::default();
208              options.set_iterate_range(rocksdb::PrefixRange(prefix.clone()));
209              let iter = self.0.snapshot().iterator_opt(
210                  rocksdb::IteratorMode::From(&prefix, rocksdb::Direction::Forward),
211                  options,
212              );
213              let rocksdb_iter = iter.map_while(move |res| {
214                  let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb");
215                  key_bytes
216                      .starts_with(&prefix)
217                      .then_some((key_bytes.to_vec(), value_bytes.to_vec()))
218              });
219              Box::pin(convert_to_async_stream(rocksdb_iter))
220          }))
221      }
222  
223      async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> anyhow::Result<()> {
224          fedimint_core::runtime::block_in_place(|| {
225              // Note: delete_range is not supported in Transactions :/
226              let mut options = rocksdb::ReadOptions::default();
227              options.set_iterate_range(rocksdb::PrefixRange(key_prefix.to_owned()));
228              let iter = self
229                  .0
230                  .snapshot()
231                  .iterator_opt(
232                      rocksdb::IteratorMode::From(key_prefix, rocksdb::Direction::Forward),
233                      options,
234                  )
235                  .map_while(move |res| {
236                      res.map(|(key_bytes, _)| {
237                          key_bytes
238                              .starts_with(key_prefix)
239                              .then_some(key_bytes.to_vec())
240                      })
241                      .transpose()
242                  });
243  
244              for item in iter {
245                  let key = item?;
246                  self.0.delete(key)?;
247              }
248  
249              Ok(())
250          })
251      }
252  
253      async fn raw_find_by_prefix_sorted_descending(
254          &mut self,
255          key_prefix: &[u8],
256      ) -> Result<PrefixStream<'_>> {
257          let prefix = key_prefix.to_vec();
258          let next_prefix = next_prefix(&prefix);
259          let iterator_mode = if let Some(next_prefix) = &next_prefix {
260              rocksdb::IteratorMode::From(next_prefix, rocksdb::Direction::Reverse)
261          } else {
262              rocksdb::IteratorMode::End
263          };
264          Ok(fedimint_core::runtime::block_in_place(|| {
265              let mut options = rocksdb::ReadOptions::default();
266              options.set_iterate_range(rocksdb::PrefixRange(prefix.clone()));
267              let iter = self.0.snapshot().iterator_opt(iterator_mode, options);
268              let rocksdb_iter = iter.map_while(move |res| {
269                  let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb");
270                  key_bytes
271                      .starts_with(&prefix)
272                      .then_some((key_bytes.to_vec(), value_bytes.to_vec()))
273              });
274              Box::pin(stream::iter(rocksdb_iter))
275          }))
276      }
277  }
278  
279  #[async_trait]
280  impl<'a> IDatabaseTransactionOps for RocksDbTransaction<'a> {
281      async fn rollback_tx_to_savepoint(&mut self) -> Result<()> {
282          Ok(fedimint_core::runtime::block_in_place(|| {
283              self.0.rollback_to_savepoint()
284          })?)
285      }
286  
287      async fn set_tx_savepoint(&mut self) -> Result<()> {
288          fedimint_core::runtime::block_in_place(|| self.0.set_savepoint());
289  
290          Ok(())
291      }
292  }
293  
294  #[async_trait]
295  impl<'a> IRawDatabaseTransaction for RocksDbTransaction<'a> {
296      async fn commit_tx(self) -> Result<()> {
297          fedimint_core::runtime::block_in_place(|| {
298              self.0.commit()?;
299              Ok(())
300          })
301      }
302  }
303  
304  #[async_trait]
305  impl<'a> IDatabaseTransactionOpsCore for RocksDbReadOnlyTransaction<'a> {
306      async fn raw_insert_bytes(&mut self, _key: &[u8], _value: &[u8]) -> Result<Option<Vec<u8>>> {
307          panic!("Cannot insert into a read only transaction");
308      }
309  
310      async fn raw_get_bytes(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
311          fedimint_core::runtime::block_in_place(|| Ok(self.0.snapshot().get(key)?))
312      }
313  
314      async fn raw_remove_entry(&mut self, _key: &[u8]) -> Result<Option<Vec<u8>>> {
315          panic!("Cannot remove from a read only transaction");
316      }
317  
318      async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> Result<PrefixStream<'_>> {
319          // turn an `iter` into a `Stream` where every `next` is ran inside
320          // `block_in_place` to offload the blocking calls
321          fn convert_to_async_stream<'i, I>(iter: I) -> impl futures::Stream<Item = I::Item>
322          where
323              I: Iterator + Send + 'i,
324              I::Item: Send,
325          {
326              stream::unfold(iter, |mut iter| async move {
327                  fedimint_core::runtime::block_in_place(move || {
328                      let item = iter.next();
329                      item.map(move |item| (item, iter))
330                  })
331              })
332          }
333  
334          Ok(fedimint_core::runtime::block_in_place(|| {
335              let prefix = key_prefix.to_vec();
336              let mut options = rocksdb::ReadOptions::default();
337              options.set_iterate_range(rocksdb::PrefixRange(prefix.clone()));
338              let iter = self.0.snapshot().iterator_opt(
339                  rocksdb::IteratorMode::From(&prefix, rocksdb::Direction::Forward),
340                  options,
341              );
342              let rocksdb_iter = iter.map_while(move |res| {
343                  let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb");
344                  key_bytes
345                      .starts_with(&prefix)
346                      .then_some((key_bytes.to_vec(), value_bytes.to_vec()))
347              });
348              Box::pin(convert_to_async_stream(rocksdb_iter))
349          }))
350      }
351  
352      async fn raw_remove_by_prefix(&mut self, _key_prefix: &[u8]) -> anyhow::Result<()> {
353          panic!("Cannot remove from a read only transaction");
354      }
355  
356      async fn raw_find_by_prefix_sorted_descending(
357          &mut self,
358          key_prefix: &[u8],
359      ) -> Result<PrefixStream<'_>> {
360          let prefix = key_prefix.to_vec();
361          let next_prefix = next_prefix(&prefix);
362          let iterator_mode = if let Some(next_prefix) = &next_prefix {
363              rocksdb::IteratorMode::From(next_prefix, rocksdb::Direction::Reverse)
364          } else {
365              rocksdb::IteratorMode::End
366          };
367          Ok(fedimint_core::runtime::block_in_place(|| {
368              let mut options = rocksdb::ReadOptions::default();
369              options.set_iterate_range(rocksdb::PrefixRange(prefix.clone()));
370              let iter = self.0.snapshot().iterator_opt(iterator_mode, options);
371              let rocksdb_iter = iter.map_while(move |res| {
372                  let (key_bytes, value_bytes) = res.expect("Error reading from RocksDb");
373                  key_bytes
374                      .starts_with(&prefix)
375                      .then_some((key_bytes.to_vec(), value_bytes.to_vec()))
376              });
377              Box::pin(stream::iter(rocksdb_iter))
378          }))
379      }
380  }
381  
382  #[async_trait]
383  impl<'a> IDatabaseTransactionOps for RocksDbReadOnlyTransaction<'a> {
384      async fn rollback_tx_to_savepoint(&mut self) -> Result<()> {
385          panic!("Cannot rollback a read only transaction");
386      }
387  
388      async fn set_tx_savepoint(&mut self) -> Result<()> {
389          panic!("Cannot set a savepoint in a read only transaction");
390      }
391  }
392  
393  #[async_trait]
394  impl<'a> IRawDatabaseTransaction for RocksDbReadOnlyTransaction<'a> {
395      async fn commit_tx(self) -> Result<()> {
396          panic!("Cannot commit a read only transaction");
397      }
398  }
399  
#[cfg(test)]
mod fedimint_rocksdb_tests {
    use fedimint_core::db::{Database, IDatabaseTransactionOpsCoreTyped};
    use fedimint_core::encoding::{Decodable, Encodable};
    use fedimint_core::module::registry::ModuleDecoderRegistry;
    use fedimint_core::{impl_db_lookup, impl_db_record};
    use futures::StreamExt;

    use super::*;

    /// Opens a `Database` backed by a fresh `RocksDb` in a new temporary
    /// directory whose name starts with `temp_path`.
    fn open_temp_db(temp_path: &str) -> Database {
        let dir = tempfile::Builder::new()
            .prefix(temp_path)
            .tempdir()
            .unwrap();
        // NOTE(review): `dir` is moved into `RocksDb::open`, so the temp-dir
        // guard is dropped when that call returns — confirm the tests do not
        // depend on the directory outliving the call.
        let rocksdb = RocksDb::open(dir).unwrap();

        Database::new(rocksdb, ModuleDecoderRegistry::default())
    }

    // The tests below run the shared `fedimint_core::db` verification
    // routines against a RocksDb-backed database.

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_insert_elements() {
        let db = open_temp_db("fcb-rocksdb-test-insert-elements");
        fedimint_core::db::verify_insert_elements(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_remove_nonexisting() {
        let db = open_temp_db("fcb-rocksdb-test-remove-nonexisting");
        fedimint_core::db::verify_remove_nonexisting(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_remove_existing() {
        let db = open_temp_db("fcb-rocksdb-test-remove-existing");
        fedimint_core::db::verify_remove_existing(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_read_own_writes() {
        let db = open_temp_db("fcb-rocksdb-test-read-own-writes");
        fedimint_core::db::verify_read_own_writes(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_prevent_dirty_reads() {
        let db = open_temp_db("fcb-rocksdb-test-prevent-dirty-reads");
        fedimint_core::db::verify_prevent_dirty_reads(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_find_by_prefix() {
        let db = open_temp_db("fcb-rocksdb-test-find-by-prefix");
        fedimint_core::db::verify_find_by_prefix(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_commit() {
        let db = open_temp_db("fcb-rocksdb-test-commit");
        fedimint_core::db::verify_commit(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_prevent_nonrepeatable_reads() {
        let db = open_temp_db("fcb-rocksdb-test-prevent-nonrepeatable-reads");
        fedimint_core::db::verify_prevent_nonrepeatable_reads(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_snapshot_isolation() {
        let db = open_temp_db("fcb-rocksdb-test-snapshot-isolation");
        fedimint_core::db::verify_snapshot_isolation(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_rollback_to_savepoint() {
        let db = open_temp_db("fcb-rocksdb-test-rollback-to-savepoint");
        fedimint_core::db::verify_rollback_to_savepoint(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_phantom_entry() {
        let db = open_temp_db("fcb-rocksdb-test-phantom-entry");
        fedimint_core::db::verify_phantom_entry(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_write_conflict() {
        let db = open_temp_db("fcb-rocksdb-test-write-conflict");
        fedimint_core::db::expect_write_conflict(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_dbtx_remove_by_prefix() {
        let db = open_temp_db("fcb-rocksdb-test-remove-by-prefix");
        fedimint_core::db::verify_remove_by_prefix(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_module_dbtx() {
        let db = open_temp_db("fcb-rocksdb-test-module-prefix");
        fedimint_core::db::verify_module_prefix(db).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_module_db() {
        let module_instance_id = 1;
        // Two independent databases: one is handed over as-is, the other is
        // scoped to the module prefix.
        let module_db = open_temp_db("fcb-rocksdb-test-module-db-prefix");
        let db = open_temp_db("fcb-rocksdb-test-module-db");

        fedimint_core::db::verify_module_db(
            db,
            module_db.with_prefix_module_id(module_instance_id),
        )
        .await;
    }

    #[test]
    fn test_next_prefix() {
        // Note: although we are testing the general case of a vector with N elements,
        // the prefixes currently use N = 1
        assert_eq!(next_prefix(&[1, 2, 3]), Some(vec![1, 2, 4]));
        assert_eq!(next_prefix(&[1, 2, 254]), Some(vec![1, 2, 255]));
        assert_eq!(next_prefix(&[1, 2, 255]), Some(vec![1, 3, 0]));
        assert_eq!(next_prefix(&[1, 255, 255]), Some(vec![2, 0, 0]));
        // this is a "max" prefix
        assert_eq!(next_prefix(&[255, 255, 255]), None);
        // these are the common case
        assert_eq!(next_prefix(&[0]), Some(vec![1]));
        assert_eq!(next_prefix(&[254]), Some(vec![255]));
        assert_eq!(next_prefix(&[255]), None); // this is a "max" prefix
    }

    /// Key prefixes used by the descending-order test; `MaxTest` is the
    /// largest possible single-byte prefix.
    #[repr(u8)]
    #[derive(Clone)]
    pub enum TestDbKeyPrefix {
        Test = 254,
        MaxTest = 255,
    }

    /// Key of the records stored under `TestDbKeyPrefix::Test`.
    #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Encodable, Decodable)]
    pub(super) struct TestKey(pub Vec<u8>);

    /// Value of the records stored under `TestDbKeyPrefix::Test`.
    #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Encodable, Decodable)]
    pub(super) struct TestVal(pub Vec<u8>);

    /// Query prefix for looking up `TestKey` records.
    #[derive(Debug, Encodable, Decodable)]
    struct DbPrefixTestPrefix;

    impl_db_record!(
        key = TestKey,
        value = TestVal,
        db_prefix = TestDbKeyPrefix::Test,
        notify_on_modify = true,
    );
    impl_db_lookup!(key = TestKey, query_prefix = DbPrefixTestPrefix);

    /// Key of the records stored under `TestDbKeyPrefix::MaxTest`.
    #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Encodable, Decodable)]
    pub(super) struct TestKey2(pub Vec<u8>);

    /// Value of the records stored under `TestDbKeyPrefix::MaxTest`.
    #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Encodable, Decodable)]
    pub(super) struct TestVal2(pub Vec<u8>);

    /// Query prefix for looking up `TestKey2` records.
    #[derive(Debug, Encodable, Decodable)]
    struct DbPrefixTestPrefixMax;

    impl_db_record!(
        key = TestKey2,
        value = TestVal2,
        db_prefix = TestDbKeyPrefix::MaxTest, // max/last prefix
        notify_on_modify = true,
    );
    impl_db_lookup!(key = TestKey2, query_prefix = DbPrefixTestPrefixMax);

    #[tokio::test(flavor = "multi_thread")]
    async fn test_retrieve_descending_order() {
        // Entries under the `Test` prefix, in the order a descending scan
        // has to return them.
        fn expected_descending() -> Vec<(TestKey, TestVal)> {
            vec![
                (TestKey(vec![255]), TestVal(vec![2])),
                (TestKey(vec![254]), TestVal(vec![1])),
                (TestKey(vec![0]), TestVal(vec![3])),
            ]
        }

        // Same, for the entries under the `MaxTest` prefix.
        fn expected_descending_max() -> Vec<(TestKey2, TestVal2)> {
            vec![
                (TestKey2(vec![255]), TestVal2(vec![2])),
                (TestKey2(vec![254]), TestVal2(vec![1])),
                (TestKey2(vec![0]), TestVal2(vec![3])),
            ]
        }

        let path = tempfile::Builder::new()
            .prefix("fcb-rocksdb-test-descending-order")
            .tempdir()
            .unwrap();
        {
            let db = Database::new(
                RocksDb::open(&path).unwrap(),
                ModuleDecoderRegistry::default(),
            );
            let mut dbtx = db.begin_transaction().await;
            for (key, val) in [(0, 3), (254, 1), (255, 2)] {
                dbtx.insert_entry(&TestKey(vec![key]), &TestVal(vec![val]))
                    .await;
            }
            for (key, val) in [(0, 3), (254, 1), (255, 2)] {
                dbtx.insert_entry(&TestKey2(vec![key]), &TestVal2(vec![val]))
                    .await;
            }

            let found = dbtx
                .find_by_prefix_sorted_descending(&DbPrefixTestPrefix)
                .await
                .collect::<Vec<_>>()
                .await;
            assert_eq!(found, expected_descending());

            let found_max = dbtx
                .find_by_prefix_sorted_descending(&DbPrefixTestPrefixMax)
                .await
                .collect::<Vec<_>>()
                .await;
            assert_eq!(found_max, expected_descending_max());

            dbtx.commit_tx().await;
        }

        // Test readonly implementation
        let db_readonly = Database::new(
            RocksDbReadOnly::open_read_only(path).unwrap(),
            Default::default(),
        );
        let mut dbtx = db_readonly.begin_transaction().await;

        let found = dbtx
            .find_by_prefix_sorted_descending(&DbPrefixTestPrefix)
            .await
            .collect::<Vec<_>>()
            .await;
        assert_eq!(found, expected_descending());

        let found_max = dbtx
            .find_by_prefix_sorted_descending(&DbPrefixTestPrefixMax)
            .await
            .collect::<Vec<_>>()
            .await;
        assert_eq!(found_max, expected_descending_max());
    }
}
677  }