// abzu-core/tests/stress_tests.rs
//! Concurrency Stress Tests for Abzu Core
//!
//! These tests specifically target potential deadlock and race condition
//! scenarios in the `Arc<Mutex<HashMap>>` peer management pattern.

use std::sync::Arc;
use std::time::Duration;

use abzu_core::{Node, NodeConfig};
use tokio::sync::Barrier;
use tokio::time::timeout;

/// Test helper: create a node with temp storage
///
/// `into_path()` persists the temp directory so it is not deleted when the
/// `TempDir` guard drops at the end of this function; the node keeps using
/// the path afterwards.
fn create_test_node() -> Arc<Node> {
    let dir = tempfile::tempdir().unwrap();
    let config = NodeConfig {
        storage_path: dir.into_path().to_str().unwrap().to_string(),
        ..Default::default()
    };
    Arc::new(Node::new(config).unwrap())
}

/// Stress test: Multiple tasks concurrently accessing the peer map
///
/// This catches the common deadlock pattern where async code holds a mutex
/// across an await point, or where multiple mutexes are acquired in
/// inconsistent order.
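// A minimal sketch of the anti-pattern this guards against (hypothetical code,
// not taken from Node itself):
//
//     let guard = peers.lock().await;
//     send_to_peer(&guard).await; // guard still held across this await,
//                                 // blocking every other task that needs the map
//
// The test below only ever holds a guard for a synchronous operation and drops
// it before the next await point.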
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn test_concurrent_peer_map_access() {
    let node = create_test_node();
    let num_tasks = 50;
    let ops_per_task = 100;

    // Barrier ensures all tasks start simultaneously
    let barrier = Arc::new(Barrier::new(num_tasks));

    let mut handles = Vec::new();

    for task_id in 0..num_tasks {
        let node = Arc::clone(&node);
        let barrier = Arc::clone(&barrier);

        let handle = tokio::spawn(async move {
            barrier.wait().await;

            for op in 0..ops_per_task {
                // Alternate between read and write operations
                if op % 2 == 0 {
                    // Read operation: get peer count
                    let peers = node.peers();
                    let _count = peers.lock().await.len();
                } else {
                    // Write operation would go here (peer add/remove)
                    // For now just access the router (RwLock)
                    let router = node.router();
                    let _table = router.read().await;
                }

                // Tiny yield to increase interleaving
                if op % 10 == 0 {
                    tokio::task::yield_now().await;
                }
            }

            task_id
        });

        handles.push(handle);
    }

    // All tasks should complete within 5 seconds (generous timeout)
    // Deadlock would cause this to hang
    let result = timeout(Duration::from_secs(5), async {
        for handle in handles {
            handle.await.expect("Task panicked");
        }
    })
    .await;

    assert!(result.is_ok(), "Deadlock detected: tasks did not complete in time");
}

/// Stress test: Interleaved Mutex and RwLock access
///
/// The Node uses both `Arc<Mutex<HashMap>>` for peers and `Arc<RwLock<RoutingTable>>`
/// for routing. If the locking order is ever inconsistent, this test will deadlock.
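///
/// (The usual mitigation for this class of bug is a single documented
/// lock-acquisition order; this test deliberately exercises both orders so a
/// violation shows up as a hang here rather than in production.)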
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn test_mixed_lock_acquisition() {
    let node = create_test_node();
    let num_tasks = 20;
    let ops_per_task = 50;

    let barrier = Arc::new(Barrier::new(num_tasks));
    let mut handles = Vec::new();

    for task_id in 0..num_tasks {
        let node = Arc::clone(&node);
        let barrier = Arc::clone(&barrier);

        let handle = tokio::spawn(async move {
            barrier.wait().await;

            for _ in 0..ops_per_task {
                // Pattern 1: peers then router
                {
                    let peers = node.peers();
                    let _p = peers.lock().await;
                    let router = node.router();
                    let _r = router.read().await;
                }

                // Pattern 2: router then peers (inverted order)
                {
                    let router = node.router();
                    let _r = router.read().await;
                    let peers = node.peers();
                    let _p = peers.lock().await;
                }

                tokio::task::yield_now().await;
            }

            task_id
        });

        handles.push(handle);
    }

    let result = timeout(Duration::from_secs(5), async {
        for handle in handles {
            handle.await.expect("Task panicked");
        }
    })
    .await;

    assert!(
        result.is_ok(),
        "Lock ordering deadlock: mixed Mutex/RwLock acquisition failed"
    );
}

/// Stress test: Simulate poll_peers() pattern under load
///
/// poll_peers() acquires the peer mutex once to collect the keys, then
/// reacquires it per peer. This test verifies that the pattern neither starves
/// other tasks nor deadlocks.
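///
/// Re-acquiring the lock per peer (rather than holding one guard for the whole
/// sweep) presumably keeps each critical section short, so other tasks can
/// interleave between peers; the reader tasks below mimic exactly that shape.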
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn test_poll_peers_pattern() {
    let node = create_test_node();
    let num_readers = 10;
    let num_writers = 5;
    let iterations = 100;

    let barrier = Arc::new(Barrier::new(num_readers + num_writers));
    let mut handles = Vec::new();

    // Reader tasks: mimic poll_peers() pattern
    for _ in 0..num_readers {
        let node = Arc::clone(&node);
        let barrier = Arc::clone(&barrier);

        let handle = tokio::spawn(async move {
            barrier.wait().await;

            for _ in 0..iterations {
                let peers = node.peers();

                // First lock: get keys
                let keys: Vec<_> = {
                    let guard = peers.lock().await;
                    guard.keys().copied().collect()
                };

                // Second lock per key (poll_peers pattern)
                for _key in &keys {
                    let _guard = peers.lock().await;
                    // Would poll peer here
                }

                tokio::task::yield_now().await;
            }
        });

        handles.push(handle);
    }

    // Writer tasks: would add/remove peers (currently just access)
    for _ in 0..num_writers {
        let node = Arc::clone(&node);
        let barrier = Arc::clone(&barrier);

        let handle = tokio::spawn(async move {
            barrier.wait().await;

            for _ in 0..iterations {
                let peers = node.peers();
                let _guard = peers.lock().await;
                // Would modify peers here
                tokio::task::yield_now().await;
            }
        });

        handles.push(handle);
    }

    let result = timeout(Duration::from_secs(10), async {
        for handle in handles {
            handle.await.expect("Task panicked");
        }
    })
    .await;

    assert!(
        result.is_ok(),
        "poll_peers pattern deadlock: reader/writer starvation detected"
    );
}

// ─────────────────────────────────────────────────────────────
// Gossip Protocol Stress Tests
// ─────────────────────────────────────────────────────────────

/// Stress test: Fanout calculation across size spectrum
///
/// Validates that the √n fanout formula stays within its bounds and remains
/// cheap under rapid repeated invocation.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn test_gossip_fanout_stress() {
    use abzu_core::Switchboard;
    use std::time::Instant;

    let iterations = 10_000;
    let sizes = [1, 5, 10, 25, 50, 100, 200, 500, 1000];
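    // Assumption: calculate_fanout is roughly clamp(√n, 2, 8). This is inferred
    // from the bounds assertion below, not from the Switchboard source; e.g.
    // √100 = 10 clamps to 8, √25 = 5 stays at 5, √1 = 1 clamps up to 2.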

    let start = Instant::now();

    for _ in 0..iterations {
        for &size in &sizes {
            let fanout = Switchboard::calculate_fanout(size);
            // Verify bounds invariant
            assert!(fanout >= 2 && fanout <= 8, "Fanout {} out of bounds for size {}", fanout, size);
        }
    }

    let elapsed = start.elapsed();
    let ops_per_sec = (iterations * sizes.len()) as f64 / elapsed.as_secs_f64();

    // Should be extremely fast (>1M ops/sec on modern hardware); assert only a
    // conservative 100k floor so the test stays reliable on slower machines.
    assert!(ops_per_sec > 100_000.0, "Fanout calculation too slow: {:.0} ops/sec", ops_per_sec);
    eprintln!("Fanout performance: {:.0} ops/sec", ops_per_sec);
}

/// Stress test: Seen-set under concurrent access
///
/// Multiple tasks simultaneously marking and checking message IDs.
/// Tests RwLock contention and TTL cleanup under load.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn test_gossip_seen_set_stress() {
    use abzu_core::Switchboard;
    use std::time::Instant;

    let node = create_test_node();
    let switchboard = Arc::new(Switchboard::new(Arc::clone(&node)));
    let circle = node.create_circle("Stress Circle".to_string()).unwrap();

    let num_tasks = 20;
    let msgs_per_task = 500;
    let barrier = Arc::new(Barrier::new(num_tasks));
    let mut handles = Vec::new();

    let start = Instant::now();

    for task_id in 0..num_tasks {
        let switchboard = Arc::clone(&switchboard);
        let circle_id = circle.id;
        let barrier = Arc::clone(&barrier);

        let handle = tokio::spawn(async move {
            barrier.wait().await;

            let mut seen_count = 0;
            let mut new_count = 0;

            for msg_num in 0..msgs_per_task {
                // Each task uses its own range of message IDs
                let msg_id = (task_id as u64 * msgs_per_task as u64) + msg_num as u64;

                // Check and mark (typical gossip pattern)
                if switchboard.already_seen(&circle_id, msg_id).await {
                    seen_count += 1;
                } else {
                    switchboard.mark_seen(&circle_id, msg_id).await;
                    new_count += 1;
                }

                // Occasionally recheck a message we marked (should be seen)
                if msg_num > 0 && msg_num % 10 == 0 {
                    let prev_id = (task_id as u64 * msgs_per_task as u64) + (msg_num - 1) as u64;
                    assert!(
                        switchboard.already_seen(&circle_id, prev_id).await,
                        "Previously marked message should still be in seen-set"
                    );
                }
            }

            (new_count, seen_count)
        });

        handles.push(handle);
    }

    // Collect results
    let result = timeout(Duration::from_secs(30), async {
        let mut total_new = 0;
        let mut total_seen = 0;
        for handle in handles {
            let (new, seen) = handle.await.expect("Task panicked");
            total_new += new;
            total_seen += seen;
        }
        (total_new, total_seen)
    })
    .await;

    let elapsed = start.elapsed();

    assert!(result.is_ok(), "Seen-set stress test deadlocked");
    let (total_new, total_seen) = result.unwrap();

    let total_ops = total_new + total_seen;
    let ops_per_sec = total_ops as f64 / elapsed.as_secs_f64();

    eprintln!(
        "Seen-set stress: {} ops ({} new, {} seen) in {:?} = {:.0} ops/sec",
        total_ops, total_new, total_seen, elapsed, ops_per_sec
    );

    // With probabilistic CuckooFilter deduplication, we expect ~0.1% false positive rate.
    // Allow slightly higher (0.5%) to account for test variance.
    let expected_total = num_tasks * msgs_per_task;
    let max_false_positives = (expected_total as f64 * 0.005) as usize; // 0.5% tolerance
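    // With num_tasks = 20 and msgs_per_task = 500 this works out to an
    // expected_total of 10_000 marked messages and a tolerance of 50 false positives.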

    assert!(
        total_new >= expected_total - max_false_positives,
        "False positive rate too high: expected {} new, got {} ({:.2}% FP rate)",
        expected_total,
        total_new,
        (expected_total - total_new) as f64 / expected_total as f64 * 100.0
    );
    assert!(
        total_seen <= max_false_positives,
        "Too many false positives: {} (max allowed: {})",
        total_seen,
        max_false_positives
    );
}

/// Stress test: Large circle member iteration
///
/// Simulates gossip peer selection in circles of various sizes
/// to ensure O(log n) scaling holds under load.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_gossip_large_circle_scaling() {
    use abzu_core::{Switchboard, MemberRole};
    use std::time::Instant;

    // Test with circles up to founder's invite capacity (10).
    // Larger sizes are tested synthetically via calculate_fanout.
    let circle_sizes = [5, 10];
    let iterations = 100;

    for &size in &circle_sizes {
        let node = create_test_node();
        let circle = node.create_circle(format!("Scale-{}", size)).unwrap();

        // Add members (up to founder's capacity of 10); the founder already
        // counts as one member, so add size - 1 more.
        for i in 1..size {
            let mut key = [0u8; 32];
            key[0] = (i >> 8) as u8;
            key[1] = (i & 0xff) as u8;
            node.add_circle_member(node.peer_key(), &circle.id, key, MemberRole::Member).unwrap();
        }

        let retrieved = node.get_circle(&circle.id).unwrap().unwrap();
        assert_eq!(retrieved.members.len(), size, "Circle should have {} members", size);

        // Time fanout calculations
        let start = Instant::now();
        for _ in 0..iterations {
            let fanout = Switchboard::calculate_fanout(size);
            // fanout should be √n, clamped to [2, 8]
            let expected_min = 2;
            let expected_max = 8;
            assert!(fanout >= expected_min && fanout <= expected_max);
        }
        let elapsed = start.elapsed();

        eprintln!(
            "Circle size {}: {} iterations in {:?} ({:.2}µs/iter)",
            size, iterations, elapsed, elapsed.as_micros() as f64 / iterations as f64
        );
    }

    // All sizes should complete quickly; this is a sanity check for scaling.
    // The test itself exercises membership management at scale.
}

/// Stress test: Concurrent circle operations
///
/// Multiple tasks perform circle operations (create, add members, query)
/// concurrently to detect any lock contention issues.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn test_concurrent_circle_operations() {
    let node = create_test_node();
    let num_tasks = 10;
    let circles_per_task = 5;
    let members_per_circle = 10;

    let barrier = Arc::new(Barrier::new(num_tasks));
    let mut handles = Vec::new();

    for task_id in 0..num_tasks {
        let node = Arc::clone(&node);
        let barrier = Arc::clone(&barrier);

        let handle = tokio::spawn(async move {
            barrier.wait().await;

            for circle_num in 0..circles_per_task {
                // Create circle
                let name = format!("T{}-C{}", task_id, circle_num);
                let circle = node.create_circle(name).unwrap();

                // Add members
                for member_num in 0..members_per_circle {
                    let mut key = [0u8; 32];
                    key[0] = task_id as u8;
                    key[1] = circle_num as u8;
                    key[2] = member_num as u8;
                    node.add_circle_member(node.peer_key(), &circle.id, key, abzu_core::MemberRole::Member).unwrap();
                }

                // Query circle
                let retrieved = node.get_circle(&circle.id).unwrap().unwrap();
                assert_eq!(retrieved.members.len(), members_per_circle + 1); // +1 for founder

                tokio::task::yield_now().await;
            }

            task_id
        });

        handles.push(handle);
    }

    let result = timeout(Duration::from_secs(30), async {
        for handle in handles {
            handle.await.expect("Task panicked");
        }
    })
    .await;

    assert!(result.is_ok(), "Concurrent circle operations deadlocked");

    // Verify all circles were created
    let all_circles = node.list_circles().unwrap();
    assert_eq!(
        all_circles.len(),
        num_tasks * circles_per_task,
        "Expected {} circles", num_tasks * circles_per_task
    );
}