routing.rs
1 // Copyright (c) 2025 ADnet Contributors 2 // This file is part of the AlphaOS library. 3 4 // Licensed under the Apache License, Version 2.0 (the "License"); 5 // you may not use this file except in compliance with the License. 6 // You may obtain a copy of the License at: 7 8 // http://www.apache.org/licenses/LICENSE-2.0 9 10 // Unless required by applicable law or agreed to in writing, software 11 // distributed under the License is distributed on an "AS IS" BASIS, 12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 // See the License for the specific language governing permissions and 14 // limitations under the License. 15 16 use crate::{Heartbeat, Inbound, Outbound}; 17 use alphaos_node_tcp::{ 18 P2P, 19 protocols::{Disconnect, Handshake, OnConnect, Writing}, 20 }; 21 use alphavm::prelude::Network; 22 23 use std::time::{Duration, Instant}; 24 25 #[async_trait] 26 pub trait Routing<N: Network>: 27 P2P + Disconnect + OnConnect + Handshake + Inbound<N> + Outbound<N> + Heartbeat<N> 28 { 29 /// Initialize the routing. 30 async fn initialize_routing(&self) { 31 // Enable the TCP protocols. 32 self.enable_handshake().await; 33 self.enable_reading().await; 34 self.router().enable_writing().await; 35 self.enable_disconnect().await; 36 self.enable_on_connect().await; 37 // Enable the TCP listener. Note: This must be called after the above protocols. 38 self.enable_listener().await; 39 // Initialize the heartbeat. 40 self.initialize_heartbeat(); 41 } 42 43 // Start listening for inbound connections. 44 async fn enable_listener(&self) { 45 let listen_addr = self.tcp().enable_listener().await.expect("Failed to enable the TCP listener"); 46 debug!("Listening for peer connections at address {listen_addr:?}"); 47 } 48 49 /// Spawns the heartbeat background task for this instance of `Routing`. 50 fn initialize_heartbeat(&self) { 51 let self_clone = self.clone(); 52 self.router().spawn(async move { 53 // Sleep for `HEARTBEAT_IN_SECS` seconds. 54 let min_heartbeat_interval = Duration::from_secs(Self::HEARTBEAT_IN_SECS); 55 let mut last_update = Instant::now(); 56 57 loop { 58 // Process a heartbeat in the router. 59 self_clone.heartbeat().await; 60 61 // Figure out how long the heartbeat took 62 let now = Instant::now(); 63 let elapsed = now.saturating_duration_since(last_update); 64 last_update = now; 65 66 // (Potentially) sleep to avoid invoking heartbeat too frequently. 67 let sleep_time = min_heartbeat_interval.saturating_sub(elapsed); 68 if !sleep_time.is_zero() { 69 tokio::time::sleep(sleep_time).await; 70 } 71 } 72 }); 73 } 74 }