worker.rs
1 // Copyright (c) 2025 ADnet Contributors 2 // This file is part of the AlphaOS library. 3 4 // Licensed under the Apache License, Version 2.0 (the "License"); 5 // you may not use this file except in compliance with the License. 6 // You may obtain a copy of the License at: 7 8 // http://www.apache.org/licenses/LICENSE-2.0 9 10 // Unless required by applicable law or agreed to in writing, software 11 // distributed under the License is distributed on an "AS IS" BASIS, 12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 // See the License for the specific language governing permissions and 14 // limitations under the License. 15 16 use crate::{ 17 MAX_FETCH_TIMEOUT_IN_MS, 18 MAX_WORKERS, 19 ProposedBatch, 20 Transport, 21 events::{Event, TransmissionRequest, TransmissionResponse}, 22 helpers::{Pending, Ready, Storage, WorkerReceiver, fmt_id, max_redundant_requests}, 23 spawn_blocking, 24 }; 25 use alphaos_node_bft_ledger_service::LedgerService; 26 use alphavm::{ 27 console::prelude::*, 28 ledger::{ 29 Transaction, 30 narwhal::{BatchHeader, Data, Transmission, TransmissionID}, 31 puzzle::{Solution, SolutionID}, 32 }, 33 }; 34 35 use anyhow::Context; 36 use colored::Colorize; 37 use indexmap::{IndexMap, IndexSet}; 38 #[cfg(feature = "locktick")] 39 use locktick::parking_lot::{Mutex, RwLock}; 40 #[cfg(not(feature = "locktick"))] 41 use parking_lot::{Mutex, RwLock}; 42 use rand::seq::IteratorRandom; 43 use std::{future::Future, net::SocketAddr, sync::Arc, time::Duration}; 44 use tokio::{sync::oneshot, task::JoinHandle, time::timeout}; 45 46 /// A worker's main role is maintaining a queue of verified ("ready") transmissions, 47 /// which will eventually be fetched by the primary when the primary generates a new batch. 48 #[derive(Clone)] 49 pub struct Worker<N: Network> { 50 /// The worker ID. 51 id: u8, 52 /// The gateway. 53 gateway: Arc<dyn Transport<N>>, 54 /// The storage. 55 storage: Storage<N>, 56 /// The ledger service. 57 ledger: Arc<dyn LedgerService<N>>, 58 /// The proposed batch. 59 proposed_batch: Arc<ProposedBatch<N>>, 60 /// The ready queue. 61 ready: Arc<RwLock<Ready<N>>>, 62 /// The pending transmissions queue. 63 pending: Arc<Pending<TransmissionID<N>, Transmission<N>>>, 64 /// The spawned handles. 65 handles: Arc<Mutex<Vec<JoinHandle<()>>>>, 66 } 67 68 impl<N: Network> Worker<N> { 69 /// Initializes a new worker instance. 70 pub fn new( 71 id: u8, 72 gateway: Arc<dyn Transport<N>>, 73 storage: Storage<N>, 74 ledger: Arc<dyn LedgerService<N>>, 75 proposed_batch: Arc<ProposedBatch<N>>, 76 ) -> Result<Self> { 77 // Ensure the worker ID is valid. 78 ensure!(id < MAX_WORKERS, "Invalid worker ID '{id}'"); 79 // Return the worker. 80 Ok(Self { 81 id, 82 gateway, 83 storage, 84 ledger, 85 proposed_batch, 86 ready: Default::default(), 87 pending: Default::default(), 88 handles: Default::default(), 89 }) 90 } 91 92 /// Run the worker instance. 93 pub fn run(&self, receiver: WorkerReceiver<N>) { 94 info!("Starting worker instance {} of the memory pool...", self.id); 95 // Start the worker handlers. 96 self.start_handlers(receiver); 97 } 98 99 /// Returns the worker ID. 100 pub const fn id(&self) -> u8 { 101 self.id 102 } 103 104 /// Returns a reference to the pending transmissions queue. 105 pub fn pending(&self) -> &Arc<Pending<TransmissionID<N>, Transmission<N>>> { 106 &self.pending 107 } 108 } 109 110 impl<N: Network> Worker<N> { 111 /// The maximum number of transmissions allowed in a worker. 112 pub const MAX_TRANSMISSIONS_PER_WORKER: usize = 113 BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH / MAX_WORKERS as usize; 114 /// The maximum number of transmissions allowed in a worker ping. 115 pub const MAX_TRANSMISSIONS_PER_WORKER_PING: usize = BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH / 10; 116 117 /// Returns the number of transmissions in the ready queue. 118 pub fn num_transmissions(&self) -> usize { 119 self.ready.read().num_transmissions() 120 } 121 122 /// Returns the number of ratifications in the ready queue. 123 pub fn num_ratifications(&self) -> usize { 124 self.ready.read().num_ratifications() 125 } 126 127 /// Returns the number of solutions in the ready queue. 128 pub fn num_solutions(&self) -> usize { 129 self.ready.read().num_solutions() 130 } 131 132 /// Returns the number of transactions in the ready queue. 133 pub fn num_transactions(&self) -> usize { 134 self.ready.read().num_transactions() 135 } 136 } 137 138 impl<N: Network> Worker<N> { 139 /// Returns the transmission IDs in the ready queue. 140 pub fn transmission_ids(&self) -> IndexSet<TransmissionID<N>> { 141 self.ready.read().transmission_ids() 142 } 143 144 /// Returns the transmissions in the ready queue. 145 pub fn transmissions(&self) -> IndexMap<TransmissionID<N>, Transmission<N>> { 146 self.ready.read().transmissions() 147 } 148 149 /// Returns the solutions in the ready queue. 150 pub fn solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> { 151 self.ready.read().solutions().into_iter() 152 } 153 154 /// Returns the transactions in the ready queue. 155 pub fn transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> { 156 self.ready.read().transactions().into_iter() 157 } 158 } 159 160 impl<N: Network> Worker<N> { 161 /// Clears the solutions from the ready queue. 162 pub(super) fn clear_solutions(&self) { 163 self.ready.write().clear_solutions() 164 } 165 } 166 167 impl<N: Network> Worker<N> { 168 /// Returns `true` if the transmission ID exists in the ready queue, proposed batch, storage, or ledger. 169 pub fn contains_transmission(&self, transmission_id: impl Into<TransmissionID<N>>) -> bool { 170 let transmission_id = transmission_id.into(); 171 // Check if the transmission ID exists in the ready queue, proposed batch, storage, or ledger. 172 self.ready.read().contains(transmission_id) 173 || self.proposed_batch.read().as_ref().is_some_and(|p| p.contains_transmission(transmission_id)) 174 || self.storage.contains_transmission(transmission_id) 175 || self.ledger.contains_transmission(&transmission_id).unwrap_or(false) 176 } 177 178 /// Returns the transmission if it exists in the ready queue, proposed batch, storage. 179 /// 180 /// Note: We explicitly forbid retrieving a transmission from the ledger, as transmissions 181 /// in the ledger are not guaranteed to be invalid for the current batch. 182 pub fn get_transmission(&self, transmission_id: TransmissionID<N>) -> Option<Transmission<N>> { 183 // Check if the transmission ID exists in the ready queue. 184 if let Some(transmission) = self.ready.read().get(transmission_id) { 185 return Some(transmission); 186 } 187 // Check if the transmission ID exists in storage. 188 if let Some(transmission) = self.storage.get_transmission(transmission_id) { 189 return Some(transmission); 190 } 191 // Check if the transmission ID exists in the proposed batch. 192 if let Some(transmission) = 193 self.proposed_batch.read().as_ref().and_then(|p| p.get_transmission(transmission_id)) 194 { 195 return Some(transmission.clone()); 196 } 197 None 198 } 199 200 /// Returns the transmissions if it exists in the worker, or requests it from the specified peer. 201 pub async fn get_or_fetch_transmission( 202 &self, 203 peer_ip: SocketAddr, 204 transmission_id: TransmissionID<N>, 205 ) -> Result<(TransmissionID<N>, Transmission<N>)> { 206 // Attempt to get the transmission from the worker. 207 if let Some(transmission) = self.get_transmission(transmission_id) { 208 return Ok((transmission_id, transmission)); 209 } 210 // Send a transmission request to the peer. 211 let (candidate_id, transmission) = self.send_transmission_request(peer_ip, transmission_id).await?; 212 // Ensure the transmission ID matches. 213 ensure!(candidate_id == transmission_id, "Invalid transmission ID"); 214 // Return the transmission. 215 Ok((transmission_id, transmission)) 216 } 217 218 /// Inserts the transmission at the front of the ready queue. 219 pub(crate) fn insert_front(&self, key: TransmissionID<N>, value: Transmission<N>) { 220 self.ready.write().insert_front(key, value); 221 } 222 223 /// Removes and returns the transmission at the front of the ready queue. 224 pub(crate) fn remove_front(&self) -> Option<(TransmissionID<N>, Transmission<N>)> { 225 self.ready.write().remove_front() 226 } 227 228 /// Reinserts the specified transmission into the ready queue. 229 pub(crate) fn reinsert(&self, transmission_id: TransmissionID<N>, transmission: Transmission<N>) -> bool { 230 // Check if the transmission ID exists. 231 if !self.contains_transmission(transmission_id) { 232 // Insert the transmission into the ready queue. 233 return self.ready.write().insert(transmission_id, transmission); 234 } 235 false 236 } 237 238 /// Broadcasts a worker ping event. 239 pub(crate) fn broadcast_ping(&self) { 240 // Retrieve the transmission IDs. 241 let transmission_ids = self 242 .ready 243 .read() 244 .transmission_ids() 245 .into_iter() 246 .choose_multiple(&mut rand::thread_rng(), Self::MAX_TRANSMISSIONS_PER_WORKER_PING) 247 .into_iter() 248 .collect::<IndexSet<_>>(); 249 250 // Broadcast the ping event. 251 if !transmission_ids.is_empty() { 252 self.gateway.broadcast(Event::WorkerPing(transmission_ids.into())); 253 } 254 } 255 } 256 257 impl<N: Network> Worker<N> { 258 /// Handles the incoming transmission ID from a worker ping event. 259 fn process_transmission_id_from_ping(&self, peer_ip: SocketAddr, transmission_id: TransmissionID<N>) { 260 // Check if the transmission ID exists. 261 if self.contains_transmission(transmission_id) { 262 return; 263 } 264 // If the ready queue is full, then skip this transmission. 265 // Note: We must prioritize the unconfirmed solutions and unconfirmed transactions, not transmissions. 266 if self.ready.read().num_transmissions() > Self::MAX_TRANSMISSIONS_PER_WORKER { 267 return; 268 } 269 // Attempt to fetch the transmission from the peer. 270 let self_ = self.clone(); 271 tokio::spawn(async move { 272 // Send a transmission request to the peer. 273 match self_.send_transmission_request(peer_ip, transmission_id).await { 274 // If the transmission was fetched, then process it. 275 Ok((candidate_id, transmission)) => { 276 // Ensure the transmission ID matches. 277 if candidate_id == transmission_id { 278 // Insert the transmission into the ready queue. 279 // Note: This method checks `contains_transmission` again, because by the time the transmission is fetched, 280 // it could have already been inserted into the ready queue. 281 self_.process_transmission_from_peer(peer_ip, transmission_id, transmission); 282 } 283 } 284 // If the transmission was not fetched, then attempt to fetch it again. 285 Err(e) => { 286 warn!( 287 "Worker {} - Failed to fetch transmission '{}.{}' from '{peer_ip}' (ping) - {e}", 288 self_.id, 289 fmt_id(transmission_id), 290 fmt_id(transmission_id.checksum().unwrap_or_default()).dimmed() 291 ); 292 } 293 } 294 }); 295 } 296 297 /// Handles the incoming transmission from a peer. 298 pub(crate) fn process_transmission_from_peer( 299 &self, 300 peer_ip: SocketAddr, 301 transmission_id: TransmissionID<N>, 302 transmission: Transmission<N>, 303 ) { 304 // If the transmission ID already exists, then do not store it. 305 if self.contains_transmission(transmission_id) { 306 return; 307 } 308 // Ensure the transmission ID and transmission type matches. 309 let is_well_formed = match (&transmission_id, &transmission) { 310 (TransmissionID::Solution(_, _), Transmission::Solution(_)) => true, 311 (TransmissionID::Transaction(_, _), Transmission::Transaction(_)) => true, 312 // Note: We explicitly forbid inserting ratifications into the ready queue, 313 // as the protocol currently does not support ratifications. 314 (TransmissionID::Ratification, Transmission::Ratification) => false, 315 // All other combinations are clearly invalid. 316 _ => false, 317 }; 318 // If the transmission is a deserialized execution, verify it immediately. 319 // This takes heavy transaction verification out of the hot path during block generation. 320 if let (TransmissionID::Transaction(tx_id, _), Transmission::Transaction(Data::Object(tx))) = 321 (transmission_id, &transmission) 322 { 323 if tx.is_execute() { 324 let self_ = self.clone(); 325 let tx_ = tx.clone(); 326 tokio::spawn(async move { 327 let _ = self_.ledger.check_transaction_basic(tx_id, tx_).await; 328 }); 329 } 330 } 331 // If the transmission ID and transmission type matches, then insert the transmission into the ready queue. 332 if is_well_formed && self.ready.write().insert(transmission_id, transmission) { 333 trace!( 334 "Worker {} - Added transmission '{}.{}' from '{peer_ip}'", 335 self.id, 336 fmt_id(transmission_id), 337 fmt_id(transmission_id.checksum().unwrap_or_default()).dimmed() 338 ); 339 } 340 } 341 342 /// Handles the incoming unconfirmed solution. 343 /// Note: This method assumes the incoming solution is valid and does not exist in the ledger. 344 /// 345 /// # Returns 346 /// - `Ok(true)` if the solution was added to the ready queue. 347 /// - `Ok(false)` if the solution was valid but already exists in the ready queue. 348 /// - `Err(anyhow::Error)` if the solution is invalid. 349 pub(crate) async fn process_unconfirmed_solution( 350 &self, 351 solution_id: SolutionID<N>, 352 solution: Data<Solution<N>>, 353 ) -> Result<bool> { 354 // Construct the transmission. 355 let transmission = Transmission::Solution(solution.clone()); 356 // Compute the checksum. 357 let checksum = solution.to_checksum::<N>()?; 358 // Construct the transmission ID. 359 let transmission_id = TransmissionID::Solution(solution_id, checksum); 360 // Remove the solution ID from the pending queue. 361 self.pending.remove(transmission_id, Some(transmission.clone())); 362 // Check if the solution exists. 363 if self.contains_transmission(transmission_id) { 364 return Ok(false); 365 } 366 // Check that the solution is well-formed and unique. 367 self.ledger.check_solution_basic(solution_id, solution).await?; 368 // Adds the solution to the ready queue. 369 if self.ready.write().insert(transmission_id, transmission) { 370 trace!( 371 "Worker {} - Added unconfirmed solution '{}.{}'", 372 self.id, 373 fmt_id(solution_id), 374 fmt_id(checksum).dimmed() 375 ); 376 } 377 Ok(true) 378 } 379 380 /// Handles the incoming unconfirmed transaction. 381 /// 382 /// # Returns 383 /// - `Ok(true)` if the transaction was added to the ready queue. 384 /// - `Ok(false)` if the transaction was valid but already exists in the ready queue. 385 /// - `Err(anyhow::Error)` if the transaction was invalid. 386 pub(crate) async fn process_unconfirmed_transaction( 387 &self, 388 transaction_id: N::TransactionID, 389 transaction: Data<Transaction<N>>, 390 ) -> Result<bool> { 391 // Construct the transmission. 392 let transmission = Transmission::Transaction(transaction.clone()); 393 // Compute the checksum. 394 let checksum = transaction.to_checksum::<N>()?; 395 // Construct the transmission ID. 396 let transmission_id = TransmissionID::Transaction(transaction_id, checksum); 397 // Remove the transaction from the pending queue. 398 self.pending.remove(transmission_id, Some(transmission.clone())); 399 // Check if the transaction ID exists. 400 if self.contains_transmission(transmission_id) { 401 return Ok(false); 402 } 403 // Deserialize the transaction. If the transaction exceeds the maximum size, then return an error. 404 let transaction = spawn_blocking!({ 405 match transaction { 406 Data::Object(transaction) => Ok(transaction), 407 Data::Buffer(bytes) => Ok(Transaction::<N>::read_le(&mut bytes.take(N::MAX_TRANSACTION_SIZE as u64))?), 408 } 409 })?; 410 411 // Check that the transaction is well-formed and unique. 412 self.ledger.check_transaction_basic(transaction_id, transaction).await?; 413 // Adds the transaction to the ready queue. 414 if self.ready.write().insert(transmission_id, transmission) { 415 trace!( 416 "Worker {}.{} - Added unconfirmed transaction '{}'", 417 self.id, 418 fmt_id(transaction_id), 419 fmt_id(checksum).dimmed() 420 ); 421 } 422 Ok(true) 423 } 424 } 425 426 impl<N: Network> Worker<N> { 427 /// Starts the worker handlers. 428 fn start_handlers(&self, receiver: WorkerReceiver<N>) { 429 let WorkerReceiver { mut rx_worker_ping, mut rx_transmission_request, mut rx_transmission_response } = receiver; 430 431 // Start the pending queue expiration loop. 432 let self_ = self.clone(); 433 self.spawn(async move { 434 loop { 435 // Sleep briefly. 436 tokio::time::sleep(Duration::from_millis(MAX_FETCH_TIMEOUT_IN_MS)).await; 437 438 // Remove the expired pending certificate requests. 439 let self__ = self_.clone(); 440 let _ = spawn_blocking!({ 441 self__.pending.clear_expired_callbacks(); 442 Ok(()) 443 }); 444 } 445 }); 446 447 // Process the ping events. 448 let self_ = self.clone(); 449 self.spawn(async move { 450 while let Some((peer_ip, transmission_id)) = rx_worker_ping.recv().await { 451 self_.process_transmission_id_from_ping(peer_ip, transmission_id); 452 } 453 }); 454 455 // Process the transmission requests. 456 let self_ = self.clone(); 457 self.spawn(async move { 458 while let Some((peer_ip, transmission_request)) = rx_transmission_request.recv().await { 459 self_.send_transmission_response(peer_ip, transmission_request); 460 } 461 }); 462 463 // Process the transmission responses. 464 let self_ = self.clone(); 465 self.spawn(async move { 466 while let Some((peer_ip, transmission_response)) = rx_transmission_response.recv().await { 467 // Process the transmission response. 468 let self__ = self_.clone(); 469 let _ = spawn_blocking!({ 470 self__.finish_transmission_request(peer_ip, transmission_response); 471 Ok(()) 472 }); 473 } 474 }); 475 } 476 477 /// Sends a transmission request to the specified peer. 478 async fn send_transmission_request( 479 &self, 480 peer_ip: SocketAddr, 481 transmission_id: TransmissionID<N>, 482 ) -> Result<(TransmissionID<N>, Transmission<N>)> { 483 // Initialize a oneshot channel. 484 let (callback_sender, callback_receiver) = oneshot::channel(); 485 // Determine how many sent requests are pending. 486 let num_sent_requests = self.pending.num_sent_requests(transmission_id); 487 // Determine if we've already sent a request to the peer. 488 let contains_peer_with_sent_request = self.pending.contains_peer_with_sent_request(transmission_id, peer_ip); 489 // Determine the maximum number of redundant requests. 490 let num_redundant_requests = max_redundant_requests(self.ledger.clone(), self.storage.current_round())?; 491 // Determine if we should send a transmission request to the peer. 492 // We send at most `num_redundant_requests` requests and each peer can only receive one request at a time. 493 let should_send_request = num_sent_requests < num_redundant_requests && !contains_peer_with_sent_request; 494 495 // Insert the transmission ID into the pending queue. 496 self.pending.insert(transmission_id, peer_ip, Some((callback_sender, should_send_request))); 497 498 // Helper to print the transmission ID and checksum. 499 let print_transmission_id = |transmission_id: TransmissionID<N>| { 500 format!("{}.{}", fmt_id(transmission_id), fmt_id(transmission_id.checksum().unwrap_or_default()).dimmed()) 501 }; 502 503 // If the number of requests is less than or equal to the the redundancy factor, send the transmission request to the peer. 504 if should_send_request { 505 // Send the transmission request to the peer. 506 if self.gateway.send(peer_ip, Event::TransmissionRequest(transmission_id.into())).await.is_none() { 507 bail!("Unable to fetch transmission - failed to send request") 508 } 509 } else { 510 debug!( 511 "Skipped sending request for transmission {} to '{peer_ip}' ({num_sent_requests} redundant requests)", 512 print_transmission_id(transmission_id) 513 ); 514 } 515 // Wait for the transmission to be fetched. 516 517 let transmission = timeout(Duration::from_millis(MAX_FETCH_TIMEOUT_IN_MS), callback_receiver) 518 .await 519 .with_context(|| { 520 format!("Unable to fetch transmission {} (timeout)", print_transmission_id(transmission_id),) 521 })? 522 .with_context(|| format!("Unable to fetch transmission {}", print_transmission_id(transmission_id),))?; 523 524 Ok((transmission_id, transmission)) 525 } 526 527 /// Handles the incoming transmission response. 528 /// This method ensures the transmission response is well-formed and matches the transmission ID. 529 fn finish_transmission_request(&self, peer_ip: SocketAddr, response: TransmissionResponse<N>) { 530 let TransmissionResponse { transmission_id, mut transmission } = response; 531 // Check if the peer IP exists in the pending queue for the given transmission ID. 532 let exists = self.pending.get_peers(transmission_id).unwrap_or_default().contains(&peer_ip); 533 // If the peer IP exists, finish the pending request. 534 if exists { 535 // Ensure the transmission is not a fee and matches the transmission ID. 536 match self.ledger.ensure_transmission_is_well_formed(transmission_id, &mut transmission) { 537 Ok(()) => { 538 // Remove the transmission ID from the pending queue. 539 self.pending.remove(transmission_id, Some(transmission)); 540 } 541 Err(err) => warn!("Failed to finish transmission response from peer '{peer_ip}': {err}"), 542 }; 543 } 544 } 545 546 /// Sends the requested transmission to the specified peer. 547 fn send_transmission_response(&self, peer_ip: SocketAddr, request: TransmissionRequest<N>) { 548 let TransmissionRequest { transmission_id } = request; 549 // Attempt to retrieve the transmission. 550 if let Some(transmission) = self.get_transmission(transmission_id) { 551 // Send the transmission response to the peer. 552 let self_ = self.clone(); 553 tokio::spawn(async move { 554 self_.gateway.send(peer_ip, Event::TransmissionResponse((transmission_id, transmission).into())).await; 555 }); 556 } 557 } 558 559 /// Spawns a task with the given future; it should only be used for long-running tasks. 560 fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) { 561 self.handles.lock().push(tokio::spawn(future)); 562 } 563 564 /// Shuts down the worker. 565 pub(crate) fn shut_down(&self) { 566 trace!("Shutting down worker {}...", self.id); 567 // Abort the tasks. 568 self.handles.lock().iter().for_each(|handle| handle.abort()); 569 } 570 } 571 572 #[cfg(test)] 573 mod tests { 574 use super::*; 575 use crate::helpers::CALLBACK_EXPIRATION_IN_SECS; 576 use alphaos_node_bft_ledger_service::LedgerService; 577 use alphaos_node_bft_storage_service::BFTMemoryService; 578 use alphavm::{ 579 console::{network::Network, types::Field}, 580 ledger::{ 581 Block, 582 PendingBlock, 583 committee::Committee, 584 narwhal::{BatchCertificate, Subdag, Transmission, TransmissionID}, 585 test_helpers::sample_execution_transaction_with_fee, 586 }, 587 prelude::Address, 588 }; 589 590 use bytes::Bytes; 591 use indexmap::IndexMap; 592 use mockall::mock; 593 use std::{io, ops::Range}; 594 595 type CurrentNetwork = alphavm::prelude::MainnetV0; 596 597 const ITERATIONS: usize = 100; 598 599 mock! { 600 Gateway<N: Network> {} 601 #[async_trait] 602 impl<N:Network> Transport<N> for Gateway<N> { 603 fn broadcast(&self, event: Event<N>); 604 async fn send(&self, peer_ip: SocketAddr, event: Event<N>) -> Option<oneshot::Receiver<io::Result<()>>>; 605 } 606 } 607 608 mock! { 609 #[derive(Debug)] 610 Ledger<N: Network> {} 611 #[async_trait] 612 impl<N: Network> LedgerService<N> for Ledger<N> { 613 fn latest_round(&self) -> u64; 614 fn latest_block_height(&self) -> u32; 615 fn latest_block(&self) -> Block<N>; 616 fn latest_restrictions_id(&self) -> Field<N>; 617 fn latest_leader(&self) -> Option<(u64, Address<N>)>; 618 fn update_latest_leader(&self, round: u64, leader: Address<N>); 619 fn contains_block_height(&self, height: u32) -> bool; 620 fn get_block_height(&self, hash: &N::BlockHash) -> Result<u32>; 621 fn get_block_hash(&self, height: u32) -> Result<N::BlockHash>; 622 fn get_block_round(&self, height: u32) -> Result<u64>; 623 fn get_block(&self, height: u32) -> Result<Block<N>>; 624 fn get_blocks(&self, heights: Range<u32>) -> Result<Vec<Block<N>>>; 625 fn get_solution(&self, solution_id: &SolutionID<N>) -> Result<Solution<N>>; 626 fn get_unconfirmed_transaction(&self, transaction_id: N::TransactionID) -> Result<Transaction<N>>; 627 fn get_batch_certificate(&self, certificate_id: &Field<N>) -> Result<BatchCertificate<N>>; 628 fn current_committee(&self) -> Result<Committee<N>>; 629 fn get_committee_for_round(&self, round: u64) -> Result<Committee<N>>; 630 fn get_committee_lookback_for_round(&self, round: u64) -> Result<Committee<N>>; 631 fn contains_certificate(&self, certificate_id: &Field<N>) -> Result<bool>; 632 fn contains_transmission(&self, transmission_id: &TransmissionID<N>) -> Result<bool>; 633 fn ensure_transmission_is_well_formed( 634 &self, 635 transmission_id: TransmissionID<N>, 636 transmission: &mut Transmission<N>, 637 ) -> Result<()>; 638 async fn check_solution_basic( 639 &self, 640 solution_id: SolutionID<N>, 641 solution: Data<Solution<N>>, 642 ) -> Result<()>; 643 async fn check_transaction_basic( 644 &self, 645 transaction_id: N::TransactionID, 646 transaction: Transaction<N>, 647 ) -> Result<()>; 648 fn check_block_subdag(&self, _block: Block<N>, _prefix: &[PendingBlock<N>]) -> Result<PendingBlock<N>>; 649 fn check_block_content(&self, _block: PendingBlock<N>) -> Result<Block<N>>; 650 fn check_next_block(&self, block: &Block<N>) -> Result<()>; 651 fn prepare_advance_to_next_quorum_block( 652 &self, 653 subdag: Subdag<N>, 654 transmissions: IndexMap<TransmissionID<N>, Transmission<N>>, 655 ) -> Result<Block<N>>; 656 fn advance_to_next_block(&self, block: &Block<N>) -> Result<()>; 657 fn transaction_spend_in_microcredits(&self, transaction: &Transaction<N>, consensus_version: ConsensusVersion) -> Result<u64>; 658 } 659 } 660 661 #[tokio::test] 662 async fn test_max_redundant_requests() { 663 let num_nodes: u16 = CurrentNetwork::MAX_CERTIFICATES.first().unwrap().1; 664 665 let rng = &mut TestRng::default(); 666 // Sample a committee. 667 let committee = 668 alphavm::ledger::committee::test_helpers::sample_committee_for_round_and_size(0, num_nodes, rng); 669 let committee_clone = committee.clone(); 670 // Setup the mock ledger. 671 let mut mock_ledger = MockLedger::default(); 672 mock_ledger.expect_current_committee().returning(move || Ok(committee.clone())); 673 mock_ledger.expect_get_committee_lookback_for_round().returning(move |_| Ok(committee_clone.clone())); 674 mock_ledger.expect_contains_transmission().returning(|_| Ok(false)); 675 mock_ledger.expect_check_solution_basic().returning(|_, _| Ok(())); 676 let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(mock_ledger); 677 678 // Ensure the maximum number of redundant requests is correct and consistent across iterations. 679 assert_eq!(max_redundant_requests(ledger, 0).unwrap(), 6, "Update me if the formula changes"); 680 } 681 682 #[tokio::test] 683 async fn test_process_transmission() { 684 let rng = &mut TestRng::default(); 685 // Sample a committee. 686 let committee = alphavm::ledger::committee::test_helpers::sample_committee(rng); 687 let committee_clone = committee.clone(); 688 // Setup the mock gateway and ledger. 689 let gateway = MockGateway::default(); 690 let mut mock_ledger = MockLedger::default(); 691 mock_ledger.expect_current_committee().returning(move || Ok(committee.clone())); 692 mock_ledger.expect_get_committee_lookback_for_round().returning(move |_| Ok(committee_clone.clone())); 693 mock_ledger.expect_contains_transmission().returning(|_| Ok(false)); 694 mock_ledger.expect_check_solution_basic().returning(|_, _| Ok(())); 695 let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(mock_ledger); 696 // Initialize the storage. 697 let storage = Storage::<CurrentNetwork>::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1); 698 699 // Create the Worker. 700 let worker = Worker::new(0, Arc::new(gateway), storage, ledger, Default::default()).unwrap(); 701 let data = 702 |rng: &mut TestRng| Data::Buffer(Bytes::from((0..512).map(|_| rng.r#gen::<u8>()).collect::<Vec<_>>())); 703 let transmission_id = TransmissionID::Solution( 704 rng.r#gen::<u64>().into(), 705 rng.r#gen::<<CurrentNetwork as Network>::TransmissionChecksum>(), 706 ); 707 let peer_ip = SocketAddr::from(([127, 0, 0, 1], 1234)); 708 let transmission = Transmission::Solution(data(rng)); 709 710 // Process the transmission. 711 worker.process_transmission_from_peer(peer_ip, transmission_id, transmission.clone()); 712 assert!(worker.contains_transmission(transmission_id)); 713 assert!(worker.ready.read().contains(transmission_id)); 714 assert_eq!(worker.get_transmission(transmission_id), Some(transmission)); 715 // Take the transmission from the ready set. 716 assert!(worker.ready.write().remove_front().is_some()); 717 assert!(!worker.ready.read().contains(transmission_id)); 718 } 719 720 #[tokio::test] 721 async fn test_send_transmission() { 722 let rng = &mut TestRng::default(); 723 // Sample a committee. 724 let committee = alphavm::ledger::committee::test_helpers::sample_committee(rng); 725 let committee_clone = committee.clone(); 726 // Setup the mock gateway and ledger. 727 let mut gateway = MockGateway::default(); 728 gateway.expect_send().returning(|_, _| { 729 let (_tx, rx) = oneshot::channel(); 730 Some(rx) 731 }); 732 let mut mock_ledger = MockLedger::default(); 733 mock_ledger.expect_current_committee().returning(move || Ok(committee.clone())); 734 mock_ledger.expect_get_committee_lookback_for_round().returning(move |_| Ok(committee_clone.clone())); 735 mock_ledger.expect_ensure_transmission_is_well_formed().returning(|_, _| Ok(())); 736 let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(mock_ledger); 737 // Initialize the storage. 738 let storage = Storage::<CurrentNetwork>::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1); 739 740 // Create the Worker. 741 let worker = Worker::new(0, Arc::new(gateway), storage, ledger, Default::default()).unwrap(); 742 let transmission_id = TransmissionID::Solution( 743 rng.r#gen::<u64>().into(), 744 rng.r#gen::<<CurrentNetwork as Network>::TransmissionChecksum>(), 745 ); 746 let worker_ = worker.clone(); 747 let peer_ip = SocketAddr::from(([127, 0, 0, 1], 1234)); 748 let _ = worker_.send_transmission_request(peer_ip, transmission_id).await; 749 assert!(worker.pending.contains(transmission_id)); 750 let peer_ip = SocketAddr::from(([127, 0, 0, 1], 1234)); 751 // Fake the transmission response. 752 worker.finish_transmission_request(peer_ip, TransmissionResponse { 753 transmission_id, 754 transmission: Transmission::Solution(Data::Buffer(Bytes::from(vec![0; 512]))), 755 }); 756 // Check the transmission was removed from the pending set. 757 assert!(!worker.pending.contains(transmission_id)); 758 } 759 760 #[tokio::test] 761 async fn test_process_solution_ok() { 762 let rng = &mut TestRng::default(); 763 // Sample a committee. 764 let committee = alphavm::ledger::committee::test_helpers::sample_committee(rng); 765 let committee_clone = committee.clone(); 766 // Setup the mock gateway and ledger. 767 let mut gateway = MockGateway::default(); 768 gateway.expect_send().returning(|_, _| { 769 let (_tx, rx) = oneshot::channel(); 770 Some(rx) 771 }); 772 let mut mock_ledger = MockLedger::default(); 773 mock_ledger.expect_current_committee().returning(move || Ok(committee.clone())); 774 mock_ledger.expect_get_committee_lookback_for_round().returning(move |_| Ok(committee_clone.clone())); 775 mock_ledger.expect_contains_transmission().returning(|_| Ok(false)); 776 mock_ledger.expect_check_solution_basic().returning(|_, _| Ok(())); 777 let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(mock_ledger); 778 // Initialize the storage. 779 let storage = Storage::<CurrentNetwork>::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1); 780 781 // Create the Worker. 782 let worker = Worker::new(0, Arc::new(gateway), storage, ledger, Default::default()).unwrap(); 783 let solution = Data::Buffer(Bytes::from((0..512).map(|_| rng.r#gen::<u8>()).collect::<Vec<_>>())); 784 let solution_id = rng.r#gen::<u64>().into(); 785 let solution_checksum = solution.to_checksum::<CurrentNetwork>().unwrap(); 786 let transmission_id = TransmissionID::Solution(solution_id, solution_checksum); 787 let worker_ = worker.clone(); 788 let peer_ip = SocketAddr::from(([127, 0, 0, 1], 1234)); 789 let _ = worker_.send_transmission_request(peer_ip, transmission_id).await; 790 assert!(worker.pending.contains(transmission_id)); 791 let result = worker.process_unconfirmed_solution(solution_id, solution).await; 792 assert!(result.is_ok()); 793 assert!(!worker.pending.contains(transmission_id)); 794 assert!(worker.ready.read().contains(transmission_id)); 795 } 796 797 #[tokio::test] 798 async fn test_process_solution_nok() { 799 let rng = &mut TestRng::default(); 800 // Sample a committee. 801 let committee = alphavm::ledger::committee::test_helpers::sample_committee(rng); 802 let committee_clone = committee.clone(); 803 // Setup the mock gateway and ledger. 804 let mut gateway = MockGateway::default(); 805 gateway.expect_send().returning(|_, _| { 806 let (_tx, rx) = oneshot::channel(); 807 Some(rx) 808 }); 809 let mut mock_ledger = MockLedger::default(); 810 mock_ledger.expect_current_committee().returning(move || Ok(committee.clone())); 811 mock_ledger.expect_get_committee_lookback_for_round().returning(move |_| Ok(committee_clone.clone())); 812 mock_ledger.expect_contains_transmission().returning(|_| Ok(false)); 813 mock_ledger.expect_check_solution_basic().returning(|_, _| Err(anyhow!(""))); 814 let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(mock_ledger); 815 // Initialize the storage. 816 let storage = Storage::<CurrentNetwork>::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1); 817 818 // Create the Worker. 819 let worker = Worker::new(0, Arc::new(gateway), storage, ledger, Default::default()).unwrap(); 820 let solution_id = rng.r#gen::<u64>().into(); 821 let solution = Data::Buffer(Bytes::from((0..512).map(|_| rng.r#gen::<u8>()).collect::<Vec<_>>())); 822 let checksum = solution.to_checksum::<CurrentNetwork>().unwrap(); 823 let transmission_id = TransmissionID::Solution(solution_id, checksum); 824 let worker_ = worker.clone(); 825 let peer_ip = SocketAddr::from(([127, 0, 0, 1], 1234)); 826 let _ = worker_.send_transmission_request(peer_ip, transmission_id).await; 827 assert!(worker.pending.contains(transmission_id)); 828 let result = worker.process_unconfirmed_solution(solution_id, solution).await; 829 assert!(result.is_err()); 830 assert!(!worker.pending.contains(transmission_id)); 831 assert!(!worker.ready.read().contains(transmission_id)); 832 } 833 834 #[tokio::test] 835 async fn test_process_transaction_ok() { 836 let rng = &mut TestRng::default(); 837 // Sample a committee. 838 let committee = alphavm::ledger::committee::test_helpers::sample_committee(rng); 839 let committee_clone = committee.clone(); 840 // Setup the mock gateway and ledger. 841 let mut gateway = MockGateway::default(); 842 gateway.expect_send().returning(|_, _| { 843 let (_tx, rx) = oneshot::channel(); 844 Some(rx) 845 }); 846 let mut mock_ledger = MockLedger::default(); 847 mock_ledger.expect_current_committee().returning(move || Ok(committee.clone())); 848 mock_ledger.expect_get_committee_lookback_for_round().returning(move |_| Ok(committee_clone.clone())); 849 mock_ledger.expect_contains_transmission().returning(|_| Ok(false)); 850 mock_ledger.expect_check_transaction_basic().returning(|_, _| Ok(())); 851 let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(mock_ledger); 852 // Initialize the storage. 853 let storage = Storage::<CurrentNetwork>::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1); 854 855 // Create the Worker. 856 let worker = Worker::new(0, Arc::new(gateway), storage, ledger, Default::default()).unwrap(); 857 let transaction = sample_execution_transaction_with_fee(false, rng, 0); 858 let transaction_id = transaction.id(); 859 let transaction_data = Data::Object(transaction); 860 let checksum = transaction_data.to_checksum::<CurrentNetwork>().unwrap(); 861 let transmission_id = TransmissionID::Transaction(transaction_id, checksum); 862 let worker_ = worker.clone(); 863 let peer_ip = SocketAddr::from(([127, 0, 0, 1], 1234)); 864 let _ = worker_.send_transmission_request(peer_ip, transmission_id).await; 865 assert!(worker.pending.contains(transmission_id)); 866 let result = worker.process_unconfirmed_transaction(transaction_id, transaction_data).await; 867 assert!(result.is_ok()); 868 assert!(!worker.pending.contains(transmission_id)); 869 assert!(worker.ready.read().contains(transmission_id)); 870 } 871 872 #[tokio::test] 873 async fn test_process_transaction_nok() { 874 let mut rng = &mut TestRng::default(); 875 // Sample a committee. 876 let committee = alphavm::ledger::committee::test_helpers::sample_committee(rng); 877 let committee_clone = committee.clone(); 878 // Setup the mock gateway and ledger. 879 let mut gateway = MockGateway::default(); 880 gateway.expect_send().returning(|_, _| { 881 let (_tx, rx) = oneshot::channel(); 882 Some(rx) 883 }); 884 let mut mock_ledger = MockLedger::default(); 885 mock_ledger.expect_current_committee().returning(move || Ok(committee.clone())); 886 mock_ledger.expect_get_committee_lookback_for_round().returning(move |_| Ok(committee_clone.clone())); 887 mock_ledger.expect_contains_transmission().returning(|_| Ok(false)); 888 mock_ledger.expect_check_transaction_basic().returning(|_, _| Err(anyhow!(""))); 889 let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(mock_ledger); 890 // Initialize the storage. 891 let storage = Storage::<CurrentNetwork>::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1); 892 893 // Create the Worker. 894 let worker = Worker::new(0, Arc::new(gateway), storage, ledger, Default::default()).unwrap(); 895 let transaction_id: <CurrentNetwork as Network>::TransactionID = Field::<CurrentNetwork>::rand(&mut rng).into(); 896 let transaction = Data::Buffer(Bytes::from((0..512).map(|_| rng.r#gen::<u8>()).collect::<Vec<_>>())); 897 let checksum = transaction.to_checksum::<CurrentNetwork>().unwrap(); 898 let transmission_id = TransmissionID::Transaction(transaction_id, checksum); 899 let worker_ = worker.clone(); 900 let peer_ip = SocketAddr::from(([127, 0, 0, 1], 1234)); 901 let _ = worker_.send_transmission_request(peer_ip, transmission_id).await; 902 assert!(worker.pending.contains(transmission_id)); 903 let result = worker.process_unconfirmed_transaction(transaction_id, transaction).await; 904 assert!(result.is_err()); 905 assert!(!worker.pending.contains(transmission_id)); 906 assert!(!worker.ready.read().contains(transmission_id)); 907 } 908 909 #[tokio::test] 910 async fn test_flood_transmission_requests() { 911 let rng = &mut TestRng::default(); 912 // Sample a committee. 913 let committee = alphavm::ledger::committee::test_helpers::sample_committee(rng); 914 let committee_clone = committee.clone(); 915 // Setup the mock gateway and ledger. 916 let mut gateway = MockGateway::default(); 917 gateway.expect_send().returning(|_, _| { 918 let (_tx, rx) = oneshot::channel(); 919 Some(rx) 920 }); 921 let mut mock_ledger = MockLedger::default(); 922 mock_ledger.expect_current_committee().returning(move || Ok(committee.clone())); 923 mock_ledger.expect_get_committee_lookback_for_round().returning(move |_| Ok(committee_clone.clone())); 924 mock_ledger.expect_contains_transmission().returning(|_| Ok(false)); 925 mock_ledger.expect_check_transaction_basic().returning(|_, _| Ok(())); 926 let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(mock_ledger); 927 // Initialize the storage. 928 let storage = Storage::<CurrentNetwork>::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1); 929 930 // Create the Worker. 931 let worker = Worker::new(0, Arc::new(gateway), storage, ledger, Default::default()).unwrap(); 932 let transaction = sample_execution_transaction_with_fee(false, rng, 0); 933 let transaction_id = transaction.id(); 934 let transaction_data = Data::Object(transaction); 935 let checksum = transaction_data.to_checksum::<CurrentNetwork>().unwrap(); 936 let transmission_id = TransmissionID::Transaction(transaction_id, checksum); 937 938 // Determine the number of redundant requests are sent. 939 let num_redundant_requests = 940 max_redundant_requests(worker.ledger.clone(), worker.storage.current_round()).unwrap(); 941 let num_flood_requests = num_redundant_requests * 10; 942 let mut peer_ips = 943 (0..num_flood_requests).map(|i| SocketAddr::from(([127, 0, 0, 1], 1234 + i as u16))).collect_vec(); 944 let first_peer_ip = peer_ips[0]; 945 946 // Flood the pending queue with transmission requests. 947 for i in 1..=num_flood_requests { 948 let worker_ = worker.clone(); 949 let peer_ip = peer_ips.pop().unwrap(); 950 tokio::spawn(async move { 951 let _ = worker_.send_transmission_request(peer_ip, transmission_id).await; 952 }); 953 tokio::time::sleep(Duration::from_millis(10)).await; 954 // Check that the number of sent requests does not exceed the maximum number of redundant requests. 955 assert!(worker.pending.num_sent_requests(transmission_id) <= num_redundant_requests); 956 assert_eq!(worker.pending.num_callbacks(transmission_id), i); 957 } 958 // Check that the number of sent requests does not exceed the maximum number of redundant requests. 959 assert_eq!(worker.pending.num_sent_requests(transmission_id), num_redundant_requests); 960 assert_eq!(worker.pending.num_callbacks(transmission_id), num_flood_requests); 961 962 // Let all the requests expire. 963 tokio::time::sleep(Duration::from_secs(CALLBACK_EXPIRATION_IN_SECS as u64 + 1)).await; 964 assert_eq!(worker.pending.num_sent_requests(transmission_id), 0); 965 assert_eq!(worker.pending.num_callbacks(transmission_id), 0); 966 967 // Flood the pending queue with transmission requests again, this time to a single peer 968 for i in 1..=num_flood_requests { 969 let worker_ = worker.clone(); 970 tokio::spawn(async move { 971 let _ = worker_.send_transmission_request(first_peer_ip, transmission_id).await; 972 }); 973 tokio::time::sleep(Duration::from_millis(10)).await; 974 assert!(worker.pending.num_sent_requests(transmission_id) <= num_redundant_requests); 975 assert_eq!(worker.pending.num_callbacks(transmission_id), i); 976 } 977 // Check that the number of sent requests does not exceed the maximum number of redundant requests. 978 assert_eq!(worker.pending.num_sent_requests(transmission_id), 1); 979 assert_eq!(worker.pending.num_callbacks(transmission_id), num_flood_requests); 980 981 // Check that fulfilling a transmission request clears the pending queue. 982 let result = worker.process_unconfirmed_transaction(transaction_id, transaction_data).await; 983 assert!(result.is_ok()); 984 assert_eq!(worker.pending.num_sent_requests(transmission_id), 0); 985 assert_eq!(worker.pending.num_callbacks(transmission_id), 0); 986 assert!(!worker.pending.contains(transmission_id)); 987 assert!(worker.ready.read().contains(transmission_id)); 988 } 989 990 #[tokio::test] 991 async fn test_storage_gc_on_initialization() { 992 let rng = &mut TestRng::default(); 993 994 for _ in 0..ITERATIONS { 995 // Mock the ledger round. 996 let max_gc_rounds = rng.gen_range(50..=100); 997 let latest_ledger_round = rng.gen_range((max_gc_rounds + 1)..1000); 998 let expected_gc_round = latest_ledger_round - max_gc_rounds; 999 1000 // Sample a committee. 1001 let committee = 1002 alphavm::ledger::committee::test_helpers::sample_committee_for_round(latest_ledger_round, rng); 1003 1004 // Setup the mock gateway and ledger. 1005 let mut mock_ledger = MockLedger::default(); 1006 mock_ledger.expect_current_committee().returning(move || Ok(committee.clone())); 1007 1008 let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(mock_ledger); 1009 // Initialize the storage. 1010 let storage = 1011 Storage::<CurrentNetwork>::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds); 1012 1013 // Ensure that the storage GC round is correct. 1014 assert_eq!(storage.gc_round(), expected_gc_round); 1015 } 1016 } 1017 } 1018 1019 #[cfg(test)] 1020 mod prop_tests { 1021 use super::*; 1022 use crate::Gateway; 1023 use alphaos_node_bft_ledger_service::MockLedgerService; 1024 use alphavm::{ 1025 console::account::Address, 1026 ledger::committee::{Committee, MIN_VALIDATOR_STAKE}, 1027 }; 1028 1029 use test_strategy::proptest; 1030 1031 type CurrentNetwork = alphavm::prelude::MainnetV0; 1032 1033 // Initializes a new test committee. 1034 fn new_test_committee(n: u16) -> Committee<CurrentNetwork> { 1035 let mut members = IndexMap::with_capacity(n as usize); 1036 for i in 0..n { 1037 // Sample the address. 1038 let rng = &mut TestRng::fixed(i as u64); 1039 let address = Address::new(rng.r#gen()); 1040 info!("Validator {i}: {address}"); 1041 members.insert(address, (MIN_VALIDATOR_STAKE, false, rng.gen_range(0..100))); 1042 } 1043 // Initialize the committee. 1044 Committee::<CurrentNetwork>::new(1u64, members).unwrap() 1045 } 1046 1047 #[proptest] 1048 fn worker_initialization( 1049 #[strategy(0..MAX_WORKERS)] id: u8, 1050 gateway: Gateway<CurrentNetwork>, 1051 storage: Storage<CurrentNetwork>, 1052 ) { 1053 let committee = new_test_committee(4); 1054 let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(MockLedgerService::new(committee)); 1055 let worker = Worker::new(id, Arc::new(gateway), storage, ledger, Default::default()).unwrap(); 1056 assert_eq!(worker.id(), id); 1057 } 1058 1059 #[proptest] 1060 fn invalid_worker_id( 1061 #[strategy(MAX_WORKERS..)] id: u8, 1062 gateway: Gateway<CurrentNetwork>, 1063 storage: Storage<CurrentNetwork>, 1064 ) { 1065 let committee = new_test_committee(4); 1066 let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(MockLedgerService::new(committee)); 1067 let worker = Worker::new(id, Arc::new(gateway), storage, ledger, Default::default()); 1068 // TODO once Worker implements Debug, simplify this with `unwrap_err` 1069 if let Err(error) = worker { 1070 assert_eq!(error.to_string(), format!("Invalid worker ID '{id}'")); 1071 } 1072 } 1073 }