main.rs
1 /* This file is part of DarkFi (https://dark.fi) 2 * 3 * Copyright (C) 2020-2025 Dyne.org foundation 4 * 5 * This program is free software: you can redistribute it and/or modify 6 * it under the terms of the GNU Affero General Public License as 7 * published by the Free Software Foundation, either version 3 of the 8 * License, or (at your option) any later version. 9 * 10 * This program is distributed in the hope that it will be useful, 11 * but WITHOUT ANY WARRANTY; without even the implied warranty of 12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 13 * GNU Affero General Public License for more details. 14 * 15 * You should have received a copy of the GNU Affero General Public License 16 * along with this program. If not, see <https://www.gnu.org/licenses/>. 17 */ 18 19 use std::{ 20 collections::{HashMap, HashSet}, 21 process::exit, 22 sync::Arc, 23 time::UNIX_EPOCH, 24 }; 25 26 use async_trait::async_trait; 27 use semver::Version; 28 use smol::{ 29 lock::{Mutex, MutexGuard}, 30 stream::StreamExt, 31 Executor, 32 }; 33 use structopt::StructOpt; 34 use structopt_toml::StructOptToml; 35 use tinyjson::JsonValue; 36 use toml::Value; 37 use tracing::{debug, error, info, warn}; 38 use url::Url; 39 40 use darkfi::{ 41 async_daemonize, cli_desc, 42 net::{self, hosts::HostColor, settings::BanPolicy, P2p, P2pPtr}, 43 rpc::{ 44 jsonrpc::*, 45 server::{listen_and_serve, RequestHandler}, 46 settings::{RpcSettings, RpcSettingsOpt}, 47 }, 48 system::{sleep, StoppableTask, StoppableTaskPtr}, 49 util::path::get_config_path, 50 Error, Result, 51 }; 52 53 const CONFIG_FILE: &str = "lilith_config.toml"; 54 const CONFIG_FILE_CONTENTS: &str = include_str!("../lilith_config.toml"); 55 56 #[derive(Clone, Debug, serde::Deserialize, StructOpt, StructOptToml)] 57 #[serde(default)] 58 #[structopt(name = "lilith", about = cli_desc!())] 59 struct Args { 60 #[structopt(flatten)] 61 /// JSON-RPC settings 62 rpc: RpcSettingsOpt, 63 64 #[structopt(short, long)] 65 /// Configuration file to use 66 config: Option<String>, 67 68 #[structopt(short, long)] 69 /// Set log file to ouput into 70 log: Option<String>, 71 72 #[structopt(short, parse(from_occurrences))] 73 /// Increase verbosity (-vvv supported) 74 verbose: u8, 75 76 #[structopt(long, default_value = "120")] 77 /// Interval after which to check whitelist peers 78 whitelist_refinery_interval: u64, 79 } 80 81 /// Struct representing a spawned P2P network 82 struct Spawn { 83 /// String identifier, 84 pub name: String, 85 /// P2P pointer 86 pub p2p: P2pPtr, 87 } 88 89 impl Spawn { 90 async fn get_whitelist(&self) -> Vec<JsonValue> { 91 self.p2p 92 .hosts() 93 .container 94 .fetch_all(HostColor::White) 95 .iter() 96 .map(|(addr, _url)| JsonValue::String(addr.to_string())) 97 .collect() 98 } 99 100 async fn get_greylist(&self) -> Vec<JsonValue> { 101 self.p2p 102 .hosts() 103 .container 104 .fetch_all(HostColor::Grey) 105 .iter() 106 .map(|(addr, _url)| JsonValue::String(addr.to_string())) 107 .collect() 108 } 109 110 async fn get_goldlist(&self) -> Vec<JsonValue> { 111 self.p2p 112 .hosts() 113 .container 114 .fetch_all(HostColor::Gold) 115 .iter() 116 .map(|(addr, _url)| JsonValue::String(addr.to_string())) 117 .collect() 118 } 119 120 async fn info(&self) -> JsonValue { 121 let mut addr_vec = vec![]; 122 for addr in &self.p2p.settings().read().await.inbound_addrs { 123 addr_vec.push(JsonValue::String(addr.as_ref().to_string())); 124 } 125 126 JsonValue::Object(HashMap::from([ 127 ("name".to_string(), JsonValue::String(self.name.clone())), 128 ("urls".to_string(), JsonValue::Array(addr_vec)), 129 ("whitelist".to_string(), JsonValue::Array(self.get_whitelist().await)), 130 ("greylist".to_string(), JsonValue::Array(self.get_greylist().await)), 131 ("goldlist".to_string(), JsonValue::Array(self.get_goldlist().await)), 132 ])) 133 } 134 } 135 136 /// Defines the network-specific settings 137 #[derive(Clone)] 138 struct NetInfo { 139 /// Accept addresses the network will use 140 pub accept_addrs: Vec<Url>, 141 /// Other seeds to connect to 142 pub seeds: Vec<Url>, 143 /// Manual peers to connect to 144 pub peers: Vec<Url>, 145 /// Supported network version 146 pub version: Version, 147 /// Enable localnet hosts 148 pub localnet: bool, 149 /// Path to P2P datastore 150 pub datastore: String, 151 /// Path to hostlist 152 pub hostlist: String, 153 } 154 155 /// Struct representing the daemon 156 struct Lilith { 157 /// Spawned networks 158 pub networks: Vec<Spawn>, 159 /// JSON-RPC connection tracker 160 pub rpc_connections: Mutex<HashSet<StoppableTaskPtr>>, 161 } 162 163 impl Lilith { 164 /// Since `Lilith` does not make outbound connections, if a peer is 165 /// upgraded to whitelist it will remain on the whitelist even if the 166 /// give peer is no longer online. 167 /// 168 /// To protect `Lilith` from sharing potentially offline nodes, 169 /// `whitelist_refinery` periodically ping nodes on the whitelist. If they 170 /// are reachable, we update their last seen field. Otherwise, we downgrade 171 /// them to the greylist. 172 /// 173 /// Note: if `Lilith` loses connectivity this method will delete peers from 174 /// the whitelist, meaning `Lilith` will need to rebuild its hostlist when 175 /// it comes back online. 176 async fn whitelist_refinery( 177 network_name: String, 178 p2p: P2pPtr, 179 refinery_interval: u64, 180 ) -> Result<()> { 181 debug!(target: "net::refinery::whitelist_refinery", "Starting whitelist refinery for \"{network_name}\""); 182 183 let hosts = p2p.hosts(); 184 185 loop { 186 sleep(refinery_interval).await; 187 188 match hosts.container.fetch_last(HostColor::White) { 189 Some(entry) => { 190 let url = &entry.0; 191 let last_seen = &entry.1; 192 193 if !hosts.refinable(url.clone()) { 194 debug!(target: "net::refinery::whitelist_refinery", "Addr={} not available!", 195 url.clone()); 196 197 continue 198 } 199 200 if !p2p.session_refine().handshake_node(url.clone(), p2p.clone()).await { 201 debug!(target: "net::refinery:::whitelist_refinery", 202 "Host {url} is not responsive. Downgrading from whitelist"); 203 204 hosts.greylist_host(url, *last_seen).await?; 205 206 continue 207 } 208 209 debug!(target: "net::refinery::whitelist_refinery", 210 "Peer {url} is responsive. Updating last_seen"); 211 212 // This node is active. Update the last seen field. 213 let last_seen = UNIX_EPOCH.elapsed().unwrap().as_secs(); 214 215 hosts.whitelist_host(url, last_seen).await?; 216 } 217 None => { 218 debug!(target: "net::refinery::whitelist_refinery", 219 "Whitelist is empty! Cannot start refinery process"); 220 221 continue 222 } 223 } 224 } 225 } 226 // RPCAPI: 227 // Returns all spawned networks names with their node addresses. 228 // --> {"jsonrpc": "2.0", "method": "spawns", "params": [], "id": 42} 229 // <-- {"jsonrpc": "2.0", "result": {"spawns": spawns_info}, "id": 42} 230 async fn spawns(&self, id: u16, _params: JsonValue) -> JsonResult { 231 let mut spawns = vec![]; 232 for spawn in &self.networks { 233 spawns.push(spawn.info().await); 234 } 235 236 let json = 237 JsonValue::Object(HashMap::from([("spawns".to_string(), JsonValue::Array(spawns))])); 238 239 JsonResponse::new(json, id).into() 240 } 241 } 242 243 #[async_trait] 244 impl RequestHandler<()> for Lilith { 245 async fn handle_request(&self, req: JsonRequest) -> JsonResult { 246 return match req.method.as_str() { 247 "ping" => self.pong(req.id, req.params).await, 248 "spawns" => self.spawns(req.id, req.params).await, 249 _ => JsonError::new(ErrorCode::MethodNotFound, None, req.id).into(), 250 } 251 } 252 253 async fn connections_mut(&self) -> MutexGuard<'life0, HashSet<StoppableTaskPtr>> { 254 self.rpc_connections.lock().await 255 } 256 } 257 258 /// Parse a TOML string for any configured network and return a map containing 259 /// said configurations. 260 fn parse_configured_networks(data: &str) -> Result<HashMap<String, NetInfo>> { 261 let mut ret = HashMap::new(); 262 263 if let Value::Table(map) = toml::from_str(data)? { 264 if map.contains_key("network") && map["network"].is_table() { 265 for net in map["network"].as_table().unwrap() { 266 info!(target: "lilith", "Found configuration for network: {}", net.0); 267 let table = net.1.as_table().unwrap(); 268 if !table.contains_key("accept_addrs") { 269 warn!(target: "lilith", "Network accept addrs are mandatory, skipping network."); 270 continue 271 } 272 273 if !table.contains_key("hostlist") { 274 error!(target: "lilith", "Hostlist path is mandatory! Configure and try again."); 275 exit(1) 276 } 277 278 let name = net.0.to_string(); 279 let accept_addrs: Vec<Url> = table["accept_addrs"] 280 .as_array() 281 .unwrap() 282 .iter() 283 .map(|x| Url::parse(x.as_str().unwrap()).unwrap()) 284 .collect(); 285 286 let mut seeds = vec![]; 287 if table.contains_key("seeds") { 288 if let Some(s) = table["seeds"].as_array() { 289 for seed in s { 290 if let Some(u) = seed.as_str() { 291 if let Ok(url) = Url::parse(u) { 292 seeds.push(url); 293 } 294 } 295 } 296 } 297 } 298 299 let mut peers = vec![]; 300 if table.contains_key("peers") { 301 if let Some(p) = table["peers"].as_array() { 302 for peer in p { 303 if let Some(u) = peer.as_str() { 304 if let Ok(url) = Url::parse(u) { 305 peers.push(url); 306 } 307 } 308 } 309 } 310 } 311 312 let localnet = if table.contains_key("localnet") { 313 table["localnet"].as_bool().unwrap() 314 } else { 315 false 316 }; 317 318 let version = if table.contains_key("version") { 319 semver::Version::parse(table["version"].as_str().unwrap())? 320 } else { 321 semver::Version::parse(option_env!("CARGO_PKG_VERSION").unwrap_or("0.0.0"))? 322 }; 323 324 let datastore: String = table["datastore"].as_str().unwrap().to_string(); 325 326 let hostlist: String = table["hostlist"].as_str().unwrap().to_string(); 327 328 let net_info = 329 NetInfo { accept_addrs, seeds, peers, version, localnet, datastore, hostlist }; 330 ret.insert(name, net_info); 331 } 332 } 333 } 334 335 Ok(ret) 336 } 337 338 async fn spawn_net(name: String, info: &NetInfo, ex: Arc<Executor<'static>>) -> Result<Spawn> { 339 let mut listen_urls = vec![]; 340 341 // Configure listen addrs for this network 342 for url in &info.accept_addrs { 343 listen_urls.push(url.clone()); 344 } 345 346 // P2P network settings 347 let settings = net::Settings { 348 inbound_addrs: listen_urls.clone(), 349 seeds: info.seeds.clone(), 350 peers: info.peers.clone(), 351 outbound_connections: 0, 352 outbound_connect_timeout: 30, 353 inbound_connections: 512, 354 app_version: info.version.clone(), 355 localnet: info.localnet, 356 p2p_datastore: Some(info.datastore.clone()), 357 hostlist: Some(info.hostlist.clone()), 358 allowed_transports: vec![ 359 "tcp".to_string(), 360 "tcp+tls".to_string(), 361 "tor".to_string(), 362 "tor+tls".to_string(), 363 "nym".to_string(), 364 "nym+tls".to_string(), 365 "i2p".to_string(), 366 "i2p+tls".to_string(), 367 ], 368 ban_policy: BanPolicy::Relaxed, 369 ..Default::default() 370 }; 371 372 // Create P2P instance 373 let p2p = P2p::new(settings, ex.clone()).await?; 374 375 let addrs_str: Vec<&str> = listen_urls.iter().map(|x| x.as_str()).collect(); 376 info!(target: "lilith", "Starting seed network node for \"{name}\" on {addrs_str:?}"); 377 p2p.clone().start().await?; 378 379 let spawn = Spawn { name, p2p }; 380 Ok(spawn) 381 } 382 383 async_daemonize!(realmain); 384 async fn realmain(args: Args, ex: Arc<Executor<'static>>) -> Result<()> { 385 // Pick up network settings from the TOML config 386 let cfg_path = get_config_path(args.config, CONFIG_FILE)?; 387 let toml_contents = std::fs::read_to_string(cfg_path)?; 388 let configured_nets = parse_configured_networks(&toml_contents)?; 389 390 if configured_nets.is_empty() { 391 error!(target: "lilith", "No networks are enabled in config"); 392 exit(1); 393 } 394 395 // Spawn configured networks 396 let mut networks = vec![]; 397 for (name, info) in &configured_nets { 398 match spawn_net(name.to_string(), info, ex.clone()).await { 399 Ok(spawn) => networks.push(spawn), 400 Err(e) => { 401 error!(target: "lilith", "Failed to start P2P network seed for \"{name}\": {e}"); 402 exit(1); 403 } 404 } 405 } 406 407 // Set up main daemon and background refinery_tasks 408 let lilith = Arc::new(Lilith { networks, rpc_connections: Mutex::new(HashSet::new()) }); 409 let mut refinery_tasks = HashMap::new(); 410 for network in &lilith.networks { 411 let name = network.name.clone(); 412 let task = StoppableTask::new(); 413 task.clone().start( 414 Lilith::whitelist_refinery(name.clone(), network.p2p.clone(), args.whitelist_refinery_interval), 415 |res| async move { 416 match res { 417 Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ } 418 Err(e) => error!(target: "lilith", "Failed starting refinery task for \"{name}\": {e}"), 419 } 420 }, 421 Error::DetachedTaskStopped, 422 ex.clone(), 423 ); 424 refinery_tasks.insert(network.name.clone(), task); 425 } 426 427 // JSON-RPC server 428 let rpc_settings: RpcSettings = args.rpc.into(); 429 info!(target: "lilith", "Starting JSON-RPC server on {}", rpc_settings.listen); 430 let lilith_ = lilith.clone(); 431 let rpc_task = StoppableTask::new(); 432 rpc_task.clone().start( 433 listen_and_serve(rpc_settings, lilith.clone(), None, ex.clone()), 434 |res| async move { 435 match res { 436 Ok(()) | Err(Error::RpcServerStopped) => lilith_.stop_connections().await, 437 Err(e) => error!(target: "lilith", "Failed starting JSON-RPC server: {e}"), 438 } 439 }, 440 Error::RpcServerStopped, 441 ex.clone(), 442 ); 443 444 // Signal handling for graceful termination. 445 let (signals_handler, signals_task) = SignalHandler::new(ex)?; 446 signals_handler.wait_termination(signals_task).await?; 447 info!(target: "lilith", "Caught termination signal, cleaning up and exiting..."); 448 449 info!(target: "lilith", "Stopping JSON-RPC server..."); 450 rpc_task.stop().await; 451 452 // Cleanly stop p2p networks 453 for spawn in &lilith.networks { 454 info!(target: "lilith", "Stopping \"{}\" task", spawn.name); 455 refinery_tasks.get(&spawn.name).unwrap().stop().await; 456 info!(target: "lilith", "Stopping \"{}\" P2P", spawn.name); 457 spawn.p2p.stop().await; 458 } 459 460 info!(target: "lilith", "Bye!"); 461 Ok(()) 462 }