lib.rs
1 // Copyright (c) 2025 ADnet Contributors 2 // This file is part of the AlphaOS library. 3 4 // Licensed under the Apache License, Version 2.0 (the "License"); 5 // you may not use this file except in compliance with the License. 6 // You may obtain a copy of the License at: 7 8 // http://www.apache.org/licenses/LICENSE-2.0 9 10 // Unless required by applicable law or agreed to in writing, software 11 // distributed under the License is distributed on an "AS IS" BASIS, 12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 // See the License for the specific language governing permissions and 14 // limitations under the License. 15 16 mod names; 17 18 // Expose the names at the crate level for easy access. 19 pub use names::*; 20 21 // Re-export the snarkVM metrics. 22 pub use alphavm::metrics::*; 23 24 #[cfg(not(feature = "serial"))] 25 use rayon::prelude::*; 26 27 use alphavm::{ 28 ledger::narwhal::TransmissionID, 29 prelude::{Block, Network, cfg_iter}, 30 }; 31 #[cfg(feature = "locktick")] 32 use locktick::parking_lot::Mutex; 33 #[cfg(not(feature = "locktick"))] 34 use parking_lot::Mutex; 35 use std::{ 36 collections::HashMap, 37 net::SocketAddr, 38 sync::{ 39 Arc, 40 atomic::{AtomicUsize, Ordering}, 41 }, 42 }; 43 use time::OffsetDateTime; 44 45 /// Initializes the metrics and returns a handle to the task running the metrics exporter. 46 pub fn initialize_metrics(ip: Option<SocketAddr>) { 47 // Build the Prometheus exporter. 48 let builder = metrics_exporter_prometheus::PrometheusBuilder::new(); 49 if let Some(ip) = ip { builder.with_http_listener(ip) } else { builder } 50 .install() 51 .expect("can't build the prometheus exporter"); 52 53 // Register the snarkVM metrics. 54 alphavm::metrics::register_metrics(); 55 56 // Register the metrics so they exist on init. 57 for name in crate::names::GAUGE_NAMES { 58 register_gauge(name); 59 } 60 for name in crate::names::COUNTER_NAMES { 61 register_counter(name); 62 } 63 for name in crate::names::HISTOGRAM_NAMES { 64 register_histogram(name); 65 } 66 } 67 68 pub fn update_block_metrics<N: Network>(block: &Block<N>) { 69 use alphavm::ledger::ConfirmedTransaction; 70 71 let accepted_deploy = AtomicUsize::new(0); 72 let accepted_execute = AtomicUsize::new(0); 73 let rejected_deploy = AtomicUsize::new(0); 74 let rejected_execute = AtomicUsize::new(0); 75 76 // Add transaction to atomic counter based on enum type match. 77 cfg_iter!(block.transactions()).for_each(|tx| match tx { 78 ConfirmedTransaction::AcceptedDeploy(_, _, _) => { 79 accepted_deploy.fetch_add(1, Ordering::Relaxed); 80 } 81 ConfirmedTransaction::AcceptedExecute(_, _, _) => { 82 accepted_execute.fetch_add(1, Ordering::Relaxed); 83 } 84 ConfirmedTransaction::RejectedDeploy(_, _, _, _) => { 85 rejected_deploy.fetch_add(1, Ordering::Relaxed); 86 } 87 ConfirmedTransaction::RejectedExecute(_, _, _, _) => { 88 rejected_execute.fetch_add(1, Ordering::Relaxed); 89 } 90 }); 91 92 increment_gauge(blocks::ACCEPTED_DEPLOY, accepted_deploy.load(Ordering::Relaxed) as f64); 93 increment_gauge(blocks::ACCEPTED_EXECUTE, accepted_execute.load(Ordering::Relaxed) as f64); 94 increment_gauge(blocks::REJECTED_DEPLOY, rejected_deploy.load(Ordering::Relaxed) as f64); 95 increment_gauge(blocks::REJECTED_EXECUTE, rejected_execute.load(Ordering::Relaxed) as f64); 96 97 // Update aborted transactions and solutions. 98 increment_gauge(blocks::ABORTED_TRANSACTIONS, block.aborted_transaction_ids().len() as f64); 99 increment_gauge(blocks::ABORTED_SOLUTIONS, block.aborted_solution_ids().len() as f64); 100 } 101 102 pub fn add_transmission_latency_metric<N: Network>( 103 transmissions_tracker: &Arc<Mutex<HashMap<TransmissionID<N>, i64>>>, 104 block: &Block<N>, 105 ) { 106 // The age at which we consider a transmission "stale". 107 const AGE_THRESHOLD_SECONDS: i32 = 30 * 60; // 30 minutes 108 109 // Retrieve all solution IDs (including aborted). 110 let solution_ids: std::collections::HashSet<_> = 111 block.solutions().solution_ids().chain(block.aborted_solution_ids()).collect(); 112 113 // Retrieve all transaction IDs (including aborted). 114 let transaction_ids: std::collections::HashSet<_> = 115 block.transaction_ids().chain(block.aborted_transaction_ids()).collect(); 116 117 let mut transmissions_tracker = transmissions_tracker.lock(); 118 let ts_now = OffsetDateTime::now_utc().unix_timestamp(); 119 120 // Determine which keys to remove. 121 let keys_to_remove = cfg_iter!(&*transmissions_tracker) 122 .flat_map(|(transmission_id, timestamp)| { 123 let elapsed_time = std::time::Duration::from_secs((ts_now - *timestamp) as u64); 124 125 if elapsed_time.as_secs() > AGE_THRESHOLD_SECONDS as u64 { 126 // This entry is stale-- remove it from transmission queue and record it as a stale transmission. 127 match transmission_id { 128 TransmissionID::Solution(..) => increment_counter(consensus::STALE_UNCONFIRMED_SOLUTIONS), 129 TransmissionID::Transaction(..) => increment_counter(consensus::STALE_UNCONFIRMED_TRANSACTIONS), 130 TransmissionID::Ratification => {} 131 } 132 Some(*transmission_id) 133 } else { 134 let transmission_type = match transmission_id { 135 TransmissionID::Solution(solution_id, _) if solution_ids.contains(solution_id) => Some("solution"), 136 TransmissionID::Transaction(transaction_id, _) if transaction_ids.contains(transaction_id) => { 137 Some("transaction") 138 } 139 // Either a ratification, or the transmission was not included in the block 140 _ => None, 141 }; 142 143 if let Some(transmission_type_string) = transmission_type { 144 histogram_label( 145 consensus::TRANSMISSION_LATENCY, 146 "transmission_type", 147 transmission_type_string.to_owned(), 148 elapsed_time.as_secs_f64(), 149 ); 150 Some(*transmission_id) 151 } else { 152 None 153 } 154 } 155 }) 156 .collect::<Vec<_>>(); 157 158 // Remove keys of stale or seen transmissions. 159 for key in keys_to_remove { 160 transmissions_tracker.remove(&key); 161 } 162 }