/ node / bft / tests / bft_e2e.rs
bft_e2e.rs
  1  // Copyright (c) 2025 ADnet Contributors
  2  // This file is part of the AlphaOS library.
  3  
  4  // Licensed under the Apache License, Version 2.0 (the "License");
  5  // you may not use this file except in compliance with the License.
  6  // You may obtain a copy of the License at:
  7  
  8  // http://www.apache.org/licenses/LICENSE-2.0
  9  
 10  // Unless required by applicable law or agreed to in writing, software
 11  // distributed under the License is distributed on an "AS IS" BASIS,
 12  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 13  // See the License for the specific language governing permissions and
 14  // limitations under the License.
 15  
 16  #[allow(dead_code)]
 17  mod common;
 18  #[allow(dead_code)]
 19  mod components;
 20  
 21  use crate::common::primary::{TestNetwork, TestNetworkConfig};
 22  use alphaos_node_bft::MAX_FETCH_TIMEOUT_IN_MS;
 23  use deadline::deadline;
 24  use itertools::Itertools;
 25  use std::time::Duration;
 26  use tokio::time::sleep;
 27  
 28  #[tokio::test(flavor = "multi_thread")]
 29  #[ignore = "long-running e2e test"]
 30  async fn test_state_coherence() {
 31      const N: u16 = 4;
 32      const TRANSMISSION_INTERVAL_MS: u64 = 10;
 33  
 34      let mut network = tokio::task::spawn_blocking(|| {
 35          TestNetwork::new(TestNetworkConfig {
 36              num_nodes: N,
 37              bft: true,
 38              connect_all: true,
 39              fire_transmissions: Some(TRANSMISSION_INTERVAL_MS),
 40              // Set this to Some(0..=4) to see the logs.
 41              log_level: Some(0),
 42              log_connections: true,
 43          })
 44      })
 45      .await
 46      .unwrap();
 47  
 48      network.start().await;
 49  
 50      std::future::pending::<()>().await;
 51  }
 52  
 53  #[tokio::test(flavor = "multi_thread")]
 54  #[ignore = "fails"]
 55  async fn test_resync() {
 56      // Start N nodes, connect them and start the cannons for each.
 57      const N: u16 = 4;
 58      const TRANSMISSION_INTERVAL_MS: u64 = 10;
 59      let mut network = tokio::task::spawn_blocking(|| {
 60          TestNetwork::new(TestNetworkConfig {
 61              num_nodes: N,
 62              bft: true,
 63              connect_all: true,
 64              fire_transmissions: Some(TRANSMISSION_INTERVAL_MS),
 65              // Set this to Some(0..=4) to see the logs.
 66              log_level: Some(0),
 67              log_connections: false,
 68          })
 69      })
 70      .await
 71      .unwrap();
 72      network.start().await;
 73  
 74      // Let the nodes advance through the rounds.
 75      const BREAK_ROUND: u64 = 4;
 76      let network_clone = network.clone();
 77      deadline!(Duration::from_secs(20), move || { network_clone.is_round_reached(BREAK_ROUND) });
 78  
 79      network.disconnect(N).await;
 80  
 81      let mut spare_network = TestNetwork::new(TestNetworkConfig {
 82          num_nodes: N,
 83          bft: true,
 84          connect_all: false,
 85          fire_transmissions: None,
 86          log_level: None,
 87          log_connections: false,
 88      });
 89      spare_network.start().await;
 90  
 91      for i in 1..N {
 92          let spare_validator = spare_network.validators.get(&i).cloned().unwrap();
 93          network.validators.insert(i, spare_validator);
 94      }
 95  
 96      network.connect_all().await;
 97  
 98      const RECOVERY_ROUND: u64 = 8;
 99      let network_clone = network.clone();
100      deadline!(Duration::from_secs(20), move || { network_clone.is_round_reached(RECOVERY_ROUND) });
101  }
102  
103  #[tokio::test(flavor = "multi_thread")]
104  async fn test_quorum_threshold() {
105      // Start N nodes but don't connect them.
106      const N: u16 = 4;
107      const TRANSMISSION_INTERVAL_MS: u64 = 10;
108  
109      let mut network = tokio::task::spawn_blocking(|| {
110          TestNetwork::new(TestNetworkConfig {
111              num_nodes: N,
112              bft: true,
113              connect_all: false,
114              fire_transmissions: None,
115              // Set this to Some(0..=4) to see the logs.
116              log_level: None,
117              log_connections: true,
118          })
119      })
120      .await
121      .unwrap();
122      network.start().await;
123  
124      // Check each node is at round 1 (0 is genesis).
125      for validators in network.validators.values() {
126          assert_eq!(validators.primary.current_round(), 1);
127      }
128  
129      // Start the cannons for node 0.
130      network.fire_transmissions_at(0, TRANSMISSION_INTERVAL_MS);
131  
132      sleep(Duration::from_millis(MAX_FETCH_TIMEOUT_IN_MS)).await;
133  
134      // Check each node is still at round 1.
135      for validator in network.validators.values() {
136          assert_eq!(validator.primary.current_round(), 1);
137      }
138  
139      // Connect the first two nodes and start the cannons for node 1.
140      network.connect_validators(0, 1).await;
141      network.fire_transmissions_at(1, TRANSMISSION_INTERVAL_MS);
142  
143      sleep(Duration::from_millis(MAX_FETCH_TIMEOUT_IN_MS)).await;
144  
145      // Check each node is still at round 1.
146      for validator in network.validators.values() {
147          assert_eq!(validator.primary.current_round(), 1);
148      }
149  
150      // Connect the third node and start the cannons for it.
151      network.connect_validators(0, 2).await;
152      network.connect_validators(1, 2).await;
153      network.fire_transmissions_at(2, TRANSMISSION_INTERVAL_MS);
154  
155      // Check the nodes reach quorum and advance through the rounds.
156      const TARGET_ROUND: u64 = 4;
157      let net = network.clone();
158      deadline!(Duration::from_secs(20), move || { net.is_round_reached(TARGET_ROUND) });
159  }
160  
161  #[tokio::test(flavor = "multi_thread")]
162  async fn test_quorum_break() {
163      // Start N nodes, connect them and start the cannons for each.
164      const N: u16 = 4;
165      const TRANSMISSION_INTERVAL_MS: u64 = 10;
166      let mut network = tokio::task::spawn_blocking(|| {
167          TestNetwork::new(TestNetworkConfig {
168              num_nodes: N,
169              bft: true,
170              connect_all: true,
171              fire_transmissions: Some(TRANSMISSION_INTERVAL_MS),
172              // Set this to Some(0..=4) to see the logs.
173              log_level: None,
174              log_connections: true,
175          })
176      })
177      .await
178      .unwrap();
179      network.start().await;
180  
181      // Check the nodes have started advancing through the rounds.
182      const TARGET_ROUND: u64 = 4;
183      // Note: cloning the network is fine because the primaries it wraps are `Arc`ed.
184      let network_clone = network.clone();
185      deadline!(Duration::from_secs(20), move || { network_clone.is_round_reached(TARGET_ROUND) });
186  
187      // Break the quorum by disconnecting two nodes.
188      const NUM_NODES: u16 = 2;
189      network.disconnect(NUM_NODES).await;
190  
191      // Check the nodes have stopped advancing through the rounds.
192      assert!(network.is_halted().await);
193  }
194  
195  #[tokio::test(flavor = "multi_thread")]
196  async fn test_leader_election_consistency() {
197      // The minimum and maximum rounds to check for leader consistency.
198      // From manual experimentation, the minimum round that works is 4.
199      // Starting at 0 or 2 causes assertion failures. Seems like the committee takes a few rounds to stabilize.
200      const STARTING_ROUND: u64 = 4;
201      const MAX_ROUND: u64 = 20;
202  
203      // Start N nodes, connect them and start the cannons for each.
204      const N: u16 = 4;
205      const CANNON_INTERVAL_MS: u64 = 10;
206      let mut network = tokio::task::spawn_blocking(|| {
207          TestNetwork::new(TestNetworkConfig {
208              num_nodes: N,
209              bft: true,
210              connect_all: true,
211              fire_transmissions: Some(CANNON_INTERVAL_MS),
212              // Set this to Some(0..=4) to see the logs.
213              log_level: None,
214              log_connections: true,
215          })
216      })
217      .await
218      .unwrap();
219      network.start().await;
220  
221      // Wait for starting round to be reached
222      let cloned_network = network.clone();
223      deadline!(Duration::from_secs(60), move || { cloned_network.is_round_reached(STARTING_ROUND) });
224  
225      // Check that validators agree about leaders in every even round
226      for target_round in (STARTING_ROUND..=MAX_ROUND).step_by(2) {
227          let cloned_network = network.clone();
228          deadline!(Duration::from_secs(20), move || { cloned_network.is_round_reached(target_round) });
229  
230          // Get all validators in the network
231          let validators = network.validators.values().collect_vec();
232  
233          // Get leaders of all validators in the current round
234          let mut leaders = Vec::new();
235          for validator in validators.iter() {
236              if validator.primary.current_round() == target_round {
237                  let bft = validator.bft.get().unwrap();
238                  if let Some(leader) = bft.leader() {
239                      // Validator is a live object - just because it's
240                      // been on the current round above doesn't mean
241                      // that's still the case
242                      if validator.primary.current_round() == target_round {
243                          leaders.push(leader);
244                      }
245                  }
246              }
247          }
248  
249          println!("Found {} validators with a leader ({} out of sync)", leaders.len(), validators.len() - leaders.len());
250  
251          // Assert that all leaders are equal
252          assert!(leaders.iter().all_equal());
253      }
254  }
255  
256  #[tokio::test(flavor = "multi_thread")]
257  #[ignore = "run multiple times to see failure"]
258  async fn test_transient_break() {
259      // Start N nodes, connect them and start the cannons for each.
260      const N: u16 = 4;
261      const TRANSMISSION_INTERVAL_MS: u64 = 10;
262      let mut network = tokio::task::spawn_blocking(|| {
263          TestNetwork::new(TestNetworkConfig {
264              num_nodes: N,
265              bft: true,
266              connect_all: true,
267              fire_transmissions: Some(TRANSMISSION_INTERVAL_MS),
268              // Set this to Some(0..=4) to see the logs.
269              log_level: Some(6),
270              log_connections: false,
271          })
272      })
273      .await
274      .unwrap();
275      network.start().await;
276  
277      // Check the nodes have started advancing through the rounds.
278      const FIRST_BREAK_ROUND: u64 = 10;
279      let network_clone = network.clone();
280      deadline!(Duration::from_secs(60), move || { network_clone.is_round_reached(FIRST_BREAK_ROUND) });
281  
282      // Disconnect the last node.
283      network.disconnect_one(3).await;
284  
285      // Check the nodes have started advancing through the rounds.
286      const SECOND_BREAK_ROUND: u64 = 25;
287      let network_clone = network.clone();
288      deadline!(Duration::from_secs(80), move || { network_clone.is_round_reached(SECOND_BREAK_ROUND) });
289  
290      // Disconnect another node, break quorum.
291      network.disconnect_one(2).await;
292  
293      // Check the nodes have stopped advancing through the rounds.
294      assert!(network.is_halted().await);
295  
296      // Connect the last node again.
297      network.connect_one(3).await;
298  
299      const RECOVERY_ROUND: u64 = 30;
300      let network_clone = network.clone();
301      deadline!(Duration::from_secs(60), move || { network_clone.is_round_reached(RECOVERY_ROUND) });
302  }