main.rs
1 /* This file is part of DarkFi (https://dark.fi) 2 * 3 * Copyright (C) 2020-2025 Dyne.org foundation 4 * 5 * This program is free software: you can redistribute it and/or modify 6 * it under the terms of the GNU Affero General Public License as 7 * published by the Free Software Foundation, either version 3 of the 8 * License, or (at your option) any later version. 9 * 10 * This program is distributed in the hope that it will be useful, 11 * but WITHOUT ANY WARRANTY; without even the implied warranty of 12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 13 * GNU Affero General Public License for more details. 14 * 15 * You should have received a copy of the GNU Affero General Public License 16 * along with this program. If not, see <https://www.gnu.org/licenses/>. 17 */ 18 19 use std::{collections::HashSet, io::Write, path::PathBuf, sync::Arc}; 20 21 use darkfi::{ 22 async_daemonize, cli_desc, 23 event_graph::{proto::ProtocolEventGraph, EventGraph, EventGraphPtr}, 24 net::{session::SESSION_DEFAULT, settings::SettingsOpt, P2p, P2pPtr}, 25 rpc::{ 26 jsonrpc::JsonSubscriber, 27 server::{listen_and_serve, RequestHandler}, 28 settings::{RpcSettings, RpcSettingsOpt}, 29 }, 30 system::{sleep, StoppableTask, StoppableTaskPtr, Subscription}, 31 util::path::{expand_path, get_config_path}, 32 Error, Result, 33 }; 34 use darkfi_sdk::crypto::pasta_prelude::PrimeField; 35 36 use rand::rngs::OsRng; 37 use settings::list_configured_contacts; 38 use sled_overlay::sled; 39 use smol::{fs, lock::Mutex, stream::StreamExt, Executor}; 40 use structopt_toml::{serde::Deserialize, structopt::StructOpt, StructOptToml}; 41 use tracing::{debug, error, info}; 42 use url::Url; 43 44 const CONFIG_FILE: &str = "darkirc_config.toml"; 45 const CONFIG_FILE_CONTENTS: &str = include_str!("../darkirc_config.toml"); 46 47 /// IRC server and client handler implementation 48 mod irc; 49 use irc::server::IrcServer; 50 51 /// Cryptography utilities 52 mod crypto; 53 use crypto::{bcrypt::bcrypt_hash_password, rln::RlnIdentity}; 54 55 /// JSON-RPC methods 56 mod rpc; 57 58 /// Settings utilities 59 mod settings; 60 61 fn panic_hook(panic_info: &std::panic::PanicHookInfo) { 62 error!("panic occurred: {panic_info}"); 63 error!("{}", std::backtrace::Backtrace::force_capture()); 64 std::process::abort() 65 } 66 67 #[derive(Clone, Debug, Deserialize, StructOpt, StructOptToml)] 68 #[serde(default)] 69 #[structopt( 70 name = "darkirc", 71 about = cli_desc!(), 72 version = concat!(env!("CARGO_PKG_VERSION"), "-", env!("COMMITISH")) 73 )] 74 struct Args { 75 #[structopt(short, parse(from_occurrences))] 76 /// Increase verbosity (-vvv supported) 77 verbose: u8, 78 79 #[structopt(short, long)] 80 /// Configuration file to use 81 config: Option<String>, 82 83 #[structopt(long)] 84 /// Set log file output 85 log: Option<String>, 86 87 #[structopt(long, default_value = "tcp://127.0.0.1:6667")] 88 /// IRC server listen address 89 irc_listen: Url, 90 91 /// Optional TLS certificate file path if `irc_listen` uses TLS 92 irc_tls_cert: Option<String>, 93 94 /// Optional TLS certificate key file path if `irc_listen` uses TLS 95 irc_tls_secret: Option<String>, 96 97 #[structopt(short, long, default_value = "~/.local/share/darkfi/darkirc_db")] 98 /// Datastore (DB) path 99 datastore: String, 100 101 #[structopt(short, long, default_value = "~/.local/share/darkfi/replayed_darkirc_db")] 102 /// Replay logs (DB) path 103 replay_datastore: String, 104 105 #[structopt(long)] 106 /// Flag to store Sled DB instructions 107 replay_mode: bool, 108 109 #[structopt(long)] 110 /// Generate a new NaCl keypair and exit 111 gen_chacha_keypair: bool, 112 113 #[structopt(long)] 114 /// Generate a new encrypted channel NaCl secret and exit 115 gen_channel_secret: bool, 116 117 #[structopt(long = "get-chacha-pubkey")] 118 /// Recover NaCl public key from a secret key 119 chacha_secret: Option<String>, 120 121 #[structopt(long)] 122 /// Generate a new RLN identity 123 gen_rln_identity: bool, 124 125 #[structopt(long)] 126 /// Flag to skip syncing the DAG (no history) 127 skip_dag_sync: bool, 128 129 #[structopt(long)] 130 /// IRC Password (Encrypted with bcrypt-2b) 131 password: Option<String>, 132 133 #[structopt(long)] 134 /// Encrypt a given password for the IRC server connection 135 encrypt_password: bool, 136 137 #[structopt(long)] 138 /// List configured contacts. 139 list_contacts: bool, 140 141 #[structopt(flatten)] 142 /// P2P network settings 143 net: SettingsOpt, 144 145 #[structopt(flatten)] 146 /// JSON-RPC settings 147 rpc: RpcSettingsOpt, 148 } 149 150 pub struct DarkIrc { 151 /// P2P network pointer 152 p2p: P2pPtr, 153 /// Sled DB (also used in event_graph and for RLN) 154 sled: sled::Db, 155 /// Event Graph instance 156 event_graph: EventGraphPtr, 157 /// JSON-RPC connection tracker 158 rpc_connections: Mutex<HashSet<StoppableTaskPtr>>, 159 /// dnet JSON-RPC subscriber 160 dnet_sub: JsonSubscriber, 161 /// deg JSON-RPC subscriber 162 deg_sub: JsonSubscriber, 163 /// Replay logs (DB) path 164 replay_datastore: PathBuf, 165 } 166 167 impl DarkIrc { 168 fn new( 169 p2p: P2pPtr, 170 sled: sled::Db, 171 event_graph: EventGraphPtr, 172 dnet_sub: JsonSubscriber, 173 deg_sub: JsonSubscriber, 174 replay_datastore: PathBuf, 175 ) -> Self { 176 Self { 177 p2p, 178 sled, 179 event_graph, 180 rpc_connections: Mutex::new(HashSet::new()), 181 dnet_sub, 182 deg_sub, 183 replay_datastore, 184 } 185 } 186 } 187 188 async_daemonize!(realmain); 189 async fn realmain(args: Args, ex: Arc<Executor<'static>>) -> Result<()> { 190 // Abort the application on panic right away 191 std::panic::set_hook(Box::new(panic_hook)); 192 193 if args.gen_chacha_keypair { 194 let secret = crypto_box::SecretKey::generate(&mut OsRng); 195 let public = secret.public_key(); 196 let secret = bs58::encode(secret.to_bytes()).into_string(); 197 let public = bs58::encode(public.to_bytes()).into_string(); 198 199 println!( 200 "Place this in your config file under your contact, you can reuse this keypair for multiple contacts\n" 201 ); 202 println!("[contact.\"satoshi\"]"); 203 println!("dm_chacha_public = \"YOUR_CONTACT_PUBLIC_KEY\""); 204 println!("my_dm_chacha_secret = \"{secret}\""); 205 println!("#my_dm_chacha_public = \"{public}\""); 206 return Ok(()); 207 } 208 209 if args.gen_channel_secret { 210 let secret = crypto_box::SecretKey::generate(&mut OsRng); 211 let secret = bs58::encode(secret.to_bytes()).into_string(); 212 println!("Place this in your config file:\n"); 213 println!("[channel.\"#yourchannelname\"]"); 214 println!("secret = \"{secret}\""); 215 return Ok(()); 216 } 217 218 if args.gen_rln_identity { 219 let identity = RlnIdentity::new(&mut OsRng); 220 let nullifier = bs58::encode(identity.nullifier.to_repr()).into_string(); 221 let trapdoor = bs58::encode(identity.trapdoor.to_repr()).into_string(); 222 println!("Place this in your config file:\n"); 223 println!("[rln]"); 224 println!("nullifier = \"{nullifier}\""); 225 println!("trapdoor = \"{trapdoor}\""); 226 return Ok(()); 227 } 228 229 if let Some(chacha_secret) = args.chacha_secret { 230 let bytes = match bs58::decode(chacha_secret).into_vec() { 231 Ok(v) => v, 232 Err(e) => { 233 println!("Error: {e}"); 234 return Err(Error::ParseFailed("Secret key parsing failed")); 235 } 236 }; 237 238 if bytes.len() != 32 { 239 return Err(Error::ParseFailed("Decoded base58 is not 32 bytes long")); 240 } 241 242 let secret: [u8; 32] = bytes.try_into().unwrap(); 243 let secret = crypto_box::SecretKey::from(secret); 244 println!("{}", bs58::encode(secret.public_key().to_bytes()).into_string()); 245 return Ok(()); 246 } 247 248 if args.list_contacts { 249 let config_path = match get_config_path(args.config, CONFIG_FILE) { 250 Ok(path) => path, 251 Err(e) => { 252 error!("Unable to get config path: {e}"); 253 return Err(e); 254 } 255 }; 256 let contents = match fs::read_to_string(&config_path).await { 257 Ok(c) => c, 258 Err(e) => { 259 error!("Unable read path `{config_path:?}`: {e}"); 260 return Err(e.into()); 261 } 262 }; 263 let contents = match toml::from_str(&contents) { 264 Ok(v) => v, 265 Err(e) => { 266 error!("Failed parsing TOML config: {e}"); 267 return Err(Error::ParseFailed("Failed parsing TOML config")); 268 } 269 }; 270 271 // Parse configured contacts 272 let contacts = match list_configured_contacts(&contents) { 273 Ok(c) => c, 274 Err(e) => { 275 error!("List contacts failed `{config_path:?}`: {e}"); 276 return Err(e); 277 } 278 }; 279 280 for (name, (public_key, my_secret_key)) in contacts { 281 let public_key = bs58::encode(public_key.to_bytes()).into_string(); 282 let my_public_key = my_secret_key.public_key(); 283 let my_secret_key = bs58::encode(my_secret_key.to_bytes()).into_string(); 284 let my_public_key = bs58::encode(my_public_key.to_bytes()).into_string(); 285 println!("{name}: {public_key} using key {my_secret_key}({my_public_key})") 286 } 287 return Ok(()); 288 } 289 290 if args.encrypt_password { 291 let mut pw = String::new(); 292 293 print!("Enter password: "); 294 std::io::stdout().flush()?; 295 std::io::stdin().read_line(&mut pw)?; 296 297 if let Some('\n') = pw.chars().next_back() { 298 pw.pop(); 299 } 300 if let Some('\r') = pw.chars().next_back() { 301 pw.pop(); 302 } 303 304 println!("{}", bcrypt_hash_password(pw)); 305 std::io::stdout().flush()?; 306 307 return Ok(()); 308 } 309 310 info!("Initializing DarkIRC node"); 311 312 // Create datastore path if not there already. 313 let datastore = match expand_path(&args.datastore) { 314 Ok(v) => v, 315 Err(e) => { 316 error!("Bad datastore path `{}`: {e}", args.datastore); 317 return Err(e); 318 } 319 }; 320 if let Err(e) = fs::create_dir_all(&datastore).await { 321 error!("Failed to create data store path `{datastore:?}`: {e}"); 322 return Err(e.into()); 323 } 324 325 let replay_datastore = match expand_path(&args.replay_datastore) { 326 Ok(v) => v, 327 Err(e) => { 328 error!("Bad replay datastore path `{}`: {e}", args.replay_datastore); 329 return Err(e); 330 } 331 }; 332 let replay_mode = args.replay_mode; 333 334 info!("Instantiating event DAG"); 335 let sled_db = match sled::open(datastore.clone()) { 336 Ok(v) => v, 337 Err(e) => { 338 error!("Failed to open datastore database `{datastore:?}`: {e}"); 339 return Err(e.into()); 340 } 341 }; 342 let mut p2p_settings: darkfi::net::Settings = args.net.into(); 343 p2p_settings.app_version = semver::Version::parse(env!("CARGO_PKG_VERSION")).unwrap(); 344 let p2p = match P2p::new(p2p_settings, ex.clone()).await { 345 Ok(p2p) => p2p, 346 Err(e) => { 347 error!("Unable to create P2P network: {e}"); 348 return Err(e); 349 } 350 }; 351 let event_graph = match EventGraph::new( 352 p2p.clone(), 353 sled_db.clone(), 354 replay_datastore.clone(), 355 replay_mode, 356 "darkirc_dag", 357 1, 358 ex.clone(), 359 ) 360 .await 361 { 362 Ok(v) => v, 363 Err(e) => { 364 error!("Event graph failed to start: {e}"); 365 return Err(e); 366 } 367 }; 368 369 let prune_task = event_graph.prune_task.get().unwrap(); 370 371 info!("Registering EventGraph P2P protocol"); 372 let event_graph_ = Arc::clone(&event_graph); 373 let registry = p2p.protocol_registry(); 374 registry 375 .register(SESSION_DEFAULT, move |channel, _| { 376 let event_graph_ = event_graph_.clone(); 377 async move { ProtocolEventGraph::init(event_graph_, channel).await.unwrap() } 378 }) 379 .await; 380 381 info!("Starting dnet subs task"); 382 let dnet_sub = JsonSubscriber::new("dnet.subscribe_events"); 383 let dnet_sub_ = dnet_sub.clone(); 384 let p2p_ = p2p.clone(); 385 let dnet_task = StoppableTask::new(); 386 dnet_task.clone().start( 387 async move { 388 let dnet_sub = p2p_.dnet_subscribe().await; 389 loop { 390 let event = dnet_sub.receive().await; 391 debug!("Got dnet event: {event:?}"); 392 dnet_sub_.notify(vec![event.into()].into()).await; 393 } 394 }, 395 |res| async { 396 match res { 397 Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ } 398 Err(e) => panic!("{e}"), 399 } 400 }, 401 Error::DetachedTaskStopped, 402 ex.clone(), 403 ); 404 405 info!("Starting deg subs task"); 406 let deg_sub = JsonSubscriber::new("deg.subscribe_events"); 407 let deg_sub_ = deg_sub.clone(); 408 let event_graph_ = event_graph.clone(); 409 let deg_task = StoppableTask::new(); 410 deg_task.clone().start( 411 async move { 412 let deg_sub = event_graph_.deg_subscribe().await; 413 loop { 414 let event = deg_sub.receive().await; 415 debug!("Got deg event: {event:?}"); 416 deg_sub_.notify(vec![event.into()].into()).await; 417 } 418 }, 419 |res| async { 420 match res { 421 Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ } 422 Err(e) => panic!("{e}"), 423 } 424 }, 425 Error::DetachedTaskStopped, 426 ex.clone(), 427 ); 428 429 info!("Starting JSON-RPC server"); 430 let rpc_settings: RpcSettings = args.rpc.into(); 431 let darkirc = Arc::new(DarkIrc::new( 432 p2p.clone(), 433 sled_db.clone(), 434 event_graph.clone(), 435 dnet_sub, 436 deg_sub, 437 replay_datastore.clone(), 438 )); 439 let darkirc_ = Arc::clone(&darkirc); 440 let rpc_task = StoppableTask::new(); 441 rpc_task.clone().start( 442 listen_and_serve(rpc_settings, darkirc.clone(), None, ex.clone()), 443 |res| async move { 444 match res { 445 Ok(()) | Err(Error::RpcServerStopped) => darkirc_.stop_connections().await, 446 Err(e) => error!("Failed stopping JSON-RPC server: {e}"), 447 } 448 }, 449 Error::RpcServerStopped, 450 ex.clone(), 451 ); 452 453 info!("Starting IRC server"); 454 let password = args.password.unwrap_or_default(); 455 let config_path = match get_config_path(args.config.clone(), CONFIG_FILE) { 456 Ok(v) => v, 457 Err(e) => { 458 error!("Cannot get config path `{:?}`: {e}", args.config); 459 return Err(e); 460 } 461 }; 462 let irc_server = match IrcServer::new( 463 darkirc.clone(), 464 args.irc_listen, 465 args.irc_tls_cert, 466 args.irc_tls_secret, 467 config_path, 468 password, 469 ) 470 .await 471 { 472 Ok(v) => v, 473 Err(e) => { 474 error!("Unable to create IRC server: {e}"); 475 return Err(e); 476 } 477 }; 478 479 let irc_task = StoppableTask::new(); 480 let ex_ = ex.clone(); 481 irc_task.clone().start( 482 irc_server.clone().listen(ex_), 483 |res| async move { 484 match res { 485 Ok(()) | Err(Error::DetachedTaskStopped) => { /* TODO: */ } 486 Err(e) => error!("Failed stopping IRC server: {e}"), 487 } 488 }, 489 Error::DetachedTaskStopped, 490 ex.clone(), 491 ); 492 493 info!("Starting P2P network"); 494 if let Err(e) = p2p.clone().start().await { 495 error!("P2P failed to start: {e}"); 496 return Err(e); 497 } 498 499 // Initial DAG sync 500 if let Err(e) = sync_task(&p2p, &event_graph, args.skip_dag_sync).await { 501 error!("DAG sync task failed to start: {e}"); 502 return Err(e); 503 }; 504 505 // Stoppable task to monitor network and resync on disconnect. 506 let sync_mon_task = StoppableTask::new(); 507 sync_mon_task.clone().start( 508 sync_and_monitor(p2p.clone(), event_graph.clone(), args.skip_dag_sync), 509 |res| async move { 510 match res { 511 Ok(()) | Err(Error::DetachedTaskStopped) => { /* TODO: */ } 512 Err(e) => error!("Failed sync task: {e}"), 513 } 514 }, 515 Error::DetachedTaskStopped, 516 ex.clone(), 517 ); 518 519 // Signal handling for graceful termination. 520 let (signals_handler, signals_task) = SignalHandler::new(ex)?; 521 signals_handler.wait_termination(signals_task).await?; 522 info!("Caught termination signal, cleaning up and exiting..."); 523 524 info!("Stopping P2P network"); 525 p2p.stop().await; 526 527 info!("Stopping JSON-RPC server"); 528 rpc_task.stop().await; 529 dnet_task.stop().await; 530 deg_task.stop().await; 531 532 info!("Stopping IRC server"); 533 irc_task.stop().await; 534 prune_task.stop().await; 535 536 info!("Flushing sled database..."); 537 let flushed_bytes = sled_db.flush_async().await?; 538 info!("Flushed {flushed_bytes} bytes"); 539 540 info!("Shut down successfully"); 541 Ok(()) 542 } 543 544 /// Async task to monitor network disconnections. 545 async fn monitor_network(subscription: &Subscription<Error>) -> Result<()> { 546 Err(subscription.receive().await) 547 } 548 549 /// Async task to endlessly try to sync DAG, returns Ok if done. 550 async fn sync_task(p2p: &P2pPtr, event_graph: &EventGraphPtr, skip_dag_sync: bool) -> Result<()> { 551 let comms_timeout = p2p.settings().read().await.outbound_connect_timeout; 552 553 loop { 554 if p2p.is_connected() { 555 info!("Got peer connection"); 556 // We'll attempt to sync for ever 557 if !skip_dag_sync { 558 info!("Syncing event DAG"); 559 match event_graph.dag_sync().await { 560 Ok(()) => break, 561 Err(e) => { 562 // TODO: Maybe at this point we should prune or something? 563 // TODO: Or maybe just tell the user to delete the DAG from FS. 564 error!("Failed syncing DAG ({e}), retrying in {comms_timeout}s..."); 565 sleep(comms_timeout).await; 566 } 567 } 568 } else { 569 *event_graph.synced.write().await = true; 570 break; 571 } 572 } else { 573 info!("Waiting for some P2P connections..."); 574 sleep(comms_timeout).await; 575 } 576 } 577 578 Ok(()) 579 } 580 581 /// Async task to monitor the network and force resync on disconnections 582 async fn sync_and_monitor( 583 p2p: P2pPtr, 584 event_graph: EventGraphPtr, 585 skip_dag_sync: bool, 586 ) -> Result<()> { 587 loop { 588 let net_subscription = p2p.hosts().subscribe_disconnect().await; 589 let result = monitor_network(&net_subscription).await; 590 net_subscription.unsubscribe().await; 591 592 match result { 593 Ok(_) => return Ok(()), 594 Err(Error::NetworkNotConnected) => { 595 // Sync node again 596 info!("Network disconnection detected, resyncing..."); 597 *event_graph.synced.write().await = false; 598 sync_task(&p2p, &event_graph, skip_dag_sync).await?; 599 } 600 Err(e) => return Err(e), 601 } 602 } 603 }