/ bin / app / src / plugin / darkirc.rs
darkirc.rs
  1  /* This file is part of DarkFi (https://dark.fi)
  2   *
  3   * Copyright (C) 2020-2025 Dyne.org foundation
  4   *
  5   * This program is free software: you can redistribute it and/or modify
  6   * it under the terms of the GNU Affero General Public License as
  7   * published by the Free Software Foundation, either version 3 of the
  8   * License, or (at your option) any later version.
  9   *
 10   * This program is distributed in the hope that it will be useful,
 11   * but WITHOUT ANY WARRANTY; without even the implied warranty of
 12   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 13   * GNU Affero General Public License for more details.
 14   *
 15   * You should have received a copy of the GNU Affero General Public License
 16   * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 17   */
 18  
 19  use async_trait::async_trait;
 20  use darkfi::{
 21      event_graph::{
 22          self,
 23          proto::{EventPut, ProtocolEventGraph},
 24          EventGraph, EventGraphPtr,
 25      },
 26      net::{session::SESSION_DEFAULT, settings::Settings as NetSettings, ChannelPtr, P2p, P2pPtr},
 27      system::{sleep, Subscription},
 28      Result as DarkFiResult,
 29  };
 30  use darkfi_serial::{
 31      deserialize_async, serialize, serialize_async, AsyncEncodable, Decodable, Encodable,
 32      SerialDecodable, SerialEncodable,
 33  };
 34  use sled_overlay::sled;
 35  use std::{
 36      io::Cursor,
 37      sync::{Arc, Mutex as SyncMutex, OnceLock, Weak},
 38      time::UNIX_EPOCH,
 39  };
 40  
 41  use crate::{
 42      error::{Error, Result},
 43      prop::{BatchGuardPtr, PropertyAtomicGuard, PropertyStr, Role},
 44      scene::{MethodCallSub, Pimpl, SceneNode, SceneNodeType, SceneNodeWeak},
 45      ui::{
 46          chatview::{MessageId, Timestamp},
 47          OnModify,
 48      },
 49      ExecutorPtr,
 50  };
 51  
 52  use super::PluginSettings;
 53  
 54  const P2P_RETRY_TIME: u64 = 20;
 55  const COOLOFF_SLEEP_TIME: u64 = 20;
 56  const COOLOFF_SYNC_ATTEMPTS: usize = 6;
 57  const SYNC_MIN_PEERS: usize = 2;
 58  
 59  /// Due to drift between different machine's clocks, if the message timestamp is recent
 60  /// then we will just correct it to the current time so messages appear sequential in the UI.
 61  const RECENT_TIME_DIST: u64 = 25_000;
 62  
 63  #[cfg(target_os = "android")]
 64  mod paths {
 65      use crate::android::{get_appdata_path, get_external_storage_path};
 66      use std::path::PathBuf;
 67  
 68      pub fn get_evgrdb_path() -> PathBuf {
 69          get_external_storage_path().join("evgr")
 70      }
 71      pub fn get_use_tor_filename() -> PathBuf {
 72          get_external_storage_path().join("use_tor.txt")
 73      }
 74  
 75      pub fn nick_filename() -> PathBuf {
 76          get_appdata_path().join("/nick.txt")
 77      }
 78  
 79      pub fn p2p_datastore_path() -> PathBuf {
 80          get_appdata_path().join("darkirc_p2p")
 81      }
 82      pub fn hostlist_path() -> PathBuf {
 83          get_appdata_path().join("hostlist.tsv")
 84      }
 85  }
 86  
 87  #[cfg(not(target_os = "android"))]
 88  mod paths {
 89      use std::path::PathBuf;
 90  
 91      pub fn get_evgrdb_path() -> PathBuf {
 92          dirs::data_local_dir().unwrap().join("darkfi/app/evgr")
 93      }
 94      pub fn get_use_tor_filename() -> PathBuf {
 95          dirs::data_local_dir().unwrap().join("darkfi/app/use_tor.txt")
 96      }
 97  
 98      pub fn nick_filename() -> PathBuf {
 99          dirs::cache_dir().unwrap().join("darkfi/app/nick.txt")
100      }
101  
102      pub fn p2p_datastore_path() -> PathBuf {
103          dirs::cache_dir().unwrap().join("darkfi/app/darkirc_p2p")
104      }
105      pub fn hostlist_path() -> PathBuf {
106          dirs::cache_dir().unwrap().join("darkfi/app/hostlist.tsv")
107      }
108  }
109  
110  use paths::*;
111  
112  macro_rules! t { ($($arg:tt)*) => { trace!(target: "plugin::darkirc", $($arg)*); } }
113  macro_rules! d { ($($arg:tt)*) => { debug!(target: "plugin::darkirc", $($arg)*); } }
114  macro_rules! i { ($($arg:tt)*) => { info!(target: "plugin::darkirc", $($arg)*); } }
115  macro_rules! e { ($($arg:tt)*) => { error!(target: "plugin::darkirc", $($arg)*); } }
116  macro_rules! w { ($($arg:tt)*) => { warn!(target: "plugin::darkirc", $($arg)*); } }
117  
118  #[derive(Clone, Debug, SerialEncodable, SerialDecodable)]
119  pub struct Privmsg {
120      pub channel: String,
121      pub nick: String,
122      pub msg: String,
123  }
124  
125  impl Privmsg {
126      pub fn new(channel: String, nick: String, msg: String) -> Self {
127          Self { channel, nick, msg }
128      }
129  
130      pub fn msg_id(&self, timest: u64) -> MessageId {
131          let mut hasher = blake3::Hasher::new();
132          0u8.encode(&mut hasher).unwrap();
133          0u8.encode(&mut hasher).unwrap();
134          timest.encode(&mut hasher).unwrap();
135          self.channel.encode(&mut hasher).unwrap();
136          self.nick.encode(&mut hasher).unwrap();
137          self.msg.encode(&mut hasher).unwrap();
138          MessageId(hasher.finalize().into())
139      }
140  }
141  
142  struct SeenMsg {
143      id: MessageId,
144      is_self: bool,
145      seen_times: usize,
146  }
147  
148  struct SeenMessages {
149      seen: Vec<SeenMsg>,
150  }
151  
152  impl SeenMessages {
153      fn new() -> Self {
154          Self { seen: vec![] }
155      }
156  
157      fn get_status(&self, id: &MessageId) -> Option<&SeenMsg> {
158          self.seen.iter().find(|s| s.id == *id)
159      }
160  
161      fn push(&mut self, id: MessageId, is_self: bool) {
162          self.seen.push(SeenMsg { id, is_self, seen_times: 0 });
163      }
164  }
165  
166  pub type DarkIrcPtr = Arc<DarkIrc>;
167  
168  pub struct DarkIrc {
169      node: SceneNodeWeak,
170      tasks: OnceLock<Vec<smol::Task<()>>>,
171  
172      p2p: P2pPtr,
173      event_graph: EventGraphPtr,
174  
175      seen_msgs: SyncMutex<SeenMessages>,
176      nick: PropertyStr,
177  
178      settings: PluginSettings,
179  }
180  
181  impl DarkIrc {
182      pub async fn new(node: SceneNodeWeak, ex: ExecutorPtr) -> Result<Pimpl> {
183          let node_ref = &node.upgrade().unwrap();
184          let nick = PropertyStr::wrap(node_ref, Role::Internal, "nick", 0).unwrap();
185  
186          let setting_root = Arc::new(SceneNode::new("setting", SceneNodeType::SettingRoot));
187          node_ref.link(setting_root.clone());
188  
189          i!("Starting DarkIRC backend");
190          let evgr_path = get_evgrdb_path();
191          let db = match sled::open(&evgr_path) {
192              Ok(db) => db,
193              Err(err) => {
194                  e!("Sled database '{}' failed to open: {err}!", evgr_path.display());
195                  return Err(Error::SledDbErr)
196              }
197          };
198  
199          let setting_tree = db.open_tree("settings")?;
200          let settings = PluginSettings { setting_root, sled_tree: setting_tree };
201  
202          let mut p2p_settings: NetSettings = Default::default();
203          p2p_settings.app_version = semver::Version::parse("0.5.0").unwrap();
204          if get_use_tor_filename().exists() {
205              i!("Setup P2P network [tor]");
206              p2p_settings.outbound_connect_timeout = 60;
207              p2p_settings.channel_handshake_timeout = 55;
208              p2p_settings.channel_heartbeat_interval = 90;
209              p2p_settings.outbound_peer_discovery_cooloff_time = 60;
210  
211              p2p_settings.seeds.push(
212                  url::Url::parse(
213                      "tor://g7fxelebievvpr27w7gt24lflptpw3jeeuvafovgliq5utdst6xyruyd.onion:25552",
214                  )
215                  .unwrap(),
216              );
217              p2p_settings.seeds.push(
218                  url::Url::parse(
219                      "tor://yvklzjnfmwxhyodhrkpomawjcdvcaushsj6torjz2gyd7e25f3gfunyd.onion:25552",
220                  )
221                  .unwrap(),
222              );
223              p2p_settings.allowed_transports = vec!["tor".to_string()];
224          } else {
225              i!("Setup P2P network [clearnet]");
226              p2p_settings.outbound_connect_timeout = 40;
227              p2p_settings.channel_handshake_timeout = 30;
228  
229              p2p_settings.outbound_connections = 3;
230              p2p_settings.inbound_connections = 0;
231  
232              p2p_settings.seeds.push(url::Url::parse("tcp+tls://lilith0.dark.fi:25551").unwrap());
233              p2p_settings.seeds.push(url::Url::parse("tcp+tls://lilith1.dark.fi:25551").unwrap());
234          }
235          p2p_settings.p2p_datastore = p2p_datastore_path().into_os_string().into_string().ok();
236          p2p_settings.hostlist = hostlist_path().into_os_string().into_string().ok();
237  
238          settings.add_p2p_settings(&p2p_settings);
239  
240          settings.load_settings();
241          settings.update_p2p_settings(&mut p2p_settings);
242  
243          let p2p = match P2p::new(p2p_settings.clone(), ex.clone()).await {
244              Ok(p2p) => p2p,
245              Err(err) => {
246                  e!("Create p2p network failed: {err}!");
247                  return Err(Error::ServiceFailed)
248              }
249          };
250  
251          let event_graph = match EventGraph::new(
252              p2p.clone(),
253              db.clone(),
254              std::path::PathBuf::new(),
255              false,
256              "darkirc_dag",
257              1,
258              ex.clone(),
259          )
260          .await
261          {
262              Ok(evgr) => evgr,
263              Err(err) => {
264                  e!("Create event graph failed: {err}!");
265                  return Err(Error::ServiceFailed)
266              }
267          };
268  
269          if let Ok(prev_nick) = std::fs::read_to_string(nick_filename()) {
270              nick.set(&mut PropertyAtomicGuard::none(), prev_nick);
271          }
272  
273          let self_ = Arc::new(Self {
274              node: node.clone(),
275              tasks: OnceLock::new(),
276  
277              p2p,
278              event_graph,
279  
280              seen_msgs: SyncMutex::new(SeenMessages::new()),
281              nick,
282              settings,
283          });
284          self_.clone().start(ex).await;
285          Ok(Pimpl::DarkIrc(self_))
286      }
287  
288      async fn dag_sync(self: Arc<Self>, channel_sub: Subscription<DarkFiResult<ChannelPtr>>) {
289          i!("Starting p2p network");
290          while let Err(err) = self.p2p.clone().start().await {
291              // This usually means we cannot listen on the inbound ports
292              e!("Failed to start p2p network: {err}!");
293              e!("Usually this means there is another process listening on the same ports.");
294              e!("Trying again in {P2P_RETRY_TIME} secs");
295              sleep(P2P_RETRY_TIME).await;
296          }
297  
298          i!("Waiting for some P2P connections...");
299  
300          let mut sync_attempt = 0;
301          loop {
302              // Wait for a channel
303              if let Err(err) = channel_sub.receive().await {
304                  w!("There was an error listening for channels. The service closed unexpectedly with error: {err}");
305                  continue
306              }
307  
308              let peers_count = self.p2p.peers_count();
309              self.notify_connect(peers_count, false).await;
310  
311              // Wait until we have enough connections
312              if peers_count < SYNC_MIN_PEERS {
313                  i!("Connected to {peers_count} peers. Waiting for more connections.");
314                  continue
315              }
316  
317              sync_attempt += 1;
318  
319              // Cool off periodically
320              if sync_attempt > COOLOFF_SYNC_ATTEMPTS {
321                  i!("Wasn't able to sync yet. Cooling off for {COOLOFF_SLEEP_TIME} then will try again.");
322                  sleep(COOLOFF_SLEEP_TIME).await;
323                  sync_attempt = 0;
324              }
325  
326              i!("Syncing event DAG (attempt #{sync_attempt})");
327              match self.event_graph.dag_sync().await {
328                  Ok(()) => break,
329                  Err(e) => {
330                      // TODO: Maybe at this point we should prune or something?
331                      // TODO: Or maybe just tell the user to delete the DAG from FS.
332                      w!("Failed DAG sync: ({e}). Waiting for more connections before retry.");
333                  }
334              }
335          }
336  
337          let peers_count = self.p2p.peers_count();
338          self.notify_connect(peers_count, true).await;
339  
340          // Initial sync finished. Now just notify of connection changes
341          loop {
342              // Wait for a channel
343              if let Err(err) = channel_sub.receive().await {
344                  w!("There was an error listening for channels. The service closed unexpectedly with error: {err}");
345                  continue
346              }
347  
348              let peers_count = self.p2p.peers_count();
349              self.notify_connect(peers_count, true).await;
350          }
351      }
352  
353      async fn notify_connect(&self, peers_count: usize, is_dag_synced: bool) {
354          let node = self.node.upgrade().unwrap();
355          node.trigger("connect", serialize(&(peers_count as u32, is_dag_synced))).await.unwrap();
356      }
357  
358      async fn relay_events(self: Arc<Self>, ev_sub: Subscription<event_graph::Event>) {
359          loop {
360              let ev = ev_sub.receive().await;
361  
362              // Try to deserialize the `Event`'s content into a `Privmsg`
363              let privmsg: Privmsg = match deserialize_async(ev.content()).await {
364                  Ok(v) => v,
365                  Err(e) => {
366                      e!("[IRC CLIENT] Failed deserializing incoming Privmsg event: {}", e);
367                      continue
368                  }
369              };
370  
371              let mut timest = ev.timestamp;
372              let msg_id = privmsg.msg_id(timest);
373              t!(
374                  "Relaying ev_id={:?}, ev={ev:?}, msg_id={msg_id}, privmsg={privmsg:?}, timest={timest}",
375                  ev.id(),
376              );
377  
378              let is_self = {
379                  let mut is_self = false;
380                  let mut seen = self.seen_msgs.lock().unwrap();
381                  match seen.get_status(&msg_id) {
382                      Some(msg) => {
383                          is_self = msg.is_self;
384  
385                          if !msg.is_self || msg.seen_times > 1 {
386                              warn!(target: "plugin::darkirc", "Skipping duplicate seen message: {msg_id}");
387                              continue
388                          }
389                      }
390                      None => {
391                          seen.push(msg_id.clone(), false);
392                      }
393                  }
394                  is_self
395              };
396  
397              // This is a hack to make messages appear sequentially in the UI
398              let now_timest = UNIX_EPOCH.elapsed().unwrap().as_millis() as u64;
399              if !is_self && timest.abs_diff(now_timest) < RECENT_TIME_DIST {
400                  d!("Applied timestamp correction: <{timest}> => <{now_timest}>");
401                  timest = now_timest;
402              }
403  
404              // Strip off starting #
405              let mut channel = privmsg.channel;
406              if channel.is_empty() {
407                  warn!(target: "plugin::darkirc", "Received privmsg with empty channel!");
408                  continue
409              }
410              if channel.chars().next().unwrap() != '#' {
411                  warn!(target: "plugin::darkirc", "Skipping encrypted channel: {channel}");
412                  continue
413              }
414              channel.remove(0);
415  
416              // Workaround for the chatview hack. This nick is off limits!
417              let mut nick = privmsg.nick;
418              if nick == "NOTICE" {
419                  nick = "noticer".to_string();
420              }
421  
422              let mut arg_data = vec![];
423              channel.encode(&mut arg_data).unwrap();
424              timest.encode(&mut arg_data).unwrap();
425              msg_id.encode(&mut arg_data).unwrap();
426              nick.encode(&mut arg_data).unwrap();
427              privmsg.msg.encode(&mut arg_data).unwrap();
428  
429              let node = self.node.upgrade().unwrap();
430              node.trigger("recv", arg_data).await.unwrap();
431          }
432      }
433  
434      async fn process_send(me: &Weak<Self>, sub: &MethodCallSub) -> bool {
435          let Ok(method_call) = sub.receive().await else {
436              d!("Event relayer closed");
437              return false
438          };
439  
440          t!("method called: send({method_call:?})");
441          assert!(method_call.send_res.is_none());
442  
443          fn decode_data(data: &[u8]) -> std::io::Result<(Timestamp, String, String)> {
444              let mut cur = Cursor::new(&data);
445              let timest = Timestamp::decode(&mut cur).unwrap();
446              let channel = String::decode(&mut cur)?;
447              let msg = String::decode(&mut cur)?;
448              Ok((timest, channel, msg))
449          }
450  
451          let Ok((timest, channel, msg)) = decode_data(&method_call.data) else {
452              e!("send() method invalid arg data");
453              return true
454          };
455  
456          let Some(self_) = me.upgrade() else {
457              // Should not happen
458              panic!("self destroyed before send_method_task was stopped!");
459          };
460  
461          self_.handle_send(timest, channel, msg).await;
462  
463          true
464      }
465  
466      async fn handle_send(&self, timest: Timestamp, channel: String, msg: String) {
467          let nick = self.nick.get();
468  
469          // Send text to channel
470          d!("Sending privmsg: {timest} {channel}: <{nick}> {msg}");
471          let msg = Privmsg::new(channel, nick, msg);
472          let evgr = self.event_graph.clone();
473          let mut event = event_graph::Event::new(serialize_async(&msg).await, &evgr).await;
474          event.timestamp = timest;
475          let msg_id = msg.msg_id(timest);
476  
477          // Keep track of our own messages so we don't apply timestamp correction to them
478          // which messes up the msg id.
479          {
480              let mut seen = self.seen_msgs.lock().unwrap();
481              seen.push(msg_id.clone(), true);
482          }
483  
484          let mut arg_data = vec![];
485          timest.encode_async(&mut arg_data).await.unwrap();
486          msg_id.encode_async(&mut arg_data).await.unwrap();
487          msg.nick.encode_async(&mut arg_data).await.unwrap();
488          msg.msg.encode_async(&mut arg_data).await.unwrap();
489  
490          // Broadcast the msg
491  
492          if let Err(e) = evgr.dag_insert(&[event.clone()]).await {
493              error!(target: "darkirc", "Failed inserting new event to DAG: {}", e);
494          }
495  
496          self.p2p.broadcast(&EventPut(event)).await;
497      }
498  
499      async fn apply_settings(self_: Arc<Self>, _: BatchGuardPtr) {
500          self_.settings.save_settings();
501  
502          let p2p_settings = self_.p2p.settings();
503          let mut write_guard = p2p_settings.write().await;
504          self_.settings.update_p2p_settings(&mut write_guard);
505      }
506  
507      async fn start(self: Arc<Self>, ex: ExecutorPtr) {
508          i!("Registering EventGraph P2P protocol");
509          let event_graph_ = Arc::clone(&self.event_graph);
510          let registry = self.p2p.protocol_registry();
511          registry
512              .register(SESSION_DEFAULT, move |channel, _| {
513                  let event_graph_ = event_graph_.clone();
514                  async move { ProtocolEventGraph::init(event_graph_, channel).await.unwrap() }
515              })
516              .await;
517  
518          let me = Arc::downgrade(&self);
519  
520          let node = &self.node.upgrade().unwrap();
521  
522          let method_sub = node.subscribe_method_call("send").unwrap();
523          let me2 = me.clone();
524          let send_method_task =
525              ex.spawn(async move { while Self::process_send(&me2, &method_sub).await {} });
526  
527          let mut on_modify = OnModify::new(ex.clone(), self.node.clone(), me.clone());
528          async fn save_nick(self_: Arc<DarkIrc>, _batch: BatchGuardPtr) {
529              let _ = std::fs::write(nick_filename(), self_.nick.get());
530          }
531          on_modify.when_change(self.nick.prop(), save_nick);
532  
533          // `apply_settings` is triggered if any setting changes
534          for setting_node in self.settings.setting_root.get_children().iter() {
535              on_modify.when_change(
536                  setting_node.get_property("value").clone().unwrap(),
537                  Self::apply_settings,
538              );
539          }
540  
541          let ev_sub = self.event_graph.event_pub.clone().subscribe().await;
542          let ev_task = ex.spawn(self.clone().relay_events(ev_sub));
543  
544          // Sync the DAG
545          let channel_sub = self.p2p.hosts().subscribe_channel().await;
546          let dag_task = ex.spawn(self.clone().dag_sync(channel_sub));
547  
548          let mut tasks = vec![send_method_task, ev_task, dag_task];
549          tasks.append(&mut on_modify.tasks);
550          self.tasks.set(tasks).unwrap();
551      }
552  }