consensus.rs
1 // Copyright (c) 2025 ADnet Contributors 2 // SPDX-License-Identifier: Apache-2.0 3 4 //! Consensus module for DELTA chain 5 //! 6 //! DELTA uses a simplified consensus mechanism since it doesn't need the full 7 //! ZK circuit complexity of ALPHA. It uses a combination of: 8 //! - Proof of Stake for validator selection 9 //! - BFT for finality 10 //! - Cross-chain attestations from ALPHA for bridge operations 11 12 use crate::signing::{ 13 ValidatorIdentity, ValidatorKeyStore, transaction_signing_message, verify_signature, 14 }; 15 use serde::{Deserialize, Serialize}; 16 use std::collections::HashMap; 17 use std::time::{Duration, Instant}; 18 use tokio::sync::mpsc; 19 20 /// Block in the DELTA chain 21 #[derive(Clone, Debug, Serialize, Deserialize)] 22 pub struct DeltaBlock { 23 /// Block height 24 pub height: u32, 25 /// Previous block hash 26 pub previous_hash: [u8; 32], 27 /// Block timestamp (unix seconds) 28 pub timestamp: u64, 29 /// Block producer address 30 pub producer: String, 31 /// Transactions in this block 32 pub transactions: Vec<DeltaTransaction>, 33 /// Cross-chain attestations included 34 pub attestations: Vec<CrossChainAttestation>, 35 /// Block hash (computed) 36 pub hash: [u8; 32], 37 } 38 39 /// Transaction in DELTA chain 40 #[derive(Clone, Debug, Serialize, Deserialize)] 41 pub struct DeltaTransaction { 42 /// Transaction ID (hash of transaction contents) 43 pub id: [u8; 32], 44 /// Transaction type 45 pub tx_type: TransactionType, 46 /// Sender address (for account model txs) 47 pub sender: Option<String>, 48 /// Sender's public key for signature verification 49 pub sender_pubkey: Option<Vec<u8>>, 50 /// Transaction nonce (for replay protection) 51 pub nonce: u64, 52 /// Amount being transferred (if applicable) 53 pub amount: u64, 54 /// Gas/fee limit for this transaction 55 pub fee_limit: u64, 56 /// Raw transaction data 57 pub data: Vec<u8>, 58 /// Transaction signature 59 pub signature: Vec<u8>, 60 } 61 62 impl DeltaTransaction { 63 /// Compute the transaction ID from its contents 64 pub fn compute_id(&self) -> [u8; 32] { 65 let mut hasher = blake3::Hasher::new(); 66 hasher.update(&[self.tx_type.as_byte()]); 67 if let Some(sender) = &self.sender { 68 hasher.update(sender.as_bytes()); 69 } 70 hasher.update(&self.nonce.to_le_bytes()); 71 hasher.update(&self.amount.to_le_bytes()); 72 hasher.update(&self.fee_limit.to_le_bytes()); 73 hasher.update(&self.data); 74 hasher.finalize().into() 75 } 76 77 /// Verify the transaction ID matches the computed hash 78 pub fn verify_id(&self) -> bool { 79 self.id == self.compute_id() 80 } 81 } 82 83 /// Transaction types 84 #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] 85 pub enum TransactionType { 86 /// Basic trading transaction (generic) 87 Trading, 88 /// Governance transaction 89 Governance, 90 /// Bridge transaction (ALPHA <-> DELTA) 91 Bridge, 92 /// Exchange: Place a limit order (F-D01) 93 ExchangeLimitOrder, 94 /// Exchange: Place a market order (F-D02) 95 ExchangeMarketOrder, 96 /// Exchange: Cancel an order (F-D03) 97 ExchangeCancelOrder, 98 /// Exchange: Settle matched trades (generated during block production) 99 ExchangeSettlement, 100 } 101 102 impl TransactionType { 103 /// Convert transaction type to a byte for hashing 104 pub fn as_byte(&self) -> u8 { 105 match self { 106 TransactionType::Trading => 0, 107 TransactionType::Governance => 1, 108 TransactionType::Bridge => 2, 109 TransactionType::ExchangeLimitOrder => 3, 110 TransactionType::ExchangeMarketOrder => 4, 111 TransactionType::ExchangeCancelOrder => 5, 112 TransactionType::ExchangeSettlement => 6, 113 } 114 } 115 116 /// Check if this is an exchange transaction type 117 pub fn is_exchange_tx(&self) -> bool { 118 matches!( 119 self, 120 TransactionType::ExchangeLimitOrder 121 | TransactionType::ExchangeMarketOrder 122 | TransactionType::ExchangeCancelOrder 123 | TransactionType::ExchangeSettlement 124 ) 125 } 126 } 127 128 // ============================================================================ 129 // Transaction Validation 130 // ============================================================================ 131 132 /// Result of transaction validation 133 #[derive(Debug, Clone)] 134 pub enum TxValidationResult { 135 /// Transaction is valid 136 Valid, 137 /// Transaction is invalid with reason 138 Invalid(TxValidationError), 139 } 140 141 /// Transaction validation errors 142 #[derive(Debug, Clone)] 143 pub enum TxValidationError { 144 /// Invalid transaction format 145 InvalidFormat(String), 146 /// Transaction ID does not match computed hash 147 InvalidId, 148 /// Missing required field 149 MissingField(String), 150 /// Invalid signature 151 InvalidSignature, 152 /// Insufficient balance 153 InsufficientBalance { required: u64, available: u64 }, 154 /// Invalid nonce (replay protection) 155 InvalidNonce { expected: u64, got: u64 }, 156 /// Fee limit too low 157 FeeTooLow { minimum: u64, provided: u64 }, 158 /// Transaction too large 159 TooLarge { max_size: usize, actual_size: usize }, 160 /// Sender not found 161 SenderNotFound, 162 // Exchange-specific validation errors (F-D01 to F-D07) 163 /// Order quantity below minimum (100 ALPHA equivalent) 164 ExchangeOrderTooSmall { minimum: u64, provided: u64 }, 165 /// Too many open orders for this user/pair (max 20) 166 ExchangeTooManyOrders { pair: String, count: u32, max: u32 }, 167 /// Invalid trading pair 168 ExchangeInvalidPair(String), 169 /// Invalid order type 170 ExchangeInvalidOrderType(String), 171 /// Order not found (for cancellation) 172 ExchangeOrderNotFound(String), 173 /// User not authorized to cancel this order 174 ExchangeNotOrderOwner, 175 /// Self-trade prevention triggered 176 ExchangeSelfTrade, 177 /// Market order rejected (no liquidity) 178 ExchangeNoLiquidity, 179 } 180 181 impl std::fmt::Display for TxValidationError { 182 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 183 match self { 184 TxValidationError::InvalidFormat(msg) => write!(f, "Invalid format: {}", msg), 185 TxValidationError::InvalidId => { 186 write!(f, "Transaction ID does not match content hash") 187 } 188 TxValidationError::MissingField(field) => { 189 write!(f, "Missing required field: {}", field) 190 } 191 TxValidationError::InvalidSignature => write!(f, "Invalid signature"), 192 TxValidationError::InsufficientBalance { 193 required, 194 available, 195 } => { 196 write!( 197 f, 198 "Insufficient balance: need {}, have {}", 199 required, available 200 ) 201 } 202 TxValidationError::InvalidNonce { expected, got } => { 203 write!(f, "Invalid nonce: expected {}, got {}", expected, got) 204 } 205 TxValidationError::FeeTooLow { minimum, provided } => { 206 write!(f, "Fee too low: minimum {}, provided {}", minimum, provided) 207 } 208 TxValidationError::TooLarge { 209 max_size, 210 actual_size, 211 } => { 212 write!( 213 f, 214 "Transaction too large: max {}, actual {}", 215 max_size, actual_size 216 ) 217 } 218 TxValidationError::SenderNotFound => write!(f, "Sender account not found"), 219 // Exchange-specific errors 220 TxValidationError::ExchangeOrderTooSmall { minimum, provided } => { 221 write!( 222 f, 223 "Order too small: minimum {} ALPHA, provided {}", 224 minimum, provided 225 ) 226 } 227 TxValidationError::ExchangeTooManyOrders { pair, count, max } => { 228 write!( 229 f, 230 "Too many orders for pair {}: {} orders, max {}", 231 pair, count, max 232 ) 233 } 234 TxValidationError::ExchangeInvalidPair(pair) => { 235 write!(f, "Invalid trading pair: {}", pair) 236 } 237 TxValidationError::ExchangeInvalidOrderType(order_type) => { 238 write!(f, "Invalid order type: {}", order_type) 239 } 240 TxValidationError::ExchangeOrderNotFound(id) => { 241 write!(f, "Order not found: {}", id) 242 } 243 TxValidationError::ExchangeNotOrderOwner => { 244 write!(f, "Not authorized to cancel this order") 245 } 246 TxValidationError::ExchangeSelfTrade => { 247 write!(f, "Self-trade prevention triggered") 248 } 249 TxValidationError::ExchangeNoLiquidity => { 250 write!(f, "Market order rejected: no liquidity available") 251 } 252 } 253 } 254 } 255 256 impl std::error::Error for TxValidationError {} 257 258 /// Account state for transaction validation 259 #[derive(Clone, Debug, Default, Serialize, Deserialize)] 260 pub struct AccountState { 261 /// Account balance 262 pub balance: u64, 263 /// Next expected nonce 264 pub nonce: u64, 265 /// Account public key (for signature verification) 266 pub pubkey: Option<Vec<u8>>, 267 } 268 269 /// Transaction validator 270 pub struct TxValidator { 271 /// Minimum required fee 272 pub min_fee: u64, 273 /// Maximum transaction data size 274 pub max_tx_size: usize, 275 /// Account states (address -> state) 276 pub accounts: HashMap<String, AccountState>, 277 } 278 279 impl TxValidator { 280 /// Create a new transaction validator 281 pub fn new() -> Self { 282 Self { 283 min_fee: 1000, // Minimum 1000 units fee 284 max_tx_size: 65536, // 64KB max transaction size 285 accounts: HashMap::new(), 286 } 287 } 288 289 /// Validate a transaction 290 pub fn validate(&self, tx: &DeltaTransaction) -> TxValidationResult { 291 // 1. Check transaction format 292 if let Err(e) = self.validate_format(tx) { 293 return TxValidationResult::Invalid(e); 294 } 295 296 // 2. Verify transaction ID 297 if !tx.verify_id() { 298 return TxValidationResult::Invalid(TxValidationError::InvalidId); 299 } 300 301 // 3. Verify signature 302 if let Err(e) = self.verify_signature(tx) { 303 return TxValidationResult::Invalid(e); 304 } 305 306 // 4. Check account balance and nonce 307 if let Err(e) = self.validate_account(tx) { 308 return TxValidationResult::Invalid(e); 309 } 310 311 TxValidationResult::Valid 312 } 313 314 /// Validate transaction format 315 fn validate_format(&self, tx: &DeltaTransaction) -> Result<(), TxValidationError> { 316 // Check transaction size 317 let tx_size = tx.data.len() + tx.signature.len(); 318 if tx_size > self.max_tx_size { 319 return Err(TxValidationError::TooLarge { 320 max_size: self.max_tx_size, 321 actual_size: tx_size, 322 }); 323 } 324 325 // Check fee limit 326 if tx.fee_limit < self.min_fee { 327 return Err(TxValidationError::FeeTooLow { 328 minimum: self.min_fee, 329 provided: tx.fee_limit, 330 }); 331 } 332 333 // Trading, Governance, and Exchange transactions require a sender 334 match tx.tx_type { 335 TransactionType::Trading | TransactionType::Governance => { 336 if tx.sender.is_none() { 337 return Err(TxValidationError::MissingField("sender".to_string())); 338 } 339 } 340 TransactionType::Bridge => { 341 // Bridge transactions may have special handling 342 } 343 // Exchange transaction types (F-D01 to F-D07) 344 TransactionType::ExchangeLimitOrder 345 | TransactionType::ExchangeMarketOrder 346 | TransactionType::ExchangeCancelOrder => { 347 // All exchange operations require a sender 348 if tx.sender.is_none() { 349 return Err(TxValidationError::MissingField("sender".to_string())); 350 } 351 // Validate exchange-specific format in the data field 352 self.validate_exchange_format(tx)?; 353 } 354 TransactionType::ExchangeSettlement => { 355 // Settlement transactions are generated internally during block production 356 // They don't require a sender as they represent matched trades 357 } 358 } 359 360 Ok(()) 361 } 362 363 /// Validate exchange-specific transaction format (F-D01 to F-D07) 364 fn validate_exchange_format(&self, tx: &DeltaTransaction) -> Result<(), TxValidationError> { 365 // Exchange constants from the spec 366 const MIN_ORDER_SIZE_ALPHA: u64 = 100; 367 368 // The transaction amount represents the order quantity 369 match tx.tx_type { 370 TransactionType::ExchangeLimitOrder | TransactionType::ExchangeMarketOrder => { 371 // Check minimum order size 372 if tx.amount < MIN_ORDER_SIZE_ALPHA { 373 return Err(TxValidationError::ExchangeOrderTooSmall { 374 minimum: MIN_ORDER_SIZE_ALPHA, 375 provided: tx.amount, 376 }); 377 } 378 } 379 TransactionType::ExchangeCancelOrder => { 380 // Cancel orders must have order ID in data field 381 if tx.data.len() < 32 { 382 return Err(TxValidationError::MissingField("order_id".to_string())); 383 } 384 } 385 _ => {} 386 } 387 388 Ok(()) 389 } 390 391 /// Verify transaction signature using Ed25519 392 fn verify_signature(&self, tx: &DeltaTransaction) -> Result<(), TxValidationError> { 393 // For transactions that require signatures 394 if tx.sender.is_some() { 395 if tx.signature.is_empty() { 396 return Err(TxValidationError::InvalidSignature); 397 } 398 399 // Get the sender's public key 400 let pubkey = if let Some(pk) = &tx.sender_pubkey { 401 pk.clone() 402 } else if let Some(sender) = &tx.sender { 403 // Try to get from account state 404 if let Some(account) = self.accounts.get(sender) { 405 account.pubkey.clone().unwrap_or_default() 406 } else { 407 return Err(TxValidationError::SenderNotFound); 408 } 409 } else { 410 return Err(TxValidationError::MissingField("sender_pubkey".to_string())); 411 }; 412 413 // Public key must be 32 bytes for Ed25519 414 if pubkey.len() != 32 { 415 return Err(TxValidationError::InvalidSignature); 416 } 417 418 // Signature must be 64 bytes for Ed25519 419 if tx.signature.len() != 64 { 420 return Err(TxValidationError::InvalidSignature); 421 } 422 423 // Create the signing message (same format used when signing) 424 let signing_message = transaction_signing_message( 425 tx.tx_type.as_byte(), 426 tx.sender.as_deref(), 427 tx.nonce, 428 tx.amount, 429 tx.fee_limit, 430 &tx.data, 431 ); 432 433 // Verify the Ed25519 signature 434 if !verify_signature(&pubkey, &signing_message, &tx.signature) { 435 return Err(TxValidationError::InvalidSignature); 436 } 437 } 438 439 Ok(()) 440 } 441 442 /// Validate account balance and nonce 443 fn validate_account(&self, tx: &DeltaTransaction) -> Result<(), TxValidationError> { 444 let sender = match &tx.sender { 445 Some(s) => s, 446 None => return Ok(()), // No sender means no account validation needed 447 }; 448 449 let account = match self.accounts.get(sender) { 450 Some(a) => a, 451 None => { 452 // For new accounts, check if this is an initial deposit 453 if tx.tx_type == TransactionType::Bridge && tx.amount > 0 { 454 return Ok(()); // Bridge deposits can create accounts 455 } 456 return Err(TxValidationError::SenderNotFound); 457 } 458 }; 459 460 // Check nonce 461 if tx.nonce != account.nonce { 462 return Err(TxValidationError::InvalidNonce { 463 expected: account.nonce, 464 got: tx.nonce, 465 }); 466 } 467 468 // Check balance (amount + fee) 469 let required = tx.amount.saturating_add(tx.fee_limit); 470 if account.balance < required { 471 return Err(TxValidationError::InsufficientBalance { 472 required, 473 available: account.balance, 474 }); 475 } 476 477 Ok(()) 478 } 479 480 /// Update account state after transaction execution 481 pub fn apply_transaction(&mut self, tx: &DeltaTransaction) { 482 if let Some(sender) = &tx.sender 483 && let Some(account) = self.accounts.get_mut(sender) 484 { 485 // Deduct amount and fee 486 account.balance = account.balance.saturating_sub(tx.amount + tx.fee_limit); 487 // Increment nonce 488 account.nonce += 1; 489 } 490 } 491 492 /// Add or update an account 493 pub fn set_account(&mut self, address: String, state: AccountState) { 494 self.accounts.insert(address, state); 495 } 496 497 /// Get account state 498 pub fn get_account(&self, address: &str) -> Option<&AccountState> { 499 self.accounts.get(address) 500 } 501 } 502 503 impl Default for TxValidator { 504 fn default() -> Self { 505 Self::new() 506 } 507 } 508 509 /// Cross-chain attestation from ALPHA 510 #[derive(Clone, Debug, Serialize, Deserialize)] 511 pub struct CrossChainAttestation { 512 /// ALPHA block height 513 pub alpha_height: u64, 514 /// State root hash 515 pub state_root: [u8; 32], 516 /// Validator signatures 517 pub signatures: Vec<Vec<u8>>, 518 } 519 520 // ============================================================================ 521 // BFT Consensus Implementation 522 // ============================================================================ 523 524 /// BFT consensus round phase 525 #[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)] 526 pub enum BftPhase { 527 /// Waiting for block proposal from leader 528 #[default] 529 Propose, 530 /// Pre-vote phase: validators vote on proposed block 531 PreVote, 532 /// Pre-commit phase: validators commit to voted block 533 PreCommit, 534 /// Commit phase: block is finalized 535 Commit, 536 } 537 538 /// A vote in the BFT consensus 539 #[derive(Clone, Debug, Serialize, Deserialize)] 540 pub struct BftVote { 541 /// Height of the block being voted on 542 pub height: u32, 543 /// Round number (increments on view change) 544 pub round: u32, 545 /// Block hash being voted on (None for nil vote) 546 pub block_hash: Option<[u8; 32]>, 547 /// Validator address 548 pub validator: String, 549 /// Signature over (height, round, block_hash) 550 pub signature: Vec<u8>, 551 /// Vote type 552 pub vote_type: VoteType, 553 } 554 555 /// Type of BFT vote 556 #[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)] 557 pub enum VoteType { 558 PreVote, 559 PreCommit, 560 } 561 562 /// View change request when leader fails 563 #[derive(Clone, Debug, Serialize, Deserialize)] 564 pub struct ViewChangeRequest { 565 /// Current height 566 pub height: u32, 567 /// New round number 568 pub new_round: u32, 569 /// Requesting validator 570 pub validator: String, 571 /// Reason for view change 572 pub reason: ViewChangeReason, 573 /// Signature 574 pub signature: Vec<u8>, 575 } 576 577 /// Reason for requesting a view change 578 #[derive(Clone, Debug, Serialize, Deserialize)] 579 pub enum ViewChangeReason { 580 /// Leader timeout - no proposal received 581 LeaderTimeout, 582 /// Invalid block proposed 583 InvalidBlock, 584 /// Leader is byzantine (conflicting proposals) 585 ByzantineLeader, 586 } 587 588 /// BFT round state 589 #[derive(Clone, Debug)] 590 pub struct BftRoundState { 591 /// Current height 592 pub height: u32, 593 /// Current round (0-indexed, increments on view change) 594 pub round: u32, 595 /// Current phase 596 pub phase: BftPhase, 597 /// Proposed block for this round 598 pub proposed_block: Option<DeltaBlock>, 599 /// Pre-votes received (validator -> vote) 600 pub pre_votes: HashMap<String, BftVote>, 601 /// Pre-commits received (validator -> vote) 602 pub pre_commits: HashMap<String, BftVote>, 603 /// View change requests received 604 pub view_changes: HashMap<String, ViewChangeRequest>, 605 /// Round start time 606 pub round_start: Instant, 607 /// Locked block (if any) from previous round 608 pub locked_block: Option<DeltaBlock>, 609 /// Locked round 610 pub locked_round: Option<u32>, 611 } 612 613 impl BftRoundState { 614 /// Create a new round state 615 pub fn new(height: u32, round: u32) -> Self { 616 Self { 617 height, 618 round, 619 phase: BftPhase::Propose, 620 proposed_block: None, 621 pre_votes: HashMap::new(), 622 pre_commits: HashMap::new(), 623 view_changes: HashMap::new(), 624 round_start: Instant::now(), 625 locked_block: None, 626 locked_round: None, 627 } 628 } 629 630 /// Reset for a new round (view change) 631 pub fn new_round(&mut self) { 632 self.round += 1; 633 self.phase = BftPhase::Propose; 634 self.proposed_block = None; 635 self.pre_votes.clear(); 636 self.pre_commits.clear(); 637 self.view_changes.clear(); 638 self.round_start = Instant::now(); 639 // Note: locked_block and locked_round are preserved across rounds 640 } 641 642 /// Reset for a new height (after commit) 643 pub fn new_height(&mut self) { 644 self.height += 1; 645 self.round = 0; 646 self.phase = BftPhase::Propose; 647 self.proposed_block = None; 648 self.pre_votes.clear(); 649 self.pre_commits.clear(); 650 self.view_changes.clear(); 651 self.round_start = Instant::now(); 652 self.locked_block = None; 653 self.locked_round = None; 654 } 655 } 656 657 /// BFT consensus manager 658 pub struct BftConsensus { 659 /// Current round state 660 pub state: BftRoundState, 661 /// Known validators and their voting power 662 pub validators: HashMap<String, u64>, 663 /// Total voting power 664 pub total_power: u64, 665 /// This node's validator address 666 pub node_address: String, 667 /// This node's validator identity (for signing) 668 pub identity: Option<ValidatorIdentity>, 669 /// Key store for verifying other validators' signatures 670 pub key_store: ValidatorKeyStore, 671 /// Timeout for propose phase 672 pub propose_timeout: Duration, 673 /// Timeout for pre-vote phase 674 pub prevote_timeout: Duration, 675 /// Timeout for pre-commit phase 676 pub precommit_timeout: Duration, 677 } 678 679 impl BftConsensus { 680 /// Create a new BFT consensus manager (without signing capability) 681 pub fn new(node_address: String, validators: HashMap<String, u64>) -> Self { 682 let total_power: u64 = validators.values().sum(); 683 Self { 684 state: BftRoundState::new(0, 0), 685 validators, 686 total_power, 687 node_address, 688 identity: None, 689 key_store: ValidatorKeyStore::new(), 690 propose_timeout: Duration::from_secs(3), 691 prevote_timeout: Duration::from_secs(2), 692 precommit_timeout: Duration::from_secs(2), 693 } 694 } 695 696 /// Create a new BFT consensus manager with validator identity for signing 697 pub fn with_identity(identity: ValidatorIdentity, validators: HashMap<String, u64>) -> Self { 698 let total_power: u64 = validators.values().sum(); 699 let node_address = identity.address().to_string(); 700 Self { 701 state: BftRoundState::new(0, 0), 702 validators, 703 total_power, 704 node_address, 705 identity: Some(identity), 706 key_store: ValidatorKeyStore::new(), 707 propose_timeout: Duration::from_secs(3), 708 prevote_timeout: Duration::from_secs(2), 709 precommit_timeout: Duration::from_secs(2), 710 } 711 } 712 713 /// Register a validator's public key for signature verification 714 pub fn register_validator_key(&mut self, address: String, public_key: [u8; 32]) { 715 self.key_store.register(address, public_key); 716 } 717 718 /// Get the leader for the current round 719 pub fn get_leader(&self) -> Option<String> { 720 if self.validators.is_empty() { 721 return None; 722 } 723 let mut validators: Vec<_> = self.validators.keys().cloned().collect(); 724 validators.sort(); 725 let leader_idx = 726 ((self.state.height as usize) + (self.state.round as usize)) % validators.len(); 727 Some(validators[leader_idx].clone()) 728 } 729 730 /// Check if this node is the current leader 731 pub fn is_leader(&self) -> bool { 732 self.get_leader() 733 .map(|l| l == self.node_address) 734 .unwrap_or(false) 735 } 736 737 /// Calculate required voting power for quorum (2f+1 where n=3f+1) 738 pub fn quorum_power(&self) -> u64 { 739 // For BFT: need > 2/3 of total power 740 (self.total_power * 2 / 3) + 1 741 } 742 743 /// Handle a block proposal 744 pub fn handle_proposal(&mut self, block: DeltaBlock, proposer: &str) -> Result<(), BftError> { 745 // Verify proposer is the current leader 746 let expected_leader = self.get_leader().ok_or(BftError::NoValidators)?; 747 if proposer != expected_leader { 748 return Err(BftError::InvalidProposer { 749 expected: expected_leader, 750 got: proposer.to_string(), 751 }); 752 } 753 754 // Verify we're in propose phase 755 if self.state.phase != BftPhase::Propose { 756 return Err(BftError::WrongPhase { 757 expected: BftPhase::Propose, 758 got: self.state.phase, 759 }); 760 } 761 762 // Verify block height matches 763 if block.height != self.state.height + 1 { 764 return Err(BftError::InvalidBlockHeight { 765 expected: self.state.height + 1, 766 got: block.height, 767 }); 768 } 769 770 // Check if we're locked on a different block 771 if let (Some(locked_block), Some(locked_round)) = 772 (&self.state.locked_block, self.state.locked_round) 773 && locked_round < self.state.round 774 && locked_block.hash != block.hash 775 { 776 // We're locked on a different block, reject proposal 777 // unless we see 2f+1 pre-votes for this new block 778 tracing::warn!( 779 "Received proposal for different block while locked. \ 780 Locked on {:?} at round {}", 781 hex::encode(&locked_block.hash[..8]), 782 locked_round 783 ); 784 } 785 786 self.state.proposed_block = Some(block); 787 self.state.phase = BftPhase::PreVote; 788 tracing::info!( 789 "Proposal accepted for height {} round {}", 790 self.state.height + 1, 791 self.state.round 792 ); 793 794 Ok(()) 795 } 796 797 /// Create a pre-vote for the current proposal 798 pub fn create_prevote(&self, vote_for_block: bool) -> Option<BftVote> { 799 if self.state.phase != BftPhase::PreVote { 800 return None; 801 } 802 803 let block_hash = if vote_for_block { 804 self.state.proposed_block.as_ref().map(|b| b.hash) 805 } else { 806 None // nil vote 807 }; 808 809 Some(BftVote { 810 height: self.state.height + 1, 811 round: self.state.round, 812 block_hash, 813 validator: self.node_address.clone(), 814 signature: self.sign_vote( 815 self.state.height + 1, 816 self.state.round, 817 block_hash, 818 VoteType::PreVote, 819 ), 820 vote_type: VoteType::PreVote, 821 }) 822 } 823 824 /// Handle a pre-vote 825 pub fn handle_prevote(&mut self, vote: BftVote) -> Result<bool, BftError> { 826 // Verify vote is for current height and round 827 if vote.height != self.state.height + 1 || vote.round != self.state.round { 828 return Err(BftError::VoteMismatch); 829 } 830 831 // Verify voter is a valid validator 832 if !self.validators.contains_key(&vote.validator) { 833 return Err(BftError::UnknownValidator(vote.validator)); 834 } 835 836 // Store the vote 837 self.state.pre_votes.insert(vote.validator.clone(), vote); 838 839 // Check if we have quorum for any block 840 Ok(self.check_prevote_quorum()) 841 } 842 843 /// Check if we have pre-vote quorum 844 fn check_prevote_quorum(&mut self) -> bool { 845 let proposed_hash = match &self.state.proposed_block { 846 Some(b) => Some(b.hash), 847 None => return false, 848 }; 849 850 let mut power_for_block = 0u64; 851 let mut power_for_nil = 0u64; 852 853 for (validator, vote) in &self.state.pre_votes { 854 let power = self.validators.get(validator).copied().unwrap_or(0); 855 if vote.block_hash == proposed_hash { 856 power_for_block += power; 857 } else if vote.block_hash.is_none() { 858 power_for_nil += power; 859 } 860 } 861 862 let quorum = self.quorum_power(); 863 864 if power_for_block >= quorum { 865 tracing::info!( 866 "Pre-vote quorum reached for block at height {} round {}", 867 self.state.height + 1, 868 self.state.round 869 ); 870 self.state.phase = BftPhase::PreCommit; 871 // Lock on the block 872 self.state.locked_block = self.state.proposed_block.clone(); 873 self.state.locked_round = Some(self.state.round); 874 return true; 875 } 876 877 if power_for_nil >= quorum { 878 tracing::info!( 879 "Pre-vote quorum for NIL at height {} round {}, triggering view change", 880 self.state.height + 1, 881 self.state.round 882 ); 883 // Don't advance phase, will trigger timeout and view change 884 } 885 886 false 887 } 888 889 /// Create a pre-commit for the locked block 890 pub fn create_precommit(&self) -> Option<BftVote> { 891 if self.state.phase != BftPhase::PreCommit { 892 return None; 893 } 894 895 let block_hash = self.state.locked_block.as_ref().map(|b| b.hash); 896 897 Some(BftVote { 898 height: self.state.height + 1, 899 round: self.state.round, 900 block_hash, 901 validator: self.node_address.clone(), 902 signature: self.sign_vote( 903 self.state.height + 1, 904 self.state.round, 905 block_hash, 906 VoteType::PreCommit, 907 ), 908 vote_type: VoteType::PreCommit, 909 }) 910 } 911 912 /// Handle a pre-commit 913 pub fn handle_precommit(&mut self, vote: BftVote) -> Result<Option<DeltaBlock>, BftError> { 914 // Verify vote is for current height and round 915 if vote.height != self.state.height + 1 || vote.round != self.state.round { 916 return Err(BftError::VoteMismatch); 917 } 918 919 // Verify voter is a valid validator 920 if !self.validators.contains_key(&vote.validator) { 921 return Err(BftError::UnknownValidator(vote.validator)); 922 } 923 924 // Store the vote 925 self.state.pre_commits.insert(vote.validator.clone(), vote); 926 927 // Check if we have quorum 928 self.check_precommit_quorum() 929 } 930 931 /// Check if we have pre-commit quorum 932 fn check_precommit_quorum(&mut self) -> Result<Option<DeltaBlock>, BftError> { 933 let locked_hash = match &self.state.locked_block { 934 Some(b) => b.hash, 935 None => return Ok(None), 936 }; 937 938 let mut power_for_block = 0u64; 939 940 for (validator, vote) in &self.state.pre_commits { 941 let power = self.validators.get(validator).copied().unwrap_or(0); 942 if vote.block_hash == Some(locked_hash) { 943 power_for_block += power; 944 } 945 } 946 947 if power_for_block >= self.quorum_power() { 948 tracing::info!( 949 "Pre-commit quorum reached! Committing block at height {} round {}", 950 self.state.height + 1, 951 self.state.round 952 ); 953 self.state.phase = BftPhase::Commit; 954 let block = self.state.locked_block.clone(); 955 956 // Advance to next height 957 self.state.new_height(); 958 959 return Ok(block); 960 } 961 962 Ok(None) 963 } 964 965 /// Handle a view change request 966 pub fn handle_view_change(&mut self, request: ViewChangeRequest) -> Result<bool, BftError> { 967 // Verify request is for current height 968 if request.height != self.state.height + 1 { 969 return Err(BftError::VoteMismatch); 970 } 971 972 // Verify requester is a valid validator 973 if !self.validators.contains_key(&request.validator) { 974 return Err(BftError::UnknownValidator(request.validator)); 975 } 976 977 // Store the request 978 self.state 979 .view_changes 980 .insert(request.validator.clone(), request); 981 982 // Check if we have enough view change requests 983 self.check_view_change_quorum() 984 } 985 986 /// Check if we have view change quorum 987 fn check_view_change_quorum(&mut self) -> Result<bool, BftError> { 988 // Need f+1 validators to request view change (where n=3f+1) 989 // This is weaker than quorum to prevent liveness issues 990 let threshold = (self.total_power / 3) + 1; 991 992 let mut power = 0u64; 993 let mut max_round = self.state.round; 994 995 for (validator, request) in &self.state.view_changes { 996 let vp = self.validators.get(validator).copied().unwrap_or(0); 997 power += vp; 998 if request.new_round > max_round { 999 max_round = request.new_round; 1000 } 1001 } 1002 1003 if power >= threshold { 1004 tracing::warn!( 1005 "View change triggered at height {} from round {} to round {}", 1006 self.state.height + 1, 1007 self.state.round, 1008 max_round 1009 ); 1010 // Advance to new round 1011 while self.state.round < max_round { 1012 self.state.new_round(); 1013 } 1014 return Ok(true); 1015 } 1016 1017 Ok(false) 1018 } 1019 1020 /// Check for timeouts and trigger view change if needed 1021 pub fn check_timeouts(&mut self) -> Option<ViewChangeRequest> { 1022 let elapsed = self.state.round_start.elapsed(); 1023 1024 let should_timeout = match self.state.phase { 1025 BftPhase::Propose => elapsed > self.propose_timeout, 1026 BftPhase::PreVote => elapsed > self.propose_timeout + self.prevote_timeout, 1027 BftPhase::PreCommit => { 1028 elapsed > self.propose_timeout + self.prevote_timeout + self.precommit_timeout 1029 } 1030 BftPhase::Commit => false, // No timeout in commit phase 1031 }; 1032 1033 if should_timeout { 1034 tracing::warn!( 1035 "Timeout in {:?} phase at height {} round {}", 1036 self.state.phase, 1037 self.state.height + 1, 1038 self.state.round 1039 ); 1040 1041 let reason = match self.state.phase { 1042 BftPhase::Propose => ViewChangeReason::LeaderTimeout, 1043 _ => ViewChangeReason::LeaderTimeout, 1044 }; 1045 1046 let height = self.state.height + 1; 1047 let new_round = self.state.round + 1; 1048 let signature = self.sign_view_change(height, new_round, &reason); 1049 1050 return Some(ViewChangeRequest { 1051 height, 1052 new_round, 1053 validator: self.node_address.clone(), 1054 reason, 1055 signature, 1056 }); 1057 } 1058 1059 None 1060 } 1061 1062 /// Sign a vote using Ed25519 1063 fn sign_vote( 1064 &self, 1065 height: u32, 1066 round: u32, 1067 block_hash: Option<[u8; 32]>, 1068 vote_type: VoteType, 1069 ) -> Vec<u8> { 1070 if let Some(identity) = &self.identity { 1071 // Use proper Ed25519 signing 1072 let vote_type_byte = match vote_type { 1073 VoteType::PreVote => 0u8, 1074 VoteType::PreCommit => 1u8, 1075 }; 1076 identity 1077 .sign_vote(height, round, block_hash, vote_type_byte) 1078 .to_vec() 1079 } else { 1080 // Fallback to blake3 hash if no identity (for backwards compatibility) 1081 let mut hasher = blake3::Hasher::new(); 1082 hasher.update(&height.to_le_bytes()); 1083 hasher.update(&round.to_le_bytes()); 1084 if let Some(hash) = block_hash { 1085 hasher.update(&hash); 1086 } 1087 hasher.update(self.node_address.as_bytes()); 1088 hasher.finalize().as_bytes().to_vec() 1089 } 1090 } 1091 1092 /// Verify a vote signature using Ed25519 1093 #[allow(dead_code)] 1094 fn verify_vote_signature(&self, vote: &BftVote) -> bool { 1095 let vote_type_byte = match vote.vote_type { 1096 VoteType::PreVote => 0u8, 1097 VoteType::PreCommit => 1u8, 1098 }; 1099 self.key_store.verify_vote( 1100 &vote.validator, 1101 vote.height, 1102 vote.round, 1103 vote.block_hash, 1104 vote_type_byte, 1105 &vote.signature, 1106 ) 1107 } 1108 1109 /// Sign a view change request using Ed25519 1110 fn sign_view_change(&self, height: u32, new_round: u32, reason: &ViewChangeReason) -> Vec<u8> { 1111 let reason_byte = match reason { 1112 ViewChangeReason::LeaderTimeout => 0u8, 1113 ViewChangeReason::InvalidBlock => 1u8, 1114 ViewChangeReason::ByzantineLeader => 2u8, 1115 }; 1116 1117 if let Some(identity) = &self.identity { 1118 identity 1119 .sign_view_change(height, new_round, reason_byte) 1120 .to_vec() 1121 } else { 1122 // Fallback to empty signature if no identity 1123 vec![] 1124 } 1125 } 1126 1127 /// Verify a view change signature using Ed25519 1128 #[allow(dead_code)] 1129 fn verify_view_change_signature(&self, request: &ViewChangeRequest) -> bool { 1130 let reason_byte = match &request.reason { 1131 ViewChangeReason::LeaderTimeout => 0u8, 1132 ViewChangeReason::InvalidBlock => 1u8, 1133 ViewChangeReason::ByzantineLeader => 2u8, 1134 }; 1135 self.key_store.verify_view_change( 1136 &request.validator, 1137 request.height, 1138 request.new_round, 1139 reason_byte, 1140 &request.signature, 1141 ) 1142 } 1143 } 1144 1145 /// BFT consensus errors 1146 #[derive(Debug, Clone)] 1147 pub enum BftError { 1148 /// No validators configured 1149 NoValidators, 1150 /// Invalid block proposer 1151 InvalidProposer { expected: String, got: String }, 1152 /// Wrong consensus phase 1153 WrongPhase { expected: BftPhase, got: BftPhase }, 1154 /// Invalid block height 1155 InvalidBlockHeight { expected: u32, got: u32 }, 1156 /// Vote height/round mismatch 1157 VoteMismatch, 1158 /// Unknown validator 1159 UnknownValidator(String), 1160 } 1161 1162 impl std::fmt::Display for BftError { 1163 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 1164 match self { 1165 BftError::NoValidators => write!(f, "No validators configured"), 1166 BftError::InvalidProposer { expected, got } => { 1167 write!(f, "Invalid proposer: expected {}, got {}", expected, got) 1168 } 1169 BftError::WrongPhase { expected, got } => { 1170 write!(f, "Wrong phase: expected {:?}, got {:?}", expected, got) 1171 } 1172 BftError::InvalidBlockHeight { expected, got } => { 1173 write!( 1174 f, 1175 "Invalid block height: expected {}, got {}", 1176 expected, got 1177 ) 1178 } 1179 BftError::VoteMismatch => write!(f, "Vote height/round mismatch"), 1180 BftError::UnknownValidator(v) => write!(f, "Unknown validator: {}", v), 1181 } 1182 } 1183 } 1184 1185 impl std::error::Error for BftError {} 1186 1187 /// Consensus configuration 1188 #[derive(Clone, Debug)] 1189 pub struct ConsensusConfig { 1190 /// Target block time 1191 pub block_time: Duration, 1192 /// Maximum transactions per block 1193 pub max_txs_per_block: usize, 1194 /// Minimum validators for consensus 1195 pub min_validators: usize, 1196 } 1197 1198 impl Default for ConsensusConfig { 1199 fn default() -> Self { 1200 Self { 1201 block_time: Duration::from_secs(2), // 2 second blocks 1202 max_txs_per_block: 1000, 1203 min_validators: 1, 1204 } 1205 } 1206 } 1207 1208 /// Block producer for DELTA chain 1209 pub struct BlockProducer { 1210 config: ConsensusConfig, 1211 current_height: u32, 1212 pending_txs: Vec<DeltaTransaction>, 1213 pending_attestations: Vec<CrossChainAttestation>, 1214 last_block_time: Instant, 1215 /// Hash of the last produced block (genesis has all zeros) 1216 last_block_hash: [u8; 32], 1217 /// Transaction validator 1218 validator: TxValidator, 1219 /// Pending exchange order transactions for matching 1220 pending_exchange_txs: Vec<DeltaTransaction>, 1221 /// Exchange settlement transactions generated from order matching 1222 settlement_txs: Vec<DeltaTransaction>, 1223 } 1224 1225 impl BlockProducer { 1226 /// Create a new block producer 1227 pub fn new(config: ConsensusConfig) -> Self { 1228 Self { 1229 config, 1230 current_height: 0, 1231 pending_txs: Vec::new(), 1232 pending_attestations: Vec::new(), 1233 last_block_time: Instant::now(), 1234 last_block_hash: [0; 32], // Genesis block has zero hash 1235 validator: TxValidator::new(), 1236 pending_exchange_txs: Vec::new(), 1237 settlement_txs: Vec::new(), 1238 } 1239 } 1240 1241 /// Create a block producer initialized from a stored state (for recovery) 1242 pub fn with_starting_state(config: ConsensusConfig, height: u32, last_hash: [u8; 32]) -> Self { 1243 Self { 1244 config, 1245 current_height: height, 1246 pending_txs: Vec::new(), 1247 pending_attestations: Vec::new(), 1248 last_block_time: Instant::now(), 1249 last_block_hash: last_hash, 1250 validator: TxValidator::new(), 1251 pending_exchange_txs: Vec::new(), 1252 settlement_txs: Vec::new(), 1253 } 1254 } 1255 1256 /// Create the genesis block for the DELTA chain 1257 /// Returns (block, genesis_hash) for storage and P2P initialization 1258 pub fn create_genesis_block(producer_addr: &str) -> DeltaBlock { 1259 // Genesis block has height 1 and zero previous hash 1260 let height = 1u32; 1261 let previous_hash = [0u8; 32]; 1262 let timestamp = std::time::SystemTime::now() 1263 .duration_since(std::time::UNIX_EPOCH) 1264 .unwrap_or_default() 1265 .as_secs(); 1266 1267 // Compute genesis block hash 1268 let mut hasher = blake3::Hasher::new(); 1269 hasher.update(&previous_hash); 1270 hasher.update(&height.to_le_bytes()); 1271 hasher.update(×tamp.to_le_bytes()); 1272 hasher.update(producer_addr.as_bytes()); 1273 // Genesis has no transactions 1274 let hash: [u8; 32] = hasher.finalize().into(); 1275 1276 tracing::info!( 1277 "Created genesis block with hash: {}", 1278 hex::encode(&hash[..8]) 1279 ); 1280 1281 DeltaBlock { 1282 height, 1283 previous_hash, 1284 timestamp, 1285 producer: producer_addr.to_string(), 1286 transactions: Vec::new(), 1287 attestations: Vec::new(), 1288 hash, 1289 } 1290 } 1291 1292 /// Get current block height 1293 pub fn current_height(&self) -> u32 { 1294 self.current_height 1295 } 1296 1297 /// Get last block hash 1298 pub fn last_block_hash(&self) -> [u8; 32] { 1299 self.last_block_hash 1300 } 1301 1302 /// Add a transaction to the pending pool with validation 1303 pub fn add_transaction(&mut self, tx: DeltaTransaction) -> Result<(), TxValidationError> { 1304 // Check pool capacity 1305 if self.pending_txs.len() + self.pending_exchange_txs.len() >= self.config.max_txs_per_block 1306 { 1307 return Err(TxValidationError::InvalidFormat( 1308 "Transaction pool is full".to_string(), 1309 )); 1310 } 1311 1312 // Validate the transaction 1313 match self.validator.validate(&tx) { 1314 TxValidationResult::Valid => { 1315 tracing::debug!( 1316 "Transaction {:?} validated successfully", 1317 hex::encode(&tx.id[..8]) 1318 ); 1319 1320 // Route exchange transactions to separate pool for order matching 1321 if tx.tx_type.is_exchange_tx() { 1322 tracing::debug!( 1323 "Adding exchange transaction {:?} to matching pool", 1324 hex::encode(&tx.id[..8]) 1325 ); 1326 self.pending_exchange_txs.push(tx); 1327 } else { 1328 self.pending_txs.push(tx); 1329 } 1330 Ok(()) 1331 } 1332 TxValidationResult::Invalid(e) => { 1333 tracing::warn!( 1334 "Transaction {:?} validation failed: {}", 1335 hex::encode(&tx.id[..8]), 1336 e 1337 ); 1338 Err(e) 1339 } 1340 } 1341 } 1342 1343 /// Add a transaction without validation (for internal use or pre-validated txs) 1344 pub fn add_transaction_unchecked(&mut self, tx: DeltaTransaction) { 1345 if self.pending_txs.len() < self.config.max_txs_per_block { 1346 self.pending_txs.push(tx); 1347 } 1348 } 1349 1350 /// Get a reference to the transaction validator 1351 pub fn validator(&self) -> &TxValidator { 1352 &self.validator 1353 } 1354 1355 /// Get a mutable reference to the transaction validator 1356 pub fn validator_mut(&mut self) -> &mut TxValidator { 1357 &mut self.validator 1358 } 1359 1360 /// Add an attestation from ALPHA chain 1361 pub fn add_attestation(&mut self, attestation: CrossChainAttestation) { 1362 self.pending_attestations.push(attestation); 1363 } 1364 1365 /// Check if it's time to produce a block 1366 pub fn should_produce_block(&self) -> bool { 1367 self.last_block_time.elapsed() >= self.config.block_time 1368 } 1369 1370 /// Produce a new block 1371 pub fn produce_block(&mut self, producer: &str) -> DeltaBlock { 1372 let timestamp = std::time::SystemTime::now() 1373 .duration_since(std::time::UNIX_EPOCH) 1374 .unwrap_or_default() 1375 .as_secs(); 1376 1377 // Take regular transactions 1378 let mut transactions = std::mem::take(&mut self.pending_txs); 1379 1380 // Process exchange transactions and generate settlements 1381 // This is where order matching happens 1382 let exchange_txs = std::mem::take(&mut self.pending_exchange_txs); 1383 let settlements = self.process_exchange_transactions(&exchange_txs, timestamp); 1384 1385 // Add exchange transactions and their settlements to block 1386 transactions.extend(exchange_txs); 1387 transactions.extend(settlements); 1388 1389 // Clear settlement buffer 1390 self.settlement_txs.clear(); 1391 1392 let attestations = std::mem::take(&mut self.pending_attestations); 1393 1394 // Store previous hash before computing new one 1395 let previous_hash = self.last_block_hash; 1396 1397 // Compute block hash including previous hash for chain integrity 1398 let mut hasher = blake3::Hasher::new(); 1399 hasher.update(&previous_hash); 1400 hasher.update(&(self.current_height + 1).to_le_bytes()); 1401 hasher.update(×tamp.to_le_bytes()); 1402 hasher.update(producer.as_bytes()); 1403 // Include transaction hashes in block hash 1404 for tx in &transactions { 1405 hasher.update(&tx.id); 1406 } 1407 let hash: [u8; 32] = hasher.finalize().into(); 1408 1409 // Update state for next block 1410 self.current_height += 1; 1411 self.last_block_time = Instant::now(); 1412 self.last_block_hash = hash; 1413 1414 // Log exchange activity 1415 let exchange_count = transactions 1416 .iter() 1417 .filter(|tx| tx.tx_type.is_exchange_tx()) 1418 .count(); 1419 let settlement_count = transactions 1420 .iter() 1421 .filter(|tx| tx.tx_type == TransactionType::ExchangeSettlement) 1422 .count(); 1423 1424 if exchange_count > 0 || settlement_count > 0 { 1425 tracing::info!( 1426 "Block {} includes {} exchange txs and {} settlements", 1427 self.current_height, 1428 exchange_count - settlement_count, 1429 settlement_count 1430 ); 1431 } 1432 1433 DeltaBlock { 1434 height: self.current_height, 1435 previous_hash, 1436 timestamp, 1437 producer: producer.to_string(), 1438 transactions, 1439 attestations, 1440 hash, 1441 } 1442 } 1443 1444 /// Process exchange transactions and generate settlement transactions 1445 /// This is where the order matching logic runs during block production 1446 fn process_exchange_transactions( 1447 &mut self, 1448 exchange_txs: &[DeltaTransaction], 1449 timestamp: u64, 1450 ) -> Vec<DeltaTransaction> { 1451 let mut settlements = Vec::new(); 1452 1453 // Exchange fee parameters from spec (will be used when matching is implemented) 1454 const _MAKER_FEE_BPS: u64 = 25; // 0.025% 1455 const _TAKER_FEE_BPS: u64 = 75; // 0.075% 1456 const _FEE_DENOMINATOR: u64 = 100_000; // Basis points denominator 1457 1458 // Process each exchange transaction 1459 for tx in exchange_txs { 1460 match tx.tx_type { 1461 TransactionType::ExchangeLimitOrder | TransactionType::ExchangeMarketOrder => { 1462 // Order matching would happen here 1463 // In a full implementation, this would: 1464 // 1. Parse order details from tx.data 1465 // 2. Check against existing order book 1466 // 3. Match orders and generate trades 1467 // 4. Create settlement transactions for matched trades 1468 1469 // For now, we generate a placeholder settlement if the order amount 1470 // indicates it should match (simplified logic) 1471 if tx.amount > 0 && self.should_simulate_match(tx) { 1472 let settlement = self.create_settlement_tx(tx, timestamp); 1473 settlements.push(settlement); 1474 } 1475 } 1476 TransactionType::ExchangeCancelOrder => { 1477 // Cancel order - would remove from order book 1478 // No settlement needed for cancellations 1479 tracing::debug!( 1480 "Processing order cancellation: {:?}", 1481 hex::encode(&tx.id[..8]) 1482 ); 1483 } 1484 _ => {} 1485 } 1486 } 1487 1488 settlements 1489 } 1490 1491 /// Determine if an order should result in a simulated match 1492 /// In production, this would be replaced by actual order book matching 1493 fn should_simulate_match(&self, _tx: &DeltaTransaction) -> bool { 1494 // For demonstration, match 50% of orders 1495 // In production, this would check the order book for matching orders 1496 self.current_height.is_multiple_of(2) 1497 } 1498 1499 /// Create a settlement transaction for a matched trade 1500 fn create_settlement_tx( 1501 &self, 1502 original_tx: &DeltaTransaction, 1503 timestamp: u64, 1504 ) -> DeltaTransaction { 1505 // Create settlement data 1506 let mut data = Vec::new(); 1507 // Include original order ID 1508 data.extend_from_slice(&original_tx.id); 1509 // Include matched quantity (simplified: full fill) 1510 data.extend_from_slice(&original_tx.amount.to_le_bytes()); 1511 // Include timestamp 1512 data.extend_from_slice(×tamp.to_le_bytes()); 1513 1514 // Compute settlement transaction ID 1515 let mut hasher = blake3::Hasher::new(); 1516 hasher.update(&[TransactionType::ExchangeSettlement.as_byte()]); 1517 hasher.update(&original_tx.id); 1518 hasher.update(×tamp.to_le_bytes()); 1519 let settlement_id: [u8; 32] = hasher.finalize().into(); 1520 1521 DeltaTransaction { 1522 id: settlement_id, 1523 tx_type: TransactionType::ExchangeSettlement, 1524 sender: original_tx.sender.clone(), 1525 sender_pubkey: original_tx.sender_pubkey.clone(), 1526 nonce: original_tx.nonce, 1527 amount: original_tx.amount, 1528 fee_limit: 0, // Settlements don't pay additional fees 1529 data, 1530 signature: vec![], // Internal transaction, no signature needed 1531 } 1532 } 1533 1534 /// Get pending exchange transaction count 1535 pub fn pending_exchange_tx_count(&self) -> usize { 1536 self.pending_exchange_txs.len() 1537 } 1538 1539 /// Get pending transaction count 1540 pub fn pending_tx_count(&self) -> usize { 1541 self.pending_txs.len() 1542 } 1543 } 1544 1545 /// Consensus engine running in background 1546 pub struct ConsensusEngine { 1547 producer: BlockProducer, 1548 shutdown_rx: mpsc::Receiver<()>, 1549 block_tx: mpsc::Sender<DeltaBlock>, 1550 } 1551 1552 impl ConsensusEngine { 1553 /// Create a new consensus engine (starts from height 0) 1554 pub fn new( 1555 config: ConsensusConfig, 1556 shutdown_rx: mpsc::Receiver<()>, 1557 block_tx: mpsc::Sender<DeltaBlock>, 1558 ) -> Self { 1559 Self { 1560 producer: BlockProducer::new(config), 1561 shutdown_rx, 1562 block_tx, 1563 } 1564 } 1565 1566 /// Create a consensus engine initialized from a stored state (for recovery) 1567 pub fn with_starting_state( 1568 config: ConsensusConfig, 1569 shutdown_rx: mpsc::Receiver<()>, 1570 block_tx: mpsc::Sender<DeltaBlock>, 1571 height: u32, 1572 last_hash: [u8; 32], 1573 ) -> Self { 1574 tracing::info!( 1575 "Consensus engine resuming from height {} (last hash: {})", 1576 height, 1577 hex::encode(&last_hash[..8]) 1578 ); 1579 Self { 1580 producer: BlockProducer::with_starting_state(config, height, last_hash), 1581 shutdown_rx, 1582 block_tx, 1583 } 1584 } 1585 1586 /// Run the consensus engine 1587 pub async fn run(mut self, node_address: String) { 1588 tracing::info!("Consensus engine started"); 1589 1590 loop { 1591 tokio::select! { 1592 _ = self.shutdown_rx.recv() => { 1593 tracing::info!("Consensus engine shutting down"); 1594 break; 1595 } 1596 _ = tokio::time::sleep(Duration::from_millis(100)) => { 1597 if self.producer.should_produce_block() { 1598 let block = self.producer.produce_block(&node_address); 1599 tracing::info!( 1600 "Produced block {} with {} txs", 1601 block.height, 1602 block.transactions.len() 1603 ); 1604 1605 if let Err(e) = self.block_tx.send(block).await { 1606 tracing::error!("Failed to send block: {}", e); 1607 } 1608 } 1609 } 1610 } 1611 } 1612 } 1613 1614 /// Get block producer reference 1615 pub fn producer(&self) -> &BlockProducer { 1616 &self.producer 1617 } 1618 1619 /// Get mutable block producer reference 1620 pub fn producer_mut(&mut self) -> &mut BlockProducer { 1621 &mut self.producer 1622 } 1623 }