main.rs
1 /* This file is part of DarkFi (https://dark.fi) 2 * 3 * Copyright (C) 2020-2025 Dyne.org foundation 4 * 5 * This program is free software: you can redistribute it and/or modify 6 * it under the terms of the GNU Affero General Public License as 7 * published by the Free Software Foundation, either version 3 of the 8 * License, or (at your option) any later version. 9 * 10 * This program is distributed in the hope that it will be useful, 11 * but WITHOUT ANY WARRANTY; without even the implied warranty of 12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 13 * GNU Affero General Public License for more details. 14 * 15 * You should have received a copy of the GNU Affero General Public License 16 * along with this program. If not, see <https://www.gnu.org/licenses/>. 17 */ 18 19 use sled_overlay::sled; 20 use smol::{stream::StreamExt, Executor}; 21 use std::sync::Arc; 22 use structopt_toml::StructOptToml; 23 use tracing::{debug, error, info, warn}; 24 25 use darkfi::{ 26 async_daemonize, 27 net::{session::SESSION_DEFAULT, P2p, Settings as NetSettings}, 28 rpc::{ 29 jsonrpc::JsonSubscriber, 30 server::{listen_and_serve, RequestHandler}, 31 settings::RpcSettings, 32 }, 33 system::{Publisher, StoppableTask}, 34 util::path::expand_path, 35 Error, Result, 36 }; 37 use fud::{ 38 proto::ProtocolFud, 39 rpc::JsonRpcInterface, 40 settings::{Args, CONFIG_FILE, CONFIG_FILE_CONTENTS}, 41 Fud, 42 }; 43 44 async_daemonize!(realmain); 45 async fn realmain(args: Args, ex: Arc<Executor<'static>>) -> Result<()> { 46 // The working directory for this daemon and geode. 47 let basedir = expand_path(&args.base_dir)?; 48 49 // Cloned args 50 let args_ = args.clone(); 51 52 // Sled database init 53 info!(target: "fud", "Instantiating database"); 54 let sled_db = sled::open(basedir.join("db"))?; 55 56 info!(target: "fud", "Instantiating P2P network"); 57 let net_settings: NetSettings = args.net.into(); 58 let p2p = P2p::new(net_settings.clone(), ex.clone()).await?; 59 60 info!(target: "fud", "Starting dnet subs task"); 61 let dnet_sub = JsonSubscriber::new("dnet.subscribe_events"); 62 let dnet_sub_ = dnet_sub.clone(); 63 let p2p_ = p2p.clone(); 64 let dnet_task = StoppableTask::new(); 65 dnet_task.clone().start( 66 async move { 67 let dnet_sub = p2p_.dnet_subscribe().await; 68 loop { 69 let event = dnet_sub.receive().await; 70 debug!("Got dnet event: {event:?}"); 71 dnet_sub_.notify(vec![event.into()].into()).await; 72 } 73 }, 74 |res| async { 75 match res { 76 Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ } 77 Err(e) => panic!("{e}"), 78 } 79 }, 80 Error::DetachedTaskStopped, 81 ex.clone(), 82 ); 83 84 // Daemon instantiation 85 let event_pub = Publisher::new(); 86 87 let fud: Arc<Fud> = 88 Fud::new(args_, p2p.clone(), &sled_db, event_pub.clone(), ex.clone()).await?; 89 90 fud.start_tasks().await; 91 92 info!(target: "fud", "Starting event subs task"); 93 let event_sub = JsonSubscriber::new("event"); 94 let event_sub_ = event_sub.clone(); 95 let event_task = StoppableTask::new(); 96 event_task.clone().start( 97 async move { 98 let event_sub = event_pub.clone().subscribe().await; 99 loop { 100 let event = event_sub.receive().await; 101 debug!(target: "fud", "Got event: {event:?}"); 102 event_sub_.notify(event.into()).await; 103 } 104 }, 105 |res| async { 106 match res { 107 Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ } 108 Err(e) => panic!("{e}"), 109 } 110 }, 111 Error::DetachedTaskStopped, 112 ex.clone(), 113 ); 114 115 let rpc_settings: RpcSettings = args.rpc.into(); 116 info!(target: "fud", "Starting JSON-RPC server on {}", rpc_settings.listen); 117 let rpc_interface = Arc::new(JsonRpcInterface::new(fud.clone(), dnet_sub, event_sub)); 118 let rpc_task = StoppableTask::new(); 119 let rpc_interface_ = rpc_interface.clone(); 120 rpc_task.clone().start( 121 listen_and_serve(rpc_settings, rpc_interface, None, ex.clone()), 122 |res| async move { 123 match res { 124 Ok(()) | Err(Error::RpcServerStopped) => rpc_interface_.stop_connections().await, 125 Err(e) => error!(target: "fud", "Failed starting sync JSON-RPC server: {e}"), 126 } 127 }, 128 Error::RpcServerStopped, 129 ex.clone(), 130 ); 131 132 info!(target: "fud", "Starting P2P protocols"); 133 let registry = p2p.protocol_registry(); 134 let fud_ = fud.clone(); 135 registry 136 .register(SESSION_DEFAULT, move |channel, p2p| { 137 let fud_ = fud_.clone(); 138 async move { ProtocolFud::init(fud_, channel, p2p).await.unwrap() } 139 }) 140 .await; 141 p2p.clone().start().await?; 142 143 let p2p_settings_lock = p2p.settings(); 144 let p2p_settings = p2p_settings_lock.read().await; 145 if p2p_settings.external_addrs.is_empty() { 146 warn!(target: "fud::realmain", "No external addresses, you won't be able to seed") 147 } 148 drop(p2p_settings); 149 150 // Signal handling for graceful termination. 151 let (signals_handler, signals_task) = SignalHandler::new(ex)?; 152 signals_handler.wait_termination(signals_task).await?; 153 info!(target: "fud", "Caught termination signal, cleaning up and exiting..."); 154 155 fud.stop().await; 156 157 info!(target: "fud", "Stopping JSON-RPC server..."); 158 rpc_task.stop().await; 159 160 info!(target: "fud", "Stopping P2P network..."); 161 p2p.stop().await; 162 163 info!(target: "fud", "Flushing sled database..."); 164 let flushed_bytes = sled_db.flush_async().await?; 165 info!(target: "fud", "Flushed {flushed_bytes} bytes"); 166 167 info!(target: "fud", "Shut down successfully"); 168 Ok(()) 169 }