/ node / src / metrics.rs
metrics.rs
  1  // Copyright (c) 2025 ADnet Contributors
  2  // SPDX-License-Identifier: Apache-2.0
  3  
  4  //! Prometheus-compatible metrics for DeltaOS node
  5  //!
  6  //! Exposes metrics at `/metrics` endpoint in Prometheus text format.
  7  
  8  use axum::{Router, extract::State, response::IntoResponse, routing::get};
  9  use std::sync::Arc;
 10  use std::sync::atomic::{AtomicU64, Ordering};
 11  use std::time::Instant;
 12  use tokio::net::TcpListener;
 13  
 14  /// Metrics registry holding all node metrics
 15  #[derive(Debug)]
 16  pub struct Metrics {
 17      /// Node start time for uptime calculation
 18      start_time: Instant,
 19  
 20      // Consensus metrics
 21      /// Current block height
 22      pub block_height: AtomicU64,
 23      /// Total blocks produced by this node
 24      pub blocks_produced: AtomicU64,
 25      /// Total blocks received from peers
 26      pub blocks_received: AtomicU64,
 27  
 28      // Transaction metrics
 29      /// Total transactions processed
 30      pub transactions_processed: AtomicU64,
 31      /// Pending transactions in mempool
 32      pub transactions_pending: AtomicU64,
 33      /// Failed transactions
 34      pub transactions_failed: AtomicU64,
 35  
 36      // Network metrics
 37      /// Connected peers count
 38      pub peer_count: AtomicU64,
 39      /// Total bytes sent
 40      pub bytes_sent: AtomicU64,
 41      /// Total bytes received
 42      pub bytes_received: AtomicU64,
 43      /// Total P2P messages sent
 44      pub messages_sent: AtomicU64,
 45      /// Total P2P messages received
 46      pub messages_received: AtomicU64,
 47  
 48      // Storage metrics
 49      /// Storage size in bytes
 50      pub storage_size_bytes: AtomicU64,
 51      /// Total accounts in state
 52      pub account_count: AtomicU64,
 53  
 54      // Exchange metrics (DELTA specific)
 55      /// Total trades executed
 56      pub trades_executed: AtomicU64,
 57      /// Open orders count
 58      pub open_orders: AtomicU64,
 59      /// 24h trading volume (in base units)
 60      pub trading_volume_24h: AtomicU64,
 61  }
 62  
 63  impl Default for Metrics {
 64      fn default() -> Self {
 65          Self::new()
 66      }
 67  }
 68  
 69  impl Metrics {
 70      /// Create a new metrics registry
 71      pub fn new() -> Self {
 72          Self {
 73              start_time: Instant::now(),
 74              block_height: AtomicU64::new(0),
 75              blocks_produced: AtomicU64::new(0),
 76              blocks_received: AtomicU64::new(0),
 77              transactions_processed: AtomicU64::new(0),
 78              transactions_pending: AtomicU64::new(0),
 79              transactions_failed: AtomicU64::new(0),
 80              peer_count: AtomicU64::new(0),
 81              bytes_sent: AtomicU64::new(0),
 82              bytes_received: AtomicU64::new(0),
 83              messages_sent: AtomicU64::new(0),
 84              messages_received: AtomicU64::new(0),
 85              storage_size_bytes: AtomicU64::new(0),
 86              account_count: AtomicU64::new(0),
 87              trades_executed: AtomicU64::new(0),
 88              open_orders: AtomicU64::new(0),
 89              trading_volume_24h: AtomicU64::new(0),
 90          }
 91      }
 92  
 93      /// Get uptime in seconds
 94      pub fn uptime_seconds(&self) -> u64 {
 95          self.start_time.elapsed().as_secs()
 96      }
 97  
 98      /// Increment block height
 99      pub fn set_block_height(&self, height: u64) {
100          self.block_height.store(height, Ordering::Relaxed);
101      }
102  
103      /// Increment blocks produced
104      pub fn inc_blocks_produced(&self) {
105          self.blocks_produced.fetch_add(1, Ordering::Relaxed);
106      }
107  
108      /// Increment blocks received
109      pub fn inc_blocks_received(&self) {
110          self.blocks_received.fetch_add(1, Ordering::Relaxed);
111      }
112  
113      /// Increment transactions processed
114      pub fn inc_transactions_processed(&self, count: u64) {
115          self.transactions_processed
116              .fetch_add(count, Ordering::Relaxed);
117      }
118  
119      /// Set pending transaction count
120      pub fn set_transactions_pending(&self, count: u64) {
121          self.transactions_pending.store(count, Ordering::Relaxed);
122      }
123  
124      /// Increment failed transactions
125      pub fn inc_transactions_failed(&self) {
126          self.transactions_failed.fetch_add(1, Ordering::Relaxed);
127      }
128  
129      /// Set peer count
130      pub fn set_peer_count(&self, count: u64) {
131          self.peer_count.store(count, Ordering::Relaxed);
132      }
133  
134      /// Add bytes sent
135      pub fn add_bytes_sent(&self, bytes: u64) {
136          self.bytes_sent.fetch_add(bytes, Ordering::Relaxed);
137      }
138  
139      /// Add bytes received
140      pub fn add_bytes_received(&self, bytes: u64) {
141          self.bytes_received.fetch_add(bytes, Ordering::Relaxed);
142      }
143  
144      /// Increment messages sent
145      pub fn inc_messages_sent(&self) {
146          self.messages_sent.fetch_add(1, Ordering::Relaxed);
147      }
148  
149      /// Increment messages received
150      pub fn inc_messages_received(&self) {
151          self.messages_received.fetch_add(1, Ordering::Relaxed);
152      }
153  
154      /// Set storage size
155      pub fn set_storage_size(&self, bytes: u64) {
156          self.storage_size_bytes.store(bytes, Ordering::Relaxed);
157      }
158  
159      /// Set account count
160      pub fn set_account_count(&self, count: u64) {
161          self.account_count.store(count, Ordering::Relaxed);
162      }
163  
164      /// Increment trades executed
165      pub fn inc_trades_executed(&self) {
166          self.trades_executed.fetch_add(1, Ordering::Relaxed);
167      }
168  
169      /// Set open orders count
170      pub fn set_open_orders(&self, count: u64) {
171          self.open_orders.store(count, Ordering::Relaxed);
172      }
173  
174      /// Set 24h trading volume
175      pub fn set_trading_volume_24h(&self, volume: u64) {
176          self.trading_volume_24h.store(volume, Ordering::Relaxed);
177      }
178  
179      /// Render metrics in Prometheus text format
180      pub fn render(&self) -> String {
181          let mut output = String::with_capacity(4096);
182  
183          // Node info
184          output.push_str("# HELP deltaos_info Node information\n");
185          output.push_str("# TYPE deltaos_info gauge\n");
186          output.push_str("deltaos_info{version=\"0.1.0\"} 1\n\n");
187  
188          // Uptime
189          output.push_str("# HELP deltaos_uptime_seconds Node uptime in seconds\n");
190          output.push_str("# TYPE deltaos_uptime_seconds counter\n");
191          output.push_str(&format!(
192              "deltaos_uptime_seconds {}\n\n",
193              self.uptime_seconds()
194          ));
195  
196          // Block height
197          output.push_str("# HELP deltaos_block_height Current block height\n");
198          output.push_str("# TYPE deltaos_block_height gauge\n");
199          output.push_str(&format!(
200              "deltaos_block_height {}\n\n",
201              self.block_height.load(Ordering::Relaxed)
202          ));
203  
204          // Blocks produced
205          output.push_str("# HELP deltaos_blocks_produced_total Total blocks produced\n");
206          output.push_str("# TYPE deltaos_blocks_produced_total counter\n");
207          output.push_str(&format!(
208              "deltaos_blocks_produced_total {}\n\n",
209              self.blocks_produced.load(Ordering::Relaxed)
210          ));
211  
212          // Blocks received
213          output.push_str("# HELP deltaos_blocks_received_total Total blocks received from peers\n");
214          output.push_str("# TYPE deltaos_blocks_received_total counter\n");
215          output.push_str(&format!(
216              "deltaos_blocks_received_total {}\n\n",
217              self.blocks_received.load(Ordering::Relaxed)
218          ));
219  
220          // Transactions processed
221          output
222              .push_str("# HELP deltaos_transactions_processed_total Total transactions processed\n");
223          output.push_str("# TYPE deltaos_transactions_processed_total counter\n");
224          output.push_str(&format!(
225              "deltaos_transactions_processed_total {}\n\n",
226              self.transactions_processed.load(Ordering::Relaxed)
227          ));
228  
229          // Transactions pending
230          output.push_str("# HELP deltaos_transactions_pending Pending transactions in mempool\n");
231          output.push_str("# TYPE deltaos_transactions_pending gauge\n");
232          output.push_str(&format!(
233              "deltaos_transactions_pending {}\n\n",
234              self.transactions_pending.load(Ordering::Relaxed)
235          ));
236  
237          // Transactions failed
238          output.push_str("# HELP deltaos_transactions_failed_total Total failed transactions\n");
239          output.push_str("# TYPE deltaos_transactions_failed_total counter\n");
240          output.push_str(&format!(
241              "deltaos_transactions_failed_total {}\n\n",
242              self.transactions_failed.load(Ordering::Relaxed)
243          ));
244  
245          // Peer count
246          output.push_str("# HELP deltaos_peer_count Connected peers\n");
247          output.push_str("# TYPE deltaos_peer_count gauge\n");
248          output.push_str(&format!(
249              "deltaos_peer_count {}\n\n",
250              self.peer_count.load(Ordering::Relaxed)
251          ));
252  
253          // Network bytes
254          output.push_str("# HELP deltaos_network_bytes_total Total network bytes\n");
255          output.push_str("# TYPE deltaos_network_bytes_total counter\n");
256          output.push_str(&format!(
257              "deltaos_network_bytes_total{{direction=\"sent\"}} {}\n",
258              self.bytes_sent.load(Ordering::Relaxed)
259          ));
260          output.push_str(&format!(
261              "deltaos_network_bytes_total{{direction=\"received\"}} {}\n\n",
262              self.bytes_received.load(Ordering::Relaxed)
263          ));
264  
265          // Network messages
266          output.push_str("# HELP deltaos_network_messages_total Total P2P messages\n");
267          output.push_str("# TYPE deltaos_network_messages_total counter\n");
268          output.push_str(&format!(
269              "deltaos_network_messages_total{{direction=\"sent\"}} {}\n",
270              self.messages_sent.load(Ordering::Relaxed)
271          ));
272          output.push_str(&format!(
273              "deltaos_network_messages_total{{direction=\"received\"}} {}\n\n",
274              self.messages_received.load(Ordering::Relaxed)
275          ));
276  
277          // Storage size
278          output.push_str("# HELP deltaos_storage_bytes Storage size in bytes\n");
279          output.push_str("# TYPE deltaos_storage_bytes gauge\n");
280          output.push_str(&format!(
281              "deltaos_storage_bytes {}\n\n",
282              self.storage_size_bytes.load(Ordering::Relaxed)
283          ));
284  
285          // Account count
286          output.push_str("# HELP deltaos_account_count Total accounts in state\n");
287          output.push_str("# TYPE deltaos_account_count gauge\n");
288          output.push_str(&format!(
289              "deltaos_account_count {}\n\n",
290              self.account_count.load(Ordering::Relaxed)
291          ));
292  
293          // Exchange metrics
294          output.push_str("# HELP deltaos_trades_total Total trades executed\n");
295          output.push_str("# TYPE deltaos_trades_total counter\n");
296          output.push_str(&format!(
297              "deltaos_trades_total {}\n\n",
298              self.trades_executed.load(Ordering::Relaxed)
299          ));
300  
301          output.push_str("# HELP deltaos_open_orders Current open orders\n");
302          output.push_str("# TYPE deltaos_open_orders gauge\n");
303          output.push_str(&format!(
304              "deltaos_open_orders {}\n\n",
305              self.open_orders.load(Ordering::Relaxed)
306          ));
307  
308          output.push_str("# HELP deltaos_trading_volume_24h 24-hour trading volume\n");
309          output.push_str("# TYPE deltaos_trading_volume_24h gauge\n");
310          output.push_str(&format!(
311              "deltaos_trading_volume_24h {}\n",
312              self.trading_volume_24h.load(Ordering::Relaxed)
313          ));
314  
315          output
316      }
317  }
318  
319  /// Metrics server handle
320  pub struct MetricsServer {
321      metrics: Arc<Metrics>,
322  }
323  
324  impl MetricsServer {
325      /// Create a new metrics server
326      pub fn new(metrics: Arc<Metrics>) -> Self {
327          Self { metrics }
328      }
329  
330      /// Start the metrics server
331      pub async fn start(&self, port: u16) -> anyhow::Result<()> {
332          let app = Router::new()
333              .route("/metrics", get(metrics_handler))
334              .route("/health", get(health_handler))
335              .with_state(self.metrics.clone());
336  
337          let addr = format!("0.0.0.0:{}", port);
338          let listener = TcpListener::bind(&addr).await?;
339  
340          tracing::info!("Metrics server listening on {}", addr);
341  
342          axum::serve(listener, app).await?;
343  
344          Ok(())
345      }
346  }
347  
348  /// Metrics endpoint handler
349  async fn metrics_handler(State(metrics): State<Arc<Metrics>>) -> impl IntoResponse {
350      (
351          [("Content-Type", "text/plain; version=0.0.4; charset=utf-8")],
352          metrics.render(),
353      )
354  }
355  
356  /// Health check endpoint
357  async fn health_handler() -> impl IntoResponse {
358      "OK"
359  }
360  
361  #[cfg(test)]
362  mod tests {
363      use super::*;
364  
365      #[test]
366      fn test_metrics_creation() {
367          let metrics = Metrics::new();
368          assert_eq!(metrics.block_height.load(Ordering::Relaxed), 0);
369          assert_eq!(metrics.peer_count.load(Ordering::Relaxed), 0);
370      }
371  
372      #[test]
373      fn test_metrics_increment() {
374          let metrics = Metrics::new();
375  
376          metrics.set_block_height(100);
377          assert_eq!(metrics.block_height.load(Ordering::Relaxed), 100);
378  
379          metrics.inc_blocks_produced();
380          metrics.inc_blocks_produced();
381          assert_eq!(metrics.blocks_produced.load(Ordering::Relaxed), 2);
382  
383          metrics.set_peer_count(5);
384          assert_eq!(metrics.peer_count.load(Ordering::Relaxed), 5);
385      }
386  
387      #[test]
388      fn test_metrics_render() {
389          let metrics = Metrics::new();
390          metrics.set_block_height(1000);
391          metrics.set_peer_count(10);
392  
393          let output = metrics.render();
394  
395          assert!(output.contains("deltaos_block_height 1000"));
396          assert!(output.contains("deltaos_peer_count 10"));
397          assert!(output.contains("# HELP"));
398          assert!(output.contains("# TYPE"));
399      }
400  
401      #[test]
402      fn test_uptime() {
403          let metrics = Metrics::new();
404          std::thread::sleep(std::time::Duration::from_millis(10));
405          // uptime_seconds returns u64, so always >= 0
406          let _ = metrics.uptime_seconds();
407      }
408  
409      #[test]
410      fn test_network_metrics() {
411          let metrics = Metrics::new();
412  
413          metrics.add_bytes_sent(1000);
414          metrics.add_bytes_received(2000);
415          metrics.inc_messages_sent();
416          metrics.inc_messages_received();
417          metrics.inc_messages_received();
418  
419          assert_eq!(metrics.bytes_sent.load(Ordering::Relaxed), 1000);
420          assert_eq!(metrics.bytes_received.load(Ordering::Relaxed), 2000);
421          assert_eq!(metrics.messages_sent.load(Ordering::Relaxed), 1);
422          assert_eq!(metrics.messages_received.load(Ordering::Relaxed), 2);
423      }
424  
425      #[test]
426      fn test_exchange_metrics() {
427          let metrics = Metrics::new();
428  
429          metrics.inc_trades_executed();
430          metrics.inc_trades_executed();
431          metrics.set_open_orders(50);
432          metrics.set_trading_volume_24h(1_000_000);
433  
434          assert_eq!(metrics.trades_executed.load(Ordering::Relaxed), 2);
435          assert_eq!(metrics.open_orders.load(Ordering::Relaxed), 50);
436          assert_eq!(
437              metrics.trading_volume_24h.load(Ordering::Relaxed),
438              1_000_000
439          );
440      }
441  }