db.rs
1 //! Uses immutable data structures and saves to indexeddb on commit. 2 use std::fmt::Debug; 3 use std::ops::Range; 4 use std::sync::Arc; 5 6 use anyhow::{Context as _, Result}; 7 use fedimint_core::db::{ 8 IDatabaseTransactionOps, IDatabaseTransactionOpsCore, IRawDatabase, IRawDatabaseTransaction, 9 PrefixStream, 10 }; 11 use fedimint_core::{apply, async_trait_maybe_send}; 12 use futures::lock::Mutex; 13 use futures::stream; 14 use imbl::OrdMap; 15 use rexie::{Rexie, TransactionMode}; 16 use wasm_bindgen::JsCast; 17 18 pub fn rexie_to_anyhow(e: rexie::Error) -> anyhow::Error { 19 anyhow::anyhow!(e.to_string()) 20 } 21 22 #[derive(Debug, Default)] 23 pub struct DatabaseInsertOperation { 24 pub key: Vec<u8>, 25 pub value: Vec<u8>, 26 pub old_value: Option<Vec<u8>>, 27 } 28 29 #[derive(Debug, Default)] 30 pub struct DatabaseDeleteOperation { 31 pub key: Vec<u8>, 32 pub old_value: Option<Vec<u8>>, 33 } 34 35 #[derive(Debug)] 36 pub enum DatabaseOperation { 37 Insert(DatabaseInsertOperation), 38 Delete(DatabaseDeleteOperation), 39 } 40 41 #[derive(Clone)] 42 pub struct MemAndIndexedDb { 43 data: Arc<Mutex<OrdMap<Vec<u8>, Vec<u8>>>>, 44 idb: Arc<Rexie>, 45 } 46 47 impl Debug for MemAndIndexedDb { 48 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 49 f.debug_struct("MemDatabase").finish_non_exhaustive() 50 } 51 } 52 53 #[derive(Debug)] 54 pub struct MemAndIndexedDbTransaction<'a> { 55 operations: Vec<DatabaseOperation>, 56 tx_data: OrdMap<Vec<u8>, Vec<u8>>, 57 db: &'a MemAndIndexedDb, 58 savepoint: OrdMap<Vec<u8>, Vec<u8>>, 59 num_pending_operations: usize, 60 num_savepoint_operations: usize, 61 } 62 63 impl MemAndIndexedDb { 64 pub async fn new(name: &str) -> Result<Self> { 65 let idb = rexie::Rexie::builder(name) 66 .add_object_store(rexie::ObjectStore::new("default")) 67 .build() 68 .await 69 .map_err(rexie_to_anyhow)?; 70 let idb = Arc::new(idb); 71 let mut data = OrdMap::new(); 72 73 let idb_tx = idb 74 .transaction(&["default"], TransactionMode::ReadWrite) 75 .map_err(rexie_to_anyhow)?; 76 77 let idb_store = idb_tx.store("default").map_err(rexie_to_anyhow)?; 78 let entries = idb_store 79 .scan(None, None, None, None) 80 .await 81 .map_err(rexie_to_anyhow)?; 82 83 for (key, value) in entries { 84 let key = js_sys::Uint8Array::new(&key).to_vec(); 85 let value = value.dyn_into::<js_sys::Uint8Array>().unwrap().to_vec(); 86 data.insert(key, value); 87 } 88 Ok(Self { 89 data: Arc::new(Mutex::new(data)), 90 idb, 91 }) 92 } 93 94 pub async fn delete(self) -> Result<()> { 95 Rexie::delete(&self.idb.name()) 96 .await 97 .map_err(|e| anyhow::anyhow!("Error deleting database: {e}")) 98 } 99 } 100 101 #[apply(async_trait_maybe_send!)] 102 impl IRawDatabase for MemAndIndexedDb { 103 type Transaction<'a> = MemAndIndexedDbTransaction<'a>; 104 async fn begin_transaction<'a>(&'a self) -> MemAndIndexedDbTransaction<'a> { 105 let db_clone = self.data.lock().await.clone(); 106 let mut memtx = MemAndIndexedDbTransaction { 107 operations: Vec::new(), 108 tx_data: db_clone.clone(), 109 db: self, 110 savepoint: db_clone, 111 num_pending_operations: 0, 112 num_savepoint_operations: 0, 113 }; 114 115 memtx 116 .set_tx_savepoint() 117 .await 118 .expect("MemTransaction never fails"); 119 memtx 120 } 121 122 fn checkpoint(&self, _: &std::path::Path) -> Result<(), anyhow::Error> { 123 unimplemented!() 124 } 125 } 126 127 #[apply(async_trait_maybe_send!)] 128 impl<'a> IDatabaseTransactionOpsCore for MemAndIndexedDbTransaction<'a> { 129 async fn raw_insert_bytes(&mut self, key: &[u8], value: &[u8]) -> Result<Option<Vec<u8>>> { 130 let val = IDatabaseTransactionOpsCore::raw_get_bytes(self, key).await; 131 // Insert data from copy so we can read our own writes 132 let old_value = self.tx_data.insert(key.to_vec(), value.to_vec()); 133 self.operations 134 .push(DatabaseOperation::Insert(DatabaseInsertOperation { 135 key: key.to_vec(), 136 value: value.to_vec(), 137 old_value, 138 })); 139 self.num_pending_operations += 1; 140 val 141 } 142 143 async fn raw_get_bytes(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> { 144 Ok(self.tx_data.get(key).cloned()) 145 } 146 147 async fn raw_remove_entry(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> { 148 // Remove data from copy so we can read our own writes 149 let old_value = self.tx_data.remove(&key.to_vec()); 150 self.operations 151 .push(DatabaseOperation::Delete(DatabaseDeleteOperation { 152 key: key.to_vec(), 153 old_value: old_value.clone(), 154 })); 155 self.num_pending_operations += 1; 156 Ok(old_value) 157 } 158 159 async fn raw_find_by_range(&mut self, range: Range<&[u8]>) -> Result<PrefixStream<'_>> { 160 let data = self 161 .tx_data 162 .range::<_, Vec<u8>>(Range { 163 start: range.start.to_vec(), 164 end: range.end.to_vec(), 165 }) 166 .map(|(key, value)| (key.clone(), value.clone())) 167 .collect::<Vec<_>>(); 168 169 Ok(Box::pin(stream::iter(data))) 170 } 171 172 async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> Result<PrefixStream<'_>> { 173 let data = self 174 .tx_data 175 .range::<_, Vec<u8>>((key_prefix.to_vec())..) 176 .take_while(|(key, _)| key.starts_with(key_prefix)) 177 .map(|(key, value)| (key.clone(), value.clone())) 178 .collect::<Vec<_>>(); 179 180 Ok(Box::pin(stream::iter(data))) 181 } 182 183 async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> anyhow::Result<()> { 184 let keys = self 185 .tx_data 186 .range::<_, Vec<u8>>((key_prefix.to_vec())..) 187 .take_while(|(key, _)| key.starts_with(key_prefix)) 188 .map(|(key, _)| key.clone()) 189 .collect::<Vec<_>>(); 190 for key in keys.iter() { 191 let old_value = self.tx_data.remove(&key.to_vec()); 192 self.operations 193 .push(DatabaseOperation::Delete(DatabaseDeleteOperation { 194 key: key.to_vec(), 195 old_value, 196 })); 197 self.num_pending_operations += 1; 198 } 199 Ok(()) 200 } 201 202 async fn raw_find_by_prefix_sorted_descending( 203 &mut self, 204 key_prefix: &[u8], 205 ) -> Result<PrefixStream<'_>> { 206 let mut data = self 207 .tx_data 208 .range::<_, Vec<u8>>((key_prefix.to_vec())..) 209 .take_while(|(key, _)| key.starts_with(key_prefix)) 210 .map(|(key, value)| (key.clone(), value.clone())) 211 .collect::<Vec<_>>(); 212 data.sort_by(|a, b| a.cmp(b).reverse()); 213 214 Ok(Box::pin(stream::iter(data))) 215 } 216 } 217 218 #[apply(async_trait_maybe_send!)] 219 impl<'a> IDatabaseTransactionOps for MemAndIndexedDbTransaction<'a> { 220 async fn rollback_tx_to_savepoint(&mut self) -> Result<()> { 221 self.tx_data = self.savepoint.clone(); 222 223 // Remove any pending operations beyond the savepoint 224 let removed_ops = self.num_pending_operations - self.num_savepoint_operations; 225 for _i in 0..removed_ops { 226 self.operations.pop(); 227 } 228 229 Ok(()) 230 } 231 232 async fn set_tx_savepoint(&mut self) -> Result<()> { 233 self.savepoint = self.tx_data.clone(); 234 self.num_savepoint_operations = self.num_pending_operations; 235 Ok(()) 236 } 237 } 238 239 // In-memory database transaction should only be used for test code and never 240 // for production as it doesn't properly implement MVCC 241 #[apply(async_trait_maybe_send!)] 242 impl<'a> IRawDatabaseTransaction for MemAndIndexedDbTransaction<'a> { 243 async fn commit_tx(self) -> Result<()> { 244 let mut data = self.db.data.lock().await; 245 let mut data_new = data.clone(); 246 let idb_tx = self 247 .db 248 .idb 249 .transaction(&["default"], TransactionMode::ReadWrite) 250 .map_err(rexie_to_anyhow)?; 251 252 let idb_store = idb_tx.store("default").map_err(rexie_to_anyhow)?; 253 254 let result = async { 255 for op in self.operations { 256 match op { 257 DatabaseOperation::Insert(insert_op) => { 258 let key = js_sys::Uint8Array::from(&insert_op.key[..]); 259 let value = js_sys::Uint8Array::from(&insert_op.value[..]); 260 idb_store 261 .put(&value, Some(&key)) 262 .await 263 .map_err(rexie_to_anyhow)?; 264 let old_value = data_new.insert(insert_op.key, insert_op.value); 265 anyhow::ensure!(old_value == insert_op.old_value, "write-write conflict"); 266 } 267 DatabaseOperation::Delete(delete_op) => { 268 let key = js_sys::Uint8Array::from(&delete_op.key[..]); 269 idb_store 270 .delete(key.into()) 271 .await 272 .map_err(rexie_to_anyhow)?; 273 let old_value = data_new.remove(&delete_op.key); 274 anyhow::ensure!(old_value == delete_op.old_value, "write-write conflict"); 275 } 276 } 277 } 278 Ok(()) 279 } 280 .await; 281 match result { 282 Ok(()) => { 283 idb_tx 284 .commit() 285 .await 286 .map_err(rexie_to_anyhow) 287 .context("indexeddb commit failed")?; 288 // commit the data to memdb 289 *data = data_new; 290 Ok(()) 291 } 292 Err(e) => { 293 idb_tx 294 .abort() 295 .await 296 .map_err(rexie_to_anyhow) 297 .context("indexeddb abort failed")?; 298 Err(e) 299 } 300 } 301 } 302 }