// aggregator.rs
1 /// Metrics aggregator with HDR histogram for latency tracking 2 use crate::BotEvent; 3 use hdrhistogram::Histogram; 4 use parking_lot::RwLock; 5 use std::collections::HashMap; 6 use std::sync::Arc; 7 8 /// Real-time metrics aggregator 9 #[derive(Clone)] 10 pub struct MetricsAggregator { 11 state: Arc<RwLock<AggregatorState>>, 12 } 13 14 struct AggregatorState { 15 /// Latency histogram (in microseconds, up to 1 minute) 16 latency_histogram: Histogram<u64>, 17 18 /// Total operations 19 total_operations: u64, 20 21 /// Total errors 22 total_errors: u64, 23 24 /// Operations per bot 25 bot_operations: HashMap<String, u64>, 26 27 /// Start time for TPS calculation 28 start_time_ms: i64, 29 30 /// Window start for rolling metrics 31 window_start_ms: i64, 32 33 /// Operations in current window 34 window_operations: u64, 35 36 /// Active bots 37 active_bots: HashMap<String, i64>, // bot_id -> last_seen_ms 38 39 /// Bots by role (for Prometheus metrics) 40 bots_by_role: HashMap<String, usize>, 41 42 /// Behavior success tracking 43 behavior_successes: HashMap<String, u64>, 44 behavior_failures: HashMap<String, u64>, 45 46 /// Active scenario name 47 active_scenario: Option<String>, 48 49 /// Scenario progress (0.0 to 1.0) 50 scenario_progress: Option<f64>, 51 52 /// Worker bot counts (distributed mode) 53 workers: HashMap<String, usize>, 54 } 55 56 impl MetricsAggregator { 57 /// Create a new metrics aggregator 58 pub fn new() -> Self { 59 Self { 60 state: Arc::new(RwLock::new(AggregatorState { 61 // HDR Histogram: 1 microsecond to 60 seconds, 3 significant digits 62 latency_histogram: Histogram::new_with_bounds(1, 60_000_000, 3) 63 .expect("Failed to create histogram"), 64 total_operations: 0, 65 total_errors: 0, 66 bot_operations: HashMap::new(), 67 start_time_ms: current_time_ms(), 68 window_start_ms: current_time_ms(), 69 window_operations: 0, 70 active_bots: HashMap::new(), 71 bots_by_role: HashMap::new(), 72 behavior_successes: HashMap::new(), 73 
behavior_failures: HashMap::new(), 74 active_scenario: None, 75 scenario_progress: None, 76 workers: HashMap::new(), 77 })), 78 } 79 } 80 81 /// Process a single event 82 pub fn process_event(&self, event: &BotEvent) { 83 let mut state = self.state.write(); 84 85 match event { 86 BotEvent::BotStarted { bot_id, role, timestamp_ms } => { 87 state.active_bots.insert(bot_id.clone(), *timestamp_ms); 88 *state.bots_by_role.entry(role.clone()).or_insert(0) += 1; 89 } 90 91 BotEvent::BotStopped { bot_id, .. } => { 92 state.active_bots.remove(bot_id); 93 // Note: Not removing from bots_by_role as we want cumulative counts 94 } 95 96 BotEvent::BehaviorCompleted { 97 bot_id, 98 behavior_id, 99 duration_ms, 100 success, 101 .. 102 } => { 103 state.total_operations += 1; 104 state.window_operations += 1; 105 106 *state.bot_operations.entry(bot_id.clone()).or_insert(0) += 1; 107 108 if *success { 109 *state.behavior_successes.entry(behavior_id.clone()).or_insert(0) += 1; 110 } else { 111 *state.behavior_failures.entry(behavior_id.clone()).or_insert(0) += 1; 112 state.total_errors += 1; 113 } 114 115 // Record latency in microseconds 116 let _ = state.latency_histogram.record(*duration_ms * 1000); 117 } 118 119 BotEvent::TransactionConfirmed { 120 confirmation_time_ms, 121 .. 122 } => { 123 state.total_operations += 1; 124 state.window_operations += 1; 125 let _ = state.latency_histogram.record(*confirmation_time_ms * 1000); 126 } 127 128 BotEvent::NetworkResponse { latency_ms, .. } => { 129 let _ = state.latency_histogram.record(*latency_ms * 1000); 130 } 131 132 BotEvent::BotError { .. } | BotEvent::TransactionFailed { .. 
} => { 133 state.total_errors += 1; 134 } 135 136 _ => {} 137 } 138 } 139 140 /// Process a batch of events 141 pub fn process_batch(&self, events: &[BotEvent]) { 142 for event in events { 143 self.process_event(event); 144 } 145 } 146 147 /// Get current TPS (transactions per second) 148 pub fn tps(&self) -> f64 { 149 let state = self.state.read(); 150 let elapsed_ms = current_time_ms() - state.start_time_ms; 151 152 if elapsed_ms == 0 { 153 return 0.0; 154 } 155 156 (state.total_operations as f64) / (elapsed_ms as f64 / 1000.0) 157 } 158 159 /// Get TPS for a rolling window (last N milliseconds) 160 pub fn window_tps(&self, window_ms: i64) -> f64 { 161 let mut state = self.state.write(); 162 let now = current_time_ms(); 163 164 // Reset window if expired 165 if now - state.window_start_ms > window_ms { 166 state.window_start_ms = now; 167 state.window_operations = 0; 168 } 169 170 let elapsed = now - state.window_start_ms; 171 if elapsed == 0 { 172 return 0.0; 173 } 174 175 (state.window_operations as f64) / (elapsed as f64 / 1000.0) 176 } 177 178 /// Get latency percentile (in milliseconds) 179 pub fn latency_percentile(&self, percentile: f64) -> f64 { 180 let state = self.state.read(); 181 182 if state.latency_histogram.len() == 0 { 183 return 0.0; 184 } 185 186 let value_us = state.latency_histogram.value_at_quantile(percentile); 187 value_us as f64 / 1000.0 // Convert to milliseconds 188 } 189 190 /// Get p50 latency 191 pub fn latency_p50(&self) -> f64 { 192 self.latency_percentile(0.50) 193 } 194 195 /// Get p95 latency 196 pub fn latency_p95(&self) -> f64 { 197 self.latency_percentile(0.95) 198 } 199 200 /// Get p99 latency 201 pub fn latency_p99(&self) -> f64 { 202 self.latency_percentile(0.99) 203 } 204 205 /// Get error rate (errors / total operations) 206 pub fn error_rate(&self) -> f64 { 207 let state = self.state.read(); 208 209 if state.total_operations == 0 { 210 return 0.0; 211 } 212 213 (state.total_errors as f64) / (state.total_operations as 
f64) 214 } 215 216 /// Get total operations 217 pub fn total_operations(&self) -> u64 { 218 self.state.read().total_operations 219 } 220 221 /// Get total errors 222 pub fn total_errors(&self) -> u64 { 223 self.state.read().total_errors 224 } 225 226 /// Get active bot count 227 pub fn active_bot_count(&self) -> usize { 228 self.state.read().active_bots.len() 229 } 230 231 /// Get operations per bot 232 pub fn bot_operations(&self) -> HashMap<String, u64> { 233 self.state.read().bot_operations.clone() 234 } 235 236 /// Get full metrics snapshot 237 pub fn snapshot(&self) -> MetricsSnapshot { 238 let state = self.state.read(); 239 240 // Calculate behavior success rates 241 let mut behavior_success_rates = HashMap::new(); 242 for (behavior_id, successes) in &state.behavior_successes { 243 let failures = state.behavior_failures.get(behavior_id).unwrap_or(&0); 244 let total = successes + failures; 245 if total > 0 { 246 behavior_success_rates.insert(behavior_id.clone(), *successes as f64 / total as f64); 247 } 248 } 249 250 MetricsSnapshot { 251 timestamp_ms: current_time_ms(), 252 tps: self.tps(), 253 latency_p50_ms: self.latency_p50(), 254 latency_p95_ms: self.latency_p95(), 255 latency_p99_ms: self.latency_p99(), 256 error_rate: self.error_rate(), 257 total_operations: self.total_operations(), 258 total_errors: self.total_errors(), 259 active_bots: self.active_bot_count(), 260 bots_by_role: state.bots_by_role.clone(), 261 behavior_success_rates, 262 active_scenario: state.active_scenario.clone(), 263 scenario_progress: state.scenario_progress, 264 workers: state.workers.clone(), 265 } 266 } 267 268 /// Set active scenario (for scenario tracking) 269 pub fn set_active_scenario(&self, name: Option<String>) { 270 self.state.write().active_scenario = name; 271 } 272 273 /// Update scenario progress (0.0 to 1.0) 274 pub fn set_scenario_progress(&self, progress: f64) { 275 self.state.write().scenario_progress = Some(progress.clamp(0.0, 1.0)); 276 } 277 278 /// Update 
worker bot counts (distributed mode) 279 pub fn set_worker_bots(&self, worker_id: String, bot_count: usize) { 280 self.state.write().workers.insert(worker_id, bot_count); 281 } 282 283 /// Remove worker (distributed mode) 284 pub fn remove_worker(&self, worker_id: &str) { 285 self.state.write().workers.remove(worker_id); 286 } 287 288 /// Reset all metrics 289 pub fn reset(&self) { 290 let mut state = self.state.write(); 291 292 state.latency_histogram.clear(); 293 state.total_operations = 0; 294 state.total_errors = 0; 295 state.bot_operations.clear(); 296 state.start_time_ms = current_time_ms(); 297 state.window_start_ms = current_time_ms(); 298 state.window_operations = 0; 299 state.bots_by_role.clear(); 300 state.behavior_successes.clear(); 301 state.behavior_failures.clear(); 302 } 303 } 304 305 impl Default for MetricsAggregator { 306 fn default() -> Self { 307 Self::new() 308 } 309 } 310 311 /// Metrics snapshot at a point in time 312 #[derive(Debug, Clone)] 313 pub struct MetricsSnapshot { 314 pub timestamp_ms: i64, 315 pub tps: f64, 316 pub latency_p50_ms: f64, 317 pub latency_p95_ms: f64, 318 pub latency_p99_ms: f64, 319 pub error_rate: f64, 320 pub total_operations: u64, 321 pub total_errors: u64, 322 pub active_bots: usize, 323 pub bots_by_role: HashMap<String, usize>, 324 pub behavior_success_rates: HashMap<String, f64>, 325 pub active_scenario: Option<String>, 326 pub scenario_progress: Option<f64>, 327 pub workers: HashMap<String, usize>, 328 } 329 330 fn current_time_ms() -> i64 { 331 std::time::SystemTime::now() 332 .duration_since(std::time::UNIX_EPOCH) 333 .unwrap_or_default() 334 .as_millis() as i64 335 } 336 337 #[cfg(test)] 338 mod tests { 339 use super::*; 340 341 #[test] 342 fn test_metrics_aggregation() { 343 let aggregator = MetricsAggregator::new(); 344 345 // Process some events 346 aggregator.process_event(&BotEvent::BehaviorCompleted { 347 bot_id: "bot-1".to_string(), 348 behavior_id: "test".to_string(), 349 timestamp_ms: 
current_time_ms(), 350 duration_ms: 100, 351 success: true, 352 }); 353 354 assert_eq!(aggregator.total_operations(), 1); 355 assert_eq!(aggregator.total_errors(), 0); 356 } 357 358 #[test] 359 fn test_error_rate() { 360 let aggregator = MetricsAggregator::new(); 361 362 aggregator.process_event(&BotEvent::BehaviorCompleted { 363 bot_id: "bot-1".to_string(), 364 behavior_id: "test".to_string(), 365 timestamp_ms: current_time_ms(), 366 duration_ms: 100, 367 success: true, 368 }); 369 370 aggregator.process_event(&BotEvent::BehaviorCompleted { 371 bot_id: "bot-1".to_string(), 372 behavior_id: "test".to_string(), 373 timestamp_ms: current_time_ms(), 374 duration_ms: 100, 375 success: false, 376 }); 377 378 assert_eq!(aggregator.error_rate(), 0.5); 379 } 380 381 #[test] 382 fn test_active_bots() { 383 let aggregator = MetricsAggregator::new(); 384 385 aggregator.process_event(&BotEvent::BotStarted { 386 bot_id: "bot-1".to_string(), 387 role: "trader".to_string(), 388 timestamp_ms: current_time_ms(), 389 }); 390 391 aggregator.process_event(&BotEvent::BotStarted { 392 bot_id: "bot-2".to_string(), 393 role: "trader".to_string(), 394 timestamp_ms: current_time_ms(), 395 }); 396 397 assert_eq!(aggregator.active_bot_count(), 2); 398 399 aggregator.process_event(&BotEvent::BotStopped { 400 bot_id: "bot-1".to_string(), 401 timestamp_ms: current_time_ms(), 402 reason: "test".to_string(), 403 }); 404 405 assert_eq!(aggregator.active_bot_count(), 1); 406 } 407 }