lib.rs
1 // Copyright (c) 2025 ADnet Contributors 2 // DeltaOS node implementation 3 // SPDX-License-Identifier: Apache-2.0 4 5 //! DeltaOS Node Implementation 6 //! 7 //! This module provides the core node functionality for the DELTA chain, 8 //! including consensus, networking, REST API, and state management. 9 //! 10 //! Architecture: 11 //! - Uses deltavm for execution and state management 12 //! - REST API for external interactions (including exchange endpoints) 13 //! - Simplified consensus (PoS + BFT) for block production 14 //! - Hybrid storage: account model for trading, record model for governance 15 //! - Integrated exchange module for spot trading (F-D01 to F-D07) 16 //! - P2P networking for peer communication and message propagation 17 18 pub mod config; 19 pub mod consensus; 20 pub mod error; 21 pub mod ipc_server; 22 pub mod metrics; 23 pub mod p2p; 24 pub mod rest; 25 pub mod signing; 26 pub mod state; 27 pub mod storage; 28 29 pub use config::*; 30 pub use consensus::*; 31 pub use error::*; 32 pub use ipc_server::{ 33 DeltaIpcServer, IpcServerConfig as DeltaIpcServerConfig, IpcServerState as DeltaIpcServerState, 34 }; 35 pub use metrics::{Metrics, MetricsServer}; 36 pub use p2p::*; 37 pub use rest::*; 38 pub use signing::*; 39 pub use state::*; 40 pub use storage::*; 41 42 use deltavm_execution::DeltaRuntime; 43 use std::collections::HashMap; 44 use std::net::SocketAddr; 45 use std::sync::Arc; 46 use tokio::sync::{RwLock, mpsc}; 47 48 // Re-export exchange types for convenience 49 pub use deltavm_execution::{ 50 ExternalChain, MatchingEngine, Order, OrderId, OrderSide, OrderStatus, OrderType, TradingPair, 51 TradingResult, TradingTx, 52 }; 53 54 /// The DELTA chain node 55 pub struct DeltaNode { 56 /// Node configuration 57 pub config: NodeConfig, 58 /// The execution runtime 59 pub runtime: Arc<RwLock<DeltaRuntime>>, 60 /// Node state 61 pub state: Arc<RwLock<NodeState>>, 62 /// Persistent storage (RocksDB) 63 pub storage: Option<Arc<Storage>>, 64 /// P2P network handler 65 pub p2p: Option<DeltaP2P>, 66 /// IPC server for bridge communication 67 pub ipc_server: Option<DeltaIpcServer>, 68 /// Shutdown signal sender 69 shutdown_tx: Option<mpsc::Sender<()>>, 70 } 71 72 impl DeltaNode { 73 /// Create a new DELTA node 74 pub fn new(config: NodeConfig) -> Self { 75 Self { 76 config, 77 runtime: Arc::new(RwLock::new(DeltaRuntime::new())), 78 state: Arc::new(RwLock::new(NodeState::default())), 79 storage: None, 80 p2p: None, 81 ipc_server: None, 82 shutdown_tx: None, 83 } 84 } 85 86 /// Create a new DELTA node with IPC server enabled 87 pub fn new_with_ipc(config: NodeConfig, ipc_config: DeltaIpcServerConfig) -> Self { 88 let runtime = Arc::new(RwLock::new(DeltaRuntime::new())); 89 let ipc_server = DeltaIpcServer::new(ipc_config, Some(runtime.clone())); 90 91 Self { 92 config, 93 runtime, 94 state: Arc::new(RwLock::new(NodeState::default())), 95 storage: None, 96 p2p: None, 97 ipc_server: Some(ipc_server), 98 shutdown_tx: None, 99 } 100 } 101 102 /// Start the node 103 pub async fn start(&mut self) -> Result<(), NodeError> { 104 tracing::info!("Starting DELTA node..."); 105 tracing::info!(" Node type: {:?}", self.config.node_type); 106 tracing::info!(" Data dir: {:?}", self.config.data_dir); 107 tracing::info!(" REST port: {}", self.config.rest_port); 108 tracing::info!(" P2P port: {}", self.config.p2p_port); 109 110 // Initialize persistent storage 111 let storage_path = self.config.data_dir.join("db"); 112 std::fs::create_dir_all(&storage_path).map_err(|e| { 113 NodeError::StartupError(format!("Failed to create storage directory: {}", e)) 114 })?; 115 116 let storage = Storage::open(&storage_path) 117 .map_err(|e| NodeError::StartupError(format!("Failed to open storage: {}", e)))?; 118 let storage = Arc::new(storage); 119 self.storage = Some(storage.clone()); 120 121 // Recover state from storage or create genesis block 122 let node_address = format!("delta_validator_{}", self.config.rest_port); 123 let (starting_height, starting_hash) = match storage.get_chain_height() { 124 Ok(height) if height > 0 => { 125 tracing::info!(" Recovered chain height: {}", height); 126 // Get the last block hash for consensus continuity 127 let last_hash = storage 128 .get_last_block_hash() 129 .map_err(|e| { 130 NodeError::StartupError(format!("Failed to get last block hash: {}", e)) 131 })? 132 .unwrap_or([0u8; 32]); 133 134 // Update runtime block height to match stored state 135 { 136 let mut runtime = self.runtime.write().await; 137 runtime.block_height = height; 138 } 139 { 140 let mut state = self.state.write().await; 141 state.block_height = height; 142 } 143 (height, last_hash) 144 } 145 _ => { 146 // No stored state - create genesis block 147 tracing::info!(" Creating genesis block..."); 148 let genesis = BlockProducer::create_genesis_block(&node_address); 149 let genesis_hash = genesis.hash; 150 151 // Store genesis block 152 storage.put_block(&genesis).map_err(|e| { 153 NodeError::StartupError(format!("Failed to store genesis block: {}", e)) 154 })?; 155 156 tracing::info!( 157 " Genesis block created and stored (hash: {})", 158 hex::encode(&genesis_hash[..8]) 159 ); 160 161 // Update state to reflect genesis 162 { 163 let mut runtime = self.runtime.write().await; 164 runtime.block_height = 1; 165 } 166 { 167 let mut state = self.state.write().await; 168 state.block_height = 1; 169 } 170 171 (1, genesis_hash) 172 } 173 }; 174 175 // Check for snapshots and recover account state 176 if let Ok(Some(snapshot)) = storage.load_latest_snapshot() { 177 tracing::info!( 178 " Loaded snapshot from height {} ({} accounts)", 179 snapshot.height, 180 snapshot.accounts.len() 181 ); 182 // TODO: Apply account states to runtime ledger 183 } 184 185 // Initialize exchange module 186 { 187 let runtime = self.runtime.read().await; 188 tracing::info!(" Exchange initialized with matching engine"); 189 tracing::info!(" - Maker fee: {} bps", deltavm_execution::MAKER_FEE_BPS); 190 tracing::info!(" - Taker fee: {} bps", deltavm_execution::TAKER_FEE_BPS); 191 tracing::info!( 192 " - Min order size: {} AX", 193 deltavm_execution::MIN_ORDER_SIZE_AX 194 ); 195 tracing::info!( 196 " - Max orders per user per pair: {}", 197 deltavm_execution::MAX_ORDERS_PER_USER_PER_PAIR 198 ); 199 tracing::debug!(" - Current block height: {}", runtime.block_height); 200 } 201 202 // Update state 203 { 204 let mut state = self.state.write().await; 205 state.is_running = true; 206 } 207 208 // Start IPC server if configured 209 if let Some(ref mut ipc_server) = self.ipc_server { 210 match ipc_server.start().await { 211 Ok(_) => { 212 tracing::info!("IPC server started for bridge communication"); 213 } 214 Err(e) => { 215 tracing::warn!( 216 "Failed to start IPC server: {} (bridge communication will be unavailable)", 217 e 218 ); 219 } 220 } 221 } 222 223 // Create shutdown channel 224 let (shutdown_tx, shutdown_rx) = mpsc::channel(1); 225 self.shutdown_tx = Some(shutdown_tx); 226 227 // Create channels for P2P communication 228 let (block_tx, mut block_rx) = mpsc::channel::<DeltaBlock>(100); 229 let (tx_tx, mut tx_rx) = mpsc::channel::<DeltaTransaction>(1000); 230 let (attestation_tx, mut attestation_rx) = mpsc::channel::<CrossChainAttestation>(100); 231 232 // Initialize P2P network 233 let p2p = DeltaP2P::new( 234 self.config.p2p.clone(), 235 block_tx.clone(), 236 tx_tx.clone(), 237 attestation_tx.clone(), 238 ); 239 240 // Wire storage into P2P for block sync 241 p2p.set_storage(storage.clone()); 242 243 // Start P2P listener 244 match p2p.start().await { 245 Ok(listen_addr) => { 246 tracing::info!("P2P network started on {}", listen_addr); 247 } 248 Err(e) => { 249 tracing::error!("Failed to start P2P network: {}", e); 250 return Err(NodeError::StartupError(format!( 251 "P2P startup failed: {}", 252 e 253 ))); 254 } 255 } 256 257 // Set up BFT consensus if this is a validator node 258 if self.config.node_type == NodeType::Validator { 259 let node_address = format!("delta_validator_{}", self.config.rest_port); 260 let mut validators = HashMap::new(); 261 validators.insert(node_address.clone(), 1); // Self with voting power 1 262 let consensus = BftConsensus::new(node_address.clone(), validators); 263 p2p.set_consensus(consensus); 264 tracing::info!("BFT consensus initialized for validator: {}", node_address); 265 } 266 267 // Store P2P handler 268 self.p2p = Some(p2p.clone()); 269 270 // Start consensus engine with recovered state 271 let consensus_config = ConsensusConfig::default(); 272 let consensus_engine = ConsensusEngine::with_starting_state( 273 consensus_config, 274 shutdown_rx, 275 block_tx, 276 starting_height, 277 starting_hash, 278 ); 279 let state_for_consensus = self.state.clone(); 280 let runtime_for_consensus = self.runtime.clone(); 281 let p2p_for_blocks = p2p.clone(); 282 let storage_for_blocks = storage.clone(); 283 284 tokio::spawn(async move { 285 consensus_engine.run(node_address).await; 286 }); 287 288 // Block processor task - handles order matching and persistence during block production 289 tokio::spawn(async move { 290 while let Some(block) = block_rx.recv().await { 291 let block_height = block.height; 292 293 // Persist block to storage 294 if let Err(e) = storage_for_blocks.put_block(&block) { 295 tracing::error!("Failed to persist block {}: {}", block_height, e); 296 } else { 297 tracing::debug!("Persisted block {} to storage", block_height); 298 } 299 300 // Create snapshot if needed (every N blocks) 301 if storage_for_blocks.should_snapshot(block_height) { 302 // TODO: Collect account states from runtime ledger 303 let accounts = vec![]; 304 if let Err(e) = 305 storage_for_blocks.create_snapshot(block_height, block.hash, accounts) 306 { 307 tracing::warn!( 308 "Failed to create snapshot at height {}: {}", 309 block_height, 310 e 311 ); 312 } else { 313 tracing::info!("Created snapshot at height {}", block_height); 314 } 315 } 316 317 // Update state with new block 318 let mut state = state_for_consensus.write().await; 319 state.block_height = block_height; 320 321 // Update P2P block height 322 p2p_for_blocks.set_block_height(block_height); 323 324 // Advance runtime (this syncs block height with matching engine) 325 let mut runtime = runtime_for_consensus.write().await; 326 runtime.advance_block(); 327 328 tracing::debug!( 329 "Applied block {} to state (matching engine synced)", 330 block_height 331 ); 332 } 333 }); 334 335 // Transaction processor task 336 let runtime_for_txs = self.runtime.clone(); 337 tokio::spawn(async move { 338 while let Some(tx) = tx_rx.recv().await { 339 tracing::trace!("Received transaction: {:?}", hex::encode(&tx.id[..8])); 340 // In a full implementation, add to mempool and validate 341 let _ = runtime_for_txs; // Placeholder for future implementation 342 } 343 }); 344 345 // Attestation processor task 346 tokio::spawn(async move { 347 while let Some(attestation) = attestation_rx.recv().await { 348 tracing::debug!( 349 "Received attestation from ALPHA height {}", 350 attestation.alpha_height 351 ); 352 // Process cross-chain attestation 353 } 354 }); 355 356 // Start REST API - share the node's runtime for exchange operations 357 // This ensures all exchange state (orders, trades) is shared 358 let rest_state = Arc::new(RestStateShared { 359 config: self.config.clone(), 360 node_state: self.state.clone(), 361 runtime: self.runtime.clone(), 362 }); 363 364 let router = rest::build_router_shared(rest_state); 365 366 let addr = SocketAddr::from(([0, 0, 0, 0], self.config.rest_port)); 367 tracing::info!("REST API listening on {}", addr); 368 369 // Run the REST server 370 let listener = tokio::net::TcpListener::bind(addr) 371 .await 372 .map_err(|e| NodeError::StartupError(format!("Failed to bind REST API: {}", e)))?; 373 374 axum::serve(listener, router) 375 .await 376 .map_err(|e| NodeError::StartupError(format!("REST API error: {}", e)))?; 377 378 Ok(()) 379 } 380 381 /// Stop the node 382 pub async fn stop(&mut self) -> Result<(), NodeError> { 383 tracing::info!("Stopping DELTA node..."); 384 385 // Shutdown IPC server 386 if let Some(ref mut ipc_server) = self.ipc_server { 387 ipc_server.stop().await; 388 tracing::info!("IPC server stopped"); 389 } 390 391 // Shutdown P2P network 392 if let Some(ref p2p) = self.p2p { 393 p2p.shutdown().await; 394 } 395 396 // Send shutdown signal 397 if let Some(tx) = self.shutdown_tx.take() { 398 let _ = tx.send(()).await; 399 } 400 401 let mut state = self.state.write().await; 402 state.is_running = false; 403 404 Ok(()) 405 } 406 407 /// Get current block height 408 pub async fn block_height(&self) -> u32 { 409 self.state.read().await.block_height 410 } 411 412 /// Get runtime statistics 413 pub async fn bridge_stats(&self) -> deltavm_execution::BridgeStats { 414 (*self.runtime.read().await).bridge_stats() 415 } 416 417 /// Get P2P network handler 418 pub fn p2p(&self) -> Option<&DeltaP2P> { 419 self.p2p.as_ref() 420 } 421 422 /// Get connected peer count 423 pub fn peer_count(&self) -> usize { 424 self.p2p.as_ref().map(|p| p.peer_count()).unwrap_or(0) 425 } 426 427 /// Get list of connected peers 428 pub fn connected_peers(&self) -> Vec<SocketAddr> { 429 self.p2p.as_ref().map(|p| p.get_peers()).unwrap_or_default() 430 } 431 432 /// Broadcast a transaction to the network 433 pub fn broadcast_transaction(&self, tx: DeltaTransaction) -> Result<(), NodeError> { 434 if let Some(ref p2p) = self.p2p { 435 p2p.broadcast_transaction(tx).map_err(|e| { 436 NodeError::NetworkError(format!("Failed to broadcast transaction: {}", e)) 437 }) 438 } else { 439 Err(NodeError::NetworkError("P2P not initialized".to_string())) 440 } 441 } 442 443 /// Get storage statistics 444 pub fn storage_stats(&self) -> Option<StorageStats> { 445 self.storage.as_ref().map(|s| s.stats()) 446 } 447 448 /// Get a block from storage by height 449 pub fn get_block(&self, height: u32) -> Option<DeltaBlock> { 450 self.storage.as_ref().and_then(|s| s.get_block(height).ok()) 451 } 452 }