/ example / dchat / dchatd / src / main.rs
main.rs
  1  /* This file is part of DarkFi (https://dark.fi)
  2   *
  3   * Copyright (C) 2020-2025 Dyne.org foundation
  4   *
  5   * This program is free software: you can redistribute it and/or modify
  6   * it under the terms of the GNU Affero General Public License as
  7   * published by the Free Software Foundation, either version 3 of the
  8   * License, or (at your option) any later version.
  9   *
 10   * This program is distributed in the hope that it will be useful,
 11   * but WITHOUT ANY WARRANTY; without even the implied warranty of
 12   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 13   * GNU Affero General Public License for more details.
 14   *
 15   * You should have received a copy of the GNU Affero General Public License
 16   * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 17   */
 18  
 19  // ANCHOR: imports
 20  use smol::{lock::Mutex, stream::StreamExt};
 21  use std::{collections::HashSet, sync::Arc};
 22  use tracing::{debug, error, info};
 23  
 24  use darkfi::{
 25      async_daemonize, cli_desc, net,
 26      net::settings::SettingsOpt,
 27      rpc::{
 28          jsonrpc::JsonSubscriber,
 29          server::{listen_and_serve, RequestHandler},
 30          settings::{RpcSettings, RpcSettingsOpt},
 31      },
 32      system::{StoppableTask, StoppableTaskPtr},
 33      Error, Result,
 34  };
 35  
 36  use structopt_toml::{serde::Deserialize, structopt::StructOpt, StructOptToml};
 37  
 38  use crate::{
 39      dchatmsg::{DchatMsg, DchatMsgsBuffer},
 40      protocol_dchat::ProtocolDchat,
 41  };
 42  // ANCHOR_END: imports
 43  
 44  pub mod dchat_error;
 45  pub mod dchatmsg;
 46  pub mod protocol_dchat;
 47  pub mod rpc;
 48  
 49  const CONFIG_FILE: &str = "dchatd_config.toml";
 50  const CONFIG_FILE_CONTENTS: &str = include_str!("../dchatd_config.toml");
 51  
 52  // ANCHOR: args
 53  #[derive(Clone, Debug, Deserialize, StructOpt, StructOptToml)]
 54  #[serde(default)]
 55  #[structopt(name = "dchat", about = cli_desc!())]
 56  struct Args {
 57      #[structopt(flatten)]
 58      /// JSON-RPC settings
 59      rpc: RpcSettingsOpt,
 60  
 61      #[structopt(short, long)]
 62      /// Configuration file to use
 63      config: Option<String>,
 64  
 65      #[structopt(short, long)]
 66      /// Set log file to ouput into
 67      log: Option<String>,
 68  
 69      #[structopt(short, parse(from_occurrences))]
 70      /// Increase verbosity (-vvv supported)
 71      verbose: u8,
 72  
 73      #[structopt(flatten)]
 74      /// P2P network settings
 75      net: SettingsOpt,
 76  }
 77  // ANCHOR_END: args
 78  
 79  // ANCHOR: dchat
 80  struct Dchat {
 81      p2p: net::P2pPtr,
 82      recv_msgs: DchatMsgsBuffer,
 83      pub rpc_connections: Mutex<HashSet<StoppableTaskPtr>>,
 84      pub dnet_sub: JsonSubscriber,
 85  }
 86  
 87  impl Dchat {
 88      fn new(
 89          p2p: net::P2pPtr,
 90          recv_msgs: DchatMsgsBuffer,
 91          rpc_connections: Mutex<HashSet<StoppableTaskPtr>>,
 92          dnet_sub: JsonSubscriber,
 93      ) -> Self {
 94          Self { p2p, recv_msgs, rpc_connections, dnet_sub }
 95      }
 96  }
 97  // ANCHOR_END: dchat
 98  
 99  // ANCHOR: main
100  async_daemonize!(realmain);
101  async fn realmain(args: Args, ex: Arc<smol::Executor<'static>>) -> Result<()> {
102      let p2p = net::P2p::new(args.net.into(), ex.clone()).await?;
103  
104      // ANCHOR: dnet
105      info!("Starting dnet subs task");
106      let dnet_sub = JsonSubscriber::new("dnet.subscribe_events");
107      let dnet_sub_ = dnet_sub.clone();
108      let p2p_ = p2p.clone();
109      let dnet_task = StoppableTask::new();
110      dnet_task.clone().start(
111          async move {
112              let dnet_sub = p2p_.dnet_subscribe().await;
113              loop {
114                  let event = dnet_sub.receive().await;
115                  debug!("Got dnet event: {event:?}");
116                  dnet_sub_.notify(vec![event.into()].into()).await;
117              }
118          },
119          |res| async {
120              match res {
121                  Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ }
122                  Err(e) => panic!("{e}"),
123              }
124          },
125          Error::DetachedTaskStopped,
126          ex.clone(),
127      );
128      // ANCHOR_end: dnet
129  
130      // ANCHOR: rpc
131      let rpc_settings: RpcSettings = args.rpc.into();
132      info!("Starting JSON-RPC server on port {}", rpc_settings.listen);
133      let msgs: DchatMsgsBuffer = Arc::new(Mutex::new(vec![DchatMsg { msg: String::new() }]));
134      let rpc_connections = Mutex::new(HashSet::new());
135      let dchat = Arc::new(Dchat::new(p2p.clone(), msgs.clone(), rpc_connections, dnet_sub));
136      let _ex = ex.clone();
137  
138      let rpc_task = StoppableTask::new();
139      rpc_task.clone().start(
140          listen_and_serve(rpc_settings, dchat.clone(), None, ex.clone()),
141          |res| async move {
142              match res {
143                  Ok(()) | Err(Error::RpcServerStopped) => dchat.stop_connections().await,
144                  Err(e) => error!("Failed stopping JSON-RPC server: {e}"),
145              }
146          },
147          Error::RpcServerStopped,
148          ex.clone(),
149      );
150      // ANCHOR_end: rpc
151  
152      // ANCHOR: register_protocol
153      info!("Registering Dchat protocol");
154      let registry = p2p.protocol_registry();
155      registry
156          .register(net::session::SESSION_DEFAULT, move |channel, _p2p| {
157              let msgs_ = msgs.clone();
158              async move { ProtocolDchat::init(channel, msgs_).await }
159          })
160          .await;
161      // ANCHOR_END: register_protocol
162  
163      // ANCHOR: p2p_start
164      info!("Starting P2P network");
165      p2p.clone().start().await?;
166      // ANCHOR_END: p2p_start
167  
168      // ANCHOR: shutdown
169      let (signals_handler, signals_task) = SignalHandler::new(ex)?;
170      signals_handler.wait_termination(signals_task).await?;
171      info!("Caught termination signal, cleaning up and exiting...");
172  
173      info!("Stopping JSON-RPC server");
174      rpc_task.stop().await;
175  
176      info!("Stopping dnet tasks");
177      dnet_task.stop().await;
178  
179      info!("Stopping P2P network");
180      p2p.stop().await;
181  
182      info!("Shut down successfully");
183      // ANCHOR_END: shutdown
184      Ok(())
185  }
186  // ANCHOR_END: main