db.rs
  1  //! Uses immutable data structures and saves to indexeddb on commit.
  2  use std::fmt::Debug;
  3  use std::ops::Range;
  4  use std::sync::Arc;
  5  
  6  use anyhow::{Context as _, Result};
  7  use fedimint_core::db::{
  8      IDatabaseTransactionOps, IDatabaseTransactionOpsCore, IRawDatabase, IRawDatabaseTransaction,
  9      PrefixStream,
 10  };
 11  use fedimint_core::{apply, async_trait_maybe_send};
 12  use futures::lock::Mutex;
 13  use futures::stream;
 14  use imbl::OrdMap;
 15  use rexie::{Rexie, TransactionMode};
 16  use wasm_bindgen::JsCast;
 17  
 18  pub fn rexie_to_anyhow(e: rexie::Error) -> anyhow::Error {
 19      anyhow::anyhow!(e.to_string())
 20  }
 21  
/// A pending insert recorded by a transaction and replayed when the
/// transaction is committed.
#[derive(Debug, Default)]
pub struct DatabaseInsertOperation {
    /// Raw key bytes being written.
    pub key: Vec<u8>,
    /// Raw value bytes to store under `key`.
    pub value: Vec<u8>,
    /// Value the transaction's working copy held for `key` before this insert
    /// (`None` if the key was absent). At commit time it is compared against
    /// the shared map to detect write-write conflicts.
    pub old_value: Option<Vec<u8>>,
}
 28  
/// A pending delete recorded by a transaction and replayed when the
/// transaction is committed.
#[derive(Debug, Default)]
pub struct DatabaseDeleteOperation {
    /// Raw key bytes being removed.
    pub key: Vec<u8>,
    /// Value the transaction's working copy held for `key` before the removal
    /// (`None` if the key was absent). At commit time it is compared against
    /// the shared map to detect write-write conflicts.
    pub old_value: Option<Vec<u8>>,
}
 34  
/// A single write recorded in a transaction's operation log.
#[derive(Debug)]
pub enum DatabaseOperation {
    /// Store a value under a key.
    Insert(DatabaseInsertOperation),
    /// Remove a key.
    Delete(DatabaseDeleteOperation),
}
 40  
/// Database that keeps all entries in an in-memory ordered map and writes
/// committed changes through to an IndexedDB object store.
///
/// Cloning is cheap: both fields are `Arc`s, so clones share the same map and
/// the same IndexedDB handle.
#[derive(Clone)]
pub struct MemAndIndexedDb {
    /// In-memory copy of the stored key/value pairs, guarded by an async mutex.
    data: Arc<Mutex<OrdMap<Vec<u8>, Vec<u8>>>>,
    /// Handle to the backing IndexedDB database.
    idb: Arc<Rexie>,
}
 46  
 47  impl Debug for MemAndIndexedDb {
 48      fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
 49          f.debug_struct("MemDatabase").finish_non_exhaustive()
 50      }
 51  }
 52  
/// A transaction over a [`MemAndIndexedDb`].
///
/// Reads and writes go to a private working copy of the map; every write is
/// also appended to an operation log that is replayed on commit.
#[derive(Debug)]
pub struct MemAndIndexedDbTransaction<'a> {
    /// Ordered log of the writes performed in this transaction.
    operations: Vec<DatabaseOperation>,
    /// Working copy of the data that reflects this transaction's own writes.
    tx_data: OrdMap<Vec<u8>, Vec<u8>>,
    /// The database this transaction was started on.
    db: &'a MemAndIndexedDb,
    /// Copy of `tx_data` taken when the savepoint was last set.
    savepoint: OrdMap<Vec<u8>, Vec<u8>>,
    /// Counter incremented each time an operation is pushed to `operations`.
    num_pending_operations: usize,
    /// Value of `num_pending_operations` when the savepoint was last set.
    num_savepoint_operations: usize,
}
 62  
 63  impl MemAndIndexedDb {
 64      pub async fn new(name: &str) -> Result<Self> {
 65          let idb = rexie::Rexie::builder(name)
 66              .add_object_store(rexie::ObjectStore::new("default"))
 67              .build()
 68              .await
 69              .map_err(rexie_to_anyhow)?;
 70          let idb = Arc::new(idb);
 71          let mut data = OrdMap::new();
 72  
 73          let idb_tx = idb
 74              .transaction(&["default"], TransactionMode::ReadWrite)
 75              .map_err(rexie_to_anyhow)?;
 76  
 77          let idb_store = idb_tx.store("default").map_err(rexie_to_anyhow)?;
 78          let entries = idb_store
 79              .scan(None, None, None, None)
 80              .await
 81              .map_err(rexie_to_anyhow)?;
 82  
 83          for (key, value) in entries {
 84              let key = js_sys::Uint8Array::new(&key).to_vec();
 85              let value = value.dyn_into::<js_sys::Uint8Array>().unwrap().to_vec();
 86              data.insert(key, value);
 87          }
 88          Ok(Self {
 89              data: Arc::new(Mutex::new(data)),
 90              idb,
 91          })
 92      }
 93  
 94      pub async fn delete(self) -> Result<()> {
 95          Rexie::delete(&self.idb.name())
 96              .await
 97              .map_err(|e| anyhow::anyhow!("Error deleting database: {e}"))
 98      }
 99  }
100  
101  #[apply(async_trait_maybe_send!)]
102  impl IRawDatabase for MemAndIndexedDb {
103      type Transaction<'a> = MemAndIndexedDbTransaction<'a>;
104      async fn begin_transaction<'a>(&'a self) -> MemAndIndexedDbTransaction<'a> {
105          let db_clone = self.data.lock().await.clone();
106          let mut memtx = MemAndIndexedDbTransaction {
107              operations: Vec::new(),
108              tx_data: db_clone.clone(),
109              db: self,
110              savepoint: db_clone,
111              num_pending_operations: 0,
112              num_savepoint_operations: 0,
113          };
114  
115          memtx
116              .set_tx_savepoint()
117              .await
118              .expect("MemTransaction never fails");
119          memtx
120      }
121  
122      fn checkpoint(&self, _: &std::path::Path) -> Result<(), anyhow::Error> {
123          unimplemented!()
124      }
125  }
126  
127  #[apply(async_trait_maybe_send!)]
128  impl<'a> IDatabaseTransactionOpsCore for MemAndIndexedDbTransaction<'a> {
129      async fn raw_insert_bytes(&mut self, key: &[u8], value: &[u8]) -> Result<Option<Vec<u8>>> {
130          let val = IDatabaseTransactionOpsCore::raw_get_bytes(self, key).await;
131          // Insert data from copy so we can read our own writes
132          let old_value = self.tx_data.insert(key.to_vec(), value.to_vec());
133          self.operations
134              .push(DatabaseOperation::Insert(DatabaseInsertOperation {
135                  key: key.to_vec(),
136                  value: value.to_vec(),
137                  old_value,
138              }));
139          self.num_pending_operations += 1;
140          val
141      }
142  
143      async fn raw_get_bytes(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
144          Ok(self.tx_data.get(key).cloned())
145      }
146  
147      async fn raw_remove_entry(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
148          // Remove data from copy so we can read our own writes
149          let old_value = self.tx_data.remove(&key.to_vec());
150          self.operations
151              .push(DatabaseOperation::Delete(DatabaseDeleteOperation {
152                  key: key.to_vec(),
153                  old_value: old_value.clone(),
154              }));
155          self.num_pending_operations += 1;
156          Ok(old_value)
157      }
158  
159      async fn raw_find_by_range(&mut self, range: Range<&[u8]>) -> Result<PrefixStream<'_>> {
160          let data = self
161              .tx_data
162              .range::<_, Vec<u8>>(Range {
163                  start: range.start.to_vec(),
164                  end: range.end.to_vec(),
165              })
166              .map(|(key, value)| (key.clone(), value.clone()))
167              .collect::<Vec<_>>();
168  
169          Ok(Box::pin(stream::iter(data)))
170      }
171  
172      async fn raw_find_by_prefix(&mut self, key_prefix: &[u8]) -> Result<PrefixStream<'_>> {
173          let data = self
174              .tx_data
175              .range::<_, Vec<u8>>((key_prefix.to_vec())..)
176              .take_while(|(key, _)| key.starts_with(key_prefix))
177              .map(|(key, value)| (key.clone(), value.clone()))
178              .collect::<Vec<_>>();
179  
180          Ok(Box::pin(stream::iter(data)))
181      }
182  
183      async fn raw_remove_by_prefix(&mut self, key_prefix: &[u8]) -> anyhow::Result<()> {
184          let keys = self
185              .tx_data
186              .range::<_, Vec<u8>>((key_prefix.to_vec())..)
187              .take_while(|(key, _)| key.starts_with(key_prefix))
188              .map(|(key, _)| key.clone())
189              .collect::<Vec<_>>();
190          for key in keys.iter() {
191              let old_value = self.tx_data.remove(&key.to_vec());
192              self.operations
193                  .push(DatabaseOperation::Delete(DatabaseDeleteOperation {
194                      key: key.to_vec(),
195                      old_value,
196                  }));
197              self.num_pending_operations += 1;
198          }
199          Ok(())
200      }
201  
202      async fn raw_find_by_prefix_sorted_descending(
203          &mut self,
204          key_prefix: &[u8],
205      ) -> Result<PrefixStream<'_>> {
206          let mut data = self
207              .tx_data
208              .range::<_, Vec<u8>>((key_prefix.to_vec())..)
209              .take_while(|(key, _)| key.starts_with(key_prefix))
210              .map(|(key, value)| (key.clone(), value.clone()))
211              .collect::<Vec<_>>();
212          data.sort_by(|a, b| a.cmp(b).reverse());
213  
214          Ok(Box::pin(stream::iter(data)))
215      }
216  }
217  
218  #[apply(async_trait_maybe_send!)]
219  impl<'a> IDatabaseTransactionOps for MemAndIndexedDbTransaction<'a> {
220      async fn rollback_tx_to_savepoint(&mut self) -> Result<()> {
221          self.tx_data = self.savepoint.clone();
222  
223          // Remove any pending operations beyond the savepoint
224          let removed_ops = self.num_pending_operations - self.num_savepoint_operations;
225          for _i in 0..removed_ops {
226              self.operations.pop();
227          }
228  
229          Ok(())
230      }
231  
232      async fn set_tx_savepoint(&mut self) -> Result<()> {
233          self.savepoint = self.tx_data.clone();
234          self.num_savepoint_operations = self.num_pending_operations;
235          Ok(())
236      }
237  }
238  
239  // In-memory database transaction should only be used for test code and never
240  // for production as it doesn't properly implement MVCC
241  #[apply(async_trait_maybe_send!)]
242  impl<'a> IRawDatabaseTransaction for MemAndIndexedDbTransaction<'a> {
243      async fn commit_tx(self) -> Result<()> {
244          let mut data = self.db.data.lock().await;
245          let mut data_new = data.clone();
246          let idb_tx = self
247              .db
248              .idb
249              .transaction(&["default"], TransactionMode::ReadWrite)
250              .map_err(rexie_to_anyhow)?;
251  
252          let idb_store = idb_tx.store("default").map_err(rexie_to_anyhow)?;
253  
254          let result = async {
255              for op in self.operations {
256                  match op {
257                      DatabaseOperation::Insert(insert_op) => {
258                          let key = js_sys::Uint8Array::from(&insert_op.key[..]);
259                          let value = js_sys::Uint8Array::from(&insert_op.value[..]);
260                          idb_store
261                              .put(&value, Some(&key))
262                              .await
263                              .map_err(rexie_to_anyhow)?;
264                          let old_value = data_new.insert(insert_op.key, insert_op.value);
265                          anyhow::ensure!(old_value == insert_op.old_value, "write-write conflict");
266                      }
267                      DatabaseOperation::Delete(delete_op) => {
268                          let key = js_sys::Uint8Array::from(&delete_op.key[..]);
269                          idb_store
270                              .delete(key.into())
271                              .await
272                              .map_err(rexie_to_anyhow)?;
273                          let old_value = data_new.remove(&delete_op.key);
274                          anyhow::ensure!(old_value == delete_op.old_value, "write-write conflict");
275                      }
276                  }
277              }
278              Ok(())
279          }
280          .await;
281          match result {
282              Ok(()) => {
283                  idb_tx
284                      .commit()
285                      .await
286                      .map_err(rexie_to_anyhow)
287                      .context("indexeddb commit failed")?;
288                  // commit the data to memdb
289                  *data = data_new;
290                  Ok(())
291              }
292              Err(e) => {
293                  idb_tx
294                      .abort()
295                      .await
296                      .map_err(rexie_to_anyhow)
297                      .context("indexeddb abort failed")?;
298                  Err(e)
299              }
300          }
301      }
302  }