data_provider.rs
1 use std::collections::BTreeSet; 2 3 use fedimint_core::config::ALEPH_BFT_UNIT_BYTE_LIMIT; 4 use fedimint_core::encoding::Encodable; 5 use fedimint_core::epoch::ConsensusItem; 6 use fedimint_core::session_outcome::SchnorrSignature; 7 use fedimint_core::TransactionId; 8 use tokio::sync::watch; 9 10 use crate::LOG_CONSENSUS; 11 12 #[derive( 13 Clone, Debug, PartialEq, Eq, Hash, parity_scale_codec::Encode, parity_scale_codec::Decode, 14 )] 15 pub enum UnitData { 16 Batch(Vec<u8>), 17 Signature(SchnorrSignature), 18 } 19 20 impl UnitData { 21 // in order to bound the RAM consumption of a session we have to bound an 22 // individual units size, hence the size of its attached unit data in memory 23 pub fn is_valid(&self) -> bool { 24 match self { 25 UnitData::Signature(..) => true, 26 UnitData::Batch(bytes, ..) => bytes.len() <= ALEPH_BFT_UNIT_BYTE_LIMIT, 27 } 28 } 29 } 30 31 pub struct DataProvider { 32 mempool_item_receiver: async_channel::Receiver<ConsensusItem>, 33 signature_receiver: watch::Receiver<Option<SchnorrSignature>>, 34 submitted_transactions: BTreeSet<TransactionId>, 35 leftover_item: Option<ConsensusItem>, 36 } 37 38 impl DataProvider { 39 pub fn new( 40 mempool_item_receiver: async_channel::Receiver<ConsensusItem>, 41 signature_receiver: watch::Receiver<Option<SchnorrSignature>>, 42 ) -> Self { 43 Self { 44 mempool_item_receiver, 45 signature_receiver, 46 submitted_transactions: BTreeSet::new(), 47 leftover_item: None, 48 } 49 } 50 } 51 52 #[async_trait::async_trait] 53 impl aleph_bft::DataProvider<UnitData> for DataProvider { 54 async fn get_data(&mut self) -> Option<UnitData> { 55 // we only attach our signature as no more items can be ordered in this session 56 if let Some(signature) = self.signature_receiver.borrow().clone() { 57 return Some(UnitData::Signature(signature)); 58 } 59 60 // the length of a vector is encoded in at most 9 bytes 61 let mut n_bytes = 9; 62 let mut items = Vec::new(); 63 64 if let Some(item) = self.leftover_item.take() { 65 let n_bytes_item = item.consensus_encode_to_vec().len(); 66 67 if n_bytes_item + n_bytes <= ALEPH_BFT_UNIT_BYTE_LIMIT { 68 n_bytes += n_bytes_item; 69 items.push(item); 70 } else { 71 tracing::warn!(target: LOG_CONSENSUS, ?item, "Consensus item length is over BYTE_LIMIT"); 72 } 73 } 74 75 // if the channel is empty we want to return the batch immediately in order to 76 // not delay the creation of our next unit, even if the batch is empty 77 while let Ok(item) = self.mempool_item_receiver.try_recv() { 78 if let ConsensusItem::Transaction(transaction) = &item { 79 if !self.submitted_transactions.insert(transaction.tx_hash()) { 80 continue; 81 } 82 } 83 84 let n_bytes_item = item.consensus_encode_to_vec().len(); 85 86 if n_bytes + n_bytes_item <= ALEPH_BFT_UNIT_BYTE_LIMIT { 87 n_bytes += n_bytes_item; 88 items.push(item); 89 } else { 90 self.leftover_item = Some(item); 91 break; 92 } 93 } 94 95 let bytes = items.consensus_encode_to_vec(); 96 97 assert!(bytes.len() <= ALEPH_BFT_UNIT_BYTE_LIMIT); 98 99 return Some(UnitData::Batch(bytes)); 100 } 101 }