/ node / bft / tests / components / worker.rs
worker.rs
  1  // Copyright (c) 2025 ADnet Contributors
  2  // This file is part of the AlphaOS library.
  3  
  4  // Licensed under the Apache License, Version 2.0 (the "License");
  5  // you may not use this file except in compliance with the License.
  6  // You may obtain a copy of the License at:
  7  
  8  // http://www.apache.org/licenses/LICENSE-2.0
  9  
 10  // Unless required by applicable law or agreed to in writing, software
 11  // distributed under the License is distributed on an "AS IS" BASIS,
 12  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 13  // See the License for the specific language governing permissions and
 14  // limitations under the License.
 15  
 16  use crate::common::{
 17      CurrentNetwork,
 18      primary::new_test_committee,
 19      utils::{sample_ledger, sample_worker},
 20  };
 21  use alphaos_node_bft::helpers::max_redundant_requests;
 22  use alphavm::{
 23      ledger::narwhal::TransmissionID,
 24      prelude::{Network, TestRng},
 25  };
 26  
 27  use std::net::SocketAddr;
 28  
 29  #[tokio::test]
 30  #[rustfmt::skip]
 31  async fn test_resend_transmission_request() {
 32      let num_nodes: u16 = CurrentNetwork::MAX_CERTIFICATES.first().unwrap().1;
 33  
 34      // Initialize the RNG.
 35      let mut rng = TestRng::default();
 36      // Initialize the accounts and the committee.
 37      let (accounts, committee) = new_test_committee(num_nodes, &mut rng);
 38      // Sample a ledger.
 39      let ledger = {
 40          let accounts = accounts.clone();
 41          tokio::task::spawn_blocking(move || sample_ledger(&accounts, &committee, &mut rng)).await.unwrap()
 42      };
 43      // Sample a worker.
 44      let worker = sample_worker(0, accounts[0].clone(), ledger.clone());
 45  
 46      // Determine the maximum number of redundant requests.
 47      let max_redundancy = max_redundant_requests(ledger.clone(), 0).unwrap();
 48      assert_eq!(max_redundancy, 6, "Update me if the formula changes");
 49  
 50      // Prepare peer ips.
 51      let num_test_requests = 11;
 52      let mut peer_ips = (0..num_test_requests).map(|i| SocketAddr::from(([127, 0, 0, 1], 1234 + i as u16))).collect::<Vec<_>>();
 53      let initial_peer_ip = peer_ips.pop().unwrap();
 54  
 55      // Prepare a dummy transmission ID.
 56      let transmission_id = TransmissionID::Transaction(<CurrentNetwork as Network>::TransactionID::default(), <CurrentNetwork as Network>::TransmissionChecksum::default());
 57  
 58      // Ensure the worker does not have the dummy transmission ID.
 59      assert!(!worker.contains_transmission(transmission_id), "Transmission should not exist");
 60  
 61      // Send a request to fetch the dummy transmission.
 62      let worker_ = worker.clone();
 63      tokio::spawn(async move { worker_.get_or_fetch_transmission(initial_peer_ip, transmission_id).await });
 64  
 65      tokio::time::sleep(std::time::Duration::from_millis(10)).await;
 66  
 67      let pending = worker.pending();
 68      // Ensure the transmission ID exists in the pending queue.
 69      assert!(pending.contains(transmission_id), "Missing a transmission in the pending queue");
 70      // Ensure the peer IP is in the pending queue for the transmission ID.
 71      assert!(pending.contains_peer(transmission_id, initial_peer_ip), "Missing a peer IP for transmission in the pending queue");
 72      assert_eq!(pending.get_peers(transmission_id), Some([initial_peer_ip].into_iter().collect()), "Missing a peer IP for transmission in the pending queue");
 73      // Ensure the number of callbacks is correct.
 74      assert_eq!(pending.num_callbacks(transmission_id), 1, "Incorrect number of callbacks for transmission");
 75      // Ensure the number of sent requests is correct.
 76      assert_eq!(pending.num_sent_requests(transmission_id), 1, "Incorrect number of sent requests for transmission");
 77  
 78      // Rebroadcast the same request to the same peer to fetch the dummy transmission.
 79      for i in 1..num_test_requests {
 80          let worker_ = worker.clone();
 81          let peer_ip = initial_peer_ip;
 82          tokio::spawn(async move { worker_.get_or_fetch_transmission(peer_ip, transmission_id).await });
 83  
 84          tokio::time::sleep(std::time::Duration::from_millis(10)).await;
 85  
 86          // Ensure the transmission ID exists in the pending queue.
 87          assert!(pending.contains(transmission_id), "Missing a transmission in the pending queue");
 88          // Ensure the peer IP is in the pending queue for the transmission ID.
 89          assert!(pending.contains_peer(transmission_id, peer_ip), "Missing a peer IP for transmission in the pending queue");
 90          assert_eq!(pending.get_peers(transmission_id), Some([peer_ip].into_iter().collect()), "Missing a peer IP for transmission in the pending queue");
 91          // Ensure the number of callbacks is correct.
 92          assert_eq!(pending.num_callbacks(transmission_id), 1 + i, "Incorrect number of callbacks for transmission");
 93          // Ensure the number of sent requests is correct.
 94          assert_eq!(pending.num_sent_requests(transmission_id), 1, "Incorrect number of sent requests for transmission");
 95      }
 96  
 97      // Rebroadcast the same request to new peers fetch the dummy transmission.
 98      for i in 1..num_test_requests {
 99          let peer_ip = peer_ips.pop().unwrap();
100          let worker_ = worker.clone();
101          tokio::spawn(async move { worker_.get_or_fetch_transmission(peer_ip, transmission_id).await });
102  
103          tokio::time::sleep(std::time::Duration::from_millis(10)).await;
104  
105          // Ensure the transmission ID exists in the pending queue.
106          assert!(pending.contains(transmission_id), "Missing a transmission in the pending queue");
107          // Ensure the peer IP is in the pending queue for the transmission ID.
108          assert!(pending.contains_peer(transmission_id, peer_ip), "Missing a peer IP for transmission in the pending queue");
109          // Ensure the number of sent requests is correct.
110          assert_eq!(pending.num_sent_requests(transmission_id), (1 + i).min(max_redundancy), "Incorrect number of sent requests for transmission");
111      }
112  }
113  
114  #[tokio::test]
115  #[rustfmt::skip]
116  async fn test_flood_transmission_requests() {
117      let num_nodes: u16 = CurrentNetwork::MAX_CERTIFICATES.first().unwrap().1;
118  
119      // Initialize the RNG.
120      let mut rng = TestRng::default();
121      // Initialize the accounts and the committee.
122      let (accounts, committee) = new_test_committee(num_nodes, &mut rng);
123      // Sample a ledger.
124      let ledger = {
125          let accounts = accounts.clone();
126          tokio::task::spawn_blocking(move || sample_ledger(&accounts, &committee, &mut rng)).await.unwrap()
127      };
128      // Sample a worker.
129      let worker = sample_worker(0, accounts[0].clone(), ledger.clone());
130  
131      // Determine the maximum number of redundant requests.
132      let max_redundancy = max_redundant_requests(ledger.clone(), 0).unwrap();
133      assert_eq!(max_redundancy, 6, "Update me if the formula changes");
134  
135      // Prepare peer ips.
136      let mut peer_ips = (0..max_redundancy + 1).map(|i| SocketAddr::from(([127, 0, 0, 1], 1234 + i as u16))).collect::<Vec<_>>();
137      let all_peer_ips = peer_ips.clone();
138      let initial_peer_ip = peer_ips.pop().unwrap();
139      let mut remaining_peer_ips = peer_ips;
140  
141      // Prepare a dummy transmission ID.
142      let transmission_id = TransmissionID::Transaction(<CurrentNetwork as Network>::TransactionID::default(), <CurrentNetwork as Network>::TransmissionChecksum::default());
143  
144      // Ensure the worker does not have the dummy transmission ID.
145      assert!(!worker.contains_transmission(transmission_id), "Transmission should not exist");
146  
147      // Send the maximum number of redundant requests to fetch the dummy transmission.
148      for peer_ip in remaining_peer_ips.clone() {
149          let worker_ = worker.clone();
150          tokio::spawn(async move { worker_.get_or_fetch_transmission(peer_ip, transmission_id).await });
151      }
152  
153      tokio::time::sleep(std::time::Duration::from_millis(10)).await;
154  
155      let pending = worker.pending();
156      // Ensure the transmission ID exists in the pending queue.
157      assert!(pending.contains(transmission_id), "Missing a transmission in the pending queue");
158      // Ensure the peer IP is in the pending queue for the transmission ID.
159      assert_eq!(pending.get_peers(transmission_id), Some(remaining_peer_ips.clone().into_iter().collect()), "Missing a peer IP for transmission in the pending queue");
160      // Ensure the number of callbacks is correct.
161      assert_eq!(pending.num_callbacks(transmission_id), max_redundancy, "Incorrect number of callbacks for transmission");
162      // Ensure the number of sent requests is correct.
163      assert_eq!(pending.num_sent_requests(transmission_id), max_redundancy, "Incorrect number of sent requests for transmission");
164  
165      // Ensure any further redundant requests are not sent when sending to the same peer.
166      for i in 1..=6 {
167          let worker_ = worker.clone();
168          let peer_ip = initial_peer_ip;
169          tokio::spawn(async move { worker_.get_or_fetch_transmission(peer_ip, transmission_id).await });
170  
171          tokio::time::sleep(std::time::Duration::from_millis(10)).await;
172  
173          // Ensure the transmission ID exists in the pending queue.
174          assert!(pending.contains(transmission_id), "Missing a transmission in the pending queue");
175          // Ensure the peer IP is in the pending queue for the transmission ID.
176          assert!(pending.contains_peer(transmission_id, peer_ip), "Missing a peer IP for transmission in the pending queue");
177          assert_eq!(pending.get_peers(transmission_id), Some(all_peer_ips.clone().into_iter().collect()), "Missing a peer IP for transmission in the pending queue");
178          // Ensure the number of callbacks is correct.
179          assert_eq!(pending.num_callbacks(transmission_id), max_redundancy + i, "Incorrect number of callbacks for transmission");
180          // Ensure the number of sent requests is correct.
181          assert_eq!(pending.num_sent_requests(transmission_id), max_redundancy, "Incorrect number of sent requests for transmission");
182      }
183  
184      // Ensure any further redundant requests are not sent when sending to new peers.
185      for i in 1..=6 {
186          let worker_ = worker.clone();
187          let peer_ip = remaining_peer_ips.pop().unwrap();
188          tokio::spawn(async move { worker_.get_or_fetch_transmission(peer_ip, transmission_id).await });
189  
190          tokio::time::sleep(std::time::Duration::from_millis(10)).await;
191  
192          // Ensure the transmission ID exists in the pending queue.
193          assert!(pending.contains(transmission_id), "Missing a transmission in the pending queue");
194          // Ensure the peer IP is in the pending queue for the transmission ID.
195          assert!(pending.contains_peer(transmission_id, peer_ip), "Missing a peer IP for transmission in the pending queue");
196          assert_eq!(pending.get_peers(transmission_id), Some(all_peer_ips.clone().into_iter().collect()), "Missing a peer IP for transmission in the pending queue");
197          // Ensure the number of callbacks is correct.
198          assert_eq!(pending.num_callbacks(transmission_id), max_redundancy + 6 + i, "Incorrect number of callbacks for transmission");
199          // Ensure the number of sent requests is correct.
200          assert_eq!(pending.num_sent_requests(transmission_id), max_redundancy, "Incorrect number of sent requests for transmission");
201      }
202  }