/ script / evgrd / bin / evgrd.rs
evgrd.rs
  1  /* This file is part of DarkFi (https://dark.fi)
  2   *
  3   * Copyright (C) 2020-2025 Dyne.org foundation
  4   *
  5   * This program is free software: you can redistribute it and/or modify
  6   * it under the terms of the GNU Affero General Public License as
  7   * published by the Free Software Foundation, either version 3 of the
  8   * License, or (at your option) any later version.
  9   *
 10   * This program is distributed in the hope that it will be useful,
 11   * but WITHOUT ANY WARRANTY; without even the implied warranty of
 12   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 13   * GNU Affero General Public License for more details.
 14   *
 15   * You should have received a copy of the GNU Affero General Public License
 16   * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 17   */
 18  
 19  use darkfi::{
 20      async_daemonize, cli_desc,
 21      event_graph::{
 22          proto::{EventPut, ProtocolEventGraph},
 23          Event, EventGraph, EventGraphPtr,
 24      },
 25      net::{
 26          session::SESSION_DEFAULT,
 27          settings::SettingsOpt as NetSettingsOpt,
 28          transport::{Listener, PtListener, PtStream},
 29          P2p, P2pPtr,
 30      },
 31      rpc::{
 32          jsonrpc::JsonSubscriber,
 33          server::{listen_and_serve, RequestHandler},
 34          settings::RpcSettingsOpt,
 35      },
 36      system::{sleep, StoppableTask, StoppableTaskPtr},
 37      util::path::expand_path,
 38      Error, Result,
 39  };
 40  use darkfi_serial::{AsyncDecodable, AsyncEncodable};
 41  use futures::{FutureExt, AsyncWriteExt};
 42  use tracing::{debug, error, info};
 43  use sled_overlay::sled;
 44  use smol::{fs, lock::Mutex, stream::StreamExt, Executor};
 45  use std::{collections::HashSet, path::PathBuf, sync::Arc};
 46  use structopt_toml::{serde::Deserialize, structopt::StructOpt, StructOptToml};
 47  use url::Url;
 48  
 49  use evgrd::{FetchEventsMessage, VersionMessage, MSG_EVENT, MSG_FETCHEVENTS, MSG_SENDEVENT};
 50  
 51  mod rpc;
 52  
 53  const CONFIG_FILE: &str = "evgrd.toml";
 54  const CONFIG_FILE_CONTENTS: &str = include_str!("../evgrd.toml");
 55  
 56  #[derive(Clone, Debug, Deserialize, StructOpt, StructOptToml)]
 57  #[serde(default)]
 58  #[structopt(name = "evgrd", about = cli_desc!())]
 59  struct Args {
 60      #[structopt(short, parse(from_occurrences))]
 61      /// Increase verbosity (-vvv supported)
 62      verbose: u8,
 63  
 64      #[structopt(short, long)]
 65      /// Configuration file to use
 66      config: Option<String>,
 67  
 68      #[structopt(long)]
 69      /// Set log file output
 70      log: Option<String>,
 71  
 72      #[structopt(long, default_value = "tcp://127.0.0.1:5588")]
 73      /// RPC server listen address
 74      daemon_listen: Vec<Url>,
 75  
 76      #[structopt(short, long, default_value = "~/.local/share/darkfi/evgrd_db")]
 77      /// Datastore (DB) path
 78      datastore: String,
 79  
 80      #[structopt(short, long, default_value = "~/.local/share/darkfi/replayed_evgrd_db")]
 81      /// Replay logs (DB) path
 82      replay_datastore: String,
 83  
 84      #[structopt(long)]
 85      /// Flag to store Sled DB instructions
 86      replay_mode: bool,
 87  
 88      #[structopt(long)]
 89      /// Flag to skip syncing the DAG (no history)
 90      skip_dag_sync: bool,
 91  
 92      #[structopt(long, default_value = "5")]
 93      /// Number of attempts to sync the DAG
 94      sync_attempts: u8,
 95  
 96      #[structopt(long, default_value = "15")]
 97      /// Number of seconds to wait before trying again if sync fails
 98      sync_timeout: u8,
 99  
100      #[structopt(flatten)]
101      /// P2P network settings
102      net: NetSettingsOpt,
103  
104      #[structopt(flatten)]
105      /// JSON-RPC settings
106      rpc: RpcSettingsOpt,
107  }
108  
109  pub struct Daemon {
110      /// P2P network pointer
111      p2p: P2pPtr,
112      ///// Sled DB (also used in event_graph and for RLN)
113      //sled: sled::Db,
114      /// Event Graph instance
115      event_graph: EventGraphPtr,
116      /// JSON-RPC connection tracker
117      rpc_connections: Mutex<HashSet<StoppableTaskPtr>>,
118      /// dnet JSON-RPC subscriber
119      dnet_sub: JsonSubscriber,
120      /// deg JSON-RPC subscriber
121      deg_sub: JsonSubscriber,
122      /// Replay logs (DB) path
123      replay_datastore: PathBuf,
124  }
125  
126  impl Daemon {
127      fn new(
128          p2p: P2pPtr,
129          //sled: sled::Db,
130          event_graph: EventGraphPtr,
131          dnet_sub: JsonSubscriber,
132          deg_sub: JsonSubscriber,
133          replay_datastore: PathBuf,
134      ) -> Self {
135          Self {
136              p2p,
137              //sled,
138              event_graph,
139              rpc_connections: Mutex::new(HashSet::new()),
140              dnet_sub,
141              deg_sub,
142              replay_datastore,
143          }
144      }
145  }
146  
147  async fn rpc_serve(
148      listener: Box<dyn PtListener>,
149      daemon: Arc<Daemon>,
150      ex: Arc<Executor<'_>>,
151  ) -> Result<()> {
152      loop {
153          match listener.next().await {
154              Ok((stream, url)) => {
155                  info!(target: "evgrd", "Accepted connection from {url}");
156                  let daemon = daemon.clone();
157                  ex.spawn(async move {
158                      if let Err(e) = handle_connect(stream, daemon).await {
159                          error!(target: "evgrd", "Handle connect exited: {e}");
160                      }
161                  })
162                  .detach();
163              }
164  
165              // Errors we didn't handle above:
166              Err(e) => {
167                  error!(
168                      target: "evgrd",
169                      "Unhandled listener.next() error: {}", e,
170                  );
171                  continue
172              }
173          }
174      }
175  }
176  
177  async fn handle_connect(mut stream: Box<dyn PtStream>, daemon: Arc<Daemon>) -> Result<()> {
178      let client_version = VersionMessage::decode_async(&mut stream).await?;
179      info!(target: "evgrd", "Client version: {}", client_version.protocol_version);
180  
181      let version = VersionMessage::new();
182      version.encode_async(&mut stream).await?;
183      stream.flush().await?;
184      debug!(target: "darkirc", "Sent version: {version:?}");
185  
186      let event_sub = daemon.event_graph.event_pub.clone().subscribe().await;
187  
188      loop {
189          futures::select! {
190              ev = event_sub.receive().fuse() => {
191                  MSG_EVENT.encode_async(&mut stream).await?;
192                  stream.flush().await?;
193                  ev.encode_async(&mut stream).await?;
194                  stream.flush().await?;
195              }
196              msg_type = u8::decode_async(&mut stream).fuse() => {
197                  debug!(target: "evgrd", "Received msg_type: {msg_type:?}");
198                  let msg_type = msg_type?;
199                  match msg_type {
200                      MSG_FETCHEVENTS => fetch_events(&mut stream, &daemon).await?,
201                      MSG_SENDEVENT => send_event(&mut stream, &daemon).await?,
202                      _ => error!(target: "evgrd", "Skipping unhandled msg_type: {msg_type}")
203                  }
204              }
205          }
206      }
207  }
208  
209  async fn fetch_events(stream: &mut Box<dyn PtStream>, daemon: &Daemon) -> Result<()> {
210      let fetchevs = FetchEventsMessage::decode_async(stream).await?;
211      info!(target: "evgrd", "Fetch events: {fetchevs:?}");
212      let events = daemon.event_graph.fetch_successors_of(fetchevs.unref_tips).await?;
213  
214      let n_events = events.len();
215      for event in events {
216          MSG_EVENT.encode_async(stream).await?;
217          stream.flush().await?;
218          event.encode_async(stream).await?;
219          stream.flush().await?;
220      }
221      debug!(target: "evgrd", "Sent {n_events} for fetch");
222      Ok(())
223  }
224  
225  async fn send_event(stream: &mut Box<dyn PtStream>, daemon: &Daemon) -> Result<()> {
226      let timestamp = u64::decode_async(stream).await?;
227      let content = Vec::<u8>::decode_async(stream).await?;
228      info!(target: "evgrd", "send_event: {timestamp}, {content:?}");
229  
230      let event = Event::with_timestamp(timestamp, content, &daemon.event_graph).await;
231      daemon.event_graph.dag_insert(&[event.clone()]).await.unwrap();
232  
233      info!(target: "evgrd", "Broadcasting event put: {event:?}");
234      //daemon.p2p.broadcast(&EventPut(event)).await;
235  
236      let p2p = daemon.p2p.clone();
237      let self_version = p2p.settings().read().await.app_version.clone();
238      let connected_peers = p2p.hosts().peers();
239      let mut peers_with_matched_version = vec![];
240      let mut peers_with_different_version = vec![];
241      for peer in connected_peers {
242          let peer_version = peer.version.get();
243          if let Some(peer_version) = peer_version {
244              if self_version == peer_version.version {
245                  peers_with_matched_version.push(peer)
246              } else {
247                  peers_with_different_version.push(peer)
248              }
249          }
250      }
251  
252      if !peers_with_matched_version.is_empty() {
253          p2p.broadcast_to(&EventPut(event.clone()), &peers_with_matched_version).await;
254      }
255      if !peers_with_different_version.is_empty() {
256          let mut event = event;
257          event.timestamp /= 1000;
258          p2p.broadcast_to(&EventPut(event), &peers_with_different_version).await;
259      }
260  
261      Ok(())
262  }
263  
264  async_daemonize!(realmain);
265  async fn realmain(args: Args, ex: Arc<Executor<'static>>) -> Result<()> {
266      info!(target: "evgrd", "Starting evgrd node");
267  
268      // Create datastore path if not there already.
269      let datastore = expand_path(&args.datastore)?;
270      fs::create_dir_all(&datastore).await?;
271  
272      let replay_datastore = expand_path(&args.replay_datastore)?;
273      let replay_mode = args.replay_mode;
274  
275      info!(target: "evgrd", "Instantiating event DAG");
276      let sled_db = sled::open(datastore)?;
277      let mut p2p_settings: darkfi::net::Settings = args.net.into();
278      p2p_settings.app_version = semver::Version::parse(env!("CARGO_PKG_VERSION")).unwrap();
279      p2p_settings.seeds.push(url::Url::parse("tcp+tls://lilith1.dark.fi:5262").unwrap());
280      let p2p = P2p::new(p2p_settings, ex.clone()).await?;
281      let event_graph = EventGraph::new(
282          p2p.clone(),
283          sled_db.clone(),
284          replay_datastore.clone(),
285          replay_mode,
286          "evgrd_dag",
287          1,
288          ex.clone(),
289      )
290      .await?;
291  
292      // Adding some events
293      // for i in 1..6 {
294      //     let event = Event::new(vec![1, 2, 3, i], &event_graph).await;
295      //     event_graph.dag_insert(&[event.clone()]).await.unwrap();
296      // }
297  
298      let prune_task = event_graph.prune_task.get().unwrap();
299  
300      info!(target: "evgrd", "Registering EventGraph P2P protocol");
301      let event_graph_ = Arc::clone(&event_graph);
302      let registry = p2p.protocol_registry();
303      registry
304          .register(SESSION_DEFAULT, move |channel, _| {
305              let event_graph_ = event_graph_.clone();
306              async move { ProtocolEventGraph::init(event_graph_, channel).await.unwrap() }
307          })
308          .await;
309  
310      info!(target: "evgrd", "Starting dnet subs task");
311      let dnet_sub = JsonSubscriber::new("dnet.subscribe_events");
312      let dnet_sub_ = dnet_sub.clone();
313      let p2p_ = p2p.clone();
314      let dnet_task = StoppableTask::new();
315      dnet_task.clone().start(
316          async move {
317              let dnet_sub = p2p_.dnet_subscribe().await;
318              loop {
319                  let event = dnet_sub.receive().await;
320                  debug!(target: "evgrd", "Got dnet event: {:?}", event);
321                  dnet_sub_.notify(vec![event.into()].into()).await;
322              }
323          },
324          |res| async {
325              match res {
326                  Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ }
327                  Err(e) => panic!("{}", e),
328              }
329          },
330          Error::DetachedTaskStopped,
331          ex.clone(),
332      );
333  
334      info!(target: "evgrd", "Starting deg subs task");
335      let deg_sub = JsonSubscriber::new("deg.subscribe_events");
336      let deg_sub_ = deg_sub.clone();
337      let event_graph_ = event_graph.clone();
338      let deg_task = StoppableTask::new();
339      deg_task.clone().start(
340          async move {
341              let deg_sub = event_graph_.deg_subscribe().await;
342              loop {
343                  let event = deg_sub.receive().await;
344                  debug!(target: "evgrd", "Got deg event: {:?}", event);
345                  deg_sub_.notify(vec![event.into()].into()).await;
346              }
347          },
348          |res| async {
349              match res {
350                  Ok(()) | Err(Error::DetachedTaskStopped) => { /* Do nothing */ }
351                  Err(e) => panic!("{}", e),
352              }
353          },
354          Error::DetachedTaskStopped,
355          ex.clone(),
356      );
357  
358      info!(target: "evgrd", "Starting JSON-RPC server");
359      let daemon = Arc::new(Daemon::new(
360          p2p.clone(),
361          //sled_db.clone(),
362          event_graph.clone(),
363          dnet_sub,
364          deg_sub,
365          replay_datastore.clone(),
366      ));
367  
368      // Used for deg and dnet
369      let daemon_ = daemon.clone();
370      let rpc_task = StoppableTask::new();
371      rpc_task.clone().start(
372          listen_and_serve(args.rpc.into(), daemon.clone(), None, ex.clone()),
373          |res| async move {
374              match res {
375                  Ok(()) | Err(Error::RpcServerStopped) => daemon_.stop_connections().await,
376                  Err(e) => error!(target: "evgrd", "Failed stopping JSON-RPC server: {}", e),
377              }
378          },
379          Error::RpcServerStopped,
380          ex.clone(),
381      );
382  
383      info!(target: "evgrd", "Starting evgrd server");
384      let mut rpc_tasks = vec![];
385      for listen_url in args.daemon_listen {
386          let listener = Listener::new(listen_url, None).await?;
387          let ptlistener = listener.listen().await?;
388  
389          let rpc_task = StoppableTask::new();
390          rpc_task.clone().start(
391              rpc_serve(ptlistener, daemon.clone(), ex.clone()),
392              |res| async move {
393                  match res {
394                      Ok(()) => panic!("Acceptor task should never complete without error status"),
395                      //Err(Error::RpcServerStopped) => daemon_.stop_connections().await,
396                      Err(e) => error!(target: "evgrd", "Failed stopping RPC server: {}", e),
397                  }
398              },
399              Error::RpcServerStopped,
400              ex.clone(),
401          );
402          rpc_tasks.push(rpc_task);
403      }
404  
405      info!(target: "evgrd", "Starting P2P network");
406      p2p.clone().start().await?;
407  
408      info!(target: "evgrd", "Waiting for some P2P connections...");
409      sleep(5).await;
410  
411      // We'll attempt to sync {sync_attempts} times
412      if !args.skip_dag_sync {
413          for i in 1..=args.sync_attempts {
414              info!(target: "evgrd", "Syncing event DAG (attempt #{})", i);
415              match event_graph.dag_sync().await {
416                  Ok(()) => break,
417                  Err(e) => {
418                      if i == args.sync_attempts {
419                          error!(target: "evgrd", "Failed syncing DAG. Exiting.");
420                          p2p.stop().await;
421                          return Err(Error::DagSyncFailed)
422                      } else {
423                          // TODO: Maybe at this point we should prune or something?
424                          // TODO: Or maybe just tell the user to delete the DAG from FS.
425                          error!(target: "evgrd", "Failed syncing DAG ({}), retrying in {}s...", e, args.sync_timeout);
426                          sleep(args.sync_timeout.into()).await;
427                      }
428                  }
429              }
430          }
431      } else {
432          *event_graph.synced.write().await = true;
433      }
434  
435      // Signal handling for graceful termination.
436      let (signals_handler, signals_task) = SignalHandler::new(ex)?;
437      signals_handler.wait_termination(signals_task).await?;
438      info!(target: "evgrd", "Caught termination signal, cleaning up and exiting...");
439  
440      info!(target: "evgrd", "Stopping P2P network");
441      p2p.stop().await;
442  
443      info!(target: "evgrd", "Stopping RPC server");
444      for rpc_task in rpc_tasks {
445          rpc_task.stop().await;
446      }
447      dnet_task.stop().await;
448      deg_task.stop().await;
449  
450      info!(target: "evgrd", "Stopping IRC server");
451      prune_task.stop().await;
452  
453      info!(target: "evgrd", "Flushing sled database...");
454      let flushed_bytes = sled_db.flush_async().await?;
455      info!(target: "evgrd", "Flushed {} bytes", flushed_bytes);
456  
457      info!(target: "evgrd", "Shut down successfully");
458      Ok(())
459  }