// crates/metrics/src/aggregator.rs
//! Metrics aggregator with an HDR histogram for latency tracking.
  2  use crate::BotEvent;
  3  use hdrhistogram::Histogram;
  4  use parking_lot::RwLock;
  5  use std::collections::HashMap;
  6  use std::sync::Arc;
  7  
  8  /// Real-time metrics aggregator
  9  #[derive(Clone)]
 10  pub struct MetricsAggregator {
 11      state: Arc<RwLock<AggregatorState>>,
 12  }
 13  
 14  struct AggregatorState {
 15      /// Latency histogram (in microseconds, up to 1 minute)
 16      latency_histogram: Histogram<u64>,
 17  
 18      /// Total operations
 19      total_operations: u64,
 20  
 21      /// Total errors
 22      total_errors: u64,
 23  
 24      /// Operations per bot
 25      bot_operations: HashMap<String, u64>,
 26  
 27      /// Start time for TPS calculation
 28      start_time_ms: i64,
 29  
 30      /// Window start for rolling metrics
 31      window_start_ms: i64,
 32  
 33      /// Operations in current window
 34      window_operations: u64,
 35  
 36      /// Active bots
 37      active_bots: HashMap<String, i64>,  // bot_id -> last_seen_ms
 38  
 39      /// Bots by role (for Prometheus metrics)
 40      bots_by_role: HashMap<String, usize>,
 41  
 42      /// Behavior success tracking
 43      behavior_successes: HashMap<String, u64>,
 44      behavior_failures: HashMap<String, u64>,
 45  
 46      /// Active scenario name
 47      active_scenario: Option<String>,
 48  
 49      /// Scenario progress (0.0 to 1.0)
 50      scenario_progress: Option<f64>,
 51  
 52      /// Worker bot counts (distributed mode)
 53      workers: HashMap<String, usize>,
 54  }
 55  
 56  impl MetricsAggregator {
 57      /// Create a new metrics aggregator
 58      pub fn new() -> Self {
 59          Self {
 60              state: Arc::new(RwLock::new(AggregatorState {
 61                  // HDR Histogram: 1 microsecond to 60 seconds, 3 significant digits
 62                  latency_histogram: Histogram::new_with_bounds(1, 60_000_000, 3)
 63                      .expect("Failed to create histogram"),
 64                  total_operations: 0,
 65                  total_errors: 0,
 66                  bot_operations: HashMap::new(),
 67                  start_time_ms: current_time_ms(),
 68                  window_start_ms: current_time_ms(),
 69                  window_operations: 0,
 70                  active_bots: HashMap::new(),
 71                  bots_by_role: HashMap::new(),
 72                  behavior_successes: HashMap::new(),
 73                  behavior_failures: HashMap::new(),
 74                  active_scenario: None,
 75                  scenario_progress: None,
 76                  workers: HashMap::new(),
 77              })),
 78          }
 79      }
 80  
 81      /// Process a single event
 82      pub fn process_event(&self, event: &BotEvent) {
 83          let mut state = self.state.write();
 84  
 85          match event {
 86              BotEvent::BotStarted { bot_id, role, timestamp_ms } => {
 87                  state.active_bots.insert(bot_id.clone(), *timestamp_ms);
 88                  *state.bots_by_role.entry(role.clone()).or_insert(0) += 1;
 89              }
 90  
 91              BotEvent::BotStopped { bot_id, .. } => {
 92                  state.active_bots.remove(bot_id);
 93                  // Note: Not removing from bots_by_role as we want cumulative counts
 94              }
 95  
 96              BotEvent::BehaviorCompleted {
 97                  bot_id,
 98                  behavior_id,
 99                  duration_ms,
100                  success,
101                  ..
102              } => {
103                  state.total_operations += 1;
104                  state.window_operations += 1;
105  
106                  *state.bot_operations.entry(bot_id.clone()).or_insert(0) += 1;
107  
108                  if *success {
109                      *state.behavior_successes.entry(behavior_id.clone()).or_insert(0) += 1;
110                  } else {
111                      *state.behavior_failures.entry(behavior_id.clone()).or_insert(0) += 1;
112                      state.total_errors += 1;
113                  }
114  
115                  // Record latency in microseconds
116                  let _ = state.latency_histogram.record(*duration_ms * 1000);
117              }
118  
119              BotEvent::TransactionConfirmed {
120                  confirmation_time_ms,
121                  ..
122              } => {
123                  state.total_operations += 1;
124                  state.window_operations += 1;
125                  let _ = state.latency_histogram.record(*confirmation_time_ms * 1000);
126              }
127  
128              BotEvent::NetworkResponse { latency_ms, .. } => {
129                  let _ = state.latency_histogram.record(*latency_ms * 1000);
130              }
131  
132              BotEvent::BotError { .. } | BotEvent::TransactionFailed { .. } => {
133                  state.total_errors += 1;
134              }
135  
136              _ => {}
137          }
138      }
139  
140      /// Process a batch of events
141      pub fn process_batch(&self, events: &[BotEvent]) {
142          for event in events {
143              self.process_event(event);
144          }
145      }
146  
147      /// Get current TPS (transactions per second)
148      pub fn tps(&self) -> f64 {
149          let state = self.state.read();
150          let elapsed_ms = current_time_ms() - state.start_time_ms;
151  
152          if elapsed_ms == 0 {
153              return 0.0;
154          }
155  
156          (state.total_operations as f64) / (elapsed_ms as f64 / 1000.0)
157      }
158  
159      /// Get TPS for a rolling window (last N milliseconds)
160      pub fn window_tps(&self, window_ms: i64) -> f64 {
161          let mut state = self.state.write();
162          let now = current_time_ms();
163  
164          // Reset window if expired
165          if now - state.window_start_ms > window_ms {
166              state.window_start_ms = now;
167              state.window_operations = 0;
168          }
169  
170          let elapsed = now - state.window_start_ms;
171          if elapsed == 0 {
172              return 0.0;
173          }
174  
175          (state.window_operations as f64) / (elapsed as f64 / 1000.0)
176      }
177  
178      /// Get latency percentile (in milliseconds)
179      pub fn latency_percentile(&self, percentile: f64) -> f64 {
180          let state = self.state.read();
181  
182          if state.latency_histogram.len() == 0 {
183              return 0.0;
184          }
185  
186          let value_us = state.latency_histogram.value_at_quantile(percentile);
187          value_us as f64 / 1000.0  // Convert to milliseconds
188      }
189  
190      /// Get p50 latency
191      pub fn latency_p50(&self) -> f64 {
192          self.latency_percentile(0.50)
193      }
194  
195      /// Get p95 latency
196      pub fn latency_p95(&self) -> f64 {
197          self.latency_percentile(0.95)
198      }
199  
200      /// Get p99 latency
201      pub fn latency_p99(&self) -> f64 {
202          self.latency_percentile(0.99)
203      }
204  
205      /// Get error rate (errors / total operations)
206      pub fn error_rate(&self) -> f64 {
207          let state = self.state.read();
208  
209          if state.total_operations == 0 {
210              return 0.0;
211          }
212  
213          (state.total_errors as f64) / (state.total_operations as f64)
214      }
215  
216      /// Get total operations
217      pub fn total_operations(&self) -> u64 {
218          self.state.read().total_operations
219      }
220  
221      /// Get total errors
222      pub fn total_errors(&self) -> u64 {
223          self.state.read().total_errors
224      }
225  
226      /// Get active bot count
227      pub fn active_bot_count(&self) -> usize {
228          self.state.read().active_bots.len()
229      }
230  
231      /// Get operations per bot
232      pub fn bot_operations(&self) -> HashMap<String, u64> {
233          self.state.read().bot_operations.clone()
234      }
235  
236      /// Get full metrics snapshot
237      pub fn snapshot(&self) -> MetricsSnapshot {
238          let state = self.state.read();
239  
240          // Calculate behavior success rates
241          let mut behavior_success_rates = HashMap::new();
242          for (behavior_id, successes) in &state.behavior_successes {
243              let failures = state.behavior_failures.get(behavior_id).unwrap_or(&0);
244              let total = successes + failures;
245              if total > 0 {
246                  behavior_success_rates.insert(behavior_id.clone(), *successes as f64 / total as f64);
247              }
248          }
249  
250          MetricsSnapshot {
251              timestamp_ms: current_time_ms(),
252              tps: self.tps(),
253              latency_p50_ms: self.latency_p50(),
254              latency_p95_ms: self.latency_p95(),
255              latency_p99_ms: self.latency_p99(),
256              error_rate: self.error_rate(),
257              total_operations: self.total_operations(),
258              total_errors: self.total_errors(),
259              active_bots: self.active_bot_count(),
260              bots_by_role: state.bots_by_role.clone(),
261              behavior_success_rates,
262              active_scenario: state.active_scenario.clone(),
263              scenario_progress: state.scenario_progress,
264              workers: state.workers.clone(),
265          }
266      }
267  
268      /// Set active scenario (for scenario tracking)
269      pub fn set_active_scenario(&self, name: Option<String>) {
270          self.state.write().active_scenario = name;
271      }
272  
273      /// Update scenario progress (0.0 to 1.0)
274      pub fn set_scenario_progress(&self, progress: f64) {
275          self.state.write().scenario_progress = Some(progress.clamp(0.0, 1.0));
276      }
277  
278      /// Update worker bot counts (distributed mode)
279      pub fn set_worker_bots(&self, worker_id: String, bot_count: usize) {
280          self.state.write().workers.insert(worker_id, bot_count);
281      }
282  
283      /// Remove worker (distributed mode)
284      pub fn remove_worker(&self, worker_id: &str) {
285          self.state.write().workers.remove(worker_id);
286      }
287  
288      /// Reset all metrics
289      pub fn reset(&self) {
290          let mut state = self.state.write();
291  
292          state.latency_histogram.clear();
293          state.total_operations = 0;
294          state.total_errors = 0;
295          state.bot_operations.clear();
296          state.start_time_ms = current_time_ms();
297          state.window_start_ms = current_time_ms();
298          state.window_operations = 0;
299          state.bots_by_role.clear();
300          state.behavior_successes.clear();
301          state.behavior_failures.clear();
302      }
303  }
304  
305  impl Default for MetricsAggregator {
306      fn default() -> Self {
307          Self::new()
308      }
309  }
310  
/// Metrics snapshot at a point in time.
#[derive(Debug, Clone)]
pub struct MetricsSnapshot {
    /// Wall-clock time the snapshot was taken (ms since the Unix epoch).
    pub timestamp_ms: i64,
    /// Operations per second since the aggregator was created or last reset.
    pub tps: f64,
    /// Median latency in milliseconds.
    pub latency_p50_ms: f64,
    /// 95th-percentile latency in milliseconds.
    pub latency_p95_ms: f64,
    /// 99th-percentile latency in milliseconds.
    pub latency_p99_ms: f64,
    /// `total_errors / total_operations` (0.0 when there are no operations).
    pub error_rate: f64,
    /// Operations counted so far.
    pub total_operations: u64,
    /// Errors counted so far.
    pub total_errors: u64,
    /// Number of bots currently active.
    pub active_bots: usize,
    /// Cumulative number of bot starts per role.
    pub bots_by_role: HashMap<String, usize>,
    /// Fraction of successful completions per behavior id.
    pub behavior_success_rates: HashMap<String, f64>,
    /// Name of the running scenario, if any.
    pub active_scenario: Option<String>,
    /// Scenario progress (0.0 to 1.0), if reported.
    pub scenario_progress: Option<f64>,
    /// Bot count per worker id (distributed mode).
    pub workers: HashMap<String, usize>,
}
329  
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// Returns 0 if the system clock reports a time before the epoch. The
/// millisecond count is converted with a checked conversion and saturates at
/// `i64::MAX` instead of silently truncating.
fn current_time_ms() -> i64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    i64::try_from(since_epoch.as_millis()).unwrap_or(i64::MAX)
}
336  
#[cfg(test)]
mod tests {
    use super::*;

    /// A `BehaviorCompleted` event for bot "bot-1", behavior "test", lasting 100 ms.
    fn behavior_completed(success: bool) -> BotEvent {
        BotEvent::BehaviorCompleted {
            bot_id: "bot-1".to_string(),
            behavior_id: "test".to_string(),
            timestamp_ms: current_time_ms(),
            duration_ms: 100,
            success,
        }
    }

    /// A `BotStarted` event for a "trader" bot with the given id.
    fn trader_started(bot_id: &str) -> BotEvent {
        BotEvent::BotStarted {
            bot_id: bot_id.to_string(),
            role: "trader".to_string(),
            timestamp_ms: current_time_ms(),
        }
    }

    /// One successful behavior counts as one operation and no errors.
    #[test]
    fn test_metrics_aggregation() {
        let aggregator = MetricsAggregator::new();

        aggregator.process_event(&behavior_completed(true));

        assert_eq!(aggregator.total_operations(), 1);
        assert_eq!(aggregator.total_errors(), 0);
    }

    /// One success followed by one failure gives a 50% error rate.
    #[test]
    fn test_error_rate() {
        let aggregator = MetricsAggregator::new();

        for succeeded in [true, false] {
            aggregator.process_event(&behavior_completed(succeeded));
        }

        assert_eq!(aggregator.error_rate(), 0.5);
    }

    /// Starting bots grows the active set; stopping one shrinks it.
    #[test]
    fn test_active_bots() {
        let aggregator = MetricsAggregator::new();

        aggregator.process_event(&trader_started("bot-1"));
        aggregator.process_event(&trader_started("bot-2"));
        assert_eq!(aggregator.active_bot_count(), 2);

        let stopped = BotEvent::BotStopped {
            bot_id: "bot-1".to_string(),
            timestamp_ms: current_time_ms(),
            reason: "test".to_string(),
        };
        aggregator.process_event(&stopped);
        assert_eq!(aggregator.active_bot_count(), 1);
    }
}