/ node / bft / src / helpers / telemetry.rs
telemetry.rs
  1  // Copyright (c) 2025-2026 ACDC Network
  2  // This file is part of the alphaos library.
  3  //
  4  // Alpha Chain | Delta Chain Protocol
  5  // International Monetary Graphite.
  6  //
  7  // Derived from Aleo (https://aleo.org) and ProvableHQ (https://provable.com).
  8  // They built world-class ZK infrastructure. We installed the EASY button.
  9  // Their cryptography: elegant. Our modifications: bureaucracy-compatible.
 10  // Original brilliance: theirs. Robert's Rules: ours. Bugs: definitely ours.
 11  //
 12  // Original Aleo/ProvableHQ code subject to Apache 2.0 https://www.apache.org/licenses/LICENSE-2.0
 13  // All modifications and new work: CC0 1.0 Universal Public Domain Dedication.
 14  // No rights reserved. No permission required. No warranty. No refunds.
 15  //
 16  // https://creativecommons.org/publicdomain/zero/1.0/
 17  // SPDX-License-Identifier: CC0-1.0
 18  
 19  use alphavm::{
 20      ledger::{
 21          committee::Committee,
 22          narwhal::{BatchCertificate, BatchHeader, Subdag},
 23      },
 24      prelude::{cfg_chunks, cfg_iter, Address, Field, Network},
 25  };
 26  
 27  use indexmap::{IndexMap, IndexSet};
 28  #[cfg(feature = "locktick")]
 29  use locktick::parking_lot::RwLock;
 30  #[cfg(not(feature = "locktick"))]
 31  use parking_lot::RwLock;
 32  #[cfg(not(feature = "serial"))]
 33  use rayon::prelude::*;
 34  use std::{collections::BTreeMap, sync::Arc};
 35  
 36  // TODO: Consider other metrics to track:
 37  //  - Response time
 38  //  - Sync rate
 39  //  - Latest height of each validator
 40  //  - Percentage of proposals that are converted into certificates
 41  //  - Fullness of proposals
 42  //  - Connectivity (how many other validators are they connected to)
 43  //  - Various stake weight considerations
 44  //  - The latest seen IP address of each validator (useful for debugging purposes)
 45  
 46  /// The participation scores for each validator.
 47  ///     Certificate Score: The % of rounds the validator has a valid certificate
 48  ///     Signature Score: The % of certificates the validator has a valid signature for
 49  ///     Combined Score: The weighted score using the certificate and signature scores
 50  type ParticipationScores = (f64, f64, f64);
 51  
 52  /// Tracker for the participation metrics of validators.
 53  #[derive(Clone, Debug)]
 54  pub struct Telemetry<N: Network> {
 55      /// The certificates seen for each round
 56      /// A mapping of `round` to set of certificate IDs.
 57      /// Note that this map is sorted to allow grouped iteration over rounds.
 58      tracked_certificates: Arc<RwLock<BTreeMap<u64, IndexSet<Field<N>>>>>,
 59  
 60      /// The total number of signatures seen for a validator, including for their own certificates.
 61      /// A mapping of `address` to a mapping of `round` to `count`.
 62      validator_signatures: Arc<RwLock<IndexMap<Address<N>, IndexMap<u64, u32>>>>,
 63  
 64      /// The total number of certificates seen for a validator.
 65      /// A mapping of `address` to a list of rounds.
 66      validator_certificates: Arc<RwLock<IndexMap<Address<N>, IndexSet<u64>>>>,
 67  
 68      /// The certificate, signature, and participation scores for each validator.
 69      participation_scores: Arc<RwLock<IndexMap<Address<N>, ParticipationScores>>>,
 70  }
 71  
 72  impl<N: Network> Default for Telemetry<N> {
 73      /// Initializes a new instance of telemetry.
 74      fn default() -> Self {
 75          Self::new()
 76      }
 77  }
 78  
 79  impl<N: Network> Telemetry<N> {
 80      /// Initializes a new instance of telemetry.
 81      pub fn new() -> Self {
 82          Self {
 83              tracked_certificates: Default::default(),
 84              validator_signatures: Default::default(),
 85              validator_certificates: Default::default(),
 86              participation_scores: Default::default(),
 87          }
 88      }
 89  
 90      // TODO (raychu86): Consider using committee lookback here.
 91      /// Fetch the participation scores for each validator in the committee set.
 92      pub fn get_participation_scores(&self, committee: &Committee<N>) -> IndexMap<Address<N>, f64> {
 93          // Fetch the participation scores.
 94          let participation_scores = self.participation_scores.read();
 95          // Calculate the weighted score for each validator.
 96          committee
 97              .members()
 98              .iter()
 99              .map(|(address, _)| {
100                  let score =
101                      participation_scores.get(address).map(|(_, _, combined_score)| *combined_score).unwrap_or(0.0);
102                  (*address, score)
103              })
104              .collect()
105      }
106  
107      /// Insert a subdag to the telemetry tracker.
108      /// Note: This currently assumes the subdag is fully formed and included in the block.
109      pub fn insert_subdag(&self, subdag: &Subdag<N>) {
110          // Garbage collect the old certificates.
111          let next_gc_round = subdag.anchor_round().saturating_sub(BatchHeader::<N>::MAX_GC_ROUNDS as u64);
112          self.garbage_collect_certificates(next_gc_round);
113  
114          // Insert the subdag certificates.
115          cfg_iter!(subdag).for_each(|(_round, certificates)| {
116              cfg_iter!(certificates).for_each(|certificate| {
117                  // TODO (raychu86): Can be greatly optimized by doing a one-shot update instead of individual certificates.
118                  self.insert_certificate(certificate);
119              })
120          });
121  
122          // Calculate the participation scores.
123          self.update_participation_scores();
124      }
125  
126      /// Insert a certificate to the telemetry tracker.
127      pub fn insert_certificate(&self, certificate: &BatchCertificate<N>) {
128          // Acquire the lock for `tracked_certificates`.
129          let mut tracked_certificates = self.tracked_certificates.write();
130  
131          // Retrieve the certificate round, author, and ID.
132          let certificate_round = certificate.round();
133          let certificate_author = certificate.author();
134          let certificate_id = certificate.id();
135  
136          // If the certificate already exists in the tracker, then return early.
137          if tracked_certificates.get(&certificate_round).is_some_and(|certs| certs.contains(&certificate_id)) {
138              return;
139          }
140  
141          // Insert the certificate ID.
142          tracked_certificates.entry(certificate_round).or_default().insert(certificate_id);
143  
144          // Acquire the lock for `validator_signatures`
145          let mut validator_signatures = self.validator_signatures.write();
146  
147          // Insert the certificate author and signers.
148          for address in
149              [certificate_author].into_iter().chain(certificate.signatures().map(|signature| signature.to_address()))
150          {
151              validator_signatures
152                  .entry(address)
153                  .or_default()
154                  .entry(certificate_round)
155                  .and_modify(|count| *count += 1)
156                  .or_insert(1);
157          }
158  
159          // Acquire the lock for `validator_certificates`.
160          let mut validator_certificates = self.validator_certificates.write();
161  
162          // Insert the certificate
163          validator_certificates.entry(certificate_author).or_default().insert(certificate_round);
164      }
165  
166      /// Calculate and update the participation scores for each validator.
167      pub fn update_participation_scores(&self) {
168          // Calculate the combined score with custom weights:
169          // - 90% certificate participation score
170          // - 10% signature participation score
171          fn weighted_score(certificate_score: f64, signature_score: f64) -> f64 {
172              let score = (0.9 * certificate_score) + (0.1 * signature_score);
173  
174              // Truncate to the last 2 decimal places.
175              (score * 100.0).round() / 100.0
176          }
177  
178          // Fetch the certificates and signatures.
179          let tracked_certificates = self.tracked_certificates.read();
180          let validator_signatures = self.validator_signatures.read();
181          let validator_certificates = self.validator_certificates.read();
182  
183          // Fetch the total number of certificates.
184          let total_certificates = validator_certificates.values().map(|rounds| rounds.len()).sum::<usize>();
185  
186          // Calculate the signature participation scores for each validator.
187          let signature_participation_scores: IndexMap<_, _> = cfg_iter!(&*validator_signatures)
188              .map(|(address, signatures)| {
189                  let total_signatures = signatures.values().sum::<u32>() as f64;
190                  let score = total_signatures / total_certificates as f64 * 100.0;
191                  (*address, score as u16)
192              })
193              .collect();
194  
195          // Calculate the certificate participation scores for each validator.
196          // This score is based on how many certificates the validator has included in every two rounds.
197          let tracked_rounds: Vec<_> = tracked_certificates.keys().skip_while(|r| *r % 2 == 0).copied().collect();
198          let certificate_participation_scores: IndexMap<_, _> = cfg_iter!(&*validator_certificates)
199              .map(|(address, certificate_rounds)| {
200                  // Count the number of round pairs that are included in the certificate rounds.
201                  let num_included_round_pairs = cfg_chunks!(&tracked_rounds, 2)
202                      .filter(|chunk| chunk.iter().any(|r| certificate_rounds.contains(r)))
203                      .count();
204                  // Calculate the number of round pairs.
205                  let num_round_pairs = (tracked_rounds.len().saturating_add(1)).saturating_div(2);
206                  // Calculate the score based on the number of certificate rounds the validator is a part of.
207                  let score = num_included_round_pairs as f64 / num_round_pairs.max(1) as f64 * 100.0;
208                  (*address, score as u16)
209              })
210              .collect();
211  
212          // Calculate the final participation scores for each validator.
213          let validator_addresses: IndexSet<_> =
214              signature_participation_scores.keys().chain(certificate_participation_scores.keys()).copied().collect();
215          let mut new_participation_scores = IndexMap::new();
216          for address in validator_addresses {
217              let signature_score = *signature_participation_scores.get(&address).unwrap_or(&0) as f64;
218              let certificate_score = *certificate_participation_scores.get(&address).unwrap_or(&0) as f64;
219              let combined_score = weighted_score(certificate_score, signature_score);
220              new_participation_scores.insert(address, (certificate_score, signature_score, combined_score));
221          }
222  
223          // Update the participation scores.
224          *self.participation_scores.write() = new_participation_scores;
225      }
226  
227      /// Remove the certificates from the telemetry tracker that are no longer relevant based on gc.
228      pub fn garbage_collect_certificates(&self, gc_round: u64) {
229          // Acquire the locks.
230          let mut tracked_certificates = self.tracked_certificates.write();
231          let mut validator_signatures = self.validator_signatures.write();
232          let mut validator_certificates = self.validator_certificates.write();
233  
234          // Remove certificates that are not longer relevant
235          tracked_certificates.retain(|&round, _| round > gc_round);
236  
237          // Remove signatures that are no longer relevant.
238          validator_signatures.retain(|_, rounds| {
239              rounds.retain(|&round, _| round > gc_round);
240              // Remove the address if there are no more tracked signatures.
241              !rounds.is_empty()
242          });
243  
244          // Remove certificates that are no longer relevant.
245          validator_certificates.retain(|_, rounds| {
246              rounds.retain(|&round| round > gc_round);
247              // Remove the address if there are no more tracked certificates.
248              !rounds.is_empty()
249          });
250      }
251  }
252  
#[cfg(test)]
mod tests {
    use super::*;
    use alphavm::{
        ledger::{
            committee::test_helpers::sample_committee_for_round_and_members,
            narwhal::batch_certificate::test_helpers::sample_batch_certificate_for_round,
        },
        prelude::MainnetV0,
        utilities::TestRng,
    };

    use rand::Rng;

    type CurrentNetwork = MainnetV0;

    /// Inserting certificates records each certificate ID under its round.
    #[test]
    fn test_insert_certificates() {
        let rng = &mut TestRng::default();
        let telemetry = Telemetry::<CurrentNetwork>::new();

        // Ten certificates, all for the same round.
        let current_round = 2;
        let certificates: IndexSet<_> =
            (0..10).map(|_| sample_batch_certificate_for_round(current_round, rng)).collect();

        // Nothing is tracked until the certificates are inserted.
        assert!(telemetry.tracked_certificates.read().is_empty());
        certificates.iter().for_each(|certificate| telemetry.insert_certificate(certificate));

        let tracked_for_round = telemetry.tracked_certificates.read().get(&current_round).map(|ids| ids.len());
        assert_eq!(tracked_for_round, Some(certificates.len()));
    }

    /// Scores read as zero until recomputed, after which every committee member scores above zero.
    #[test]
    fn test_participation_scores() {
        let rng = &mut TestRng::default();
        let telemetry = Telemetry::<CurrentNetwork>::new();

        // Four certificates for a single round.
        let current_round = 2;
        let certificates: IndexSet<_> =
            (0..4).map(|_| sample_batch_certificate_for_round(current_round, rng)).collect();

        // The committee consists of exactly the four certificate authors.
        let members: Vec<_> = (0..4).map(|index| certificates[index].author()).collect();
        let committee = sample_committee_for_round_and_members(current_round, members, rng);

        // Insert the certificates.
        assert!(telemetry.tracked_certificates.read().is_empty());
        for certificate in &certificates {
            telemetry.insert_certificate(certificate);
        }

        // Inserting certificates does not recompute the scores, so every member still reads zero.
        let initial_scores = telemetry.get_participation_scores(&committee);
        assert_eq!(initial_scores.len(), committee.members().len());
        for (address, _) in committee.members() {
            assert_eq!(initial_scores[address], 0.0);
        }

        // Recompute, then every member should have a positive score.
        telemetry.update_participation_scores();
        let updated_scores = telemetry.get_participation_scores(&committee);
        for (address, _) in committee.members() {
            assert!(updated_scores[address] > 0.0);
        }

        println!("{updated_scores:?}");
    }

    /// Garbage collection drops the collected round and leaves later rounds intact.
    #[test]
    fn test_garbage_collection() {
        let rng = &mut TestRng::default();
        let telemetry = Telemetry::<CurrentNetwork>::new();

        let current_round = 2;
        let next_round = current_round + 1;

        // A random number of certificates for `current_round`...
        let num_initial_certificates: usize = rng.gen_range(1..10);
        let mut certificates: IndexSet<_> = (0..num_initial_certificates)
            .map(|_| sample_batch_certificate_for_round(current_round, rng))
            .collect();

        // ...followed by a random number of certificates for `next_round`.
        let num_new_certificates: usize = rng.gen_range(1..10);
        certificates
            .extend((0..num_new_certificates).map(|_| sample_batch_certificate_for_round(next_round, rng)));

        // Insert everything and check both rounds are tracked.
        certificates.iter().for_each(|certificate| telemetry.insert_certificate(certificate));
        {
            // Scope the read guard so it is released before garbage collection takes write locks.
            let tracked = telemetry.tracked_certificates.read();
            assert_eq!(tracked.get(&current_round).unwrap().len(), num_initial_certificates);
            assert_eq!(tracked.get(&next_round).unwrap().len(), num_new_certificates);
        }

        // Collecting `current_round` removes it but leaves `next_round` untouched.
        telemetry.garbage_collect_certificates(current_round);
        let tracked = telemetry.tracked_certificates.read();
        assert!(!tracked.contains_key(&current_round));
        assert_eq!(tracked.get(&next_round).unwrap().len(), num_new_certificates);
    }
}