main.rs
1 /* This file is part of DarkFi (https://dark.fi) 2 * 3 * Copyright (C) 2020-2025 Dyne.org foundation 4 * 5 * This program is free software: you can redistribute it and/or modify 6 * it under the terms of the GNU Affero General Public License as 7 * published by the Free Software Foundation, either version 3 of the 8 * License, or (at your option) any later version. 9 * 10 * This program is distributed in the hope that it will be useful, 11 * but WITHOUT ANY WARRANTY; without even the implied warranty of 12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 13 * GNU Affero General Public License for more details. 14 * 15 * You should have received a copy of the GNU Affero General Public License 16 * along with this program. If not, see <https://www.gnu.org/licenses/>. 17 */ 18 19 // ANCHOR: imports 20 use smol::{lock::Mutex, stream::StreamExt}; 21 use std::{collections::HashSet, sync::Arc}; 22 use tracing::{debug, error, info}; 23 24 use darkfi::{ 25 async_daemonize, cli_desc, net, 26 net::settings::SettingsOpt, 27 rpc::{ 28 jsonrpc::JsonSubscriber, 29 server::{listen_and_serve, RequestHandler}, 30 settings::{RpcSettings, RpcSettingsOpt}, 31 }, 32 system::{StoppableTask, StoppableTaskPtr}, 33 Error, Result, 34 }; 35 36 use structopt_toml::{serde::Deserialize, structopt::StructOpt, StructOptToml}; 37 38 use crate::{ 39 dchatmsg::{DchatMsg, DchatMsgsBuffer}, 40 protocol_dchat::ProtocolDchat, 41 }; 42 // ANCHOR_END: imports 43 44 pub mod dchat_error; 45 pub mod dchatmsg; 46 pub mod protocol_dchat; 47 pub mod rpc; 48 49 const CONFIG_FILE: &str = "dchatd_config.toml"; 50 const CONFIG_FILE_CONTENTS: &str = include_str!("../dchatd_config.toml"); 51 52 // ANCHOR: args 53 #[derive(Clone, Debug, Deserialize, StructOpt, StructOptToml)] 54 #[serde(default)] 55 #[structopt(name = "dchat", about = cli_desc!())] 56 struct Args { 57 #[structopt(flatten)] 58 /// JSON-RPC settings 59 rpc: RpcSettingsOpt, 60 61 #[structopt(short, long)] 62 /// Configuration file to use 63 config: Option<String>, 64 65 #[structopt(short, long)] 66 /// Set log file to ouput into 67 log: Option<String>, 68 69 #[structopt(short, parse(from_occurrences))] 70 /// Increase verbosity (-vvv supported) 71 verbose: u8, 72 73 #[structopt(flatten)] 74 /// P2P network settings 75 net: SettingsOpt, 76 } 77 // ANCHOR_END: args 78 79 // ANCHOR: dchat 80 struct Dchat { 81 p2p: net::P2pPtr, 82 recv_msgs: DchatMsgsBuffer, 83 pub rpc_connections: Mutex<HashSet<StoppableTaskPtr>>, 84 pub dnet_sub: JsonSubscriber, 85 } 86 87 impl Dchat { 88 fn new( 89 p2p: net::P2pPtr, 90 recv_msgs: DchatMsgsBuffer, 91 rpc_connections: Mutex<HashSet<StoppableTaskPtr>>, 92 dnet_sub: JsonSubscriber, 93 ) -> Self { 94 Self { p2p, recv_msgs, rpc_connections, dnet_sub } 95 } 96 } 97 // ANCHOR_END: dchat 98 99 // ANCHOR: main 100 async_daemonize!(realmain); 101 async fn realmain(args: Args, ex: Arc<smol::Executor<'static>>) -> Result<()> { 102 let p2p = net::P2p::new(args.net.into(), ex.clone()).await?; 103 104 // ANCHOR: dnet 105 info!("Starting dnet subs task"); 106 let dnet_sub = JsonSubscriber::new("dnet.subscribe_events"); 107 let dnet_sub_ = dnet_sub.clone(); 108 let p2p_ = p2p.clone(); 109 let dnet_task = StoppableTask::new(); 110 dnet_task.clone().start( 111 async move { 112 let dnet_sub = p2p_.dnet_subscribe().await; 113 loop { 114 let event = dnet_sub.receive().await; 115 debug!("Got dnet event: {event:?}"); 116 dnet_sub_.notify(vec![event.into()].into()).await; 117 } 118 }, 119 |res| async { 120 match res { 121 Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ } 122 Err(e) => panic!("{e}"), 123 } 124 }, 125 Error::DetachedTaskStopped, 126 ex.clone(), 127 ); 128 // ANCHOR_end: dnet 129 130 // ANCHOR: rpc 131 let rpc_settings: RpcSettings = args.rpc.into(); 132 info!("Starting JSON-RPC server on port {}", rpc_settings.listen); 133 let msgs: DchatMsgsBuffer = Arc::new(Mutex::new(vec![DchatMsg { msg: String::new() }])); 134 let rpc_connections = Mutex::new(HashSet::new()); 135 let dchat = Arc::new(Dchat::new(p2p.clone(), msgs.clone(), rpc_connections, dnet_sub)); 136 let _ex = ex.clone(); 137 138 let rpc_task = StoppableTask::new(); 139 rpc_task.clone().start( 140 listen_and_serve(rpc_settings, dchat.clone(), None, ex.clone()), 141 |res| async move { 142 match res { 143 Ok(()) | Err(Error::RpcServerStopped) => dchat.stop_connections().await, 144 Err(e) => error!("Failed stopping JSON-RPC server: {e}"), 145 } 146 }, 147 Error::RpcServerStopped, 148 ex.clone(), 149 ); 150 // ANCHOR_end: rpc 151 152 // ANCHOR: register_protocol 153 info!("Registering Dchat protocol"); 154 let registry = p2p.protocol_registry(); 155 registry 156 .register(net::session::SESSION_DEFAULT, move |channel, _p2p| { 157 let msgs_ = msgs.clone(); 158 async move { ProtocolDchat::init(channel, msgs_).await } 159 }) 160 .await; 161 // ANCHOR_END: register_protocol 162 163 // ANCHOR: p2p_start 164 info!("Starting P2P network"); 165 p2p.clone().start().await?; 166 // ANCHOR_END: p2p_start 167 168 // ANCHOR: shutdown 169 let (signals_handler, signals_task) = SignalHandler::new(ex)?; 170 signals_handler.wait_termination(signals_task).await?; 171 info!("Caught termination signal, cleaning up and exiting..."); 172 173 info!("Stopping JSON-RPC server"); 174 rpc_task.stop().await; 175 176 info!("Stopping dnet tasks"); 177 dnet_task.stop().await; 178 179 info!("Stopping P2P network"); 180 p2p.stop().await; 181 182 info!("Shut down successfully"); 183 // ANCHOR_END: shutdown 184 Ok(()) 185 } 186 // ANCHOR_END: main