worker.rs
1 // Copyright (c) 2025 ADnet Contributors 2 // This file is part of the AlphaOS library. 3 4 // Licensed under the Apache License, Version 2.0 (the "License"); 5 // you may not use this file except in compliance with the License. 6 // You may obtain a copy of the License at: 7 8 // http://www.apache.org/licenses/LICENSE-2.0 9 10 // Unless required by applicable law or agreed to in writing, software 11 // distributed under the License is distributed on an "AS IS" BASIS, 12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 // See the License for the specific language governing permissions and 14 // limitations under the License. 15 16 use crate::common::{ 17 CurrentNetwork, 18 primary::new_test_committee, 19 utils::{sample_ledger, sample_worker}, 20 }; 21 use alphaos_node_bft::helpers::max_redundant_requests; 22 use alphavm::{ 23 ledger::narwhal::TransmissionID, 24 prelude::{Network, TestRng}, 25 }; 26 27 use std::net::SocketAddr; 28 29 #[tokio::test] 30 #[rustfmt::skip] 31 async fn test_resend_transmission_request() { 32 let num_nodes: u16 = CurrentNetwork::MAX_CERTIFICATES.first().unwrap().1; 33 34 // Initialize the RNG. 35 let mut rng = TestRng::default(); 36 // Initialize the accounts and the committee. 37 let (accounts, committee) = new_test_committee(num_nodes, &mut rng); 38 // Sample a ledger. 39 let ledger = { 40 let accounts = accounts.clone(); 41 tokio::task::spawn_blocking(move || sample_ledger(&accounts, &committee, &mut rng)).await.unwrap() 42 }; 43 // Sample a worker. 44 let worker = sample_worker(0, accounts[0].clone(), ledger.clone()); 45 46 // Determine the maximum number of redundant requests. 47 let max_redundancy = max_redundant_requests(ledger.clone(), 0).unwrap(); 48 assert_eq!(max_redundancy, 6, "Update me if the formula changes"); 49 50 // Prepare peer ips. 51 let num_test_requests = 11; 52 let mut peer_ips = (0..num_test_requests).map(|i| SocketAddr::from(([127, 0, 0, 1], 1234 + i as u16))).collect::<Vec<_>>(); 53 let initial_peer_ip = peer_ips.pop().unwrap(); 54 55 // Prepare a dummy transmission ID. 56 let transmission_id = TransmissionID::Transaction(<CurrentNetwork as Network>::TransactionID::default(), <CurrentNetwork as Network>::TransmissionChecksum::default()); 57 58 // Ensure the worker does not have the dummy transmission ID. 59 assert!(!worker.contains_transmission(transmission_id), "Transmission should not exist"); 60 61 // Send a request to fetch the dummy transmission. 62 let worker_ = worker.clone(); 63 tokio::spawn(async move { worker_.get_or_fetch_transmission(initial_peer_ip, transmission_id).await }); 64 65 tokio::time::sleep(std::time::Duration::from_millis(10)).await; 66 67 let pending = worker.pending(); 68 // Ensure the transmission ID exists in the pending queue. 69 assert!(pending.contains(transmission_id), "Missing a transmission in the pending queue"); 70 // Ensure the peer IP is in the pending queue for the transmission ID. 71 assert!(pending.contains_peer(transmission_id, initial_peer_ip), "Missing a peer IP for transmission in the pending queue"); 72 assert_eq!(pending.get_peers(transmission_id), Some([initial_peer_ip].into_iter().collect()), "Missing a peer IP for transmission in the pending queue"); 73 // Ensure the number of callbacks is correct. 74 assert_eq!(pending.num_callbacks(transmission_id), 1, "Incorrect number of callbacks for transmission"); 75 // Ensure the number of sent requests is correct. 76 assert_eq!(pending.num_sent_requests(transmission_id), 1, "Incorrect number of sent requests for transmission"); 77 78 // Rebroadcast the same request to the same peer to fetch the dummy transmission. 79 for i in 1..num_test_requests { 80 let worker_ = worker.clone(); 81 let peer_ip = initial_peer_ip; 82 tokio::spawn(async move { worker_.get_or_fetch_transmission(peer_ip, transmission_id).await }); 83 84 tokio::time::sleep(std::time::Duration::from_millis(10)).await; 85 86 // Ensure the transmission ID exists in the pending queue. 87 assert!(pending.contains(transmission_id), "Missing a transmission in the pending queue"); 88 // Ensure the peer IP is in the pending queue for the transmission ID. 89 assert!(pending.contains_peer(transmission_id, peer_ip), "Missing a peer IP for transmission in the pending queue"); 90 assert_eq!(pending.get_peers(transmission_id), Some([peer_ip].into_iter().collect()), "Missing a peer IP for transmission in the pending queue"); 91 // Ensure the number of callbacks is correct. 92 assert_eq!(pending.num_callbacks(transmission_id), 1 + i, "Incorrect number of callbacks for transmission"); 93 // Ensure the number of sent requests is correct. 94 assert_eq!(pending.num_sent_requests(transmission_id), 1, "Incorrect number of sent requests for transmission"); 95 } 96 97 // Rebroadcast the same request to new peers fetch the dummy transmission. 98 for i in 1..num_test_requests { 99 let peer_ip = peer_ips.pop().unwrap(); 100 let worker_ = worker.clone(); 101 tokio::spawn(async move { worker_.get_or_fetch_transmission(peer_ip, transmission_id).await }); 102 103 tokio::time::sleep(std::time::Duration::from_millis(10)).await; 104 105 // Ensure the transmission ID exists in the pending queue. 106 assert!(pending.contains(transmission_id), "Missing a transmission in the pending queue"); 107 // Ensure the peer IP is in the pending queue for the transmission ID. 108 assert!(pending.contains_peer(transmission_id, peer_ip), "Missing a peer IP for transmission in the pending queue"); 109 // Ensure the number of sent requests is correct. 110 assert_eq!(pending.num_sent_requests(transmission_id), (1 + i).min(max_redundancy), "Incorrect number of sent requests for transmission"); 111 } 112 } 113 114 #[tokio::test] 115 #[rustfmt::skip] 116 async fn test_flood_transmission_requests() { 117 let num_nodes: u16 = CurrentNetwork::MAX_CERTIFICATES.first().unwrap().1; 118 119 // Initialize the RNG. 120 let mut rng = TestRng::default(); 121 // Initialize the accounts and the committee. 122 let (accounts, committee) = new_test_committee(num_nodes, &mut rng); 123 // Sample a ledger. 124 let ledger = { 125 let accounts = accounts.clone(); 126 tokio::task::spawn_blocking(move || sample_ledger(&accounts, &committee, &mut rng)).await.unwrap() 127 }; 128 // Sample a worker. 129 let worker = sample_worker(0, accounts[0].clone(), ledger.clone()); 130 131 // Determine the maximum number of redundant requests. 132 let max_redundancy = max_redundant_requests(ledger.clone(), 0).unwrap(); 133 assert_eq!(max_redundancy, 6, "Update me if the formula changes"); 134 135 // Prepare peer ips. 136 let mut peer_ips = (0..max_redundancy + 1).map(|i| SocketAddr::from(([127, 0, 0, 1], 1234 + i as u16))).collect::<Vec<_>>(); 137 let all_peer_ips = peer_ips.clone(); 138 let initial_peer_ip = peer_ips.pop().unwrap(); 139 let mut remaining_peer_ips = peer_ips; 140 141 // Prepare a dummy transmission ID. 142 let transmission_id = TransmissionID::Transaction(<CurrentNetwork as Network>::TransactionID::default(), <CurrentNetwork as Network>::TransmissionChecksum::default()); 143 144 // Ensure the worker does not have the dummy transmission ID. 145 assert!(!worker.contains_transmission(transmission_id), "Transmission should not exist"); 146 147 // Send the maximum number of redundant requests to fetch the dummy transmission. 148 for peer_ip in remaining_peer_ips.clone() { 149 let worker_ = worker.clone(); 150 tokio::spawn(async move { worker_.get_or_fetch_transmission(peer_ip, transmission_id).await }); 151 } 152 153 tokio::time::sleep(std::time::Duration::from_millis(10)).await; 154 155 let pending = worker.pending(); 156 // Ensure the transmission ID exists in the pending queue. 157 assert!(pending.contains(transmission_id), "Missing a transmission in the pending queue"); 158 // Ensure the peer IP is in the pending queue for the transmission ID. 159 assert_eq!(pending.get_peers(transmission_id), Some(remaining_peer_ips.clone().into_iter().collect()), "Missing a peer IP for transmission in the pending queue"); 160 // Ensure the number of callbacks is correct. 161 assert_eq!(pending.num_callbacks(transmission_id), max_redundancy, "Incorrect number of callbacks for transmission"); 162 // Ensure the number of sent requests is correct. 163 assert_eq!(pending.num_sent_requests(transmission_id), max_redundancy, "Incorrect number of sent requests for transmission"); 164 165 // Ensure any further redundant requests are not sent when sending to the same peer. 166 for i in 1..=6 { 167 let worker_ = worker.clone(); 168 let peer_ip = initial_peer_ip; 169 tokio::spawn(async move { worker_.get_or_fetch_transmission(peer_ip, transmission_id).await }); 170 171 tokio::time::sleep(std::time::Duration::from_millis(10)).await; 172 173 // Ensure the transmission ID exists in the pending queue. 174 assert!(pending.contains(transmission_id), "Missing a transmission in the pending queue"); 175 // Ensure the peer IP is in the pending queue for the transmission ID. 176 assert!(pending.contains_peer(transmission_id, peer_ip), "Missing a peer IP for transmission in the pending queue"); 177 assert_eq!(pending.get_peers(transmission_id), Some(all_peer_ips.clone().into_iter().collect()), "Missing a peer IP for transmission in the pending queue"); 178 // Ensure the number of callbacks is correct. 179 assert_eq!(pending.num_callbacks(transmission_id), max_redundancy + i, "Incorrect number of callbacks for transmission"); 180 // Ensure the number of sent requests is correct. 181 assert_eq!(pending.num_sent_requests(transmission_id), max_redundancy, "Incorrect number of sent requests for transmission"); 182 } 183 184 // Ensure any further redundant requests are not sent when sending to new peers. 185 for i in 1..=6 { 186 let worker_ = worker.clone(); 187 let peer_ip = remaining_peer_ips.pop().unwrap(); 188 tokio::spawn(async move { worker_.get_or_fetch_transmission(peer_ip, transmission_id).await }); 189 190 tokio::time::sleep(std::time::Duration::from_millis(10)).await; 191 192 // Ensure the transmission ID exists in the pending queue. 193 assert!(pending.contains(transmission_id), "Missing a transmission in the pending queue"); 194 // Ensure the peer IP is in the pending queue for the transmission ID. 195 assert!(pending.contains_peer(transmission_id, peer_ip), "Missing a peer IP for transmission in the pending queue"); 196 assert_eq!(pending.get_peers(transmission_id), Some(all_peer_ips.clone().into_iter().collect()), "Missing a peer IP for transmission in the pending queue"); 197 // Ensure the number of callbacks is correct. 198 assert_eq!(pending.num_callbacks(transmission_id), max_redundancy + 6 + i, "Incorrect number of callbacks for transmission"); 199 // Ensure the number of sent requests is correct. 200 assert_eq!(pending.num_sent_requests(transmission_id), max_redundancy, "Incorrect number of sent requests for transmission"); 201 } 202 }