// mdbx.rs
1 use std::{marker::PhantomData, ops::Range, path::Path}; 2 3 use apibara_core::stream::{MessageData, RawMessageData}; 4 use libmdbx::{ 5 Cursor, Database, DatabaseFlags, Environment, EnvironmentBuilder, EnvironmentKind, 6 Error as MdbxError, Geometry, TableObject, Transaction, TransactionKind, WriteFlags, RW, 7 }; 8 use prost::Message; 9 10 use super::{ 11 table::{Table, TableKey}, 12 DupSortTable, 13 }; 14 15 /// A type-safe view over a mdbx database. 16 pub struct MdbxTable<'txn, T, K, E> 17 where 18 T: Table, 19 K: TransactionKind, 20 E: EnvironmentKind, 21 { 22 txn: &'txn Transaction<'txn, K, E>, 23 db: Database<'txn>, 24 phantom: PhantomData<T>, 25 } 26 27 /// A cursor over items in a `MdbxTable`. 28 pub struct TableCursor<'txn, T, K> 29 where 30 T: Table, 31 K: TransactionKind, 32 { 33 cursor: Cursor<'txn, K>, 34 phantom: PhantomData<T>, 35 } 36 37 /// Result value of any mdbx operation. 38 pub type MdbxResult<T> = Result<T, MdbxError>; 39 40 /// Configure and open a mdbx environment. 41 pub struct MdbxEnvironmentBuilder<E: EnvironmentKind> { 42 env: EnvironmentBuilder<E>, 43 max_dbs: usize, 44 geometry: Geometry<Range<usize>>, 45 } 46 47 /// Extension methods over mdbx environment. 48 pub trait MdbxEnvironmentExt<E: EnvironmentKind> { 49 /// Open a mdbx environment with the default configuration. 50 fn open(path: &Path) -> MdbxResult<Environment<E>>; 51 52 /// Creates a new mdbx environment builder. 53 fn builder() -> MdbxEnvironmentBuilder<E>; 54 } 55 56 /// Extension methods over mdbx RO and RW transactions. 57 pub trait MdbxTransactionExt<K: TransactionKind, E: EnvironmentKind> { 58 /// Open a database accessed through a type-safe [MdbxTable]. 59 fn open_table<T: Table>(&self) -> MdbxResult<MdbxTable<'_, T, K, E>>; 60 61 /// Shorthand for `open_table()?.cursor()?;` 62 /// 63 /// Cannot use `cursor` as name since it's a method on transaction. 
64 fn open_cursor<T: Table>(&self) -> MdbxResult<TableCursor<'_, T, K>>; 65 } 66 67 /// Extension methods over mdbx RW transactions. 68 pub trait MdbxRWTransactionExt { 69 /// Ensure the given table database exists. Creates it if it doesn't. 70 fn ensure_table<T: Table>(&self, flags: Option<DatabaseFlags>) -> MdbxResult<()>; 71 } 72 73 impl<E: EnvironmentKind> MdbxEnvironmentExt<E> for Environment<E> { 74 fn open(path: &Path) -> MdbxResult<Environment<E>> { 75 let mut builder = Environment::new(); 76 builder.set_max_dbs(16); 77 builder.open(path) 78 } 79 80 fn builder() -> MdbxEnvironmentBuilder<E> { 81 MdbxEnvironmentBuilder::new() 82 } 83 } 84 85 impl<E: EnvironmentKind> MdbxEnvironmentBuilder<E> { 86 /// Create a new environment builder. 87 pub fn new() -> MdbxEnvironmentBuilder<E> { 88 let env = Environment::new(); 89 // set reasonable default geometry. 90 let min_size = byte_unit::n_gib_bytes!(10) as usize; 91 let max_size = byte_unit::n_gib_bytes!(100) as usize; 92 let growth_step = byte_unit::n_gib_bytes(2) as isize; 93 let geometry = Geometry { 94 size: Some(min_size..max_size), 95 growth_step: Some(growth_step), 96 shrink_threshold: None, 97 page_size: None, 98 }; 99 MdbxEnvironmentBuilder { 100 env, 101 max_dbs: 100, 102 geometry, 103 } 104 } 105 106 /// Change the database size in GiB. 107 pub fn with_size_gib(mut self, min_size: usize, max_size: usize) -> Self { 108 let min_size = byte_unit::n_gib_bytes(min_size as u128) as usize; 109 let max_size = byte_unit::n_gib_bytes(max_size as u128) as usize; 110 self.geometry.size = Some(min_size..max_size); 111 self 112 } 113 114 /// Change the database growth size in GiB. 115 pub fn with_growth_step_gib(mut self, step: isize) -> Self { 116 let step = byte_unit::n_gib_bytes(step as u128) as isize; 117 self.geometry.growth_step = Some(step); 118 self 119 } 120 121 /// Open the environment. 
122 pub fn open(mut self, path: &Path) -> MdbxResult<Environment<E>> { 123 self.env 124 .set_geometry(self.geometry) 125 .set_max_dbs(self.max_dbs) 126 .open(path) 127 } 128 } 129 130 impl<E: EnvironmentKind> Default for MdbxEnvironmentBuilder<E> { 131 fn default() -> Self { 132 Self::new() 133 } 134 } 135 136 impl<'env, K, E> MdbxTransactionExt<K, E> for Transaction<'env, K, E> 137 where 138 K: TransactionKind, 139 E: EnvironmentKind, 140 { 141 fn open_table<T: Table>(&self) -> MdbxResult<MdbxTable<'_, T, K, E>> { 142 let database = self.open_db(Some(T::db_name()))?; 143 Ok(MdbxTable { 144 txn: self, 145 db: database, 146 phantom: Default::default(), 147 }) 148 } 149 150 fn open_cursor<T: Table>(&self) -> MdbxResult<TableCursor<'_, T, K>> { 151 self.open_table::<T>()?.cursor() 152 } 153 } 154 155 impl<'env, E: EnvironmentKind> MdbxRWTransactionExt for Transaction<'env, RW, E> { 156 fn ensure_table<T: Table>(&self, flags: Option<DatabaseFlags>) -> MdbxResult<()> { 157 let flags = flags.unwrap_or_default(); 158 let name = T::db_name(); 159 self.create_db(Some(name), flags)?; 160 Ok(()) 161 } 162 } 163 164 #[derive(Debug, Clone)] 165 struct TableObjectWrapper<T>(T); 166 167 impl<'txn, T> TableObject<'txn> for TableObjectWrapper<T> 168 where 169 T: Message + Default + Clone, 170 { 171 fn decode(data_val: &[u8]) -> MdbxResult<Self> 172 where 173 Self: Sized, 174 { 175 T::decode(data_val) 176 .map_err(|err| MdbxError::DecodeError(Box::new(err))) 177 .map(Self) 178 } 179 } 180 181 #[derive(Debug, Clone)] 182 struct RawTableObjectWrapper<T: MessageData>(RawMessageData<T>); 183 184 impl<'txn, T> TableObject<'txn> for RawTableObjectWrapper<T> 185 where 186 T: MessageData, 187 { 188 fn decode(data_val: &[u8]) -> MdbxResult<Self> 189 where 190 Self: Sized, 191 { 192 Ok(Self(RawMessageData::from_vec(data_val.to_vec()))) 193 } 194 } 195 196 #[derive(Debug, Clone)] 197 struct TableKeyWrapper<T>(T); 198 199 impl<'txn, T> TableObject<'txn> for TableKeyWrapper<T> 200 where 201 T: 
TableKey, 202 { 203 fn decode(data_val: &[u8]) -> MdbxResult<Self> 204 where 205 Self: Sized, 206 { 207 T::decode(data_val) 208 .map_err(|err| MdbxError::DecodeError(Box::new(err))) 209 .map(Self) 210 } 211 } 212 213 impl<'txn, T, K, E> MdbxTable<'txn, T, K, E> 214 where 215 T: Table, 216 K: TransactionKind, 217 E: EnvironmentKind, 218 { 219 /// Returns a cursor over the items in the table. 220 pub fn cursor(&self) -> MdbxResult<TableCursor<'txn, T, K>> { 221 let cursor = self.txn.cursor(&self.db)?; 222 Ok(TableCursor { 223 cursor, 224 phantom: Default::default(), 225 }) 226 } 227 228 /// Get an item in the table by its `key`. 229 pub fn get(&self, key: &T::Key) -> MdbxResult<Option<T::Value>> { 230 let data = self 231 .txn 232 .get::<TableObjectWrapper<_>>(&self.db, key.encode().as_ref())?; 233 Ok(data.map(|d| d.0)) 234 } 235 } 236 237 impl<'txn, T, K> TableCursor<'txn, T, K> 238 where 239 T: Table, 240 K: TransactionKind, 241 { 242 /// Get key/data at current cursor position. 243 pub fn get_current(&mut self) -> MdbxResult<Option<(T::Key, T::Value)>> { 244 map_kv_result::<T>(self.cursor.get_current()) 245 } 246 /// Position at the first item. 247 pub fn first(&mut self) -> MdbxResult<Option<(T::Key, T::Value)>> { 248 map_kv_result::<T>(self.cursor.first()) 249 } 250 251 /// Position at the last item. 252 pub fn last(&mut self) -> MdbxResult<Option<(T::Key, T::Value)>> { 253 map_kv_result::<T>(self.cursor.last()) 254 } 255 256 /// Position at the next item. 257 #[allow(clippy::should_implement_trait)] 258 pub fn next(&mut self) -> MdbxResult<Option<(T::Key, T::Value)>> { 259 map_kv_result::<T>(self.cursor.next()) 260 } 261 262 /// Position at the previous item. 263 pub fn prev(&mut self) -> MdbxResult<Option<(T::Key, T::Value)>> { 264 map_kv_result::<T>(self.cursor.prev()) 265 } 266 267 /// Position at the specified key. 
268 pub fn seek_exact(&mut self, key: &T::Key) -> MdbxResult<Option<(T::Key, T::Value)>> { 269 map_kv_result::<T>(self.cursor.set_key(key.encode().as_ref())) 270 } 271 272 /// Position at the specified key and return the raw value . 273 #[allow(clippy::type_complexity)] 274 pub fn seek_exact_raw( 275 &mut self, 276 key: &T::Key, 277 ) -> MdbxResult<Option<(T::Key, RawMessageData<T::Value>)>> { 278 raw_map_kv_result::<T>(self.cursor.set_key(key.encode().as_ref())) 279 } 280 281 /// Position at the first key greater than or equal to the specified key. 282 pub fn seek_range(&mut self, key: &T::Key) -> MdbxResult<Option<(T::Key, T::Value)>> { 283 map_kv_result::<T>(self.cursor.set_range(key.encode().as_ref())) 284 } 285 } 286 287 impl<'txn, T, K> TableCursor<'txn, T, K> 288 where 289 T: DupSortTable, 290 K: TransactionKind, 291 { 292 /// Position at the first item of the current key. 293 pub fn first_dup(&mut self) -> MdbxResult<Option<T::Value>> { 294 Ok(self 295 .cursor 296 .first_dup::<TableObjectWrapper<_>>()? 297 .map(|d| d.0)) 298 } 299 300 /// Position at the last item of the current key. 301 pub fn last_dup(&mut self) -> MdbxResult<Option<T::Value>> { 302 Ok(self 303 .cursor 304 .last_dup::<TableObjectWrapper<_>>()? 305 .map(|d| d.0)) 306 } 307 308 /// Position at the next item of the current key. 309 pub fn next_dup(&mut self) -> MdbxResult<Option<(T::Key, T::Value)>> { 310 map_kv_result::<T>(self.cursor.next_dup()) 311 } 312 313 /// Position at the first item of the next key. 314 pub fn next_no_dup(&mut self) -> MdbxResult<Option<(T::Key, T::Value)>> { 315 map_kv_result::<T>(self.cursor.next_nodup()) 316 } 317 318 /// Position at the previous item of the current key. 319 pub fn prev_dup(&mut self) -> MdbxResult<Option<(T::Key, T::Value)>> { 320 map_kv_result::<T>(self.cursor.prev_dup()) 321 } 322 323 /// Position at the first item of the previous key. 
324 pub fn prev_no_dup(&mut self) -> MdbxResult<Option<(T::Key, T::Value)>> { 325 map_kv_result::<T>(self.cursor.prev_nodup()) 326 } 327 } 328 329 impl<'txn, T> TableCursor<'txn, T, RW> 330 where 331 T: Table, 332 { 333 pub fn put(&mut self, key: &T::Key, value: &T::Value) -> MdbxResult<()> { 334 let data = T::Value::encode_to_vec(value); 335 self.cursor 336 .put(key.encode().as_ref(), &data, WriteFlags::default())?; 337 Ok(()) 338 } 339 340 /// Delete the first cursor/data item. 341 pub fn del(&mut self) -> MdbxResult<()> { 342 self.cursor.del(WriteFlags::default()) 343 } 344 } 345 346 impl<'txn, T> TableCursor<'txn, T, RW> 347 where 348 T: DupSortTable, 349 { 350 pub fn append_dup(&mut self, key: &T::Key, value: &T::Value) -> MdbxResult<()> { 351 let data = T::Value::encode_to_vec(value); 352 self.cursor 353 .put(key.encode().as_ref(), &data, WriteFlags::APPEND_DUP)?; 354 Ok(()) 355 } 356 } 357 358 #[allow(clippy::type_complexity)] 359 fn map_kv_result<T>( 360 t: MdbxResult<Option<(TableKeyWrapper<T::Key>, TableObjectWrapper<T::Value>)>>, 361 ) -> MdbxResult<Option<(T::Key, T::Value)>> 362 where 363 T: Table, 364 { 365 if let Some((k, v)) = t? { 366 return Ok(Some((k.0, v.0))); 367 } 368 Ok(None) 369 } 370 371 #[allow(clippy::type_complexity)] 372 fn raw_map_kv_result<T>( 373 t: MdbxResult<Option<(TableKeyWrapper<T::Key>, RawTableObjectWrapper<T::Value>)>>, 374 ) -> MdbxResult<Option<(T::Key, RawMessageData<T::Value>)>> 375 where 376 T: Table, 377 { 378 if let Some((k, v)) = t? { 379 return Ok(Some((k.0, v.0))); 380 } 381 Ok(None) 382 } 383 384 pub trait MdbxErrorExt { 385 fn decode_error<E: std::error::Error + Send + Sync + 'static>(err: E) -> MdbxError; 386 } 387 388 impl MdbxErrorExt for MdbxError { 389 fn decode_error<E: std::error::Error + Send + Sync + 'static>(err: E) -> MdbxError { 390 MdbxError::DecodeError(Box::new(err)) 391 } 392 }