// dht_executor.rs
1 //! DHT Executor: Bridge between DhtOp planning and abzu-dht primitives 2 //! 3 //! This module implements the I/O layer that actually executes DHT operations. 4 //! It takes the pure DhtOp values from the planning layer and drives the 5 //! abzu-dht state machines, producing protocol messages for the network. 6 7 use std::collections::HashMap; 8 use std::time::{Duration, Instant}; 9 10 use abzu_dht::{ 11 DhtKey, NodeInfo, RoutingTable, 12 StoredValue, ValueType, 13 protocol::{DhtMessage, DhtResponse, MessageBuilder}, 14 lookup::ValueLookup, 15 }; 16 17 use crate::discovery::DhtOp; 18 19 // ============================================================================ 20 // Configuration 21 // ============================================================================ 22 23 /// Configuration for the DHT executor 24 #[derive(Debug, Clone)] 25 pub struct ExecutorConfig { 26 /// Number of parallel requests for lookups (alpha) 27 pub alpha: usize, 28 /// Maximum nodes to return from a query (k) 29 pub k: usize, 30 /// Timeout for individual requests 31 pub request_timeout: Duration, 32 /// Maximum pending operations 33 pub max_pending_ops: usize, 34 /// Replication factor for stores 35 pub replication_factor: usize, 36 } 37 38 impl Default for ExecutorConfig { 39 fn default() -> Self { 40 Self { 41 alpha: 3, 42 k: 20, 43 request_timeout: Duration::from_secs(5), 44 max_pending_ops: 100, 45 replication_factor: 3, 46 } 47 } 48 } 49 50 // ============================================================================ 51 // Action Types 52 // ============================================================================ 53 54 /// Actions the executor needs the I/O layer to perform 55 #[derive(Debug, Clone)] 56 pub enum DhtAction { 57 /// Send a DHT message to a specific peer 58 Send { 59 target: NodeInfo, 60 message: DhtMessage, 61 op_id: u64, 62 }, 63 64 /// Store operation completed successfully 65 StoreComplete { 66 op_id: u64, 67 key: DhtKey, 68 stored_on: Vec<NodeInfo>, 69 
}, 70 71 /// Query operation completed with results 72 QueryComplete { 73 op_id: u64, 74 key: DhtKey, 75 values: Vec<StoredValue>, 76 }, 77 78 /// Operation failed 79 Failed { 80 op_id: u64, 81 error: String, 82 }, 83 } 84 85 /// Events returned by the executor when processing 86 #[derive(Debug)] 87 pub enum ExecutorEvent { 88 /// Actions that need to be performed 89 Actions(Vec<DhtAction>), 90 /// Operation completed (with result action) 91 Complete(DhtAction), 92 /// More responses needed to complete 93 Pending, 94 } 95 96 // ============================================================================ 97 // Pending Operation Tracking 98 // ============================================================================ 99 100 /// A pending store operation 101 struct PendingStore { 102 key: DhtKey, 103 value: StoredValue, 104 /// Peers we've successfully stored on 105 stored_on: Vec<NodeInfo>, 106 /// Peers we're waiting on 107 pending_peers: Vec<NodeInfo>, 108 /// Target replication count 109 target_count: usize, 110 started_at: Instant, 111 } 112 113 /// A pending query operation using ValueLookup state machine 114 struct PendingQuery { 115 key: DhtKey, 116 lookup: ValueLookup, 117 /// Values found so far 118 found_values: Vec<StoredValue>, 119 started_at: Instant, 120 } 121 122 /// Tracks a pending operation 123 enum PendingOp { 124 Store(PendingStore), 125 Query(PendingQuery), 126 } 127 128 // ============================================================================ 129 // DHT Executor 130 // ============================================================================ 131 132 /// The DHT executor bridges high-level DhtOp commands to low-level DHT protocol 133 pub struct DhtExecutor { 134 /// Our node information 135 our_node: NodeInfo, 136 137 /// Message builder for creating protocol messages 138 msg_builder: MessageBuilder, 139 140 /// Routing table for finding peers 141 routing_table: RoutingTable, 142 143 /// Pending operations by ID 144 pending: 
HashMap<u64, PendingOp>, 145 146 /// Next operation ID 147 next_op_id: u64, 148 149 /// Configuration 150 config: ExecutorConfig, 151 } 152 153 impl DhtExecutor { 154 /// Create a new executor 155 pub fn new(our_node: NodeInfo, config: ExecutorConfig) -> Self { 156 let routing_table = RoutingTable::new(our_node.id); 157 let msg_builder = MessageBuilder::new(our_node.clone()); 158 159 Self { 160 our_node, 161 msg_builder, 162 routing_table, 163 pending: HashMap::new(), 164 next_op_id: 1, 165 config, 166 } 167 } 168 169 /// Get next operation ID 170 fn next_op_id(&mut self) -> u64 { 171 let id = self.next_op_id; 172 self.next_op_id += 1; 173 id 174 } 175 176 /// Add a peer to the routing table 177 pub fn add_peer(&mut self, peer: NodeInfo, now: u64) { 178 self.routing_table.add(peer, now); 179 } 180 181 /// Submit a DHT operation for execution 182 pub fn submit(&mut self, op: DhtOp, now: u64) -> (u64, ExecutorEvent) { 183 let op_id = self.next_op_id(); 184 185 match op { 186 DhtOp::Store { circle_key, capability_keys, payload, ttl } => { 187 self.start_store(op_id, circle_key, capability_keys, payload, ttl, now) 188 } 189 DhtOp::Query { key } => { 190 self.start_query(op_id, key, now) 191 } 192 } 193 } 194 195 /// Start a store operation 196 fn start_store( 197 &mut self, 198 op_id: u64, 199 circle_key: DhtKey, 200 _capability_keys: Vec<[u8; 32]>, 201 payload: Vec<u8>, 202 ttl: u64, 203 now: u64, 204 ) -> (u64, ExecutorEvent) { 205 // Find closest nodes to store on (using circle_key as primary) 206 let closest = self.routing_table.closest(&circle_key, self.config.k); 207 208 if closest.nodes.is_empty() { 209 return (op_id, ExecutorEvent::Complete(DhtAction::Failed { 210 op_id, 211 error: "No peers in routing table".to_string(), 212 })); 213 } 214 215 // Create the stored value using the proper API 216 let stored_value = StoredValue::new( 217 circle_key, 218 ValueType::Application, // Generic application data for circle announcements 219 self.our_node.id, // Publisher 
is our node 220 payload, 221 now, 222 Some(ttl), 223 ); 224 225 // Build store messages for closest nodes 226 let mut actions = Vec::new(); 227 let target_count = self.config.replication_factor.min(closest.nodes.len()); 228 let targets: Vec<NodeInfo> = closest.nodes.iter() 229 .take(target_count) 230 .cloned() 231 .collect(); 232 233 for peer in &targets { 234 let message = self.msg_builder.store(stored_value.clone()); 235 actions.push(DhtAction::Send { 236 target: peer.clone(), 237 message, 238 op_id, 239 }); 240 } 241 242 // Track pending store 243 self.pending.insert(op_id, PendingOp::Store(PendingStore { 244 key: circle_key, 245 value: stored_value, 246 stored_on: Vec::new(), 247 pending_peers: targets, 248 target_count, 249 started_at: Instant::now(), 250 })); 251 252 (op_id, ExecutorEvent::Actions(actions)) 253 } 254 255 /// Start a query operation 256 fn start_query( 257 &mut self, 258 op_id: u64, 259 key: DhtKey, 260 _now: u64, 261 ) -> (u64, ExecutorEvent) { 262 // Get seed nodes for the lookup 263 let seeds: Vec<NodeInfo> = self.routing_table 264 .closest(&key, self.config.alpha) 265 .nodes; 266 267 if seeds.is_empty() { 268 return (op_id, ExecutorEvent::Complete(DhtAction::Failed { 269 op_id, 270 error: "No peers in routing table".to_string(), 271 })); 272 } 273 274 // Create lookup state machine (needs our_id as second param) 275 let mut lookup = ValueLookup::new(key, self.our_node.id, seeds); 276 277 // Get initial actions from lookup 278 let actions = self.drive_lookup(op_id, &mut lookup); 279 280 // Track pending query 281 self.pending.insert(op_id, PendingOp::Query(PendingQuery { 282 key, 283 lookup, 284 found_values: Vec::new(), 285 started_at: Instant::now(), 286 })); 287 288 (op_id, ExecutorEvent::Actions(actions)) 289 } 290 291 /// Extract pending queries from a lookup and convert to actions 292 fn drive_lookup(&self, op_id: u64, lookup: &mut ValueLookup) -> Vec<DhtAction> { 293 let mut actions = Vec::new(); 294 295 // ValueLookup exposes 
next_queries() with nodes to query 296 for node in lookup.next_queries() { 297 let message = self.msg_builder.find_value(*lookup.target()); 298 actions.push(DhtAction::Send { 299 target: node, 300 message, 301 op_id, 302 }); 303 } 304 305 actions 306 } 307 308 /// Handle a response from the network 309 pub fn on_response( 310 &mut self, 311 op_id: u64, 312 from: &NodeInfo, 313 response: DhtResponse, 314 now: u64, 315 ) -> ExecutorEvent { 316 // Update routing table with responding peer 317 self.routing_table.add(from.clone(), now); 318 319 // Remove the operation to get ownership (avoids borrow issues) 320 let Some(mut op) = self.pending.remove(&op_id) else { 321 return ExecutorEvent::Pending; 322 }; 323 324 let (event, should_reinsert) = match &mut op { 325 PendingOp::Store(store) => { 326 self.handle_store_response(op_id, from, response, store) 327 } 328 PendingOp::Query(query) => { 329 self.handle_query_response(op_id, from, response, query) 330 } 331 }; 332 333 // Reinsert if not complete 334 if should_reinsert { 335 self.pending.insert(op_id, op); 336 } 337 338 event 339 } 340 341 /// Handle store response 342 fn handle_store_response( 343 &self, 344 op_id: u64, 345 from: &NodeInfo, 346 response: DhtResponse, 347 store: &mut PendingStore, 348 ) -> (ExecutorEvent, bool) { 349 // Remove from pending peers 350 store.pending_peers.retain(|p| p.id != from.id); 351 352 match response { 353 DhtResponse::Stored { success, .. } if success => { 354 store.stored_on.push(from.clone()); 355 } 356 DhtResponse::Error { message, .. 
} => { 357 eprintln!("Store failed on peer: {}", message); 358 } 359 _ => {} 360 } 361 362 // Check if we're done 363 if store.stored_on.len() >= store.target_count || store.pending_peers.is_empty() { 364 let result = if store.stored_on.is_empty() { 365 DhtAction::Failed { 366 op_id, 367 error: "Failed to store on any peer".to_string(), 368 } 369 } else { 370 DhtAction::StoreComplete { 371 op_id, 372 key: store.key, 373 stored_on: store.stored_on.clone(), 374 } 375 }; 376 377 (ExecutorEvent::Complete(result), false) 378 } else { 379 (ExecutorEvent::Pending, true) 380 } 381 } 382 383 /// Handle query response 384 fn handle_query_response( 385 &self, 386 op_id: u64, 387 from: &NodeInfo, 388 response: DhtResponse, 389 query: &mut PendingQuery, 390 ) -> (ExecutorEvent, bool) { 391 match response { 392 DhtResponse::Value { values, closer_nodes } => { 393 // Found values! 394 query.found_values.extend(values.clone()); 395 396 // Feed response to lookup state machine 397 query.lookup.on_value(&from.id, values, closer_nodes); 398 } 399 DhtResponse::Nodes { nodes } => { 400 // No value, but got closer nodes 401 query.lookup.on_nodes(&from.id, nodes); 402 } 403 DhtResponse::Error { message, .. 
} => { 404 eprintln!("Query error from {}: {}", hex::encode(&from.id[..4]), message); 405 query.lookup.on_failure(&from.id); 406 } 407 _ => {} 408 } 409 410 // Check if lookup is complete 411 if query.lookup.is_complete() { 412 let result = DhtAction::QueryComplete { 413 op_id, 414 key: query.key, 415 values: query.found_values.clone(), 416 }; 417 return (ExecutorEvent::Complete(result), false); 418 } 419 420 // Get more actions from lookup - need mutable access 421 let actions = { 422 let mut local_actions = Vec::new(); 423 for node in query.lookup.next_queries() { 424 let message = self.msg_builder.find_value(*query.lookup.target()); 425 local_actions.push(DhtAction::Send { 426 target: node, 427 message, 428 op_id, 429 }); 430 } 431 local_actions 432 }; 433 434 if actions.is_empty() { 435 (ExecutorEvent::Pending, true) 436 } else { 437 (ExecutorEvent::Actions(actions), true) 438 } 439 } 440 441 /// Check for timed-out operations 442 pub fn check_timeouts(&mut self) -> Vec<DhtAction> { 443 let timeout = self.config.request_timeout; 444 let now = Instant::now(); 445 446 let timed_out: Vec<u64> = self.pending 447 .iter() 448 .filter_map(|(id, op)| { 449 let started = match op { 450 PendingOp::Store(s) => s.started_at, 451 PendingOp::Query(q) => q.started_at, 452 }; 453 if now.duration_since(started) > timeout { 454 Some(*id) 455 } else { 456 None 457 } 458 }) 459 .collect(); 460 461 timed_out 462 .into_iter() 463 .filter_map(|op_id| { 464 self.pending.remove(&op_id).map(|op| { 465 let key = match &op { 466 PendingOp::Store(s) => s.key, 467 PendingOp::Query(q) => q.key, 468 }; 469 DhtAction::Failed { 470 op_id, 471 error: format!("Operation timed out for key {}", hex::encode(&key[..8])), 472 } 473 }) 474 }) 475 .collect() 476 } 477 478 /// Get statistics about pending operations 479 pub fn stats(&self) -> ExecutorStats { 480 let mut stores = 0; 481 let mut queries = 0; 482 483 for op in self.pending.values() { 484 match op { 485 PendingOp::Store(_) => stores += 1, 
486 PendingOp::Query(_) => queries += 1, 487 } 488 } 489 490 ExecutorStats { 491 pending_stores: stores, 492 pending_queries: queries, 493 routing_table_size: self.routing_table.len(), 494 } 495 } 496 } 497 498 /// Statistics about the executor state 499 #[derive(Debug, Clone)] 500 pub struct ExecutorStats { 501 pub pending_stores: usize, 502 pub pending_queries: usize, 503 pub routing_table_size: usize, 504 } 505 506 // ============================================================================ 507 // Tests 508 // ============================================================================ 509 510 #[cfg(test)] 511 mod tests { 512 use super::*; 513 514 fn make_node(id_byte: u8) -> NodeInfo { 515 let id = [id_byte; 32]; 516 NodeInfo::new(id, format!("127.0.0.1:800{}", id_byte % 10), 1000) 517 } 518 519 fn make_executor() -> DhtExecutor { 520 let our_node = make_node(0x00); 521 DhtExecutor::new(our_node, ExecutorConfig::default()) 522 } 523 524 #[test] 525 fn test_submit_store_no_peers() { 526 let mut executor = make_executor(); 527 528 let op = DhtOp::Store { 529 circle_key: [0x42; 32], 530 capability_keys: vec![], 531 payload: b"test data".to_vec(), 532 ttl: 3600, 533 }; 534 535 let (op_id, event) = executor.submit(op, 1000); 536 537 match event { 538 ExecutorEvent::Complete(DhtAction::Failed { error, .. 
}) => { 539 assert!(error.contains("No peers")); 540 } 541 _ => panic!("Expected failure with no peers"), 542 } 543 assert_eq!(op_id, 1); 544 } 545 546 #[test] 547 fn test_submit_store_with_peers() { 548 let mut executor = make_executor(); 549 550 // Add some peers 551 for i in 1..5u8 { 552 executor.add_peer(make_node(i), 1000); 553 } 554 555 let op = DhtOp::Store { 556 circle_key: [0x42; 32], 557 capability_keys: vec![], 558 payload: b"test data".to_vec(), 559 ttl: 3600, 560 }; 561 562 let (op_id, event) = executor.submit(op, 1000); 563 564 match event { 565 ExecutorEvent::Actions(actions) => { 566 // Should send to replication_factor peers (default 3) 567 assert_eq!(actions.len(), 3); 568 for action in actions { 569 match action { 570 DhtAction::Send { message, .. } => { 571 // Should be a Store request 572 assert!(matches!(message, DhtMessage::Request(_))); 573 } 574 _ => panic!("Expected Send action"), 575 } 576 } 577 } 578 _ => panic!("Expected Actions event"), 579 } 580 581 // Should have pending operation 582 assert!(executor.pending.contains_key(&op_id)); 583 } 584 585 #[test] 586 fn test_store_completes_on_response() { 587 let mut executor = make_executor(); 588 589 let peer1 = make_node(1); 590 let peer2 = make_node(2); 591 let peer3 = make_node(3); 592 executor.add_peer(peer1.clone(), 1000); 593 executor.add_peer(peer2.clone(), 1000); 594 executor.add_peer(peer3.clone(), 1000); 595 596 let op = DhtOp::Store { 597 circle_key: [0x42; 32], 598 capability_keys: vec![], 599 payload: b"test data".to_vec(), 600 ttl: 3600, 601 }; 602 603 let (op_id, _) = executor.submit(op, 1000); 604 605 // Simulate successful responses 606 let stored_response = DhtResponse::Stored { success: true, error: None }; 607 608 let event1 = executor.on_response(op_id, &peer1, stored_response.clone(), 1001); 609 assert!(matches!(event1, ExecutorEvent::Pending)); 610 611 let event2 = executor.on_response(op_id, &peer2, stored_response.clone(), 1002); 612 assert!(matches!(event2, 
ExecutorEvent::Pending)); 613 614 let event3 = executor.on_response(op_id, &peer3, stored_response, 1003); 615 616 match event3 { 617 ExecutorEvent::Complete(DhtAction::StoreComplete { stored_on, .. }) => { 618 assert_eq!(stored_on.len(), 3); 619 } 620 _ => panic!("Expected StoreComplete"), 621 } 622 623 // Operation should be removed 624 assert!(!executor.pending.contains_key(&op_id)); 625 } 626 627 #[test] 628 fn test_submit_query_no_peers() { 629 let mut executor = make_executor(); 630 631 let op = DhtOp::Query { key: [0x42; 32] }; 632 let (_, event) = executor.submit(op, 1000); 633 634 match event { 635 ExecutorEvent::Complete(DhtAction::Failed { error, .. }) => { 636 assert!(error.contains("No peers")); 637 } 638 _ => panic!("Expected failure with no peers"), 639 } 640 } 641 642 #[test] 643 fn test_stats() { 644 let mut executor = make_executor(); 645 646 for i in 1..5u8 { 647 executor.add_peer(make_node(i), 1000); 648 } 649 650 let stats = executor.stats(); 651 assert_eq!(stats.pending_stores, 0); 652 assert_eq!(stats.pending_queries, 0); 653 assert_eq!(stats.routing_table_size, 4); 654 655 // Submit a store 656 let op = DhtOp::Store { 657 circle_key: [0x42; 32], 658 capability_keys: vec![], 659 payload: b"test".to_vec(), 660 ttl: 3600, 661 }; 662 executor.submit(op, 1000); 663 664 let stats = executor.stats(); 665 assert_eq!(stats.pending_stores, 1); 666 } 667 }