node.rs
1 //! Node Context 2 //! 3 //! The core state container for an Abzu mesh node. 4 5 use std::collections::HashMap; 6 use std::sync::Arc; 7 use std::sync::atomic::{AtomicU64, Ordering}; 8 9 use dashmap::DashMap; 10 use ed25519_dalek::{SigningKey, VerifyingKey}; 11 12 use crate::identity::MachineIdentity; 13 use serde::{Deserialize, Serialize}; 14 use sled::Db; 15 use thiserror::Error; 16 use tokio::sync::{Mutex, Notify, RwLock, mpsc}; 17 use tracing::{debug, info, warn}; 18 19 use abzu_router::{address_for_key, Address, PeerKey, RoutingTable, TreeCoords}; 20 use abzu_dht::{ 21 DhtKey, ValueStore, RoutingTable as DhtRoutingTable, StoredValue, ValueType, 22 }; 23 use abzu_transport::{AbzuInterface, AbzuInterfaceExt, AbzuFrame, TransportError}; 24 25 use crate::codec; 26 use crate::config::{HomeNodeConfig, SecurityTier, NodeRole}; 27 use crate::persistence::PeerStore; 28 use crate::replication::ReplicationService; 29 use crate::storage::{SledBackend, UserRegistry, UserStore, StorageError}; 30 use crate::trust::{ 31 Circle, MemberRole, TrustPolicy, TrustEngine, TrustError, 32 }; 33 34 /// Errors from node operations 35 #[derive(Error, Debug)] 36 pub enum NodeError { 37 #[error("Storage error: {0}")] 38 Storage(#[from] sled::Error), 39 40 #[error("Crypto error: {0}")] 41 Crypto(String), 42 43 #[error("Transport error: {0}")] 44 Transport(#[from] abzu_transport::TransportError), 45 46 #[error("Routing error: {0}")] 47 Routing(#[from] abzu_router::RoutingError), 48 49 #[error("Peer not found: {0:?}")] 50 PeerNotFound(PeerKey), 51 52 #[error("Node not running")] 53 NotRunning, 54 55 #[error("Serialization error: {0}")] 56 Serialization(String), 57 58 #[error("Message not found: {0}")] 59 MessageNotFound(u64), 60 61 #[error("Connection error: {0}")] 62 Connection(String), 63 64 #[error("No peer connection")] 65 NoPeerConnection, 66 67 #[error("DHT error: {0}")] 68 DhtError(String), 69 70 #[error("Operation timed out")] 71 Timeout, 72 73 #[error("Operation rejected (rate limited)")] 74 OperationRejected, 75 76 #[error("Validation error: {0}")] 77 Validation(String), 78 79 #[error("Replication error: {0}")] 80 Replication(String), 81 82 #[error("Trust error: {0}")] 83 Trust(#[from] TrustError), 84 85 #[error("User storage error: {0}")] 86 UserStorage(#[from] StorageError), 87 } 88 89 /// Service event for generic transport 90 #[derive(Debug, Clone)] 91 pub enum ServiceEvent { 92 /// Incoming request from a peer 93 Request { 94 request_id: u64, 95 requester: PeerKey, 96 payload: Vec<u8>, 97 }, 98 /// Response to a request 99 Response { 100 request_id: u64, 101 status: u8, 102 payload: Vec<u8>, 103 }, 104 } 105 106 /// Direction of a chat message 107 #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] 108 pub enum MessageDirection { 109 Inbound, 110 Outbound, 111 } 112 113 /// Delivery status of a chat message 114 #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] 115 pub enum MessageStatus { 116 Pending, 117 Delivered, 118 Read, 119 Failed, 120 } 121 122 /// A stored chat message 123 #[derive(Debug, Clone, Serialize, Deserialize)] 124 pub struct StoredMessage { 125 /// Unique message ID 126 pub id: u64, 127 /// The other party (sender for inbound, recipient for outbound) 128 pub peer: [u8; 32], 129 /// Message content (plaintext - encryption at transport layer) 130 pub content: Vec<u8>, 131 /// Unix timestamp (milliseconds) 132 pub timestamp: u64, 133 /// Direction of the message 134 pub direction: MessageDirection, 135 /// Delivery status 136 pub status: MessageStatus, 137 } 138 139 /// A contact in the address book 140 #[derive(Debug, Clone, Serialize, Deserialize)] 141 pub struct Contact { 142 /// Human-readable alias 143 pub alias: String, 144 /// Public key (32 bytes) 145 pub pubkey: [u8; 32], 146 /// When the contact was added (Unix ms) 147 pub added_at: u64, 148 } 149 150 // ───────────────────────────────────────────────────────────────────────────── 151 // Storage Constants 152 // ───────────────────────────────────────────────────────────────────────────── 153 154 /// Key for persisting the next message ID counter (O(1) startup) 155 const MSG_ID_COUNTER_KEY: &[u8] = b"__msg_id_counter__"; 156 157 /// A stored Circle message 158 #[derive(Debug, Clone, Serialize, Deserialize)] 159 pub struct StoredCircleMessage { 160 /// Circle ID 161 pub circle_id: [u8; 32], 162 /// Unique message ID within the circle 163 pub id: u64, 164 /// Sender's public key 165 pub sender: [u8; 32], 166 /// Message content (plaintext) 167 pub content: Vec<u8>, 168 /// Unix timestamp (milliseconds) 169 pub timestamp: u64, 170 /// Direction (we sent or received) 171 pub direction: MessageDirection, 172 } 173 174 /// Configuration for a node 175 #[derive(Debug, Clone)] 176 pub struct NodeConfig { 177 /// Path to the storage directory 178 pub storage_path: String, 179 /// Listen address for incoming connections 180 pub listen_addr: Option<String>, 181 /// Initial peers to connect to 182 pub bootstrap_peers: Vec<String>, 183 /// Heartbeat interval in milliseconds 184 pub heartbeat_ms: u64, 185 /// Security tier (Off, Blend, Shadow, Ghost) 186 pub security_tier: SecurityTier, 187 /// Node role (Edge, Desktop, Infrastructure) 188 pub node_role: NodeRole, 189 /// Optional Home Node for storage/mailbox delegation (Edge nodes) 190 pub home_node: Option<HomeNodeConfig>, 191 } 192 193 impl Default for NodeConfig { 194 fn default() -> Self { 195 Self { 196 storage_path: "./abzu-data".to_string(), 197 listen_addr: None, 198 bootstrap_peers: Vec::new(), 199 heartbeat_ms: 500, 200 security_tier: SecurityTier::Blend, 201 node_role: NodeRole::default(), 202 home_node: None, 203 } 204 } 205 } 206 207 /// An active peer connection 208 pub struct PeerConnection { 209 /// The transport interface 210 pub interface: Box<dyn AbzuInterface>, 211 /// Last activity timestamp 212 pub last_activity: std::time::Instant, 213 /// Bytes sent 214 pub tx_bytes: u64, 215 /// Bytes received 216 pub rx_bytes: u64, 217 } 218 219 impl PeerConnection { 220 pub fn new(interface: Box<dyn AbzuInterface>) -> Self { 221 Self { 222 interface, 223 last_activity: std::time::Instant::now(), 224 tx_bytes: 0, 225 rx_bytes: 0, 226 } 227 } 228 229 /// Check if this connection needs a keepalive 230 pub fn needs_keepalive(&self, idle_ms: u64) -> bool { 231 self.last_activity.elapsed().as_millis() as u64 >= idle_ms 232 } 233 234 /// Mark activity 235 pub fn touch(&mut self) { 236 self.last_activity = std::time::Instant::now(); 237 } 238 } 239 240 /// The main node state container 241 pub struct Node { 242 /// Machine identity (infrastructure: routing, DHT, transport) 243 /// This is NOT a user identity - it's tied to this device/node. 244 machine_identity: MachineIdentity, 245 /// Our derived address 246 address: Address, 247 /// The routing table (pure logic) 248 router: Arc<RwLock<RoutingTable>>, 249 /// Active peer connections 250 peers: Arc<Mutex<HashMap<PeerKey, PeerConnection>>>, 251 /// Content-addressed storage (sled) 252 store: Db, 253 /// User registry for multi-tenant storage 254 user_registry: UserRegistry, 255 /// Current user's store (bound on init for single-user nodes) 256 current_user: Arc<UserStore>, 257 /// Known peers for reconnection (submarine test) 258 known_peers: PeerStore, 259 /// Monotonic message ID counter 260 next_msg_id: AtomicU64, 261 /// Pending content fetches - notifies when content arrives 262 pending_fetches: Arc<DashMap<[u8; 32], Arc<Notify>>>, 263 /// Configuration 264 config: NodeConfig, 265 /// Shutdown signal 266 shutdown: Arc<Notify>, 267 /// Home Node configuration (for Edge nodes) 268 home_node: Option<HomeNodeConfig>, 269 /// Replication service for Home Node sync 270 replication: Option<ReplicationService>, 271 /// Trust Engine for Circle operations 272 trust_engine: TrustEngine, 273 /// DHT Value Store (content discovery) 274 dht_store: Arc<RwLock<ValueStore>>, 275 /// DHT Routing Table (peer discovery) 276 dht_routing: Arc<RwLock<DhtRoutingTable>>, 277 /// Service handlers (generic transport dispatch) 278 /// service_id -> Sender<ServiceEvent> 279 service_handlers: Arc<DashMap<u64, mpsc::Sender<ServiceEvent>>>, 280 } 281 282 impl Node { 283 /// Create a new node with a fresh machine identity 284 pub fn new(config: NodeConfig) -> Result<Self, NodeError> { 285 let machine_identity = MachineIdentity::generate(); 286 Self::with_machine_identity(machine_identity, config) 287 } 288 289 /// Create a node with an existing identity (backwards compatibility) 290 pub fn with_identity(identity: SigningKey, config: NodeConfig) -> Result<Self, NodeError> { 291 Self::with_machine_identity(MachineIdentity::new(identity), config) 292 } 293 294 /// Create a node with an existing machine identity 295 pub fn with_machine_identity(machine_identity: MachineIdentity, config: NodeConfig) -> Result<Self, NodeError> { 296 let public_key = machine_identity.verifying_key(); 297 let address = address_for_key(&public_key); 298 299 info!( 300 address = %address, 301 "Initializing Abzu node" 302 ); 303 304 // Initialize storage 305 let store = sled::open(&config.storage_path)?; 306 307 // Initialize multi-tenant storage via UserRegistry 308 let storage_backend = Arc::new(SledBackend::new(store.clone())?); 309 let user_registry = UserRegistry::new(storage_backend); 310 311 // For single-user nodes, bind machine identity as the user 312 // Multi-user infrastructure nodes will manage binding explicitly 313 let user_id = machine_identity.public_bytes(); 314 let current_user = user_registry.bind_user(user_id); 315 316 // Legacy tree for trust engine (TODO: migrate) 317 let circles_trust = store.open_tree("circles_trust")?; 318 319 // Initialize service handlers map 320 let service_handlers = Arc::new(DashMap::<u64, mpsc::Sender<ServiceEvent>>::new()); 321 322 // Load message ID counter from dedicated key (O(1) startup) 323 let next_msg_id = store 324 .get(MSG_ID_COUNTER_KEY)? 325 .and_then(|bytes| bytes.as_ref().try_into().ok()) 326 .map(u64::from_be_bytes) 327 .unwrap_or(1); 328 329 // Initialize router 330 let router = RoutingTable::new(&public_key, TreeCoords::root()); 331 332 // Initialize peer persistence for submarine test 333 let known_peers_tree = store.open_tree("known_peers")?; 334 let known_peers = PeerStore::new(known_peers_tree); 335 336 // Log known peers count for restart awareness 337 let peer_count = known_peers.count(); 338 if peer_count > 0 { 339 info!(known_peers = peer_count, "Loaded persisted peers for reconnection"); 340 } 341 342 // Initialize replication service if home node configured 343 let (home_node, replication) = if let Some(ref home) = config.home_node { 344 let repl_tree = store.open_tree("replication")?; 345 let repl_svc = ReplicationService::new(repl_tree); 346 info!( 347 home_peer = %hex::encode(&home.peer_id[..8]), 348 pending = repl_svc.pending_count(), 349 "Initialized Home Node replication" 350 ); 351 (Some(home.clone()), Some(repl_svc)) 352 } else { 353 (None, None) 354 }; 355 356 // Initialize DHT components 357 // Note: DhtRoutingTable uses the same Node ID (public key) but different logic (Kademlia) 358 let dht_routing = DhtRoutingTable::new(machine_identity.public_bytes()); 359 let dht_store = ValueStore::new(); 360 361 Ok(Self { 362 machine_identity, 363 address, 364 router: Arc::new(RwLock::new(router)), 365 peers: Arc::new(Mutex::new(HashMap::new())), 366 store, 367 user_registry, 368 current_user, 369 known_peers, 370 next_msg_id: AtomicU64::new(next_msg_id), 371 pending_fetches: Arc::new(DashMap::new()), 372 config, 373 shutdown: Arc::new(Notify::new()), 374 home_node, 375 replication, 376 trust_engine: TrustEngine::new(circles_trust), 377 dht_store: Arc::new(RwLock::new(dht_store)), 378 dht_routing: Arc::new(RwLock::new(dht_routing)), 379 service_handlers, 380 }) 381 } 382 383 /// Get our public key (machine identity) 384 pub fn public_key(&self) -> VerifyingKey { 385 self.machine_identity.verifying_key() 386 } 387 388 /// Get our address 389 pub fn address(&self) -> &Address { 390 &self.address 391 } 392 393 /// Get our peer key (32-byte public key) 394 pub fn peer_key(&self) -> PeerKey { 395 self.machine_identity.public_bytes() 396 } 397 398 /// Get the machine identity 399 pub fn machine_identity(&self) -> &MachineIdentity { 400 &self.machine_identity 401 } 402 403 /// Get a reference to the Trust Engine 404 pub fn trust_engine(&self) -> &TrustEngine { 405 &self.trust_engine 406 } 407 408 /// Get the data directory path 409 pub fn data_dir(&self) -> std::path::PathBuf { 410 std::path::PathBuf::from(&self.config.storage_path) 411 } 412 413 /// Get a reference to the router 414 pub fn router(&self) -> Arc<RwLock<RoutingTable>> { 415 Arc::clone(&self.router) 416 } 417 418 /// Get a reference to the peer connections 419 pub fn peers(&self) -> Arc<Mutex<HashMap<PeerKey, PeerConnection>>> { 420 Arc::clone(&self.peers) 421 } 422 423 /// Register a service handler 424 /// 425 /// Returns a receiver for incoming service events. 426 pub fn register_service(&self, service_id: u64) -> mpsc::Receiver<ServiceEvent> { 427 let (tx, rx) = mpsc::channel(100); // Buffer 100 events 428 self.service_handlers.insert(service_id, tx); 429 rx 430 } 431 432 /// Get a service handler 433 pub fn get_service_handler(&self, service_id: u64) -> Option<mpsc::Sender<ServiceEvent>> { 434 self.service_handlers.get(&service_id).map(|ref_pair| ref_pair.value().clone()) 435 } 436 437 /// Get a reference to the content store 438 pub fn store(&self) -> &Db { 439 &self.store 440 } 441 442 /// Get the shutdown notifier 443 pub fn shutdown(&self) -> Arc<Notify> { 444 Arc::clone(&self.shutdown) 445 } 446 447 /// Get the pending fetches map 448 pub fn pending_fetches(&self) -> Arc<DashMap<[u8; 32], Arc<Notify>>> { 449 Arc::clone(&self.pending_fetches) 450 } 451 452 /// Get Home Node configuration (if configured) 453 pub fn home_node(&self) -> Option<&HomeNodeConfig> { 454 self.home_node.as_ref() 455 } 456 457 /// Queue content for replication to Home Node 458 /// 459 /// Returns Ok(true) if queued, Ok(false) if no Home Node configured 460 pub fn queue_home_replication(&self, cid: [u8; 32]) -> Result<bool, NodeError> { 461 match (&self.replication, &self.home_node) { 462 (Some(repl), Some(home)) if home.enable_storage_sync => { 463 repl.queue(cid, home.peer_id) 464 .map_err(|e| NodeError::Replication(e.to_string()))?; 465 debug!( 466 cid = %hex::encode(&cid[..8]), 467 "Queued for Home Node replication" 468 ); 469 Ok(true) 470 } 471 _ => Ok(false), 472 } 473 } 474 475 /// Get pending replication count 476 pub fn pending_replication_count(&self) -> usize { 477 self.replication.as_ref().map_or(0, |r| r.pending_count()) 478 } 479 480 /// Get the known peers store (for submarine test reconnection) 481 pub fn known_peers(&self) -> &PeerStore { 482 &self.known_peers 483 } 484 485 /// Register a pending fetch and return the notifier 486 pub fn register_pending_fetch(&self, cid: [u8; 32]) -> Arc<Notify> { 487 let notify = Arc::new(Notify::new()); 488 self.pending_fetches.insert(cid, Arc::clone(¬ify)); 489 notify 490 } 491 492 /// Notify that content has arrived (called when Chunk is received) 493 pub fn notify_content_arrived(&self, cid: &[u8; 32]) { 494 if let Some((_, notify)) = self.pending_fetches.remove(cid) { 495 notify.notify_waiters(); 496 } 497 } 498 499 /// Get the configuration 500 pub fn config(&self) -> &NodeConfig { 501 &self.config 502 } 503 504 /// Get the node role (Edge, Desktop, Infrastructure) 505 pub fn node_role(&self) -> NodeRole { 506 self.config.node_role 507 } 508 509 /// Add a peer connection 510 pub async fn add_peer(&self, key: PeerKey, conn: PeerConnection) { 511 debug!(peer = ?key, "Adding peer connection"); 512 let mut peers = self.peers.lock().await; 513 peers.insert(key, conn); 514 } 515 516 /// Send a frame to a specific peer 517 pub async fn send(&self, target: PeerKey, frame: AbzuFrame) -> Result<(), NodeError> { 518 let mut peers = self.peers.lock().await; 519 if let Some(conn) = peers.get_mut(&target) { 520 conn.interface.send_frame(&frame).await 521 .map_err(|e| NodeError::Transport(e))?; 522 conn.touch(); 523 Ok(()) 524 } else { 525 Err(NodeError::PeerNotFound(target)) 526 } 527 } 528 529 /// Remove a peer connection 530 pub async fn remove_peer(&self, key: &PeerKey) -> Option<PeerConnection> { 531 debug!(peer = ?key, "Removing peer connection"); 532 let mut peers = self.peers.lock().await; 533 peers.remove(key) 534 } 535 536 /// Get peer count 537 pub async fn peer_count(&self) -> usize { 538 self.peers.lock().await.len() 539 } 540 541 /// Store content by CID 542 pub fn store_chunk(&self, cid: &[u8; 32], data: &[u8]) -> Result<(), NodeError> { 543 self.store.insert(cid, data)?; 544 Ok(()) 545 } 546 547 /// Retrieve content by CID 548 pub fn get_chunk(&self, cid: &[u8; 32]) -> Result<Option<Vec<u8>>, NodeError> { 549 Ok(self.store.get(cid)?.map(|v| v.to_vec())) 550 } 551 552 /// Store content and return its BLAKE3 hash (CID) 553 /// This is the CAS (content-addressed storage) interface. 554 pub fn store_content(&self, data: &[u8]) -> Result<[u8; 32], NodeError> { 555 let hash = blake3::hash(data); 556 let cid = *hash.as_bytes(); 557 self.store.insert(cid, data)?; 558 Ok(cid) 559 } 560 561 /// Retrieve content by BLAKE3 hash 562 pub fn get_content(&self, cid: &[u8; 32]) -> Result<Option<Vec<u8>>, NodeError> { 563 self.get_chunk(cid) 564 } 565 566 /// List all stored content CIDs 567 pub fn list_content(&self) -> Result<Vec<[u8; 32]>, NodeError> { 568 let mut cids = Vec::new(); 569 for item in self.store.iter() { 570 let (key, _) = item?; 571 if key.len() == 32 { 572 let mut cid = [0u8; 32]; 573 cid.copy_from_slice(&key); 574 cids.push(cid); 575 } 576 } 577 Ok(cids) 578 } 579 580 /// Get total stored content size 581 pub fn store_size(&self) -> Result<u64, NodeError> { 582 let mut total = 0u64; 583 for item in self.store.iter() { 584 let (_, value) = item?; 585 total += value.len() as u64; 586 } 587 Ok(total) 588 } 589 590 /// Signal shutdown 591 pub fn signal_shutdown(&self) { 592 info!("Shutdown signal received"); 593 self.shutdown.notify_waiters(); 594 } 595 596 /// Sign data with our machine identity (for infrastructure operations) 597 /// 598 /// This signs using the MACHINE identity, not a user identity. 599 /// Use this for: routing, DHT operations, transport authentication. 600 /// Do NOT use for: user messages, content ownership, personal signatures. 601 pub fn machine_sign(&self, data: &[u8]) -> ed25519_dalek::Signature { 602 self.machine_identity.sign(data) 603 } 604 605 // ===== DHT Methods ===== 606 607 /// Store a value in the DHT (local only for now, TODO: network push) 608 pub async fn dht_put(&self, key: DhtKey, value: Vec<u8>, value_type: ValueType) -> Result<(), NodeError> { 609 let now = std::time::SystemTime::now() 610 .duration_since(std::time::UNIX_EPOCH) 611 .unwrap() 612 .as_secs(); 613 614 // Create stored value 615 let mut stored = StoredValue { 616 key, 617 metadata: abzu_dht::store::ValueMetadata { 618 value_type, 619 publisher: self.machine_identity.public_bytes(), // Publisher is us 620 published_at: now, 621 expires_at: now + value_type.default_ttl(), 622 seq: 0, 623 }, 624 payload: value, 625 signature: [0u8; 64], 626 }; 627 628 // Sign the value 629 let signature = self.machine_identity.sign(&stored.signable_bytes()); 630 stored.signature = signature.to_bytes(); 631 632 self.dht_store.write().await.store(stored, now) 633 .map_err(|e| NodeError::DhtError(e.to_string()))?; 634 Ok(()) 635 } 636 637 /// Update a value in the DHT (e.g. refreshing a mutable record) 638 pub async fn dht_update(&self, key: DhtKey, value: Vec<u8>, value_type: ValueType, seq: u64) -> Result<(), NodeError> { 639 let now = std::time::SystemTime::now() 640 .duration_since(std::time::UNIX_EPOCH) 641 .unwrap() 642 .as_secs(); 643 644 let mut stored = StoredValue { 645 key, 646 metadata: abzu_dht::store::ValueMetadata { 647 value_type, 648 publisher: self.machine_identity.public_bytes(), 649 published_at: now, 650 expires_at: now + value_type.default_ttl(), 651 seq, 652 }, 653 payload: value, 654 signature: [0u8; 64], 655 }; 656 657 // Sign the value 658 let signature = self.machine_identity.sign(&stored.signable_bytes()); 659 stored.signature = signature.to_bytes(); 660 661 self.dht_store.write().await.store(stored, now) 662 .map_err(|e| NodeError::DhtError(e.to_string()))?; 663 Ok(()) 664 } 665 666 /// Get a value from the DHT (local only for now, TODO: network lookup) 667 pub async fn dht_get(&self, key: &DhtKey) -> Result<Option<Vec<u8>>, NodeError> { 668 let now = std::time::SystemTime::now() 669 .duration_since(std::time::UNIX_EPOCH) 670 .unwrap() 671 .as_secs(); 672 673 // Return the simple value payload 674 let store = self.dht_store.read().await; 675 // get returns a Vec<&StoredValue>, we just take the first one for now 676 Ok(store.get(key, now).first().map(|v| v.payload.clone())) 677 } 678 679 /// Get full stored value from DHT 680 pub async fn dht_get_full(&self, key: &DhtKey) -> Result<Option<StoredValue>, NodeError> { 681 let now = std::time::SystemTime::now() 682 .duration_since(std::time::UNIX_EPOCH) 683 .unwrap() 684 .as_secs(); 685 686 let store = self.dht_store.read().await; 687 Ok(store.get(key, now).first().cloned().cloned()) 688 } 689 690 // ===== Chat Methods ===== 691 692 /// Get the next message ID (monotonically increasing, persisted to sled) 693 pub fn next_message_id(&self) -> u64 { 694 let id = self.next_msg_id.fetch_add(1, Ordering::SeqCst); 695 // Persist the new counter value (best-effort, don't fail on persistence error) 696 let _ = self.store.insert(MSG_ID_COUNTER_KEY, &(id + 1).to_be_bytes()); 697 id 698 } 699 700 /// Create a chat storage key: timestamp_be || msg_id_be 701 fn chat_key(timestamp: u64, msg_id: u64) -> [u8; 16] { 702 let mut key = [0u8; 16]; 703 key[..8].copy_from_slice(×tamp.to_be_bytes()); 704 key[8..].copy_from_slice(&msg_id.to_be_bytes()); 705 key 706 } 707 708 /// Store an outbound chat message (status = Pending) 709 pub fn store_outbound_chat( 710 &self, 711 recipient: [u8; 32], 712 content: Vec<u8>, 713 timestamp: u64, 714 ) -> Result<StoredMessage, NodeError> { 715 let id = self.next_message_id(); 716 let msg = StoredMessage { 717 id, 718 peer: recipient, 719 content, 720 timestamp, 721 direction: MessageDirection::Outbound, 722 status: MessageStatus::Pending, 723 }; 724 725 let key = Self::chat_key(timestamp, id); 726 self.current_user.put(crate::storage::collections::CHATS, &key, &msg) 727 .map_err(|e| NodeError::Serialization(e.to_string()))?; 728 debug!(id = id, peer = ?recipient, "Stored outbound chat"); 729 Ok(msg) 730 } 731 732 /// Store an inbound chat message (status = Delivered) 733 pub fn store_inbound_chat( 734 &self, 735 sender: [u8; 32], 736 msg_id: u64, 737 content: Vec<u8>, 738 timestamp: u64, 739 ) -> Result<StoredMessage, NodeError> { 740 let msg = StoredMessage { 741 id: msg_id, 742 peer: sender, 743 content, 744 timestamp, 745 direction: MessageDirection::Inbound, 746 status: MessageStatus::Delivered, 747 }; 748 749 let key = Self::chat_key(timestamp, msg_id); 750 self.current_user.put(crate::storage::collections::CHATS, &key, &msg) 751 .map_err(|e| NodeError::Serialization(e.to_string()))?; 752 debug!(id = msg_id, peer = ?sender, "Stored inbound chat"); 753 Ok(msg) 754 } 755 756 /// Mark a message as delivered 757 pub fn mark_delivered(&self, msg_id: u64) -> Result<(), NodeError> { 758 use crate::storage::collections::CHATS; 759 // Find the message by scanning (could optimize with secondary index) 760 let entries = self.current_user.scan_collection(CHATS) 761 .map_err(|e| NodeError::Serialization(e.to_string()))?; 762 763 for (key, value) in entries { 764 let mut msg: StoredMessage = codec::decode(&value)?; 765 if msg.id == msg_id && msg.direction == MessageDirection::Outbound { 766 msg.status = MessageStatus::Delivered; 767 self.current_user.put(CHATS, &key, &msg) 768 .map_err(|e| NodeError::Serialization(e.to_string()))?; 769 debug!(id = msg_id, "Marked message delivered"); 770 return Ok(()); 771 } 772 } 773 Err(NodeError::MessageNotFound(msg_id)) 774 } 775 776 /// Mark a message as read 777 pub fn mark_read(&self, msg_id: u64) -> Result<(), NodeError> { 778 use crate::storage::collections::CHATS; 779 let entries = self.current_user.scan_collection(CHATS) 780 .map_err(|e| NodeError::Serialization(e.to_string()))?; 781 782 for (key, value) in entries { 783 let mut msg: StoredMessage = codec::decode(&value)?; 784 if msg.id == msg_id && msg.direction == MessageDirection::Outbound { 785 msg.status = MessageStatus::Read; 786 self.current_user.put(CHATS, &key, &msg) 787 .map_err(|e| NodeError::Serialization(e.to_string()))?; 788 debug!(id = msg_id, "Marked message read"); 789 return Ok(()); 790 } 791 } 792 Err(NodeError::MessageNotFound(msg_id)) 793 } 794 795 /// Get chat history with a specific peer (sorted by timestamp) 796 pub fn get_chat_history(&self, peer: &[u8; 32]) -> Result<Vec<StoredMessage>, NodeError> { 797 use crate::storage::collections::CHATS; 798 let entries = self.current_user.scan_collection(CHATS) 799 .map_err(|e| NodeError::Serialization(e.to_string()))?; 800 801 let mut messages = Vec::new(); 802 for (_, value) in entries { 803 let msg: StoredMessage = codec::decode(&value)?; 804 if msg.peer == *peer { 805 messages.push(msg); 806 } 807 } 808 // Already sorted by timestamp due to key structure 809 Ok(messages) 810 } 811 812 /// Get all pending outbound messages (for retry logic) 813 /// Returns messages that are still in Pending status 814 pub fn get_pending_messages(&self) -> Result<Vec<StoredMessage>, NodeError> { 815 use crate::storage::collections::CHATS; 816 let entries = self.current_user.scan_collection(CHATS) 817 .map_err(|e| NodeError::Serialization(e.to_string()))?; 818 819 let mut pending = Vec::new(); 820 for (_, value) in entries { 821 let msg: StoredMessage = codec::decode(&value)?; 822 if msg.direction == MessageDirection::Outbound && msg.status == MessageStatus::Pending { 823 pending.push(msg); 824 } 825 } 826 Ok(pending) 827 } 828 829 /// Mark a message as failed (after max retries) 830 pub fn mark_failed(&self, msg_id: u64) -> Result<(), NodeError> { 831 use crate::storage::collections::CHATS; 832 let entries = self.current_user.scan_collection(CHATS) 833 .map_err(|e| NodeError::Serialization(e.to_string()))?; 834 835 for (key, value) in entries { 836 let mut msg: StoredMessage = codec::decode(&value)?; 837 if msg.id == msg_id && msg.direction == MessageDirection::Outbound { 838 msg.status = MessageStatus::Failed; 839 self.current_user.put(CHATS, &key, &msg) 840 .map_err(|e| NodeError::Serialization(e.to_string()))?; 841 debug!(id = msg_id, "Marked message failed"); 842 return Ok(()); 843 } 844 } 845 Err(NodeError::MessageNotFound(msg_id)) 846 } 847 848 // ===== Contact Methods ===== 849 850 /// Add a contact 851 pub fn add_contact(&self, alias: String, pubkey: [u8; 32]) -> Result<Contact, NodeError> { 852 use crate::storage::collections::CONTACTS; 853 let contact = Contact { 854 alias, 855 pubkey, 856 added_at: std::time::SystemTime::now() 857 .duration_since(std::time::UNIX_EPOCH) 858 .unwrap_or_default() 859 .as_millis() as u64, 860 }; 861 862 self.current_user.put(CONTACTS, &pubkey, &contact) 863 .map_err(|e| NodeError::Serialization(e.to_string()))?; 864 info!(alias = %contact.alias, "Added contact"); 865 Ok(contact) 866 } 867 868 /// Remove a contact 869 pub fn remove_contact(&self, pubkey: &[u8; 32]) -> Result<bool, NodeError> { 870 use crate::storage::collections::CONTACTS; 871 self.current_user.delete(CONTACTS, pubkey) 872 .map_err(|e| NodeError::Serialization(e.to_string())) 873 } 874 875 /// Get all contacts 876 pub fn get_contacts(&self) -> Result<Vec<Contact>, NodeError> { 877 use crate::storage::collections::CONTACTS; 878 let entries = self.current_user.scan_collection(CONTACTS) 879 .map_err(|e| NodeError::Serialization(e.to_string()))?; 880 881 let mut contacts = Vec::new(); 882 for (_, value) in entries { 883 let contact: Contact = codec::decode(&value)?; 884 contacts.push(contact); 885 } 886 887 // Sort by alias 888 contacts.sort_by(|a, b| a.alias.cmp(&b.alias)); 889 Ok(contacts) 890 } 891 892 /// Get a contact by pubkey 893 pub fn get_contact(&self, pubkey: &[u8; 32]) -> Result<Option<Contact>, NodeError> { 894 use crate::storage::collections::CONTACTS; 895 self.current_user.get::<Contact>(CONTACTS, pubkey) 896 .map_err(|e| NodeError::Serialization(e.to_string())) 897 } 898 899 // ───────────────────────────────────────────────────────────────────────── 900 // Circle (Group) Methods 901 // ───────────────────────────────────────────────────────────────────────── 902 903 /// Create a new circle 904 pub fn create_circle(&self, name: String) -> Result<Circle, NodeError> { 905 self.create_circle_with_policy(name, TrustPolicy::default()) 906 } 907 908 /// Create a new Circle with a specific trust policy 909 /// 910 /// # Trust Policy Options 911 /// - `TrustPolicy::Anonymous` (default): No trust ledger, no invite tree. Safest for adversarial contexts. 912 /// - `TrustPolicy::Private { prune_mode }`: Minimal records, only current membership. 913 /// - `TrustPolicy::Accountable { ledger_mode, prune_mode }`: Full audit trail for moderated communities. 914 pub fn create_circle_with_policy(&self, name: String, trust_policy: TrustPolicy) -> Result<Circle, NodeError> { 915 let founder_key = self.public_key().to_bytes(); 916 let circle = self.trust_engine.create_circle(founder_key, name, trust_policy)?; 917 Ok(circle) 918 } 919 920 /// Get a circle by ID 921 pub fn get_circle(&self, id: &[u8; 32]) -> Result<Option<Circle>, NodeError> { 922 Ok(self.trust_engine.get_circle(id)?) 923 } 924 925 /// List all circles we're a member of 926 pub fn list_circles(&self) -> Result<Vec<Circle>, NodeError> { 927 Ok(self.trust_engine.list_circles()?) 928 } 929 930 /// Add a member to a circle (must be founder/admin) 931 /// 932 /// `caller_pubkey` is the public key of the account performing this action. 933 /// Note: This is a legacy wrapper - use `invite_member` for the proper trust flow. 934 #[allow(dead_code)] 935 pub fn add_circle_member(&self, caller_pubkey: [u8; 32], circle_id: &[u8; 32], pubkey: [u8; 32], _role: MemberRole) -> Result<Circle, NodeError> { 936 // Delegate to invite_member with empty signature for legacy compatibility 937 self.invite_member(caller_pubkey, circle_id, pubkey, Vec::new()) 938 } 939 940 /// Invite a new member to a circle with full trust protocol. 941 /// 942 /// This is the proper invite flow with: 943 /// - Capacity tracking (inviter must have invites_remaining) 944 /// - Depth tracking (MAX_INVITE_DEPTH enforced) 945 /// - Ledger recording (TrustAction::Invite) 946 /// 947 /// `caller_pubkey` is the public key of the account performing this action. 948 /// The signature should be over: circle_id || invitee_pubkey || timestamp 949 pub fn invite_member( 950 &self, 951 caller_pubkey: [u8; 32], 952 circle_id: &[u8; 32], 953 invitee_pubkey: [u8; 32], 954 invite_signature: Vec<u8>, 955 ) -> Result<Circle, NodeError> { 956 let circle = self.trust_engine.invite_member( 957 caller_pubkey, 958 circle_id, 959 invitee_pubkey, 960 invite_signature, 961 )?; 962 Ok(circle) 963 } 964 965 /// Remove a member from a circle 966 pub fn remove_circle_member(&self, circle_id: &[u8; 32], pubkey: &[u8; 32]) -> Result<Circle, NodeError> { 967 let circle = self.trust_engine.remove_member(circle_id, pubkey)?; 968 Ok(circle) 969 } 970 971 // ───────────────────────────────────────────────────────────────────────────── 972 // Trust Protocol Methods 973 // ───────────────────────────────────────────────────────────────────────────── 974 975 /// Vouch for an existing member in a circle (secondary endorsement). 976 /// 977 /// Vouches are weaker than invites but can: 978 /// - Increase a member's standing in the community 979 /// - Help members qualify for admin elevation (requires MIN_VOUCHES_FOR_ADMIN) 980 /// 981 /// Returns error if: 982 /// - Circle not found 983 /// - Voucher or target not a member 984 /// - Voucher has no vouch capacity 985 /// - Already vouched for this member 986 /// 987 /// `caller_pubkey` is the public key of the account performing this action. 988 pub fn vouch_member( 989 &self, 990 caller_pubkey: [u8; 32], 991 circle_id: &[u8; 32], 992 target_pubkey: [u8; 32], 993 signature: Vec<u8>, 994 ) -> Result<Circle, NodeError> { 995 let circle = self.trust_engine.vouch_member( 996 caller_pubkey, 997 circle_id, 998 target_pubkey, 999 signature, 1000 )?; 1001 Ok(circle) 1002 } 1003 1004 /// Prune a member and all their invite descendants from a circle. 1005 /// 1006 /// This is the accountability mechanism: if you invite someone who 1007 /// misbehaves, an admin can prune them (and you may lose capacity 1008 /// as a consequence). 1009 /// 1010 /// Returns the list of removed public keys. 1011 /// 1012 /// `caller_pubkey` is the public key of the account performing this action. 1013 pub fn prune_branch( 1014 &self, 1015 caller_pubkey: [u8; 32], 1016 circle_id: &[u8; 32], 1017 target_pubkey: [u8; 32], 1018 signature: Vec<u8>, 1019 reason_hash: Option<[u8; 32]>, 1020 ) -> Result<(Circle, Vec<[u8; 32]>), NodeError> { 1021 let result = self.trust_engine.prune_branch( 1022 caller_pubkey, 1023 circle_id, 1024 target_pubkey, 1025 signature, 1026 reason_hash, 1027 )?; 1028 Ok(result) 1029 } 1030 1031 /// Encrypt a message for a circle using its symmetric key 1032 /// Returns: nonce (12 bytes) || ciphertext 1033 pub fn encrypt_circle_message(&self, circle_id: &[u8; 32], plaintext: &[u8]) -> Result<Vec<u8>, NodeError> { 1034 use chacha20poly1305::{ChaCha20Poly1305, KeyInit, aead::Aead}; 1035 use rand::RngCore; 1036 1037 // Get circle to access its symmetric key 1038 let circle = self.get_circle(circle_id)? 1039 .ok_or_else(|| NodeError::Serialization("Circle not found".to_string()))?; 1040 1041 // Create cipher with circle's symmetric key 1042 let cipher = ChaCha20Poly1305::new_from_slice(&circle.symmetric_key) 1043 .map_err(|e| NodeError::Crypto(e.to_string()))?; 1044 1045 // Generate random nonce (12 bytes) 1046 let mut nonce_bytes = [0u8; 12]; 1047 rand::thread_rng().fill_bytes(&mut nonce_bytes); 1048 let nonce = chacha20poly1305::Nonce::from_slice(&nonce_bytes); 1049 1050 // Encrypt plaintext 1051 let ciphertext = cipher.encrypt(nonce, plaintext) 1052 .map_err(|e| NodeError::Crypto(e.to_string()))?; 1053 1054 // Prepend nonce to ciphertext 1055 let mut result = Vec::with_capacity(12 + ciphertext.len()); 1056 result.extend_from_slice(&nonce_bytes); 1057 result.extend_from_slice(&ciphertext); 1058 1059 Ok(result) 1060 } 1061 1062 /// Decrypt a message from a circle using its symmetric key 1063 /// Input: nonce (12 bytes) || ciphertext 1064 pub fn decrypt_circle_message(&self, circle_id: &[u8; 32], encrypted: &[u8]) -> Result<Vec<u8>, NodeError> { 1065 use chacha20poly1305::{ChaCha20Poly1305, KeyInit, aead::Aead}; 1066 1067 if encrypted.len() < 12 { 1068 return Err(NodeError::Crypto("Encrypted message too short".to_string())); 1069 } 1070 1071 // Get circle to access its symmetric key 1072 let circle = self.get_circle(circle_id)? 1073 .ok_or_else(|| NodeError::Serialization("Circle not found".to_string()))?; 1074 1075 // Create cipher with circle's symmetric key 1076 let cipher = ChaCha20Poly1305::new_from_slice(&circle.symmetric_key) 1077 .map_err(|e| NodeError::Crypto(e.to_string()))?; 1078 1079 // Extract nonce and ciphertext 1080 let nonce = chacha20poly1305::Nonce::from_slice(&encrypted[..12]); 1081 let ciphertext = &encrypted[12..]; 1082 1083 // Decrypt 1084 cipher.decrypt(nonce, ciphertext) 1085 .map_err(|e| NodeError::Crypto(format!("Decryption failed: {}", e))) 1086 } 1087 1088 /// Decrypt a privacy-preserving CircleEnvelope from a circle 1089 /// 1090 /// SECURITY: This envelope contains the sender and timestamp metadata 1091 /// which are protected from network observers (addressing red team finding) 1092 /// 1093 /// Input format: nonce (12 bytes) || ciphertext(serialized CircleEnvelope) 1094 pub fn decrypt_circle_envelope(&self, circle_id: &[u8; 32], encrypted: &[u8]) -> Result<abzu_transport::CircleEnvelope, NodeError> { 1095 // First decrypt to get the raw bytes 1096 let decrypted = self.decrypt_circle_message(circle_id, encrypted)?; 1097 1098 // Then deserialize using the transport layer's abstraction 1099 abzu_transport::CircleEnvelope::decode(&decrypted) 1100 .map_err(|e| NodeError::Serialization(format!("Failed to deserialize CircleEnvelope: {}", e))) 1101 } 1102 1103 /// Store an inbound circle message 1104 pub fn store_inbound_circle_message(&self, circle_id: [u8; 32], msg_id: u64, sender: [u8; 32], content: Vec<u8>, timestamp: u64) -> Result<StoredCircleMessage, NodeError> { 1105 use crate::storage::collections::CIRCLE_MESSAGES; 1106 let msg = StoredCircleMessage { 1107 circle_id, 1108 id: msg_id, 1109 sender, 1110 content, 1111 timestamp, 1112 direction: MessageDirection::Inbound, 1113 }; 1114 1115 // Key: circle_id (32) + timestamp (8, big-endian for ordering) + msg_id (8) 1116 let mut key = Vec::with_capacity(48); 1117 key.extend_from_slice(&circle_id); 1118 key.extend_from_slice(×tamp.to_be_bytes()); 1119 key.extend_from_slice(&msg_id.to_be_bytes()); 1120 1121 self.current_user.put(CIRCLE_MESSAGES, &key, &msg) 1122 .map_err(|e| NodeError::Serialization(e.to_string()))?; 1123 1124 Ok(msg) 1125 } 1126 1127 /// Store an outbound circle message 1128 pub fn store_outbound_circle_message(&self, circle_id: [u8; 32], content: Vec<u8>, timestamp: u64) -> Result<StoredCircleMessage, NodeError> { 1129 use crate::storage::collections::CIRCLE_MESSAGES; 1130 let msg_id = self.next_msg_id.fetch_add(1, Ordering::Relaxed); 1131 1132 let msg = StoredCircleMessage { 1133 circle_id, 1134 id: msg_id, 1135 sender: self.public_key().to_bytes(), 1136 content, 1137 timestamp, 1138 direction: MessageDirection::Outbound, 1139 }; 1140 1141 // Key: circle_id (32) + timestamp (8, big-endian for ordering) + msg_id (8) 1142 let mut key = Vec::with_capacity(48); 1143 key.extend_from_slice(&circle_id); 1144 key.extend_from_slice(×tamp.to_be_bytes()); 1145 key.extend_from_slice(&msg_id.to_be_bytes()); 1146 1147 self.current_user.put(CIRCLE_MESSAGES, &key, &msg) 1148 .map_err(|e| NodeError::Serialization(e.to_string()))?; 1149 1150 Ok(msg) 1151 } 1152 1153 /// Get message history for a circle 1154 pub fn get_circle_history(&self, circle_id: &[u8; 32], limit: usize) -> Result<Vec<StoredCircleMessage>, NodeError> { 1155 use crate::storage::collections::CIRCLE_MESSAGES; 1156 1157 // Use prefix scan for O(log N) lookup instead of O(N) full scan 1158 let entries = self.current_user.scan_with_prefix(CIRCLE_MESSAGES, circle_id)?; 1159 1160 let mut messages: Vec<StoredCircleMessage> = entries 1161 .into_iter() 1162 .map(|(_, value)| codec::decode(&value)) 1163 .collect::<Result<Vec<_>, _>>()?; 1164 1165 // Already sorted by timestamp+msg_id due to key structure 1166 // Take last N messages 1167 if messages.len() > limit { 1168 messages = messages.split_off(messages.len() - limit); 1169 } 1170 1171 Ok(messages) 1172 } 1173 } 1174 1175 impl std::fmt::Debug for Node { 1176 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 1177 f.debug_struct("Node") 1178 .field("address", &self.address) 1179 .field("storage_path", &self.config.storage_path) 1180 .finish_non_exhaustive() 1181 } 1182 } 1183 1184 #[cfg(test)] 1185 mod tests { 1186 use super::*; 1187 use tempfile::tempdir; 1188 1189 #[test] 1190 fn test_node_creation() { 1191 let tmp = tempdir().unwrap(); 1192 let config = NodeConfig { 1193 storage_path: tmp.path().to_str().unwrap().to_string(), 1194 ..Default::default() 1195 }; 1196 1197 let node = Node::new(config).unwrap(); 1198 assert!(node.address().is_valid()); 1199 } 1200 1201 #[test] 1202 fn test_chunk_storage() { 1203 let tmp = tempdir().unwrap(); 1204 let config = NodeConfig { 1205 storage_path: tmp.path().to_str().unwrap().to_string(), 1206 ..Default::default() 1207 }; 1208 1209 let node = Node::new(config).unwrap(); 1210 1211 let cid = [0xAB; 32]; 1212 let data = b"hello world"; 1213 1214 node.store_chunk(&cid, data).unwrap(); 1215 let retrieved = node.get_chunk(&cid).unwrap().unwrap(); 1216 assert_eq!(retrieved, data); 1217 } 1218 }