ping.rs
// Copyright (c) 2025 ADnet Contributors
// This file is part of the AlphaOS library.

// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:

// http://www.apache.org/licenses/LICENSE-2.0

// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::locators::BlockLocators;
use alphaos_node_router::Router;
use alphavm::prelude::Network;

#[cfg(feature = "locktick")]
use locktick::parking_lot::Mutex;
#[cfg(not(feature = "locktick"))]
use parking_lot::Mutex;
use std::{
    collections::BTreeMap,
    net::SocketAddr,
    sync::Arc,
    time::{Duration, Instant},
};
use tokio::{sync::Notify, time::timeout};

/// Internal state of the ping logic.
///
/// Essentially, ping keeps an ordered map `next_ping` from points in time to peer IPs.
/// When a Pong message is received from a peer, an entry is created in `next_ping`
/// for when that peer should next be pinged. A newly connected peer is pinged
/// immediately and only enters the map once its first Pong arrives.
///
/// TODO (kaimast): maybe keep track of the last ping too, to not trigger spam detection?
struct PingInner<N: Network> {
    /// The next time we should ping a peer.
    /// The map is ordered by time, so the first entry is always the peer that is due next.
    next_ping: BTreeMap<Instant, SocketAddr>,
    /// The most recent block locators
    /// (or `None` if this node does not offer block sync).
    block_locators: Option<BlockLocators<N>>,
}

/// Manages sending Ping messages to all connected peers.
48 pub struct Ping<N: Network> { 49 router: Router<N>, 50 inner: Arc<Mutex<PingInner<N>>>, 51 notify: Arc<Notify>, 52 } 53 54 impl<N: Network> PingInner<N> { 55 fn new(block_locators: Option<BlockLocators<N>>) -> Self { 56 Self { block_locators, next_ping: Default::default() } 57 } 58 } 59 60 impl<N: Network> Ping<N> { 61 /// The duration in seconds to wait between sending ping requests to a peer. 62 const MAX_PING_INTERVAL: Duration = Duration::from_secs(20); 63 64 /// Create a new instance of the ping logic. 65 /// There should only be one per node. 66 /// 67 /// # Usage 68 /// Initialize this with the most up-to-date block locators and call 69 /// update_block_locators, whenever a new block is received/created. 70 pub fn new(router: Router<N>, block_locators: BlockLocators<N>) -> Self { 71 let notify = Arc::new(Notify::default()); 72 let inner = Arc::new(Mutex::new(PingInner::new(Some(block_locators)))); 73 74 { 75 let inner = inner.clone(); 76 let router = router.clone(); 77 let notify = notify.clone(); 78 79 tokio::spawn(async move { 80 Self::ping_task(&inner, &router, ¬ify).await; 81 }); 82 } 83 84 Self { inner, router, notify } 85 } 86 87 /// Same as [`Self::new`] but for nodes that peers cannot sync from 88 /// such as provers. 89 pub fn new_nosync(router: Router<N>) -> Self { 90 let notify = Arc::new(Notify::default()); 91 let inner = Arc::new(Mutex::new(PingInner::new(None))); 92 93 { 94 let inner = inner.clone(); 95 let router = router.clone(); 96 let notify = notify.clone(); 97 98 tokio::spawn(async move { 99 Self::ping_task(&inner, &router, ¬ify).await; 100 }); 101 } 102 103 Self { inner, router, notify } 104 } 105 106 /// Notify the ping logic that we received a Pong response. 
107 pub fn on_pong_received(&self, peer_ip: SocketAddr) { 108 let now = Instant::now(); 109 let mut inner = self.inner.lock(); 110 111 inner.next_ping.insert(now + Self::MAX_PING_INTERVAL, peer_ip); 112 113 // self.notify.notify() is not needed as ping_task wakes up every MAX_PING_INTERVAL 114 } 115 116 /// Notify the ping logic that a new peer connected. 117 pub fn on_peer_connected(&self, peer_ip: SocketAddr) { 118 // Send the first ping. 119 let locators = self.inner.lock().block_locators.clone(); 120 if !self.router.send_ping(peer_ip, locators) { 121 warn!("Peer {peer_ip} connected and immediately disconnected?"); 122 } 123 } 124 125 /// Notify the ping logic that new blocks were created or synced. 126 pub fn update_block_locators(&self, locators: BlockLocators<N>) { 127 self.inner.lock().block_locators = Some(locators); 128 129 // wake up the ping task 130 self.notify.notify_one(); 131 } 132 133 /// Background task that periodically sends out new ping messages. 134 async fn ping_task(inner: &Mutex<PingInner<N>>, router: &Router<N>, notify: &Notify) { 135 let mut new_block = false; 136 137 loop { 138 // Do not hold the lock while waiting. 139 let sleep_time = { 140 let mut inner = inner.lock(); 141 let now = Instant::now(); 142 143 // Ping peers. 144 if new_block { 145 Self::ping_all_peers(&mut inner, router); 146 new_block = false; 147 } else { 148 Self::ping_expired_peers(now, &mut inner, router); 149 } 150 151 // Figure out how long to sleep. 152 if let Some((time, _)) = inner.next_ping.first_key_value() { 153 time.saturating_duration_since(now) 154 } else { 155 Self::MAX_PING_INTERVAL 156 } 157 }; 158 159 // wait to be woke up, either by timer or notify 160 if timeout(sleep_time, notify.notified()).await.is_ok() { 161 // If the timer is not expired, it means we got woken up by a new block. 162 new_block = true; 163 } 164 } 165 } 166 167 /// Ping all peers that have an expired timer. 
168 fn ping_expired_peers(now: Instant, inner: &mut PingInner<N>, router: &Router<N>) { 169 loop { 170 // Find next peer to contact. 171 let peer_ip = { 172 let Some((time, peer_ip)) = inner.next_ping.first_key_value() else { 173 return; 174 }; 175 176 if *time > now { 177 return; 178 } 179 180 *peer_ip 181 }; 182 183 // Send new ping 184 let locators = inner.block_locators.clone(); 185 let success = router.send_ping(peer_ip, locators.clone()); 186 inner.next_ping.pop_first(); 187 188 if !success { 189 trace!("Failed to send ping to peer {peer_ip}. Disconnected."); 190 } 191 } 192 } 193 194 /// Ping all known peers. 195 fn ping_all_peers(inner: &mut PingInner<N>, router: &Router<N>) { 196 let peers: Vec<SocketAddr> = inner.next_ping.values().copied().collect(); 197 inner.next_ping.clear(); 198 199 for peer_ip in peers { 200 let locators = inner.block_locators.clone(); 201 let success = router.send_ping(peer_ip, locators); 202 203 if !success { 204 trace!("Failed to send ping to peer {peer_ip}. Disconnected."); 205 } 206 } 207 } 208 }