evgrd.rs
1 /* This file is part of DarkFi (https://dark.fi) 2 * 3 * Copyright (C) 2020-2025 Dyne.org foundation 4 * 5 * This program is free software: you can redistribute it and/or modify 6 * it under the terms of the GNU Affero General Public License as 7 * published by the Free Software Foundation, either version 3 of the 8 * License, or (at your option) any later version. 9 * 10 * This program is distributed in the hope that it will be useful, 11 * but WITHOUT ANY WARRANTY; without even the implied warranty of 12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 13 * GNU Affero General Public License for more details. 14 * 15 * You should have received a copy of the GNU Affero General Public License 16 * along with this program. If not, see <https://www.gnu.org/licenses/>. 17 */ 18 19 use darkfi::{ 20 async_daemonize, cli_desc, 21 event_graph::{ 22 proto::{EventPut, ProtocolEventGraph}, 23 Event, EventGraph, EventGraphPtr, 24 }, 25 net::{ 26 session::SESSION_DEFAULT, 27 settings::SettingsOpt as NetSettingsOpt, 28 transport::{Listener, PtListener, PtStream}, 29 P2p, P2pPtr, 30 }, 31 rpc::{ 32 jsonrpc::JsonSubscriber, 33 server::{listen_and_serve, RequestHandler}, 34 settings::RpcSettingsOpt, 35 }, 36 system::{sleep, StoppableTask, StoppableTaskPtr}, 37 util::path::expand_path, 38 Error, Result, 39 }; 40 use darkfi_serial::{AsyncDecodable, AsyncEncodable}; 41 use futures::{FutureExt, AsyncWriteExt}; 42 use tracing::{debug, error, info}; 43 use sled_overlay::sled; 44 use smol::{fs, lock::Mutex, stream::StreamExt, Executor}; 45 use std::{collections::HashSet, path::PathBuf, sync::Arc}; 46 use structopt_toml::{serde::Deserialize, structopt::StructOpt, StructOptToml}; 47 use url::Url; 48 49 use evgrd::{FetchEventsMessage, VersionMessage, MSG_EVENT, MSG_FETCHEVENTS, MSG_SENDEVENT}; 50 51 mod rpc; 52 53 const CONFIG_FILE: &str = "evgrd.toml"; 54 const CONFIG_FILE_CONTENTS: &str = include_str!("../evgrd.toml"); 55 56 #[derive(Clone, Debug, Deserialize, StructOpt, StructOptToml)] 57 #[serde(default)] 58 #[structopt(name = "evgrd", about = cli_desc!())] 59 struct Args { 60 #[structopt(short, parse(from_occurrences))] 61 /// Increase verbosity (-vvv supported) 62 verbose: u8, 63 64 #[structopt(short, long)] 65 /// Configuration file to use 66 config: Option<String>, 67 68 #[structopt(long)] 69 /// Set log file output 70 log: Option<String>, 71 72 #[structopt(long, default_value = "tcp://127.0.0.1:5588")] 73 /// RPC server listen address 74 daemon_listen: Vec<Url>, 75 76 #[structopt(short, long, default_value = "~/.local/share/darkfi/evgrd_db")] 77 /// Datastore (DB) path 78 datastore: String, 79 80 #[structopt(short, long, default_value = "~/.local/share/darkfi/replayed_evgrd_db")] 81 /// Replay logs (DB) path 82 replay_datastore: String, 83 84 #[structopt(long)] 85 /// Flag to store Sled DB instructions 86 replay_mode: bool, 87 88 #[structopt(long)] 89 /// Flag to skip syncing the DAG (no history) 90 skip_dag_sync: bool, 91 92 #[structopt(long, default_value = "5")] 93 /// Number of attempts to sync the DAG 94 sync_attempts: u8, 95 96 #[structopt(long, default_value = "15")] 97 /// Number of seconds to wait before trying again if sync fails 98 sync_timeout: u8, 99 100 #[structopt(flatten)] 101 /// P2P network settings 102 net: NetSettingsOpt, 103 104 #[structopt(flatten)] 105 /// JSON-RPC settings 106 rpc: RpcSettingsOpt, 107 } 108 109 pub struct Daemon { 110 /// P2P network pointer 111 p2p: P2pPtr, 112 ///// Sled DB (also used in event_graph and for RLN) 113 //sled: sled::Db, 114 /// Event Graph instance 115 event_graph: EventGraphPtr, 116 /// JSON-RPC connection tracker 117 rpc_connections: Mutex<HashSet<StoppableTaskPtr>>, 118 /// dnet JSON-RPC subscriber 119 dnet_sub: JsonSubscriber, 120 /// deg JSON-RPC subscriber 121 deg_sub: JsonSubscriber, 122 /// Replay logs (DB) path 123 replay_datastore: PathBuf, 124 } 125 126 impl Daemon { 127 fn new( 128 p2p: P2pPtr, 129 //sled: sled::Db, 130 event_graph: EventGraphPtr, 131 dnet_sub: JsonSubscriber, 132 deg_sub: JsonSubscriber, 133 replay_datastore: PathBuf, 134 ) -> Self { 135 Self { 136 p2p, 137 //sled, 138 event_graph, 139 rpc_connections: Mutex::new(HashSet::new()), 140 dnet_sub, 141 deg_sub, 142 replay_datastore, 143 } 144 } 145 } 146 147 async fn rpc_serve( 148 listener: Box<dyn PtListener>, 149 daemon: Arc<Daemon>, 150 ex: Arc<Executor<'_>>, 151 ) -> Result<()> { 152 loop { 153 match listener.next().await { 154 Ok((stream, url)) => { 155 info!(target: "evgrd", "Accepted connection from {url}"); 156 let daemon = daemon.clone(); 157 ex.spawn(async move { 158 if let Err(e) = handle_connect(stream, daemon).await { 159 error!(target: "evgrd", "Handle connect exited: {e}"); 160 } 161 }) 162 .detach(); 163 } 164 165 // Errors we didn't handle above: 166 Err(e) => { 167 error!( 168 target: "evgrd", 169 "Unhandled listener.next() error: {}", e, 170 ); 171 continue 172 } 173 } 174 } 175 } 176 177 async fn handle_connect(mut stream: Box<dyn PtStream>, daemon: Arc<Daemon>) -> Result<()> { 178 let client_version = VersionMessage::decode_async(&mut stream).await?; 179 info!(target: "evgrd", "Client version: {}", client_version.protocol_version); 180 181 let version = VersionMessage::new(); 182 version.encode_async(&mut stream).await?; 183 stream.flush().await?; 184 debug!(target: "darkirc", "Sent version: {version:?}"); 185 186 let event_sub = daemon.event_graph.event_pub.clone().subscribe().await; 187 188 loop { 189 futures::select! { 190 ev = event_sub.receive().fuse() => { 191 MSG_EVENT.encode_async(&mut stream).await?; 192 stream.flush().await?; 193 ev.encode_async(&mut stream).await?; 194 stream.flush().await?; 195 } 196 msg_type = u8::decode_async(&mut stream).fuse() => { 197 debug!(target: "evgrd", "Received msg_type: {msg_type:?}"); 198 let msg_type = msg_type?; 199 match msg_type { 200 MSG_FETCHEVENTS => fetch_events(&mut stream, &daemon).await?, 201 MSG_SENDEVENT => send_event(&mut stream, &daemon).await?, 202 _ => error!(target: "evgrd", "Skipping unhandled msg_type: {msg_type}") 203 } 204 } 205 } 206 } 207 } 208 209 async fn fetch_events(stream: &mut Box<dyn PtStream>, daemon: &Daemon) -> Result<()> { 210 let fetchevs = FetchEventsMessage::decode_async(stream).await?; 211 info!(target: "evgrd", "Fetch events: {fetchevs:?}"); 212 let events = daemon.event_graph.fetch_successors_of(fetchevs.unref_tips).await?; 213 214 let n_events = events.len(); 215 for event in events { 216 MSG_EVENT.encode_async(stream).await?; 217 stream.flush().await?; 218 event.encode_async(stream).await?; 219 stream.flush().await?; 220 } 221 debug!(target: "evgrd", "Sent {n_events} for fetch"); 222 Ok(()) 223 } 224 225 async fn send_event(stream: &mut Box<dyn PtStream>, daemon: &Daemon) -> Result<()> { 226 let timestamp = u64::decode_async(stream).await?; 227 let content = Vec::<u8>::decode_async(stream).await?; 228 info!(target: "evgrd", "send_event: {timestamp}, {content:?}"); 229 230 let event = Event::with_timestamp(timestamp, content, &daemon.event_graph).await; 231 daemon.event_graph.dag_insert(&[event.clone()]).await.unwrap(); 232 233 info!(target: "evgrd", "Broadcasting event put: {event:?}"); 234 //daemon.p2p.broadcast(&EventPut(event)).await; 235 236 let p2p = daemon.p2p.clone(); 237 let self_version = p2p.settings().read().await.app_version.clone(); 238 let connected_peers = p2p.hosts().peers(); 239 let mut peers_with_matched_version = vec![]; 240 let mut peers_with_different_version = vec![]; 241 for peer in connected_peers { 242 let peer_version = peer.version.get(); 243 if let Some(peer_version) = peer_version { 244 if self_version == peer_version.version { 245 peers_with_matched_version.push(peer) 246 } else { 247 peers_with_different_version.push(peer) 248 } 249 } 250 } 251 252 if !peers_with_matched_version.is_empty() { 253 p2p.broadcast_to(&EventPut(event.clone()), &peers_with_matched_version).await; 254 } 255 if !peers_with_different_version.is_empty() { 256 let mut event = event; 257 event.timestamp /= 1000; 258 p2p.broadcast_to(&EventPut(event), &peers_with_different_version).await; 259 } 260 261 Ok(()) 262 } 263 264 async_daemonize!(realmain); 265 async fn realmain(args: Args, ex: Arc<Executor<'static>>) -> Result<()> { 266 info!(target: "evgrd", "Starting evgrd node"); 267 268 // Create datastore path if not there already. 269 let datastore = expand_path(&args.datastore)?; 270 fs::create_dir_all(&datastore).await?; 271 272 let replay_datastore = expand_path(&args.replay_datastore)?; 273 let replay_mode = args.replay_mode; 274 275 info!(target: "evgrd", "Instantiating event DAG"); 276 let sled_db = sled::open(datastore)?; 277 let mut p2p_settings: darkfi::net::Settings = args.net.into(); 278 p2p_settings.app_version = semver::Version::parse(env!("CARGO_PKG_VERSION")).unwrap(); 279 p2p_settings.seeds.push(url::Url::parse("tcp+tls://lilith1.dark.fi:5262").unwrap()); 280 let p2p = P2p::new(p2p_settings, ex.clone()).await?; 281 let event_graph = EventGraph::new( 282 p2p.clone(), 283 sled_db.clone(), 284 replay_datastore.clone(), 285 replay_mode, 286 "evgrd_dag", 287 1, 288 ex.clone(), 289 ) 290 .await?; 291 292 // Adding some events 293 // for i in 1..6 { 294 // let event = Event::new(vec![1, 2, 3, i], &event_graph).await; 295 // event_graph.dag_insert(&[event.clone()]).await.unwrap(); 296 // } 297 298 let prune_task = event_graph.prune_task.get().unwrap(); 299 300 info!(target: "evgrd", "Registering EventGraph P2P protocol"); 301 let event_graph_ = Arc::clone(&event_graph); 302 let registry = p2p.protocol_registry(); 303 registry 304 .register(SESSION_DEFAULT, move |channel, _| { 305 let event_graph_ = event_graph_.clone(); 306 async move { ProtocolEventGraph::init(event_graph_, channel).await.unwrap() } 307 }) 308 .await; 309 310 info!(target: "evgrd", "Starting dnet subs task"); 311 let dnet_sub = JsonSubscriber::new("dnet.subscribe_events"); 312 let dnet_sub_ = dnet_sub.clone(); 313 let p2p_ = p2p.clone(); 314 let dnet_task = StoppableTask::new(); 315 dnet_task.clone().start( 316 async move { 317 let dnet_sub = p2p_.dnet_subscribe().await; 318 loop { 319 let event = dnet_sub.receive().await; 320 debug!(target: "evgrd", "Got dnet event: {:?}", event); 321 dnet_sub_.notify(vec![event.into()].into()).await; 322 } 323 }, 324 |res| async { 325 match res { 326 Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ } 327 Err(e) => panic!("{}", e), 328 } 329 }, 330 Error::DetachedTaskStopped, 331 ex.clone(), 332 ); 333 334 info!(target: "evgrd", "Starting deg subs task"); 335 let deg_sub = JsonSubscriber::new("deg.subscribe_events"); 336 let deg_sub_ = deg_sub.clone(); 337 let event_graph_ = event_graph.clone(); 338 let deg_task = StoppableTask::new(); 339 deg_task.clone().start( 340 async move { 341 let deg_sub = event_graph_.deg_subscribe().await; 342 loop { 343 let event = deg_sub.receive().await; 344 debug!(target: "evgrd", "Got deg event: {:?}", event); 345 deg_sub_.notify(vec![event.into()].into()).await; 346 } 347 }, 348 |res| async { 349 match res { 350 Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ } 351 Err(e) => panic!("{}", e), 352 } 353 }, 354 Error::DetachedTaskStopped, 355 ex.clone(), 356 ); 357 358 info!(target: "evgrd", "Starting JSON-RPC server"); 359 let daemon = Arc::new(Daemon::new( 360 p2p.clone(), 361 //sled_db.clone(), 362 event_graph.clone(), 363 dnet_sub, 364 deg_sub, 365 replay_datastore.clone(), 366 )); 367 368 // Used for deg and dnet 369 let daemon_ = daemon.clone(); 370 let rpc_task = StoppableTask::new(); 371 rpc_task.clone().start( 372 listen_and_serve(args.rpc.into(), daemon.clone(), None, ex.clone()), 373 |res| async move { 374 match res { 375 Ok(()) | Err(Error::RpcServerStopped) => daemon_.stop_connections().await, 376 Err(e) => error!(target: "evgrd", "Failed stopping JSON-RPC server: {}", e), 377 } 378 }, 379 Error::RpcServerStopped, 380 ex.clone(), 381 ); 382 383 info!(target: "evgrd", "Starting evgrd server"); 384 let mut rpc_tasks = vec![]; 385 for listen_url in args.daemon_listen { 386 let listener = Listener::new(listen_url, None).await?; 387 let ptlistener = listener.listen().await?; 388 389 let rpc_task = StoppableTask::new(); 390 rpc_task.clone().start( 391 rpc_serve(ptlistener, daemon.clone(), ex.clone()), 392 |res| async move { 393 match res { 394 Ok(()) => panic!("Acceptor task should never complete without error status"), 395 //Err(Error::RpcServerStopped) => daemon_.stop_connections().await, 396 Err(e) => error!(target: "evgrd", "Failed stopping RPC server: {}", e), 397 } 398 }, 399 Error::RpcServerStopped, 400 ex.clone(), 401 ); 402 rpc_tasks.push(rpc_task); 403 } 404 405 info!(target: "evgrd", "Starting P2P network"); 406 p2p.clone().start().await?; 407 408 info!(target: "evgrd", "Waiting for some P2P connections..."); 409 sleep(5).await; 410 411 // We'll attempt to sync {sync_attempts} times 412 if !args.skip_dag_sync { 413 for i in 1..=args.sync_attempts { 414 info!(target: "evgrd", "Syncing event DAG (attempt #{})", i); 415 match event_graph.dag_sync().await { 416 Ok(()) => break, 417 Err(e) => { 418 if i == args.sync_attempts { 419 error!(target: "evgrd", "Failed syncing DAG. Exiting."); 420 p2p.stop().await; 421 return Err(Error::DagSyncFailed) 422 } else { 423 // TODO: Maybe at this point we should prune or something? 424 // TODO: Or maybe just tell the user to delete the DAG from FS. 425 error!(target: "evgrd", "Failed syncing DAG ({}), retrying in {}s...", e, args.sync_timeout); 426 sleep(args.sync_timeout.into()).await; 427 } 428 } 429 } 430 } 431 } else { 432 *event_graph.synced.write().await = true; 433 } 434 435 // Signal handling for graceful termination. 436 let (signals_handler, signals_task) = SignalHandler::new(ex)?; 437 signals_handler.wait_termination(signals_task).await?; 438 info!(target: "evgrd", "Caught termination signal, cleaning up and exiting..."); 439 440 info!(target: "evgrd", "Stopping P2P network"); 441 p2p.stop().await; 442 443 info!(target: "evgrd", "Stopping RPC server"); 444 for rpc_task in rpc_tasks { 445 rpc_task.stop().await; 446 } 447 dnet_task.stop().await; 448 deg_task.stop().await; 449 450 info!(target: "evgrd", "Stopping IRC server"); 451 prune_task.stop().await; 452 453 info!(target: "evgrd", "Flushing sled database..."); 454 let flushed_bytes = sled_db.flush_async().await?; 455 info!(target: "evgrd", "Flushed {} bytes", flushed_bytes); 456 457 info!(target: "evgrd", "Shut down successfully"); 458 Ok(()) 459 }