mod.rs
1 // Copyright (c) 2025 ADnet Contributors 2 // This file is part of the AlphaOS library. 3 4 // Licensed under the Apache License, Version 2.0 (the "License"); 5 // you may not use this file except in compliance with the License. 6 // You may obtain a copy of the License at: 7 8 // http://www.apache.org/licenses/LICENSE-2.0 9 10 // Unless required by applicable law or agreed to in writing, software 11 // distributed under the License is distributed on an "AS IS" BASIS, 12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 // See the License for the specific language governing permissions and 14 // limitations under the License. 15 16 mod router; 17 18 use crate::{ 19 bft::ledger_service::ProverLedgerService, 20 sync::{BlockSync, Ping}, 21 traits::NodeInterface, 22 }; 23 24 use alphaos_account::Account; 25 use alphaos_node_network::{NodeType, PeerPoolHandling}; 26 27 use alphaos_node_router::{ 28 Heartbeat, 29 Inbound, 30 Outbound, 31 Router, 32 Routing, 33 messages::{Message, UnconfirmedSolution}, 34 }; 35 use alphaos_node_tcp::{ 36 P2P, 37 protocols::{Disconnect, Handshake, OnConnect, Reading}, 38 }; 39 use alphaos_utilities::{SignalHandler, Stoppable}; 40 41 use alphavm::{ 42 ledger::narwhal::Data, 43 prelude::{ 44 Network, 45 block::{Block, Header}, 46 puzzle::{Puzzle, Solution}, 47 store::ConsensusStorage, 48 }, 49 synthesizer::VM, 50 }; 51 52 use alpha_std::StorageMode; 53 use alphaos_node_bft::helpers::fmt_id; 54 use anyhow::Result; 55 use colored::Colorize; 56 use core::{marker::PhantomData, time::Duration}; 57 #[cfg(feature = "locktick")] 58 use locktick::parking_lot::{Mutex, RwLock}; 59 #[cfg(not(feature = "locktick"))] 60 use parking_lot::{Mutex, RwLock}; 61 use rand::{CryptoRng, Rng, rngs::OsRng}; 62 use std::{ 63 net::SocketAddr, 64 sync::{ 65 Arc, 66 atomic::{AtomicU8, Ordering}, 67 }, 68 }; 69 use tokio::task::JoinHandle; 70 71 /// A prover is a light node, capable of producing proofs for consensus. 72 #[derive(Clone)] 73 pub struct Prover<N: Network, C: ConsensusStorage<N>> { 74 /// The router of the node. 75 router: Router<N>, 76 /// The sync module. 77 sync: Arc<BlockSync<N>>, 78 /// The genesis block. 79 genesis: Block<N>, 80 /// The puzzle. 81 puzzle: Puzzle<N>, 82 /// The latest epoch hash. 83 latest_epoch_hash: Arc<RwLock<Option<N::BlockHash>>>, 84 /// The latest block header. 85 latest_block_header: Arc<RwLock<Option<Header<N>>>>, 86 /// The number of puzzle instances. 87 puzzle_instances: Arc<AtomicU8>, 88 /// The maximum number of puzzle instances. 89 max_puzzle_instances: u8, 90 /// The spawned handles. 91 handles: Arc<Mutex<Vec<JoinHandle<()>>>>, 92 /// Keeps track of sending pings. 93 ping: Arc<Ping<N>>, 94 /// The signal handling logic. 95 signal_handler: Arc<SignalHandler>, 96 /// PhantomData. 97 _phantom: PhantomData<C>, 98 } 99 100 impl<N: Network, C: ConsensusStorage<N>> Prover<N, C> { 101 /// Initializes a new prover node. 102 pub async fn new( 103 node_ip: SocketAddr, 104 account: Account<N>, 105 trusted_peers: &[SocketAddr], 106 genesis: Block<N>, 107 storage_mode: StorageMode, 108 trusted_peers_only: bool, 109 dev: Option<u16>, 110 signal_handler: Arc<SignalHandler>, 111 ) -> Result<Self> { 112 // Initialize the ledger service. 113 let ledger_service = Arc::new(ProverLedgerService::new()); 114 115 // Initialize the node router. 116 let router = Router::new( 117 node_ip, 118 NodeType::Prover, 119 account, 120 ledger_service.clone(), 121 trusted_peers, 122 Self::MAXIMUM_NUMBER_OF_PEERS as u16, 123 trusted_peers_only, 124 storage_mode, 125 dev.is_some(), 126 ) 127 .await?; 128 129 // Initialize the sync module. 130 let sync = BlockSync::new(ledger_service.clone()); 131 132 // Set up the ping logic. 133 let ping = Arc::new(Ping::new_nosync(router.clone())); 134 135 // Compute the maximum number of puzzle instances. 136 let max_puzzle_instances = num_cpus::get().saturating_sub(2).clamp(1, 6); 137 // Initialize the node. 138 let node = Self { 139 router, 140 sync: Arc::new(sync), 141 genesis, 142 puzzle: VM::<N, C>::new_puzzle()?, 143 latest_epoch_hash: Default::default(), 144 latest_block_header: Default::default(), 145 puzzle_instances: Default::default(), 146 max_puzzle_instances: u8::try_from(max_puzzle_instances)?, 147 handles: Default::default(), 148 ping, 149 signal_handler, 150 _phantom: Default::default(), 151 }; 152 // Initialize the routing. 153 node.initialize_routing().await; 154 // Initialize the puzzle. 155 node.initialize_puzzle().await; 156 // Initialize the notification message loop. 157 node.handles.lock().push(crate::start_notification_message_loop()); 158 159 // Return the node. 160 Ok(node) 161 } 162 163 pub fn router(&self) -> &Router<N> { 164 &self.router 165 } 166 } 167 168 #[async_trait] 169 impl<N: Network, C: ConsensusStorage<N>> NodeInterface<N> for Prover<N, C> { 170 /// Shuts down the node. 171 async fn shut_down(&self) { 172 info!("Shutting down..."); 173 174 // Shut down the puzzle. 175 debug!("Shutting down the puzzle..."); 176 177 // Abort the tasks. 178 debug!("Shutting down the prover..."); 179 self.handles.lock().iter().for_each(|handle| handle.abort()); 180 181 // Shut down the router. 182 self.router.shut_down().await; 183 184 info!("Node has shut down."); 185 } 186 } 187 188 impl<N: Network, C: ConsensusStorage<N>> Prover<N, C> { 189 /// Initialize a new instance of the puzzle. 190 async fn initialize_puzzle(&self) { 191 for _ in 0..self.max_puzzle_instances { 192 let prover = self.clone(); 193 self.handles.lock().push(tokio::spawn(async move { 194 prover.puzzle_loop().await; 195 })); 196 } 197 } 198 199 /// Executes an instance of the puzzle. 200 async fn puzzle_loop(&self) { 201 loop { 202 // If the node is not connected to any peers, then skip this iteration. 203 if self.router.number_of_connected_peers() == 0 { 204 debug!("Skipping an iteration of the puzzle (no connected peers)"); 205 tokio::time::sleep(Duration::from_secs(N::ANCHOR_TIME as u64)).await; 206 continue; 207 } 208 209 // If the number of instances of the puzzle exceeds the maximum, then skip this iteration. 210 if self.num_puzzle_instances() > self.max_puzzle_instances { 211 // Sleep for a brief period of time. 212 tokio::time::sleep(Duration::from_millis(500)).await; 213 continue; 214 } 215 216 // Read the latest epoch hash. 217 let latest_epoch_hash = *self.latest_epoch_hash.read(); 218 // Read the latest state. 219 let latest_state = self 220 .latest_block_header 221 .read() 222 .as_ref() 223 .map(|header| (header.coinbase_target(), header.proof_target())); 224 225 // If the latest epoch hash and latest state exists, then proceed to generate a solution. 226 if let (Some(epoch_hash), Some((coinbase_target, proof_target))) = (latest_epoch_hash, latest_state) { 227 // Execute the puzzle. 228 let prover = self.clone(); 229 let result = tokio::task::spawn_blocking(move || { 230 prover.puzzle_iteration(epoch_hash, coinbase_target, proof_target, &mut OsRng) 231 }) 232 .await; 233 234 // If the prover found a solution, then broadcast it. 235 if let Ok(Some((solution_target, solution))) = result { 236 info!("Found a Solution '{}' (Proof Target {solution_target})", solution.id()); 237 // Broadcast the solution. 238 self.broadcast_solution(solution); 239 } 240 } else { 241 // Otherwise, sleep for a brief period of time, to await for puzzle state. 242 tokio::time::sleep(Duration::from_secs(1)).await; 243 } 244 245 // If the Ctrl-C handler registered the signal, stop the prover. 246 if self.signal_handler.is_stopped() { 247 debug!("Shutting down the puzzle..."); 248 break; 249 } 250 } 251 } 252 253 /// Performs one iteration of the puzzle. 254 fn puzzle_iteration<R: Rng + CryptoRng>( 255 &self, 256 epoch_hash: N::BlockHash, 257 coinbase_target: u64, 258 proof_target: u64, 259 rng: &mut R, 260 ) -> Option<(u64, Solution<N>)> { 261 // Increment the puzzle instances. 262 self.increment_puzzle_instances(); 263 264 debug!( 265 "Proving 'Puzzle' for Epoch '{}' {}", 266 fmt_id(epoch_hash), 267 format!("(Coinbase Target {coinbase_target}, Proof Target {proof_target})").dimmed() 268 ); 269 270 // Compute the solution. 271 let result = 272 self.puzzle.prove(epoch_hash, self.address(), rng.r#gen(), Some(proof_target)).ok().and_then(|solution| { 273 self.puzzle.get_proof_target(&solution).ok().map(|solution_target| (solution_target, solution)) 274 }); 275 276 // Decrement the puzzle instances. 277 self.decrement_puzzle_instances(); 278 // Return the result. 279 result 280 } 281 282 /// Broadcasts the solution to the network. 283 fn broadcast_solution(&self, solution: Solution<N>) { 284 // Prepare the unconfirmed solution message. 285 let message = Message::UnconfirmedSolution(UnconfirmedSolution { 286 solution_id: solution.id(), 287 solution: Data::Object(solution), 288 }); 289 // Propagate the "UnconfirmedSolution". 290 self.propagate(message, &[]); 291 } 292 293 /// Returns the current number of puzzle instances. 294 fn num_puzzle_instances(&self) -> u8 { 295 self.puzzle_instances.load(Ordering::Relaxed) 296 } 297 298 /// Increments the number of puzzle instances. 299 fn increment_puzzle_instances(&self) { 300 self.puzzle_instances.fetch_add(1, Ordering::Relaxed); 301 #[cfg(debug_assertions)] 302 trace!("Number of Instances - {}", self.num_puzzle_instances()); 303 } 304 305 /// Decrements the number of puzzle instances. 306 fn decrement_puzzle_instances(&self) { 307 self.puzzle_instances.fetch_sub(1, Ordering::Relaxed); 308 #[cfg(debug_assertions)] 309 trace!("Number of Instances - {}", self.num_puzzle_instances()); 310 } 311 }