stress_tests.rs
//! Concurrency Stress Tests for Abzu Core
//!
//! These tests specifically target potential deadlock and race condition
//! scenarios in the `Arc<Mutex<HashMap>>` peer management pattern.

use std::sync::Arc;
use std::time::Duration;

use abzu_core::{Node, NodeConfig};
use tokio::sync::Barrier;
use tokio::time::timeout;

/// Test helper: create a node with temp storage
fn create_test_node() -> Arc<Node> {
    let dir = tempfile::tempdir().unwrap();
    let config = NodeConfig {
        storage_path: dir.path().to_str().unwrap().to_string(),
        ..Default::default()
    };
    let node = Arc::new(Node::new(config).unwrap());
    // Keep the temp directory alive for the rest of the test process; dropping
    // `dir` here would delete the storage directory while the node still uses it.
    std::mem::forget(dir);
    node
}

/// Stress test: Multiple tasks concurrently accessing the peer map
///
/// This catches the common deadlock pattern where async code holds a mutex
/// across an await point, or where multiple mutexes are acquired in an
/// inconsistent order.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn test_concurrent_peer_map_access() {
    let node = create_test_node();
    let num_tasks = 50;
    let ops_per_task = 100;

    // Barrier ensures all tasks start simultaneously
    let barrier = Arc::new(Barrier::new(num_tasks));

    let mut handles = Vec::new();

    for task_id in 0..num_tasks {
        let node = Arc::clone(&node);
        let barrier = Arc::clone(&barrier);

        let handle = tokio::spawn(async move {
            barrier.wait().await;

            for op in 0..ops_per_task {
                // Alternate between read and write operations
                if op % 2 == 0 {
                    // Read operation: get peer count
                    let peers = node.peers();
                    let _count = peers.lock().await.len();
                } else {
                    // Write operation would go here (peer add/remove)
                    // For now just access the router (RwLock)
                    let router = node.router();
                    let _table = router.read().await;
                }

                // Tiny yield to increase interleaving
                if op % 10 == 0 {
                    tokio::task::yield_now().await;
                }
            }

            task_id
        });

        handles.push(handle);
    }

    // All tasks should complete within 5 seconds (generous timeout)
    // Deadlock would cause this to hang
    let result = timeout(Duration::from_secs(5), async {
        for handle in handles {
            handle.await.expect("Task panicked");
        }
    })
    .await;

    assert!(result.is_ok(), "Deadlock detected: tasks did not complete in time");
}
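
// For reference, the hold-across-await anti-pattern the test above is meant to
// surface looks roughly like this (hypothetical sketch; `send_to_peer`, `addr`,
// and the map contents are illustrative, not abzu_core APIs):
//
//     let mut peers = node.peers().lock().await; // guard acquired...
//     send_to_peer(&addr).await;                 // ...and still held across an await,
//     peers.insert(key, peer);                   // starving every task that needs the map
//
// The usual fix is to copy what is needed out of the map, drop the guard, and
// only then await.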

/// Stress test: Interleaved Mutex and RwLock access
///
/// The Node uses both `Arc<Mutex<HashMap>>` for peers and `Arc<RwLock<RoutingTable>>`
/// for routing. If the locking order is ever inconsistent, this test will deadlock.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn test_mixed_lock_acquisition() {
    let node = create_test_node();
    let num_tasks = 20;
    let ops_per_task = 50;

    let barrier = Arc::new(Barrier::new(num_tasks));
    let mut handles = Vec::new();

    for task_id in 0..num_tasks {
        let node = Arc::clone(&node);
        let barrier = Arc::clone(&barrier);

        let handle = tokio::spawn(async move {
            barrier.wait().await;

            for _ in 0..ops_per_task {
                // Pattern 1: peers then router
                {
                    let peers = node.peers();
                    let _p = peers.lock().await;
                    let router = node.router();
                    let _r = router.read().await;
                }

                // Pattern 2: router then peers (inverted order)
                {
                    let router = node.router();
                    let _r = router.read().await;
                    let peers = node.peers();
                    let _p = peers.lock().await;
                }

                tokio::task::yield_now().await;
            }

            task_id
        });

        handles.push(handle);
    }

    let result = timeout(Duration::from_secs(5), async {
        for handle in handles {
            handle.await.expect("Task panicked");
        }
    })
    .await;

    assert!(
        result.is_ok(),
        "Lock ordering deadlock: mixed Mutex/RwLock acquisition failed"
    );
}

/// Stress test: Simulate poll_peers() pattern under load
///
/// poll_peers() acquires the peer mutex once to get keys, then reacquires it
/// per peer. This tests that the pattern doesn't starve or deadlock.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn test_poll_peers_pattern() {
    let node = create_test_node();
    let num_readers = 10;
    let num_writers = 5;
    let iterations = 100;

    let barrier = Arc::new(Barrier::new(num_readers + num_writers));
    let mut handles = Vec::new();

    // Reader tasks: mimic poll_peers() pattern
    for _ in 0..num_readers {
        let node = Arc::clone(&node);
        let barrier = Arc::clone(&barrier);

        let handle = tokio::spawn(async move {
            barrier.wait().await;

            for _ in 0..iterations {
                let peers = node.peers();

                // First lock: get keys
                let keys: Vec<_> = {
                    let guard = peers.lock().await;
                    guard.keys().copied().collect()
                };

                // Second lock per key (poll_peers pattern)
                for _key in &keys {
                    let _guard = peers.lock().await;
                    // Would poll peer here
                }

                tokio::task::yield_now().await;
            }
        });

        handles.push(handle);
    }

    // Writer tasks: would add/remove peers (currently just access)
    for _ in 0..num_writers {
        let node = Arc::clone(&node);
        let barrier = Arc::clone(&barrier);

        let handle = tokio::spawn(async move {
            barrier.wait().await;

            for _ in 0..iterations {
                let peers = node.peers();
                let _guard = peers.lock().await;
                // Would modify peers here
                tokio::task::yield_now().await;
            }
        });

        handles.push(handle);
    }

    let result = timeout(Duration::from_secs(10), async {
        for handle in handles {
            handle.await.expect("Task panicked");
        }
    })
    .await;

    assert!(
        result.is_ok(),
        "poll_peers pattern deadlock: reader/writer starvation detected"
    );
}

// ─────────────────────────────────────────────────────────────
// Gossip Protocol Stress Tests
// ─────────────────────────────────────────────────────────────

/// Stress test: Fanout calculation across the size spectrum
///
/// Validates that the √n fanout formula behaves correctly and consistently
/// under rapid, repeated invocation.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn test_gossip_fanout_stress() {
    use abzu_core::Switchboard;
    use std::time::Instant;

    let iterations = 10_000;
    let sizes = [1, 5, 10, 25, 50, 100, 200, 500, 1000];

    let start = Instant::now();

    for _ in 0..iterations {
        for &size in &sizes {
            let fanout = Switchboard::calculate_fanout(size);
            // Verify bounds invariant
            assert!(fanout >= 2 && fanout <= 8, "Fanout {} out of bounds for size {}", fanout, size);
        }
    }

    let elapsed = start.elapsed();
    let ops_per_sec = (iterations * sizes.len()) as f64 / elapsed.as_secs_f64();

    // Should be extremely fast (>1M ops/sec on modern hardware); assert a
    // conservative floor so slow CI machines don't produce flaky failures.
    assert!(ops_per_sec > 100_000.0, "Fanout calculation too slow: {:.0} ops/sec", ops_per_sec);
    eprintln!("Fanout performance: {:.0} ops/sec", ops_per_sec);
}
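
// Worked examples of the bounds the assertions above check (assuming the fanout
// is roughly √n clamped to the [2, 8] range; the exact rounding is whatever
// Switchboard::calculate_fanout implements):
//
//   n = 4    → √4   = 2  → fanout 2 (lower bound)
//   n = 25   → √25  = 5  → fanout 5
//   n = 100  → √100 = 10 → fanout 8 (clamped to the upper bound)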

/// Stress test: Seen-set under concurrent access
///
/// Multiple tasks simultaneously marking and checking message IDs.
/// Tests RwLock contention and TTL cleanup under load.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn test_gossip_seen_set_stress() {
    use abzu_core::Switchboard;
    use std::time::Instant;

    let node = create_test_node();
    let switchboard = Arc::new(Switchboard::new(Arc::clone(&node)));
    let circle = node.create_circle("Stress Circle".to_string()).unwrap();

    let num_tasks = 20;
    let msgs_per_task = 500;
    let barrier = Arc::new(Barrier::new(num_tasks));
    let mut handles = Vec::new();

    let start = Instant::now();

    for task_id in 0..num_tasks {
        let switchboard = Arc::clone(&switchboard);
        let circle_id = circle.id;
        let barrier = Arc::clone(&barrier);

        let handle = tokio::spawn(async move {
            barrier.wait().await;

            let mut seen_count = 0;
            let mut new_count = 0;

            for msg_num in 0..msgs_per_task {
                // Each task uses its own range of message IDs
                let msg_id = (task_id as u64 * msgs_per_task as u64) + msg_num as u64;

                // Check and mark (typical gossip pattern)
                if switchboard.already_seen(&circle_id, msg_id).await {
                    seen_count += 1;
                } else {
                    switchboard.mark_seen(&circle_id, msg_id).await;
                    new_count += 1;
                }

                // Occasionally recheck a message we marked (should be seen)
                if msg_num > 0 && msg_num % 10 == 0 {
                    let prev_id = (task_id as u64 * msgs_per_task as u64) + (msg_num - 1) as u64;
                    assert!(
                        switchboard.already_seen(&circle_id, prev_id).await,
                        "Previously marked message should still be in seen-set"
                    );
                }
            }

            (new_count, seen_count)
        });

        handles.push(handle);
    }

    // Collect results
    let result = timeout(Duration::from_secs(30), async {
        let mut total_new = 0;
        let mut total_seen = 0;
        for handle in handles {
            let (new, seen) = handle.await.expect("Task panicked");
            total_new += new;
            total_seen += seen;
        }
        (total_new, total_seen)
    })
    .await;

    let elapsed = start.elapsed();

    assert!(result.is_ok(), "Seen-set stress test deadlocked");
    let (total_new, total_seen) = result.unwrap();

    let total_ops = total_new + total_seen;
    let ops_per_sec = total_ops as f64 / elapsed.as_secs_f64();

    eprintln!(
        "Seen-set stress: {} ops ({} new, {} seen) in {:?} = {:.0} ops/sec",
        total_ops, total_new, total_seen, elapsed, ops_per_sec
    );

    // With probabilistic CuckooFilter deduplication, we expect a ~0.1% false
    // positive rate. Allow slightly higher (0.5%) to account for test variance.
    let expected_total = num_tasks * msgs_per_task;
    let max_false_positives = (expected_total as f64 * 0.005) as usize; // 0.5% tolerance

    assert!(
        total_new >= expected_total - max_false_positives,
        "False positive rate too high: expected {} new, got {} ({:.2}% FP rate)",
        expected_total,
        total_new,
        (expected_total - total_new) as f64 / expected_total as f64 * 100.0
    );
    assert!(
        total_seen <= max_false_positives,
        "Too many false positives: {} (max allowed: {})",
        total_seen,
        max_false_positives
    );
}

/// Stress test: Large circle member iteration
///
/// Simulates gossip peer selection in circles of various sizes to check that
/// √n-fanout peer selection stays cheap as membership grows.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_gossip_large_circle_scaling() {
    use abzu_core::{Switchboard, MemberRole};
    use std::time::Instant;

    // Test with circles up to the founder's invite capacity (10).
    // Larger sizes are tested synthetically via calculate_fanout.
    let circle_sizes = [5, 10];
    let iterations = 100;

    for &size in &circle_sizes {
        let node = create_test_node();
        let circle = node.create_circle(format!("Scale-{}", size)).unwrap();

        // Add members (up to the founder's capacity of 10)
        for i in 1..size {
            let mut key = [0u8; 32];
            key[0] = (i >> 8) as u8;
            key[1] = (i & 0xff) as u8;
            node.add_circle_member(node.peer_key(), &circle.id, key, MemberRole::Member).unwrap();
        }

        let retrieved = node.get_circle(&circle.id).unwrap().unwrap();
        assert_eq!(retrieved.members.len(), size, "Circle should have {} members", size);

        // Time fanout calculations
        let start = Instant::now();
        for _ in 0..iterations {
            let fanout = Switchboard::calculate_fanout(size);
            // fanout should be √n, clamped
            let expected_min = 2;
            let expected_max = 8;
            assert!(fanout >= expected_min && fanout <= expected_max);
        }
        let elapsed = start.elapsed();

        eprintln!(
            "Circle size {}: {} iterations in {:?} ({:.2}µs/iter)",
            size, iterations, elapsed, elapsed.as_micros() as f64 / iterations as f64
        );
    }

    // All sizes should complete quickly; this is a sanity check for scaling.
    // The test itself exercises membership management at scale.
}

/// Stress test: Concurrent circle operations
///
/// Multiple tasks perform circle operations (create, add members, query)
/// concurrently to detect any lock contention issues.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn test_concurrent_circle_operations() {
    let node = create_test_node();
    let num_tasks = 10;
    let circles_per_task = 5;
    let members_per_circle = 10;

    let barrier = Arc::new(Barrier::new(num_tasks));
    let mut handles = Vec::new();

    for task_id in 0..num_tasks {
        let node = Arc::clone(&node);
        let barrier = Arc::clone(&barrier);

        let handle = tokio::spawn(async move {
            barrier.wait().await;

            for circle_num in 0..circles_per_task {
                // Create circle
                let name = format!("T{}-C{}", task_id, circle_num);
                let circle = node.create_circle(name).unwrap();

                // Add members
                for member_num in 0..members_per_circle {
                    let mut key = [0u8; 32];
                    key[0] = task_id as u8;
                    key[1] = circle_num as u8;
                    key[2] = member_num as u8;
                    node.add_circle_member(node.peer_key(), &circle.id, key, abzu_core::MemberRole::Member).unwrap();
                }

                // Query circle
                let retrieved = node.get_circle(&circle.id).unwrap().unwrap();
                assert_eq!(retrieved.members.len(), members_per_circle + 1); // +1 for founder

                tokio::task::yield_now().await;
            }

            task_id
        });

        handles.push(handle);
    }

    let result = timeout(Duration::from_secs(30), async {
        for handle in handles {
            handle.await.expect("Task panicked");
        }
    })
    .await;

    assert!(result.is_ok(), "Concurrent circle operations deadlocked");

    // Verify all circles were created
    let all_circles = node.list_circles().unwrap();
    assert_eq!(
        all_circles.len(),
        num_tasks * circles_per_task,
        "Expected {} circles", num_tasks * circles_per_task
    );
}
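
/// Sketch: router write-lock contention mixed with peer-map access
///
/// The tests above only ever take the router's read lock. This is a sketch of a
/// complementary case: it assumes `node.router()` exposes the
/// `Arc<RwLock<RoutingTable>>` described earlier and only acquires and drops the
/// write guard, so it does not rely on any RoutingTable methods. Treat it as a
/// starting point rather than a finished test.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn test_router_write_contention_sketch() {
    let node = create_test_node();
    let num_readers = 16;
    let num_writers = 4;
    let iterations = 100;

    let barrier = Arc::new(Barrier::new(num_readers + num_writers));
    let mut handles = Vec::new();

    // Reader tasks: peer map first, then router read lock (same order used above)
    for _ in 0..num_readers {
        let node = Arc::clone(&node);
        let barrier = Arc::clone(&barrier);
        handles.push(tokio::spawn(async move {
            barrier.wait().await;
            for _ in 0..iterations {
                let peers = node.peers();
                let _count = peers.lock().await.len();
                {
                    let router = node.router();
                    let _table = router.read().await;
                }
                tokio::task::yield_now().await;
            }
        }));
    }

    // Writer tasks: acquire and immediately drop the router write guard so
    // writers queue behind the readers above.
    for _ in 0..num_writers {
        let node = Arc::clone(&node);
        let barrier = Arc::clone(&barrier);
        handles.push(tokio::spawn(async move {
            barrier.wait().await;
            for _ in 0..iterations {
                let router = node.router();
                drop(router.write().await);
                tokio::task::yield_now().await;
            }
        }));
    }

    let result = timeout(Duration::from_secs(10), async {
        for handle in handles {
            handle.await.expect("Task panicked");
        }
    })
    .await;

    assert!(result.is_ok(), "Router write contention sketch deadlocked");
}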