// ping.rs
1 // Copyright (c) 2025-2026 ACDC Network 2 // This file is part of the alphaos library. 3 // 4 // Alpha Chain | Delta Chain Protocol 5 // International Monetary Graphite. 6 // 7 // Derived from Aleo (https://aleo.org) and ProvableHQ (https://provable.com). 8 // They built world-class ZK infrastructure. We installed the EASY button. 9 // Their cryptography: elegant. Our modifications: bureaucracy-compatible. 10 // Original brilliance: theirs. Robert's Rules: ours. Bugs: definitely ours. 11 // 12 // Original Aleo/ProvableHQ code subject to Apache 2.0 https://www.apache.org/licenses/LICENSE-2.0 13 // All modifications and new work: CC0 1.0 Universal Public Domain Dedication. 14 // No rights reserved. No permission required. No warranty. No refunds. 15 // 16 // https://creativecommons.org/publicdomain/zero/1.0/ 17 // SPDX-License-Identifier: CC0-1.0 18 19 use crate::locators::BlockLocators; 20 use alphaos_node_router::Router; 21 use alphavm::prelude::Network; 22 23 #[cfg(feature = "locktick")] 24 use locktick::parking_lot::Mutex; 25 #[cfg(not(feature = "locktick"))] 26 use parking_lot::Mutex; 27 use std::{ 28 collections::BTreeMap, 29 net::SocketAddr, 30 sync::Arc, 31 time::{Duration, Instant}, 32 }; 33 use tokio::{sync::Notify, time::timeout}; 34 35 /// Internal state of the ping logic 36 /// 37 /// Essentially, ping keeps an ordered map `next_ping` of time(rs) to peer IPs. 38 /// When a new peer connects or a Pong message is received, an entry in next ping is created 39 /// for when a peer should next be pinged. 40 /// 41 /// TODO (kaimast): maybe keep track of the last ping too, to not trigger spam detection? 42 struct PingInner<N: Network> { 43 /// The next time we should ping a peer. 44 next_ping: BTreeMap<Instant, SocketAddr>, 45 /// The most recent block locators. 46 /// (or None if this node does not offer block sync) 47 block_locators: Option<BlockLocators<N>>, 48 } 49 50 /// Manages sending Ping messages to all connected peers. 
51 pub struct Ping<N: Network> { 52 router: Router<N>, 53 inner: Arc<Mutex<PingInner<N>>>, 54 notify: Arc<Notify>, 55 } 56 57 impl<N: Network> PingInner<N> { 58 fn new(block_locators: Option<BlockLocators<N>>) -> Self { 59 Self { block_locators, next_ping: Default::default() } 60 } 61 } 62 63 impl<N: Network> Ping<N> { 64 /// The duration in seconds to wait between sending ping requests to a peer. 65 const MAX_PING_INTERVAL: Duration = Duration::from_secs(20); 66 67 /// Create a new instance of the ping logic. 68 /// There should only be one per node. 69 /// 70 /// # Usage 71 /// Initialize this with the most up-to-date block locators and call 72 /// update_block_locators, whenever a new block is received/created. 73 pub fn new(router: Router<N>, block_locators: BlockLocators<N>) -> Self { 74 let notify = Arc::new(Notify::default()); 75 let inner = Arc::new(Mutex::new(PingInner::new(Some(block_locators)))); 76 77 { 78 let inner = inner.clone(); 79 let router = router.clone(); 80 let notify = notify.clone(); 81 82 tokio::spawn(async move { 83 Self::ping_task(&inner, &router, ¬ify).await; 84 }); 85 } 86 87 Self { inner, router, notify } 88 } 89 90 /// Same as [`Self::new`] but for nodes that peers cannot sync from 91 /// such as provers. 92 pub fn new_nosync(router: Router<N>) -> Self { 93 let notify = Arc::new(Notify::default()); 94 let inner = Arc::new(Mutex::new(PingInner::new(None))); 95 96 { 97 let inner = inner.clone(); 98 let router = router.clone(); 99 let notify = notify.clone(); 100 101 tokio::spawn(async move { 102 Self::ping_task(&inner, &router, ¬ify).await; 103 }); 104 } 105 106 Self { inner, router, notify } 107 } 108 109 /// Notify the ping logic that we received a Pong response. 
110 pub fn on_pong_received(&self, peer_ip: SocketAddr) { 111 let now = Instant::now(); 112 let mut inner = self.inner.lock(); 113 114 inner.next_ping.insert(now + Self::MAX_PING_INTERVAL, peer_ip); 115 116 // self.notify.notify() is not needed as ping_task wakes up every MAX_PING_INTERVAL 117 } 118 119 /// Notify the ping logic that a new peer connected. 120 pub fn on_peer_connected(&self, peer_ip: SocketAddr) { 121 // Send the first ping. 122 let locators = self.inner.lock().block_locators.clone(); 123 if !self.router.send_ping(peer_ip, locators) { 124 warn!("Peer {peer_ip} connected and immediately disconnected?"); 125 } 126 } 127 128 /// Notify the ping logic that new blocks were created or synced. 129 pub fn update_block_locators(&self, locators: BlockLocators<N>) { 130 self.inner.lock().block_locators = Some(locators); 131 132 // wake up the ping task 133 self.notify.notify_one(); 134 } 135 136 /// Background task that periodically sends out new ping messages. 137 async fn ping_task(inner: &Mutex<PingInner<N>>, router: &Router<N>, notify: &Notify) { 138 let mut new_block = false; 139 140 loop { 141 // Do not hold the lock while waiting. 142 let sleep_time = { 143 let mut inner = inner.lock(); 144 let now = Instant::now(); 145 146 // Ping peers. 147 if new_block { 148 Self::ping_all_peers(&mut inner, router); 149 new_block = false; 150 } else { 151 Self::ping_expired_peers(now, &mut inner, router); 152 } 153 154 // Figure out how long to sleep. 155 if let Some((time, _)) = inner.next_ping.first_key_value() { 156 time.saturating_duration_since(now) 157 } else { 158 Self::MAX_PING_INTERVAL 159 } 160 }; 161 162 // wait to be woke up, either by timer or notify 163 if timeout(sleep_time, notify.notified()).await.is_ok() { 164 // If the timer is not expired, it means we got woken up by a new block. 165 new_block = true; 166 } 167 } 168 } 169 170 /// Ping all peers that have an expired timer. 
171 fn ping_expired_peers(now: Instant, inner: &mut PingInner<N>, router: &Router<N>) { 172 loop { 173 // Find next peer to contact. 174 let peer_ip = { 175 let Some((time, peer_ip)) = inner.next_ping.first_key_value() else { 176 return; 177 }; 178 179 if *time > now { 180 return; 181 } 182 183 *peer_ip 184 }; 185 186 // Send new ping 187 let locators = inner.block_locators.clone(); 188 let success = router.send_ping(peer_ip, locators.clone()); 189 inner.next_ping.pop_first(); 190 191 if !success { 192 trace!("Failed to send ping to peer {peer_ip}. Disconnected."); 193 } 194 } 195 } 196 197 /// Ping all known peers. 198 fn ping_all_peers(inner: &mut PingInner<N>, router: &Router<N>) { 199 let peers: Vec<SocketAddr> = inner.next_ping.values().copied().collect(); 200 inner.next_ping.clear(); 201 202 for peer_ip in peers { 203 let locators = inner.block_locators.clone(); 204 let success = router.send_ping(peer_ip, locators); 205 206 if !success { 207 trace!("Failed to send ping to peer {peer_ip}. Disconnected."); 208 } 209 } 210 } 211 }