metrics.rs
1 // Copyright (c) 2025 ADnet Contributors 2 // SPDX-License-Identifier: Apache-2.0 3 4 //! Prometheus-compatible metrics for DeltaOS node 5 //! 6 //! Exposes metrics at `/metrics` endpoint in Prometheus text format. 7 8 use axum::{Router, extract::State, response::IntoResponse, routing::get}; 9 use std::sync::Arc; 10 use std::sync::atomic::{AtomicU64, Ordering}; 11 use std::time::Instant; 12 use tokio::net::TcpListener; 13 14 /// Metrics registry holding all node metrics 15 #[derive(Debug)] 16 pub struct Metrics { 17 /// Node start time for uptime calculation 18 start_time: Instant, 19 20 // Consensus metrics 21 /// Current block height 22 pub block_height: AtomicU64, 23 /// Total blocks produced by this node 24 pub blocks_produced: AtomicU64, 25 /// Total blocks received from peers 26 pub blocks_received: AtomicU64, 27 28 // Transaction metrics 29 /// Total transactions processed 30 pub transactions_processed: AtomicU64, 31 /// Pending transactions in mempool 32 pub transactions_pending: AtomicU64, 33 /// Failed transactions 34 pub transactions_failed: AtomicU64, 35 36 // Network metrics 37 /// Connected peers count 38 pub peer_count: AtomicU64, 39 /// Total bytes sent 40 pub bytes_sent: AtomicU64, 41 /// Total bytes received 42 pub bytes_received: AtomicU64, 43 /// Total P2P messages sent 44 pub messages_sent: AtomicU64, 45 /// Total P2P messages received 46 pub messages_received: AtomicU64, 47 48 // Storage metrics 49 /// Storage size in bytes 50 pub storage_size_bytes: AtomicU64, 51 /// Total accounts in state 52 pub account_count: AtomicU64, 53 54 // Exchange metrics (DELTA specific) 55 /// Total trades executed 56 pub trades_executed: AtomicU64, 57 /// Open orders count 58 pub open_orders: AtomicU64, 59 /// 24h trading volume (in base units) 60 pub trading_volume_24h: AtomicU64, 61 } 62 63 impl Default for Metrics { 64 fn default() -> Self { 65 Self::new() 66 } 67 } 68 69 impl Metrics { 70 /// Create a new metrics registry 71 pub fn new() -> Self { 72 Self { 73 start_time: Instant::now(), 74 block_height: AtomicU64::new(0), 75 blocks_produced: AtomicU64::new(0), 76 blocks_received: AtomicU64::new(0), 77 transactions_processed: AtomicU64::new(0), 78 transactions_pending: AtomicU64::new(0), 79 transactions_failed: AtomicU64::new(0), 80 peer_count: AtomicU64::new(0), 81 bytes_sent: AtomicU64::new(0), 82 bytes_received: AtomicU64::new(0), 83 messages_sent: AtomicU64::new(0), 84 messages_received: AtomicU64::new(0), 85 storage_size_bytes: AtomicU64::new(0), 86 account_count: AtomicU64::new(0), 87 trades_executed: AtomicU64::new(0), 88 open_orders: AtomicU64::new(0), 89 trading_volume_24h: AtomicU64::new(0), 90 } 91 } 92 93 /// Get uptime in seconds 94 pub fn uptime_seconds(&self) -> u64 { 95 self.start_time.elapsed().as_secs() 96 } 97 98 /// Increment block height 99 pub fn set_block_height(&self, height: u64) { 100 self.block_height.store(height, Ordering::Relaxed); 101 } 102 103 /// Increment blocks produced 104 pub fn inc_blocks_produced(&self) { 105 self.blocks_produced.fetch_add(1, Ordering::Relaxed); 106 } 107 108 /// Increment blocks received 109 pub fn inc_blocks_received(&self) { 110 self.blocks_received.fetch_add(1, Ordering::Relaxed); 111 } 112 113 /// Increment transactions processed 114 pub fn inc_transactions_processed(&self, count: u64) { 115 self.transactions_processed 116 .fetch_add(count, Ordering::Relaxed); 117 } 118 119 /// Set pending transaction count 120 pub fn set_transactions_pending(&self, count: u64) { 121 self.transactions_pending.store(count, Ordering::Relaxed); 122 } 123 124 /// Increment failed transactions 125 pub fn inc_transactions_failed(&self) { 126 self.transactions_failed.fetch_add(1, Ordering::Relaxed); 127 } 128 129 /// Set peer count 130 pub fn set_peer_count(&self, count: u64) { 131 self.peer_count.store(count, Ordering::Relaxed); 132 } 133 134 /// Add bytes sent 135 pub fn add_bytes_sent(&self, bytes: u64) { 136 self.bytes_sent.fetch_add(bytes, Ordering::Relaxed); 137 } 138 139 /// Add bytes received 140 pub fn add_bytes_received(&self, bytes: u64) { 141 self.bytes_received.fetch_add(bytes, Ordering::Relaxed); 142 } 143 144 /// Increment messages sent 145 pub fn inc_messages_sent(&self) { 146 self.messages_sent.fetch_add(1, Ordering::Relaxed); 147 } 148 149 /// Increment messages received 150 pub fn inc_messages_received(&self) { 151 self.messages_received.fetch_add(1, Ordering::Relaxed); 152 } 153 154 /// Set storage size 155 pub fn set_storage_size(&self, bytes: u64) { 156 self.storage_size_bytes.store(bytes, Ordering::Relaxed); 157 } 158 159 /// Set account count 160 pub fn set_account_count(&self, count: u64) { 161 self.account_count.store(count, Ordering::Relaxed); 162 } 163 164 /// Increment trades executed 165 pub fn inc_trades_executed(&self) { 166 self.trades_executed.fetch_add(1, Ordering::Relaxed); 167 } 168 169 /// Set open orders count 170 pub fn set_open_orders(&self, count: u64) { 171 self.open_orders.store(count, Ordering::Relaxed); 172 } 173 174 /// Set 24h trading volume 175 pub fn set_trading_volume_24h(&self, volume: u64) { 176 self.trading_volume_24h.store(volume, Ordering::Relaxed); 177 } 178 179 /// Render metrics in Prometheus text format 180 pub fn render(&self) -> String { 181 let mut output = String::with_capacity(4096); 182 183 // Node info 184 output.push_str("# HELP deltaos_info Node information\n"); 185 output.push_str("# TYPE deltaos_info gauge\n"); 186 output.push_str("deltaos_info{version=\"0.1.0\"} 1\n\n"); 187 188 // Uptime 189 output.push_str("# HELP deltaos_uptime_seconds Node uptime in seconds\n"); 190 output.push_str("# TYPE deltaos_uptime_seconds counter\n"); 191 output.push_str(&format!( 192 "deltaos_uptime_seconds {}\n\n", 193 self.uptime_seconds() 194 )); 195 196 // Block height 197 output.push_str("# HELP deltaos_block_height Current block height\n"); 198 output.push_str("# TYPE deltaos_block_height gauge\n"); 199 output.push_str(&format!( 200 "deltaos_block_height {}\n\n", 201 self.block_height.load(Ordering::Relaxed) 202 )); 203 204 // Blocks produced 205 output.push_str("# HELP deltaos_blocks_produced_total Total blocks produced\n"); 206 output.push_str("# TYPE deltaos_blocks_produced_total counter\n"); 207 output.push_str(&format!( 208 "deltaos_blocks_produced_total {}\n\n", 209 self.blocks_produced.load(Ordering::Relaxed) 210 )); 211 212 // Blocks received 213 output.push_str("# HELP deltaos_blocks_received_total Total blocks received from peers\n"); 214 output.push_str("# TYPE deltaos_blocks_received_total counter\n"); 215 output.push_str(&format!( 216 "deltaos_blocks_received_total {}\n\n", 217 self.blocks_received.load(Ordering::Relaxed) 218 )); 219 220 // Transactions processed 221 output 222 .push_str("# HELP deltaos_transactions_processed_total Total transactions processed\n"); 223 output.push_str("# TYPE deltaos_transactions_processed_total counter\n"); 224 output.push_str(&format!( 225 "deltaos_transactions_processed_total {}\n\n", 226 self.transactions_processed.load(Ordering::Relaxed) 227 )); 228 229 // Transactions pending 230 output.push_str("# HELP deltaos_transactions_pending Pending transactions in mempool\n"); 231 output.push_str("# TYPE deltaos_transactions_pending gauge\n"); 232 output.push_str(&format!( 233 "deltaos_transactions_pending {}\n\n", 234 self.transactions_pending.load(Ordering::Relaxed) 235 )); 236 237 // Transactions failed 238 output.push_str("# HELP deltaos_transactions_failed_total Total failed transactions\n"); 239 output.push_str("# TYPE deltaos_transactions_failed_total counter\n"); 240 output.push_str(&format!( 241 "deltaos_transactions_failed_total {}\n\n", 242 self.transactions_failed.load(Ordering::Relaxed) 243 )); 244 245 // Peer count 246 output.push_str("# HELP deltaos_peer_count Connected peers\n"); 247 output.push_str("# TYPE deltaos_peer_count gauge\n"); 248 output.push_str(&format!( 249 "deltaos_peer_count {}\n\n", 250 self.peer_count.load(Ordering::Relaxed) 251 )); 252 253 // Network bytes 254 output.push_str("# HELP deltaos_network_bytes_total Total network bytes\n"); 255 output.push_str("# TYPE deltaos_network_bytes_total counter\n"); 256 output.push_str(&format!( 257 "deltaos_network_bytes_total{{direction=\"sent\"}} {}\n", 258 self.bytes_sent.load(Ordering::Relaxed) 259 )); 260 output.push_str(&format!( 261 "deltaos_network_bytes_total{{direction=\"received\"}} {}\n\n", 262 self.bytes_received.load(Ordering::Relaxed) 263 )); 264 265 // Network messages 266 output.push_str("# HELP deltaos_network_messages_total Total P2P messages\n"); 267 output.push_str("# TYPE deltaos_network_messages_total counter\n"); 268 output.push_str(&format!( 269 "deltaos_network_messages_total{{direction=\"sent\"}} {}\n", 270 self.messages_sent.load(Ordering::Relaxed) 271 )); 272 output.push_str(&format!( 273 "deltaos_network_messages_total{{direction=\"received\"}} {}\n\n", 274 self.messages_received.load(Ordering::Relaxed) 275 )); 276 277 // Storage size 278 output.push_str("# HELP deltaos_storage_bytes Storage size in bytes\n"); 279 output.push_str("# TYPE deltaos_storage_bytes gauge\n"); 280 output.push_str(&format!( 281 "deltaos_storage_bytes {}\n\n", 282 self.storage_size_bytes.load(Ordering::Relaxed) 283 )); 284 285 // Account count 286 output.push_str("# HELP deltaos_account_count Total accounts in state\n"); 287 output.push_str("# TYPE deltaos_account_count gauge\n"); 288 output.push_str(&format!( 289 "deltaos_account_count {}\n\n", 290 self.account_count.load(Ordering::Relaxed) 291 )); 292 293 // Exchange metrics 294 output.push_str("# HELP deltaos_trades_total Total trades executed\n"); 295 output.push_str("# TYPE deltaos_trades_total counter\n"); 296 output.push_str(&format!( 297 "deltaos_trades_total {}\n\n", 298 self.trades_executed.load(Ordering::Relaxed) 299 )); 300 301 output.push_str("# HELP deltaos_open_orders Current open orders\n"); 302 output.push_str("# TYPE deltaos_open_orders gauge\n"); 303 output.push_str(&format!( 304 "deltaos_open_orders {}\n\n", 305 self.open_orders.load(Ordering::Relaxed) 306 )); 307 308 output.push_str("# HELP deltaos_trading_volume_24h 24-hour trading volume\n"); 309 output.push_str("# TYPE deltaos_trading_volume_24h gauge\n"); 310 output.push_str(&format!( 311 "deltaos_trading_volume_24h {}\n", 312 self.trading_volume_24h.load(Ordering::Relaxed) 313 )); 314 315 output 316 } 317 } 318 319 /// Metrics server handle 320 pub struct MetricsServer { 321 metrics: Arc<Metrics>, 322 } 323 324 impl MetricsServer { 325 /// Create a new metrics server 326 pub fn new(metrics: Arc<Metrics>) -> Self { 327 Self { metrics } 328 } 329 330 /// Start the metrics server 331 pub async fn start(&self, port: u16) -> anyhow::Result<()> { 332 let app = Router::new() 333 .route("/metrics", get(metrics_handler)) 334 .route("/health", get(health_handler)) 335 .with_state(self.metrics.clone()); 336 337 let addr = format!("0.0.0.0:{}", port); 338 let listener = TcpListener::bind(&addr).await?; 339 340 tracing::info!("Metrics server listening on {}", addr); 341 342 axum::serve(listener, app).await?; 343 344 Ok(()) 345 } 346 } 347 348 /// Metrics endpoint handler 349 async fn metrics_handler(State(metrics): State<Arc<Metrics>>) -> impl IntoResponse { 350 ( 351 [("Content-Type", "text/plain; version=0.0.4; charset=utf-8")], 352 metrics.render(), 353 ) 354 } 355 356 /// Health check endpoint 357 async fn health_handler() -> impl IntoResponse { 358 "OK" 359 } 360 361 #[cfg(test)] 362 mod tests { 363 use super::*; 364 365 #[test] 366 fn test_metrics_creation() { 367 let metrics = Metrics::new(); 368 assert_eq!(metrics.block_height.load(Ordering::Relaxed), 0); 369 assert_eq!(metrics.peer_count.load(Ordering::Relaxed), 0); 370 } 371 372 #[test] 373 fn test_metrics_increment() { 374 let metrics = Metrics::new(); 375 376 metrics.set_block_height(100); 377 assert_eq!(metrics.block_height.load(Ordering::Relaxed), 100); 378 379 metrics.inc_blocks_produced(); 380 metrics.inc_blocks_produced(); 381 assert_eq!(metrics.blocks_produced.load(Ordering::Relaxed), 2); 382 383 metrics.set_peer_count(5); 384 assert_eq!(metrics.peer_count.load(Ordering::Relaxed), 5); 385 } 386 387 #[test] 388 fn test_metrics_render() { 389 let metrics = Metrics::new(); 390 metrics.set_block_height(1000); 391 metrics.set_peer_count(10); 392 393 let output = metrics.render(); 394 395 assert!(output.contains("deltaos_block_height 1000")); 396 assert!(output.contains("deltaos_peer_count 10")); 397 assert!(output.contains("# HELP")); 398 assert!(output.contains("# TYPE")); 399 } 400 401 #[test] 402 fn test_uptime() { 403 let metrics = Metrics::new(); 404 std::thread::sleep(std::time::Duration::from_millis(10)); 405 // uptime_seconds returns u64, so always >= 0 406 let _ = metrics.uptime_seconds(); 407 } 408 409 #[test] 410 fn test_network_metrics() { 411 let metrics = Metrics::new(); 412 413 metrics.add_bytes_sent(1000); 414 metrics.add_bytes_received(2000); 415 metrics.inc_messages_sent(); 416 metrics.inc_messages_received(); 417 metrics.inc_messages_received(); 418 419 assert_eq!(metrics.bytes_sent.load(Ordering::Relaxed), 1000); 420 assert_eq!(metrics.bytes_received.load(Ordering::Relaxed), 2000); 421 assert_eq!(metrics.messages_sent.load(Ordering::Relaxed), 1); 422 assert_eq!(metrics.messages_received.load(Ordering::Relaxed), 2); 423 } 424 425 #[test] 426 fn test_exchange_metrics() { 427 let metrics = Metrics::new(); 428 429 metrics.inc_trades_executed(); 430 metrics.inc_trades_executed(); 431 metrics.set_open_orders(50); 432 metrics.set_trading_volume_24h(1_000_000); 433 434 assert_eq!(metrics.trades_executed.load(Ordering::Relaxed), 2); 435 assert_eq!(metrics.open_orders.load(Ordering::Relaxed), 50); 436 assert_eq!( 437 metrics.trading_volume_24h.load(Ordering::Relaxed), 438 1_000_000 439 ); 440 } 441 }