backup.rs
1 use async_trait::async_trait; 2 use fedimint_core::db::{Database, IDatabaseTransactionOpsCoreTyped}; 3 use futures::StreamExt; 4 use tracing::info; 5 6 use crate::consensus::db::{AlephUnitsKey, AlephUnitsPrefix}; 7 use crate::LOG_CONSENSUS; 8 9 pub struct BackupReader { 10 db: Database, 11 } 12 13 impl BackupReader { 14 pub fn new(db: Database) -> Self { 15 Self { db } 16 } 17 } 18 19 #[async_trait] 20 impl aleph_bft::BackupReader for BackupReader { 21 async fn read(&mut self) -> std::io::Result<Vec<u8>> { 22 let mut dbtx = self.db.begin_transaction().await; 23 24 let units = dbtx 25 .find_by_prefix(&AlephUnitsPrefix) 26 .await 27 .map(|entry| entry.1) 28 .collect::<Vec<Vec<u8>>>() 29 .await; 30 31 if !units.is_empty() { 32 info!(target: LOG_CONSENSUS, units_len = %units.len(), "Recovering from an in-session-shutdown"); 33 } 34 35 Ok(units.into_iter().flatten().collect()) 36 } 37 } 38 39 pub struct BackupWriter { 40 db: Database, 41 } 42 43 impl BackupWriter { 44 pub fn new(db: Database) -> Self { 45 Self { db } 46 } 47 } 48 49 #[async_trait] 50 impl aleph_bft::BackupWriter for BackupWriter { 51 async fn append(&mut self, data: &[u8]) -> std::io::Result<()> { 52 let mut dbtx = self.db.begin_transaction().await; 53 54 let index = dbtx 55 .find_by_prefix_sorted_descending(&AlephUnitsPrefix) 56 .await 57 .next() 58 .await 59 .map(|entry| (entry.0 .0) + 1) 60 .unwrap_or(0); 61 62 dbtx.insert_new_entry(&AlephUnitsKey(index), &data.to_owned()) 63 .await; 64 65 dbtx.commit_tx_result() 66 .await 67 .expect("This is the only place where we write to this key"); 68 69 Ok(()) 70 } 71 }