/ abzu-core / src / node.rs
node.rs
   1  //! Node Context
   2  //!
   3  //! The core state container for an Abzu mesh node.
   4  
   5  use std::collections::HashMap;
   6  use std::sync::Arc;
   7  use std::sync::atomic::{AtomicU64, Ordering};
   8  
   9  use dashmap::DashMap;
  10  use ed25519_dalek::{SigningKey, VerifyingKey};
  11  
  12  use crate::identity::MachineIdentity;
  13  use serde::{Deserialize, Serialize};
  14  use sled::Db;
  15  use thiserror::Error;
  16  use tokio::sync::{Mutex, Notify, RwLock, mpsc};
  17  use tracing::{debug, info, warn};
  18  
  19  use abzu_router::{address_for_key, Address, PeerKey, RoutingTable, TreeCoords};
  20  use abzu_dht::{
  21      DhtKey, ValueStore, RoutingTable as DhtRoutingTable, StoredValue, ValueType, 
  22  };
  23  use abzu_transport::{AbzuInterface, AbzuInterfaceExt, AbzuFrame, TransportError};
  24  
  25  use crate::codec;
  26  use crate::config::{HomeNodeConfig, SecurityTier, NodeRole};
  27  use crate::persistence::PeerStore;
  28  use crate::replication::ReplicationService;
  29  use crate::storage::{SledBackend, UserRegistry, UserStore, StorageError};
  30  use crate::trust::{
  31      Circle, MemberRole, TrustPolicy, TrustEngine, TrustError,
  32  };
  33  
  34  /// Errors from node operations
  35  #[derive(Error, Debug)]
  36  pub enum NodeError {
  37      #[error("Storage error: {0}")]
  38      Storage(#[from] sled::Error),
  39  
  40      #[error("Crypto error: {0}")]
  41      Crypto(String),
  42  
  43      #[error("Transport error: {0}")]
  44      Transport(#[from] abzu_transport::TransportError),
  45  
  46      #[error("Routing error: {0}")]
  47      Routing(#[from] abzu_router::RoutingError),
  48  
  49      #[error("Peer not found: {0:?}")]
  50      PeerNotFound(PeerKey),
  51  
  52      #[error("Node not running")]
  53      NotRunning,
  54  
  55      #[error("Serialization error: {0}")]
  56      Serialization(String),
  57  
  58      #[error("Message not found: {0}")]
  59      MessageNotFound(u64),
  60  
  61      #[error("Connection error: {0}")]
  62      Connection(String),
  63  
  64      #[error("No peer connection")]
  65      NoPeerConnection,
  66  
  67      #[error("DHT error: {0}")]
  68      DhtError(String),
  69  
  70      #[error("Operation timed out")]
  71      Timeout,
  72      
  73      #[error("Operation rejected (rate limited)")]
  74      OperationRejected,
  75  
  76      #[error("Validation error: {0}")]
  77      Validation(String),
  78  
  79      #[error("Replication error: {0}")]
  80      Replication(String),
  81  
  82      #[error("Trust error: {0}")]
  83      Trust(#[from] TrustError),
  84  
  85      #[error("User storage error: {0}")]
  86      UserStorage(#[from] StorageError),
  87  }
  88  
  89  /// Service event for generic transport
  90  #[derive(Debug, Clone)]
  91  pub enum ServiceEvent {
  92      /// Incoming request from a peer
  93      Request {
  94          request_id: u64,
  95          requester: PeerKey,
  96          payload: Vec<u8>,
  97      },
  98      /// Response to a request
  99      Response {
 100          request_id: u64,
 101          status: u8,
 102          payload: Vec<u8>,
 103      },
 104  }
 105  
 106  /// Direction of a chat message
 107  #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
 108  pub enum MessageDirection {
 109      Inbound,
 110      Outbound,
 111  }
 112  
 113  /// Delivery status of a chat message
 114  #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
 115  pub enum MessageStatus {
 116      Pending,
 117      Delivered,
 118      Read,
 119      Failed,
 120  }
 121  
 122  /// A stored chat message
 123  #[derive(Debug, Clone, Serialize, Deserialize)]
 124  pub struct StoredMessage {
 125      /// Unique message ID
 126      pub id: u64,
 127      /// The other party (sender for inbound, recipient for outbound)
 128      pub peer: [u8; 32],
 129      /// Message content (plaintext - encryption at transport layer)
 130      pub content: Vec<u8>,
 131      /// Unix timestamp (milliseconds)
 132      pub timestamp: u64,
 133      /// Direction of the message
 134      pub direction: MessageDirection,
 135      /// Delivery status
 136      pub status: MessageStatus,
 137  }
 138  
 139  /// A contact in the address book
 140  #[derive(Debug, Clone, Serialize, Deserialize)]
 141  pub struct Contact {
 142      /// Human-readable alias
 143      pub alias: String,
 144      /// Public key (32 bytes)
 145      pub pubkey: [u8; 32],
 146      /// When the contact was added (Unix ms)
 147      pub added_at: u64,
 148  }
 149  
 150  // ─────────────────────────────────────────────────────────────────────────────
 151  // Storage Constants
 152  // ─────────────────────────────────────────────────────────────────────────────
 153  
 154  /// Key for persisting the next message ID counter (O(1) startup)
 155  const MSG_ID_COUNTER_KEY: &[u8] = b"__msg_id_counter__";
 156  
 157  /// A stored Circle message
 158  #[derive(Debug, Clone, Serialize, Deserialize)]
 159  pub struct StoredCircleMessage {
 160      /// Circle ID
 161      pub circle_id: [u8; 32],
 162      /// Unique message ID within the circle
 163      pub id: u64,
 164      /// Sender's public key
 165      pub sender: [u8; 32],
 166      /// Message content (plaintext)
 167      pub content: Vec<u8>,
 168      /// Unix timestamp (milliseconds)
 169      pub timestamp: u64,
 170      /// Direction (we sent or received)
 171      pub direction: MessageDirection,
 172  }
 173  
 174  /// Configuration for a node
 175  #[derive(Debug, Clone)]
 176  pub struct NodeConfig {
 177      /// Path to the storage directory
 178      pub storage_path: String,
 179      /// Listen address for incoming connections
 180      pub listen_addr: Option<String>,
 181      /// Initial peers to connect to
 182      pub bootstrap_peers: Vec<String>,
 183      /// Heartbeat interval in milliseconds
 184      pub heartbeat_ms: u64,
 185      /// Security tier (Off, Blend, Shadow, Ghost)
 186      pub security_tier: SecurityTier,
 187      /// Node role (Edge, Desktop, Infrastructure)
 188      pub node_role: NodeRole,
 189      /// Optional Home Node for storage/mailbox delegation (Edge nodes)
 190      pub home_node: Option<HomeNodeConfig>,
 191  }
 192  
 193  impl Default for NodeConfig {
 194      fn default() -> Self {
 195          Self {
 196              storage_path: "./abzu-data".to_string(),
 197              listen_addr: None,
 198              bootstrap_peers: Vec::new(),
 199              heartbeat_ms: 500,
 200              security_tier: SecurityTier::Blend,
 201              node_role: NodeRole::default(),
 202              home_node: None,
 203          }
 204      }
 205  }
 206  
 207  /// An active peer connection
 208  pub struct PeerConnection {
 209      /// The transport interface
 210      pub interface: Box<dyn AbzuInterface>,
 211      /// Last activity timestamp
 212      pub last_activity: std::time::Instant,
 213      /// Bytes sent
 214      pub tx_bytes: u64,
 215      /// Bytes received
 216      pub rx_bytes: u64,
 217  }
 218  
 219  impl PeerConnection {
 220      pub fn new(interface: Box<dyn AbzuInterface>) -> Self {
 221          Self {
 222              interface,
 223              last_activity: std::time::Instant::now(),
 224              tx_bytes: 0,
 225              rx_bytes: 0,
 226          }
 227      }
 228  
 229      /// Check if this connection needs a keepalive
 230      pub fn needs_keepalive(&self, idle_ms: u64) -> bool {
 231          self.last_activity.elapsed().as_millis() as u64 >= idle_ms
 232      }
 233  
 234      /// Mark activity
 235      pub fn touch(&mut self) {
 236          self.last_activity = std::time::Instant::now();
 237      }
 238  }
 239  
 240  /// The main node state container
 241  pub struct Node {
 242      /// Machine identity (infrastructure: routing, DHT, transport)
 243      /// This is NOT a user identity - it's tied to this device/node.
 244      machine_identity: MachineIdentity,
 245      /// Our derived address
 246      address: Address,
 247      /// The routing table (pure logic)
 248      router: Arc<RwLock<RoutingTable>>,
 249      /// Active peer connections
 250      peers: Arc<Mutex<HashMap<PeerKey, PeerConnection>>>,
 251      /// Content-addressed storage (sled)
 252      store: Db,
 253      /// User registry for multi-tenant storage
 254      user_registry: UserRegistry,
 255      /// Current user's store (bound on init for single-user nodes)
 256      current_user: Arc<UserStore>,
 257      /// Known peers for reconnection (submarine test)
 258      known_peers: PeerStore,
 259      /// Monotonic message ID counter
 260      next_msg_id: AtomicU64,
 261      /// Pending content fetches - notifies when content arrives
 262      pending_fetches: Arc<DashMap<[u8; 32], Arc<Notify>>>,
 263      /// Configuration
 264      config: NodeConfig,
 265      /// Shutdown signal
 266      shutdown: Arc<Notify>,
 267      /// Home Node configuration (for Edge nodes)
 268      home_node: Option<HomeNodeConfig>,
 269      /// Replication service for Home Node sync
 270      replication: Option<ReplicationService>,
 271      /// Trust Engine for Circle operations
 272      trust_engine: TrustEngine,
 273      /// DHT Value Store (content discovery)
 274      dht_store: Arc<RwLock<ValueStore>>,
 275      /// DHT Routing Table (peer discovery)
 276      dht_routing: Arc<RwLock<DhtRoutingTable>>,
 277      /// Service handlers (generic transport dispatch)
 278      /// service_id -> Sender<ServiceEvent>
 279      service_handlers: Arc<DashMap<u64, mpsc::Sender<ServiceEvent>>>,
 280  }
 281  
 282  impl Node {
 283      /// Create a new node with a fresh machine identity
 284      pub fn new(config: NodeConfig) -> Result<Self, NodeError> {
 285          let machine_identity = MachineIdentity::generate();
 286          Self::with_machine_identity(machine_identity, config)
 287      }
 288  
 289      /// Create a node with an existing identity (backwards compatibility)
 290      pub fn with_identity(identity: SigningKey, config: NodeConfig) -> Result<Self, NodeError> {
 291          Self::with_machine_identity(MachineIdentity::new(identity), config)
 292      }
 293  
 294      /// Create a node with an existing machine identity
 295      pub fn with_machine_identity(machine_identity: MachineIdentity, config: NodeConfig) -> Result<Self, NodeError> {
 296          let public_key = machine_identity.verifying_key();
 297          let address = address_for_key(&public_key);
 298  
 299          info!(
 300              address = %address,
 301              "Initializing Abzu node"
 302          );
 303  
 304          // Initialize storage
 305          let store = sled::open(&config.storage_path)?;
 306  
 307          // Initialize multi-tenant storage via UserRegistry
 308          let storage_backend = Arc::new(SledBackend::new(store.clone())?);
 309          let user_registry = UserRegistry::new(storage_backend);
 310          
 311          // For single-user nodes, bind machine identity as the user
 312          // Multi-user infrastructure nodes will manage binding explicitly
 313          let user_id = machine_identity.public_bytes();
 314          let current_user = user_registry.bind_user(user_id);
 315          
 316          // Legacy tree for trust engine (TODO: migrate)
 317          let circles_trust = store.open_tree("circles_trust")?;
 318  
 319          // Initialize service handlers map
 320          let service_handlers = Arc::new(DashMap::<u64, mpsc::Sender<ServiceEvent>>::new());
 321  
 322          // Load message ID counter from dedicated key (O(1) startup)
 323          let next_msg_id = store
 324              .get(MSG_ID_COUNTER_KEY)?
 325              .and_then(|bytes| bytes.as_ref().try_into().ok())
 326              .map(u64::from_be_bytes)
 327              .unwrap_or(1);
 328  
 329          // Initialize router
 330          let router = RoutingTable::new(&public_key, TreeCoords::root());
 331  
 332          // Initialize peer persistence for submarine test
 333          let known_peers_tree = store.open_tree("known_peers")?;
 334          let known_peers = PeerStore::new(known_peers_tree);
 335  
 336          // Log known peers count for restart awareness
 337          let peer_count = known_peers.count();
 338          if peer_count > 0 {
 339              info!(known_peers = peer_count, "Loaded persisted peers for reconnection");
 340          }
 341  
 342          // Initialize replication service if home node configured
 343          let (home_node, replication) = if let Some(ref home) = config.home_node {
 344              let repl_tree = store.open_tree("replication")?;
 345              let repl_svc = ReplicationService::new(repl_tree);
 346              info!(
 347                  home_peer = %hex::encode(&home.peer_id[..8]),
 348                  pending = repl_svc.pending_count(),
 349                  "Initialized Home Node replication"
 350              );
 351              (Some(home.clone()), Some(repl_svc))
 352          } else {
 353              (None, None)
 354          };
 355  
 356          // Initialize DHT components
 357          // Note: DhtRoutingTable uses the same Node ID (public key) but different logic (Kademlia)
 358          let dht_routing = DhtRoutingTable::new(machine_identity.public_bytes());
 359          let dht_store = ValueStore::new();
 360  
 361          Ok(Self {
 362              machine_identity,
 363              address,
 364              router: Arc::new(RwLock::new(router)),
 365              peers: Arc::new(Mutex::new(HashMap::new())),
 366              store,
 367              user_registry,
 368              current_user,
 369              known_peers,
 370              next_msg_id: AtomicU64::new(next_msg_id),
 371              pending_fetches: Arc::new(DashMap::new()),
 372              config,
 373              shutdown: Arc::new(Notify::new()),
 374              home_node,
 375              replication,
 376              trust_engine: TrustEngine::new(circles_trust),
 377              dht_store: Arc::new(RwLock::new(dht_store)),
 378              dht_routing: Arc::new(RwLock::new(dht_routing)),
 379              service_handlers,
 380          })
 381      }
 382  
 383      /// Get our public key (machine identity)
 384      pub fn public_key(&self) -> VerifyingKey {
 385          self.machine_identity.verifying_key()
 386      }
 387  
 388      /// Get our address
 389      pub fn address(&self) -> &Address {
 390          &self.address
 391      }
 392  
 393      /// Get our peer key (32-byte public key)
 394      pub fn peer_key(&self) -> PeerKey {
 395          self.machine_identity.public_bytes()
 396      }
 397      
 398      /// Get the machine identity
 399      pub fn machine_identity(&self) -> &MachineIdentity {
 400          &self.machine_identity
 401      }
 402  
 403      /// Get a reference to the Trust Engine
 404      pub fn trust_engine(&self) -> &TrustEngine {
 405          &self.trust_engine
 406      }
 407  
 408      /// Get the data directory path
 409      pub fn data_dir(&self) -> std::path::PathBuf {
 410          std::path::PathBuf::from(&self.config.storage_path)
 411      }
 412  
 413      /// Get a reference to the router
 414      pub fn router(&self) -> Arc<RwLock<RoutingTable>> {
 415          Arc::clone(&self.router)
 416      }
 417  
 418      /// Get a reference to the peer connections
 419      pub fn peers(&self) -> Arc<Mutex<HashMap<PeerKey, PeerConnection>>> {
 420          Arc::clone(&self.peers)
 421      }
 422  
 423      /// Register a service handler
 424      ///
 425      /// Returns a receiver for incoming service events.
 426      pub fn register_service(&self, service_id: u64) -> mpsc::Receiver<ServiceEvent> {
 427          let (tx, rx) = mpsc::channel(100); // Buffer 100 events
 428          self.service_handlers.insert(service_id, tx);
 429          rx
 430      }
 431  
 432      /// Get a service handler
 433      pub fn get_service_handler(&self, service_id: u64) -> Option<mpsc::Sender<ServiceEvent>> {
 434          self.service_handlers.get(&service_id).map(|ref_pair| ref_pair.value().clone())
 435      }
 436  
 437      /// Get a reference to the content store
 438      pub fn store(&self) -> &Db {
 439          &self.store
 440      }
 441  
 442      /// Get the shutdown notifier
 443      pub fn shutdown(&self) -> Arc<Notify> {
 444          Arc::clone(&self.shutdown)
 445      }
 446  
 447      /// Get the pending fetches map
 448      pub fn pending_fetches(&self) -> Arc<DashMap<[u8; 32], Arc<Notify>>> {
 449          Arc::clone(&self.pending_fetches)
 450      }
 451  
 452      /// Get Home Node configuration (if configured)
 453      pub fn home_node(&self) -> Option<&HomeNodeConfig> {
 454          self.home_node.as_ref()
 455      }
 456  
 457      /// Queue content for replication to Home Node
 458      ///
 459      /// Returns Ok(true) if queued, Ok(false) if no Home Node configured
 460      pub fn queue_home_replication(&self, cid: [u8; 32]) -> Result<bool, NodeError> {
 461          match (&self.replication, &self.home_node) {
 462              (Some(repl), Some(home)) if home.enable_storage_sync => {
 463                  repl.queue(cid, home.peer_id)
 464                      .map_err(|e| NodeError::Replication(e.to_string()))?;
 465                  debug!(
 466                      cid = %hex::encode(&cid[..8]),
 467                      "Queued for Home Node replication"
 468                  );
 469                  Ok(true)
 470              }
 471              _ => Ok(false),
 472          }
 473      }
 474  
 475      /// Get pending replication count
 476      pub fn pending_replication_count(&self) -> usize {
 477          self.replication.as_ref().map_or(0, |r| r.pending_count())
 478      }
 479  
 480      /// Get the known peers store (for submarine test reconnection)
 481      pub fn known_peers(&self) -> &PeerStore {
 482          &self.known_peers
 483      }
 484  
 485      /// Register a pending fetch and return the notifier
 486      pub fn register_pending_fetch(&self, cid: [u8; 32]) -> Arc<Notify> {
 487          let notify = Arc::new(Notify::new());
 488          self.pending_fetches.insert(cid, Arc::clone(&notify));
 489          notify
 490      }
 491  
 492      /// Notify that content has arrived (called when Chunk is received)
 493      pub fn notify_content_arrived(&self, cid: &[u8; 32]) {
 494          if let Some((_, notify)) = self.pending_fetches.remove(cid) {
 495              notify.notify_waiters();
 496          }
 497      }
 498  
 499      /// Get the configuration
 500      pub fn config(&self) -> &NodeConfig {
 501          &self.config
 502      }
 503  
 504      /// Get the node role (Edge, Desktop, Infrastructure)
 505      pub fn node_role(&self) -> NodeRole {
 506          self.config.node_role
 507      }
 508  
 509      /// Add a peer connection
 510      pub async fn add_peer(&self, key: PeerKey, conn: PeerConnection) {
 511          debug!(peer = ?key, "Adding peer connection");
 512          let mut peers = self.peers.lock().await;
 513          peers.insert(key, conn);
 514      }
 515  
 516      /// Send a frame to a specific peer
 517      pub async fn send(&self, target: PeerKey, frame: AbzuFrame) -> Result<(), NodeError> {
 518          let mut peers = self.peers.lock().await;
 519          if let Some(conn) = peers.get_mut(&target) {
 520              conn.interface.send_frame(&frame).await
 521                  .map_err(|e| NodeError::Transport(e))?;
 522              conn.touch();
 523              Ok(())
 524          } else {
 525              Err(NodeError::PeerNotFound(target))
 526          }
 527      }
 528  
 529      /// Remove a peer connection
 530      pub async fn remove_peer(&self, key: &PeerKey) -> Option<PeerConnection> {
 531          debug!(peer = ?key, "Removing peer connection");
 532          let mut peers = self.peers.lock().await;
 533          peers.remove(key)
 534      }
 535  
 536      /// Get peer count
 537      pub async fn peer_count(&self) -> usize {
 538          self.peers.lock().await.len()
 539      }
 540  
 541      /// Store content by CID
 542      pub fn store_chunk(&self, cid: &[u8; 32], data: &[u8]) -> Result<(), NodeError> {
 543          self.store.insert(cid, data)?;
 544          Ok(())
 545      }
 546  
 547      /// Retrieve content by CID
 548      pub fn get_chunk(&self, cid: &[u8; 32]) -> Result<Option<Vec<u8>>, NodeError> {
 549          Ok(self.store.get(cid)?.map(|v| v.to_vec()))
 550      }
 551  
 552      /// Store content and return its BLAKE3 hash (CID)
 553      /// This is the CAS (content-addressed storage) interface.
 554      pub fn store_content(&self, data: &[u8]) -> Result<[u8; 32], NodeError> {
 555          let hash = blake3::hash(data);
 556          let cid = *hash.as_bytes();
 557          self.store.insert(cid, data)?;
 558          Ok(cid)
 559      }
 560  
 561      /// Retrieve content by BLAKE3 hash
 562      pub fn get_content(&self, cid: &[u8; 32]) -> Result<Option<Vec<u8>>, NodeError> {
 563          self.get_chunk(cid)
 564      }
 565  
 566      /// List all stored content CIDs
 567      pub fn list_content(&self) -> Result<Vec<[u8; 32]>, NodeError> {
 568          let mut cids = Vec::new();
 569          for item in self.store.iter() {
 570              let (key, _) = item?;
 571              if key.len() == 32 {
 572                  let mut cid = [0u8; 32];
 573                  cid.copy_from_slice(&key);
 574                  cids.push(cid);
 575              }
 576          }
 577          Ok(cids)
 578      }
 579  
 580      /// Get total stored content size
 581      pub fn store_size(&self) -> Result<u64, NodeError> {
 582          let mut total = 0u64;
 583          for item in self.store.iter() {
 584              let (_, value) = item?;
 585              total += value.len() as u64;
 586          }
 587          Ok(total)
 588      }
 589  
 590      /// Signal shutdown
 591      pub fn signal_shutdown(&self) {
 592          info!("Shutdown signal received");
 593          self.shutdown.notify_waiters();
 594      }
 595  
 596      /// Sign data with our machine identity (for infrastructure operations)
 597      /// 
 598      /// This signs using the MACHINE identity, not a user identity.
 599      /// Use this for: routing, DHT operations, transport authentication.
 600      /// Do NOT use for: user messages, content ownership, personal signatures.
 601      pub fn machine_sign(&self, data: &[u8]) -> ed25519_dalek::Signature {
 602          self.machine_identity.sign(data)
 603      }
 604  
 605      // ===== DHT Methods =====
 606  
 607      /// Store a value in the DHT (local only for now, TODO: network push)
 608      pub async fn dht_put(&self, key: DhtKey, value: Vec<u8>, value_type: ValueType) -> Result<(), NodeError> {
 609          let now = std::time::SystemTime::now()
 610              .duration_since(std::time::UNIX_EPOCH)
 611              .unwrap()
 612              .as_secs();
 613  
 614          // Create stored value
 615          let mut stored = StoredValue {
 616              key,
 617              metadata: abzu_dht::store::ValueMetadata {
 618                  value_type,
 619                  publisher: self.machine_identity.public_bytes(), // Publisher is us
 620                  published_at: now,
 621                  expires_at: now + value_type.default_ttl(),
 622                  seq: 0,
 623              },
 624              payload: value,
 625              signature: [0u8; 64],
 626          };
 627  
 628          // Sign the value
 629          let signature = self.machine_identity.sign(&stored.signable_bytes());
 630          stored.signature = signature.to_bytes();
 631  
 632          self.dht_store.write().await.store(stored, now)
 633               .map_err(|e| NodeError::DhtError(e.to_string()))?;
 634          Ok(())
 635      }
 636  
 637      /// Update a value in the DHT (e.g. refreshing a mutable record)
 638      pub async fn dht_update(&self, key: DhtKey, value: Vec<u8>, value_type: ValueType, seq: u64) -> Result<(), NodeError> {
 639          let now = std::time::SystemTime::now()
 640              .duration_since(std::time::UNIX_EPOCH)
 641              .unwrap()
 642              .as_secs();
 643  
 644          let mut stored = StoredValue {
 645              key,
 646              metadata: abzu_dht::store::ValueMetadata {
 647                  value_type,
 648                  publisher: self.machine_identity.public_bytes(),
 649                  published_at: now,
 650                  expires_at: now + value_type.default_ttl(),
 651                  seq,
 652              },
 653              payload: value,
 654              signature: [0u8; 64],
 655          };
 656  
 657          // Sign the value
 658          let signature = self.machine_identity.sign(&stored.signable_bytes());
 659          stored.signature = signature.to_bytes();
 660           
 661          self.dht_store.write().await.store(stored, now)
 662              .map_err(|e| NodeError::DhtError(e.to_string()))?;
 663          Ok(())
 664      }
 665  
 666      /// Get a value from the DHT (local only for now, TODO: network lookup)
 667      pub async fn dht_get(&self, key: &DhtKey) -> Result<Option<Vec<u8>>, NodeError> {
 668          let now = std::time::SystemTime::now()
 669              .duration_since(std::time::UNIX_EPOCH)
 670              .unwrap()
 671              .as_secs();
 672  
 673          // Return the simple value payload
 674          let store = self.dht_store.read().await;
 675          // get returns a Vec<&StoredValue>, we just take the first one for now
 676          Ok(store.get(key, now).first().map(|v| v.payload.clone()))
 677      }
 678      
 679      /// Get full stored value from DHT
 680      pub async fn dht_get_full(&self, key: &DhtKey) -> Result<Option<StoredValue>, NodeError> {
 681          let now = std::time::SystemTime::now()
 682              .duration_since(std::time::UNIX_EPOCH)
 683              .unwrap()
 684              .as_secs();
 685  
 686          let store = self.dht_store.read().await;
 687          Ok(store.get(key, now).first().cloned().cloned())
 688      }
 689  
 690      // ===== Chat Methods =====
 691  
 692      /// Get the next message ID (monotonically increasing, persisted to sled)
 693      pub fn next_message_id(&self) -> u64 {
 694          let id = self.next_msg_id.fetch_add(1, Ordering::SeqCst);
 695          // Persist the new counter value (best-effort, don't fail on persistence error)
 696          let _ = self.store.insert(MSG_ID_COUNTER_KEY, &(id + 1).to_be_bytes());
 697          id
 698      }
 699  
 700      /// Create a chat storage key: timestamp_be || msg_id_be
 701      fn chat_key(timestamp: u64, msg_id: u64) -> [u8; 16] {
 702          let mut key = [0u8; 16];
 703          key[..8].copy_from_slice(&timestamp.to_be_bytes());
 704          key[8..].copy_from_slice(&msg_id.to_be_bytes());
 705          key
 706      }
 707  
 708      /// Store an outbound chat message (status = Pending)
 709      pub fn store_outbound_chat(
 710          &self,
 711          recipient: [u8; 32],
 712          content: Vec<u8>,
 713          timestamp: u64,
 714      ) -> Result<StoredMessage, NodeError> {
 715          let id = self.next_message_id();
 716          let msg = StoredMessage {
 717              id,
 718              peer: recipient,
 719              content,
 720              timestamp,
 721              direction: MessageDirection::Outbound,
 722              status: MessageStatus::Pending,
 723          };
 724  
 725          let key = Self::chat_key(timestamp, id);
 726          self.current_user.put(crate::storage::collections::CHATS, &key, &msg)
 727              .map_err(|e| NodeError::Serialization(e.to_string()))?;
 728          debug!(id = id, peer = ?recipient, "Stored outbound chat");
 729          Ok(msg)
 730      }
 731  
 732      /// Store an inbound chat message (status = Delivered)
 733      pub fn store_inbound_chat(
 734          &self,
 735          sender: [u8; 32],
 736          msg_id: u64,
 737          content: Vec<u8>,
 738          timestamp: u64,
 739      ) -> Result<StoredMessage, NodeError> {
 740          let msg = StoredMessage {
 741              id: msg_id,
 742              peer: sender,
 743              content,
 744              timestamp,
 745              direction: MessageDirection::Inbound,
 746              status: MessageStatus::Delivered,
 747          };
 748  
 749          let key = Self::chat_key(timestamp, msg_id);
 750          self.current_user.put(crate::storage::collections::CHATS, &key, &msg)
 751              .map_err(|e| NodeError::Serialization(e.to_string()))?;
 752          debug!(id = msg_id, peer = ?sender, "Stored inbound chat");
 753          Ok(msg)
 754      }
 755  
 756      /// Mark a message as delivered
 757      pub fn mark_delivered(&self, msg_id: u64) -> Result<(), NodeError> {
 758          use crate::storage::collections::CHATS;
 759          // Find the message by scanning (could optimize with secondary index)
 760          let entries = self.current_user.scan_collection(CHATS)
 761              .map_err(|e| NodeError::Serialization(e.to_string()))?;
 762          
 763          for (key, value) in entries {
 764              let mut msg: StoredMessage = codec::decode(&value)?;
 765              if msg.id == msg_id && msg.direction == MessageDirection::Outbound {
 766                  msg.status = MessageStatus::Delivered;
 767                  self.current_user.put(CHATS, &key, &msg)
 768                      .map_err(|e| NodeError::Serialization(e.to_string()))?;
 769                  debug!(id = msg_id, "Marked message delivered");
 770                  return Ok(());
 771              }
 772          }
 773          Err(NodeError::MessageNotFound(msg_id))
 774      }
 775  
 776      /// Mark a message as read
 777      pub fn mark_read(&self, msg_id: u64) -> Result<(), NodeError> {
 778          use crate::storage::collections::CHATS;
 779          let entries = self.current_user.scan_collection(CHATS)
 780              .map_err(|e| NodeError::Serialization(e.to_string()))?;
 781          
 782          for (key, value) in entries {
 783              let mut msg: StoredMessage = codec::decode(&value)?;
 784              if msg.id == msg_id && msg.direction == MessageDirection::Outbound {
 785                  msg.status = MessageStatus::Read;
 786                  self.current_user.put(CHATS, &key, &msg)
 787                      .map_err(|e| NodeError::Serialization(e.to_string()))?;
 788                  debug!(id = msg_id, "Marked message read");
 789                  return Ok(());
 790              }
 791          }
 792          Err(NodeError::MessageNotFound(msg_id))
 793      }
 794  
 795      /// Get chat history with a specific peer (sorted by timestamp)
 796      pub fn get_chat_history(&self, peer: &[u8; 32]) -> Result<Vec<StoredMessage>, NodeError> {
 797          use crate::storage::collections::CHATS;
 798          let entries = self.current_user.scan_collection(CHATS)
 799              .map_err(|e| NodeError::Serialization(e.to_string()))?;
 800          
 801          let mut messages = Vec::new();
 802          for (_, value) in entries {
 803              let msg: StoredMessage = codec::decode(&value)?;
 804              if msg.peer == *peer {
 805                  messages.push(msg);
 806              }
 807          }
 808          // Already sorted by timestamp due to key structure
 809          Ok(messages)
 810      }
 811  
 812      /// Get all pending outbound messages (for retry logic)
 813      /// Returns messages that are still in Pending status
 814      pub fn get_pending_messages(&self) -> Result<Vec<StoredMessage>, NodeError> {
 815          use crate::storage::collections::CHATS;
 816          let entries = self.current_user.scan_collection(CHATS)
 817              .map_err(|e| NodeError::Serialization(e.to_string()))?;
 818          
 819          let mut pending = Vec::new();
 820          for (_, value) in entries {
 821              let msg: StoredMessage = codec::decode(&value)?;
 822              if msg.direction == MessageDirection::Outbound && msg.status == MessageStatus::Pending {
 823                  pending.push(msg);
 824              }
 825          }
 826          Ok(pending)
 827      }
 828  
 829      /// Mark a message as failed (after max retries)
 830      pub fn mark_failed(&self, msg_id: u64) -> Result<(), NodeError> {
 831          use crate::storage::collections::CHATS;
 832          let entries = self.current_user.scan_collection(CHATS)
 833              .map_err(|e| NodeError::Serialization(e.to_string()))?;
 834          
 835          for (key, value) in entries {
 836              let mut msg: StoredMessage = codec::decode(&value)?;
 837              if msg.id == msg_id && msg.direction == MessageDirection::Outbound {
 838                  msg.status = MessageStatus::Failed;
 839                  self.current_user.put(CHATS, &key, &msg)
 840                      .map_err(|e| NodeError::Serialization(e.to_string()))?;
 841                  debug!(id = msg_id, "Marked message failed");
 842                  return Ok(());
 843              }
 844          }
 845          Err(NodeError::MessageNotFound(msg_id))
 846      }
 847  
 848      // ===== Contact Methods =====
 849  
 850      /// Add a contact
 851      pub fn add_contact(&self, alias: String, pubkey: [u8; 32]) -> Result<Contact, NodeError> {
 852          use crate::storage::collections::CONTACTS;
 853          let contact = Contact {
 854              alias,
 855              pubkey,
 856              added_at: std::time::SystemTime::now()
 857                  .duration_since(std::time::UNIX_EPOCH)
 858                  .unwrap_or_default()
 859                  .as_millis() as u64,
 860          };
 861  
 862          self.current_user.put(CONTACTS, &pubkey, &contact)
 863              .map_err(|e| NodeError::Serialization(e.to_string()))?;
 864          info!(alias = %contact.alias, "Added contact");
 865          Ok(contact)
 866      }
 867  
 868      /// Remove a contact
 869      pub fn remove_contact(&self, pubkey: &[u8; 32]) -> Result<bool, NodeError> {
 870          use crate::storage::collections::CONTACTS;
 871          self.current_user.delete(CONTACTS, pubkey)
 872              .map_err(|e| NodeError::Serialization(e.to_string()))
 873      }
 874  
 875      /// Get all contacts
 876      pub fn get_contacts(&self) -> Result<Vec<Contact>, NodeError> {
 877          use crate::storage::collections::CONTACTS;
 878          let entries = self.current_user.scan_collection(CONTACTS)
 879              .map_err(|e| NodeError::Serialization(e.to_string()))?;
 880          
 881          let mut contacts = Vec::new();
 882          for (_, value) in entries {
 883              let contact: Contact = codec::decode(&value)?;
 884              contacts.push(contact);
 885          }
 886  
 887          // Sort by alias
 888          contacts.sort_by(|a, b| a.alias.cmp(&b.alias));
 889          Ok(contacts)
 890      }
 891  
 892      /// Get a contact by pubkey
 893      pub fn get_contact(&self, pubkey: &[u8; 32]) -> Result<Option<Contact>, NodeError> {
 894          use crate::storage::collections::CONTACTS;
 895          self.current_user.get::<Contact>(CONTACTS, pubkey)
 896              .map_err(|e| NodeError::Serialization(e.to_string()))
 897      }
 898  
 899      // ─────────────────────────────────────────────────────────────────────────
 900      // Circle (Group) Methods
 901      // ─────────────────────────────────────────────────────────────────────────
 902  
 903      /// Create a new circle
 904      pub fn create_circle(&self, name: String) -> Result<Circle, NodeError> {
 905          self.create_circle_with_policy(name, TrustPolicy::default())
 906      }
 907      
 908      /// Create a new Circle with a specific trust policy
 909      /// 
 910      /// # Trust Policy Options
 911      /// - `TrustPolicy::Anonymous` (default): No trust ledger, no invite tree. Safest for adversarial contexts.
 912      /// - `TrustPolicy::Private { prune_mode }`: Minimal records, only current membership.
 913      /// - `TrustPolicy::Accountable { ledger_mode, prune_mode }`: Full audit trail for moderated communities.
 914      pub fn create_circle_with_policy(&self, name: String, trust_policy: TrustPolicy) -> Result<Circle, NodeError> {
 915          let founder_key = self.public_key().to_bytes();
 916          let circle = self.trust_engine.create_circle(founder_key, name, trust_policy)?;
 917          Ok(circle)
 918      }
 919      
 920      /// Get a circle by ID
 921      pub fn get_circle(&self, id: &[u8; 32]) -> Result<Option<Circle>, NodeError> {
 922          Ok(self.trust_engine.get_circle(id)?)
 923      }
 924      
 925      /// List all circles we're a member of
 926      pub fn list_circles(&self) -> Result<Vec<Circle>, NodeError> {
 927          Ok(self.trust_engine.list_circles()?)
 928      }
 929      
 930      /// Add a member to a circle (must be founder/admin)
 931      /// 
 932      /// `caller_pubkey` is the public key of the account performing this action.
 933      /// Note: This is a legacy wrapper - use `invite_member` for the proper trust flow.
 934      #[allow(dead_code)]
 935      pub fn add_circle_member(&self, caller_pubkey: [u8; 32], circle_id: &[u8; 32], pubkey: [u8; 32], _role: MemberRole) -> Result<Circle, NodeError> {
 936          // Delegate to invite_member with empty signature for legacy compatibility
 937          self.invite_member(caller_pubkey, circle_id, pubkey, Vec::new())
 938      }
 939      
 940      /// Invite a new member to a circle with full trust protocol.
 941      /// 
 942      /// This is the proper invite flow with:
 943      /// - Capacity tracking (inviter must have invites_remaining)
 944      /// - Depth tracking (MAX_INVITE_DEPTH enforced)
 945      /// - Ledger recording (TrustAction::Invite)
 946      /// 
 947      /// `caller_pubkey` is the public key of the account performing this action.
 948      /// The signature should be over: circle_id || invitee_pubkey || timestamp
 949      pub fn invite_member(
 950          &self,
 951          caller_pubkey: [u8; 32],
 952          circle_id: &[u8; 32],
 953          invitee_pubkey: [u8; 32],
 954          invite_signature: Vec<u8>,
 955      ) -> Result<Circle, NodeError> {
 956          let circle = self.trust_engine.invite_member(
 957              caller_pubkey,
 958              circle_id,
 959              invitee_pubkey,
 960              invite_signature,
 961          )?;
 962          Ok(circle)
 963      }
 964      
 965      /// Remove a member from a circle
 966      pub fn remove_circle_member(&self, circle_id: &[u8; 32], pubkey: &[u8; 32]) -> Result<Circle, NodeError> {
 967          let circle = self.trust_engine.remove_member(circle_id, pubkey)?;
 968          Ok(circle)
 969      }
 970      
 971      // ─────────────────────────────────────────────────────────────────────────────
 972      // Trust Protocol Methods
 973      // ─────────────────────────────────────────────────────────────────────────────
 974      
 975      /// Vouch for an existing member in a circle (secondary endorsement).
 976      /// 
 977      /// Vouches are weaker than invites but can:
 978      /// - Increase a member's standing in the community
 979      /// - Help members qualify for admin elevation (requires MIN_VOUCHES_FOR_ADMIN)
 980      /// 
 981      /// Returns error if:
 982      /// - Circle not found
 983      /// - Voucher or target not a member
 984      /// - Voucher has no vouch capacity
 985      /// - Already vouched for this member
 986      /// 
 987      /// `caller_pubkey` is the public key of the account performing this action.
 988      pub fn vouch_member(
 989          &self,
 990          caller_pubkey: [u8; 32],
 991          circle_id: &[u8; 32],
 992          target_pubkey: [u8; 32],
 993          signature: Vec<u8>,
 994      ) -> Result<Circle, NodeError> {
 995          let circle = self.trust_engine.vouch_member(
 996              caller_pubkey,
 997              circle_id,
 998              target_pubkey,
 999              signature,
1000          )?;
1001          Ok(circle)
1002      }
1003      
1004      /// Prune a member and all their invite descendants from a circle.
1005      /// 
1006      /// This is the accountability mechanism: if you invite someone who
1007      /// misbehaves, an admin can prune them (and you may lose capacity
1008      /// as a consequence).
1009      /// 
1010      /// Returns the list of removed public keys.
1011      /// 
1012      /// `caller_pubkey` is the public key of the account performing this action.
1013      pub fn prune_branch(
1014          &self,
1015          caller_pubkey: [u8; 32],
1016          circle_id: &[u8; 32],
1017          target_pubkey: [u8; 32],
1018          signature: Vec<u8>,
1019          reason_hash: Option<[u8; 32]>,
1020      ) -> Result<(Circle, Vec<[u8; 32]>), NodeError> {
1021          let result = self.trust_engine.prune_branch(
1022              caller_pubkey,
1023              circle_id,
1024              target_pubkey,
1025              signature,
1026              reason_hash,
1027          )?;
1028          Ok(result)
1029      }
1030      
1031      /// Encrypt a message for a circle using its symmetric key
1032      /// Returns: nonce (12 bytes) || ciphertext
1033      pub fn encrypt_circle_message(&self, circle_id: &[u8; 32], plaintext: &[u8]) -> Result<Vec<u8>, NodeError> {
1034          use chacha20poly1305::{ChaCha20Poly1305, KeyInit, aead::Aead};
1035          use rand::RngCore;
1036          
1037          // Get circle to access its symmetric key
1038          let circle = self.get_circle(circle_id)?
1039              .ok_or_else(|| NodeError::Serialization("Circle not found".to_string()))?;
1040          
1041          // Create cipher with circle's symmetric key
1042          let cipher = ChaCha20Poly1305::new_from_slice(&circle.symmetric_key)
1043              .map_err(|e| NodeError::Crypto(e.to_string()))?;
1044          
1045          // Generate random nonce (12 bytes)
1046          let mut nonce_bytes = [0u8; 12];
1047          rand::thread_rng().fill_bytes(&mut nonce_bytes);
1048          let nonce = chacha20poly1305::Nonce::from_slice(&nonce_bytes);
1049          
1050          // Encrypt plaintext
1051          let ciphertext = cipher.encrypt(nonce, plaintext)
1052              .map_err(|e| NodeError::Crypto(e.to_string()))?;
1053          
1054          // Prepend nonce to ciphertext
1055          let mut result = Vec::with_capacity(12 + ciphertext.len());
1056          result.extend_from_slice(&nonce_bytes);
1057          result.extend_from_slice(&ciphertext);
1058          
1059          Ok(result)
1060      }
1061      
1062      /// Decrypt a message from a circle using its symmetric key
1063      /// Input: nonce (12 bytes) || ciphertext
1064      pub fn decrypt_circle_message(&self, circle_id: &[u8; 32], encrypted: &[u8]) -> Result<Vec<u8>, NodeError> {
1065          use chacha20poly1305::{ChaCha20Poly1305, KeyInit, aead::Aead};
1066          
1067          if encrypted.len() < 12 {
1068              return Err(NodeError::Crypto("Encrypted message too short".to_string()));
1069          }
1070          
1071          // Get circle to access its symmetric key
1072          let circle = self.get_circle(circle_id)?
1073              .ok_or_else(|| NodeError::Serialization("Circle not found".to_string()))?;
1074          
1075          // Create cipher with circle's symmetric key
1076          let cipher = ChaCha20Poly1305::new_from_slice(&circle.symmetric_key)
1077              .map_err(|e| NodeError::Crypto(e.to_string()))?;
1078          
1079          // Extract nonce and ciphertext
1080          let nonce = chacha20poly1305::Nonce::from_slice(&encrypted[..12]);
1081          let ciphertext = &encrypted[12..];
1082          
1083          // Decrypt
1084          cipher.decrypt(nonce, ciphertext)
1085              .map_err(|e| NodeError::Crypto(format!("Decryption failed: {}", e)))
1086      }
1087      
1088      /// Decrypt a privacy-preserving CircleEnvelope from a circle
1089      /// 
1090      /// SECURITY: This envelope contains the sender and timestamp metadata
1091      /// which are protected from network observers (addressing red team finding)
1092      /// 
1093      /// Input format: nonce (12 bytes) || ciphertext(serialized CircleEnvelope)
1094      pub fn decrypt_circle_envelope(&self, circle_id: &[u8; 32], encrypted: &[u8]) -> Result<abzu_transport::CircleEnvelope, NodeError> {
1095          // First decrypt to get the raw bytes
1096          let decrypted = self.decrypt_circle_message(circle_id, encrypted)?;
1097          
1098          // Then deserialize using the transport layer's abstraction
1099          abzu_transport::CircleEnvelope::decode(&decrypted)
1100              .map_err(|e| NodeError::Serialization(format!("Failed to deserialize CircleEnvelope: {}", e)))
1101      }
1102      
1103      /// Store an inbound circle message
1104      pub fn store_inbound_circle_message(&self, circle_id: [u8; 32], msg_id: u64, sender: [u8; 32], content: Vec<u8>, timestamp: u64) -> Result<StoredCircleMessage, NodeError> {
1105          use crate::storage::collections::CIRCLE_MESSAGES;
1106          let msg = StoredCircleMessage {
1107              circle_id,
1108              id: msg_id,
1109              sender,
1110              content,
1111              timestamp,
1112              direction: MessageDirection::Inbound,
1113          };
1114          
1115          // Key: circle_id (32) + timestamp (8, big-endian for ordering) + msg_id (8)
1116          let mut key = Vec::with_capacity(48);
1117          key.extend_from_slice(&circle_id);
1118          key.extend_from_slice(&timestamp.to_be_bytes());
1119          key.extend_from_slice(&msg_id.to_be_bytes());
1120          
1121          self.current_user.put(CIRCLE_MESSAGES, &key, &msg)
1122              .map_err(|e| NodeError::Serialization(e.to_string()))?;
1123          
1124          Ok(msg)
1125      }
1126      
1127      /// Store an outbound circle message
1128      pub fn store_outbound_circle_message(&self, circle_id: [u8; 32], content: Vec<u8>, timestamp: u64) -> Result<StoredCircleMessage, NodeError> {
1129          use crate::storage::collections::CIRCLE_MESSAGES;
1130          let msg_id = self.next_msg_id.fetch_add(1, Ordering::Relaxed);
1131          
1132          let msg = StoredCircleMessage {
1133              circle_id,
1134              id: msg_id,
1135              sender: self.public_key().to_bytes(),
1136              content,
1137              timestamp,
1138              direction: MessageDirection::Outbound,
1139          };
1140          
1141          // Key: circle_id (32) + timestamp (8, big-endian for ordering) + msg_id (8)
1142          let mut key = Vec::with_capacity(48);
1143          key.extend_from_slice(&circle_id);
1144          key.extend_from_slice(&timestamp.to_be_bytes());
1145          key.extend_from_slice(&msg_id.to_be_bytes());
1146          
1147          self.current_user.put(CIRCLE_MESSAGES, &key, &msg)
1148              .map_err(|e| NodeError::Serialization(e.to_string()))?;
1149          
1150          Ok(msg)
1151      }
1152      
1153      /// Get message history for a circle
1154      pub fn get_circle_history(&self, circle_id: &[u8; 32], limit: usize) -> Result<Vec<StoredCircleMessage>, NodeError> {
1155          use crate::storage::collections::CIRCLE_MESSAGES;
1156          
1157          // Use prefix scan for O(log N) lookup instead of O(N) full scan
1158          let entries = self.current_user.scan_with_prefix(CIRCLE_MESSAGES, circle_id)?;
1159          
1160          let mut messages: Vec<StoredCircleMessage> = entries
1161              .into_iter()
1162              .map(|(_, value)| codec::decode(&value))
1163              .collect::<Result<Vec<_>, _>>()?;
1164          
1165          // Already sorted by timestamp+msg_id due to key structure
1166          // Take last N messages
1167          if messages.len() > limit {
1168              messages = messages.split_off(messages.len() - limit);
1169          }
1170          
1171          Ok(messages)
1172      }
1173  }
1174  
1175  impl std::fmt::Debug for Node {
1176      fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1177          f.debug_struct("Node")
1178              .field("address", &self.address)
1179              .field("storage_path", &self.config.storage_path)
1180              .finish_non_exhaustive()
1181      }
1182  }
1183  
1184  #[cfg(test)]
1185  mod tests {
1186      use super::*;
1187      use tempfile::tempdir;
1188  
1189      #[test]
1190      fn test_node_creation() {
1191          let tmp = tempdir().unwrap();
1192          let config = NodeConfig {
1193              storage_path: tmp.path().to_str().unwrap().to_string(),
1194              ..Default::default()
1195          };
1196  
1197          let node = Node::new(config).unwrap();
1198          assert!(node.address().is_valid());
1199      }
1200  
1201      #[test]
1202      fn test_chunk_storage() {
1203          let tmp = tempdir().unwrap();
1204          let config = NodeConfig {
1205              storage_path: tmp.path().to_str().unwrap().to_string(),
1206              ..Default::default()
1207          };
1208  
1209          let node = Node::new(config).unwrap();
1210  
1211          let cid = [0xAB; 32];
1212          let data = b"hello world";
1213  
1214          node.store_chunk(&cid, data).unwrap();
1215          let retrieved = node.get_chunk(&cid).unwrap().unwrap();
1216          assert_eq!(retrieved, data);
1217      }
1218  }