backup.rs
 1  use async_trait::async_trait;
 2  use fedimint_core::db::{Database, IDatabaseTransactionOpsCoreTyped};
 3  use futures::StreamExt;
 4  use tracing::info;
 5  
 6  use crate::consensus::db::{AlephUnitsKey, AlephUnitsPrefix};
 7  use crate::LOG_CONSENSUS;
 8  
 9  pub struct BackupReader {
10      db: Database,
11  }
12  
13  impl BackupReader {
14      pub fn new(db: Database) -> Self {
15          Self { db }
16      }
17  }
18  
19  #[async_trait]
20  impl aleph_bft::BackupReader for BackupReader {
21      async fn read(&mut self) -> std::io::Result<Vec<u8>> {
22          let mut dbtx = self.db.begin_transaction().await;
23  
24          let units = dbtx
25              .find_by_prefix(&AlephUnitsPrefix)
26              .await
27              .map(|entry| entry.1)
28              .collect::<Vec<Vec<u8>>>()
29              .await;
30  
31          if !units.is_empty() {
32              info!(target: LOG_CONSENSUS, units_len = %units.len(), "Recovering from an in-session-shutdown");
33          }
34  
35          Ok(units.into_iter().flatten().collect())
36      }
37  }
38  
39  pub struct BackupWriter {
40      db: Database,
41  }
42  
43  impl BackupWriter {
44      pub fn new(db: Database) -> Self {
45          Self { db }
46      }
47  }
48  
49  #[async_trait]
50  impl aleph_bft::BackupWriter for BackupWriter {
51      async fn append(&mut self, data: &[u8]) -> std::io::Result<()> {
52          let mut dbtx = self.db.begin_transaction().await;
53  
54          let index = dbtx
55              .find_by_prefix_sorted_descending(&AlephUnitsPrefix)
56              .await
57              .next()
58              .await
59              .map(|entry| (entry.0 .0) + 1)
60              .unwrap_or(0);
61  
62          dbtx.insert_new_entry(&AlephUnitsKey(index), &data.to_owned())
63              .await;
64  
65          dbtx.commit_tx_result()
66              .await
67              .expect("This is the only place where we write to this key");
68  
69          Ok(())
70      }
71  }