/ node / src / consensus.rs
consensus.rs
   1  // Copyright (c) 2025 ADnet Contributors
   2  // SPDX-License-Identifier: Apache-2.0
   3  
   4  //! Consensus module for DELTA chain
   5  //!
   6  //! DELTA uses a simplified consensus mechanism since it doesn't need the full
   7  //! ZK circuit complexity of ALPHA. It uses a combination of:
   8  //! - Proof of Stake for validator selection
   9  //! - BFT for finality
  10  //! - Cross-chain attestations from ALPHA for bridge operations
  11  
  12  use crate::signing::{
  13      ValidatorIdentity, ValidatorKeyStore, transaction_signing_message, verify_signature,
  14  };
  15  use serde::{Deserialize, Serialize};
  16  use std::collections::HashMap;
  17  use std::time::{Duration, Instant};
  18  use tokio::sync::mpsc;
  19  
  20  /// Block in the DELTA chain
  21  #[derive(Clone, Debug, Serialize, Deserialize)]
  22  pub struct DeltaBlock {
  23      /// Block height
  24      pub height: u32,
  25      /// Previous block hash
  26      pub previous_hash: [u8; 32],
  27      /// Block timestamp (unix seconds)
  28      pub timestamp: u64,
  29      /// Block producer address
  30      pub producer: String,
  31      /// Transactions in this block
  32      pub transactions: Vec<DeltaTransaction>,
  33      /// Cross-chain attestations included
  34      pub attestations: Vec<CrossChainAttestation>,
  35      /// Block hash (computed)
  36      pub hash: [u8; 32],
  37  }
  38  
  39  /// Transaction in DELTA chain
  40  #[derive(Clone, Debug, Serialize, Deserialize)]
  41  pub struct DeltaTransaction {
  42      /// Transaction ID (hash of transaction contents)
  43      pub id: [u8; 32],
  44      /// Transaction type
  45      pub tx_type: TransactionType,
  46      /// Sender address (for account model txs)
  47      pub sender: Option<String>,
  48      /// Sender's public key for signature verification
  49      pub sender_pubkey: Option<Vec<u8>>,
  50      /// Transaction nonce (for replay protection)
  51      pub nonce: u64,
  52      /// Amount being transferred (if applicable)
  53      pub amount: u64,
  54      /// Gas/fee limit for this transaction
  55      pub fee_limit: u64,
  56      /// Raw transaction data
  57      pub data: Vec<u8>,
  58      /// Transaction signature
  59      pub signature: Vec<u8>,
  60  }
  61  
  62  impl DeltaTransaction {
  63      /// Compute the transaction ID from its contents
  64      pub fn compute_id(&self) -> [u8; 32] {
  65          let mut hasher = blake3::Hasher::new();
  66          hasher.update(&[self.tx_type.as_byte()]);
  67          if let Some(sender) = &self.sender {
  68              hasher.update(sender.as_bytes());
  69          }
  70          hasher.update(&self.nonce.to_le_bytes());
  71          hasher.update(&self.amount.to_le_bytes());
  72          hasher.update(&self.fee_limit.to_le_bytes());
  73          hasher.update(&self.data);
  74          hasher.finalize().into()
  75      }
  76  
  77      /// Verify the transaction ID matches the computed hash
  78      pub fn verify_id(&self) -> bool {
  79          self.id == self.compute_id()
  80      }
  81  }
  82  
  83  /// Transaction types
  84  #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
  85  pub enum TransactionType {
  86      /// Basic trading transaction (generic)
  87      Trading,
  88      /// Governance transaction
  89      Governance,
  90      /// Bridge transaction (ALPHA <-> DELTA)
  91      Bridge,
  92      /// Exchange: Place a limit order (F-D01)
  93      ExchangeLimitOrder,
  94      /// Exchange: Place a market order (F-D02)
  95      ExchangeMarketOrder,
  96      /// Exchange: Cancel an order (F-D03)
  97      ExchangeCancelOrder,
  98      /// Exchange: Settle matched trades (generated during block production)
  99      ExchangeSettlement,
 100  }
 101  
 102  impl TransactionType {
 103      /// Convert transaction type to a byte for hashing
 104      pub fn as_byte(&self) -> u8 {
 105          match self {
 106              TransactionType::Trading => 0,
 107              TransactionType::Governance => 1,
 108              TransactionType::Bridge => 2,
 109              TransactionType::ExchangeLimitOrder => 3,
 110              TransactionType::ExchangeMarketOrder => 4,
 111              TransactionType::ExchangeCancelOrder => 5,
 112              TransactionType::ExchangeSettlement => 6,
 113          }
 114      }
 115  
 116      /// Check if this is an exchange transaction type
 117      pub fn is_exchange_tx(&self) -> bool {
 118          matches!(
 119              self,
 120              TransactionType::ExchangeLimitOrder
 121                  | TransactionType::ExchangeMarketOrder
 122                  | TransactionType::ExchangeCancelOrder
 123                  | TransactionType::ExchangeSettlement
 124          )
 125      }
 126  }
 127  
 128  // ============================================================================
 129  // Transaction Validation
 130  // ============================================================================
 131  
 132  /// Result of transaction validation
 133  #[derive(Debug, Clone)]
 134  pub enum TxValidationResult {
 135      /// Transaction is valid
 136      Valid,
 137      /// Transaction is invalid with reason
 138      Invalid(TxValidationError),
 139  }
 140  
 141  /// Transaction validation errors
 142  #[derive(Debug, Clone)]
 143  pub enum TxValidationError {
 144      /// Invalid transaction format
 145      InvalidFormat(String),
 146      /// Transaction ID does not match computed hash
 147      InvalidId,
 148      /// Missing required field
 149      MissingField(String),
 150      /// Invalid signature
 151      InvalidSignature,
 152      /// Insufficient balance
 153      InsufficientBalance { required: u64, available: u64 },
 154      /// Invalid nonce (replay protection)
 155      InvalidNonce { expected: u64, got: u64 },
 156      /// Fee limit too low
 157      FeeTooLow { minimum: u64, provided: u64 },
 158      /// Transaction too large
 159      TooLarge { max_size: usize, actual_size: usize },
 160      /// Sender not found
 161      SenderNotFound,
 162      // Exchange-specific validation errors (F-D01 to F-D07)
 163      /// Order quantity below minimum (100 ALPHA equivalent)
 164      ExchangeOrderTooSmall { minimum: u64, provided: u64 },
 165      /// Too many open orders for this user/pair (max 20)
 166      ExchangeTooManyOrders { pair: String, count: u32, max: u32 },
 167      /// Invalid trading pair
 168      ExchangeInvalidPair(String),
 169      /// Invalid order type
 170      ExchangeInvalidOrderType(String),
 171      /// Order not found (for cancellation)
 172      ExchangeOrderNotFound(String),
 173      /// User not authorized to cancel this order
 174      ExchangeNotOrderOwner,
 175      /// Self-trade prevention triggered
 176      ExchangeSelfTrade,
 177      /// Market order rejected (no liquidity)
 178      ExchangeNoLiquidity,
 179  }
 180  
 181  impl std::fmt::Display for TxValidationError {
 182      fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
 183          match self {
 184              TxValidationError::InvalidFormat(msg) => write!(f, "Invalid format: {}", msg),
 185              TxValidationError::InvalidId => {
 186                  write!(f, "Transaction ID does not match content hash")
 187              }
 188              TxValidationError::MissingField(field) => {
 189                  write!(f, "Missing required field: {}", field)
 190              }
 191              TxValidationError::InvalidSignature => write!(f, "Invalid signature"),
 192              TxValidationError::InsufficientBalance {
 193                  required,
 194                  available,
 195              } => {
 196                  write!(
 197                      f,
 198                      "Insufficient balance: need {}, have {}",
 199                      required, available
 200                  )
 201              }
 202              TxValidationError::InvalidNonce { expected, got } => {
 203                  write!(f, "Invalid nonce: expected {}, got {}", expected, got)
 204              }
 205              TxValidationError::FeeTooLow { minimum, provided } => {
 206                  write!(f, "Fee too low: minimum {}, provided {}", minimum, provided)
 207              }
 208              TxValidationError::TooLarge {
 209                  max_size,
 210                  actual_size,
 211              } => {
 212                  write!(
 213                      f,
 214                      "Transaction too large: max {}, actual {}",
 215                      max_size, actual_size
 216                  )
 217              }
 218              TxValidationError::SenderNotFound => write!(f, "Sender account not found"),
 219              // Exchange-specific errors
 220              TxValidationError::ExchangeOrderTooSmall { minimum, provided } => {
 221                  write!(
 222                      f,
 223                      "Order too small: minimum {} ALPHA, provided {}",
 224                      minimum, provided
 225                  )
 226              }
 227              TxValidationError::ExchangeTooManyOrders { pair, count, max } => {
 228                  write!(
 229                      f,
 230                      "Too many orders for pair {}: {} orders, max {}",
 231                      pair, count, max
 232                  )
 233              }
 234              TxValidationError::ExchangeInvalidPair(pair) => {
 235                  write!(f, "Invalid trading pair: {}", pair)
 236              }
 237              TxValidationError::ExchangeInvalidOrderType(order_type) => {
 238                  write!(f, "Invalid order type: {}", order_type)
 239              }
 240              TxValidationError::ExchangeOrderNotFound(id) => {
 241                  write!(f, "Order not found: {}", id)
 242              }
 243              TxValidationError::ExchangeNotOrderOwner => {
 244                  write!(f, "Not authorized to cancel this order")
 245              }
 246              TxValidationError::ExchangeSelfTrade => {
 247                  write!(f, "Self-trade prevention triggered")
 248              }
 249              TxValidationError::ExchangeNoLiquidity => {
 250                  write!(f, "Market order rejected: no liquidity available")
 251              }
 252          }
 253      }
 254  }
 255  
 256  impl std::error::Error for TxValidationError {}
 257  
 258  /// Account state for transaction validation
 259  #[derive(Clone, Debug, Default, Serialize, Deserialize)]
 260  pub struct AccountState {
 261      /// Account balance
 262      pub balance: u64,
 263      /// Next expected nonce
 264      pub nonce: u64,
 265      /// Account public key (for signature verification)
 266      pub pubkey: Option<Vec<u8>>,
 267  }
 268  
 269  /// Transaction validator
 270  pub struct TxValidator {
 271      /// Minimum required fee
 272      pub min_fee: u64,
 273      /// Maximum transaction data size
 274      pub max_tx_size: usize,
 275      /// Account states (address -> state)
 276      pub accounts: HashMap<String, AccountState>,
 277  }
 278  
 279  impl TxValidator {
 280      /// Create a new transaction validator
 281      pub fn new() -> Self {
 282          Self {
 283              min_fee: 1000,      // Minimum 1000 units fee
 284              max_tx_size: 65536, // 64KB max transaction size
 285              accounts: HashMap::new(),
 286          }
 287      }
 288  
 289      /// Validate a transaction
 290      pub fn validate(&self, tx: &DeltaTransaction) -> TxValidationResult {
 291          // 1. Check transaction format
 292          if let Err(e) = self.validate_format(tx) {
 293              return TxValidationResult::Invalid(e);
 294          }
 295  
 296          // 2. Verify transaction ID
 297          if !tx.verify_id() {
 298              return TxValidationResult::Invalid(TxValidationError::InvalidId);
 299          }
 300  
 301          // 3. Verify signature
 302          if let Err(e) = self.verify_signature(tx) {
 303              return TxValidationResult::Invalid(e);
 304          }
 305  
 306          // 4. Check account balance and nonce
 307          if let Err(e) = self.validate_account(tx) {
 308              return TxValidationResult::Invalid(e);
 309          }
 310  
 311          TxValidationResult::Valid
 312      }
 313  
 314      /// Validate transaction format
 315      fn validate_format(&self, tx: &DeltaTransaction) -> Result<(), TxValidationError> {
 316          // Check transaction size
 317          let tx_size = tx.data.len() + tx.signature.len();
 318          if tx_size > self.max_tx_size {
 319              return Err(TxValidationError::TooLarge {
 320                  max_size: self.max_tx_size,
 321                  actual_size: tx_size,
 322              });
 323          }
 324  
 325          // Check fee limit
 326          if tx.fee_limit < self.min_fee {
 327              return Err(TxValidationError::FeeTooLow {
 328                  minimum: self.min_fee,
 329                  provided: tx.fee_limit,
 330              });
 331          }
 332  
 333          // Trading, Governance, and Exchange transactions require a sender
 334          match tx.tx_type {
 335              TransactionType::Trading | TransactionType::Governance => {
 336                  if tx.sender.is_none() {
 337                      return Err(TxValidationError::MissingField("sender".to_string()));
 338                  }
 339              }
 340              TransactionType::Bridge => {
 341                  // Bridge transactions may have special handling
 342              }
 343              // Exchange transaction types (F-D01 to F-D07)
 344              TransactionType::ExchangeLimitOrder
 345              | TransactionType::ExchangeMarketOrder
 346              | TransactionType::ExchangeCancelOrder => {
 347                  // All exchange operations require a sender
 348                  if tx.sender.is_none() {
 349                      return Err(TxValidationError::MissingField("sender".to_string()));
 350                  }
 351                  // Validate exchange-specific format in the data field
 352                  self.validate_exchange_format(tx)?;
 353              }
 354              TransactionType::ExchangeSettlement => {
 355                  // Settlement transactions are generated internally during block production
 356                  // They don't require a sender as they represent matched trades
 357              }
 358          }
 359  
 360          Ok(())
 361      }
 362  
 363      /// Validate exchange-specific transaction format (F-D01 to F-D07)
 364      fn validate_exchange_format(&self, tx: &DeltaTransaction) -> Result<(), TxValidationError> {
 365          // Exchange constants from the spec
 366          const MIN_ORDER_SIZE_ALPHA: u64 = 100;
 367  
 368          // The transaction amount represents the order quantity
 369          match tx.tx_type {
 370              TransactionType::ExchangeLimitOrder | TransactionType::ExchangeMarketOrder => {
 371                  // Check minimum order size
 372                  if tx.amount < MIN_ORDER_SIZE_ALPHA {
 373                      return Err(TxValidationError::ExchangeOrderTooSmall {
 374                          minimum: MIN_ORDER_SIZE_ALPHA,
 375                          provided: tx.amount,
 376                      });
 377                  }
 378              }
 379              TransactionType::ExchangeCancelOrder => {
 380                  // Cancel orders must have order ID in data field
 381                  if tx.data.len() < 32 {
 382                      return Err(TxValidationError::MissingField("order_id".to_string()));
 383                  }
 384              }
 385              _ => {}
 386          }
 387  
 388          Ok(())
 389      }
 390  
 391      /// Verify transaction signature using Ed25519
 392      fn verify_signature(&self, tx: &DeltaTransaction) -> Result<(), TxValidationError> {
 393          // For transactions that require signatures
 394          if tx.sender.is_some() {
 395              if tx.signature.is_empty() {
 396                  return Err(TxValidationError::InvalidSignature);
 397              }
 398  
 399              // Get the sender's public key
 400              let pubkey = if let Some(pk) = &tx.sender_pubkey {
 401                  pk.clone()
 402              } else if let Some(sender) = &tx.sender {
 403                  // Try to get from account state
 404                  if let Some(account) = self.accounts.get(sender) {
 405                      account.pubkey.clone().unwrap_or_default()
 406                  } else {
 407                      return Err(TxValidationError::SenderNotFound);
 408                  }
 409              } else {
 410                  return Err(TxValidationError::MissingField("sender_pubkey".to_string()));
 411              };
 412  
 413              // Public key must be 32 bytes for Ed25519
 414              if pubkey.len() != 32 {
 415                  return Err(TxValidationError::InvalidSignature);
 416              }
 417  
 418              // Signature must be 64 bytes for Ed25519
 419              if tx.signature.len() != 64 {
 420                  return Err(TxValidationError::InvalidSignature);
 421              }
 422  
 423              // Create the signing message (same format used when signing)
 424              let signing_message = transaction_signing_message(
 425                  tx.tx_type.as_byte(),
 426                  tx.sender.as_deref(),
 427                  tx.nonce,
 428                  tx.amount,
 429                  tx.fee_limit,
 430                  &tx.data,
 431              );
 432  
 433              // Verify the Ed25519 signature
 434              if !verify_signature(&pubkey, &signing_message, &tx.signature) {
 435                  return Err(TxValidationError::InvalidSignature);
 436              }
 437          }
 438  
 439          Ok(())
 440      }
 441  
 442      /// Validate account balance and nonce
 443      fn validate_account(&self, tx: &DeltaTransaction) -> Result<(), TxValidationError> {
 444          let sender = match &tx.sender {
 445              Some(s) => s,
 446              None => return Ok(()), // No sender means no account validation needed
 447          };
 448  
 449          let account = match self.accounts.get(sender) {
 450              Some(a) => a,
 451              None => {
 452                  // For new accounts, check if this is an initial deposit
 453                  if tx.tx_type == TransactionType::Bridge && tx.amount > 0 {
 454                      return Ok(()); // Bridge deposits can create accounts
 455                  }
 456                  return Err(TxValidationError::SenderNotFound);
 457              }
 458          };
 459  
 460          // Check nonce
 461          if tx.nonce != account.nonce {
 462              return Err(TxValidationError::InvalidNonce {
 463                  expected: account.nonce,
 464                  got: tx.nonce,
 465              });
 466          }
 467  
 468          // Check balance (amount + fee)
 469          let required = tx.amount.saturating_add(tx.fee_limit);
 470          if account.balance < required {
 471              return Err(TxValidationError::InsufficientBalance {
 472                  required,
 473                  available: account.balance,
 474              });
 475          }
 476  
 477          Ok(())
 478      }
 479  
 480      /// Update account state after transaction execution
 481      pub fn apply_transaction(&mut self, tx: &DeltaTransaction) {
 482          if let Some(sender) = &tx.sender
 483              && let Some(account) = self.accounts.get_mut(sender)
 484          {
 485              // Deduct amount and fee
 486              account.balance = account.balance.saturating_sub(tx.amount + tx.fee_limit);
 487              // Increment nonce
 488              account.nonce += 1;
 489          }
 490      }
 491  
 492      /// Add or update an account
 493      pub fn set_account(&mut self, address: String, state: AccountState) {
 494          self.accounts.insert(address, state);
 495      }
 496  
 497      /// Get account state
 498      pub fn get_account(&self, address: &str) -> Option<&AccountState> {
 499          self.accounts.get(address)
 500      }
 501  }
 502  
 503  impl Default for TxValidator {
 504      fn default() -> Self {
 505          Self::new()
 506      }
 507  }
 508  
 509  /// Cross-chain attestation from ALPHA
 510  #[derive(Clone, Debug, Serialize, Deserialize)]
 511  pub struct CrossChainAttestation {
 512      /// ALPHA block height
 513      pub alpha_height: u64,
 514      /// State root hash
 515      pub state_root: [u8; 32],
 516      /// Validator signatures
 517      pub signatures: Vec<Vec<u8>>,
 518  }
 519  
 520  // ============================================================================
 521  // BFT Consensus Implementation
 522  // ============================================================================
 523  
 524  /// BFT consensus round phase
 525  #[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
 526  pub enum BftPhase {
 527      /// Waiting for block proposal from leader
 528      #[default]
 529      Propose,
 530      /// Pre-vote phase: validators vote on proposed block
 531      PreVote,
 532      /// Pre-commit phase: validators commit to voted block
 533      PreCommit,
 534      /// Commit phase: block is finalized
 535      Commit,
 536  }
 537  
 538  /// A vote in the BFT consensus
 539  #[derive(Clone, Debug, Serialize, Deserialize)]
 540  pub struct BftVote {
 541      /// Height of the block being voted on
 542      pub height: u32,
 543      /// Round number (increments on view change)
 544      pub round: u32,
 545      /// Block hash being voted on (None for nil vote)
 546      pub block_hash: Option<[u8; 32]>,
 547      /// Validator address
 548      pub validator: String,
 549      /// Signature over (height, round, block_hash)
 550      pub signature: Vec<u8>,
 551      /// Vote type
 552      pub vote_type: VoteType,
 553  }
 554  
 555  /// Type of BFT vote
 556  #[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
 557  pub enum VoteType {
 558      PreVote,
 559      PreCommit,
 560  }
 561  
 562  /// View change request when leader fails
 563  #[derive(Clone, Debug, Serialize, Deserialize)]
 564  pub struct ViewChangeRequest {
 565      /// Current height
 566      pub height: u32,
 567      /// New round number
 568      pub new_round: u32,
 569      /// Requesting validator
 570      pub validator: String,
 571      /// Reason for view change
 572      pub reason: ViewChangeReason,
 573      /// Signature
 574      pub signature: Vec<u8>,
 575  }
 576  
 577  /// Reason for requesting a view change
 578  #[derive(Clone, Debug, Serialize, Deserialize)]
 579  pub enum ViewChangeReason {
 580      /// Leader timeout - no proposal received
 581      LeaderTimeout,
 582      /// Invalid block proposed
 583      InvalidBlock,
 584      /// Leader is byzantine (conflicting proposals)
 585      ByzantineLeader,
 586  }
 587  
 588  /// BFT round state
 589  #[derive(Clone, Debug)]
 590  pub struct BftRoundState {
 591      /// Current height
 592      pub height: u32,
 593      /// Current round (0-indexed, increments on view change)
 594      pub round: u32,
 595      /// Current phase
 596      pub phase: BftPhase,
 597      /// Proposed block for this round
 598      pub proposed_block: Option<DeltaBlock>,
 599      /// Pre-votes received (validator -> vote)
 600      pub pre_votes: HashMap<String, BftVote>,
 601      /// Pre-commits received (validator -> vote)
 602      pub pre_commits: HashMap<String, BftVote>,
 603      /// View change requests received
 604      pub view_changes: HashMap<String, ViewChangeRequest>,
 605      /// Round start time
 606      pub round_start: Instant,
 607      /// Locked block (if any) from previous round
 608      pub locked_block: Option<DeltaBlock>,
 609      /// Locked round
 610      pub locked_round: Option<u32>,
 611  }
 612  
 613  impl BftRoundState {
 614      /// Create a new round state
 615      pub fn new(height: u32, round: u32) -> Self {
 616          Self {
 617              height,
 618              round,
 619              phase: BftPhase::Propose,
 620              proposed_block: None,
 621              pre_votes: HashMap::new(),
 622              pre_commits: HashMap::new(),
 623              view_changes: HashMap::new(),
 624              round_start: Instant::now(),
 625              locked_block: None,
 626              locked_round: None,
 627          }
 628      }
 629  
 630      /// Reset for a new round (view change)
 631      pub fn new_round(&mut self) {
 632          self.round += 1;
 633          self.phase = BftPhase::Propose;
 634          self.proposed_block = None;
 635          self.pre_votes.clear();
 636          self.pre_commits.clear();
 637          self.view_changes.clear();
 638          self.round_start = Instant::now();
 639          // Note: locked_block and locked_round are preserved across rounds
 640      }
 641  
 642      /// Reset for a new height (after commit)
 643      pub fn new_height(&mut self) {
 644          self.height += 1;
 645          self.round = 0;
 646          self.phase = BftPhase::Propose;
 647          self.proposed_block = None;
 648          self.pre_votes.clear();
 649          self.pre_commits.clear();
 650          self.view_changes.clear();
 651          self.round_start = Instant::now();
 652          self.locked_block = None;
 653          self.locked_round = None;
 654      }
 655  }
 656  
 657  /// BFT consensus manager
 658  pub struct BftConsensus {
 659      /// Current round state
 660      pub state: BftRoundState,
 661      /// Known validators and their voting power
 662      pub validators: HashMap<String, u64>,
 663      /// Total voting power
 664      pub total_power: u64,
 665      /// This node's validator address
 666      pub node_address: String,
 667      /// This node's validator identity (for signing)
 668      pub identity: Option<ValidatorIdentity>,
 669      /// Key store for verifying other validators' signatures
 670      pub key_store: ValidatorKeyStore,
 671      /// Timeout for propose phase
 672      pub propose_timeout: Duration,
 673      /// Timeout for pre-vote phase
 674      pub prevote_timeout: Duration,
 675      /// Timeout for pre-commit phase
 676      pub precommit_timeout: Duration,
 677  }
 678  
 679  impl BftConsensus {
 680      /// Create a new BFT consensus manager (without signing capability)
 681      pub fn new(node_address: String, validators: HashMap<String, u64>) -> Self {
 682          let total_power: u64 = validators.values().sum();
 683          Self {
 684              state: BftRoundState::new(0, 0),
 685              validators,
 686              total_power,
 687              node_address,
 688              identity: None,
 689              key_store: ValidatorKeyStore::new(),
 690              propose_timeout: Duration::from_secs(3),
 691              prevote_timeout: Duration::from_secs(2),
 692              precommit_timeout: Duration::from_secs(2),
 693          }
 694      }
 695  
 696      /// Create a new BFT consensus manager with validator identity for signing
 697      pub fn with_identity(identity: ValidatorIdentity, validators: HashMap<String, u64>) -> Self {
 698          let total_power: u64 = validators.values().sum();
 699          let node_address = identity.address().to_string();
 700          Self {
 701              state: BftRoundState::new(0, 0),
 702              validators,
 703              total_power,
 704              node_address,
 705              identity: Some(identity),
 706              key_store: ValidatorKeyStore::new(),
 707              propose_timeout: Duration::from_secs(3),
 708              prevote_timeout: Duration::from_secs(2),
 709              precommit_timeout: Duration::from_secs(2),
 710          }
 711      }
 712  
 713      /// Register a validator's public key for signature verification
 714      pub fn register_validator_key(&mut self, address: String, public_key: [u8; 32]) {
 715          self.key_store.register(address, public_key);
 716      }
 717  
 718      /// Get the leader for the current round
 719      pub fn get_leader(&self) -> Option<String> {
 720          if self.validators.is_empty() {
 721              return None;
 722          }
 723          let mut validators: Vec<_> = self.validators.keys().cloned().collect();
 724          validators.sort();
 725          let leader_idx =
 726              ((self.state.height as usize) + (self.state.round as usize)) % validators.len();
 727          Some(validators[leader_idx].clone())
 728      }
 729  
 730      /// Check if this node is the current leader
 731      pub fn is_leader(&self) -> bool {
 732          self.get_leader()
 733              .map(|l| l == self.node_address)
 734              .unwrap_or(false)
 735      }
 736  
 737      /// Calculate required voting power for quorum (2f+1 where n=3f+1)
 738      pub fn quorum_power(&self) -> u64 {
 739          // For BFT: need > 2/3 of total power
 740          (self.total_power * 2 / 3) + 1
 741      }
 742  
 743      /// Handle a block proposal
 744      pub fn handle_proposal(&mut self, block: DeltaBlock, proposer: &str) -> Result<(), BftError> {
 745          // Verify proposer is the current leader
 746          let expected_leader = self.get_leader().ok_or(BftError::NoValidators)?;
 747          if proposer != expected_leader {
 748              return Err(BftError::InvalidProposer {
 749                  expected: expected_leader,
 750                  got: proposer.to_string(),
 751              });
 752          }
 753  
 754          // Verify we're in propose phase
 755          if self.state.phase != BftPhase::Propose {
 756              return Err(BftError::WrongPhase {
 757                  expected: BftPhase::Propose,
 758                  got: self.state.phase,
 759              });
 760          }
 761  
 762          // Verify block height matches
 763          if block.height != self.state.height + 1 {
 764              return Err(BftError::InvalidBlockHeight {
 765                  expected: self.state.height + 1,
 766                  got: block.height,
 767              });
 768          }
 769  
 770          // Check if we're locked on a different block
 771          if let (Some(locked_block), Some(locked_round)) =
 772              (&self.state.locked_block, self.state.locked_round)
 773              && locked_round < self.state.round
 774              && locked_block.hash != block.hash
 775          {
 776              // We're locked on a different block, reject proposal
 777              // unless we see 2f+1 pre-votes for this new block
 778              tracing::warn!(
 779                  "Received proposal for different block while locked. \
 780                   Locked on {:?} at round {}",
 781                  hex::encode(&locked_block.hash[..8]),
 782                  locked_round
 783              );
 784          }
 785  
 786          self.state.proposed_block = Some(block);
 787          self.state.phase = BftPhase::PreVote;
 788          tracing::info!(
 789              "Proposal accepted for height {} round {}",
 790              self.state.height + 1,
 791              self.state.round
 792          );
 793  
 794          Ok(())
 795      }
 796  
 797      /// Create a pre-vote for the current proposal
 798      pub fn create_prevote(&self, vote_for_block: bool) -> Option<BftVote> {
 799          if self.state.phase != BftPhase::PreVote {
 800              return None;
 801          }
 802  
 803          let block_hash = if vote_for_block {
 804              self.state.proposed_block.as_ref().map(|b| b.hash)
 805          } else {
 806              None // nil vote
 807          };
 808  
 809          Some(BftVote {
 810              height: self.state.height + 1,
 811              round: self.state.round,
 812              block_hash,
 813              validator: self.node_address.clone(),
 814              signature: self.sign_vote(
 815                  self.state.height + 1,
 816                  self.state.round,
 817                  block_hash,
 818                  VoteType::PreVote,
 819              ),
 820              vote_type: VoteType::PreVote,
 821          })
 822      }
 823  
 824      /// Handle a pre-vote
 825      pub fn handle_prevote(&mut self, vote: BftVote) -> Result<bool, BftError> {
 826          // Verify vote is for current height and round
 827          if vote.height != self.state.height + 1 || vote.round != self.state.round {
 828              return Err(BftError::VoteMismatch);
 829          }
 830  
 831          // Verify voter is a valid validator
 832          if !self.validators.contains_key(&vote.validator) {
 833              return Err(BftError::UnknownValidator(vote.validator));
 834          }
 835  
 836          // Store the vote
 837          self.state.pre_votes.insert(vote.validator.clone(), vote);
 838  
 839          // Check if we have quorum for any block
 840          Ok(self.check_prevote_quorum())
 841      }
 842  
 843      /// Check if we have pre-vote quorum
 844      fn check_prevote_quorum(&mut self) -> bool {
 845          let proposed_hash = match &self.state.proposed_block {
 846              Some(b) => Some(b.hash),
 847              None => return false,
 848          };
 849  
 850          let mut power_for_block = 0u64;
 851          let mut power_for_nil = 0u64;
 852  
 853          for (validator, vote) in &self.state.pre_votes {
 854              let power = self.validators.get(validator).copied().unwrap_or(0);
 855              if vote.block_hash == proposed_hash {
 856                  power_for_block += power;
 857              } else if vote.block_hash.is_none() {
 858                  power_for_nil += power;
 859              }
 860          }
 861  
 862          let quorum = self.quorum_power();
 863  
 864          if power_for_block >= quorum {
 865              tracing::info!(
 866                  "Pre-vote quorum reached for block at height {} round {}",
 867                  self.state.height + 1,
 868                  self.state.round
 869              );
 870              self.state.phase = BftPhase::PreCommit;
 871              // Lock on the block
 872              self.state.locked_block = self.state.proposed_block.clone();
 873              self.state.locked_round = Some(self.state.round);
 874              return true;
 875          }
 876  
 877          if power_for_nil >= quorum {
 878              tracing::info!(
 879                  "Pre-vote quorum for NIL at height {} round {}, triggering view change",
 880                  self.state.height + 1,
 881                  self.state.round
 882              );
 883              // Don't advance phase, will trigger timeout and view change
 884          }
 885  
 886          false
 887      }
 888  
 889      /// Create a pre-commit for the locked block
 890      pub fn create_precommit(&self) -> Option<BftVote> {
 891          if self.state.phase != BftPhase::PreCommit {
 892              return None;
 893          }
 894  
 895          let block_hash = self.state.locked_block.as_ref().map(|b| b.hash);
 896  
 897          Some(BftVote {
 898              height: self.state.height + 1,
 899              round: self.state.round,
 900              block_hash,
 901              validator: self.node_address.clone(),
 902              signature: self.sign_vote(
 903                  self.state.height + 1,
 904                  self.state.round,
 905                  block_hash,
 906                  VoteType::PreCommit,
 907              ),
 908              vote_type: VoteType::PreCommit,
 909          })
 910      }
 911  
 912      /// Handle a pre-commit
 913      pub fn handle_precommit(&mut self, vote: BftVote) -> Result<Option<DeltaBlock>, BftError> {
 914          // Verify vote is for current height and round
 915          if vote.height != self.state.height + 1 || vote.round != self.state.round {
 916              return Err(BftError::VoteMismatch);
 917          }
 918  
 919          // Verify voter is a valid validator
 920          if !self.validators.contains_key(&vote.validator) {
 921              return Err(BftError::UnknownValidator(vote.validator));
 922          }
 923  
 924          // Store the vote
 925          self.state.pre_commits.insert(vote.validator.clone(), vote);
 926  
 927          // Check if we have quorum
 928          self.check_precommit_quorum()
 929      }
 930  
 931      /// Check if we have pre-commit quorum
 932      fn check_precommit_quorum(&mut self) -> Result<Option<DeltaBlock>, BftError> {
 933          let locked_hash = match &self.state.locked_block {
 934              Some(b) => b.hash,
 935              None => return Ok(None),
 936          };
 937  
 938          let mut power_for_block = 0u64;
 939  
 940          for (validator, vote) in &self.state.pre_commits {
 941              let power = self.validators.get(validator).copied().unwrap_or(0);
 942              if vote.block_hash == Some(locked_hash) {
 943                  power_for_block += power;
 944              }
 945          }
 946  
 947          if power_for_block >= self.quorum_power() {
 948              tracing::info!(
 949                  "Pre-commit quorum reached! Committing block at height {} round {}",
 950                  self.state.height + 1,
 951                  self.state.round
 952              );
 953              self.state.phase = BftPhase::Commit;
 954              let block = self.state.locked_block.clone();
 955  
 956              // Advance to next height
 957              self.state.new_height();
 958  
 959              return Ok(block);
 960          }
 961  
 962          Ok(None)
 963      }
 964  
 965      /// Handle a view change request
 966      pub fn handle_view_change(&mut self, request: ViewChangeRequest) -> Result<bool, BftError> {
 967          // Verify request is for current height
 968          if request.height != self.state.height + 1 {
 969              return Err(BftError::VoteMismatch);
 970          }
 971  
 972          // Verify requester is a valid validator
 973          if !self.validators.contains_key(&request.validator) {
 974              return Err(BftError::UnknownValidator(request.validator));
 975          }
 976  
 977          // Store the request
 978          self.state
 979              .view_changes
 980              .insert(request.validator.clone(), request);
 981  
 982          // Check if we have enough view change requests
 983          self.check_view_change_quorum()
 984      }
 985  
 986      /// Check if we have view change quorum
 987      fn check_view_change_quorum(&mut self) -> Result<bool, BftError> {
 988          // Need f+1 validators to request view change (where n=3f+1)
 989          // This is weaker than quorum to prevent liveness issues
 990          let threshold = (self.total_power / 3) + 1;
 991  
 992          let mut power = 0u64;
 993          let mut max_round = self.state.round;
 994  
 995          for (validator, request) in &self.state.view_changes {
 996              let vp = self.validators.get(validator).copied().unwrap_or(0);
 997              power += vp;
 998              if request.new_round > max_round {
 999                  max_round = request.new_round;
1000              }
1001          }
1002  
1003          if power >= threshold {
1004              tracing::warn!(
1005                  "View change triggered at height {} from round {} to round {}",
1006                  self.state.height + 1,
1007                  self.state.round,
1008                  max_round
1009              );
1010              // Advance to new round
1011              while self.state.round < max_round {
1012                  self.state.new_round();
1013              }
1014              return Ok(true);
1015          }
1016  
1017          Ok(false)
1018      }
1019  
1020      /// Check for timeouts and trigger view change if needed
1021      pub fn check_timeouts(&mut self) -> Option<ViewChangeRequest> {
1022          let elapsed = self.state.round_start.elapsed();
1023  
1024          let should_timeout = match self.state.phase {
1025              BftPhase::Propose => elapsed > self.propose_timeout,
1026              BftPhase::PreVote => elapsed > self.propose_timeout + self.prevote_timeout,
1027              BftPhase::PreCommit => {
1028                  elapsed > self.propose_timeout + self.prevote_timeout + self.precommit_timeout
1029              }
1030              BftPhase::Commit => false, // No timeout in commit phase
1031          };
1032  
1033          if should_timeout {
1034              tracing::warn!(
1035                  "Timeout in {:?} phase at height {} round {}",
1036                  self.state.phase,
1037                  self.state.height + 1,
1038                  self.state.round
1039              );
1040  
1041              let reason = match self.state.phase {
1042                  BftPhase::Propose => ViewChangeReason::LeaderTimeout,
1043                  _ => ViewChangeReason::LeaderTimeout,
1044              };
1045  
1046              let height = self.state.height + 1;
1047              let new_round = self.state.round + 1;
1048              let signature = self.sign_view_change(height, new_round, &reason);
1049  
1050              return Some(ViewChangeRequest {
1051                  height,
1052                  new_round,
1053                  validator: self.node_address.clone(),
1054                  reason,
1055                  signature,
1056              });
1057          }
1058  
1059          None
1060      }
1061  
1062      /// Sign a vote using Ed25519
1063      fn sign_vote(
1064          &self,
1065          height: u32,
1066          round: u32,
1067          block_hash: Option<[u8; 32]>,
1068          vote_type: VoteType,
1069      ) -> Vec<u8> {
1070          if let Some(identity) = &self.identity {
1071              // Use proper Ed25519 signing
1072              let vote_type_byte = match vote_type {
1073                  VoteType::PreVote => 0u8,
1074                  VoteType::PreCommit => 1u8,
1075              };
1076              identity
1077                  .sign_vote(height, round, block_hash, vote_type_byte)
1078                  .to_vec()
1079          } else {
1080              // Fallback to blake3 hash if no identity (for backwards compatibility)
1081              let mut hasher = blake3::Hasher::new();
1082              hasher.update(&height.to_le_bytes());
1083              hasher.update(&round.to_le_bytes());
1084              if let Some(hash) = block_hash {
1085                  hasher.update(&hash);
1086              }
1087              hasher.update(self.node_address.as_bytes());
1088              hasher.finalize().as_bytes().to_vec()
1089          }
1090      }
1091  
1092      /// Verify a vote signature using Ed25519
1093      #[allow(dead_code)]
1094      fn verify_vote_signature(&self, vote: &BftVote) -> bool {
1095          let vote_type_byte = match vote.vote_type {
1096              VoteType::PreVote => 0u8,
1097              VoteType::PreCommit => 1u8,
1098          };
1099          self.key_store.verify_vote(
1100              &vote.validator,
1101              vote.height,
1102              vote.round,
1103              vote.block_hash,
1104              vote_type_byte,
1105              &vote.signature,
1106          )
1107      }
1108  
1109      /// Sign a view change request using Ed25519
1110      fn sign_view_change(&self, height: u32, new_round: u32, reason: &ViewChangeReason) -> Vec<u8> {
1111          let reason_byte = match reason {
1112              ViewChangeReason::LeaderTimeout => 0u8,
1113              ViewChangeReason::InvalidBlock => 1u8,
1114              ViewChangeReason::ByzantineLeader => 2u8,
1115          };
1116  
1117          if let Some(identity) = &self.identity {
1118              identity
1119                  .sign_view_change(height, new_round, reason_byte)
1120                  .to_vec()
1121          } else {
1122              // Fallback to empty signature if no identity
1123              vec![]
1124          }
1125      }
1126  
1127      /// Verify a view change signature using Ed25519
1128      #[allow(dead_code)]
1129      fn verify_view_change_signature(&self, request: &ViewChangeRequest) -> bool {
1130          let reason_byte = match &request.reason {
1131              ViewChangeReason::LeaderTimeout => 0u8,
1132              ViewChangeReason::InvalidBlock => 1u8,
1133              ViewChangeReason::ByzantineLeader => 2u8,
1134          };
1135          self.key_store.verify_view_change(
1136              &request.validator,
1137              request.height,
1138              request.new_round,
1139              reason_byte,
1140              &request.signature,
1141          )
1142      }
1143  }
1144  
1145  /// BFT consensus errors
1146  #[derive(Debug, Clone)]
1147  pub enum BftError {
1148      /// No validators configured
1149      NoValidators,
1150      /// Invalid block proposer
1151      InvalidProposer { expected: String, got: String },
1152      /// Wrong consensus phase
1153      WrongPhase { expected: BftPhase, got: BftPhase },
1154      /// Invalid block height
1155      InvalidBlockHeight { expected: u32, got: u32 },
1156      /// Vote height/round mismatch
1157      VoteMismatch,
1158      /// Unknown validator
1159      UnknownValidator(String),
1160  }
1161  
1162  impl std::fmt::Display for BftError {
1163      fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1164          match self {
1165              BftError::NoValidators => write!(f, "No validators configured"),
1166              BftError::InvalidProposer { expected, got } => {
1167                  write!(f, "Invalid proposer: expected {}, got {}", expected, got)
1168              }
1169              BftError::WrongPhase { expected, got } => {
1170                  write!(f, "Wrong phase: expected {:?}, got {:?}", expected, got)
1171              }
1172              BftError::InvalidBlockHeight { expected, got } => {
1173                  write!(
1174                      f,
1175                      "Invalid block height: expected {}, got {}",
1176                      expected, got
1177                  )
1178              }
1179              BftError::VoteMismatch => write!(f, "Vote height/round mismatch"),
1180              BftError::UnknownValidator(v) => write!(f, "Unknown validator: {}", v),
1181          }
1182      }
1183  }
1184  
1185  impl std::error::Error for BftError {}
1186  
1187  /// Consensus configuration
1188  #[derive(Clone, Debug)]
1189  pub struct ConsensusConfig {
1190      /// Target block time
1191      pub block_time: Duration,
1192      /// Maximum transactions per block
1193      pub max_txs_per_block: usize,
1194      /// Minimum validators for consensus
1195      pub min_validators: usize,
1196  }
1197  
1198  impl Default for ConsensusConfig {
1199      fn default() -> Self {
1200          Self {
1201              block_time: Duration::from_secs(2), // 2 second blocks
1202              max_txs_per_block: 1000,
1203              min_validators: 1,
1204          }
1205      }
1206  }
1207  
1208  /// Block producer for DELTA chain
1209  pub struct BlockProducer {
1210      config: ConsensusConfig,
1211      current_height: u32,
1212      pending_txs: Vec<DeltaTransaction>,
1213      pending_attestations: Vec<CrossChainAttestation>,
1214      last_block_time: Instant,
1215      /// Hash of the last produced block (genesis has all zeros)
1216      last_block_hash: [u8; 32],
1217      /// Transaction validator
1218      validator: TxValidator,
1219      /// Pending exchange order transactions for matching
1220      pending_exchange_txs: Vec<DeltaTransaction>,
1221      /// Exchange settlement transactions generated from order matching
1222      settlement_txs: Vec<DeltaTransaction>,
1223  }
1224  
1225  impl BlockProducer {
1226      /// Create a new block producer
1227      pub fn new(config: ConsensusConfig) -> Self {
1228          Self {
1229              config,
1230              current_height: 0,
1231              pending_txs: Vec::new(),
1232              pending_attestations: Vec::new(),
1233              last_block_time: Instant::now(),
1234              last_block_hash: [0; 32], // Genesis block has zero hash
1235              validator: TxValidator::new(),
1236              pending_exchange_txs: Vec::new(),
1237              settlement_txs: Vec::new(),
1238          }
1239      }
1240  
1241      /// Create a block producer initialized from a stored state (for recovery)
1242      pub fn with_starting_state(config: ConsensusConfig, height: u32, last_hash: [u8; 32]) -> Self {
1243          Self {
1244              config,
1245              current_height: height,
1246              pending_txs: Vec::new(),
1247              pending_attestations: Vec::new(),
1248              last_block_time: Instant::now(),
1249              last_block_hash: last_hash,
1250              validator: TxValidator::new(),
1251              pending_exchange_txs: Vec::new(),
1252              settlement_txs: Vec::new(),
1253          }
1254      }
1255  
1256      /// Create the genesis block for the DELTA chain
1257      /// Returns (block, genesis_hash) for storage and P2P initialization
1258      pub fn create_genesis_block(producer_addr: &str) -> DeltaBlock {
1259          // Genesis block has height 1 and zero previous hash
1260          let height = 1u32;
1261          let previous_hash = [0u8; 32];
1262          let timestamp = std::time::SystemTime::now()
1263              .duration_since(std::time::UNIX_EPOCH)
1264              .unwrap_or_default()
1265              .as_secs();
1266  
1267          // Compute genesis block hash
1268          let mut hasher = blake3::Hasher::new();
1269          hasher.update(&previous_hash);
1270          hasher.update(&height.to_le_bytes());
1271          hasher.update(&timestamp.to_le_bytes());
1272          hasher.update(producer_addr.as_bytes());
1273          // Genesis has no transactions
1274          let hash: [u8; 32] = hasher.finalize().into();
1275  
1276          tracing::info!(
1277              "Created genesis block with hash: {}",
1278              hex::encode(&hash[..8])
1279          );
1280  
1281          DeltaBlock {
1282              height,
1283              previous_hash,
1284              timestamp,
1285              producer: producer_addr.to_string(),
1286              transactions: Vec::new(),
1287              attestations: Vec::new(),
1288              hash,
1289          }
1290      }
1291  
1292      /// Get current block height
1293      pub fn current_height(&self) -> u32 {
1294          self.current_height
1295      }
1296  
1297      /// Get last block hash
1298      pub fn last_block_hash(&self) -> [u8; 32] {
1299          self.last_block_hash
1300      }
1301  
1302      /// Add a transaction to the pending pool with validation
1303      pub fn add_transaction(&mut self, tx: DeltaTransaction) -> Result<(), TxValidationError> {
1304          // Check pool capacity
1305          if self.pending_txs.len() + self.pending_exchange_txs.len() >= self.config.max_txs_per_block
1306          {
1307              return Err(TxValidationError::InvalidFormat(
1308                  "Transaction pool is full".to_string(),
1309              ));
1310          }
1311  
1312          // Validate the transaction
1313          match self.validator.validate(&tx) {
1314              TxValidationResult::Valid => {
1315                  tracing::debug!(
1316                      "Transaction {:?} validated successfully",
1317                      hex::encode(&tx.id[..8])
1318                  );
1319  
1320                  // Route exchange transactions to separate pool for order matching
1321                  if tx.tx_type.is_exchange_tx() {
1322                      tracing::debug!(
1323                          "Adding exchange transaction {:?} to matching pool",
1324                          hex::encode(&tx.id[..8])
1325                      );
1326                      self.pending_exchange_txs.push(tx);
1327                  } else {
1328                      self.pending_txs.push(tx);
1329                  }
1330                  Ok(())
1331              }
1332              TxValidationResult::Invalid(e) => {
1333                  tracing::warn!(
1334                      "Transaction {:?} validation failed: {}",
1335                      hex::encode(&tx.id[..8]),
1336                      e
1337                  );
1338                  Err(e)
1339              }
1340          }
1341      }
1342  
1343      /// Add a transaction without validation (for internal use or pre-validated txs)
1344      pub fn add_transaction_unchecked(&mut self, tx: DeltaTransaction) {
1345          if self.pending_txs.len() < self.config.max_txs_per_block {
1346              self.pending_txs.push(tx);
1347          }
1348      }
1349  
1350      /// Get a reference to the transaction validator
1351      pub fn validator(&self) -> &TxValidator {
1352          &self.validator
1353      }
1354  
1355      /// Get a mutable reference to the transaction validator
1356      pub fn validator_mut(&mut self) -> &mut TxValidator {
1357          &mut self.validator
1358      }
1359  
1360      /// Add an attestation from ALPHA chain
1361      pub fn add_attestation(&mut self, attestation: CrossChainAttestation) {
1362          self.pending_attestations.push(attestation);
1363      }
1364  
1365      /// Check if it's time to produce a block
1366      pub fn should_produce_block(&self) -> bool {
1367          self.last_block_time.elapsed() >= self.config.block_time
1368      }
1369  
1370      /// Produce a new block
1371      pub fn produce_block(&mut self, producer: &str) -> DeltaBlock {
1372          let timestamp = std::time::SystemTime::now()
1373              .duration_since(std::time::UNIX_EPOCH)
1374              .unwrap_or_default()
1375              .as_secs();
1376  
1377          // Take regular transactions
1378          let mut transactions = std::mem::take(&mut self.pending_txs);
1379  
1380          // Process exchange transactions and generate settlements
1381          // This is where order matching happens
1382          let exchange_txs = std::mem::take(&mut self.pending_exchange_txs);
1383          let settlements = self.process_exchange_transactions(&exchange_txs, timestamp);
1384  
1385          // Add exchange transactions and their settlements to block
1386          transactions.extend(exchange_txs);
1387          transactions.extend(settlements);
1388  
1389          // Clear settlement buffer
1390          self.settlement_txs.clear();
1391  
1392          let attestations = std::mem::take(&mut self.pending_attestations);
1393  
1394          // Store previous hash before computing new one
1395          let previous_hash = self.last_block_hash;
1396  
1397          // Compute block hash including previous hash for chain integrity
1398          let mut hasher = blake3::Hasher::new();
1399          hasher.update(&previous_hash);
1400          hasher.update(&(self.current_height + 1).to_le_bytes());
1401          hasher.update(&timestamp.to_le_bytes());
1402          hasher.update(producer.as_bytes());
1403          // Include transaction hashes in block hash
1404          for tx in &transactions {
1405              hasher.update(&tx.id);
1406          }
1407          let hash: [u8; 32] = hasher.finalize().into();
1408  
1409          // Update state for next block
1410          self.current_height += 1;
1411          self.last_block_time = Instant::now();
1412          self.last_block_hash = hash;
1413  
1414          // Log exchange activity
1415          let exchange_count = transactions
1416              .iter()
1417              .filter(|tx| tx.tx_type.is_exchange_tx())
1418              .count();
1419          let settlement_count = transactions
1420              .iter()
1421              .filter(|tx| tx.tx_type == TransactionType::ExchangeSettlement)
1422              .count();
1423  
1424          if exchange_count > 0 || settlement_count > 0 {
1425              tracing::info!(
1426                  "Block {} includes {} exchange txs and {} settlements",
1427                  self.current_height,
1428                  exchange_count - settlement_count,
1429                  settlement_count
1430              );
1431          }
1432  
1433          DeltaBlock {
1434              height: self.current_height,
1435              previous_hash,
1436              timestamp,
1437              producer: producer.to_string(),
1438              transactions,
1439              attestations,
1440              hash,
1441          }
1442      }
1443  
1444      /// Process exchange transactions and generate settlement transactions
1445      /// This is where the order matching logic runs during block production
1446      fn process_exchange_transactions(
1447          &mut self,
1448          exchange_txs: &[DeltaTransaction],
1449          timestamp: u64,
1450      ) -> Vec<DeltaTransaction> {
1451          let mut settlements = Vec::new();
1452  
1453          // Exchange fee parameters from spec (will be used when matching is implemented)
1454          const _MAKER_FEE_BPS: u64 = 25; // 0.025%
1455          const _TAKER_FEE_BPS: u64 = 75; // 0.075%
1456          const _FEE_DENOMINATOR: u64 = 100_000; // Basis points denominator
1457  
1458          // Process each exchange transaction
1459          for tx in exchange_txs {
1460              match tx.tx_type {
1461                  TransactionType::ExchangeLimitOrder | TransactionType::ExchangeMarketOrder => {
1462                      // Order matching would happen here
1463                      // In a full implementation, this would:
1464                      // 1. Parse order details from tx.data
1465                      // 2. Check against existing order book
1466                      // 3. Match orders and generate trades
1467                      // 4. Create settlement transactions for matched trades
1468  
1469                      // For now, we generate a placeholder settlement if the order amount
1470                      // indicates it should match (simplified logic)
1471                      if tx.amount > 0 && self.should_simulate_match(tx) {
1472                          let settlement = self.create_settlement_tx(tx, timestamp);
1473                          settlements.push(settlement);
1474                      }
1475                  }
1476                  TransactionType::ExchangeCancelOrder => {
1477                      // Cancel order - would remove from order book
1478                      // No settlement needed for cancellations
1479                      tracing::debug!(
1480                          "Processing order cancellation: {:?}",
1481                          hex::encode(&tx.id[..8])
1482                      );
1483                  }
1484                  _ => {}
1485              }
1486          }
1487  
1488          settlements
1489      }
1490  
1491      /// Determine if an order should result in a simulated match
1492      /// In production, this would be replaced by actual order book matching
1493      fn should_simulate_match(&self, _tx: &DeltaTransaction) -> bool {
1494          // For demonstration, match 50% of orders
1495          // In production, this would check the order book for matching orders
1496          self.current_height.is_multiple_of(2)
1497      }
1498  
1499      /// Create a settlement transaction for a matched trade
1500      fn create_settlement_tx(
1501          &self,
1502          original_tx: &DeltaTransaction,
1503          timestamp: u64,
1504      ) -> DeltaTransaction {
1505          // Create settlement data
1506          let mut data = Vec::new();
1507          // Include original order ID
1508          data.extend_from_slice(&original_tx.id);
1509          // Include matched quantity (simplified: full fill)
1510          data.extend_from_slice(&original_tx.amount.to_le_bytes());
1511          // Include timestamp
1512          data.extend_from_slice(&timestamp.to_le_bytes());
1513  
1514          // Compute settlement transaction ID
1515          let mut hasher = blake3::Hasher::new();
1516          hasher.update(&[TransactionType::ExchangeSettlement.as_byte()]);
1517          hasher.update(&original_tx.id);
1518          hasher.update(&timestamp.to_le_bytes());
1519          let settlement_id: [u8; 32] = hasher.finalize().into();
1520  
1521          DeltaTransaction {
1522              id: settlement_id,
1523              tx_type: TransactionType::ExchangeSettlement,
1524              sender: original_tx.sender.clone(),
1525              sender_pubkey: original_tx.sender_pubkey.clone(),
1526              nonce: original_tx.nonce,
1527              amount: original_tx.amount,
1528              fee_limit: 0, // Settlements don't pay additional fees
1529              data,
1530              signature: vec![], // Internal transaction, no signature needed
1531          }
1532      }
1533  
1534      /// Get pending exchange transaction count
1535      pub fn pending_exchange_tx_count(&self) -> usize {
1536          self.pending_exchange_txs.len()
1537      }
1538  
1539      /// Get pending transaction count
1540      pub fn pending_tx_count(&self) -> usize {
1541          self.pending_txs.len()
1542      }
1543  }
1544  
1545  /// Consensus engine running in background
1546  pub struct ConsensusEngine {
1547      producer: BlockProducer,
1548      shutdown_rx: mpsc::Receiver<()>,
1549      block_tx: mpsc::Sender<DeltaBlock>,
1550  }
1551  
1552  impl ConsensusEngine {
1553      /// Create a new consensus engine (starts from height 0)
1554      pub fn new(
1555          config: ConsensusConfig,
1556          shutdown_rx: mpsc::Receiver<()>,
1557          block_tx: mpsc::Sender<DeltaBlock>,
1558      ) -> Self {
1559          Self {
1560              producer: BlockProducer::new(config),
1561              shutdown_rx,
1562              block_tx,
1563          }
1564      }
1565  
1566      /// Create a consensus engine initialized from a stored state (for recovery)
1567      pub fn with_starting_state(
1568          config: ConsensusConfig,
1569          shutdown_rx: mpsc::Receiver<()>,
1570          block_tx: mpsc::Sender<DeltaBlock>,
1571          height: u32,
1572          last_hash: [u8; 32],
1573      ) -> Self {
1574          tracing::info!(
1575              "Consensus engine resuming from height {} (last hash: {})",
1576              height,
1577              hex::encode(&last_hash[..8])
1578          );
1579          Self {
1580              producer: BlockProducer::with_starting_state(config, height, last_hash),
1581              shutdown_rx,
1582              block_tx,
1583          }
1584      }
1585  
1586      /// Run the consensus engine
1587      pub async fn run(mut self, node_address: String) {
1588          tracing::info!("Consensus engine started");
1589  
1590          loop {
1591              tokio::select! {
1592                  _ = self.shutdown_rx.recv() => {
1593                      tracing::info!("Consensus engine shutting down");
1594                      break;
1595                  }
1596                  _ = tokio::time::sleep(Duration::from_millis(100)) => {
1597                      if self.producer.should_produce_block() {
1598                          let block = self.producer.produce_block(&node_address);
1599                          tracing::info!(
1600                              "Produced block {} with {} txs",
1601                              block.height,
1602                              block.transactions.len()
1603                          );
1604  
1605                          if let Err(e) = self.block_tx.send(block).await {
1606                              tracing::error!("Failed to send block: {}", e);
1607                          }
1608                      }
1609                  }
1610              }
1611          }
1612      }
1613  
1614      /// Get block producer reference
1615      pub fn producer(&self) -> &BlockProducer {
1616          &self.producer
1617      }
1618  
1619      /// Get mutable block producer reference
1620      pub fn producer_mut(&mut self) -> &mut BlockProducer {
1621          &mut self.producer
1622      }
1623  }