fault_tolerance.rs
//! Fault tolerance for distributed bot orchestration

use crate::proto::*;
use crate::registry::WorkerRegistry;
use anyhow::Result;
use std::sync::Arc;
use std::time::Duration;
use tokio::time::interval;
use tracing::{info, warn};

/// Fault detector that monitors worker health
pub struct FaultDetector {
    registry: Arc<WorkerRegistry>,
    check_interval: Duration,
    heartbeat_timeout: Duration,
}

impl FaultDetector {
    pub fn new(registry: Arc<WorkerRegistry>) -> Self {
        Self {
            registry,
            check_interval: Duration::from_secs(10),
            heartbeat_timeout: Duration::from_secs(15), // three missed 5-second heartbeats
        }
    }

    /// Start the fault detection loop
    pub async fn run(&self) {
        let mut ticker = interval(self.check_interval);

        loop {
            ticker.tick().await;
            self.check_stale_workers();
        }
    }

    /// Check for stale workers and mark them unhealthy
    fn check_stale_workers(&self) {
        let timeout_ms = self.heartbeat_timeout.as_millis() as i64;
        self.registry.check_stale_workers(timeout_ms);

        // Log any unhealthy workers
        let unhealthy_count = self.registry.worker_count() - self.registry.healthy_worker_count();
        if unhealthy_count > 0 {
            warn!("Detected {} unhealthy workers", unhealthy_count);
        }
    }
}

/// Bot migration moves bots off failed workers
pub struct BotMigration {
    registry: Arc<WorkerRegistry>,
}

impl BotMigration {
    pub fn new(registry: Arc<WorkerRegistry>) -> Self {
        Self { registry }
    }

    /// Migrate bots from a failed worker onto healthy workers
    pub async fn migrate_bots_from_worker(&self, failed_worker_id: &str) -> Result<Vec<BotHandle>> {
        info!("Migrating bots from failed worker: {}", failed_worker_id);

        // TODO: Track which bots were on the failed worker.
        // For now, return an empty list.
        let migrated: Vec<BotHandle> = Vec::new();

        info!("Migrated {} bots from failed worker", migrated.len());

        Ok(migrated)
    }

    /// Distribute bots round-robin across healthy workers
    pub async fn redistribute_bots(&self, bot_specs: Vec<BotSpec>) -> Result<Vec<BotHandle>> {
        // NOTE: assumes `list_workers` returns only healthy workers; if it
        // returns all workers, filter on health status here.
        let healthy_workers = self.registry.list_workers();

        if healthy_workers.is_empty() {
            anyhow::bail!("No healthy workers available for redistribution");
        }

        info!(
            "Redistributing {} bots across {} healthy workers",
            bot_specs.len(),
            healthy_workers.len()
        );

        let mut handles = Vec::new();

        for (idx, bot_spec) in bot_specs.into_iter().enumerate() {
            let worker = &healthy_workers[idx % healthy_workers.len()];

            // TODO: Send a spawn request to the worker (see the retry sketch below)
            handles.push(BotHandle {
                bot_id: bot_spec.bot_id,
                worker_id: worker.worker_id.clone(),
                success: true,
                message: "Bot migrated".to_string(),
            });
        }

        Ok(handles)
    }
}
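
// The TODO above leaves the actual spawn RPC unimplemented. Below is a minimal
// sketch of a retry helper such an RPC could go through, assuming exponential
// backoff is the desired policy; `retry_with_backoff` and its parameters are
// illustrative, not part of the existing proto or registry API.

/// Retry a fallible async operation (e.g. a spawn request) with exponential backoff.
#[allow(dead_code)]
async fn retry_with_backoff<T, F, Fut>(mut op: F, max_attempts: u32) -> Result<T>
where
    F: FnMut() -> Fut,
    Fut: std::future::Future<Output = Result<T>>,
{
    let mut delay = Duration::from_millis(100);
    let mut attempt = 0;
    loop {
        attempt += 1;
        match op().await {
            Ok(value) => return Ok(value),
            Err(err) if attempt < max_attempts => {
                warn!(
                    "Attempt {}/{} failed: {}; retrying in {:?}",
                    attempt, max_attempts, err, delay
                );
                tokio::time::sleep(delay).await;
                delay *= 2; // double the delay after each failure
            }
            // Out of attempts: surface the last error to the caller
            Err(err) => return Err(err),
        }
    }
}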

/// Metrics buffer for workers to use while the coordinator is unreachable
pub struct MetricsBuffer {
    buffer: Arc<parking_lot::RwLock<Vec<WorkerMetrics>>>,
    max_buffer_size: usize,
}

impl MetricsBuffer {
    pub fn new(max_buffer_size: usize) -> Self {
        Self {
            buffer: Arc::new(parking_lot::RwLock::new(Vec::new())),
            max_buffer_size,
        }
    }

    /// Buffer metrics locally, evicting the oldest entry when full
    pub fn buffer(&self, metrics: WorkerMetrics) {
        let mut buffer = self.buffer.write();

        if buffer.len() >= self.max_buffer_size {
            // Remove the oldest entry. This is O(n); a VecDeque would make
            // eviction O(1) if the buffer needs to grow large.
            buffer.remove(0);
        }

        buffer.push(metrics);
    }

    /// Flush buffered metrics to the coordinator, leaving the buffer empty
    pub fn flush(&self) -> Vec<WorkerMetrics> {
        let mut buffer = self.buffer.write();
        std::mem::take(&mut *buffer)
    }

    /// Get the current buffer size
    pub fn size(&self) -> usize {
        self.buffer.read().len()
    }
}

/// Coordinator state checkpointing
pub struct CoordinatorCheckpoint {
    checkpoint_path: String,
}

impl CoordinatorCheckpoint {
    pub fn new(checkpoint_path: String) -> Self {
        Self { checkpoint_path }
    }

    /// Save coordinator state to disk
    ///
    /// NOTE: the write is not atomic; a crash mid-write can leave a truncated
    /// checkpoint. Writing to a temp file and renaming would close that gap.
    pub async fn save(&self, state: &CoordinatorState) -> Result<()> {
        info!("Saving coordinator checkpoint to {}", self.checkpoint_path);

        let json = serde_json::to_string_pretty(state)?;
        tokio::fs::write(&self.checkpoint_path, json).await?;

        Ok(())
    }

    /// Load coordinator state from disk
    pub async fn load(&self) -> Result<CoordinatorState> {
        info!("Loading coordinator checkpoint from {}", self.checkpoint_path);

        let json = tokio::fs::read_to_string(&self.checkpoint_path).await?;
        let state = serde_json::from_str(&json)?;

        Ok(state)
    }
}

/// Coordinator state captured in a checkpoint
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct CoordinatorState {
    pub worker_count: usize,
    pub total_bots: usize,
    pub active_scenarios: Vec<String>,
    pub checkpoint_timestamp: i64,
}

impl CoordinatorState {
    pub fn new() -> Self {
        Self {
            worker_count: 0,
            total_bots: 0,
            active_scenarios: Vec::new(),
            checkpoint_timestamp: std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap_or_default()
                .as_secs() as i64,
        }
    }
}

impl Default for CoordinatorState {
    fn default() -> Self {
        Self::new()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_metrics_buffer() {
        let buffer = MetricsBuffer::new(5);

        // Buffer more metrics than the buffer can hold
        for i in 0..7 {
            buffer.buffer(WorkerMetrics {
                worker_id: format!("worker-{}", i),
                active_bots: i as u32,
                bot_metrics: vec![],
                cpu_usage_percent: 50,
                memory_usage_bytes: 1000,
                timestamp_ms: i as i64,
            });
        }

        // Only the five most recent entries survive eviction
        assert_eq!(buffer.size(), 5);

        // Flush returns everything buffered and clears the buffer
        let flushed = buffer.flush();
        assert_eq!(flushed.len(), 5);
        assert_eq!(buffer.size(), 0);
    }
}
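
// A round-trip test for CoordinatorCheckpoint: a minimal sketch, assuming
// tokio's `macros` and `rt` features are enabled so `#[tokio::test]` is
// available. The file name under the OS temp dir is illustrative.
#[cfg(test)]
mod checkpoint_tests {
    use super::*;

    #[tokio::test]
    async fn test_checkpoint_roundtrip() {
        let path = std::env::temp_dir().join("coordinator_checkpoint_test.json");
        let checkpoint = CoordinatorCheckpoint::new(path.to_string_lossy().into_owned());

        let mut state = CoordinatorState::new();
        state.worker_count = 3;
        state.total_bots = 42;
        state.active_scenarios = vec!["load-test".to_string()];

        checkpoint.save(&state).await.expect("save should succeed");
        let loaded = checkpoint.load().await.expect("load should succeed");

        assert_eq!(loaded.worker_count, 3);
        assert_eq!(loaded.total_bots, 42);
        assert_eq!(loaded.active_scenarios, vec!["load-test".to_string()]);

        // Clean up; ignore errors if the file is already gone
        let _ = std::fs::remove_file(&path);
    }
}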