/ abzu-daemon / src / main.rs
main.rs
  1  //! Abzu Daemon
  2  //!
  3  //! Main entry point for the Abzu mesh networking daemon.
  4  
  5  mod auth;
  6  mod config;
  7  mod rpc;
  8  mod stun;
  9  
 10  use std::net::SocketAddr;
 11  use std::sync::atomic::{AtomicBool, Ordering};
 12  use std::sync::Arc;
 13  
 14  use anyhow::{Context, Result};
 15  use tokio::net::TcpListener;
 16  use tracing::{error, info, warn};
 17  use tracing_subscriber::EnvFilter;
 18  
 19  use abzu_core::config::SecurityTier;
 20  use abzu_core::{bootstrap, run_with_listener, BootstrapConfig, Node, NodeConfig, PeerConnection};
 21  use abzu_transport::transports::WsStream;
 22  use abzu_sdk::Identity;
 23  
 24  
 25  use config::Args;
 26  use rpc::start_rpc_server;
 27  
 28  #[tokio::main]
 29  async fn main() -> Result<()> {
 30      // Parse CLI arguments
 31      let args = Args::parse_args();
 32  
 33      // Initialize logging
 34      let filter = if args.verbose {
 35          EnvFilter::new("debug")
 36      } else {
 37          EnvFilter::try_from_default_env()
 38              .unwrap_or_else(|_| EnvFilter::new("info"))
 39      };
 40  
 41      tracing_subscriber::fmt()
 42          .with_env_filter(filter)
 43          .with_target(true)
 44          .init();
 45  
 46      info!("Abzu daemon starting...");
 47      info!(
 48          listen = args.listen,
 49          rpc = args.rpc,
 50          ws_listen = ?args.ws_listen,
 51          security_tier = %args.security_tier,
 52          "Configuration"
 53      );
 54  
 55      // Load or create identity (using SDK)
 56      let signing_key = Identity::load_or_create(&args.identity_path())
 57          .context("Failed to load/create identity")?;
 58      let public_key = signing_key.verifying_key();
 59      let address = abzu_router::address_for_key(&public_key);
 60      
 61      info!(
 62          address = %address,
 63          public_key = %hex_encode(public_key.as_bytes()),
 64          "Identity loaded"
 65      );
 66  
 67      // Parse security tier from CLI
 68      let security_tier = match args.security_tier.to_lowercase().as_str() {
 69          "off" => SecurityTier::Off,
 70          "blend" => SecurityTier::Blend,
 71          "shadow" => SecurityTier::Shadow,
 72          "ghost" => SecurityTier::Ghost,
 73          other => {
 74              warn!(tier = %other, "Unknown security tier, defaulting to Blend");
 75              SecurityTier::Blend
 76          }
 77      };
 78  
 79      // Create node configuration
 80      let node_config = NodeConfig {
 81          storage_path: args.storage_path().to_string_lossy().to_string(),
 82          listen_addr: Some(args.listen_addr()),
 83          bootstrap_peers: args.peer.clone(),
 84          heartbeat_ms: 500,
 85          security_tier,
 86          node_role: abzu_core::config::NodeRole::Desktop,
 87          home_node: None, // Desktop nodes don't need Home delegation
 88      };
 89  
 90      // Create the node
 91      let service_key = signing_key.clone();
 92      let agent_key = signing_key.clone();
 93      let node = Arc::new(Node::with_identity(signing_key, node_config.clone())?);
 94  
 95      // Start Inference Service (Worker)
 96      let service_node = Arc::clone(&node);
 97      tokio::spawn(async move {
 98          let service = abzu_inference::InferenceService::new(service_node, service_key, None);
 99          service.start().await;
100      });
101  
102      // Generate a shared key for now (TODO: proper key exchange)
103      let shared_key: [u8; 32] = {
104          let hash = blake3::hash(b"abzu-shared-key-v0");
105          *hash.as_bytes()
106      };
107  
108      // Generate and save RPC auth token
109      let auth_token = auth::generate_token();
110      let token_path = auth::default_token_path();
111      auth::write_token_file(&auth_token, &token_path)
112          .with_context(|| format!("Failed to write RPC auth token to {:?}", token_path))?;
113      info!(path = ?token_path, "RPC auth token saved (mode 0600)");
114  
115  
116      // Initialize Token Ledger (Persistent)
117      let token_db_path = std::path::Path::new(&node_config.storage_path).join("token_db");
118      let ledger: Arc<tokio::sync::RwLock<dyn abzu_token::Ledger>> = match abzu_token::SledLedger::open(&token_db_path) {
119          Ok(l) => {
120              info!(path = ?token_db_path, "Persistent ledger opened");
121              Arc::new(tokio::sync::RwLock::new(l))
122          },
123          Err(e) => {
124              error!(error = ?e, "Failed to open persistent ledger, falling back to memory (DATA WILL BE LOST)");
125              Arc::new(tokio::sync::RwLock::new(abzu_token::MemoryLedger::new()))
126          }
127      };
128  
129      // Initialize Sovereign Agent Runtime
130      info!("Initializing Sovereign Agent Runtime...");
131      let agent_seed = agent_key.to_bytes();
132      let agent_identity = sovereign_agent::Agent::from_seed(&agent_seed)
133          .context("Failed to create agent identity")?;
134      let agent = Arc::new(sovereign_agent::agent::AgentRuntime::new(Arc::new(agent_identity), ledger).await);
135  
136      
137      // Start RPC server with authentication
138      let rpc_addr = start_rpc_server(&args.rpc_addr(), Arc::clone(&node), Arc::clone(&agent), shared_key, auth_token).await?;
139      info!(addr = %rpc_addr, "Control plane ready (authentication required)");
140  
141      // Bootstrap into the network
142      // By default, connects to Abzu Network Core + CLI peers
143      // Use --no-core for private/isolated networks
144      let mut bootstrap_config = BootstrapConfig::with_peers(args.peer.clone());
145      if args.no_core {
146          bootstrap_config.connect_to_core = false;
147          info!("Abzu Network Core disabled (--no-core). Running in private/isolated mode.");
148      }
149      let result = bootstrap(Arc::clone(&node), &shared_key, bootstrap_config).await;
150      
151      if result.connected == 0 && args.peer.is_empty() && !args.no_core {
152          warn!(
153              "Failed to connect to Abzu Network Core ({} failed). Running in isolated mode.", 
154              result.failed
155          );
156      } else if result.connected == 0 && !args.peer.is_empty() {
157          warn!(
158              "Failed to connect to any bootstrap peers ({} failed). Running in isolated mode.", 
159              result.failed
160          );
161      }
162  
163      // Spawn the main event loop with TCP/FakeTLS listener
164      let node_clone = Arc::clone(&node);
165      let listen_addr = args.listen_addr();
166      let event_loop = tokio::spawn(async move {
167          if let Err(e) = run_with_listener(node_clone, &listen_addr, shared_key).await {
168              error!(error = %e, "Event loop error");
169          }
170      });
171  
172      // Spawn WebSocket listener if configured
173      if let Some(ws_addr) = args.ws_listen_addr() {
174          let node_ws = Arc::clone(&node);
175          tokio::spawn(async move {
176              if let Err(e) = run_ws_listener(node_ws, &ws_addr).await {
177                  error!(error = %e, "WebSocket listener error");
178              }
179          });
180      }
181  
182      // Spawn sovereign STUN server (runs on port 3478 by default)
183      let stun_shutdown = Arc::new(AtomicBool::new(false));
184      let stun_shutdown_clone = Arc::clone(&stun_shutdown);
185      tokio::spawn(async move {
186          if let Err(e) = stun::run_stun_server(stun::StunServerConfig::default(), stun_shutdown_clone).await {
187              error!(error = %e, "STUN server error");
188          }
189      });
190  
191      // Wait for Ctrl+C
192      info!("Abzu node running. Press Ctrl+C to shutdown.");
193      
194      tokio::select! {
195          _ = tokio::signal::ctrl_c() => {
196              info!("Ctrl+C received, initiating shutdown...");
197          }
198          _ = event_loop => {
199              info!("Event loop exited");
200          }
201      }
202  
203      // Signal shutdown
204      node.signal_shutdown();
205      stun_shutdown.store(true, Ordering::Relaxed);
206  
207      // Give some time for graceful shutdown
208      tokio::time::sleep(std::time::Duration::from_millis(500)).await;
209  
210      info!("Abzu daemon stopped");
211      Ok(())
212  }
213  
/// Encodes `bytes` as a lowercase hexadecimal string, two digits per byte.
///
/// Empty input yields an empty string.
fn hex_encode(bytes: impl AsRef<[u8]>) -> String {
    const HEX_DIGITS: &[u8; 16] = b"0123456789abcdef";

    let bytes = bytes.as_ref();
    // Every input byte becomes exactly two characters, so the buffer is sized
    // once up front instead of allocating a temporary `String` per byte.
    let mut encoded = String::with_capacity(bytes.len() * 2);
    for &byte in bytes {
        encoded.push(char::from(HEX_DIGITS[usize::from(byte >> 4)]));
        encoded.push(char::from(HEX_DIGITS[usize::from(byte & 0x0f)]));
    }
    encoded
}
218  
219  /// Run a WebSocket listener for browser/mobile clients
220  async fn run_ws_listener(node: Arc<Node>, addr: &str) -> Result<()> {
221      let addr: SocketAddr = addr.parse()?;
222      let listener = TcpListener::bind(addr).await?;
223      
224      info!(addr = %addr, "WebSocket listener bound");
225  
226      loop {
227          match listener.accept().await {
228              Ok((stream, peer_addr)) => {
229                  info!(peer = %peer_addr, "Accepted WebSocket connection");
230                  
231                  let node_clone = Arc::clone(&node);
232                  
233                  tokio::spawn(async move {
234                      // Perform Perfect Forward Secrecy handshake
235                      // This establishes a cryptographic identity (ephemeral key) and encrypted channel
236                      match WsStream::accept_pfs(stream).await {
237                          Ok(handshake) => {
238                              let peer_key = handshake.peer_pubkey;
239                              let conn = PeerConnection::new(Box::new(handshake.stream));
240                              
241                              node_clone.add_peer(peer_key, conn).await;
242                              
243                              info!(
244                                  peer = %peer_addr, 
245                                  pubkey = %hex_encode(peer_key), 
246                                  "Secure WebSocket peer registered"
247                              );
248                          }
249                          Err(e) => {
250                              warn!(peer = %peer_addr, error = %e, "Secure WebSocket handshake failed");
251                          }
252                      }
253                  });
254              }
255              Err(e) => {
256                  warn!(error = %e, "Failed to accept WebSocket connection");
257              }
258          }
259      }
260  }