telemetry.rs
1 // Copyright (c) 2025-2026 ACDC Network 2 // This file is part of the alphaos library. 3 // 4 // Alpha Chain | Delta Chain Protocol 5 // International Monetary Graphite. 6 // 7 // Derived from Aleo (https://aleo.org) and ProvableHQ (https://provable.com). 8 // They built world-class ZK infrastructure. We installed the EASY button. 9 // Their cryptography: elegant. Our modifications: bureaucracy-compatible. 10 // Original brilliance: theirs. Robert's Rules: ours. Bugs: definitely ours. 11 // 12 // Original Aleo/ProvableHQ code subject to Apache 2.0 https://www.apache.org/licenses/LICENSE-2.0 13 // All modifications and new work: CC0 1.0 Universal Public Domain Dedication. 14 // No rights reserved. No permission required. No warranty. No refunds. 15 // 16 // https://creativecommons.org/publicdomain/zero/1.0/ 17 // SPDX-License-Identifier: CC0-1.0 18 19 use alphavm::{ 20 ledger::{ 21 committee::Committee, 22 narwhal::{BatchCertificate, BatchHeader, Subdag}, 23 }, 24 prelude::{cfg_chunks, cfg_iter, Address, Field, Network}, 25 }; 26 27 use indexmap::{IndexMap, IndexSet}; 28 #[cfg(feature = "locktick")] 29 use locktick::parking_lot::RwLock; 30 #[cfg(not(feature = "locktick"))] 31 use parking_lot::RwLock; 32 #[cfg(not(feature = "serial"))] 33 use rayon::prelude::*; 34 use std::{collections::BTreeMap, sync::Arc}; 35 36 // TODO: Consider other metrics to track: 37 // - Response time 38 // - Sync rate 39 // - Latest height of each validator 40 // - Percentage of proposals that are converted into certificates 41 // - Fullness of proposals 42 // - Connectivity (how many other validators are they connected to) 43 // - Various stake weight considerations 44 // - The latest seen IP address of each validator (useful for debugging purposes) 45 46 /// The participation scores for each validator. 47 /// Certificate Score: The % of rounds the validator has a valid certificate 48 /// Signature Score: The % of certificates the validator has a valid signature for 49 /// Combined Score: The weighted score using the certificate and signature scores 50 type ParticipationScores = (f64, f64, f64); 51 52 /// Tracker for the participation metrics of validators. 53 #[derive(Clone, Debug)] 54 pub struct Telemetry<N: Network> { 55 /// The certificates seen for each round 56 /// A mapping of `round` to set of certificate IDs. 57 /// Note that this map is sorted to allow grouped iteration over rounds. 58 tracked_certificates: Arc<RwLock<BTreeMap<u64, IndexSet<Field<N>>>>>, 59 60 /// The total number of signatures seen for a validator, including for their own certificates. 61 /// A mapping of `address` to a mapping of `round` to `count`. 62 validator_signatures: Arc<RwLock<IndexMap<Address<N>, IndexMap<u64, u32>>>>, 63 64 /// The total number of certificates seen for a validator. 65 /// A mapping of `address` to a list of rounds. 66 validator_certificates: Arc<RwLock<IndexMap<Address<N>, IndexSet<u64>>>>, 67 68 /// The certificate, signature, and participation scores for each validator. 69 participation_scores: Arc<RwLock<IndexMap<Address<N>, ParticipationScores>>>, 70 } 71 72 impl<N: Network> Default for Telemetry<N> { 73 /// Initializes a new instance of telemetry. 74 fn default() -> Self { 75 Self::new() 76 } 77 } 78 79 impl<N: Network> Telemetry<N> { 80 /// Initializes a new instance of telemetry. 81 pub fn new() -> Self { 82 Self { 83 tracked_certificates: Default::default(), 84 validator_signatures: Default::default(), 85 validator_certificates: Default::default(), 86 participation_scores: Default::default(), 87 } 88 } 89 90 // TODO (raychu86): Consider using committee lookback here. 91 /// Fetch the participation scores for each validator in the committee set. 92 pub fn get_participation_scores(&self, committee: &Committee<N>) -> IndexMap<Address<N>, f64> { 93 // Fetch the participation scores. 94 let participation_scores = self.participation_scores.read(); 95 // Calculate the weighted score for each validator. 96 committee 97 .members() 98 .iter() 99 .map(|(address, _)| { 100 let score = 101 participation_scores.get(address).map(|(_, _, combined_score)| *combined_score).unwrap_or(0.0); 102 (*address, score) 103 }) 104 .collect() 105 } 106 107 /// Insert a subdag to the telemetry tracker. 108 /// Note: This currently assumes the subdag is fully formed and included in the block. 109 pub fn insert_subdag(&self, subdag: &Subdag<N>) { 110 // Garbage collect the old certificates. 111 let next_gc_round = subdag.anchor_round().saturating_sub(BatchHeader::<N>::MAX_GC_ROUNDS as u64); 112 self.garbage_collect_certificates(next_gc_round); 113 114 // Insert the subdag certificates. 115 cfg_iter!(subdag).for_each(|(_round, certificates)| { 116 cfg_iter!(certificates).for_each(|certificate| { 117 // TODO (raychu86): Can be greatly optimized by doing a one-shot update instead of individual certificates. 118 self.insert_certificate(certificate); 119 }) 120 }); 121 122 // Calculate the participation scores. 123 self.update_participation_scores(); 124 } 125 126 /// Insert a certificate to the telemetry tracker. 127 pub fn insert_certificate(&self, certificate: &BatchCertificate<N>) { 128 // Acquire the lock for `tracked_certificates`. 129 let mut tracked_certificates = self.tracked_certificates.write(); 130 131 // Retrieve the certificate round, author, and ID. 132 let certificate_round = certificate.round(); 133 let certificate_author = certificate.author(); 134 let certificate_id = certificate.id(); 135 136 // If the certificate already exists in the tracker, then return early. 137 if tracked_certificates.get(&certificate_round).is_some_and(|certs| certs.contains(&certificate_id)) { 138 return; 139 } 140 141 // Insert the certificate ID. 142 tracked_certificates.entry(certificate_round).or_default().insert(certificate_id); 143 144 // Acquire the lock for `validator_signatures` 145 let mut validator_signatures = self.validator_signatures.write(); 146 147 // Insert the certificate author and signers. 148 for address in 149 [certificate_author].into_iter().chain(certificate.signatures().map(|signature| signature.to_address())) 150 { 151 validator_signatures 152 .entry(address) 153 .or_default() 154 .entry(certificate_round) 155 .and_modify(|count| *count += 1) 156 .or_insert(1); 157 } 158 159 // Acquire the lock for `validator_certificates`. 160 let mut validator_certificates = self.validator_certificates.write(); 161 162 // Insert the certificate 163 validator_certificates.entry(certificate_author).or_default().insert(certificate_round); 164 } 165 166 /// Calculate and update the participation scores for each validator. 167 pub fn update_participation_scores(&self) { 168 // Calculate the combined score with custom weights: 169 // - 90% certificate participation score 170 // - 10% signature participation score 171 fn weighted_score(certificate_score: f64, signature_score: f64) -> f64 { 172 let score = (0.9 * certificate_score) + (0.1 * signature_score); 173 174 // Truncate to the last 2 decimal places. 175 (score * 100.0).round() / 100.0 176 } 177 178 // Fetch the certificates and signatures. 179 let tracked_certificates = self.tracked_certificates.read(); 180 let validator_signatures = self.validator_signatures.read(); 181 let validator_certificates = self.validator_certificates.read(); 182 183 // Fetch the total number of certificates. 184 let total_certificates = validator_certificates.values().map(|rounds| rounds.len()).sum::<usize>(); 185 186 // Calculate the signature participation scores for each validator. 187 let signature_participation_scores: IndexMap<_, _> = cfg_iter!(&*validator_signatures) 188 .map(|(address, signatures)| { 189 let total_signatures = signatures.values().sum::<u32>() as f64; 190 let score = total_signatures / total_certificates as f64 * 100.0; 191 (*address, score as u16) 192 }) 193 .collect(); 194 195 // Calculate the certificate participation scores for each validator. 196 // This score is based on how many certificates the validator has included in every two rounds. 197 let tracked_rounds: Vec<_> = tracked_certificates.keys().skip_while(|r| *r % 2 == 0).copied().collect(); 198 let certificate_participation_scores: IndexMap<_, _> = cfg_iter!(&*validator_certificates) 199 .map(|(address, certificate_rounds)| { 200 // Count the number of round pairs that are included in the certificate rounds. 201 let num_included_round_pairs = cfg_chunks!(&tracked_rounds, 2) 202 .filter(|chunk| chunk.iter().any(|r| certificate_rounds.contains(r))) 203 .count(); 204 // Calculate the number of round pairs. 205 let num_round_pairs = (tracked_rounds.len().saturating_add(1)).saturating_div(2); 206 // Calculate the score based on the number of certificate rounds the validator is a part of. 207 let score = num_included_round_pairs as f64 / num_round_pairs.max(1) as f64 * 100.0; 208 (*address, score as u16) 209 }) 210 .collect(); 211 212 // Calculate the final participation scores for each validator. 213 let validator_addresses: IndexSet<_> = 214 signature_participation_scores.keys().chain(certificate_participation_scores.keys()).copied().collect(); 215 let mut new_participation_scores = IndexMap::new(); 216 for address in validator_addresses { 217 let signature_score = *signature_participation_scores.get(&address).unwrap_or(&0) as f64; 218 let certificate_score = *certificate_participation_scores.get(&address).unwrap_or(&0) as f64; 219 let combined_score = weighted_score(certificate_score, signature_score); 220 new_participation_scores.insert(address, (certificate_score, signature_score, combined_score)); 221 } 222 223 // Update the participation scores. 224 *self.participation_scores.write() = new_participation_scores; 225 } 226 227 /// Remove the certificates from the telemetry tracker that are no longer relevant based on gc. 228 pub fn garbage_collect_certificates(&self, gc_round: u64) { 229 // Acquire the locks. 230 let mut tracked_certificates = self.tracked_certificates.write(); 231 let mut validator_signatures = self.validator_signatures.write(); 232 let mut validator_certificates = self.validator_certificates.write(); 233 234 // Remove certificates that are not longer relevant 235 tracked_certificates.retain(|&round, _| round > gc_round); 236 237 // Remove signatures that are no longer relevant. 238 validator_signatures.retain(|_, rounds| { 239 rounds.retain(|&round, _| round > gc_round); 240 // Remove the address if there are no more tracked signatures. 241 !rounds.is_empty() 242 }); 243 244 // Remove certificates that are no longer relevant. 245 validator_certificates.retain(|_, rounds| { 246 rounds.retain(|&round| round > gc_round); 247 // Remove the address if there are no more tracked certificates. 248 !rounds.is_empty() 249 }); 250 } 251 } 252 253 #[cfg(test)] 254 mod tests { 255 use super::*; 256 use alphavm::{ 257 ledger::{ 258 committee::test_helpers::sample_committee_for_round_and_members, 259 narwhal::batch_certificate::test_helpers::sample_batch_certificate_for_round, 260 }, 261 prelude::MainnetV0, 262 utilities::TestRng, 263 }; 264 265 use rand::Rng; 266 267 type CurrentNetwork = MainnetV0; 268 269 #[test] 270 fn test_insert_certificates() { 271 let rng = &mut TestRng::default(); 272 273 // Initialize the telemetry tracker. 274 let telemetry = Telemetry::<CurrentNetwork>::new(); 275 276 // Set the current round. 277 let current_round = 2; 278 279 // Sample the certificates. 280 let mut certificates = IndexSet::new(); 281 for _ in 0..10 { 282 certificates.insert(sample_batch_certificate_for_round(current_round, rng)); 283 } 284 285 // Insert the certificates. 286 assert!(telemetry.tracked_certificates.read().is_empty()); 287 for certificate in &certificates { 288 telemetry.insert_certificate(certificate); 289 } 290 assert_eq!(telemetry.tracked_certificates.read().get(¤t_round).unwrap().len(), certificates.len()); 291 } 292 293 #[test] 294 fn test_participation_scores() { 295 let rng = &mut TestRng::default(); 296 297 // Initialize the telemetry tracker. 298 let telemetry = Telemetry::<CurrentNetwork>::new(); 299 300 // Set the current round. 301 let current_round = 2; 302 303 // Sample the certificates. 304 let mut certificates = IndexSet::new(); 305 certificates.insert(sample_batch_certificate_for_round(current_round, rng)); 306 certificates.insert(sample_batch_certificate_for_round(current_round, rng)); 307 certificates.insert(sample_batch_certificate_for_round(current_round, rng)); 308 certificates.insert(sample_batch_certificate_for_round(current_round, rng)); 309 310 // Initialize the committee. 311 let committee = sample_committee_for_round_and_members( 312 current_round, 313 vec![ 314 certificates[0].author(), 315 certificates[1].author(), 316 certificates[2].author(), 317 certificates[3].author(), 318 ], 319 rng, 320 ); 321 322 // Insert the certificates. 323 assert!(telemetry.tracked_certificates.read().is_empty()); 324 for certificate in &certificates { 325 telemetry.insert_certificate(certificate); 326 } 327 328 // Fetch the participation scores. 329 let participation_scores = telemetry.get_participation_scores(&committee); 330 assert_eq!(participation_scores.len(), committee.members().len()); 331 for (address, _) in committee.members() { 332 assert_eq!(*participation_scores.get(address).unwrap(), 0.0); 333 } 334 335 // Calculate the participation scores. 336 telemetry.update_participation_scores(); 337 338 // Ensure that the participation scores are updated. 339 let participation_scores = telemetry.get_participation_scores(&committee); 340 for (address, _) in committee.members() { 341 assert!(*participation_scores.get(address).unwrap() > 0.0); 342 } 343 344 println!("{participation_scores:?}"); 345 } 346 347 #[test] 348 fn test_garbage_collection() { 349 let rng = &mut TestRng::default(); 350 351 // Initialize the telemetry tracker. 352 let telemetry = Telemetry::<CurrentNetwork>::new(); 353 354 // Set the current round. 355 let current_round = 2; 356 let next_round = current_round + 1; 357 358 // Sample the certificates for round `current_round` 359 let mut certificates = IndexSet::new(); 360 let num_initial_certificates = rng.gen_range(1..10); 361 for _ in 0..num_initial_certificates { 362 certificates.insert(sample_batch_certificate_for_round(current_round, rng)); 363 } 364 365 // Sample the certificates for round `next_round` 366 let num_new_certificates = rng.gen_range(1..10); 367 for _ in 0..num_new_certificates { 368 certificates.insert(sample_batch_certificate_for_round(next_round, rng)); 369 } 370 371 // Insert the certificates. 372 for certificate in &certificates { 373 telemetry.insert_certificate(certificate); 374 } 375 assert_eq!(telemetry.tracked_certificates.read().get(¤t_round).unwrap().len(), num_initial_certificates); 376 assert_eq!(telemetry.tracked_certificates.read().get(&next_round).unwrap().len(), num_new_certificates); 377 378 // Garbage collect the certificates 379 telemetry.garbage_collect_certificates(current_round); 380 assert!(telemetry.tracked_certificates.read().get(¤t_round).is_none()); 381 assert_eq!(telemetry.tracked_certificates.read().get(&next_round).unwrap().len(), num_new_certificates); 382 } 383 }