/ node / bft / src / worker.rs
worker.rs
   1  // Copyright (c) 2025 ADnet Contributors
   2  // This file is part of the AlphaOS library.
   3  
   4  // Licensed under the Apache License, Version 2.0 (the "License");
   5  // you may not use this file except in compliance with the License.
   6  // You may obtain a copy of the License at:
   7  
   8  // http://www.apache.org/licenses/LICENSE-2.0
   9  
  10  // Unless required by applicable law or agreed to in writing, software
  11  // distributed under the License is distributed on an "AS IS" BASIS,
  12  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13  // See the License for the specific language governing permissions and
  14  // limitations under the License.
  15  
  16  use crate::{
  17      MAX_FETCH_TIMEOUT_IN_MS,
  18      MAX_WORKERS,
  19      ProposedBatch,
  20      Transport,
  21      events::{Event, TransmissionRequest, TransmissionResponse},
  22      helpers::{Pending, Ready, Storage, WorkerReceiver, fmt_id, max_redundant_requests},
  23      spawn_blocking,
  24  };
  25  use alphaos_node_bft_ledger_service::LedgerService;
  26  use alphavm::{
  27      console::prelude::*,
  28      ledger::{
  29          Transaction,
  30          narwhal::{BatchHeader, Data, Transmission, TransmissionID},
  31          puzzle::{Solution, SolutionID},
  32      },
  33  };
  34  
  35  use anyhow::Context;
  36  use colored::Colorize;
  37  use indexmap::{IndexMap, IndexSet};
  38  #[cfg(feature = "locktick")]
  39  use locktick::parking_lot::{Mutex, RwLock};
  40  #[cfg(not(feature = "locktick"))]
  41  use parking_lot::{Mutex, RwLock};
  42  use rand::seq::IteratorRandom;
  43  use std::{future::Future, net::SocketAddr, sync::Arc, time::Duration};
  44  use tokio::{sync::oneshot, task::JoinHandle, time::timeout};
  45  
/// A worker's main role is maintaining a queue of verified ("ready") transmissions,
/// which will eventually be fetched by the primary when the primary generates a new batch.
///
/// The struct derives `Clone`; the gateway, ledger, proposed batch, ready queue, pending queue
/// and task handles are all held behind `Arc`, so clones refer to the same underlying instances.
#[derive(Clone)]
pub struct Worker<N: Network> {
    /// The worker ID (validated against `MAX_WORKERS` in `Worker::new`).
    id: u8,
    /// The gateway, used to send events to a single peer and to broadcast events.
    gateway: Arc<dyn Transport<N>>,
    /// The storage, consulted when looking up transmissions and the current round.
    storage: Storage<N>,
    /// The ledger service, used for existence checks and basic solution/transaction checks.
    ledger: Arc<dyn LedgerService<N>>,
    /// The proposed batch (behind a lock, and possibly absent).
    proposed_batch: Arc<ProposedBatch<N>>,
    /// The ready queue of transmissions, guarded by a read-write lock.
    ready: Arc<RwLock<Ready<N>>>,
    /// The pending transmissions queue, tracking outstanding transmission requests to peers.
    pending: Arc<Pending<TransmissionID<N>, Transmission<N>>>,
    /// The handles of the long-running tasks spawned by this worker; aborted on shutdown.
    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
}
  67  
  68  impl<N: Network> Worker<N> {
  69      /// Initializes a new worker instance.
  70      pub fn new(
  71          id: u8,
  72          gateway: Arc<dyn Transport<N>>,
  73          storage: Storage<N>,
  74          ledger: Arc<dyn LedgerService<N>>,
  75          proposed_batch: Arc<ProposedBatch<N>>,
  76      ) -> Result<Self> {
  77          // Ensure the worker ID is valid.
  78          ensure!(id < MAX_WORKERS, "Invalid worker ID '{id}'");
  79          // Return the worker.
  80          Ok(Self {
  81              id,
  82              gateway,
  83              storage,
  84              ledger,
  85              proposed_batch,
  86              ready: Default::default(),
  87              pending: Default::default(),
  88              handles: Default::default(),
  89          })
  90      }
  91  
  92      /// Run the worker instance.
  93      pub fn run(&self, receiver: WorkerReceiver<N>) {
  94          info!("Starting worker instance {} of the memory pool...", self.id);
  95          // Start the worker handlers.
  96          self.start_handlers(receiver);
  97      }
  98  
  99      /// Returns the worker ID.
 100      pub const fn id(&self) -> u8 {
 101          self.id
 102      }
 103  
 104      /// Returns a reference to the pending transmissions queue.
 105      pub fn pending(&self) -> &Arc<Pending<TransmissionID<N>, Transmission<N>>> {
 106          &self.pending
 107      }
 108  }
 109  
 110  impl<N: Network> Worker<N> {
 111      /// The maximum number of transmissions allowed in a worker.
 112      pub const MAX_TRANSMISSIONS_PER_WORKER: usize =
 113          BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH / MAX_WORKERS as usize;
 114      /// The maximum number of transmissions allowed in a worker ping.
 115      pub const MAX_TRANSMISSIONS_PER_WORKER_PING: usize = BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH / 10;
 116  
 117      /// Returns the number of transmissions in the ready queue.
 118      pub fn num_transmissions(&self) -> usize {
 119          self.ready.read().num_transmissions()
 120      }
 121  
 122      /// Returns the number of ratifications in the ready queue.
 123      pub fn num_ratifications(&self) -> usize {
 124          self.ready.read().num_ratifications()
 125      }
 126  
 127      /// Returns the number of solutions in the ready queue.
 128      pub fn num_solutions(&self) -> usize {
 129          self.ready.read().num_solutions()
 130      }
 131  
 132      /// Returns the number of transactions in the ready queue.
 133      pub fn num_transactions(&self) -> usize {
 134          self.ready.read().num_transactions()
 135      }
 136  }
 137  
 138  impl<N: Network> Worker<N> {
 139      /// Returns the transmission IDs in the ready queue.
 140      pub fn transmission_ids(&self) -> IndexSet<TransmissionID<N>> {
 141          self.ready.read().transmission_ids()
 142      }
 143  
 144      /// Returns the transmissions in the ready queue.
 145      pub fn transmissions(&self) -> IndexMap<TransmissionID<N>, Transmission<N>> {
 146          self.ready.read().transmissions()
 147      }
 148  
 149      /// Returns the solutions in the ready queue.
 150      pub fn solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
 151          self.ready.read().solutions().into_iter()
 152      }
 153  
 154      /// Returns the transactions in the ready queue.
 155      pub fn transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
 156          self.ready.read().transactions().into_iter()
 157      }
 158  }
 159  
 160  impl<N: Network> Worker<N> {
 161      /// Clears the solutions from the ready queue.
 162      pub(super) fn clear_solutions(&self) {
 163          self.ready.write().clear_solutions()
 164      }
 165  }
 166  
 167  impl<N: Network> Worker<N> {
 168      /// Returns `true` if the transmission ID exists in the ready queue, proposed batch, storage, or ledger.
 169      pub fn contains_transmission(&self, transmission_id: impl Into<TransmissionID<N>>) -> bool {
 170          let transmission_id = transmission_id.into();
 171          // Check if the transmission ID exists in the ready queue, proposed batch, storage, or ledger.
 172          self.ready.read().contains(transmission_id)
 173              || self.proposed_batch.read().as_ref().is_some_and(|p| p.contains_transmission(transmission_id))
 174              || self.storage.contains_transmission(transmission_id)
 175              || self.ledger.contains_transmission(&transmission_id).unwrap_or(false)
 176      }
 177  
 178      /// Returns the transmission if it exists in the ready queue, proposed batch, storage.
 179      ///
 180      /// Note: We explicitly forbid retrieving a transmission from the ledger, as transmissions
 181      /// in the ledger are not guaranteed to be invalid for the current batch.
 182      pub fn get_transmission(&self, transmission_id: TransmissionID<N>) -> Option<Transmission<N>> {
 183          // Check if the transmission ID exists in the ready queue.
 184          if let Some(transmission) = self.ready.read().get(transmission_id) {
 185              return Some(transmission);
 186          }
 187          // Check if the transmission ID exists in storage.
 188          if let Some(transmission) = self.storage.get_transmission(transmission_id) {
 189              return Some(transmission);
 190          }
 191          // Check if the transmission ID exists in the proposed batch.
 192          if let Some(transmission) =
 193              self.proposed_batch.read().as_ref().and_then(|p| p.get_transmission(transmission_id))
 194          {
 195              return Some(transmission.clone());
 196          }
 197          None
 198      }
 199  
 200      /// Returns the transmissions if it exists in the worker, or requests it from the specified peer.
 201      pub async fn get_or_fetch_transmission(
 202          &self,
 203          peer_ip: SocketAddr,
 204          transmission_id: TransmissionID<N>,
 205      ) -> Result<(TransmissionID<N>, Transmission<N>)> {
 206          // Attempt to get the transmission from the worker.
 207          if let Some(transmission) = self.get_transmission(transmission_id) {
 208              return Ok((transmission_id, transmission));
 209          }
 210          // Send a transmission request to the peer.
 211          let (candidate_id, transmission) = self.send_transmission_request(peer_ip, transmission_id).await?;
 212          // Ensure the transmission ID matches.
 213          ensure!(candidate_id == transmission_id, "Invalid transmission ID");
 214          // Return the transmission.
 215          Ok((transmission_id, transmission))
 216      }
 217  
 218      /// Inserts the transmission at the front of the ready queue.
 219      pub(crate) fn insert_front(&self, key: TransmissionID<N>, value: Transmission<N>) {
 220          self.ready.write().insert_front(key, value);
 221      }
 222  
 223      /// Removes and returns the transmission at the front of the ready queue.
 224      pub(crate) fn remove_front(&self) -> Option<(TransmissionID<N>, Transmission<N>)> {
 225          self.ready.write().remove_front()
 226      }
 227  
 228      /// Reinserts the specified transmission into the ready queue.
 229      pub(crate) fn reinsert(&self, transmission_id: TransmissionID<N>, transmission: Transmission<N>) -> bool {
 230          // Check if the transmission ID exists.
 231          if !self.contains_transmission(transmission_id) {
 232              // Insert the transmission into the ready queue.
 233              return self.ready.write().insert(transmission_id, transmission);
 234          }
 235          false
 236      }
 237  
 238      /// Broadcasts a worker ping event.
 239      pub(crate) fn broadcast_ping(&self) {
 240          // Retrieve the transmission IDs.
 241          let transmission_ids = self
 242              .ready
 243              .read()
 244              .transmission_ids()
 245              .into_iter()
 246              .choose_multiple(&mut rand::thread_rng(), Self::MAX_TRANSMISSIONS_PER_WORKER_PING)
 247              .into_iter()
 248              .collect::<IndexSet<_>>();
 249  
 250          // Broadcast the ping event.
 251          if !transmission_ids.is_empty() {
 252              self.gateway.broadcast(Event::WorkerPing(transmission_ids.into()));
 253          }
 254      }
 255  }
 256  
 257  impl<N: Network> Worker<N> {
 258      /// Handles the incoming transmission ID from a worker ping event.
 259      fn process_transmission_id_from_ping(&self, peer_ip: SocketAddr, transmission_id: TransmissionID<N>) {
 260          // Check if the transmission ID exists.
 261          if self.contains_transmission(transmission_id) {
 262              return;
 263          }
 264          // If the ready queue is full, then skip this transmission.
 265          // Note: We must prioritize the unconfirmed solutions and unconfirmed transactions, not transmissions.
 266          if self.ready.read().num_transmissions() > Self::MAX_TRANSMISSIONS_PER_WORKER {
 267              return;
 268          }
 269          // Attempt to fetch the transmission from the peer.
 270          let self_ = self.clone();
 271          tokio::spawn(async move {
 272              // Send a transmission request to the peer.
 273              match self_.send_transmission_request(peer_ip, transmission_id).await {
 274                  // If the transmission was fetched, then process it.
 275                  Ok((candidate_id, transmission)) => {
 276                      // Ensure the transmission ID matches.
 277                      if candidate_id == transmission_id {
 278                          // Insert the transmission into the ready queue.
 279                          // Note: This method checks `contains_transmission` again, because by the time the transmission is fetched,
 280                          // it could have already been inserted into the ready queue.
 281                          self_.process_transmission_from_peer(peer_ip, transmission_id, transmission);
 282                      }
 283                  }
 284                  // If the transmission was not fetched, then attempt to fetch it again.
 285                  Err(e) => {
 286                      warn!(
 287                          "Worker {} - Failed to fetch transmission '{}.{}' from '{peer_ip}' (ping) - {e}",
 288                          self_.id,
 289                          fmt_id(transmission_id),
 290                          fmt_id(transmission_id.checksum().unwrap_or_default()).dimmed()
 291                      );
 292                  }
 293              }
 294          });
 295      }
 296  
 297      /// Handles the incoming transmission from a peer.
 298      pub(crate) fn process_transmission_from_peer(
 299          &self,
 300          peer_ip: SocketAddr,
 301          transmission_id: TransmissionID<N>,
 302          transmission: Transmission<N>,
 303      ) {
 304          // If the transmission ID already exists, then do not store it.
 305          if self.contains_transmission(transmission_id) {
 306              return;
 307          }
 308          // Ensure the transmission ID and transmission type matches.
 309          let is_well_formed = match (&transmission_id, &transmission) {
 310              (TransmissionID::Solution(_, _), Transmission::Solution(_)) => true,
 311              (TransmissionID::Transaction(_, _), Transmission::Transaction(_)) => true,
 312              // Note: We explicitly forbid inserting ratifications into the ready queue,
 313              // as the protocol currently does not support ratifications.
 314              (TransmissionID::Ratification, Transmission::Ratification) => false,
 315              // All other combinations are clearly invalid.
 316              _ => false,
 317          };
 318          // If the transmission is a deserialized execution, verify it immediately.
 319          // This takes heavy transaction verification out of the hot path during block generation.
 320          if let (TransmissionID::Transaction(tx_id, _), Transmission::Transaction(Data::Object(tx))) =
 321              (transmission_id, &transmission)
 322          {
 323              if tx.is_execute() {
 324                  let self_ = self.clone();
 325                  let tx_ = tx.clone();
 326                  tokio::spawn(async move {
 327                      let _ = self_.ledger.check_transaction_basic(tx_id, tx_).await;
 328                  });
 329              }
 330          }
 331          // If the transmission ID and transmission type matches, then insert the transmission into the ready queue.
 332          if is_well_formed && self.ready.write().insert(transmission_id, transmission) {
 333              trace!(
 334                  "Worker {} - Added transmission '{}.{}' from '{peer_ip}'",
 335                  self.id,
 336                  fmt_id(transmission_id),
 337                  fmt_id(transmission_id.checksum().unwrap_or_default()).dimmed()
 338              );
 339          }
 340      }
 341  
 342      /// Handles the incoming unconfirmed solution.
 343      /// Note: This method assumes the incoming solution is valid and does not exist in the ledger.
 344      ///
 345      /// # Returns
 346      /// - `Ok(true)` if the solution was added to the ready queue.
 347      /// - `Ok(false)` if the solution was valid but already exists in the ready queue.
 348      /// - `Err(anyhow::Error)` if the solution is invalid.
 349      pub(crate) async fn process_unconfirmed_solution(
 350          &self,
 351          solution_id: SolutionID<N>,
 352          solution: Data<Solution<N>>,
 353      ) -> Result<bool> {
 354          // Construct the transmission.
 355          let transmission = Transmission::Solution(solution.clone());
 356          // Compute the checksum.
 357          let checksum = solution.to_checksum::<N>()?;
 358          // Construct the transmission ID.
 359          let transmission_id = TransmissionID::Solution(solution_id, checksum);
 360          // Remove the solution ID from the pending queue.
 361          self.pending.remove(transmission_id, Some(transmission.clone()));
 362          // Check if the solution exists.
 363          if self.contains_transmission(transmission_id) {
 364              return Ok(false);
 365          }
 366          // Check that the solution is well-formed and unique.
 367          self.ledger.check_solution_basic(solution_id, solution).await?;
 368          // Adds the solution to the ready queue.
 369          if self.ready.write().insert(transmission_id, transmission) {
 370              trace!(
 371                  "Worker {} - Added unconfirmed solution '{}.{}'",
 372                  self.id,
 373                  fmt_id(solution_id),
 374                  fmt_id(checksum).dimmed()
 375              );
 376          }
 377          Ok(true)
 378      }
 379  
 380      /// Handles the incoming unconfirmed transaction.
 381      ///
 382      /// # Returns
 383      /// - `Ok(true)` if the transaction was added to the ready queue.
 384      /// - `Ok(false)` if the transaction was valid but already exists in the ready queue.
 385      /// - `Err(anyhow::Error)` if the transaction was invalid.
 386      pub(crate) async fn process_unconfirmed_transaction(
 387          &self,
 388          transaction_id: N::TransactionID,
 389          transaction: Data<Transaction<N>>,
 390      ) -> Result<bool> {
 391          // Construct the transmission.
 392          let transmission = Transmission::Transaction(transaction.clone());
 393          // Compute the checksum.
 394          let checksum = transaction.to_checksum::<N>()?;
 395          // Construct the transmission ID.
 396          let transmission_id = TransmissionID::Transaction(transaction_id, checksum);
 397          // Remove the transaction from the pending queue.
 398          self.pending.remove(transmission_id, Some(transmission.clone()));
 399          // Check if the transaction ID exists.
 400          if self.contains_transmission(transmission_id) {
 401              return Ok(false);
 402          }
 403          // Deserialize the transaction. If the transaction exceeds the maximum size, then return an error.
 404          let transaction = spawn_blocking!({
 405              match transaction {
 406                  Data::Object(transaction) => Ok(transaction),
 407                  Data::Buffer(bytes) => Ok(Transaction::<N>::read_le(&mut bytes.take(N::MAX_TRANSACTION_SIZE as u64))?),
 408              }
 409          })?;
 410  
 411          // Check that the transaction is well-formed and unique.
 412          self.ledger.check_transaction_basic(transaction_id, transaction).await?;
 413          // Adds the transaction to the ready queue.
 414          if self.ready.write().insert(transmission_id, transmission) {
 415              trace!(
 416                  "Worker {}.{} - Added unconfirmed transaction '{}'",
 417                  self.id,
 418                  fmt_id(transaction_id),
 419                  fmt_id(checksum).dimmed()
 420              );
 421          }
 422          Ok(true)
 423      }
 424  }
 425  
 426  impl<N: Network> Worker<N> {
 427      /// Starts the worker handlers.
 428      fn start_handlers(&self, receiver: WorkerReceiver<N>) {
 429          let WorkerReceiver { mut rx_worker_ping, mut rx_transmission_request, mut rx_transmission_response } = receiver;
 430  
 431          // Start the pending queue expiration loop.
 432          let self_ = self.clone();
 433          self.spawn(async move {
 434              loop {
 435                  // Sleep briefly.
 436                  tokio::time::sleep(Duration::from_millis(MAX_FETCH_TIMEOUT_IN_MS)).await;
 437  
 438                  // Remove the expired pending certificate requests.
 439                  let self__ = self_.clone();
 440                  let _ = spawn_blocking!({
 441                      self__.pending.clear_expired_callbacks();
 442                      Ok(())
 443                  });
 444              }
 445          });
 446  
 447          // Process the ping events.
 448          let self_ = self.clone();
 449          self.spawn(async move {
 450              while let Some((peer_ip, transmission_id)) = rx_worker_ping.recv().await {
 451                  self_.process_transmission_id_from_ping(peer_ip, transmission_id);
 452              }
 453          });
 454  
 455          // Process the transmission requests.
 456          let self_ = self.clone();
 457          self.spawn(async move {
 458              while let Some((peer_ip, transmission_request)) = rx_transmission_request.recv().await {
 459                  self_.send_transmission_response(peer_ip, transmission_request);
 460              }
 461          });
 462  
 463          // Process the transmission responses.
 464          let self_ = self.clone();
 465          self.spawn(async move {
 466              while let Some((peer_ip, transmission_response)) = rx_transmission_response.recv().await {
 467                  // Process the transmission response.
 468                  let self__ = self_.clone();
 469                  let _ = spawn_blocking!({
 470                      self__.finish_transmission_request(peer_ip, transmission_response);
 471                      Ok(())
 472                  });
 473              }
 474          });
 475      }
 476  
 477      /// Sends a transmission request to the specified peer.
 478      async fn send_transmission_request(
 479          &self,
 480          peer_ip: SocketAddr,
 481          transmission_id: TransmissionID<N>,
 482      ) -> Result<(TransmissionID<N>, Transmission<N>)> {
 483          // Initialize a oneshot channel.
 484          let (callback_sender, callback_receiver) = oneshot::channel();
 485          // Determine how many sent requests are pending.
 486          let num_sent_requests = self.pending.num_sent_requests(transmission_id);
 487          // Determine if we've already sent a request to the peer.
 488          let contains_peer_with_sent_request = self.pending.contains_peer_with_sent_request(transmission_id, peer_ip);
 489          // Determine the maximum number of redundant requests.
 490          let num_redundant_requests = max_redundant_requests(self.ledger.clone(), self.storage.current_round())?;
 491          // Determine if we should send a transmission request to the peer.
 492          // We send at most `num_redundant_requests` requests and each peer can only receive one request at a time.
 493          let should_send_request = num_sent_requests < num_redundant_requests && !contains_peer_with_sent_request;
 494  
 495          // Insert the transmission ID into the pending queue.
 496          self.pending.insert(transmission_id, peer_ip, Some((callback_sender, should_send_request)));
 497  
 498          // Helper to print the transmission ID and checksum.
 499          let print_transmission_id = |transmission_id: TransmissionID<N>| {
 500              format!("{}.{}", fmt_id(transmission_id), fmt_id(transmission_id.checksum().unwrap_or_default()).dimmed())
 501          };
 502  
 503          // If the number of requests is less than or equal to the the redundancy factor, send the transmission request to the peer.
 504          if should_send_request {
 505              // Send the transmission request to the peer.
 506              if self.gateway.send(peer_ip, Event::TransmissionRequest(transmission_id.into())).await.is_none() {
 507                  bail!("Unable to fetch transmission - failed to send request")
 508              }
 509          } else {
 510              debug!(
 511                  "Skipped sending request for transmission {} to '{peer_ip}' ({num_sent_requests} redundant requests)",
 512                  print_transmission_id(transmission_id)
 513              );
 514          }
 515          // Wait for the transmission to be fetched.
 516  
 517          let transmission = timeout(Duration::from_millis(MAX_FETCH_TIMEOUT_IN_MS), callback_receiver)
 518              .await
 519              .with_context(|| {
 520                  format!("Unable to fetch transmission {} (timeout)", print_transmission_id(transmission_id),)
 521              })?
 522              .with_context(|| format!("Unable to fetch transmission {}", print_transmission_id(transmission_id),))?;
 523  
 524          Ok((transmission_id, transmission))
 525      }
 526  
 527      /// Handles the incoming transmission response.
 528      /// This method ensures the transmission response is well-formed and matches the transmission ID.
 529      fn finish_transmission_request(&self, peer_ip: SocketAddr, response: TransmissionResponse<N>) {
 530          let TransmissionResponse { transmission_id, mut transmission } = response;
 531          // Check if the peer IP exists in the pending queue for the given transmission ID.
 532          let exists = self.pending.get_peers(transmission_id).unwrap_or_default().contains(&peer_ip);
 533          // If the peer IP exists, finish the pending request.
 534          if exists {
 535              // Ensure the transmission is not a fee and matches the transmission ID.
 536              match self.ledger.ensure_transmission_is_well_formed(transmission_id, &mut transmission) {
 537                  Ok(()) => {
 538                      // Remove the transmission ID from the pending queue.
 539                      self.pending.remove(transmission_id, Some(transmission));
 540                  }
 541                  Err(err) => warn!("Failed to finish transmission response from peer '{peer_ip}': {err}"),
 542              };
 543          }
 544      }
 545  
 546      /// Sends the requested transmission to the specified peer.
 547      fn send_transmission_response(&self, peer_ip: SocketAddr, request: TransmissionRequest<N>) {
 548          let TransmissionRequest { transmission_id } = request;
 549          // Attempt to retrieve the transmission.
 550          if let Some(transmission) = self.get_transmission(transmission_id) {
 551              // Send the transmission response to the peer.
 552              let self_ = self.clone();
 553              tokio::spawn(async move {
 554                  self_.gateway.send(peer_ip, Event::TransmissionResponse((transmission_id, transmission).into())).await;
 555              });
 556          }
 557      }
 558  
 559      /// Spawns a task with the given future; it should only be used for long-running tasks.
 560      fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
 561          self.handles.lock().push(tokio::spawn(future));
 562      }
 563  
 564      /// Shuts down the worker.
 565      pub(crate) fn shut_down(&self) {
 566          trace!("Shutting down worker {}...", self.id);
 567          // Abort the tasks.
 568          self.handles.lock().iter().for_each(|handle| handle.abort());
 569      }
 570  }
 571  
 572  #[cfg(test)]
 573  mod tests {
 574      use super::*;
 575      use crate::helpers::CALLBACK_EXPIRATION_IN_SECS;
 576      use alphaos_node_bft_ledger_service::LedgerService;
 577      use alphaos_node_bft_storage_service::BFTMemoryService;
 578      use alphavm::{
 579          console::{network::Network, types::Field},
 580          ledger::{
 581              Block,
 582              PendingBlock,
 583              committee::Committee,
 584              narwhal::{BatchCertificate, Subdag, Transmission, TransmissionID},
 585              test_helpers::sample_execution_transaction_with_fee,
 586          },
 587          prelude::Address,
 588      };
 589  
 590      use bytes::Bytes;
 591      use indexmap::IndexMap;
 592      use mockall::mock;
 593      use std::{io, ops::Range};
 594  
    // The network that the tests in this module are instantiated with.
    type CurrentNetwork = alphavm::prelude::MainnetV0;

    // NOTE(review): presumably the number of rounds for iteration-based tests; its uses are
    // outside the portion of the file reviewed here - confirm.
    const ITERATIONS: usize = 100;
 598  
    // A mockall-generated stand-in for the gateway, implementing the `Transport` trait,
    // so that tests can script `broadcast` and `send` without any networking.
    mock! {
        Gateway<N: Network> {}
        #[async_trait]
        impl<N:Network> Transport<N> for Gateway<N> {
            fn broadcast(&self, event: Event<N>);
            async fn send(&self, peer_ip: SocketAddr, event: Event<N>) -> Option<oneshot::Receiver<io::Result<()>>>;
        }
    }
 607  
    // A mockall-generated stand-in for the ledger (`MockLedger`), implementing the
    // `LedgerService` trait. Each test only sets expectations for the methods it relies on.
    mock! {
        #[derive(Debug)]
        Ledger<N: Network> {}
        #[async_trait]
        impl<N: Network> LedgerService<N> for Ledger<N> {
            // Latest-state accessors.
            fn latest_round(&self) -> u64;
            fn latest_block_height(&self) -> u32;
            fn latest_block(&self) -> Block<N>;
            fn latest_restrictions_id(&self) -> Field<N>;
            fn latest_leader(&self) -> Option<(u64, Address<N>)>;
            fn update_latest_leader(&self, round: u64, leader: Address<N>);
            // Block, solution, transaction, and certificate lookups.
            fn contains_block_height(&self, height: u32) -> bool;
            fn get_block_height(&self, hash: &N::BlockHash) -> Result<u32>;
            fn get_block_hash(&self, height: u32) -> Result<N::BlockHash>;
            fn get_block_round(&self, height: u32) -> Result<u64>;
            fn get_block(&self, height: u32) -> Result<Block<N>>;
            fn get_blocks(&self, heights: Range<u32>) -> Result<Vec<Block<N>>>;
            fn get_solution(&self, solution_id: &SolutionID<N>) -> Result<Solution<N>>;
            fn get_unconfirmed_transaction(&self, transaction_id: N::TransactionID) -> Result<Transaction<N>>;
            fn get_batch_certificate(&self, certificate_id: &Field<N>) -> Result<BatchCertificate<N>>;
            // Committee lookups.
            fn current_committee(&self) -> Result<Committee<N>>;
            fn get_committee_for_round(&self, round: u64) -> Result<Committee<N>>;
            fn get_committee_lookback_for_round(&self, round: u64) -> Result<Committee<N>>;
            // Existence checks.
            fn contains_certificate(&self, certificate_id: &Field<N>) -> Result<bool>;
            fn contains_transmission(&self, transmission_id: &TransmissionID<N>) -> Result<bool>;
            // Transmission, solution, and transaction checks (these are the ones the worker calls).
            fn ensure_transmission_is_well_formed(
                &self,
                transmission_id: TransmissionID<N>,
                transmission: &mut Transmission<N>,
            ) -> Result<()>;
            async fn check_solution_basic(
                &self,
                solution_id: SolutionID<N>,
                solution: Data<Solution<N>>,
            ) -> Result<()>;
            async fn check_transaction_basic(
                &self,
                transaction_id: N::TransactionID,
                transaction: Transaction<N>,
            ) -> Result<()>;
            // Block checks and block advancement.
            fn check_block_subdag(&self, _block: Block<N>, _prefix: &[PendingBlock<N>]) -> Result<PendingBlock<N>>;
            fn check_block_content(&self, _block: PendingBlock<N>) -> Result<Block<N>>;
            fn check_next_block(&self, block: &Block<N>) -> Result<()>;
            fn prepare_advance_to_next_quorum_block(
                &self,
                subdag: Subdag<N>,
                transmissions: IndexMap<TransmissionID<N>, Transmission<N>>,
            ) -> Result<Block<N>>;
            fn advance_to_next_block(&self, block: &Block<N>) -> Result<()>;
            fn transaction_spend_in_microcredits(&self, transaction: &Transaction<N>, consensus_version: ConsensusVersion) -> Result<u64>;
        }
    }
 660  
 661      #[tokio::test]
 662      async fn test_max_redundant_requests() {
 663          let committee_size: u16 = CurrentNetwork::MAX_CERTIFICATES.first().unwrap().1;
 664          let rng = &mut TestRng::default();
 665          // Sample a committee for round 0 with `committee_size` members.
 666          let current_committee =
 667              alphavm::ledger::committee::test_helpers::sample_committee_for_round_and_size(0, committee_size, rng);
 668          let lookback_committee = current_committee.clone();
 669          // Build a mock ledger that answers both committee queries with the sampled committee.
 670          let mut mock = MockLedger::default();
 671          mock.expect_current_committee().returning(move || Ok(current_committee.clone()));
 672          mock.expect_get_committee_lookback_for_round().returning(move |_| Ok(lookback_committee.clone()));
 673          mock.expect_contains_transmission().returning(|_| Ok(false));
 674          mock.expect_check_solution_basic().returning(|_, _| Ok(()));
 675          let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(mock);
 676  
 677          // The expected value is pinned to the current formula; see the assertion message.
 678          let max_requests = max_redundant_requests(ledger, 0).unwrap();
 679          assert_eq!(max_requests, 6, "Update me if the formula changes");
 680      }
 681  
 682      #[tokio::test]
 683      async fn test_process_transmission() {
 684          let rng = &mut TestRng::default();
 685          // Sample one committee; the clone backs the lookback query.
 686          let current_committee = alphavm::ledger::committee::test_helpers::sample_committee(rng);
 687          let lookback_committee = current_committee.clone();
 688          // The ledger mock reports every transmission as unknown and passes every solution check.
 689          let mut mock = MockLedger::default();
 690          mock.expect_current_committee().returning(move || Ok(current_committee.clone()));
 691          mock.expect_get_committee_lookback_for_round().returning(move |_| Ok(lookback_committee.clone()));
 692          mock.expect_contains_transmission().returning(|_| Ok(false));
 693          mock.expect_check_solution_basic().returning(|_, _| Ok(()));
 694          let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(mock);
 695          // Assemble the worker from in-memory storage and a default mock gateway.
 696          let storage = Storage::<CurrentNetwork>::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1);
 697          let gateway = MockGateway::default();
 698          let worker = Worker::new(0, Arc::new(gateway), storage, ledger, Default::default()).unwrap();
 699  
 700          // Draw a random solution ID and checksum first, then a random 512-byte payload.
 701          let transmission_id = TransmissionID::Solution(
 702              rng.r#gen::<u64>().into(),
 703              rng.r#gen::<<CurrentNetwork as Network>::TransmissionChecksum>(),
 704          );
 705          let payload: Vec<u8> = (0..512).map(|_| rng.r#gen::<u8>()).collect();
 706          let transmission = Transmission::Solution(Data::Buffer(Bytes::from(payload)));
 707          let sender = SocketAddr::from(([127, 0, 0, 1], 1234));
 708  
 709          // Feed the transmission to the worker as coming from `sender`.
 710          worker.process_transmission_from_peer(sender, transmission_id, transmission.clone());
 711          // It must now be known to the worker, sit in the ready queue, and be retrievable intact.
 712          assert!(worker.contains_transmission(transmission_id));
 713          assert!(worker.ready.read().contains(transmission_id));
 714          assert_eq!(worker.get_transmission(transmission_id), Some(transmission));
 715          // Popping the front of the ready queue takes it out again.
 716          assert!(worker.ready.write().remove_front().is_some());
 717          assert!(!worker.ready.read().contains(transmission_id));
 718      }
 719  
 720      #[tokio::test]
 721      async fn test_send_transmission() {
 722          let rng = &mut TestRng::default();
 723          let current_committee = alphavm::ledger::committee::test_helpers::sample_committee(rng);
 724          let lookback_committee = current_committee.clone();
 725          // Every `send` on the mock gateway yields a receiver whose sender is dropped right away.
 726          let mut gateway = MockGateway::default();
 727          gateway.expect_send().returning(|_, _| {
 728              let (_sender, receiver) = oneshot::channel();
 729              Some(receiver)
 730          });
 731          // The ledger mock serves the sampled committee and deems every transmission well formed.
 732          let mut mock = MockLedger::default();
 733          mock.expect_current_committee().returning(move || Ok(current_committee.clone()));
 734          mock.expect_get_committee_lookback_for_round().returning(move |_| Ok(lookback_committee.clone()));
 735          mock.expect_ensure_transmission_is_well_formed().returning(|_, _| Ok(()));
 736          let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(mock);
 737          let storage = Storage::<CurrentNetwork>::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1);
 738          let worker = Worker::new(0, Arc::new(gateway), storage, ledger, Default::default()).unwrap();
 739  
 740          // Ask a single peer for a solution with a random ID and checksum.
 741          let transmission_id = TransmissionID::Solution(
 742              rng.r#gen::<u64>().into(),
 743              rng.r#gen::<<CurrentNetwork as Network>::TransmissionChecksum>(),
 744          );
 745          let peer_ip = SocketAddr::from(([127, 0, 0, 1], 1234));
 746          let requester = worker.clone();
 747          let _ = requester.send_transmission_request(peer_ip, transmission_id).await;
 748          // Whatever the request returned, it must have left a pending entry behind.
 749          assert!(worker.pending.contains(transmission_id));
 750  
 751          // Deliver a fabricated response (512 zero bytes) from the same peer address.
 752          let response = TransmissionResponse {
 753              transmission_id,
 754              transmission: Transmission::Solution(Data::Buffer(Bytes::from(vec![0; 512]))),
 755          };
 756          worker.finish_transmission_request(peer_ip, response);
 757          assert!(!worker.pending.contains(transmission_id));
 758      }
 759  
 760      #[tokio::test]
 761      async fn test_process_solution_ok() {
 762          let rng = &mut TestRng::default();
 763          let current_committee = alphavm::ledger::committee::test_helpers::sample_committee(rng);
 764          let lookback_committee = current_committee.clone();
 765          // Every `send` on the mock gateway yields a receiver whose sender is dropped right away.
 766          let mut gateway = MockGateway::default();
 767          gateway.expect_send().returning(|_, _| {
 768              let (_sender, receiver) = oneshot::channel();
 769              Some(receiver)
 770          });
 771          // The ledger mock knows no transmissions and passes every solution check.
 772          let mut mock = MockLedger::default();
 773          mock.expect_current_committee().returning(move || Ok(current_committee.clone()));
 774          mock.expect_get_committee_lookback_for_round().returning(move |_| Ok(lookback_committee.clone()));
 775          mock.expect_contains_transmission().returning(|_| Ok(false));
 776          mock.expect_check_solution_basic().returning(|_, _| Ok(()));
 777          let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(mock);
 778          let storage = Storage::<CurrentNetwork>::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1);
 779          let worker = Worker::new(0, Arc::new(gateway), storage, ledger, Default::default()).unwrap();
 780          // Build a random 512-byte solution buffer; its checksum and a random ID form the transmission ID.
 781          let solution_bytes: Vec<u8> = (0..512).map(|_| rng.r#gen::<u8>()).collect();
 782          let solution = Data::Buffer(Bytes::from(solution_bytes));
 783          let solution_id = rng.r#gen::<u64>().into();
 784          let checksum = solution.to_checksum::<CurrentNetwork>().unwrap();
 785          let transmission_id = TransmissionID::Solution(solution_id, checksum);
 786          let peer_ip = SocketAddr::from(([127, 0, 0, 1], 1234));
 787          let requester = worker.clone();
 788          let _ = requester.send_transmission_request(peer_ip, transmission_id).await;
 789          assert!(worker.pending.contains(transmission_id));
 790          // With the check passing, the solution must leave the pending set and become ready.
 791          let outcome = worker.process_unconfirmed_solution(solution_id, solution).await;
 792          assert!(outcome.is_ok());
 793          assert!(!worker.pending.contains(transmission_id));
 794          assert!(worker.ready.read().contains(transmission_id));
 795      }
 796  
 797      #[tokio::test]
 798      async fn test_process_solution_nok() {
 799          let rng = &mut TestRng::default();
 800          let current_committee = alphavm::ledger::committee::test_helpers::sample_committee(rng);
 801          let lookback_committee = current_committee.clone();
 802          // Every `send` on the mock gateway yields a receiver whose sender is dropped right away.
 803          let mut gateway = MockGateway::default();
 804          gateway.expect_send().returning(|_, _| {
 805              let (_sender, receiver) = oneshot::channel();
 806              Some(receiver)
 807          });
 808          // The ledger mock knows no transmissions and fails every solution check.
 809          let mut mock = MockLedger::default();
 810          mock.expect_current_committee().returning(move || Ok(current_committee.clone()));
 811          mock.expect_get_committee_lookback_for_round().returning(move |_| Ok(lookback_committee.clone()));
 812          mock.expect_contains_transmission().returning(|_| Ok(false));
 813          mock.expect_check_solution_basic().returning(|_, _| Err(anyhow!("")));
 814          let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(mock);
 815          let storage = Storage::<CurrentNetwork>::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1);
 816          let worker = Worker::new(0, Arc::new(gateway), storage, ledger, Default::default()).unwrap();
 817          // Draw a random solution ID, then a random 512-byte buffer whose checksum completes the ID.
 818          let solution_id = rng.r#gen::<u64>().into();
 819          let solution_bytes: Vec<u8> = (0..512).map(|_| rng.r#gen::<u8>()).collect();
 820          let solution = Data::Buffer(Bytes::from(solution_bytes));
 821          let checksum = solution.to_checksum::<CurrentNetwork>().unwrap();
 822          let transmission_id = TransmissionID::Solution(solution_id, checksum);
 823          let peer_ip = SocketAddr::from(([127, 0, 0, 1], 1234));
 824          let requester = worker.clone();
 825          let _ = requester.send_transmission_request(peer_ip, transmission_id).await;
 826          assert!(worker.pending.contains(transmission_id));
 827          // With the check failing, the solution must leave the pending set without becoming ready.
 828          let outcome = worker.process_unconfirmed_solution(solution_id, solution).await;
 829          assert!(outcome.is_err());
 830          assert!(!worker.pending.contains(transmission_id));
 831          assert!(!worker.ready.read().contains(transmission_id));
 832      }
 833  
 834      #[tokio::test]
 835      async fn test_process_transaction_ok() {
 836          let rng = &mut TestRng::default();
 837          let current_committee = alphavm::ledger::committee::test_helpers::sample_committee(rng);
 838          let lookback_committee = current_committee.clone();
 839          // Every `send` on the mock gateway yields a receiver whose sender is dropped right away.
 840          let mut gateway = MockGateway::default();
 841          gateway.expect_send().returning(|_, _| {
 842              let (_sender, receiver) = oneshot::channel();
 843              Some(receiver)
 844          });
 845          // The ledger mock knows no transmissions and passes every transaction check.
 846          let mut mock = MockLedger::default();
 847          mock.expect_current_committee().returning(move || Ok(current_committee.clone()));
 848          mock.expect_get_committee_lookback_for_round().returning(move |_| Ok(lookback_committee.clone()));
 849          mock.expect_contains_transmission().returning(|_| Ok(false));
 850          mock.expect_check_transaction_basic().returning(|_, _| Ok(()));
 851          let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(mock);
 852          let storage = Storage::<CurrentNetwork>::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1);
 853          let worker = Worker::new(0, Arc::new(gateway), storage, ledger, Default::default()).unwrap();
 854  
 855          // Wrap a sampled transaction as `Data::Object`; its ID and checksum form the transmission ID.
 856          let transaction = sample_execution_transaction_with_fee(false, rng, 0);
 857          let transaction_id = transaction.id();
 858          let transaction_data = Data::Object(transaction);
 859          let checksum = transaction_data.to_checksum::<CurrentNetwork>().unwrap();
 860          let transmission_id = TransmissionID::Transaction(transaction_id, checksum);
 861          let peer_ip = SocketAddr::from(([127, 0, 0, 1], 1234));
 862          let requester = worker.clone();
 863          let _ = requester.send_transmission_request(peer_ip, transmission_id).await;
 864          assert!(worker.pending.contains(transmission_id));
 865          // With the check passing, the transaction must leave the pending set and become ready.
 866          let outcome = worker.process_unconfirmed_transaction(transaction_id, transaction_data).await;
 867          assert!(outcome.is_ok());
 868          assert!(!worker.pending.contains(transmission_id));
 869          assert!(worker.ready.read().contains(transmission_id));
 870      }
 871  
 872      #[tokio::test]
 873      async fn test_process_transaction_nok() {
 874          let rng = &mut TestRng::default();
 875          let current_committee = alphavm::ledger::committee::test_helpers::sample_committee(rng);
 876          let lookback_committee = current_committee.clone();
 877          // Every `send` on the mock gateway yields a receiver whose sender is dropped right away.
 878          let mut gateway = MockGateway::default();
 879          gateway.expect_send().returning(|_, _| {
 880              let (_sender, receiver) = oneshot::channel();
 881              Some(receiver)
 882          });
 883          // The ledger mock knows no transmissions and fails every transaction check.
 884          let mut mock = MockLedger::default();
 885          mock.expect_current_committee().returning(move || Ok(current_committee.clone()));
 886          mock.expect_get_committee_lookback_for_round().returning(move |_| Ok(lookback_committee.clone()));
 887          mock.expect_contains_transmission().returning(|_| Ok(false));
 888          mock.expect_check_transaction_basic().returning(|_, _| Err(anyhow!("")));
 889          let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(mock);
 890          let storage = Storage::<CurrentNetwork>::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1);
 891          let worker = Worker::new(0, Arc::new(gateway), storage, ledger, Default::default()).unwrap();
 892          // Pair a random transaction ID with 512 random bytes passed in place of a transaction.
 893          let transaction_id: <CurrentNetwork as Network>::TransactionID = Field::<CurrentNetwork>::rand(rng).into();
 894          let transaction_bytes: Vec<u8> = (0..512).map(|_| rng.r#gen::<u8>()).collect();
 895          let transaction = Data::Buffer(Bytes::from(transaction_bytes));
 896          let checksum = transaction.to_checksum::<CurrentNetwork>().unwrap();
 897          let transmission_id = TransmissionID::Transaction(transaction_id, checksum);
 898          let peer_ip = SocketAddr::from(([127, 0, 0, 1], 1234));
 899          let requester = worker.clone();
 900          let _ = requester.send_transmission_request(peer_ip, transmission_id).await;
 901          assert!(worker.pending.contains(transmission_id));
 902          // With the check failing, the transaction must leave the pending set without becoming ready.
 903          let outcome = worker.process_unconfirmed_transaction(transaction_id, transaction).await;
 904          assert!(outcome.is_err());
 905          assert!(!worker.pending.contains(transmission_id));
 906          assert!(!worker.ready.read().contains(transmission_id));
 907      }
 908  
 909      #[tokio::test]
 910      async fn test_flood_transmission_requests() {
 911          let rng = &mut TestRng::default();
 912          let current_committee = alphavm::ledger::committee::test_helpers::sample_committee(rng);
 913          let lookback_committee = current_committee.clone();
 914          // Every `send` on the mock gateway yields a receiver whose sender is dropped right away.
 915          let mut gateway = MockGateway::default();
 916          gateway.expect_send().returning(|_, _| {
 917              let (_sender, receiver) = oneshot::channel();
 918              Some(receiver)
 919          });
 920          // The ledger mock knows no transmissions and passes every transaction check.
 921          let mut mock = MockLedger::default();
 922          mock.expect_current_committee().returning(move || Ok(current_committee.clone()));
 923          mock.expect_get_committee_lookback_for_round().returning(move |_| Ok(lookback_committee.clone()));
 924          mock.expect_contains_transmission().returning(|_| Ok(false));
 925          mock.expect_check_transaction_basic().returning(|_, _| Ok(()));
 926          let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(mock);
 927          let storage = Storage::<CurrentNetwork>::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1);
 928          let worker = Worker::new(0, Arc::new(gateway), storage, ledger, Default::default()).unwrap();
 929  
 930          // Wrap a sampled transaction as `Data::Object`; its ID and checksum form the transmission ID.
 931          let transaction = sample_execution_transaction_with_fee(false, rng, 0);
 932          let transaction_id = transaction.id();
 933          let transaction_data = Data::Object(transaction);
 934          let checksum = transaction_data.to_checksum::<CurrentNetwork>().unwrap();
 935          let transmission_id = TransmissionID::Transaction(transaction_id, checksum);
 936  
 937          // The cap on redundant requests for the current round; each flood issues ten times as many.
 938          let max_redundant = max_redundant_requests(worker.ledger.clone(), worker.storage.current_round()).unwrap();
 939          let flood_size = max_redundant * 10;
 940          // One loopback address per request, distinguished by port.
 941          let peer_ips = (0..flood_size).map(|i| SocketAddr::from(([127, 0, 0, 1], 1234 + i as u16))).collect_vec();
 942          // Remember the first address; the second flood targets it exclusively.
 943          let first_peer_ip = peer_ips[0];
 944  
 945          // First flood: request the same transmission from every peer, walking the list back to front.
 946          for (request_no, peer_ip) in (1..=flood_size).zip(peer_ips.into_iter().rev()) {
 947              let requester = worker.clone();
 948              tokio::spawn(async move {
 949                  let _ = requester.send_transmission_request(peer_ip, transmission_id).await;
 950              });
 951              // Pause briefly before inspecting the pending counters.
 952              tokio::time::sleep(Duration::from_millis(10)).await;
 953              // Sent requests stay within the cap, while every request so far holds a callback.
 954              assert!(worker.pending.num_sent_requests(transmission_id) <= max_redundant);
 955              assert_eq!(worker.pending.num_callbacks(transmission_id), request_no);
 956          }
 957          // Once the flood is over, the cap is met exactly and every request holds a callback.
 958          assert_eq!(worker.pending.num_sent_requests(transmission_id), max_redundant);
 959          assert_eq!(worker.pending.num_callbacks(transmission_id), flood_size);
 960  
 961          // Wait one second past the callback expiration; both counters must then read zero.
 962          tokio::time::sleep(Duration::from_secs(CALLBACK_EXPIRATION_IN_SECS as u64 + 1)).await;
 963          assert_eq!(worker.pending.num_sent_requests(transmission_id), 0);
 964          assert_eq!(worker.pending.num_callbacks(transmission_id), 0);
 965  
 966          // Second flood: the same number of requests, all addressed to `first_peer_ip`.
 967          for request_no in 1..=flood_size {
 968              let requester = worker.clone();
 969              tokio::spawn(async move {
 970                  let _ = requester.send_transmission_request(first_peer_ip, transmission_id).await;
 971              });
 972              // Pause briefly before inspecting the pending counters.
 973              tokio::time::sleep(Duration::from_millis(10)).await;
 974              assert!(worker.pending.num_sent_requests(transmission_id) <= max_redundant);
 975              assert_eq!(worker.pending.num_callbacks(transmission_id), request_no);
 976          }
 977          // Exactly one request counts as sent for the lone peer, yet every attempt holds a callback.
 978          assert_eq!(worker.pending.num_sent_requests(transmission_id), 1);
 979          assert_eq!(worker.pending.num_callbacks(transmission_id), flood_size);
 980  
 981          // Processing the transaction itself must empty the pending state and mark it ready.
 982          let outcome = worker.process_unconfirmed_transaction(transaction_id, transaction_data).await;
 983          assert!(outcome.is_ok());
 984          assert_eq!(worker.pending.num_sent_requests(transmission_id), 0);
 985          assert_eq!(worker.pending.num_callbacks(transmission_id), 0);
 986          assert!(!worker.pending.contains(transmission_id));
 987          assert!(worker.ready.read().contains(transmission_id));
 988      }
 989  
 990      // For `ITERATIONS` random pairs of GC window and ledger round, checks that a freshly
 991      // created storage already reports the matching GC round.
 992      #[tokio::test]
 993      async fn test_storage_gc_on_initialization() {
 994          let rng = &mut TestRng::default();
 995  
 996          for _ in 0..ITERATIONS {
 997              // Draw a GC window from 50..=100 and a ledger round from just above it up to 999.
 998              let max_gc_rounds = rng.gen_range(50..=100);
 999              let ledger_round = rng.gen_range((max_gc_rounds + 1)..1000);
1000  
1001              // The only ledger call mocked here is `current_committee`, serving a committee for that round.
1002              let committee = alphavm::ledger::committee::test_helpers::sample_committee_for_round(ledger_round, rng);
1003              let mut mock = MockLedger::default();
1004              mock.expect_current_committee().returning(move || Ok(committee.clone()));
1005              let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(mock);
1006  
1007              // Create the storage with the drawn GC window.
1008              let storage =
1009                  Storage::<CurrentNetwork>::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
1010  
1011              // The GC round reported right after construction must equal the ledger round
1012              // minus the GC window.
1013              let expected_gc_round = ledger_round - max_gc_rounds;
1014              assert_eq!(storage.gc_round(), expected_gc_round);
1015          }
1016      }
1017  }
1018  
1019  #[cfg(test)]
1020  mod prop_tests {
1021      use super::*;
1022      use crate::Gateway;
1023      use alphaos_node_bft_ledger_service::MockLedgerService;
1024      use alphavm::{
1025          console::account::Address,
1026          ledger::committee::{Committee, MIN_VALIDATOR_STAKE},
1027      };
1028  
1029      use test_strategy::proptest;
1030  
1031      type CurrentNetwork = alphavm::prelude::MainnetV0;
1032  
1033      // Builds a test committee by inserting one member per index in `0..n`, each with `MIN_VALIDATOR_STAKE`.
1034      fn new_test_committee(n: u16) -> Committee<CurrentNetwork> {
1035          let mut members = IndexMap::with_capacity(n as usize);
1036          for i in 0..n {
1037              let rng = &mut TestRng::fixed(i as u64);
1038              let address = Address::new(rng.r#gen());
1039              info!("Validator {i}: {address}");
1040              members.insert(address, (MIN_VALIDATOR_STAKE, false, rng.gen_range(0..100)));
1041          }
1042          Committee::<CurrentNetwork>::new(1u64, members).unwrap()
1043      }
1044  
1045      // A worker ID below `MAX_WORKERS` must be accepted and reported back by `Worker::id`.
1046      #[proptest]
1047      fn worker_initialization(
1048          #[strategy(0..MAX_WORKERS)] id: u8,
1049          gateway: Gateway<CurrentNetwork>,
1050          storage: Storage<CurrentNetwork>,
1051      ) {
1052          let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(MockLedgerService::new(new_test_committee(4)));
1053          let worker = Worker::new(id, Arc::new(gateway), storage, ledger, Default::default()).unwrap();
1054          assert_eq!(worker.id(), id);
1055      }
1056  
1057      // A worker ID of `MAX_WORKERS` or above must be rejected with the "Invalid worker ID" error.
1058      #[proptest]
1059      fn invalid_worker_id(
1060          #[strategy(MAX_WORKERS..)] id: u8,
1061          gateway: Gateway<CurrentNetwork>,
1062          storage: Storage<CurrentNetwork>,
1063      ) {
1064          let ledger: Arc<dyn LedgerService<CurrentNetwork>> = Arc::new(MockLedgerService::new(new_test_committee(4)));
1065          let worker = Worker::new(id, Arc::new(gateway), storage, ledger, Default::default());
1066          // An `Ok` here must fail the test instead of letting it pass without any assertion.
1067          // TODO once Worker implements Debug, simplify this with `unwrap_err`
1068          let Err(error) = worker else {
1069              panic!("Worker::new accepted the invalid worker ID '{id}'");
1070          };
1071          assert_eq!(error.to_string(), format!("Invalid worker ID '{id}'"));
1072      }
1073  }