engine.rs
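//! The server consensus engine: orders consensus items in sessions (via
//! AlephBFT when the federation has multiple guardians), processes them, and
//! persists threshold-signed session outcomes.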
use std::collections::BTreeMap;
use std::default::Default;
use std::sync::Arc;
use std::time::Duration;

use aleph_bft::Keychain as KeychainTrait;
use anyhow::{anyhow, bail};
use async_channel::Receiver;
use fedimint_api_client::api::{DynGlobalApi, FederationApiExt, PeerConnectionStatus};
use fedimint_api_client::query::FilterMap;
use fedimint_core::core::MODULE_INSTANCE_ID_GLOBAL;
use fedimint_core::db::{Database, DatabaseTransaction, IDatabaseTransactionOpsCoreTyped};
use fedimint_core::encoding::Decodable;
use fedimint_core::endpoint_constants::AWAIT_SIGNED_SESSION_OUTCOME_ENDPOINT;
use fedimint_core::epoch::ConsensusItem;
use fedimint_core::fmt_utils::OptStacktrace;
use fedimint_core::module::audit::Audit;
use fedimint_core::module::registry::{ModuleDecoderRegistry, ServerModuleRegistry};
use fedimint_core::module::{ApiRequestErased, SerdeModuleEncoding};
use fedimint_core::runtime::spawn;
use fedimint_core::session_outcome::{
    AcceptedItem, SchnorrSignature, SessionOutcome, SignedSessionOutcome,
};
use fedimint_core::task::{sleep, TaskGroup, TaskHandle};
use fedimint_core::timing::TimeReporter;
use fedimint_core::{timing, PeerId};
use futures::StreamExt;
use rand::Rng;
use tokio::sync::{watch, RwLock};
use tracing::{debug, info, instrument, warn, Level};

use crate::atomic_broadcast::backup::{BackupReader, BackupWriter};
use crate::atomic_broadcast::data_provider::{DataProvider, UnitData};
use crate::atomic_broadcast::finalization_handler::FinalizationHandler;
use crate::atomic_broadcast::network::Network;
use crate::atomic_broadcast::spawner::Spawner;
use crate::atomic_broadcast::{to_node_index, Keychain, Message};
use crate::config::ServerConfig;
use crate::consensus::db::{
    AcceptedItemKey, AcceptedItemPrefix, AcceptedTransactionKey, AlephUnitsPrefix,
    SignedSessionOutcomeKey, SignedSessionOutcomePrefix,
};
use crate::consensus::debug_fmt::FmtDbgConsensusItem;
use crate::consensus::transaction::process_transaction_with_dbtx;
use crate::fedimint_core::encoding::Encodable;
use crate::metrics::{
    CONSENSUS_ITEMS_PROCESSED_TOTAL, CONSENSUS_ITEM_PROCESSING_DURATION_SECONDS,
    CONSENSUS_ITEM_PROCESSING_MODULE_AUDIT_DURATION_SECONDS,
    CONSENSUS_PEER_CONTRIBUTION_SESSION_IDX, CONSENSUS_SESSION_COUNT,
};
use crate::net::connect::{Connector, TlsTcpConnector};
use crate::net::peers::{DelayCalculator, ReconnectPeerConnections};
use crate::LOG_CONSENSUS;

/// Runs the main server consensus loop
pub struct ConsensusEngine {
    pub modules: ServerModuleRegistry,
    pub db: Database,
    pub keychain: Keychain,
    pub federation_api: DynGlobalApi,
    pub cfg: ServerConfig,
    pub submission_receiver: Receiver<ConsensusItem>,
    pub shutdown_receiver: watch::Receiver<Option<u64>>,
    pub last_ci_by_peer: Arc<RwLock<BTreeMap<PeerId, u64>>>,
    /// Just a string version of `cfg.local.identity` for performance
    pub self_id_str: String,
    /// Just a string version of peer ids for performance
    pub peer_id_str: Vec<String>,
    pub connection_status_channels: Arc<RwLock<BTreeMap<PeerId, PeerConnectionStatus>>>,
    pub task_group: TaskGroup,
}

impl ConsensusEngine {
    #[instrument(name = "run", skip_all, fields(id=%self.cfg.local.identity))]
    pub async fn run(self) -> anyhow::Result<()> {
        if self.cfg.consensus.broadcast_public_keys.len() == 1 {
            self.run_single_guardian(self.task_group.make_handle())
                .await
        } else {
            self.run_consensus(self.task_group.make_handle()).await
        }
    }

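    /// Runs consensus for a single-guardian federation: items are read
    /// directly from the submission channel, processed, and the session
    /// outcome is signed by this guardian alone, without the atomic
    /// broadcast.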
    pub async fn run_single_guardian(&self, task_handle: TaskHandle) -> anyhow::Result<()> {
        assert_eq!(self.cfg.consensus.broadcast_public_keys.len(), 1);

        while !task_handle.is_shutting_down() {
            let session_index = self.get_finished_session_count().await;

            CONSENSUS_SESSION_COUNT.set(session_index as i64);

            let mut item_index = self.pending_accepted_items().await.len() as u64;

            let session_start_time = std::time::Instant::now();

            while let Ok(item) = self.submission_receiver.recv().await {
                if self
                    .process_consensus_item(
                        session_index,
                        item_index,
                        item,
                        self.cfg.local.identity,
                    )
                    .await
                    .is_ok()
                {
                    item_index += 1;
                }

                // we rely on the module consensus items to notice the timeout
                if session_start_time.elapsed() > Duration::from_secs(60) {
                    break;
                }
            }

            let session_outcome = SessionOutcome {
                items: self.pending_accepted_items().await,
            };

            let header = session_outcome.header(session_index);
            let signature = self.keychain.sign(&header);
            let signatures = BTreeMap::from_iter([(self.cfg.local.identity, signature)]);

            self.complete_session(
                session_index,
                SignedSessionOutcome {
                    session_outcome,
                    signatures,
                },
            )
            .await;

            info!(target: LOG_CONSENSUS, "Session {session_index} completed");

            if Some(session_index) == self.shutdown_receiver.borrow().to_owned() {
                break;
            }
        }

        info!(target: LOG_CONSENSUS, "Consensus task shut down");

        Ok(())
    }

    pub async fn run_consensus(&self, task_handle: TaskHandle) -> anyhow::Result<()> {
        // We need at least four peers to run the atomic broadcast
        assert!(self.cfg.consensus.broadcast_public_keys.len() >= 4);

        self.confirm_server_config_consensus_hash().await?;

        // Build P2P connections for the atomic broadcast
        let connections = ReconnectPeerConnections::new(
            self.cfg.network_config(),
            DelayCalculator::PROD_DEFAULT,
            TlsTcpConnector::new(self.cfg.tls_config(), self.cfg.local.identity).into_dyn(),
            &self.task_group,
            Arc::clone(&self.connection_status_channels),
        )
        .await;

        while !task_handle.is_shutting_down() {
            let session_index = self.get_finished_session_count().await;

            CONSENSUS_SESSION_COUNT.set(session_index as i64);

            self.run_session(connections.clone(), session_index).await?;

            info!(target: LOG_CONSENSUS, "Session {session_index} completed");

            if Some(session_index) == self.shutdown_receiver.borrow().to_owned() {
                info!(target: LOG_CONSENSUS, "Initiating shutdown, waiting for peers to complete the session...");

                sleep(Duration::from_secs(60)).await;

                break;
            }
        }

        info!(target: LOG_CONSENSUS, "Consensus task shut down");

        Ok(())
    }

    async fn confirm_server_config_consensus_hash(&self) -> anyhow::Result<()> {
        let our_hash = self.cfg.consensus.consensus_hash();

        info!(target: LOG_CONSENSUS, "Waiting for peers config {our_hash}");

        loop {
            match self.federation_api.server_config_consensus_hash().await {
                Ok(consensus_hash) => {
                    if consensus_hash != our_hash {
                        bail!("Our consensus config doesn't match peers!")
                    }

                    info!(target: LOG_CONSENSUS, "Confirmed peers config {our_hash}");

                    return Ok(());
                }
                Err(e) => {
                    warn!(target: LOG_CONSENSUS, "Could not check consensus config hash: {}", OptStacktrace(e))
                }
            }

            sleep(Duration::from_millis(100)).await;
        }
    }

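    /// Runs a single consensus session: spawns an AlephBFT session over the
    /// given peer connections, processes the consensus items it orders, and
    /// completes the session once a threshold-signed session outcome is
    /// available.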
    pub async fn run_session(
        &self,
        connections: ReconnectPeerConnections<Message>,
        session_index: u64,
    ) -> anyhow::Result<()> {
        // In order to bound a session's RAM consumption we need to bound its number of
        // units and therefore its number of rounds. Since we use a session to
        // create a naive secp256k1 threshold signature for the header of the session
        // outcome we have to guarantee that an attacker cannot exhaust our
        // memory by preventing the creation of a threshold signature, thereby
        // keeping the session open indefinitely. Hence, after a certain round
        // index, we increase the delay between rounds exponentially such that
        // max_round would only be reached after a minimum of 100 years. In case
        // of such an attack the broadcast stops ordering any items until the
        // attack subsides as no items are ordered while the signatures are
        // collected. The maximum RAM consumption of a peer is bound by:
        //
        // self.keychain.peer_count() * max_round * ALEPH_BFT_UNIT_BYTE_LIMIT

        const BASE: f64 = 1.005;

        let expected_rounds = 3 * self.cfg.consensus.broadcast_expected_rounds_per_session as usize;
        let max_round = 3 * self.cfg.consensus.broadcast_max_rounds_per_session;
        let round_delay = self.cfg.local.broadcast_round_delay_ms as f64;

        let exp_slowdown_offset = 3 * expected_rounds;

        let mut delay_config = aleph_bft::default_delay_config();

        delay_config.unit_creation_delay = Arc::new(move |round_index| {
            let delay = if round_index == 0 {
                0.0
            } else {
                round_delay
                    * BASE.powf(round_index.saturating_sub(exp_slowdown_offset) as f64)
                    * rand::thread_rng().gen_range(0.5..=1.5)
            };

            Duration::from_millis(delay.round() as u64)
        });
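
        // Note: for round_index <= exp_slowdown_offset the saturating_sub yields
        // zero, so the multiplier is BASE^0 = 1 and the delay stays at roughly
        // round_delay (times the jitter factor). Past the offset the delay grows
        // geometrically; see the illustrative test at the bottom of this file for
        // how quickly that pushes max_round out of reach.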

        let config = aleph_bft::create_config(
            self.keychain.peer_count().into(),
            self.keychain.peer_id().to_usize().into(),
            session_index,
            max_round,
            delay_config,
            Duration::from_secs(100 * 365 * 24 * 60 * 60),
        )
        .expect("The exponential slowdown we defined exceeds 100 years");

        // we can use an unbounded channel here since the number and size of units
        // ordered in a single aleph session is bounded as described above
        let (unit_data_sender, unit_data_receiver) = async_channel::unbounded();
        let (signature_sender, signature_receiver) = watch::channel(None);
        let (terminator_sender, terminator_receiver) = futures::channel::oneshot::channel();

        let aleph_handle = spawn(
            "aleph run session",
            aleph_bft::run_session(
                config,
                aleph_bft::LocalIO::new(
                    DataProvider::new(self.submission_receiver.clone(), signature_receiver),
                    FinalizationHandler::new(unit_data_sender),
                    BackupWriter::new(self.db.clone()),
                    BackupReader::new(self.db.clone()),
                ),
                Network::new(connections),
                self.keychain.clone(),
                Spawner::new(self.task_group.make_subgroup()),
                aleph_bft_types::Terminator::create_root(terminator_receiver, "Terminator"),
            ),
        );

        // this is the minimum number of batches that will be ordered before we
        // reach the exp_slowdown_offset since at least f + 1 batches are
        // ordered every round
        let batches_per_session = expected_rounds * self.keychain.peer_count();

        let signed_session_outcome = self
            .complete_signed_session_outcome(
                session_index,
                batches_per_session,
                unit_data_receiver,
                signature_sender,
            )
            .await?;

        // We can terminate the session instead of waiting for other peers to complete
        // it since they can always download the signed session outcome from us
        terminator_sender.send(()).ok();
        aleph_handle.await.ok();

        // This method removes the backup of the current session from the database
        // and therefore has to be called after we have waited for the session to
        // shut down or we risk write-write conflicts with the UnitSaver
        self.complete_session(session_index, signed_session_outcome)
            .await;

        Ok(())
    }

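    /// Builds the session outcome from the ordered batches and collects a
    /// threshold of signatures over its header. If our peers complete the
    /// session first, their signed session outcome is downloaded and used
    /// instead.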
    pub async fn complete_signed_session_outcome(
        &self,
        session_index: u64,
        batches_per_session_outcome: usize,
        unit_data_receiver: Receiver<(UnitData, PeerId)>,
        signature_sender: watch::Sender<Option<SchnorrSignature>>,
    ) -> anyhow::Result<SignedSessionOutcome> {
        let mut num_batches = 0;
        let mut item_index = 0;

        // We build a session outcome out of the ordered batches until either we have
        // processed batches_per_session_outcome batches or a threshold-signed
        // session outcome is obtained from our peers
        while num_batches < batches_per_session_outcome {
            tokio::select! {
                unit_data = unit_data_receiver.recv() => {
                    if let (UnitData::Batch(bytes), peer) = unit_data? {
                        if let Ok(items) = Vec::<ConsensusItem>::consensus_decode(&mut bytes.as_slice(), &self.decoders()){
                            for item in items {
                                if self.process_consensus_item(
                                    session_index,
                                    item_index,
                                    item.clone(),
                                    peer
                                ).await
                                .is_ok() {
                                    item_index += 1;
                                }
                            }
                        }
                        num_batches += 1;
                    }
                },
                signed_session_outcome = self.request_signed_session_outcome(&self.federation_api, session_index) => {
                    let pending_accepted_items = self.pending_accepted_items().await;

                    // this panics if we have more accepted items than the signed session outcome
                    let (processed, unprocessed) = signed_session_outcome
                        .session_outcome
                        .items
                        .split_at(pending_accepted_items.len());

                    assert!(processed.iter().eq(pending_accepted_items.iter()));

                    for accepted_item in unprocessed {
                        let result = self.process_consensus_item(
                            session_index,
                            item_index,
                            accepted_item.item.clone(),
                            accepted_item.peer
                        ).await;

                        assert!(result.is_ok());

                        item_index += 1;
                    }

                    return Ok(signed_session_outcome);
                }
            }
        }

        let items = self.pending_accepted_items().await;

        assert_eq!(item_index, items.len() as u64);

        let session_outcome = SessionOutcome { items };

        let header = session_outcome.header(session_index);

        // We send our own signature to the data provider to be submitted to the atomic
        // broadcast and collected by our peers
        signature_sender.send(Some(self.keychain.sign(&header)))?;

        let mut signatures = BTreeMap::new();

        // We collect the ordered signatures until we either obtain a threshold
        // signature or a signed session outcome arrives from our peers
        while signatures.len() < self.keychain.threshold() {
            tokio::select! {
                unit_data = unit_data_receiver.recv() => {
                    if let (UnitData::Signature(signature), peer) = unit_data? {
                        if self.keychain.verify(&header, &signature, to_node_index(peer)){
                            signatures.insert(peer, signature);
                        } else {
                            warn!(target: LOG_CONSENSUS, "Received invalid signature from peer {peer}");
                        }
                    }
                }
                signed_session_outcome = self.request_signed_session_outcome(&self.federation_api, session_index) => {
                    // We check that the session outcome we have created agrees with the federation's consensus
                    assert!(header == signed_session_outcome.session_outcome.header(session_index));

                    return Ok(signed_session_outcome);
                }
            }
        }

        Ok(SignedSessionOutcome {
            session_outcome,
            signatures,
        })
    }

    fn decoders(&self) -> ModuleDecoderRegistry {
        self.modules.decoder_registry()
    }

    pub async fn pending_accepted_items(&self) -> Vec<AcceptedItem> {
        self.db
            .begin_transaction_nc()
            .await
            .find_by_prefix(&AcceptedItemPrefix)
            .await
            .map(|entry| entry.1)
            .collect()
            .await
    }

    pub async fn complete_session(
        &self,
        session_index: u64,
        signed_session_outcome: SignedSessionOutcome,
    ) {
        let mut dbtx = self.db.begin_transaction().await;

        dbtx.remove_by_prefix(&AlephUnitsPrefix).await;

        dbtx.remove_by_prefix(&AcceptedItemPrefix).await;

        if dbtx
            .insert_entry(
                &SignedSessionOutcomeKey(session_index),
                &signed_session_outcome,
            )
            .await
            .is_some()
        {
            panic!("We tried to overwrite a signed session outcome");
        }

        dbtx.commit_tx_result()
            .await
            .expect("This is the only place where we write to this key");
    }

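    /// Processes a single consensus item in its own database transaction.
    /// Items already accepted at this index (e.g. replayed by AlephBFT after
    /// a mid-session crash) are skipped, and the federation's balance sheet
    /// is audited after every processed item.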
    #[instrument(target = "fm::consensus", skip(self, item), level = "info")]
    pub async fn process_consensus_item(
        &self,
        session_index: u64,
        item_index: u64,
        item: ConsensusItem,
        peer: PeerId,
    ) -> anyhow::Result<()> {
        let peer_id_str = &self.peer_id_str[peer.to_usize()];
        let _timing /* logs on drop */ = timing::TimeReporter::new("process_consensus_item").level(Level::TRACE);
        let timing_prom = CONSENSUS_ITEM_PROCESSING_DURATION_SECONDS
            .with_label_values(&[peer_id_str])
            .start_timer();

        debug!(%peer, item = ?FmtDbgConsensusItem(&item), "Processing consensus item");

        self.last_ci_by_peer
            .write()
            .await
            .insert(peer, session_index);

        CONSENSUS_PEER_CONTRIBUTION_SESSION_IDX
            .with_label_values(&[&self.self_id_str, peer_id_str])
            .set(session_index as i64);

        let mut dbtx = self.db.begin_transaction().await;

        dbtx.ignore_uncommitted();

        // When we recover from a mid-session crash aleph bft will replay the units
        // that were already processed before the crash. We therefore skip all
        // consensus items until we have seen all previously accepted items again.
        if let Some(accepted_item) = dbtx
            .get_value(&AcceptedItemKey(item_index.to_owned()))
            .await
        {
            if accepted_item.item == item && accepted_item.peer == peer {
                return Ok(());
            }

            bail!("Item was discarded previously");
        }

        self.process_consensus_item_with_db_transaction(&mut dbtx.to_ref_nc(), item.clone(), peer)
            .await?;

        // After this point we have to commit the database transaction since the
        // item has been fully processed without errors
        dbtx.warn_uncommitted();

        dbtx.insert_entry(&AcceptedItemKey(item_index), &AcceptedItem { item, peer })
            .await;

        let mut audit = Audit::default();

        for (module_instance_id, kind, module) in self.modules.iter_modules() {
            let _module_audit_timing =
                TimeReporter::new(format!("audit module {module_instance_id}")).level(Level::TRACE);

            let timing_prom = CONSENSUS_ITEM_PROCESSING_MODULE_AUDIT_DURATION_SECONDS
                .with_label_values(&[&MODULE_INSTANCE_ID_GLOBAL.to_string(), kind.as_str()])
                .start_timer();

            module
                .audit(
                    &mut dbtx
                        .to_ref_with_prefix_module_id(module_instance_id)
                        .into_nc(),
                    &mut audit,
                    module_instance_id,
                )
                .await;

            timing_prom.observe_duration();
        }

        if audit.net_assets().milli_sat < 0 {
            panic!("Balance sheet of the fed has gone negative, this should never happen! {audit}")
        }

        dbtx.commit_tx_result()
            .await
            .expect("Committing consensus epoch failed");

        CONSENSUS_ITEMS_PROCESSED_TOTAL
            .with_label_values(&[peer_id_str])
            .inc();

        timing_prom.observe_duration();

        Ok(())
    }

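    /// Applies a single consensus item to the given database transaction,
    /// dispatching module items to their module and processing transactions,
    /// without committing the transaction itself.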
    async fn process_consensus_item_with_db_transaction(
        &self,
        dbtx: &mut DatabaseTransaction<'_>,
        consensus_item: ConsensusItem,
        peer_id: PeerId,
    ) -> anyhow::Result<()> {
        // We rely on decoding rejecting any unknown module instance ids to avoid
        // peer-triggered panic here
        self.decoders().assert_reject_mode();

        match consensus_item {
            ConsensusItem::Module(module_item) => {
                let instance_id = module_item.module_instance_id();
                let module_dbtx = &mut dbtx.to_ref_with_prefix_module_id(instance_id);

                self.modules
                    .get_expect(instance_id)
                    .process_consensus_item(module_dbtx, module_item, peer_id)
                    .await
            }
            ConsensusItem::Transaction(transaction) => {
                let txid = transaction.tx_hash();
                if dbtx
                    .get_value(&AcceptedTransactionKey(txid))
                    .await
                    .is_some()
                {
                    debug!(target: LOG_CONSENSUS, %txid, "Transaction already accepted");
                    bail!("Transaction is already accepted");
                }

                let modules_ids = transaction
                    .outputs
                    .iter()
                    .map(|output| output.module_instance_id())
                    .collect::<Vec<_>>();

                process_transaction_with_dbtx(self.modules.clone(), dbtx, transaction)
                    .await
                    .map_err(|error| anyhow!(error.to_string()))?;

                debug!(target: LOG_CONSENSUS, %txid, "Transaction accepted");
                dbtx.insert_entry(&AcceptedTransactionKey(txid), &modules_ids)
                    .await;

                Ok(())
            }
            ConsensusItem::Default { variant, .. } => {
                warn!(
                    target: LOG_CONSENSUS,
                    "Minor consensus version mismatch: unexpected consensus item type: {variant}"
                );

                bail!("Unexpected consensus item type: {variant}")
            }
        }
    }

    async fn request_signed_session_outcome(
        &self,
        federation_api: &DynGlobalApi,
        index: u64,
    ) -> SignedSessionOutcome {
        let keychain = self.keychain.clone();
        let total_peers = self.keychain.peer_count();
        let decoders = self.decoders();

        let filter_map = move |response: SerdeModuleEncoding<SignedSessionOutcome>| match response
            .try_into_inner(&decoders)
        {
            Ok(signed_session_outcome) => {
                match signed_session_outcome.signatures.len() == keychain.threshold()
                    && signed_session_outcome
                        .signatures
                        .iter()
                        .all(|(peer_id, sig)| {
                            keychain.verify(
                                &signed_session_outcome.session_outcome.header(index),
                                sig,
                                to_node_index(*peer_id),
                            )
                        }) {
                    true => Ok(signed_session_outcome),
                    false => Err(anyhow!("Invalid signatures")),
                }
            }
            Err(error) => Err(anyhow!(error.to_string())),
        };

        loop {
            // We only want to initiate the request if we have not ordered a unit in a
            // while. This indicates that we have fallen behind and our peers
            // have already switched sessions without us
            sleep(Duration::from_secs(5)).await;

            let result = federation_api
                .request_with_strategy(
                    FilterMap::new(filter_map.clone(), total_peers),
                    AWAIT_SIGNED_SESSION_OUTCOME_ENDPOINT.to_string(),
                    ApiRequestErased::new(index),
                )
                .await;

            match result {
                Ok(signed_session_outcome) => return signed_session_outcome,
                Err(error) => {
                    tracing::error!(target: LOG_CONSENSUS, "Error while requesting signed session outcome: {}", error)
                }
            }
        }
    }

    /// Returns the number of sessions already saved in the database. This count
    /// **does not** include the currently running session.
    async fn get_finished_session_count(&self) -> u64 {
        get_finished_session_count_static(&mut self.db.begin_transaction_nc().await).await
    }
}

pub async fn get_finished_session_count_static(dbtx: &mut DatabaseTransaction<'_>) -> u64 {
    dbtx.find_by_prefix_sorted_descending(&SignedSessionOutcomePrefix)
        .await
        .next()
        .await
        .map(|entry| (entry.0 .0) + 1)
        .unwrap_or(0)
}
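
// Illustrative sketch, not part of the original implementation: the test below
// mirrors only the exponential factor of the `unit_creation_delay` closure
// defined in `run_session`, using assumed values for `round_delay` and
// `exp_slowdown_offset` rather than the production configuration, to show how
// sharply the round delay grows once the slowdown offset is passed.
#[cfg(test)]
mod delay_schedule_example {
    const BASE: f64 = 1.005;

    /// Same exponential factor as in `run_session`, without the jitter and the
    /// round-zero special case.
    fn delay_ms(round_index: usize, round_delay: f64, exp_slowdown_offset: usize) -> f64 {
        round_delay * BASE.powf(round_index.saturating_sub(exp_slowdown_offset) as f64)
    }

    #[test]
    fn delay_grows_only_after_the_slowdown_offset() {
        // Assumed illustrative values.
        let round_delay = 250.0;
        let exp_slowdown_offset = 900;

        // Up to the offset, saturating_sub yields zero and BASE^0 == 1, so the
        // delay stays at the base round delay.
        assert!((delay_ms(0, round_delay, exp_slowdown_offset) - round_delay).abs() < 1e-9);
        assert!(
            (delay_ms(exp_slowdown_offset, round_delay, exp_slowdown_offset) - round_delay).abs()
                < 1e-9
        );

        // 1000 rounds past the offset the delay has already grown by a factor of
        // roughly 1.005^1000 ~ 147, which is what keeps max_round out of reach
        // for about 100 years.
        assert!(
            delay_ms(exp_slowdown_offset + 1000, round_delay, exp_slowdown_offset)
                > 100.0 * round_delay
        );
    }
}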