/ node / metrics / src / lib.rs
lib.rs
  1  // Copyright (c) 2025 ADnet Contributors
  2  // This file is part of the AlphaOS library.
  3  
  4  // Licensed under the Apache License, Version 2.0 (the "License");
  5  // you may not use this file except in compliance with the License.
  6  // You may obtain a copy of the License at:
  7  
  8  // http://www.apache.org/licenses/LICENSE-2.0
  9  
 10  // Unless required by applicable law or agreed to in writing, software
 11  // distributed under the License is distributed on an "AS IS" BASIS,
 12  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 13  // See the License for the specific language governing permissions and
 14  // limitations under the License.
 15  
 16  mod names;
 17  
 18  // Expose the names at the crate level for easy access.
 19  pub use names::*;
 20  
 21  // Re-export the snarkVM metrics.
 22  pub use alphavm::metrics::*;
 23  
 24  #[cfg(not(feature = "serial"))]
 25  use rayon::prelude::*;
 26  
 27  use alphavm::{
 28      ledger::narwhal::TransmissionID,
 29      prelude::{Block, Network, cfg_iter},
 30  };
 31  #[cfg(feature = "locktick")]
 32  use locktick::parking_lot::Mutex;
 33  #[cfg(not(feature = "locktick"))]
 34  use parking_lot::Mutex;
 35  use std::{
 36      collections::HashMap,
 37      net::SocketAddr,
 38      sync::{
 39          Arc,
 40          atomic::{AtomicUsize, Ordering},
 41      },
 42  };
 43  use time::OffsetDateTime;
 44  
 45  /// Initializes the metrics and returns a handle to the task running the metrics exporter.
 46  pub fn initialize_metrics(ip: Option<SocketAddr>) {
 47      // Build the Prometheus exporter.
 48      let builder = metrics_exporter_prometheus::PrometheusBuilder::new();
 49      if let Some(ip) = ip { builder.with_http_listener(ip) } else { builder }
 50          .install()
 51          .expect("can't build the prometheus exporter");
 52  
 53      // Register the snarkVM metrics.
 54      alphavm::metrics::register_metrics();
 55  
 56      // Register the metrics so they exist on init.
 57      for name in crate::names::GAUGE_NAMES {
 58          register_gauge(name);
 59      }
 60      for name in crate::names::COUNTER_NAMES {
 61          register_counter(name);
 62      }
 63      for name in crate::names::HISTOGRAM_NAMES {
 64          register_histogram(name);
 65      }
 66  }
 67  
 68  pub fn update_block_metrics<N: Network>(block: &Block<N>) {
 69      use alphavm::ledger::ConfirmedTransaction;
 70  
 71      let accepted_deploy = AtomicUsize::new(0);
 72      let accepted_execute = AtomicUsize::new(0);
 73      let rejected_deploy = AtomicUsize::new(0);
 74      let rejected_execute = AtomicUsize::new(0);
 75  
 76      // Add transaction to atomic counter based on enum type match.
 77      cfg_iter!(block.transactions()).for_each(|tx| match tx {
 78          ConfirmedTransaction::AcceptedDeploy(_, _, _) => {
 79              accepted_deploy.fetch_add(1, Ordering::Relaxed);
 80          }
 81          ConfirmedTransaction::AcceptedExecute(_, _, _) => {
 82              accepted_execute.fetch_add(1, Ordering::Relaxed);
 83          }
 84          ConfirmedTransaction::RejectedDeploy(_, _, _, _) => {
 85              rejected_deploy.fetch_add(1, Ordering::Relaxed);
 86          }
 87          ConfirmedTransaction::RejectedExecute(_, _, _, _) => {
 88              rejected_execute.fetch_add(1, Ordering::Relaxed);
 89          }
 90      });
 91  
 92      increment_gauge(blocks::ACCEPTED_DEPLOY, accepted_deploy.load(Ordering::Relaxed) as f64);
 93      increment_gauge(blocks::ACCEPTED_EXECUTE, accepted_execute.load(Ordering::Relaxed) as f64);
 94      increment_gauge(blocks::REJECTED_DEPLOY, rejected_deploy.load(Ordering::Relaxed) as f64);
 95      increment_gauge(blocks::REJECTED_EXECUTE, rejected_execute.load(Ordering::Relaxed) as f64);
 96  
 97      // Update aborted transactions and solutions.
 98      increment_gauge(blocks::ABORTED_TRANSACTIONS, block.aborted_transaction_ids().len() as f64);
 99      increment_gauge(blocks::ABORTED_SOLUTIONS, block.aborted_solution_ids().len() as f64);
100  }
101  
102  pub fn add_transmission_latency_metric<N: Network>(
103      transmissions_tracker: &Arc<Mutex<HashMap<TransmissionID<N>, i64>>>,
104      block: &Block<N>,
105  ) {
106      // The age at which we consider a transmission "stale".
107      const AGE_THRESHOLD_SECONDS: i32 = 30 * 60; // 30 minutes
108  
109      // Retrieve all solution IDs (including aborted).
110      let solution_ids: std::collections::HashSet<_> =
111          block.solutions().solution_ids().chain(block.aborted_solution_ids()).collect();
112  
113      // Retrieve all transaction IDs (including aborted).
114      let transaction_ids: std::collections::HashSet<_> =
115          block.transaction_ids().chain(block.aborted_transaction_ids()).collect();
116  
117      let mut transmissions_tracker = transmissions_tracker.lock();
118      let ts_now = OffsetDateTime::now_utc().unix_timestamp();
119  
120      // Determine which keys to remove.
121      let keys_to_remove = cfg_iter!(&*transmissions_tracker)
122          .flat_map(|(transmission_id, timestamp)| {
123              let elapsed_time = std::time::Duration::from_secs((ts_now - *timestamp) as u64);
124  
125              if elapsed_time.as_secs() > AGE_THRESHOLD_SECONDS as u64 {
126                  // This entry is stale-- remove it from transmission queue and record it as a stale transmission.
127                  match transmission_id {
128                      TransmissionID::Solution(..) => increment_counter(consensus::STALE_UNCONFIRMED_SOLUTIONS),
129                      TransmissionID::Transaction(..) => increment_counter(consensus::STALE_UNCONFIRMED_TRANSACTIONS),
130                      TransmissionID::Ratification => {}
131                  }
132                  Some(*transmission_id)
133              } else {
134                  let transmission_type = match transmission_id {
135                      TransmissionID::Solution(solution_id, _) if solution_ids.contains(solution_id) => Some("solution"),
136                      TransmissionID::Transaction(transaction_id, _) if transaction_ids.contains(transaction_id) => {
137                          Some("transaction")
138                      }
139                      // Either a ratification, or the transmission was not included in the block
140                      _ => None,
141                  };
142  
143                  if let Some(transmission_type_string) = transmission_type {
144                      histogram_label(
145                          consensus::TRANSMISSION_LATENCY,
146                          "transmission_type",
147                          transmission_type_string.to_owned(),
148                          elapsed_time.as_secs_f64(),
149                      );
150                      Some(*transmission_id)
151                  } else {
152                      None
153                  }
154              }
155          })
156          .collect::<Vec<_>>();
157  
158      // Remove keys of stale or seen transmissions.
159      for key in keys_to_remove {
160          transmissions_tracker.remove(&key);
161      }
162  }