/ node / sync / src / ping.rs
ping.rs
  1  // Copyright (c) 2025 ADnet Contributors
  2  // This file is part of the AlphaOS library.
  3  
  4  // Licensed under the Apache License, Version 2.0 (the "License");
  5  // you may not use this file except in compliance with the License.
  6  // You may obtain a copy of the License at:
  7  
  8  // http://www.apache.org/licenses/LICENSE-2.0
  9  
 10  // Unless required by applicable law or agreed to in writing, software
 11  // distributed under the License is distributed on an "AS IS" BASIS,
 12  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 13  // See the License for the specific language governing permissions and
 14  // limitations under the License.
 15  
 16  use crate::locators::BlockLocators;
 17  use alphaos_node_router::Router;
 18  use alphavm::prelude::Network;
 19  
 20  #[cfg(feature = "locktick")]
 21  use locktick::parking_lot::Mutex;
 22  #[cfg(not(feature = "locktick"))]
 23  use parking_lot::Mutex;
 24  use std::{
 25      collections::BTreeMap,
 26      net::SocketAddr,
 27      sync::Arc,
 28      time::{Duration, Instant},
 29  };
 30  use tokio::{sync::Notify, time::timeout};
 31  
 32  /// Internal state of the ping logic
 33  ///
 34  /// Essentially, ping keeps an ordered map `next_ping` of time(rs) to peer IPs.
 35  /// When a new peer connects or a Pong message is received, an entry in next ping is created
 36  /// for when a peer should next be pinged.
 37  ///
 38  /// TODO (kaimast): maybe keep track of the last ping too, to not trigger spam detection?
 39  struct PingInner<N: Network> {
 40      /// The next time we should ping a peer.
 41      next_ping: BTreeMap<Instant, SocketAddr>,
 42      /// The most recent block locators.
 43      /// (or None if this node does not offer block sync)
 44      block_locators: Option<BlockLocators<N>>,
 45  }
 46  
 47  /// Manages sending Ping messages to all connected peers.
 48  pub struct Ping<N: Network> {
 49      router: Router<N>,
 50      inner: Arc<Mutex<PingInner<N>>>,
 51      notify: Arc<Notify>,
 52  }
 53  
 54  impl<N: Network> PingInner<N> {
 55      fn new(block_locators: Option<BlockLocators<N>>) -> Self {
 56          Self { block_locators, next_ping: Default::default() }
 57      }
 58  }
 59  
 60  impl<N: Network> Ping<N> {
 61      /// The duration in seconds to wait between sending ping requests to a peer.
 62      const MAX_PING_INTERVAL: Duration = Duration::from_secs(20);
 63  
 64      /// Create a new instance of the ping logic.
 65      /// There should only be one per node.
 66      ///
 67      /// # Usage
 68      /// Initialize this with the most up-to-date block locators and call
 69      /// update_block_locators, whenever a new block is received/created.
 70      pub fn new(router: Router<N>, block_locators: BlockLocators<N>) -> Self {
 71          let notify = Arc::new(Notify::default());
 72          let inner = Arc::new(Mutex::new(PingInner::new(Some(block_locators))));
 73  
 74          {
 75              let inner = inner.clone();
 76              let router = router.clone();
 77              let notify = notify.clone();
 78  
 79              tokio::spawn(async move {
 80                  Self::ping_task(&inner, &router, &notify).await;
 81              });
 82          }
 83  
 84          Self { inner, router, notify }
 85      }
 86  
 87      /// Same as [`Self::new`] but for nodes that peers cannot sync from
 88      /// such as provers.
 89      pub fn new_nosync(router: Router<N>) -> Self {
 90          let notify = Arc::new(Notify::default());
 91          let inner = Arc::new(Mutex::new(PingInner::new(None)));
 92  
 93          {
 94              let inner = inner.clone();
 95              let router = router.clone();
 96              let notify = notify.clone();
 97  
 98              tokio::spawn(async move {
 99                  Self::ping_task(&inner, &router, &notify).await;
100              });
101          }
102  
103          Self { inner, router, notify }
104      }
105  
106      /// Notify the ping logic that we received a Pong response.
107      pub fn on_pong_received(&self, peer_ip: SocketAddr) {
108          let now = Instant::now();
109          let mut inner = self.inner.lock();
110  
111          inner.next_ping.insert(now + Self::MAX_PING_INTERVAL, peer_ip);
112  
113          // self.notify.notify() is not needed as ping_task wakes up every MAX_PING_INTERVAL
114      }
115  
116      /// Notify the ping logic that a new peer connected.
117      pub fn on_peer_connected(&self, peer_ip: SocketAddr) {
118          // Send the first ping.
119          let locators = self.inner.lock().block_locators.clone();
120          if !self.router.send_ping(peer_ip, locators) {
121              warn!("Peer {peer_ip} connected and immediately disconnected?");
122          }
123      }
124  
125      /// Notify the ping logic that new blocks were created or synced.
126      pub fn update_block_locators(&self, locators: BlockLocators<N>) {
127          self.inner.lock().block_locators = Some(locators);
128  
129          // wake up the ping task
130          self.notify.notify_one();
131      }
132  
133      /// Background task that periodically sends out new ping messages.
134      async fn ping_task(inner: &Mutex<PingInner<N>>, router: &Router<N>, notify: &Notify) {
135          let mut new_block = false;
136  
137          loop {
138              // Do not hold the lock while waiting.
139              let sleep_time = {
140                  let mut inner = inner.lock();
141                  let now = Instant::now();
142  
143                  // Ping peers.
144                  if new_block {
145                      Self::ping_all_peers(&mut inner, router);
146                      new_block = false;
147                  } else {
148                      Self::ping_expired_peers(now, &mut inner, router);
149                  }
150  
151                  // Figure out how long to sleep.
152                  if let Some((time, _)) = inner.next_ping.first_key_value() {
153                      time.saturating_duration_since(now)
154                  } else {
155                      Self::MAX_PING_INTERVAL
156                  }
157              };
158  
159              // wait to be woke up, either by timer or notify
160              if timeout(sleep_time, notify.notified()).await.is_ok() {
161                  // If the timer is not expired, it means we got woken up by a new block.
162                  new_block = true;
163              }
164          }
165      }
166  
167      /// Ping all peers that have an expired timer.
168      fn ping_expired_peers(now: Instant, inner: &mut PingInner<N>, router: &Router<N>) {
169          loop {
170              // Find next peer to contact.
171              let peer_ip = {
172                  let Some((time, peer_ip)) = inner.next_ping.first_key_value() else {
173                      return;
174                  };
175  
176                  if *time > now {
177                      return;
178                  }
179  
180                  *peer_ip
181              };
182  
183              // Send new ping
184              let locators = inner.block_locators.clone();
185              let success = router.send_ping(peer_ip, locators.clone());
186              inner.next_ping.pop_first();
187  
188              if !success {
189                  trace!("Failed to send ping to peer {peer_ip}. Disconnected.");
190              }
191          }
192      }
193  
194      /// Ping all known peers.
195      fn ping_all_peers(inner: &mut PingInner<N>, router: &Router<N>) {
196          let peers: Vec<SocketAddr> = inner.next_ping.values().copied().collect();
197          inner.next_ping.clear();
198  
199          for peer_ip in peers {
200              let locators = inner.block_locators.clone();
201              let success = router.send_ping(peer_ip, locators);
202  
203              if !success {
204                  trace!("Failed to send ping to peer {peer_ip}. Disconnected.");
205              }
206          }
207      }
208  }