/ node / src / prover / mod.rs
mod.rs
  1  // Copyright (c) 2025 ADnet Contributors
  2  // This file is part of the AlphaOS library.
  3  
  4  // Licensed under the Apache License, Version 2.0 (the "License");
  5  // you may not use this file except in compliance with the License.
  6  // You may obtain a copy of the License at:
  7  
  8  // http://www.apache.org/licenses/LICENSE-2.0
  9  
 10  // Unless required by applicable law or agreed to in writing, software
 11  // distributed under the License is distributed on an "AS IS" BASIS,
 12  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 13  // See the License for the specific language governing permissions and
 14  // limitations under the License.
 15  
 16  mod router;
 17  
 18  use crate::{
 19      bft::ledger_service::ProverLedgerService,
 20      sync::{BlockSync, Ping},
 21      traits::NodeInterface,
 22  };
 23  
 24  use alphaos_account::Account;
 25  use alphaos_node_network::{NodeType, PeerPoolHandling};
 26  
 27  use alphaos_node_router::{
 28      Heartbeat,
 29      Inbound,
 30      Outbound,
 31      Router,
 32      Routing,
 33      messages::{Message, UnconfirmedSolution},
 34  };
 35  use alphaos_node_tcp::{
 36      P2P,
 37      protocols::{Disconnect, Handshake, OnConnect, Reading},
 38  };
 39  use alphaos_utilities::{SignalHandler, Stoppable};
 40  
 41  use alphavm::{
 42      ledger::narwhal::Data,
 43      prelude::{
 44          Network,
 45          block::{Block, Header},
 46          puzzle::{Puzzle, Solution},
 47          store::ConsensusStorage,
 48      },
 49      synthesizer::VM,
 50  };
 51  
 52  use alpha_std::StorageMode;
 53  use alphaos_node_bft::helpers::fmt_id;
 54  use anyhow::Result;
 55  use colored::Colorize;
 56  use core::{marker::PhantomData, time::Duration};
 57  #[cfg(feature = "locktick")]
 58  use locktick::parking_lot::{Mutex, RwLock};
 59  #[cfg(not(feature = "locktick"))]
 60  use parking_lot::{Mutex, RwLock};
 61  use rand::{CryptoRng, Rng, rngs::OsRng};
 62  use std::{
 63      net::SocketAddr,
 64      sync::{
 65          Arc,
 66          atomic::{AtomicU8, Ordering},
 67      },
 68  };
 69  use tokio::task::JoinHandle;
 70  
/// A prover is a light node, capable of producing proofs for consensus.
///
/// The type is `Clone`, and clones are handed to spawned tasks. The fields wrapped in
/// `Arc` are shared between all clones rather than copied.
#[derive(Clone)]
pub struct Prover<N: Network, C: ConsensusStorage<N>> {
    /// The router of the node; used to check peer connectivity and shut down on exit.
    router: Router<N>,
    /// The sync module, built from the prover ledger service in `new`.
    sync: Arc<BlockSync<N>>,
    /// The genesis block supplied to `new`.
    genesis: Block<N>,
    /// The puzzle used to produce solutions and to compute their proof targets.
    puzzle: Puzzle<N>,
    /// The latest epoch hash; `None` until one has been stored.
    /// NOTE(review): it is only read in this part of the file - presumably the router
    /// submodule writes it; confirm.
    latest_epoch_hash: Arc<RwLock<Option<N::BlockHash>>>,
    /// The latest block header; `None` until one has been stored.
    /// The puzzle loop reads its coinbase target and proof target.
    latest_block_header: Arc<RwLock<Option<Header<N>>>>,
    /// The number of puzzle iterations currently in flight
    /// (incremented when an iteration starts, decremented when it ends).
    puzzle_instances: Arc<AtomicU8>,
    /// The maximum number of puzzle instances; also the number of puzzle loops that are spawned.
    max_puzzle_instances: u8,
    /// The handles of the spawned background tasks, aborted on shutdown.
    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
    /// Keeps track of sending pings.
    ping: Arc<Ping<N>>,
    /// The signal handling logic; the puzzle loop polls it to learn when to stop.
    signal_handler: Arc<SignalHandler>,
    /// Marker tying the otherwise unused consensus storage type `C` to the prover.
    _phantom: PhantomData<C>,
}
 99  
100  impl<N: Network, C: ConsensusStorage<N>> Prover<N, C> {
101      /// Initializes a new prover node.
102      pub async fn new(
103          node_ip: SocketAddr,
104          account: Account<N>,
105          trusted_peers: &[SocketAddr],
106          genesis: Block<N>,
107          storage_mode: StorageMode,
108          trusted_peers_only: bool,
109          dev: Option<u16>,
110          signal_handler: Arc<SignalHandler>,
111      ) -> Result<Self> {
112          // Initialize the ledger service.
113          let ledger_service = Arc::new(ProverLedgerService::new());
114  
115          // Initialize the node router.
116          let router = Router::new(
117              node_ip,
118              NodeType::Prover,
119              account,
120              ledger_service.clone(),
121              trusted_peers,
122              Self::MAXIMUM_NUMBER_OF_PEERS as u16,
123              trusted_peers_only,
124              storage_mode,
125              dev.is_some(),
126          )
127          .await?;
128  
129          // Initialize the sync module.
130          let sync = BlockSync::new(ledger_service.clone());
131  
132          // Set up the ping logic.
133          let ping = Arc::new(Ping::new_nosync(router.clone()));
134  
135          // Compute the maximum number of puzzle instances.
136          let max_puzzle_instances = num_cpus::get().saturating_sub(2).clamp(1, 6);
137          // Initialize the node.
138          let node = Self {
139              router,
140              sync: Arc::new(sync),
141              genesis,
142              puzzle: VM::<N, C>::new_puzzle()?,
143              latest_epoch_hash: Default::default(),
144              latest_block_header: Default::default(),
145              puzzle_instances: Default::default(),
146              max_puzzle_instances: u8::try_from(max_puzzle_instances)?,
147              handles: Default::default(),
148              ping,
149              signal_handler,
150              _phantom: Default::default(),
151          };
152          // Initialize the routing.
153          node.initialize_routing().await;
154          // Initialize the puzzle.
155          node.initialize_puzzle().await;
156          // Initialize the notification message loop.
157          node.handles.lock().push(crate::start_notification_message_loop());
158  
159          // Return the node.
160          Ok(node)
161      }
162  
163      pub fn router(&self) -> &Router<N> {
164          &self.router
165      }
166  }
167  
168  #[async_trait]
169  impl<N: Network, C: ConsensusStorage<N>> NodeInterface<N> for Prover<N, C> {
170      /// Shuts down the node.
171      async fn shut_down(&self) {
172          info!("Shutting down...");
173  
174          // Shut down the puzzle.
175          debug!("Shutting down the puzzle...");
176  
177          // Abort the tasks.
178          debug!("Shutting down the prover...");
179          self.handles.lock().iter().for_each(|handle| handle.abort());
180  
181          // Shut down the router.
182          self.router.shut_down().await;
183  
184          info!("Node has shut down.");
185      }
186  }
187  
188  impl<N: Network, C: ConsensusStorage<N>> Prover<N, C> {
189      /// Initialize a new instance of the puzzle.
190      async fn initialize_puzzle(&self) {
191          for _ in 0..self.max_puzzle_instances {
192              let prover = self.clone();
193              self.handles.lock().push(tokio::spawn(async move {
194                  prover.puzzle_loop().await;
195              }));
196          }
197      }
198  
199      /// Executes an instance of the puzzle.
200      async fn puzzle_loop(&self) {
201          loop {
202              // If the node is not connected to any peers, then skip this iteration.
203              if self.router.number_of_connected_peers() == 0 {
204                  debug!("Skipping an iteration of the puzzle (no connected peers)");
205                  tokio::time::sleep(Duration::from_secs(N::ANCHOR_TIME as u64)).await;
206                  continue;
207              }
208  
209              // If the number of instances of the puzzle exceeds the maximum, then skip this iteration.
210              if self.num_puzzle_instances() > self.max_puzzle_instances {
211                  // Sleep for a brief period of time.
212                  tokio::time::sleep(Duration::from_millis(500)).await;
213                  continue;
214              }
215  
216              // Read the latest epoch hash.
217              let latest_epoch_hash = *self.latest_epoch_hash.read();
218              // Read the latest state.
219              let latest_state = self
220                  .latest_block_header
221                  .read()
222                  .as_ref()
223                  .map(|header| (header.coinbase_target(), header.proof_target()));
224  
225              // If the latest epoch hash and latest state exists, then proceed to generate a solution.
226              if let (Some(epoch_hash), Some((coinbase_target, proof_target))) = (latest_epoch_hash, latest_state) {
227                  // Execute the puzzle.
228                  let prover = self.clone();
229                  let result = tokio::task::spawn_blocking(move || {
230                      prover.puzzle_iteration(epoch_hash, coinbase_target, proof_target, &mut OsRng)
231                  })
232                  .await;
233  
234                  // If the prover found a solution, then broadcast it.
235                  if let Ok(Some((solution_target, solution))) = result {
236                      info!("Found a Solution '{}' (Proof Target {solution_target})", solution.id());
237                      // Broadcast the solution.
238                      self.broadcast_solution(solution);
239                  }
240              } else {
241                  // Otherwise, sleep for a brief period of time, to await for puzzle state.
242                  tokio::time::sleep(Duration::from_secs(1)).await;
243              }
244  
245              // If the Ctrl-C handler registered the signal, stop the prover.
246              if self.signal_handler.is_stopped() {
247                  debug!("Shutting down the puzzle...");
248                  break;
249              }
250          }
251      }
252  
253      /// Performs one iteration of the puzzle.
254      fn puzzle_iteration<R: Rng + CryptoRng>(
255          &self,
256          epoch_hash: N::BlockHash,
257          coinbase_target: u64,
258          proof_target: u64,
259          rng: &mut R,
260      ) -> Option<(u64, Solution<N>)> {
261          // Increment the puzzle instances.
262          self.increment_puzzle_instances();
263  
264          debug!(
265              "Proving 'Puzzle' for Epoch '{}' {}",
266              fmt_id(epoch_hash),
267              format!("(Coinbase Target {coinbase_target}, Proof Target {proof_target})").dimmed()
268          );
269  
270          // Compute the solution.
271          let result =
272              self.puzzle.prove(epoch_hash, self.address(), rng.r#gen(), Some(proof_target)).ok().and_then(|solution| {
273                  self.puzzle.get_proof_target(&solution).ok().map(|solution_target| (solution_target, solution))
274              });
275  
276          // Decrement the puzzle instances.
277          self.decrement_puzzle_instances();
278          // Return the result.
279          result
280      }
281  
282      /// Broadcasts the solution to the network.
283      fn broadcast_solution(&self, solution: Solution<N>) {
284          // Prepare the unconfirmed solution message.
285          let message = Message::UnconfirmedSolution(UnconfirmedSolution {
286              solution_id: solution.id(),
287              solution: Data::Object(solution),
288          });
289          // Propagate the "UnconfirmedSolution".
290          self.propagate(message, &[]);
291      }
292  
293      /// Returns the current number of puzzle instances.
294      fn num_puzzle_instances(&self) -> u8 {
295          self.puzzle_instances.load(Ordering::Relaxed)
296      }
297  
298      /// Increments the number of puzzle instances.
299      fn increment_puzzle_instances(&self) {
300          self.puzzle_instances.fetch_add(1, Ordering::Relaxed);
301          #[cfg(debug_assertions)]
302          trace!("Number of Instances - {}", self.num_puzzle_instances());
303      }
304  
305      /// Decrements the number of puzzle instances.
306      fn decrement_puzzle_instances(&self) {
307          self.puzzle_instances.fetch_sub(1, Ordering::Relaxed);
308          #[cfg(debug_assertions)]
309          trace!("Number of Instances - {}", self.num_puzzle_instances());
310      }
311  }