// crates/distributed/src/fault_tolerance.rs
//! Fault tolerance for distributed bot orchestration
use crate::proto::*;
use crate::registry::WorkerRegistry;
use anyhow::Result;
use std::collections::VecDeque;
use std::sync::Arc;
use std::time::Duration;
use tokio::time::interval;
use tracing::{info, warn};
  9  
/// Fault detector monitors worker health
///
/// Periodically asks the registry to mark workers whose last heartbeat is
/// older than `heartbeat_timeout` as unhealthy (see `check_stale_workers`).
pub struct FaultDetector {
    /// Shared registry of known workers.
    registry: Arc<WorkerRegistry>,
    /// How often the detection loop wakes up.
    check_interval: Duration,
    /// Age beyond which a worker's last heartbeat counts as stale.
    heartbeat_timeout: Duration,
}
 16  
 17  impl FaultDetector {
 18      pub fn new(registry: Arc<WorkerRegistry>) -> Self {
 19          Self {
 20              registry,
 21              check_interval: Duration::from_secs(10),
 22              heartbeat_timeout: Duration::from_secs(15), // 3 missed heartbeats
 23          }
 24      }
 25  
 26      /// Start fault detection loop
 27      pub async fn run(&self) {
 28          let mut ticker = interval(self.check_interval);
 29  
 30          loop {
 31              ticker.tick().await;
 32              self.check_stale_workers();
 33          }
 34      }
 35  
 36      /// Check for stale workers and mark as unhealthy
 37      fn check_stale_workers(&self) {
 38          let timeout_ms = self.heartbeat_timeout.as_millis() as i64;
 39          self.registry.check_stale_workers(timeout_ms);
 40  
 41          // Log any unhealthy workers
 42          let unhealthy_count = self.registry.worker_count() - self.registry.healthy_worker_count();
 43          if unhealthy_count > 0 {
 44              warn!("Detected {} unhealthy workers", unhealthy_count);
 45          }
 46      }
 47  }
 48  
/// Bot migration handles moving bots from failed workers
/// onto the remaining workers known to the shared registry.
pub struct BotMigration {
    /// Shared registry used to enumerate candidate target workers.
    registry: Arc<WorkerRegistry>,
}
 53  
 54  impl BotMigration {
 55      pub fn new(registry: Arc<WorkerRegistry>) -> Self {
 56          Self { registry }
 57      }
 58  
 59      /// Migrate bots from failed worker to healthy workers
 60      pub async fn migrate_bots_from_worker(&self, failed_worker_id: &str) -> Result<Vec<BotHandle>> {
 61          info!("Migrating bots from failed worker: {}", failed_worker_id);
 62  
 63          // TODO: Track which bots were on the failed worker
 64          // For now, return empty list
 65          let migrated = Vec::new();
 66  
 67          info!("Migrated {} bots from failed worker", migrated.len());
 68  
 69          Ok(migrated)
 70      }
 71  
 72      /// Distribute bots across healthy workers
 73      pub async fn redistribute_bots(&self, bot_specs: Vec<BotSpec>) -> Result<Vec<BotHandle>> {
 74          let healthy_workers = self.registry.list_workers();
 75  
 76          if healthy_workers.is_empty() {
 77              anyhow::bail!("No healthy workers available for redistribution");
 78          }
 79  
 80          info!(
 81              "Redistributing {} bots across {} healthy workers",
 82              bot_specs.len(),
 83              healthy_workers.len()
 84          );
 85  
 86          let mut handles = Vec::new();
 87          let mut worker_idx = 0;
 88  
 89          for bot_spec in bot_specs {
 90              let worker = &healthy_workers[worker_idx % healthy_workers.len()];
 91  
 92              // TODO: Send spawn request to worker
 93              handles.push(BotHandle {
 94                  bot_id: bot_spec.bot_id.clone(),
 95                  worker_id: worker.worker_id.clone(),
 96                  success: true,
 97                  message: "Bot migrated".to_string(),
 98              });
 99  
100              worker_idx += 1;
101          }
102  
103          Ok(handles)
104      }
105  }
106  
107  /// Metrics buffering for workers when coordinator is unreachable
108  pub struct MetricsBuffer {
109      buffer: Arc<parking_lot::RwLock<Vec<WorkerMetrics>>>,
110      max_buffer_size: usize,
111  }
112  
113  impl MetricsBuffer {
114      pub fn new(max_buffer_size: usize) -> Self {
115          Self {
116              buffer: Arc::new(parking_lot::RwLock::new(Vec::new())),
117              max_buffer_size,
118          }
119      }
120  
121      /// Buffer metrics locally
122      pub fn buffer(&self, metrics: WorkerMetrics) {
123          let mut buffer = self.buffer.write();
124  
125          if buffer.len() >= self.max_buffer_size {
126              // Remove oldest
127              buffer.remove(0);
128          }
129  
130          buffer.push(metrics);
131      }
132  
133      /// Flush buffered metrics to coordinator
134      pub fn flush(&self) -> Vec<WorkerMetrics> {
135          let mut buffer = self.buffer.write();
136          std::mem::take(&mut *buffer)
137      }
138  
139      /// Get buffer size
140      pub fn size(&self) -> usize {
141          self.buffer.read().len()
142      }
143  }
144  
/// Coordinator state checkpointing
///
/// Persists `CoordinatorState` as JSON at a fixed filesystem path.
pub struct CoordinatorCheckpoint {
    /// Destination file path for the JSON checkpoint.
    checkpoint_path: String,
}
149  
150  impl CoordinatorCheckpoint {
151      pub fn new(checkpoint_path: String) -> Self {
152          Self { checkpoint_path }
153      }
154  
155      /// Save coordinator state to disk
156      pub async fn save(&self, state: &CoordinatorState) -> Result<()> {
157          info!("Saving coordinator checkpoint to {}", self.checkpoint_path);
158  
159          let json = serde_json::to_string_pretty(state)?;
160          tokio::fs::write(&self.checkpoint_path, json).await?;
161  
162          Ok(())
163      }
164  
165      /// Load coordinator state from disk
166      pub async fn load(&self) -> Result<CoordinatorState> {
167          info!("Loading coordinator checkpoint from {}", self.checkpoint_path);
168  
169          let json = tokio::fs::read_to_string(&self.checkpoint_path).await?;
170          let state = serde_json::from_str(&json)?;
171  
172          Ok(state)
173      }
174  }
175  
/// Coordinator state for checkpointing
///
/// Serialized to JSON by `CoordinatorCheckpoint`; fields are initialized
/// to zero/empty by `new()` and presumably filled in by the coordinator
/// before saving — TODO confirm against the caller.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct CoordinatorState {
    /// Registered worker count at checkpoint time (0 in a fresh state).
    pub worker_count: usize,
    /// Total bots under management (0 in a fresh state).
    pub total_bots: usize,
    /// Identifiers of scenarios in flight (empty in a fresh state).
    pub active_scenarios: Vec<String>,
    /// Unix timestamp in seconds when the state was created.
    pub checkpoint_timestamp: i64,
}
184  
185  impl CoordinatorState {
186      pub fn new() -> Self {
187          Self {
188              worker_count: 0,
189              total_bots: 0,
190              active_scenarios: Vec::new(),
191              checkpoint_timestamp: std::time::SystemTime::now()
192                  .duration_since(std::time::UNIX_EPOCH)
193                  .unwrap_or_default()
194                  .as_secs() as i64,
195          }
196      }
197  }
198  
// Manual Default (can't derive: the timestamp must be the current time,
// not zero). Delegates to `new()`.
impl Default for CoordinatorState {
    fn default() -> Self {
        Self::new()
    }
}
204  
#[cfg(test)]
mod tests {
    use super::*;

    /// MetricsBuffer must honor its capacity (dropping oldest entries)
    /// and empty itself on flush.
    #[test]
    fn test_metrics_buffer() {
        let buf = MetricsBuffer::new(5);

        // Insert more entries than the cap; the oldest two get evicted.
        for idx in 0..7u32 {
            let metrics = WorkerMetrics {
                worker_id: format!("worker-{}", idx),
                active_bots: idx,
                bot_metrics: vec![],
                cpu_usage_percent: 50,
                memory_usage_bytes: 1000,
                timestamp_ms: idx as i64,
            };
            buf.buffer(metrics);
        }

        // Capacity is honored: only the newest 5 remain.
        assert_eq!(buf.size(), 5);

        // Flush returns everything and clears the buffer.
        let drained = buf.flush();
        assert_eq!(drained.len(), 5);
        assert_eq!(buf.size(), 0);
    }
}