/ bin / lilith / src / main.rs
main.rs
  1  /* This file is part of DarkFi (https://dark.fi)
  2   *
  3   * Copyright (C) 2020-2025 Dyne.org foundation
  4   *
  5   * This program is free software: you can redistribute it and/or modify
  6   * it under the terms of the GNU Affero General Public License as
  7   * published by the Free Software Foundation, either version 3 of the
  8   * License, or (at your option) any later version.
  9   *
 10   * This program is distributed in the hope that it will be useful,
 11   * but WITHOUT ANY WARRANTY; without even the implied warranty of
 12   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 13   * GNU Affero General Public License for more details.
 14   *
 15   * You should have received a copy of the GNU Affero General Public License
 16   * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 17   */
 18  
 19  use std::{
 20      collections::{HashMap, HashSet},
 21      process::exit,
 22      sync::Arc,
 23      time::UNIX_EPOCH,
 24  };
 25  
 26  use async_trait::async_trait;
 27  use semver::Version;
 28  use smol::{
 29      lock::{Mutex, MutexGuard},
 30      stream::StreamExt,
 31      Executor,
 32  };
 33  use structopt::StructOpt;
 34  use structopt_toml::StructOptToml;
 35  use tinyjson::JsonValue;
 36  use toml::Value;
 37  use tracing::{debug, error, info, warn};
 38  use url::Url;
 39  
 40  use darkfi::{
 41      async_daemonize, cli_desc,
 42      net::{self, hosts::HostColor, settings::BanPolicy, P2p, P2pPtr},
 43      rpc::{
 44          jsonrpc::*,
 45          server::{listen_and_serve, RequestHandler},
 46          settings::{RpcSettings, RpcSettingsOpt},
 47      },
 48      system::{sleep, StoppableTask, StoppableTaskPtr},
 49      util::path::get_config_path,
 50      Error, Result,
 51  };
 52  
 53  const CONFIG_FILE: &str = "lilith_config.toml";
 54  const CONFIG_FILE_CONTENTS: &str = include_str!("../lilith_config.toml");
 55  
 56  #[derive(Clone, Debug, serde::Deserialize, StructOpt, StructOptToml)]
 57  #[serde(default)]
 58  #[structopt(name = "lilith", about = cli_desc!())]
 59  struct Args {
 60      #[structopt(flatten)]
 61      /// JSON-RPC settings
 62      rpc: RpcSettingsOpt,
 63  
 64      #[structopt(short, long)]
 65      /// Configuration file to use
 66      config: Option<String>,
 67  
 68      #[structopt(short, long)]
 69      /// Set log file to ouput into
 70      log: Option<String>,
 71  
 72      #[structopt(short, parse(from_occurrences))]
 73      /// Increase verbosity (-vvv supported)
 74      verbose: u8,
 75  
 76      #[structopt(long, default_value = "120")]
 77      /// Interval after which to check whitelist peers
 78      whitelist_refinery_interval: u64,
 79  }
 80  
 81  /// Struct representing a spawned P2P network
 82  struct Spawn {
 83      /// String identifier,
 84      pub name: String,
 85      /// P2P pointer
 86      pub p2p: P2pPtr,
 87  }
 88  
 89  impl Spawn {
 90      async fn get_whitelist(&self) -> Vec<JsonValue> {
 91          self.p2p
 92              .hosts()
 93              .container
 94              .fetch_all(HostColor::White)
 95              .iter()
 96              .map(|(addr, _url)| JsonValue::String(addr.to_string()))
 97              .collect()
 98      }
 99  
100      async fn get_greylist(&self) -> Vec<JsonValue> {
101          self.p2p
102              .hosts()
103              .container
104              .fetch_all(HostColor::Grey)
105              .iter()
106              .map(|(addr, _url)| JsonValue::String(addr.to_string()))
107              .collect()
108      }
109  
110      async fn get_goldlist(&self) -> Vec<JsonValue> {
111          self.p2p
112              .hosts()
113              .container
114              .fetch_all(HostColor::Gold)
115              .iter()
116              .map(|(addr, _url)| JsonValue::String(addr.to_string()))
117              .collect()
118      }
119  
120      async fn info(&self) -> JsonValue {
121          let mut addr_vec = vec![];
122          for addr in &self.p2p.settings().read().await.inbound_addrs {
123              addr_vec.push(JsonValue::String(addr.as_ref().to_string()));
124          }
125  
126          JsonValue::Object(HashMap::from([
127              ("name".to_string(), JsonValue::String(self.name.clone())),
128              ("urls".to_string(), JsonValue::Array(addr_vec)),
129              ("whitelist".to_string(), JsonValue::Array(self.get_whitelist().await)),
130              ("greylist".to_string(), JsonValue::Array(self.get_greylist().await)),
131              ("goldlist".to_string(), JsonValue::Array(self.get_goldlist().await)),
132          ]))
133      }
134  }
135  
136  /// Defines the network-specific settings
137  #[derive(Clone)]
138  struct NetInfo {
139      /// Accept addresses the network will use
140      pub accept_addrs: Vec<Url>,
141      /// Other seeds to connect to
142      pub seeds: Vec<Url>,
143      /// Manual peers to connect to
144      pub peers: Vec<Url>,
145      /// Supported network version
146      pub version: Version,
147      /// Enable localnet hosts
148      pub localnet: bool,
149      /// Path to P2P datastore
150      pub datastore: String,
151      /// Path to hostlist
152      pub hostlist: String,
153  }
154  
155  /// Struct representing the daemon
156  struct Lilith {
157      /// Spawned networks
158      pub networks: Vec<Spawn>,
159      /// JSON-RPC connection tracker
160      pub rpc_connections: Mutex<HashSet<StoppableTaskPtr>>,
161  }
162  
163  impl Lilith {
164      /// Since `Lilith` does not make outbound connections, if a peer is
165      /// upgraded to whitelist it will remain on the whitelist even if the
166      /// give peer is no longer online.
167      ///
168      /// To protect `Lilith` from sharing potentially offline nodes,
169      /// `whitelist_refinery` periodically ping nodes on the whitelist. If they
170      /// are reachable, we update their last seen field. Otherwise, we downgrade
171      /// them to the greylist.
172      ///
173      /// Note: if `Lilith` loses connectivity this method will delete peers from
174      /// the whitelist, meaning `Lilith` will need to rebuild its hostlist when
175      /// it comes back online.
176      async fn whitelist_refinery(
177          network_name: String,
178          p2p: P2pPtr,
179          refinery_interval: u64,
180      ) -> Result<()> {
181          debug!(target: "net::refinery::whitelist_refinery", "Starting whitelist refinery for \"{network_name}\"");
182  
183          let hosts = p2p.hosts();
184  
185          loop {
186              sleep(refinery_interval).await;
187  
188              match hosts.container.fetch_last(HostColor::White) {
189                  Some(entry) => {
190                      let url = &entry.0;
191                      let last_seen = &entry.1;
192  
193                      if !hosts.refinable(url.clone()) {
194                          debug!(target: "net::refinery::whitelist_refinery", "Addr={} not available!",
195                         url.clone());
196  
197                          continue
198                      }
199  
200                      if !p2p.session_refine().handshake_node(url.clone(), p2p.clone()).await {
201                          debug!(target: "net::refinery:::whitelist_refinery",
202                         "Host {url} is not responsive. Downgrading from whitelist");
203  
204                          hosts.greylist_host(url, *last_seen).await?;
205  
206                          continue
207                      }
208  
209                      debug!(target: "net::refinery::whitelist_refinery",
210                     "Peer {url} is responsive. Updating last_seen");
211  
212                      // This node is active. Update the last seen field.
213                      let last_seen = UNIX_EPOCH.elapsed().unwrap().as_secs();
214  
215                      hosts.whitelist_host(url, last_seen).await?;
216                  }
217                  None => {
218                      debug!(target: "net::refinery::whitelist_refinery",
219                                "Whitelist is empty! Cannot start refinery process");
220  
221                      continue
222                  }
223              }
224          }
225      }
226      // RPCAPI:
227      // Returns all spawned networks names with their node addresses.
228      // --> {"jsonrpc": "2.0", "method": "spawns", "params": [], "id": 42}
229      // <-- {"jsonrpc": "2.0", "result": {"spawns": spawns_info}, "id": 42}
230      async fn spawns(&self, id: u16, _params: JsonValue) -> JsonResult {
231          let mut spawns = vec![];
232          for spawn in &self.networks {
233              spawns.push(spawn.info().await);
234          }
235  
236          let json =
237              JsonValue::Object(HashMap::from([("spawns".to_string(), JsonValue::Array(spawns))]));
238  
239          JsonResponse::new(json, id).into()
240      }
241  }
242  
243  #[async_trait]
244  impl RequestHandler<()> for Lilith {
245      async fn handle_request(&self, req: JsonRequest) -> JsonResult {
246          return match req.method.as_str() {
247              "ping" => self.pong(req.id, req.params).await,
248              "spawns" => self.spawns(req.id, req.params).await,
249              _ => JsonError::new(ErrorCode::MethodNotFound, None, req.id).into(),
250          }
251      }
252  
253      async fn connections_mut(&self) -> MutexGuard<'life0, HashSet<StoppableTaskPtr>> {
254          self.rpc_connections.lock().await
255      }
256  }
257  
258  /// Parse a TOML string for any configured network and return a map containing
259  /// said configurations.
260  fn parse_configured_networks(data: &str) -> Result<HashMap<String, NetInfo>> {
261      let mut ret = HashMap::new();
262  
263      if let Value::Table(map) = toml::from_str(data)? {
264          if map.contains_key("network") && map["network"].is_table() {
265              for net in map["network"].as_table().unwrap() {
266                  info!(target: "lilith", "Found configuration for network: {}", net.0);
267                  let table = net.1.as_table().unwrap();
268                  if !table.contains_key("accept_addrs") {
269                      warn!(target: "lilith", "Network accept addrs are mandatory, skipping network.");
270                      continue
271                  }
272  
273                  if !table.contains_key("hostlist") {
274                      error!(target: "lilith", "Hostlist path is mandatory! Configure and try again.");
275                      exit(1)
276                  }
277  
278                  let name = net.0.to_string();
279                  let accept_addrs: Vec<Url> = table["accept_addrs"]
280                      .as_array()
281                      .unwrap()
282                      .iter()
283                      .map(|x| Url::parse(x.as_str().unwrap()).unwrap())
284                      .collect();
285  
286                  let mut seeds = vec![];
287                  if table.contains_key("seeds") {
288                      if let Some(s) = table["seeds"].as_array() {
289                          for seed in s {
290                              if let Some(u) = seed.as_str() {
291                                  if let Ok(url) = Url::parse(u) {
292                                      seeds.push(url);
293                                  }
294                              }
295                          }
296                      }
297                  }
298  
299                  let mut peers = vec![];
300                  if table.contains_key("peers") {
301                      if let Some(p) = table["peers"].as_array() {
302                          for peer in p {
303                              if let Some(u) = peer.as_str() {
304                                  if let Ok(url) = Url::parse(u) {
305                                      peers.push(url);
306                                  }
307                              }
308                          }
309                      }
310                  }
311  
312                  let localnet = if table.contains_key("localnet") {
313                      table["localnet"].as_bool().unwrap()
314                  } else {
315                      false
316                  };
317  
318                  let version = if table.contains_key("version") {
319                      semver::Version::parse(table["version"].as_str().unwrap())?
320                  } else {
321                      semver::Version::parse(option_env!("CARGO_PKG_VERSION").unwrap_or("0.0.0"))?
322                  };
323  
324                  let datastore: String = table["datastore"].as_str().unwrap().to_string();
325  
326                  let hostlist: String = table["hostlist"].as_str().unwrap().to_string();
327  
328                  let net_info =
329                      NetInfo { accept_addrs, seeds, peers, version, localnet, datastore, hostlist };
330                  ret.insert(name, net_info);
331              }
332          }
333      }
334  
335      Ok(ret)
336  }
337  
338  async fn spawn_net(name: String, info: &NetInfo, ex: Arc<Executor<'static>>) -> Result<Spawn> {
339      let mut listen_urls = vec![];
340  
341      // Configure listen addrs for this network
342      for url in &info.accept_addrs {
343          listen_urls.push(url.clone());
344      }
345  
346      // P2P network settings
347      let settings = net::Settings {
348          inbound_addrs: listen_urls.clone(),
349          seeds: info.seeds.clone(),
350          peers: info.peers.clone(),
351          outbound_connections: 0,
352          outbound_connect_timeout: 30,
353          inbound_connections: 512,
354          app_version: info.version.clone(),
355          localnet: info.localnet,
356          p2p_datastore: Some(info.datastore.clone()),
357          hostlist: Some(info.hostlist.clone()),
358          allowed_transports: vec![
359              "tcp".to_string(),
360              "tcp+tls".to_string(),
361              "tor".to_string(),
362              "tor+tls".to_string(),
363              "nym".to_string(),
364              "nym+tls".to_string(),
365              "i2p".to_string(),
366              "i2p+tls".to_string(),
367          ],
368          ban_policy: BanPolicy::Relaxed,
369          ..Default::default()
370      };
371  
372      // Create P2P instance
373      let p2p = P2p::new(settings, ex.clone()).await?;
374  
375      let addrs_str: Vec<&str> = listen_urls.iter().map(|x| x.as_str()).collect();
376      info!(target: "lilith", "Starting seed network node for \"{name}\" on {addrs_str:?}");
377      p2p.clone().start().await?;
378  
379      let spawn = Spawn { name, p2p };
380      Ok(spawn)
381  }
382  
383  async_daemonize!(realmain);
384  async fn realmain(args: Args, ex: Arc<Executor<'static>>) -> Result<()> {
385      // Pick up network settings from the TOML config
386      let cfg_path = get_config_path(args.config, CONFIG_FILE)?;
387      let toml_contents = std::fs::read_to_string(cfg_path)?;
388      let configured_nets = parse_configured_networks(&toml_contents)?;
389  
390      if configured_nets.is_empty() {
391          error!(target: "lilith", "No networks are enabled in config");
392          exit(1);
393      }
394  
395      // Spawn configured networks
396      let mut networks = vec![];
397      for (name, info) in &configured_nets {
398          match spawn_net(name.to_string(), info, ex.clone()).await {
399              Ok(spawn) => networks.push(spawn),
400              Err(e) => {
401                  error!(target: "lilith", "Failed to start P2P network seed for \"{name}\": {e}");
402                  exit(1);
403              }
404          }
405      }
406  
407      // Set up main daemon and background refinery_tasks
408      let lilith = Arc::new(Lilith { networks, rpc_connections: Mutex::new(HashSet::new()) });
409      let mut refinery_tasks = HashMap::new();
410      for network in &lilith.networks {
411          let name = network.name.clone();
412          let task = StoppableTask::new();
413          task.clone().start(
414              Lilith::whitelist_refinery(name.clone(), network.p2p.clone(), args.whitelist_refinery_interval),
415              |res| async move {
416                  match res {
417                      Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ }
418                      Err(e) => error!(target: "lilith", "Failed starting refinery task for \"{name}\": {e}"),
419                  }
420              },
421              Error::DetachedTaskStopped,
422              ex.clone(),
423          );
424          refinery_tasks.insert(network.name.clone(), task);
425      }
426  
427      // JSON-RPC server
428      let rpc_settings: RpcSettings = args.rpc.into();
429      info!(target: "lilith", "Starting JSON-RPC server on {}", rpc_settings.listen);
430      let lilith_ = lilith.clone();
431      let rpc_task = StoppableTask::new();
432      rpc_task.clone().start(
433          listen_and_serve(rpc_settings, lilith.clone(), None, ex.clone()),
434          |res| async move {
435              match res {
436                  Ok(()) | Err(Error::RpcServerStopped) => lilith_.stop_connections().await,
437                  Err(e) => error!(target: "lilith", "Failed starting JSON-RPC server: {e}"),
438              }
439          },
440          Error::RpcServerStopped,
441          ex.clone(),
442      );
443  
444      // Signal handling for graceful termination.
445      let (signals_handler, signals_task) = SignalHandler::new(ex)?;
446      signals_handler.wait_termination(signals_task).await?;
447      info!(target: "lilith", "Caught termination signal, cleaning up and exiting...");
448  
449      info!(target: "lilith", "Stopping JSON-RPC server...");
450      rpc_task.stop().await;
451  
452      // Cleanly stop p2p networks
453      for spawn in &lilith.networks {
454          info!(target: "lilith", "Stopping \"{}\" task", spawn.name);
455          refinery_tasks.get(&spawn.name).unwrap().stop().await;
456          info!(target: "lilith", "Stopping \"{}\" P2P", spawn.name);
457          spawn.p2p.stop().await;
458      }
459  
460      info!(target: "lilith", "Bye!");
461      Ok(())
462  }