/ bin / fud / fud / src / main.rs
main.rs
  1  /* This file is part of DarkFi (https://dark.fi)
  2   *
  3   * Copyright (C) 2020-2025 Dyne.org foundation
  4   *
  5   * This program is free software: you can redistribute it and/or modify
  6   * it under the terms of the GNU Affero General Public License as
  7   * published by the Free Software Foundation, either version 3 of the
  8   * License, or (at your option) any later version.
  9   *
 10   * This program is distributed in the hope that it will be useful,
 11   * but WITHOUT ANY WARRANTY; without even the implied warranty of
 12   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 13   * GNU Affero General Public License for more details.
 14   *
 15   * You should have received a copy of the GNU Affero General Public License
 16   * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 17   */
 18  
 19  use sled_overlay::sled;
 20  use smol::{stream::StreamExt, Executor};
 21  use std::sync::Arc;
 22  use structopt_toml::StructOptToml;
 23  use tracing::{debug, error, info, warn};
 24  
 25  use darkfi::{
 26      async_daemonize,
 27      net::{session::SESSION_DEFAULT, P2p, Settings as NetSettings},
 28      rpc::{
 29          jsonrpc::JsonSubscriber,
 30          server::{listen_and_serve, RequestHandler},
 31          settings::RpcSettings,
 32      },
 33      system::{Publisher, StoppableTask},
 34      util::path::expand_path,
 35      Error, Result,
 36  };
 37  use fud::{
 38      proto::ProtocolFud,
 39      rpc::JsonRpcInterface,
 40      settings::{Args, CONFIG_FILE, CONFIG_FILE_CONTENTS},
 41      Fud,
 42  };
 43  
 44  async_daemonize!(realmain);
 45  async fn realmain(args: Args, ex: Arc<Executor<'static>>) -> Result<()> {
 46      // The working directory for this daemon and geode.
 47      let basedir = expand_path(&args.base_dir)?;
 48  
 49      // Cloned args
 50      let args_ = args.clone();
 51  
 52      // Sled database init
 53      info!(target: "fud", "Instantiating database");
 54      let sled_db = sled::open(basedir.join("db"))?;
 55  
 56      info!(target: "fud", "Instantiating P2P network");
 57      let net_settings: NetSettings = args.net.into();
 58      let p2p = P2p::new(net_settings.clone(), ex.clone()).await?;
 59  
 60      info!(target: "fud", "Starting dnet subs task");
 61      let dnet_sub = JsonSubscriber::new("dnet.subscribe_events");
 62      let dnet_sub_ = dnet_sub.clone();
 63      let p2p_ = p2p.clone();
 64      let dnet_task = StoppableTask::new();
 65      dnet_task.clone().start(
 66          async move {
 67              let dnet_sub = p2p_.dnet_subscribe().await;
 68              loop {
 69                  let event = dnet_sub.receive().await;
 70                  debug!("Got dnet event: {event:?}");
 71                  dnet_sub_.notify(vec![event.into()].into()).await;
 72              }
 73          },
 74          |res| async {
 75              match res {
 76                  Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ }
 77                  Err(e) => panic!("{e}"),
 78              }
 79          },
 80          Error::DetachedTaskStopped,
 81          ex.clone(),
 82      );
 83  
 84      // Daemon instantiation
 85      let event_pub = Publisher::new();
 86  
 87      let fud: Arc<Fud> =
 88          Fud::new(args_, p2p.clone(), &sled_db, event_pub.clone(), ex.clone()).await?;
 89  
 90      fud.start_tasks().await;
 91  
 92      info!(target: "fud", "Starting event subs task");
 93      let event_sub = JsonSubscriber::new("event");
 94      let event_sub_ = event_sub.clone();
 95      let event_task = StoppableTask::new();
 96      event_task.clone().start(
 97          async move {
 98              let event_sub = event_pub.clone().subscribe().await;
 99              loop {
100                  let event = event_sub.receive().await;
101                  debug!(target: "fud", "Got event: {event:?}");
102                  event_sub_.notify(event.into()).await;
103              }
104          },
105          |res| async {
106              match res {
107                  Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ }
108                  Err(e) => panic!("{e}"),
109              }
110          },
111          Error::DetachedTaskStopped,
112          ex.clone(),
113      );
114  
115      let rpc_settings: RpcSettings = args.rpc.into();
116      info!(target: "fud", "Starting JSON-RPC server on {}", rpc_settings.listen);
117      let rpc_interface = Arc::new(JsonRpcInterface::new(fud.clone(), dnet_sub, event_sub));
118      let rpc_task = StoppableTask::new();
119      let rpc_interface_ = rpc_interface.clone();
120      rpc_task.clone().start(
121          listen_and_serve(rpc_settings, rpc_interface, None, ex.clone()),
122          |res| async move {
123              match res {
124                  Ok(()) | Err(Error::RpcServerStopped) => rpc_interface_.stop_connections().await,
125                  Err(e) => error!(target: "fud", "Failed starting sync JSON-RPC server: {e}"),
126              }
127          },
128          Error::RpcServerStopped,
129          ex.clone(),
130      );
131  
132      info!(target: "fud", "Starting P2P protocols");
133      let registry = p2p.protocol_registry();
134      let fud_ = fud.clone();
135      registry
136          .register(SESSION_DEFAULT, move |channel, p2p| {
137              let fud_ = fud_.clone();
138              async move { ProtocolFud::init(fud_, channel, p2p).await.unwrap() }
139          })
140          .await;
141      p2p.clone().start().await?;
142  
143      let p2p_settings_lock = p2p.settings();
144      let p2p_settings = p2p_settings_lock.read().await;
145      if p2p_settings.external_addrs.is_empty() {
146          warn!(target: "fud::realmain", "No external addresses, you won't be able to seed")
147      }
148      drop(p2p_settings);
149  
150      // Signal handling for graceful termination.
151      let (signals_handler, signals_task) = SignalHandler::new(ex)?;
152      signals_handler.wait_termination(signals_task).await?;
153      info!(target: "fud", "Caught termination signal, cleaning up and exiting...");
154  
155      fud.stop().await;
156  
157      info!(target: "fud", "Stopping JSON-RPC server...");
158      rpc_task.stop().await;
159  
160      info!(target: "fud", "Stopping P2P network...");
161      p2p.stop().await;
162  
163      info!(target: "fud", "Flushing sled database...");
164      let flushed_bytes = sled_db.flush_async().await?;
165      info!(target: "fud", "Flushed {flushed_bytes} bytes");
166  
167      info!(target: "fud", "Shut down successfully");
168      Ok(())
169  }