/ node / src / lib.rs
lib.rs
  1  // Copyright (c) 2025 ADnet Contributors
  2  // DeltaOS node implementation
  3  // SPDX-License-Identifier: Apache-2.0
  4  
  5  //! DeltaOS Node Implementation
  6  //!
  7  //! This module provides the core node functionality for the DELTA chain,
  8  //! including consensus, networking, REST API, and state management.
  9  //!
 10  //! Architecture:
 11  //! - Uses deltavm for execution and state management
 12  //! - REST API for external interactions (including exchange endpoints)
 13  //! - Simplified consensus (PoS + BFT) for block production
 14  //! - Hybrid storage: account model for trading, record model for governance
 15  //! - Integrated exchange module for spot trading (F-D01 to F-D07)
 16  //! - P2P networking for peer communication and message propagation
 17  
 18  pub mod config;
 19  pub mod consensus;
 20  pub mod error;
 21  pub mod ipc_server;
 22  pub mod metrics;
 23  pub mod p2p;
 24  pub mod rest;
 25  pub mod signing;
 26  pub mod state;
 27  pub mod storage;
 28  
 29  pub use config::*;
 30  pub use consensus::*;
 31  pub use error::*;
 32  pub use ipc_server::{
 33      DeltaIpcServer, IpcServerConfig as DeltaIpcServerConfig, IpcServerState as DeltaIpcServerState,
 34  };
 35  pub use metrics::{Metrics, MetricsServer};
 36  pub use p2p::*;
 37  pub use rest::*;
 38  pub use signing::*;
 39  pub use state::*;
 40  pub use storage::*;
 41  
 42  use deltavm_execution::DeltaRuntime;
 43  use std::collections::HashMap;
 44  use std::net::SocketAddr;
 45  use std::sync::Arc;
 46  use tokio::sync::{RwLock, mpsc};
 47  
 48  // Re-export exchange types for convenience
 49  pub use deltavm_execution::{
 50      ExternalChain, MatchingEngine, Order, OrderId, OrderSide, OrderStatus, OrderType, TradingPair,
 51      TradingResult, TradingTx,
 52  };
 53  
 54  /// The DELTA chain node
 55  pub struct DeltaNode {
 56      /// Node configuration
 57      pub config: NodeConfig,
 58      /// The execution runtime
 59      pub runtime: Arc<RwLock<DeltaRuntime>>,
 60      /// Node state
 61      pub state: Arc<RwLock<NodeState>>,
 62      /// Persistent storage (RocksDB)
 63      pub storage: Option<Arc<Storage>>,
 64      /// P2P network handler
 65      pub p2p: Option<DeltaP2P>,
 66      /// IPC server for bridge communication
 67      pub ipc_server: Option<DeltaIpcServer>,
 68      /// Shutdown signal sender
 69      shutdown_tx: Option<mpsc::Sender<()>>,
 70  }
 71  
 72  impl DeltaNode {
 73      /// Create a new DELTA node
 74      pub fn new(config: NodeConfig) -> Self {
 75          Self {
 76              config,
 77              runtime: Arc::new(RwLock::new(DeltaRuntime::new())),
 78              state: Arc::new(RwLock::new(NodeState::default())),
 79              storage: None,
 80              p2p: None,
 81              ipc_server: None,
 82              shutdown_tx: None,
 83          }
 84      }
 85  
 86      /// Create a new DELTA node with IPC server enabled
 87      pub fn new_with_ipc(config: NodeConfig, ipc_config: DeltaIpcServerConfig) -> Self {
 88          let runtime = Arc::new(RwLock::new(DeltaRuntime::new()));
 89          let ipc_server = DeltaIpcServer::new(ipc_config, Some(runtime.clone()));
 90  
 91          Self {
 92              config,
 93              runtime,
 94              state: Arc::new(RwLock::new(NodeState::default())),
 95              storage: None,
 96              p2p: None,
 97              ipc_server: Some(ipc_server),
 98              shutdown_tx: None,
 99          }
100      }
101  
102      /// Start the node
103      pub async fn start(&mut self) -> Result<(), NodeError> {
104          tracing::info!("Starting DELTA node...");
105          tracing::info!("  Node type: {:?}", self.config.node_type);
106          tracing::info!("  Data dir: {:?}", self.config.data_dir);
107          tracing::info!("  REST port: {}", self.config.rest_port);
108          tracing::info!("  P2P port: {}", self.config.p2p_port);
109  
110          // Initialize persistent storage
111          let storage_path = self.config.data_dir.join("db");
112          std::fs::create_dir_all(&storage_path).map_err(|e| {
113              NodeError::StartupError(format!("Failed to create storage directory: {}", e))
114          })?;
115  
116          let storage = Storage::open(&storage_path)
117              .map_err(|e| NodeError::StartupError(format!("Failed to open storage: {}", e)))?;
118          let storage = Arc::new(storage);
119          self.storage = Some(storage.clone());
120  
121          // Recover state from storage or create genesis block
122          let node_address = format!("delta_validator_{}", self.config.rest_port);
123          let (starting_height, starting_hash) = match storage.get_chain_height() {
124              Ok(height) if height > 0 => {
125                  tracing::info!("  Recovered chain height: {}", height);
126                  // Get the last block hash for consensus continuity
127                  let last_hash = storage
128                      .get_last_block_hash()
129                      .map_err(|e| {
130                          NodeError::StartupError(format!("Failed to get last block hash: {}", e))
131                      })?
132                      .unwrap_or([0u8; 32]);
133  
134                  // Update runtime block height to match stored state
135                  {
136                      let mut runtime = self.runtime.write().await;
137                      runtime.block_height = height;
138                  }
139                  {
140                      let mut state = self.state.write().await;
141                      state.block_height = height;
142                  }
143                  (height, last_hash)
144              }
145              _ => {
146                  // No stored state - create genesis block
147                  tracing::info!("  Creating genesis block...");
148                  let genesis = BlockProducer::create_genesis_block(&node_address);
149                  let genesis_hash = genesis.hash;
150  
151                  // Store genesis block
152                  storage.put_block(&genesis).map_err(|e| {
153                      NodeError::StartupError(format!("Failed to store genesis block: {}", e))
154                  })?;
155  
156                  tracing::info!(
157                      "  Genesis block created and stored (hash: {})",
158                      hex::encode(&genesis_hash[..8])
159                  );
160  
161                  // Update state to reflect genesis
162                  {
163                      let mut runtime = self.runtime.write().await;
164                      runtime.block_height = 1;
165                  }
166                  {
167                      let mut state = self.state.write().await;
168                      state.block_height = 1;
169                  }
170  
171                  (1, genesis_hash)
172              }
173          };
174  
175          // Check for snapshots and recover account state
176          if let Ok(Some(snapshot)) = storage.load_latest_snapshot() {
177              tracing::info!(
178                  "  Loaded snapshot from height {} ({} accounts)",
179                  snapshot.height,
180                  snapshot.accounts.len()
181              );
182              // TODO: Apply account states to runtime ledger
183          }
184  
185          // Initialize exchange module
186          {
187              let runtime = self.runtime.read().await;
188              tracing::info!("  Exchange initialized with matching engine");
189              tracing::info!("    - Maker fee: {} bps", deltavm_execution::MAKER_FEE_BPS);
190              tracing::info!("    - Taker fee: {} bps", deltavm_execution::TAKER_FEE_BPS);
191              tracing::info!(
192                  "    - Min order size: {} AX",
193                  deltavm_execution::MIN_ORDER_SIZE_AX
194              );
195              tracing::info!(
196                  "    - Max orders per user per pair: {}",
197                  deltavm_execution::MAX_ORDERS_PER_USER_PER_PAIR
198              );
199              tracing::debug!("    - Current block height: {}", runtime.block_height);
200          }
201  
202          // Update state
203          {
204              let mut state = self.state.write().await;
205              state.is_running = true;
206          }
207  
208          // Start IPC server if configured
209          if let Some(ref mut ipc_server) = self.ipc_server {
210              match ipc_server.start().await {
211                  Ok(_) => {
212                      tracing::info!("IPC server started for bridge communication");
213                  }
214                  Err(e) => {
215                      tracing::warn!(
216                          "Failed to start IPC server: {} (bridge communication will be unavailable)",
217                          e
218                      );
219                  }
220              }
221          }
222  
223          // Create shutdown channel
224          let (shutdown_tx, shutdown_rx) = mpsc::channel(1);
225          self.shutdown_tx = Some(shutdown_tx);
226  
227          // Create channels for P2P communication
228          let (block_tx, mut block_rx) = mpsc::channel::<DeltaBlock>(100);
229          let (tx_tx, mut tx_rx) = mpsc::channel::<DeltaTransaction>(1000);
230          let (attestation_tx, mut attestation_rx) = mpsc::channel::<CrossChainAttestation>(100);
231  
232          // Initialize P2P network
233          let p2p = DeltaP2P::new(
234              self.config.p2p.clone(),
235              block_tx.clone(),
236              tx_tx.clone(),
237              attestation_tx.clone(),
238          );
239  
240          // Wire storage into P2P for block sync
241          p2p.set_storage(storage.clone());
242  
243          // Start P2P listener
244          match p2p.start().await {
245              Ok(listen_addr) => {
246                  tracing::info!("P2P network started on {}", listen_addr);
247              }
248              Err(e) => {
249                  tracing::error!("Failed to start P2P network: {}", e);
250                  return Err(NodeError::StartupError(format!(
251                      "P2P startup failed: {}",
252                      e
253                  )));
254              }
255          }
256  
257          // Set up BFT consensus if this is a validator node
258          if self.config.node_type == NodeType::Validator {
259              let node_address = format!("delta_validator_{}", self.config.rest_port);
260              let mut validators = HashMap::new();
261              validators.insert(node_address.clone(), 1); // Self with voting power 1
262              let consensus = BftConsensus::new(node_address.clone(), validators);
263              p2p.set_consensus(consensus);
264              tracing::info!("BFT consensus initialized for validator: {}", node_address);
265          }
266  
267          // Store P2P handler
268          self.p2p = Some(p2p.clone());
269  
270          // Start consensus engine with recovered state
271          let consensus_config = ConsensusConfig::default();
272          let consensus_engine = ConsensusEngine::with_starting_state(
273              consensus_config,
274              shutdown_rx,
275              block_tx,
276              starting_height,
277              starting_hash,
278          );
279          let state_for_consensus = self.state.clone();
280          let runtime_for_consensus = self.runtime.clone();
281          let p2p_for_blocks = p2p.clone();
282          let storage_for_blocks = storage.clone();
283  
284          tokio::spawn(async move {
285              consensus_engine.run(node_address).await;
286          });
287  
288          // Block processor task - handles order matching and persistence during block production
289          tokio::spawn(async move {
290              while let Some(block) = block_rx.recv().await {
291                  let block_height = block.height;
292  
293                  // Persist block to storage
294                  if let Err(e) = storage_for_blocks.put_block(&block) {
295                      tracing::error!("Failed to persist block {}: {}", block_height, e);
296                  } else {
297                      tracing::debug!("Persisted block {} to storage", block_height);
298                  }
299  
300                  // Create snapshot if needed (every N blocks)
301                  if storage_for_blocks.should_snapshot(block_height) {
302                      // TODO: Collect account states from runtime ledger
303                      let accounts = vec![];
304                      if let Err(e) =
305                          storage_for_blocks.create_snapshot(block_height, block.hash, accounts)
306                      {
307                          tracing::warn!(
308                              "Failed to create snapshot at height {}: {}",
309                              block_height,
310                              e
311                          );
312                      } else {
313                          tracing::info!("Created snapshot at height {}", block_height);
314                      }
315                  }
316  
317                  // Update state with new block
318                  let mut state = state_for_consensus.write().await;
319                  state.block_height = block_height;
320  
321                  // Update P2P block height
322                  p2p_for_blocks.set_block_height(block_height);
323  
324                  // Advance runtime (this syncs block height with matching engine)
325                  let mut runtime = runtime_for_consensus.write().await;
326                  runtime.advance_block();
327  
328                  tracing::debug!(
329                      "Applied block {} to state (matching engine synced)",
330                      block_height
331                  );
332              }
333          });
334  
335          // Transaction processor task
336          let runtime_for_txs = self.runtime.clone();
337          tokio::spawn(async move {
338              while let Some(tx) = tx_rx.recv().await {
339                  tracing::trace!("Received transaction: {:?}", hex::encode(&tx.id[..8]));
340                  // In a full implementation, add to mempool and validate
341                  let _ = runtime_for_txs; // Placeholder for future implementation
342              }
343          });
344  
345          // Attestation processor task
346          tokio::spawn(async move {
347              while let Some(attestation) = attestation_rx.recv().await {
348                  tracing::debug!(
349                      "Received attestation from ALPHA height {}",
350                      attestation.alpha_height
351                  );
352                  // Process cross-chain attestation
353              }
354          });
355  
356          // Start REST API - share the node's runtime for exchange operations
357          // This ensures all exchange state (orders, trades) is shared
358          let rest_state = Arc::new(RestStateShared {
359              config: self.config.clone(),
360              node_state: self.state.clone(),
361              runtime: self.runtime.clone(),
362          });
363  
364          let router = rest::build_router_shared(rest_state);
365  
366          let addr = SocketAddr::from(([0, 0, 0, 0], self.config.rest_port));
367          tracing::info!("REST API listening on {}", addr);
368  
369          // Run the REST server
370          let listener = tokio::net::TcpListener::bind(addr)
371              .await
372              .map_err(|e| NodeError::StartupError(format!("Failed to bind REST API: {}", e)))?;
373  
374          axum::serve(listener, router)
375              .await
376              .map_err(|e| NodeError::StartupError(format!("REST API error: {}", e)))?;
377  
378          Ok(())
379      }
380  
381      /// Stop the node
382      pub async fn stop(&mut self) -> Result<(), NodeError> {
383          tracing::info!("Stopping DELTA node...");
384  
385          // Shutdown IPC server
386          if let Some(ref mut ipc_server) = self.ipc_server {
387              ipc_server.stop().await;
388              tracing::info!("IPC server stopped");
389          }
390  
391          // Shutdown P2P network
392          if let Some(ref p2p) = self.p2p {
393              p2p.shutdown().await;
394          }
395  
396          // Send shutdown signal
397          if let Some(tx) = self.shutdown_tx.take() {
398              let _ = tx.send(()).await;
399          }
400  
401          let mut state = self.state.write().await;
402          state.is_running = false;
403  
404          Ok(())
405      }
406  
407      /// Get current block height
408      pub async fn block_height(&self) -> u32 {
409          self.state.read().await.block_height
410      }
411  
412      /// Get runtime statistics
413      pub async fn bridge_stats(&self) -> deltavm_execution::BridgeStats {
414          (*self.runtime.read().await).bridge_stats()
415      }
416  
417      /// Get P2P network handler
418      pub fn p2p(&self) -> Option<&DeltaP2P> {
419          self.p2p.as_ref()
420      }
421  
422      /// Get connected peer count
423      pub fn peer_count(&self) -> usize {
424          self.p2p.as_ref().map(|p| p.peer_count()).unwrap_or(0)
425      }
426  
427      /// Get list of connected peers
428      pub fn connected_peers(&self) -> Vec<SocketAddr> {
429          self.p2p.as_ref().map(|p| p.get_peers()).unwrap_or_default()
430      }
431  
432      /// Broadcast a transaction to the network
433      pub fn broadcast_transaction(&self, tx: DeltaTransaction) -> Result<(), NodeError> {
434          if let Some(ref p2p) = self.p2p {
435              p2p.broadcast_transaction(tx).map_err(|e| {
436                  NodeError::NetworkError(format!("Failed to broadcast transaction: {}", e))
437              })
438          } else {
439              Err(NodeError::NetworkError("P2P not initialized".to_string()))
440          }
441      }
442  
443      /// Get storage statistics
444      pub fn storage_stats(&self) -> Option<StorageStats> {
445          self.storage.as_ref().map(|s| s.stats())
446      }
447  
448      /// Get a block from storage by height
449      pub fn get_block(&self, height: u32) -> Option<DeltaBlock> {
450          self.storage.as_ref().and_then(|s| s.get_block(height).ok())
451      }
452  }