/ node / src / db / mdbx.rs
mdbx.rs
  1  use std::{marker::PhantomData, ops::Range, path::Path};
  2  
  3  use apibara_core::stream::{MessageData, RawMessageData};
  4  use libmdbx::{
  5      Cursor, Database, DatabaseFlags, Environment, EnvironmentBuilder, EnvironmentKind,
  6      Error as MdbxError, Geometry, TableObject, Transaction, TransactionKind, WriteFlags, RW,
  7  };
  8  use prost::Message;
  9  
 10  use super::{
 11      table::{Table, TableKey},
 12      DupSortTable,
 13  };
 14  
 15  /// A type-safe view over a mdbx database.
 16  pub struct MdbxTable<'txn, T, K, E>
 17  where
 18      T: Table,
 19      K: TransactionKind,
 20      E: EnvironmentKind,
 21  {
 22      txn: &'txn Transaction<'txn, K, E>,
 23      db: Database<'txn>,
 24      phantom: PhantomData<T>,
 25  }
 26  
 27  /// A cursor over items in a `MdbxTable`.
 28  pub struct TableCursor<'txn, T, K>
 29  where
 30      T: Table,
 31      K: TransactionKind,
 32  {
 33      cursor: Cursor<'txn, K>,
 34      phantom: PhantomData<T>,
 35  }
 36  
/// Result value of any mdbx operation.
///
/// Shorthand for a `Result` whose error type is the mdbx error
/// (`libmdbx::Error`, imported in this file as `MdbxError`).
pub type MdbxResult<T> = Result<T, MdbxError>;
 39  
 40  /// Configure and open a mdbx environment.
 41  pub struct MdbxEnvironmentBuilder<E: EnvironmentKind> {
 42      env: EnvironmentBuilder<E>,
 43      max_dbs: usize,
 44      geometry: Geometry<Range<usize>>,
 45  }
 46  
/// Extension methods over mdbx environment.
///
/// Both methods are associated functions (they take no `self`), so they are
/// called on the environment type itself.
pub trait MdbxEnvironmentExt<E: EnvironmentKind> {
    /// Open a mdbx environment with the default configuration.
    ///
    /// `path` is the location of the environment on disk. Returns the opened
    /// environment, or the mdbx error that prevented opening it.
    fn open(path: &Path) -> MdbxResult<Environment<E>>;

    /// Creates a new mdbx environment builder.
    ///
    /// Use the builder when the default configuration is not suitable.
    fn builder() -> MdbxEnvironmentBuilder<E>;
}
 55  
/// Extension methods over mdbx RO and RW transactions.
///
/// The table to operate on is selected through the type parameter `T`
/// rather than through a runtime argument.
pub trait MdbxTransactionExt<K: TransactionKind, E: EnvironmentKind> {
    /// Open a database accessed through a type-safe [MdbxTable].
    ///
    /// The returned table borrows the transaction it was opened from.
    fn open_table<T: Table>(&self) -> MdbxResult<MdbxTable<'_, T, K, E>>;

    /// Shorthand for `open_table()?.cursor()?;`
    ///
    /// Cannot use `cursor` as name since it's a method on transaction.
    fn open_cursor<T: Table>(&self) -> MdbxResult<TableCursor<'_, T, K>>;
}
 66  
/// Extension methods over mdbx RW transactions.
pub trait MdbxRWTransactionExt {
    /// Ensure the given table database exists. Creates it if it doesn't.
    ///
    /// `flags` are the database flags to create the table with; `None`
    /// leaves the choice of flags to the implementation.
    fn ensure_table<T: Table>(&self, flags: Option<DatabaseFlags>) -> MdbxResult<()>;
}
 72  
 73  impl<E: EnvironmentKind> MdbxEnvironmentExt<E> for Environment<E> {
 74      fn open(path: &Path) -> MdbxResult<Environment<E>> {
 75          let mut builder = Environment::new();
 76          builder.set_max_dbs(16);
 77          builder.open(path)
 78      }
 79  
 80      fn builder() -> MdbxEnvironmentBuilder<E> {
 81          MdbxEnvironmentBuilder::new()
 82      }
 83  }
 84  
 85  impl<E: EnvironmentKind> MdbxEnvironmentBuilder<E> {
 86      /// Create a new environment builder.
 87      pub fn new() -> MdbxEnvironmentBuilder<E> {
 88          let env = Environment::new();
 89          // set reasonable default geometry.
 90          let min_size = byte_unit::n_gib_bytes!(10) as usize;
 91          let max_size = byte_unit::n_gib_bytes!(100) as usize;
 92          let growth_step = byte_unit::n_gib_bytes(2) as isize;
 93          let geometry = Geometry {
 94              size: Some(min_size..max_size),
 95              growth_step: Some(growth_step),
 96              shrink_threshold: None,
 97              page_size: None,
 98          };
 99          MdbxEnvironmentBuilder {
100              env,
101              max_dbs: 100,
102              geometry,
103          }
104      }
105  
106      /// Change the database size in GiB.
107      pub fn with_size_gib(mut self, min_size: usize, max_size: usize) -> Self {
108          let min_size = byte_unit::n_gib_bytes(min_size as u128) as usize;
109          let max_size = byte_unit::n_gib_bytes(max_size as u128) as usize;
110          self.geometry.size = Some(min_size..max_size);
111          self
112      }
113  
114      /// Change the database growth size in GiB.
115      pub fn with_growth_step_gib(mut self, step: isize) -> Self {
116          let step = byte_unit::n_gib_bytes(step as u128) as isize;
117          self.geometry.growth_step = Some(step);
118          self
119      }
120  
121      /// Open the environment.
122      pub fn open(mut self, path: &Path) -> MdbxResult<Environment<E>> {
123          self.env
124              .set_geometry(self.geometry)
125              .set_max_dbs(self.max_dbs)
126              .open(path)
127      }
128  }
129  
130  impl<E: EnvironmentKind> Default for MdbxEnvironmentBuilder<E> {
131      fn default() -> Self {
132          Self::new()
133      }
134  }
135  
136  impl<'env, K, E> MdbxTransactionExt<K, E> for Transaction<'env, K, E>
137  where
138      K: TransactionKind,
139      E: EnvironmentKind,
140  {
141      fn open_table<T: Table>(&self) -> MdbxResult<MdbxTable<'_, T, K, E>> {
142          let database = self.open_db(Some(T::db_name()))?;
143          Ok(MdbxTable {
144              txn: self,
145              db: database,
146              phantom: Default::default(),
147          })
148      }
149  
150      fn open_cursor<T: Table>(&self) -> MdbxResult<TableCursor<'_, T, K>> {
151          self.open_table::<T>()?.cursor()
152      }
153  }
154  
155  impl<'env, E: EnvironmentKind> MdbxRWTransactionExt for Transaction<'env, RW, E> {
156      fn ensure_table<T: Table>(&self, flags: Option<DatabaseFlags>) -> MdbxResult<()> {
157          let flags = flags.unwrap_or_default();
158          let name = T::db_name();
159          self.create_db(Some(name), flags)?;
160          Ok(())
161      }
162  }
163  
164  #[derive(Debug, Clone)]
165  struct TableObjectWrapper<T>(T);
166  
167  impl<'txn, T> TableObject<'txn> for TableObjectWrapper<T>
168  where
169      T: Message + Default + Clone,
170  {
171      fn decode(data_val: &[u8]) -> MdbxResult<Self>
172      where
173          Self: Sized,
174      {
175          T::decode(data_val)
176              .map_err(|err| MdbxError::DecodeError(Box::new(err)))
177              .map(Self)
178      }
179  }
180  
181  #[derive(Debug, Clone)]
182  struct RawTableObjectWrapper<T: MessageData>(RawMessageData<T>);
183  
184  impl<'txn, T> TableObject<'txn> for RawTableObjectWrapper<T>
185  where
186      T: MessageData,
187  {
188      fn decode(data_val: &[u8]) -> MdbxResult<Self>
189      where
190          Self: Sized,
191      {
192          Ok(Self(RawMessageData::from_vec(data_val.to_vec())))
193      }
194  }
195  
196  #[derive(Debug, Clone)]
197  struct TableKeyWrapper<T>(T);
198  
199  impl<'txn, T> TableObject<'txn> for TableKeyWrapper<T>
200  where
201      T: TableKey,
202  {
203      fn decode(data_val: &[u8]) -> MdbxResult<Self>
204      where
205          Self: Sized,
206      {
207          T::decode(data_val)
208              .map_err(|err| MdbxError::DecodeError(Box::new(err)))
209              .map(Self)
210      }
211  }
212  
213  impl<'txn, T, K, E> MdbxTable<'txn, T, K, E>
214  where
215      T: Table,
216      K: TransactionKind,
217      E: EnvironmentKind,
218  {
219      /// Returns a cursor over the items in the table.
220      pub fn cursor(&self) -> MdbxResult<TableCursor<'txn, T, K>> {
221          let cursor = self.txn.cursor(&self.db)?;
222          Ok(TableCursor {
223              cursor,
224              phantom: Default::default(),
225          })
226      }
227  
228      /// Get an item in the table by its `key`.
229      pub fn get(&self, key: &T::Key) -> MdbxResult<Option<T::Value>> {
230          let data = self
231              .txn
232              .get::<TableObjectWrapper<_>>(&self.db, key.encode().as_ref())?;
233          Ok(data.map(|d| d.0))
234      }
235  }
236  
237  impl<'txn, T, K> TableCursor<'txn, T, K>
238  where
239      T: Table,
240      K: TransactionKind,
241  {
242      /// Get key/data at current cursor position.
243      pub fn get_current(&mut self) -> MdbxResult<Option<(T::Key, T::Value)>> {
244          map_kv_result::<T>(self.cursor.get_current())
245      }
246      /// Position at the first item.
247      pub fn first(&mut self) -> MdbxResult<Option<(T::Key, T::Value)>> {
248          map_kv_result::<T>(self.cursor.first())
249      }
250  
251      /// Position at the last item.
252      pub fn last(&mut self) -> MdbxResult<Option<(T::Key, T::Value)>> {
253          map_kv_result::<T>(self.cursor.last())
254      }
255  
256      /// Position at the next item.
257      #[allow(clippy::should_implement_trait)]
258      pub fn next(&mut self) -> MdbxResult<Option<(T::Key, T::Value)>> {
259          map_kv_result::<T>(self.cursor.next())
260      }
261  
262      /// Position at the previous item.
263      pub fn prev(&mut self) -> MdbxResult<Option<(T::Key, T::Value)>> {
264          map_kv_result::<T>(self.cursor.prev())
265      }
266  
267      /// Position at the specified key.
268      pub fn seek_exact(&mut self, key: &T::Key) -> MdbxResult<Option<(T::Key, T::Value)>> {
269          map_kv_result::<T>(self.cursor.set_key(key.encode().as_ref()))
270      }
271  
272      /// Position at the specified key and return the raw value .
273      #[allow(clippy::type_complexity)]
274      pub fn seek_exact_raw(
275          &mut self,
276          key: &T::Key,
277      ) -> MdbxResult<Option<(T::Key, RawMessageData<T::Value>)>> {
278          raw_map_kv_result::<T>(self.cursor.set_key(key.encode().as_ref()))
279      }
280  
281      /// Position at the first key greater than or equal to the specified key.
282      pub fn seek_range(&mut self, key: &T::Key) -> MdbxResult<Option<(T::Key, T::Value)>> {
283          map_kv_result::<T>(self.cursor.set_range(key.encode().as_ref()))
284      }
285  }
286  
287  impl<'txn, T, K> TableCursor<'txn, T, K>
288  where
289      T: DupSortTable,
290      K: TransactionKind,
291  {
292      /// Position at the first item of the current key.
293      pub fn first_dup(&mut self) -> MdbxResult<Option<T::Value>> {
294          Ok(self
295              .cursor
296              .first_dup::<TableObjectWrapper<_>>()?
297              .map(|d| d.0))
298      }
299  
300      /// Position at the last item of the current key.
301      pub fn last_dup(&mut self) -> MdbxResult<Option<T::Value>> {
302          Ok(self
303              .cursor
304              .last_dup::<TableObjectWrapper<_>>()?
305              .map(|d| d.0))
306      }
307  
308      /// Position at the next item of the current key.
309      pub fn next_dup(&mut self) -> MdbxResult<Option<(T::Key, T::Value)>> {
310          map_kv_result::<T>(self.cursor.next_dup())
311      }
312  
313      /// Position at the first item of the next key.
314      pub fn next_no_dup(&mut self) -> MdbxResult<Option<(T::Key, T::Value)>> {
315          map_kv_result::<T>(self.cursor.next_nodup())
316      }
317  
318      /// Position at the previous item of the current key.
319      pub fn prev_dup(&mut self) -> MdbxResult<Option<(T::Key, T::Value)>> {
320          map_kv_result::<T>(self.cursor.prev_dup())
321      }
322  
323      /// Position at the first item of the previous key.
324      pub fn prev_no_dup(&mut self) -> MdbxResult<Option<(T::Key, T::Value)>> {
325          map_kv_result::<T>(self.cursor.prev_nodup())
326      }
327  }
328  
329  impl<'txn, T> TableCursor<'txn, T, RW>
330  where
331      T: Table,
332  {
333      pub fn put(&mut self, key: &T::Key, value: &T::Value) -> MdbxResult<()> {
334          let data = T::Value::encode_to_vec(value);
335          self.cursor
336              .put(key.encode().as_ref(), &data, WriteFlags::default())?;
337          Ok(())
338      }
339  
340      /// Delete the first cursor/data item.
341      pub fn del(&mut self) -> MdbxResult<()> {
342          self.cursor.del(WriteFlags::default())
343      }
344  }
345  
346  impl<'txn, T> TableCursor<'txn, T, RW>
347  where
348      T: DupSortTable,
349  {
350      pub fn append_dup(&mut self, key: &T::Key, value: &T::Value) -> MdbxResult<()> {
351          let data = T::Value::encode_to_vec(value);
352          self.cursor
353              .put(key.encode().as_ref(), &data, WriteFlags::APPEND_DUP)?;
354          Ok(())
355      }
356  }
357  
358  #[allow(clippy::type_complexity)]
359  fn map_kv_result<T>(
360      t: MdbxResult<Option<(TableKeyWrapper<T::Key>, TableObjectWrapper<T::Value>)>>,
361  ) -> MdbxResult<Option<(T::Key, T::Value)>>
362  where
363      T: Table,
364  {
365      if let Some((k, v)) = t? {
366          return Ok(Some((k.0, v.0)));
367      }
368      Ok(None)
369  }
370  
371  #[allow(clippy::type_complexity)]
372  fn raw_map_kv_result<T>(
373      t: MdbxResult<Option<(TableKeyWrapper<T::Key>, RawTableObjectWrapper<T::Value>)>>,
374  ) -> MdbxResult<Option<(T::Key, RawMessageData<T::Value>)>>
375  where
376      T: Table,
377  {
378      if let Some((k, v)) = t? {
379          return Ok(Some((k.0, v.0)));
380      }
381      Ok(None)
382  }
383  
/// Extension methods for building mdbx errors.
pub trait MdbxErrorExt {
    /// Build a mdbx error from an arbitrary error `err` raised while decoding.
    fn decode_error<E: std::error::Error + Send + Sync + 'static>(err: E) -> MdbxError;
}
387  
388  impl MdbxErrorExt for MdbxError {
389      fn decode_error<E: std::error::Error + Send + Sync + 'static>(err: E) -> MdbxError {
390          MdbxError::DecodeError(Box::new(err))
391      }
392  }