/ node / sync / src / ping.rs
ping.rs
  1  // Copyright (c) 2025-2026 ACDC Network
  2  // This file is part of the alphaos library.
  3  //
  4  // Alpha Chain | Delta Chain Protocol
  5  // International Monetary Graphite.
  6  //
  7  // Derived from Aleo (https://aleo.org) and ProvableHQ (https://provable.com).
  8  // They built world-class ZK infrastructure. We installed the EASY button.
  9  // Their cryptography: elegant. Our modifications: bureaucracy-compatible.
 10  // Original brilliance: theirs. Robert's Rules: ours. Bugs: definitely ours.
 11  //
 12  // Original Aleo/ProvableHQ code subject to Apache 2.0 https://www.apache.org/licenses/LICENSE-2.0
 13  // All modifications and new work: CC0 1.0 Universal Public Domain Dedication.
 14  // No rights reserved. No permission required. No warranty. No refunds.
 15  //
 16  // https://creativecommons.org/publicdomain/zero/1.0/
 17  // SPDX-License-Identifier: CC0-1.0
 18  
 19  use crate::locators::BlockLocators;
 20  use alphaos_node_router::Router;
 21  use alphavm::prelude::Network;
 22  
 23  #[cfg(feature = "locktick")]
 24  use locktick::parking_lot::Mutex;
 25  #[cfg(not(feature = "locktick"))]
 26  use parking_lot::Mutex;
 27  use std::{
 28      collections::BTreeMap,
 29      net::SocketAddr,
 30      sync::Arc,
 31      time::{Duration, Instant},
 32  };
 33  use tokio::{sync::Notify, time::timeout};
 34  
 35  /// Internal state of the ping logic
 36  ///
 37  /// Essentially, ping keeps an ordered map `next_ping` of time(rs) to peer IPs.
 38  /// When a new peer connects or a Pong message is received, an entry in next ping is created
 39  /// for when a peer should next be pinged.
 40  ///
 41  /// TODO (kaimast): maybe keep track of the last ping too, to not trigger spam detection?
 42  struct PingInner<N: Network> {
 43      /// The next time we should ping a peer.
 44      next_ping: BTreeMap<Instant, SocketAddr>,
 45      /// The most recent block locators.
 46      /// (or None if this node does not offer block sync)
 47      block_locators: Option<BlockLocators<N>>,
 48  }
 49  
 50  /// Manages sending Ping messages to all connected peers.
 51  pub struct Ping<N: Network> {
 52      router: Router<N>,
 53      inner: Arc<Mutex<PingInner<N>>>,
 54      notify: Arc<Notify>,
 55  }
 56  
 57  impl<N: Network> PingInner<N> {
 58      fn new(block_locators: Option<BlockLocators<N>>) -> Self {
 59          Self { block_locators, next_ping: Default::default() }
 60      }
 61  }
 62  
 63  impl<N: Network> Ping<N> {
 64      /// The duration in seconds to wait between sending ping requests to a peer.
 65      const MAX_PING_INTERVAL: Duration = Duration::from_secs(20);
 66  
 67      /// Create a new instance of the ping logic.
 68      /// There should only be one per node.
 69      ///
 70      /// # Usage
 71      /// Initialize this with the most up-to-date block locators and call
 72      /// update_block_locators, whenever a new block is received/created.
 73      pub fn new(router: Router<N>, block_locators: BlockLocators<N>) -> Self {
 74          let notify = Arc::new(Notify::default());
 75          let inner = Arc::new(Mutex::new(PingInner::new(Some(block_locators))));
 76  
 77          {
 78              let inner = inner.clone();
 79              let router = router.clone();
 80              let notify = notify.clone();
 81  
 82              tokio::spawn(async move {
 83                  Self::ping_task(&inner, &router, &notify).await;
 84              });
 85          }
 86  
 87          Self { inner, router, notify }
 88      }
 89  
 90      /// Same as [`Self::new`] but for nodes that peers cannot sync from
 91      /// such as provers.
 92      pub fn new_nosync(router: Router<N>) -> Self {
 93          let notify = Arc::new(Notify::default());
 94          let inner = Arc::new(Mutex::new(PingInner::new(None)));
 95  
 96          {
 97              let inner = inner.clone();
 98              let router = router.clone();
 99              let notify = notify.clone();
100  
101              tokio::spawn(async move {
102                  Self::ping_task(&inner, &router, &notify).await;
103              });
104          }
105  
106          Self { inner, router, notify }
107      }
108  
109      /// Notify the ping logic that we received a Pong response.
110      pub fn on_pong_received(&self, peer_ip: SocketAddr) {
111          let now = Instant::now();
112          let mut inner = self.inner.lock();
113  
114          inner.next_ping.insert(now + Self::MAX_PING_INTERVAL, peer_ip);
115  
116          // self.notify.notify() is not needed as ping_task wakes up every MAX_PING_INTERVAL
117      }
118  
119      /// Notify the ping logic that a new peer connected.
120      pub fn on_peer_connected(&self, peer_ip: SocketAddr) {
121          // Send the first ping.
122          let locators = self.inner.lock().block_locators.clone();
123          if !self.router.send_ping(peer_ip, locators) {
124              warn!("Peer {peer_ip} connected and immediately disconnected?");
125          }
126      }
127  
128      /// Notify the ping logic that new blocks were created or synced.
129      pub fn update_block_locators(&self, locators: BlockLocators<N>) {
130          self.inner.lock().block_locators = Some(locators);
131  
132          // wake up the ping task
133          self.notify.notify_one();
134      }
135  
136      /// Background task that periodically sends out new ping messages.
137      async fn ping_task(inner: &Mutex<PingInner<N>>, router: &Router<N>, notify: &Notify) {
138          let mut new_block = false;
139  
140          loop {
141              // Do not hold the lock while waiting.
142              let sleep_time = {
143                  let mut inner = inner.lock();
144                  let now = Instant::now();
145  
146                  // Ping peers.
147                  if new_block {
148                      Self::ping_all_peers(&mut inner, router);
149                      new_block = false;
150                  } else {
151                      Self::ping_expired_peers(now, &mut inner, router);
152                  }
153  
154                  // Figure out how long to sleep.
155                  if let Some((time, _)) = inner.next_ping.first_key_value() {
156                      time.saturating_duration_since(now)
157                  } else {
158                      Self::MAX_PING_INTERVAL
159                  }
160              };
161  
162              // wait to be woke up, either by timer or notify
163              if timeout(sleep_time, notify.notified()).await.is_ok() {
164                  // If the timer is not expired, it means we got woken up by a new block.
165                  new_block = true;
166              }
167          }
168      }
169  
170      /// Ping all peers that have an expired timer.
171      fn ping_expired_peers(now: Instant, inner: &mut PingInner<N>, router: &Router<N>) {
172          loop {
173              // Find next peer to contact.
174              let peer_ip = {
175                  let Some((time, peer_ip)) = inner.next_ping.first_key_value() else {
176                      return;
177                  };
178  
179                  if *time > now {
180                      return;
181                  }
182  
183                  *peer_ip
184              };
185  
186              // Send new ping
187              let locators = inner.block_locators.clone();
188              let success = router.send_ping(peer_ip, locators.clone());
189              inner.next_ping.pop_first();
190  
191              if !success {
192                  trace!("Failed to send ping to peer {peer_ip}. Disconnected.");
193              }
194          }
195      }
196  
197      /// Ping all known peers.
198      fn ping_all_peers(inner: &mut PingInner<N>, router: &Router<N>) {
199          let peers: Vec<SocketAddr> = inner.next_ping.values().copied().collect();
200          inner.next_ping.clear();
201  
202          for peer_ip in peers {
203              let locators = inner.block_locators.clone();
204              let success = router.send_ping(peer_ip, locators);
205  
206              if !success {
207                  trace!("Failed to send ping to peer {peer_ip}. Disconnected.");
208              }
209          }
210      }
211  }