/ abzu-inference / src / dht_executor.rs
dht_executor.rs
  1  //! DHT Executor: Bridge between DhtOp planning and abzu-dht primitives
  2  //!
  3  //! This module implements the I/O layer that actually executes DHT operations.
  4  //! It takes the pure DhtOp values from the planning layer and drives the
  5  //! abzu-dht state machines, producing protocol messages for the network.
  6  
  7  use std::collections::HashMap;
  8  use std::time::{Duration, Instant};
  9  
 10  use abzu_dht::{
 11      DhtKey, NodeInfo, RoutingTable,
 12      StoredValue, ValueType,
 13      protocol::{DhtMessage, DhtResponse, MessageBuilder},
 14      lookup::ValueLookup,
 15  };
 16  
 17  use crate::discovery::DhtOp;
 18  
 19  // ============================================================================
 20  // Configuration
 21  // ============================================================================
 22  
 23  /// Configuration for the DHT executor
 24  #[derive(Debug, Clone)]
 25  pub struct ExecutorConfig {
 26      /// Number of parallel requests for lookups (alpha)
 27      pub alpha: usize,
 28      /// Maximum nodes to return from a query (k)
 29      pub k: usize,
 30      /// Timeout for individual requests
 31      pub request_timeout: Duration,
 32      /// Maximum pending operations
 33      pub max_pending_ops: usize,
 34      /// Replication factor for stores
 35      pub replication_factor: usize,
 36  }
 37  
 38  impl Default for ExecutorConfig {
 39      fn default() -> Self {
 40          Self {
 41              alpha: 3,
 42              k: 20,
 43              request_timeout: Duration::from_secs(5),
 44              max_pending_ops: 100,
 45              replication_factor: 3,
 46          }
 47      }
 48  }
 49  
 50  // ============================================================================
 51  // Action Types
 52  // ============================================================================
 53  
/// Actions the executor needs the I/O layer to perform.
///
/// `Send` asks the caller to transmit a message. The remaining variants carry
/// the final outcome of the operation identified by `op_id`.
#[derive(Debug, Clone)]
pub enum DhtAction {
    /// Send a DHT message to a specific peer
    Send {
        /// Peer the message must be delivered to.
        target: NodeInfo,
        /// Protocol message to transmit.
        message: DhtMessage,
        /// Operation this message belongs to; replies are fed back to the
        /// executor under the same ID.
        op_id: u64,
    },

    /// Store operation completed successfully (at least one peer accepted
    /// the value).
    StoreComplete {
        /// Operation that finished.
        op_id: u64,
        /// Key the value was stored under.
        key: DhtKey,
        /// Peers that acknowledged the store.
        stored_on: Vec<NodeInfo>,
    },

    /// Query operation completed with results
    QueryComplete {
        /// Operation that finished.
        op_id: u64,
        /// Key that was looked up.
        key: DhtKey,
        /// Values collected from peers during the lookup; may be empty when
        /// the lookup finished without finding anything.
        values: Vec<StoredValue>,
    },

    /// Operation failed
    Failed {
        /// Operation that failed.
        op_id: u64,
        /// Human-readable description of the failure.
        error: String,
    },
}
 84  
/// Events returned by the executor when processing a submission or a
/// network response.
#[derive(Debug)]
pub enum ExecutorEvent {
    /// Actions that need to be performed (in this file these are always
    /// `DhtAction::Send` requests).
    Actions(Vec<DhtAction>),
    /// Operation completed (with result action); the operation is no longer
    /// tracked by the executor.
    Complete(DhtAction),
    /// More responses needed to complete. Also returned when a response
    /// refers to an operation ID the executor is not tracking.
    Pending,
}
 95  
 96  // ============================================================================
 97  // Pending Operation Tracking
 98  // ============================================================================
 99  
/// A pending store operation: bookkeeping for one value being replicated to
/// a set of peers.
struct PendingStore {
    /// Key the value is stored under.
    key: DhtKey,
    /// The value sent to the target peers.
    /// NOTE(review): not read anywhere in the visible code after creation.
    value: StoredValue,
    /// Peers we've successfully stored on
    stored_on: Vec<NodeInfo>,
    /// Peers we're waiting on
    pending_peers: Vec<NodeInfo>,
    /// Target replication count; the store completes once this many peers
    /// have acknowledged it.
    target_count: usize,
    /// When the operation was started; compared against the request timeout.
    started_at: Instant,
}
112  
/// A pending query operation using ValueLookup state machine
struct PendingQuery {
    /// Key being looked up.
    key: DhtKey,
    /// State machine that decides which peers to ask next and when the
    /// lookup is finished.
    lookup: ValueLookup,
    /// Values found so far
    found_values: Vec<StoredValue>,
    /// When the operation was started; compared against the request timeout.
    started_at: Instant,
}
121  
/// Tracks a pending operation, keyed by operation ID in the executor's
/// pending map.
enum PendingOp {
    /// An in-flight store (replication) operation.
    Store(PendingStore),
    /// An in-flight value lookup.
    Query(PendingQuery),
}
127  
128  // ============================================================================
129  // DHT Executor
130  // ============================================================================
131  
/// The DHT executor bridges high-level DhtOp commands to low-level DHT protocol
///
/// It performs no I/O itself: it returns `DhtAction` values for the caller to
/// carry out, and the caller feeds peer replies back through `on_response`.
pub struct DhtExecutor {
    /// Our node information (its ID is used as the publisher of stored values
    /// and as the origin of lookups).
    our_node: NodeInfo,

    /// Message builder for creating protocol messages
    msg_builder: MessageBuilder,

    /// Routing table for finding peers
    routing_table: RoutingTable,

    /// Pending operations by ID
    pending: HashMap<u64, PendingOp>,

    /// Next operation ID (starts at 1 and increases by one per submission).
    next_op_id: u64,

    /// Configuration
    config: ExecutorConfig,
}
152  
153  impl DhtExecutor {
154      /// Create a new executor
155      pub fn new(our_node: NodeInfo, config: ExecutorConfig) -> Self {
156          let routing_table = RoutingTable::new(our_node.id);
157          let msg_builder = MessageBuilder::new(our_node.clone());
158          
159          Self {
160              our_node,
161              msg_builder,
162              routing_table,
163              pending: HashMap::new(),
164              next_op_id: 1,
165              config,
166          }
167      }
168      
169      /// Get next operation ID
170      fn next_op_id(&mut self) -> u64 {
171          let id = self.next_op_id;
172          self.next_op_id += 1;
173          id
174      }
175      
176      /// Add a peer to the routing table
177      pub fn add_peer(&mut self, peer: NodeInfo, now: u64) {
178          self.routing_table.add(peer, now);
179      }
180      
181      /// Submit a DHT operation for execution
182      pub fn submit(&mut self, op: DhtOp, now: u64) -> (u64, ExecutorEvent) {
183          let op_id = self.next_op_id();
184          
185          match op {
186              DhtOp::Store { circle_key, capability_keys, payload, ttl } => {
187                  self.start_store(op_id, circle_key, capability_keys, payload, ttl, now)
188              }
189              DhtOp::Query { key } => {
190                  self.start_query(op_id, key, now)
191              }
192          }
193      }
194      
195      /// Start a store operation
196      fn start_store(
197          &mut self,
198          op_id: u64,
199          circle_key: DhtKey,
200          _capability_keys: Vec<[u8; 32]>,
201          payload: Vec<u8>,
202          ttl: u64,
203          now: u64,
204      ) -> (u64, ExecutorEvent) {
205          // Find closest nodes to store on (using circle_key as primary)
206          let closest = self.routing_table.closest(&circle_key, self.config.k);
207          
208          if closest.nodes.is_empty() {
209              return (op_id, ExecutorEvent::Complete(DhtAction::Failed {
210                  op_id,
211                  error: "No peers in routing table".to_string(),
212              }));
213          }
214          
215          // Create the stored value using the proper API
216          let stored_value = StoredValue::new(
217              circle_key,
218              ValueType::Application, // Generic application data for circle announcements
219              self.our_node.id,       // Publisher is our node
220              payload,
221              now,
222              Some(ttl),
223          );
224          
225          // Build store messages for closest nodes
226          let mut actions = Vec::new();
227          let target_count = self.config.replication_factor.min(closest.nodes.len());
228          let targets: Vec<NodeInfo> = closest.nodes.iter()
229              .take(target_count)
230              .cloned()
231              .collect();
232          
233          for peer in &targets {
234              let message = self.msg_builder.store(stored_value.clone());
235              actions.push(DhtAction::Send {
236                  target: peer.clone(),
237                  message,
238                  op_id,
239              });
240          }
241          
242          // Track pending store
243          self.pending.insert(op_id, PendingOp::Store(PendingStore {
244              key: circle_key,
245              value: stored_value,
246              stored_on: Vec::new(),
247              pending_peers: targets,
248              target_count,
249              started_at: Instant::now(),
250          }));
251          
252          (op_id, ExecutorEvent::Actions(actions))
253      }
254      
255      /// Start a query operation
256      fn start_query(
257          &mut self,
258          op_id: u64,
259          key: DhtKey,
260          _now: u64,
261      ) -> (u64, ExecutorEvent) {
262          // Get seed nodes for the lookup
263          let seeds: Vec<NodeInfo> = self.routing_table
264              .closest(&key, self.config.alpha)
265              .nodes;
266          
267          if seeds.is_empty() {
268              return (op_id, ExecutorEvent::Complete(DhtAction::Failed {
269                  op_id,
270                  error: "No peers in routing table".to_string(),
271              }));
272          }
273          
274          // Create lookup state machine (needs our_id as second param)
275          let mut lookup = ValueLookup::new(key, self.our_node.id, seeds);
276          
277          // Get initial actions from lookup
278          let actions = self.drive_lookup(op_id, &mut lookup);
279          
280          // Track pending query
281          self.pending.insert(op_id, PendingOp::Query(PendingQuery {
282              key,
283              lookup,
284              found_values: Vec::new(),
285              started_at: Instant::now(),
286          }));
287          
288          (op_id, ExecutorEvent::Actions(actions))
289      }
290      
291      /// Extract pending queries from a lookup and convert to actions
292      fn drive_lookup(&self, op_id: u64, lookup: &mut ValueLookup) -> Vec<DhtAction> {
293          let mut actions = Vec::new();
294          
295          // ValueLookup exposes next_queries() with nodes to query
296          for node in lookup.next_queries() {
297              let message = self.msg_builder.find_value(*lookup.target());
298              actions.push(DhtAction::Send {
299                  target: node,
300                  message,
301                  op_id,
302              });
303          }
304          
305          actions
306      }
307      
308      /// Handle a response from the network
309      pub fn on_response(
310          &mut self,
311          op_id: u64,
312          from: &NodeInfo,
313          response: DhtResponse,
314          now: u64,
315      ) -> ExecutorEvent {
316          // Update routing table with responding peer
317          self.routing_table.add(from.clone(), now);
318          
319          // Remove the operation to get ownership (avoids borrow issues)
320          let Some(mut op) = self.pending.remove(&op_id) else {
321              return ExecutorEvent::Pending;
322          };
323          
324          let (event, should_reinsert) = match &mut op {
325              PendingOp::Store(store) => {
326                  self.handle_store_response(op_id, from, response, store)
327              }
328              PendingOp::Query(query) => {
329                  self.handle_query_response(op_id, from, response, query)
330              }
331          };
332          
333          // Reinsert if not complete
334          if should_reinsert {
335              self.pending.insert(op_id, op);
336          }
337          
338          event
339      }
340      
341      /// Handle store response
342      fn handle_store_response(
343          &self,
344          op_id: u64,
345          from: &NodeInfo,
346          response: DhtResponse,
347          store: &mut PendingStore,
348      ) -> (ExecutorEvent, bool) {
349          // Remove from pending peers
350          store.pending_peers.retain(|p| p.id != from.id);
351          
352          match response {
353              DhtResponse::Stored { success, .. } if success => {
354                  store.stored_on.push(from.clone());
355              }
356              DhtResponse::Error { message, .. } => {
357                  eprintln!("Store failed on peer: {}", message);
358              }
359              _ => {}
360          }
361          
362          // Check if we're done
363          if store.stored_on.len() >= store.target_count || store.pending_peers.is_empty() {
364              let result = if store.stored_on.is_empty() {
365                  DhtAction::Failed {
366                      op_id,
367                      error: "Failed to store on any peer".to_string(),
368                  }
369              } else {
370                  DhtAction::StoreComplete {
371                      op_id,
372                      key: store.key,
373                      stored_on: store.stored_on.clone(),
374                  }
375              };
376              
377              (ExecutorEvent::Complete(result), false)
378          } else {
379              (ExecutorEvent::Pending, true)
380          }
381      }
382      
383      /// Handle query response
384      fn handle_query_response(
385          &self,
386          op_id: u64,
387          from: &NodeInfo,
388          response: DhtResponse,
389          query: &mut PendingQuery,
390      ) -> (ExecutorEvent, bool) {
391          match response {
392              DhtResponse::Value { values, closer_nodes } => {
393                  // Found values!
394                  query.found_values.extend(values.clone());
395                  
396                  // Feed response to lookup state machine
397                  query.lookup.on_value(&from.id, values, closer_nodes);
398              }
399              DhtResponse::Nodes { nodes } => {
400                  // No value, but got closer nodes
401                  query.lookup.on_nodes(&from.id, nodes);
402              }
403              DhtResponse::Error { message, .. } => {
404                  eprintln!("Query error from {}: {}", hex::encode(&from.id[..4]), message);
405                  query.lookup.on_failure(&from.id);
406              }
407              _ => {}
408          }
409          
410          // Check if lookup is complete
411          if query.lookup.is_complete() {
412              let result = DhtAction::QueryComplete {
413                  op_id,
414                  key: query.key,
415                  values: query.found_values.clone(),
416              };
417              return (ExecutorEvent::Complete(result), false);
418          }
419          
420          // Get more actions from lookup - need mutable access
421          let actions = {
422              let mut local_actions = Vec::new();
423              for node in query.lookup.next_queries() {
424                  let message = self.msg_builder.find_value(*query.lookup.target());
425                  local_actions.push(DhtAction::Send {
426                      target: node,
427                      message,
428                      op_id,
429                  });
430              }
431              local_actions
432          };
433          
434          if actions.is_empty() {
435              (ExecutorEvent::Pending, true)
436          } else {
437              (ExecutorEvent::Actions(actions), true)
438          }
439      }
440      
441      /// Check for timed-out operations
442      pub fn check_timeouts(&mut self) -> Vec<DhtAction> {
443          let timeout = self.config.request_timeout;
444          let now = Instant::now();
445          
446          let timed_out: Vec<u64> = self.pending
447              .iter()
448              .filter_map(|(id, op)| {
449                  let started = match op {
450                      PendingOp::Store(s) => s.started_at,
451                      PendingOp::Query(q) => q.started_at,
452                  };
453                  if now.duration_since(started) > timeout {
454                      Some(*id)
455                  } else {
456                      None
457                  }
458              })
459              .collect();
460          
461          timed_out
462              .into_iter()
463              .filter_map(|op_id| {
464                  self.pending.remove(&op_id).map(|op| {
465                      let key = match &op {
466                          PendingOp::Store(s) => s.key,
467                          PendingOp::Query(q) => q.key,
468                      };
469                      DhtAction::Failed {
470                          op_id,
471                          error: format!("Operation timed out for key {}", hex::encode(&key[..8])),
472                      }
473                  })
474              })
475              .collect()
476      }
477      
478      /// Get statistics about pending operations
479      pub fn stats(&self) -> ExecutorStats {
480          let mut stores = 0;
481          let mut queries = 0;
482          
483          for op in self.pending.values() {
484              match op {
485                  PendingOp::Store(_) => stores += 1,
486                  PendingOp::Query(_) => queries += 1,
487              }
488          }
489          
490          ExecutorStats {
491              pending_stores: stores,
492              pending_queries: queries,
493              routing_table_size: self.routing_table.len(),
494          }
495      }
496  }
497  
/// Statistics about the executor state: a point-in-time snapshot.
///
/// The type is comparable and has a zeroed `Default`, so snapshots can be
/// checked against expected values directly.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ExecutorStats {
    /// Number of store operations currently pending.
    pub pending_stores: usize,
    /// Number of query operations currently pending.
    pub pending_queries: usize,
    /// Number of peers reported by the routing table.
    pub routing_table_size: usize,
}
505  
506  // ============================================================================
507  // Tests
508  // ============================================================================
509  
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a peer whose ID is `id_byte` repeated 32 times.
    fn make_node(id_byte: u8) -> NodeInfo {
        NodeInfo::new(
            [id_byte; 32],
            format!("127.0.0.1:800{}", id_byte % 10),
            1000,
        )
    }

    /// Builds an executor for node `0x00` with the default configuration.
    fn make_executor() -> DhtExecutor {
        DhtExecutor::new(make_node(0x00), ExecutorConfig::default())
    }

    /// Builds the store operation shared by the tests, varying only the payload.
    fn store_op(payload: &[u8]) -> DhtOp {
        DhtOp::Store {
            circle_key: [0x42; 32],
            capability_keys: vec![],
            payload: payload.to_vec(),
            ttl: 3600,
        }
    }

    /// A store submitted with an empty routing table fails immediately.
    #[test]
    fn test_submit_store_no_peers() {
        let mut exec = make_executor();

        let (op_id, event) = exec.submit(store_op(b"test data"), 1000);

        let ExecutorEvent::Complete(DhtAction::Failed { error, .. }) = event else {
            panic!("Expected failure with no peers");
        };
        assert!(error.contains("No peers"));
        assert_eq!(op_id, 1);
    }

    /// A store with four known peers sends to `replication_factor` of them.
    #[test]
    fn test_submit_store_with_peers() {
        let mut exec = make_executor();

        // Add some peers
        (1..5u8).for_each(|i| exec.add_peer(make_node(i), 1000));

        let (op_id, event) = exec.submit(store_op(b"test data"), 1000);

        let ExecutorEvent::Actions(actions) = event else {
            panic!("Expected Actions event");
        };

        // Should send to replication_factor peers (default 3)
        assert_eq!(actions.len(), 3);
        for action in actions {
            let DhtAction::Send { message, .. } = action else {
                panic!("Expected Send action");
            };
            // Should be a Store request
            assert!(matches!(message, DhtMessage::Request(_)));
        }

        // Should have pending operation
        assert!(exec.pending.contains_key(&op_id));
    }

    /// The store stays pending until the third acknowledgement arrives.
    #[test]
    fn test_store_completes_on_response() {
        let mut exec = make_executor();

        let peers = [make_node(1), make_node(2), make_node(3)];
        for peer in &peers {
            exec.add_peer(peer.clone(), 1000);
        }

        let (op_id, _) = exec.submit(store_op(b"test data"), 1000);

        // Simulate successful responses
        let ack = DhtResponse::Stored { success: true, error: None };

        for (peer, now) in peers[..2].iter().zip(1001u64..) {
            let event = exec.on_response(op_id, peer, ack.clone(), now);
            assert!(matches!(event, ExecutorEvent::Pending));
        }

        let last = exec.on_response(op_id, &peers[2], ack, 1003);

        let ExecutorEvent::Complete(DhtAction::StoreComplete { stored_on, .. }) = last else {
            panic!("Expected StoreComplete");
        };
        assert_eq!(stored_on.len(), 3);

        // Operation should be removed
        assert!(!exec.pending.contains_key(&op_id));
    }

    /// A query submitted with an empty routing table fails immediately.
    #[test]
    fn test_submit_query_no_peers() {
        let mut exec = make_executor();

        let (_, event) = exec.submit(DhtOp::Query { key: [0x42; 32] }, 1000);

        let ExecutorEvent::Complete(DhtAction::Failed { error, .. }) = event else {
            panic!("Expected failure with no peers");
        };
        assert!(error.contains("No peers"));
    }

    /// Stats reflect the routing table size and the pending store count.
    #[test]
    fn test_stats() {
        let mut exec = make_executor();

        (1..5u8).for_each(|i| exec.add_peer(make_node(i), 1000));

        let before = exec.stats();
        assert_eq!(before.pending_stores, 0);
        assert_eq!(before.pending_queries, 0);
        assert_eq!(before.routing_table_size, 4);

        // Submit a store
        let _ = exec.submit(store_op(b"test"), 1000);

        let after = exec.stats();
        assert_eq!(after.pending_stores, 1);
    }
}