/ fedimint-server / src / atomic_broadcast / data_provider.rs
data_provider.rs
  1  use std::collections::BTreeSet;
  2  
  3  use fedimint_core::config::ALEPH_BFT_UNIT_BYTE_LIMIT;
  4  use fedimint_core::encoding::Encodable;
  5  use fedimint_core::epoch::ConsensusItem;
  6  use fedimint_core::session_outcome::SchnorrSignature;
  7  use fedimint_core::TransactionId;
  8  use tokio::sync::watch;
  9  
 10  use crate::LOG_CONSENSUS;
 11  
/// Payload that a peer attaches to a single AlephBFT unit.
///
/// NOTE(review): the derived `parity_scale_codec` encoding presumably
/// identifies variants by their declaration order, so reordering them would
/// change the encoded form — confirm before touching the variant list.
#[derive(
    Clone, Debug, PartialEq, Eq, Hash, parity_scale_codec::Encode, parity_scale_codec::Decode,
)]
pub enum UnitData {
    /// Raw bytes of a batch. `DataProvider::get_data` fills this with the
    /// consensus encoding of a vector of `ConsensusItem`s.
    Batch(Vec<u8>),
    /// A Schnorr signature. `DataProvider::get_data` returns this variant
    /// once its signature watch channel holds a value.
    Signature(SchnorrSignature),
}
 19  
 20  impl UnitData {
 21      // in order to bound the RAM consumption of a session we have to bound an
 22      // individual units size, hence the size of its attached unit data in memory
 23      pub fn is_valid(&self) -> bool {
 24          match self {
 25              UnitData::Signature(..) => true,
 26              UnitData::Batch(bytes, ..) => bytes.len() <= ALEPH_BFT_UNIT_BYTE_LIMIT,
 27          }
 28      }
 29  }
 30  
/// Supplies the [`UnitData`] attached to each unit via its
/// `aleph_bft::DataProvider` implementation.
pub struct DataProvider {
    /// Source of consensus items; drained without blocking by `get_data`.
    mempool_item_receiver: async_channel::Receiver<ConsensusItem>,
    /// Once this holds `Some(signature)`, `get_data` returns only that
    /// signature and no further batches.
    signature_receiver: watch::Receiver<Option<SchnorrSignature>>,
    /// Hashes of the transactions already taken from the channel; used to
    /// skip transactions that are received a second time.
    submitted_transactions: BTreeSet<TransactionId>,
    /// Item that was taken from the channel but did not fit into the batch
    /// being built; it is considered first when the next batch is built.
    leftover_item: Option<ConsensusItem>,
}
 37  
 38  impl DataProvider {
 39      pub fn new(
 40          mempool_item_receiver: async_channel::Receiver<ConsensusItem>,
 41          signature_receiver: watch::Receiver<Option<SchnorrSignature>>,
 42      ) -> Self {
 43          Self {
 44              mempool_item_receiver,
 45              signature_receiver,
 46              submitted_transactions: BTreeSet::new(),
 47              leftover_item: None,
 48          }
 49      }
 50  }
 51  
 52  #[async_trait::async_trait]
 53  impl aleph_bft::DataProvider<UnitData> for DataProvider {
 54      async fn get_data(&mut self) -> Option<UnitData> {
 55          // we only attach our signature as no more items can be ordered in this session
 56          if let Some(signature) = self.signature_receiver.borrow().clone() {
 57              return Some(UnitData::Signature(signature));
 58          }
 59  
 60          // the length of a vector is encoded in at most 9 bytes
 61          let mut n_bytes = 9;
 62          let mut items = Vec::new();
 63  
 64          if let Some(item) = self.leftover_item.take() {
 65              let n_bytes_item = item.consensus_encode_to_vec().len();
 66  
 67              if n_bytes_item + n_bytes <= ALEPH_BFT_UNIT_BYTE_LIMIT {
 68                  n_bytes += n_bytes_item;
 69                  items.push(item);
 70              } else {
 71                  tracing::warn!(target: LOG_CONSENSUS, ?item, "Consensus item length is over BYTE_LIMIT");
 72              }
 73          }
 74  
 75          // if the channel is empty we want to return the batch immediately in order to
 76          // not delay the creation of our next unit, even if the batch is empty
 77          while let Ok(item) = self.mempool_item_receiver.try_recv() {
 78              if let ConsensusItem::Transaction(transaction) = &item {
 79                  if !self.submitted_transactions.insert(transaction.tx_hash()) {
 80                      continue;
 81                  }
 82              }
 83  
 84              let n_bytes_item = item.consensus_encode_to_vec().len();
 85  
 86              if n_bytes + n_bytes_item <= ALEPH_BFT_UNIT_BYTE_LIMIT {
 87                  n_bytes += n_bytes_item;
 88                  items.push(item);
 89              } else {
 90                  self.leftover_item = Some(item);
 91                  break;
 92              }
 93          }
 94  
 95          let bytes = items.consensus_encode_to_vec();
 96  
 97          assert!(bytes.len() <= ALEPH_BFT_UNIT_BYTE_LIMIT);
 98  
 99          return Some(UnitData::Batch(bytes));
100      }
101  }