main.rs
1 //! Abzu Daemon 2 //! 3 //! Main entry point for the Abzu mesh networking daemon. 4 5 mod auth; 6 mod config; 7 mod rpc; 8 mod stun; 9 10 use std::net::SocketAddr; 11 use std::sync::atomic::{AtomicBool, Ordering}; 12 use std::sync::Arc; 13 14 use anyhow::{Context, Result}; 15 use tokio::net::TcpListener; 16 use tracing::{error, info, warn}; 17 use tracing_subscriber::EnvFilter; 18 19 use abzu_core::config::SecurityTier; 20 use abzu_core::{bootstrap, run_with_listener, BootstrapConfig, Node, NodeConfig, PeerConnection}; 21 use abzu_transport::transports::WsStream; 22 use abzu_sdk::Identity; 23 24 25 use config::Args; 26 use rpc::start_rpc_server; 27 28 #[tokio::main] 29 async fn main() -> Result<()> { 30 // Parse CLI arguments 31 let args = Args::parse_args(); 32 33 // Initialize logging 34 let filter = if args.verbose { 35 EnvFilter::new("debug") 36 } else { 37 EnvFilter::try_from_default_env() 38 .unwrap_or_else(|_| EnvFilter::new("info")) 39 }; 40 41 tracing_subscriber::fmt() 42 .with_env_filter(filter) 43 .with_target(true) 44 .init(); 45 46 info!("Abzu daemon starting..."); 47 info!( 48 listen = args.listen, 49 rpc = args.rpc, 50 ws_listen = ?args.ws_listen, 51 security_tier = %args.security_tier, 52 "Configuration" 53 ); 54 55 // Load or create identity (using SDK) 56 let signing_key = Identity::load_or_create(&args.identity_path()) 57 .context("Failed to load/create identity")?; 58 let public_key = signing_key.verifying_key(); 59 let address = abzu_router::address_for_key(&public_key); 60 61 info!( 62 address = %address, 63 public_key = %hex_encode(public_key.as_bytes()), 64 "Identity loaded" 65 ); 66 67 // Parse security tier from CLI 68 let security_tier = match args.security_tier.to_lowercase().as_str() { 69 "off" => SecurityTier::Off, 70 "blend" => SecurityTier::Blend, 71 "shadow" => SecurityTier::Shadow, 72 "ghost" => SecurityTier::Ghost, 73 other => { 74 warn!(tier = %other, "Unknown security tier, defaulting to Blend"); 75 SecurityTier::Blend 76 } 77 }; 78 79 // Create node configuration 80 let node_config = NodeConfig { 81 storage_path: args.storage_path().to_string_lossy().to_string(), 82 listen_addr: Some(args.listen_addr()), 83 bootstrap_peers: args.peer.clone(), 84 heartbeat_ms: 500, 85 security_tier, 86 node_role: abzu_core::config::NodeRole::Desktop, 87 home_node: None, // Desktop nodes don't need Home delegation 88 }; 89 90 // Create the node 91 let service_key = signing_key.clone(); 92 let agent_key = signing_key.clone(); 93 let node = Arc::new(Node::with_identity(signing_key, node_config.clone())?); 94 95 // Start Inference Service (Worker) 96 let service_node = Arc::clone(&node); 97 tokio::spawn(async move { 98 let service = abzu_inference::InferenceService::new(service_node, service_key, None); 99 service.start().await; 100 }); 101 102 // Generate a shared key for now (TODO: proper key exchange) 103 let shared_key: [u8; 32] = { 104 let hash = blake3::hash(b"abzu-shared-key-v0"); 105 *hash.as_bytes() 106 }; 107 108 // Generate and save RPC auth token 109 let auth_token = auth::generate_token(); 110 let token_path = auth::default_token_path(); 111 auth::write_token_file(&auth_token, &token_path) 112 .with_context(|| format!("Failed to write RPC auth token to {:?}", token_path))?; 113 info!(path = ?token_path, "RPC auth token saved (mode 0600)"); 114 115 116 // Initialize Token Ledger (Persistent) 117 let token_db_path = std::path::Path::new(&node_config.storage_path).join("token_db"); 118 let ledger: Arc<tokio::sync::RwLock<dyn abzu_token::Ledger>> = match abzu_token::SledLedger::open(&token_db_path) { 119 Ok(l) => { 120 info!(path = ?token_db_path, "Persistent ledger opened"); 121 Arc::new(tokio::sync::RwLock::new(l)) 122 }, 123 Err(e) => { 124 error!(error = ?e, "Failed to open persistent ledger, falling back to memory (DATA WILL BE LOST)"); 125 Arc::new(tokio::sync::RwLock::new(abzu_token::MemoryLedger::new())) 126 } 127 }; 128 129 // Initialize Sovereign Agent Runtime 130 info!("Initializing Sovereign Agent Runtime..."); 131 let agent_seed = agent_key.to_bytes(); 132 let agent_identity = sovereign_agent::Agent::from_seed(&agent_seed) 133 .context("Failed to create agent identity")?; 134 let agent = Arc::new(sovereign_agent::agent::AgentRuntime::new(Arc::new(agent_identity), ledger).await); 135 136 137 // Start RPC server with authentication 138 let rpc_addr = start_rpc_server(&args.rpc_addr(), Arc::clone(&node), Arc::clone(&agent), shared_key, auth_token).await?; 139 info!(addr = %rpc_addr, "Control plane ready (authentication required)"); 140 141 // Bootstrap into the network 142 // By default, connects to Abzu Network Core + CLI peers 143 // Use --no-core for private/isolated networks 144 let mut bootstrap_config = BootstrapConfig::with_peers(args.peer.clone()); 145 if args.no_core { 146 bootstrap_config.connect_to_core = false; 147 info!("Abzu Network Core disabled (--no-core). Running in private/isolated mode."); 148 } 149 let result = bootstrap(Arc::clone(&node), &shared_key, bootstrap_config).await; 150 151 if result.connected == 0 && args.peer.is_empty() && !args.no_core { 152 warn!( 153 "Failed to connect to Abzu Network Core ({} failed). Running in isolated mode.", 154 result.failed 155 ); 156 } else if result.connected == 0 && !args.peer.is_empty() { 157 warn!( 158 "Failed to connect to any bootstrap peers ({} failed). Running in isolated mode.", 159 result.failed 160 ); 161 } 162 163 // Spawn the main event loop with TCP/FakeTLS listener 164 let node_clone = Arc::clone(&node); 165 let listen_addr = args.listen_addr(); 166 let event_loop = tokio::spawn(async move { 167 if let Err(e) = run_with_listener(node_clone, &listen_addr, shared_key).await { 168 error!(error = %e, "Event loop error"); 169 } 170 }); 171 172 // Spawn WebSocket listener if configured 173 if let Some(ws_addr) = args.ws_listen_addr() { 174 let node_ws = Arc::clone(&node); 175 tokio::spawn(async move { 176 if let Err(e) = run_ws_listener(node_ws, &ws_addr).await { 177 error!(error = %e, "WebSocket listener error"); 178 } 179 }); 180 } 181 182 // Spawn sovereign STUN server (runs on port 3478 by default) 183 let stun_shutdown = Arc::new(AtomicBool::new(false)); 184 let stun_shutdown_clone = Arc::clone(&stun_shutdown); 185 tokio::spawn(async move { 186 if let Err(e) = stun::run_stun_server(stun::StunServerConfig::default(), stun_shutdown_clone).await { 187 error!(error = %e, "STUN server error"); 188 } 189 }); 190 191 // Wait for Ctrl+C 192 info!("Abzu node running. Press Ctrl+C to shutdown."); 193 194 tokio::select! { 195 _ = tokio::signal::ctrl_c() => { 196 info!("Ctrl+C received, initiating shutdown..."); 197 } 198 _ = event_loop => { 199 info!("Event loop exited"); 200 } 201 } 202 203 // Signal shutdown 204 node.signal_shutdown(); 205 stun_shutdown.store(true, Ordering::Relaxed); 206 207 // Give some time for graceful shutdown 208 tokio::time::sleep(std::time::Duration::from_millis(500)).await; 209 210 info!("Abzu daemon stopped"); 211 Ok(()) 212 } 213 214 /// Hex encode bytes 215 fn hex_encode(bytes: impl AsRef<[u8]>) -> String { 216 bytes.as_ref().iter().map(|b| format!("{:02x}", b)).collect() 217 } 218 219 /// Run a WebSocket listener for browser/mobile clients 220 async fn run_ws_listener(node: Arc<Node>, addr: &str) -> Result<()> { 221 let addr: SocketAddr = addr.parse()?; 222 let listener = TcpListener::bind(addr).await?; 223 224 info!(addr = %addr, "WebSocket listener bound"); 225 226 loop { 227 match listener.accept().await { 228 Ok((stream, peer_addr)) => { 229 info!(peer = %peer_addr, "Accepted WebSocket connection"); 230 231 let node_clone = Arc::clone(&node); 232 233 tokio::spawn(async move { 234 // Perform Perfect Forward Secrecy handshake 235 // This establishes a cryptographic identity (ephemeral key) and encrypted channel 236 match WsStream::accept_pfs(stream).await { 237 Ok(handshake) => { 238 let peer_key = handshake.peer_pubkey; 239 let conn = PeerConnection::new(Box::new(handshake.stream)); 240 241 node_clone.add_peer(peer_key, conn).await; 242 243 info!( 244 peer = %peer_addr, 245 pubkey = %hex_encode(peer_key), 246 "Secure WebSocket peer registered" 247 ); 248 } 249 Err(e) => { 250 warn!(peer = %peer_addr, error = %e, "Secure WebSocket handshake failed"); 251 } 252 } 253 }); 254 } 255 Err(e) => { 256 warn!(error = %e, "Failed to accept WebSocket connection"); 257 } 258 } 259 } 260 }