darkirc.rs
1 /* This file is part of DarkFi (https://dark.fi) 2 * 3 * Copyright (C) 2020-2025 Dyne.org foundation 4 * 5 * This program is free software: you can redistribute it and/or modify 6 * it under the terms of the GNU Affero General Public License as 7 * published by the Free Software Foundation, either version 3 of the 8 * License, or (at your option) any later version. 9 * 10 * This program is distributed in the hope that it will be useful, 11 * but WITHOUT ANY WARRANTY; without even the implied warranty of 12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 13 * GNU Affero General Public License for more details. 14 * 15 * You should have received a copy of the GNU Affero General Public License 16 * along with this program. If not, see <https://www.gnu.org/licenses/>. 17 */ 18 19 use async_trait::async_trait; 20 use darkfi::{ 21 event_graph::{ 22 self, 23 proto::{EventPut, ProtocolEventGraph}, 24 EventGraph, EventGraphPtr, 25 }, 26 net::{session::SESSION_DEFAULT, settings::Settings as NetSettings, ChannelPtr, P2p, P2pPtr}, 27 system::{sleep, Subscription}, 28 Result as DarkFiResult, 29 }; 30 use darkfi_serial::{ 31 deserialize_async, serialize, serialize_async, AsyncEncodable, Decodable, Encodable, 32 SerialDecodable, SerialEncodable, 33 }; 34 use sled_overlay::sled; 35 use std::{ 36 io::Cursor, 37 sync::{Arc, Mutex as SyncMutex, OnceLock, Weak}, 38 time::UNIX_EPOCH, 39 }; 40 41 use crate::{ 42 error::{Error, Result}, 43 prop::{BatchGuardPtr, PropertyAtomicGuard, PropertyStr, Role}, 44 scene::{MethodCallSub, Pimpl, SceneNode, SceneNodeType, SceneNodeWeak}, 45 ui::{ 46 chatview::{MessageId, Timestamp}, 47 OnModify, 48 }, 49 ExecutorPtr, 50 }; 51 52 use super::PluginSettings; 53 54 const P2P_RETRY_TIME: u64 = 20; 55 const COOLOFF_SLEEP_TIME: u64 = 20; 56 const COOLOFF_SYNC_ATTEMPTS: usize = 6; 57 const SYNC_MIN_PEERS: usize = 2; 58 59 /// Due to drift between different machine's clocks, if the message timestamp is recent 60 /// then we will just correct it to the current time so messages appear sequential in the UI. 61 const RECENT_TIME_DIST: u64 = 25_000; 62 63 #[cfg(target_os = "android")] 64 mod paths { 65 use crate::android::{get_appdata_path, get_external_storage_path}; 66 use std::path::PathBuf; 67 68 pub fn get_evgrdb_path() -> PathBuf { 69 get_external_storage_path().join("evgr") 70 } 71 pub fn get_use_tor_filename() -> PathBuf { 72 get_external_storage_path().join("use_tor.txt") 73 } 74 75 pub fn nick_filename() -> PathBuf { 76 get_appdata_path().join("/nick.txt") 77 } 78 79 pub fn p2p_datastore_path() -> PathBuf { 80 get_appdata_path().join("darkirc_p2p") 81 } 82 pub fn hostlist_path() -> PathBuf { 83 get_appdata_path().join("hostlist.tsv") 84 } 85 } 86 87 #[cfg(not(target_os = "android"))] 88 mod paths { 89 use std::path::PathBuf; 90 91 pub fn get_evgrdb_path() -> PathBuf { 92 dirs::data_local_dir().unwrap().join("darkfi/app/evgr") 93 } 94 pub fn get_use_tor_filename() -> PathBuf { 95 dirs::data_local_dir().unwrap().join("darkfi/app/use_tor.txt") 96 } 97 98 pub fn nick_filename() -> PathBuf { 99 dirs::cache_dir().unwrap().join("darkfi/app/nick.txt") 100 } 101 102 pub fn p2p_datastore_path() -> PathBuf { 103 dirs::cache_dir().unwrap().join("darkfi/app/darkirc_p2p") 104 } 105 pub fn hostlist_path() -> PathBuf { 106 dirs::cache_dir().unwrap().join("darkfi/app/hostlist.tsv") 107 } 108 } 109 110 use paths::*; 111 112 macro_rules! t { ($($arg:tt)*) => { trace!(target: "plugin::darkirc", $($arg)*); } } 113 macro_rules! d { ($($arg:tt)*) => { debug!(target: "plugin::darkirc", $($arg)*); } } 114 macro_rules! i { ($($arg:tt)*) => { info!(target: "plugin::darkirc", $($arg)*); } } 115 macro_rules! e { ($($arg:tt)*) => { error!(target: "plugin::darkirc", $($arg)*); } } 116 macro_rules! w { ($($arg:tt)*) => { warn!(target: "plugin::darkirc", $($arg)*); } } 117 118 #[derive(Clone, Debug, SerialEncodable, SerialDecodable)] 119 pub struct Privmsg { 120 pub channel: String, 121 pub nick: String, 122 pub msg: String, 123 } 124 125 impl Privmsg { 126 pub fn new(channel: String, nick: String, msg: String) -> Self { 127 Self { channel, nick, msg } 128 } 129 130 pub fn msg_id(&self, timest: u64) -> MessageId { 131 let mut hasher = blake3::Hasher::new(); 132 0u8.encode(&mut hasher).unwrap(); 133 0u8.encode(&mut hasher).unwrap(); 134 timest.encode(&mut hasher).unwrap(); 135 self.channel.encode(&mut hasher).unwrap(); 136 self.nick.encode(&mut hasher).unwrap(); 137 self.msg.encode(&mut hasher).unwrap(); 138 MessageId(hasher.finalize().into()) 139 } 140 } 141 142 struct SeenMsg { 143 id: MessageId, 144 is_self: bool, 145 seen_times: usize, 146 } 147 148 struct SeenMessages { 149 seen: Vec<SeenMsg>, 150 } 151 152 impl SeenMessages { 153 fn new() -> Self { 154 Self { seen: vec![] } 155 } 156 157 fn get_status(&self, id: &MessageId) -> Option<&SeenMsg> { 158 self.seen.iter().find(|s| s.id == *id) 159 } 160 161 fn push(&mut self, id: MessageId, is_self: bool) { 162 self.seen.push(SeenMsg { id, is_self, seen_times: 0 }); 163 } 164 } 165 166 pub type DarkIrcPtr = Arc<DarkIrc>; 167 168 pub struct DarkIrc { 169 node: SceneNodeWeak, 170 tasks: OnceLock<Vec<smol::Task<()>>>, 171 172 p2p: P2pPtr, 173 event_graph: EventGraphPtr, 174 175 seen_msgs: SyncMutex<SeenMessages>, 176 nick: PropertyStr, 177 178 settings: PluginSettings, 179 } 180 181 impl DarkIrc { 182 pub async fn new(node: SceneNodeWeak, ex: ExecutorPtr) -> Result<Pimpl> { 183 let node_ref = &node.upgrade().unwrap(); 184 let nick = PropertyStr::wrap(node_ref, Role::Internal, "nick", 0).unwrap(); 185 186 let setting_root = Arc::new(SceneNode::new("setting", SceneNodeType::SettingRoot)); 187 node_ref.link(setting_root.clone()); 188 189 i!("Starting DarkIRC backend"); 190 let evgr_path = get_evgrdb_path(); 191 let db = match sled::open(&evgr_path) { 192 Ok(db) => db, 193 Err(err) => { 194 e!("Sled database '{}' failed to open: {err}!", evgr_path.display()); 195 return Err(Error::SledDbErr) 196 } 197 }; 198 199 let setting_tree = db.open_tree("settings")?; 200 let settings = PluginSettings { setting_root, sled_tree: setting_tree }; 201 202 let mut p2p_settings: NetSettings = Default::default(); 203 p2p_settings.app_version = semver::Version::parse("0.5.0").unwrap(); 204 if get_use_tor_filename().exists() { 205 i!("Setup P2P network [tor]"); 206 p2p_settings.outbound_connect_timeout = 60; 207 p2p_settings.channel_handshake_timeout = 55; 208 p2p_settings.channel_heartbeat_interval = 90; 209 p2p_settings.outbound_peer_discovery_cooloff_time = 60; 210 211 p2p_settings.seeds.push( 212 url::Url::parse( 213 "tor://g7fxelebievvpr27w7gt24lflptpw3jeeuvafovgliq5utdst6xyruyd.onion:25552", 214 ) 215 .unwrap(), 216 ); 217 p2p_settings.seeds.push( 218 url::Url::parse( 219 "tor://yvklzjnfmwxhyodhrkpomawjcdvcaushsj6torjz2gyd7e25f3gfunyd.onion:25552", 220 ) 221 .unwrap(), 222 ); 223 p2p_settings.allowed_transports = vec!["tor".to_string()]; 224 } else { 225 i!("Setup P2P network [clearnet]"); 226 p2p_settings.outbound_connect_timeout = 40; 227 p2p_settings.channel_handshake_timeout = 30; 228 229 p2p_settings.outbound_connections = 3; 230 p2p_settings.inbound_connections = 0; 231 232 p2p_settings.seeds.push(url::Url::parse("tcp+tls://lilith0.dark.fi:25551").unwrap()); 233 p2p_settings.seeds.push(url::Url::parse("tcp+tls://lilith1.dark.fi:25551").unwrap()); 234 } 235 p2p_settings.p2p_datastore = p2p_datastore_path().into_os_string().into_string().ok(); 236 p2p_settings.hostlist = hostlist_path().into_os_string().into_string().ok(); 237 238 settings.add_p2p_settings(&p2p_settings); 239 240 settings.load_settings(); 241 settings.update_p2p_settings(&mut p2p_settings); 242 243 let p2p = match P2p::new(p2p_settings.clone(), ex.clone()).await { 244 Ok(p2p) => p2p, 245 Err(err) => { 246 e!("Create p2p network failed: {err}!"); 247 return Err(Error::ServiceFailed) 248 } 249 }; 250 251 let event_graph = match EventGraph::new( 252 p2p.clone(), 253 db.clone(), 254 std::path::PathBuf::new(), 255 false, 256 "darkirc_dag", 257 1, 258 ex.clone(), 259 ) 260 .await 261 { 262 Ok(evgr) => evgr, 263 Err(err) => { 264 e!("Create event graph failed: {err}!"); 265 return Err(Error::ServiceFailed) 266 } 267 }; 268 269 if let Ok(prev_nick) = std::fs::read_to_string(nick_filename()) { 270 nick.set(&mut PropertyAtomicGuard::none(), prev_nick); 271 } 272 273 let self_ = Arc::new(Self { 274 node: node.clone(), 275 tasks: OnceLock::new(), 276 277 p2p, 278 event_graph, 279 280 seen_msgs: SyncMutex::new(SeenMessages::new()), 281 nick, 282 settings, 283 }); 284 self_.clone().start(ex).await; 285 Ok(Pimpl::DarkIrc(self_)) 286 } 287 288 async fn dag_sync(self: Arc<Self>, channel_sub: Subscription<DarkFiResult<ChannelPtr>>) { 289 i!("Starting p2p network"); 290 while let Err(err) = self.p2p.clone().start().await { 291 // This usually means we cannot listen on the inbound ports 292 e!("Failed to start p2p network: {err}!"); 293 e!("Usually this means there is another process listening on the same ports."); 294 e!("Trying again in {P2P_RETRY_TIME} secs"); 295 sleep(P2P_RETRY_TIME).await; 296 } 297 298 i!("Waiting for some P2P connections..."); 299 300 let mut sync_attempt = 0; 301 loop { 302 // Wait for a channel 303 if let Err(err) = channel_sub.receive().await { 304 w!("There was an error listening for channels. The service closed unexpectedly with error: {err}"); 305 continue 306 } 307 308 let peers_count = self.p2p.peers_count(); 309 self.notify_connect(peers_count, false).await; 310 311 // Wait until we have enough connections 312 if peers_count < SYNC_MIN_PEERS { 313 i!("Connected to {peers_count} peers. Waiting for more connections."); 314 continue 315 } 316 317 sync_attempt += 1; 318 319 // Cool off periodically 320 if sync_attempt > COOLOFF_SYNC_ATTEMPTS { 321 i!("Wasn't able to sync yet. Cooling off for {COOLOFF_SLEEP_TIME} then will try again."); 322 sleep(COOLOFF_SLEEP_TIME).await; 323 sync_attempt = 0; 324 } 325 326 i!("Syncing event DAG (attempt #{sync_attempt})"); 327 match self.event_graph.dag_sync().await { 328 Ok(()) => break, 329 Err(e) => { 330 // TODO: Maybe at this point we should prune or something? 331 // TODO: Or maybe just tell the user to delete the DAG from FS. 332 w!("Failed DAG sync: ({e}). Waiting for more connections before retry."); 333 } 334 } 335 } 336 337 let peers_count = self.p2p.peers_count(); 338 self.notify_connect(peers_count, true).await; 339 340 // Initial sync finished. Now just notify of connection changes 341 loop { 342 // Wait for a channel 343 if let Err(err) = channel_sub.receive().await { 344 w!("There was an error listening for channels. The service closed unexpectedly with error: {err}"); 345 continue 346 } 347 348 let peers_count = self.p2p.peers_count(); 349 self.notify_connect(peers_count, true).await; 350 } 351 } 352 353 async fn notify_connect(&self, peers_count: usize, is_dag_synced: bool) { 354 let node = self.node.upgrade().unwrap(); 355 node.trigger("connect", serialize(&(peers_count as u32, is_dag_synced))).await.unwrap(); 356 } 357 358 async fn relay_events(self: Arc<Self>, ev_sub: Subscription<event_graph::Event>) { 359 loop { 360 let ev = ev_sub.receive().await; 361 362 // Try to deserialize the `Event`'s content into a `Privmsg` 363 let privmsg: Privmsg = match deserialize_async(ev.content()).await { 364 Ok(v) => v, 365 Err(e) => { 366 e!("[IRC CLIENT] Failed deserializing incoming Privmsg event: {}", e); 367 continue 368 } 369 }; 370 371 let mut timest = ev.timestamp; 372 let msg_id = privmsg.msg_id(timest); 373 t!( 374 "Relaying ev_id={:?}, ev={ev:?}, msg_id={msg_id}, privmsg={privmsg:?}, timest={timest}", 375 ev.id(), 376 ); 377 378 let is_self = { 379 let mut is_self = false; 380 let mut seen = self.seen_msgs.lock().unwrap(); 381 match seen.get_status(&msg_id) { 382 Some(msg) => { 383 is_self = msg.is_self; 384 385 if !msg.is_self || msg.seen_times > 1 { 386 warn!(target: "plugin::darkirc", "Skipping duplicate seen message: {msg_id}"); 387 continue 388 } 389 } 390 None => { 391 seen.push(msg_id.clone(), false); 392 } 393 } 394 is_self 395 }; 396 397 // This is a hack to make messages appear sequentially in the UI 398 let now_timest = UNIX_EPOCH.elapsed().unwrap().as_millis() as u64; 399 if !is_self && timest.abs_diff(now_timest) < RECENT_TIME_DIST { 400 d!("Applied timestamp correction: <{timest}> => <{now_timest}>"); 401 timest = now_timest; 402 } 403 404 // Strip off starting # 405 let mut channel = privmsg.channel; 406 if channel.is_empty() { 407 warn!(target: "plugin::darkirc", "Received privmsg with empty channel!"); 408 continue 409 } 410 if channel.chars().next().unwrap() != '#' { 411 warn!(target: "plugin::darkirc", "Skipping encrypted channel: {channel}"); 412 continue 413 } 414 channel.remove(0); 415 416 // Workaround for the chatview hack. This nick is off limits! 417 let mut nick = privmsg.nick; 418 if nick == "NOTICE" { 419 nick = "noticer".to_string(); 420 } 421 422 let mut arg_data = vec![]; 423 channel.encode(&mut arg_data).unwrap(); 424 timest.encode(&mut arg_data).unwrap(); 425 msg_id.encode(&mut arg_data).unwrap(); 426 nick.encode(&mut arg_data).unwrap(); 427 privmsg.msg.encode(&mut arg_data).unwrap(); 428 429 let node = self.node.upgrade().unwrap(); 430 node.trigger("recv", arg_data).await.unwrap(); 431 } 432 } 433 434 async fn process_send(me: &Weak<Self>, sub: &MethodCallSub) -> bool { 435 let Ok(method_call) = sub.receive().await else { 436 d!("Event relayer closed"); 437 return false 438 }; 439 440 t!("method called: send({method_call:?})"); 441 assert!(method_call.send_res.is_none()); 442 443 fn decode_data(data: &[u8]) -> std::io::Result<(Timestamp, String, String)> { 444 let mut cur = Cursor::new(&data); 445 let timest = Timestamp::decode(&mut cur).unwrap(); 446 let channel = String::decode(&mut cur)?; 447 let msg = String::decode(&mut cur)?; 448 Ok((timest, channel, msg)) 449 } 450 451 let Ok((timest, channel, msg)) = decode_data(&method_call.data) else { 452 e!("send() method invalid arg data"); 453 return true 454 }; 455 456 let Some(self_) = me.upgrade() else { 457 // Should not happen 458 panic!("self destroyed before send_method_task was stopped!"); 459 }; 460 461 self_.handle_send(timest, channel, msg).await; 462 463 true 464 } 465 466 async fn handle_send(&self, timest: Timestamp, channel: String, msg: String) { 467 let nick = self.nick.get(); 468 469 // Send text to channel 470 d!("Sending privmsg: {timest} {channel}: <{nick}> {msg}"); 471 let msg = Privmsg::new(channel, nick, msg); 472 let evgr = self.event_graph.clone(); 473 let mut event = event_graph::Event::new(serialize_async(&msg).await, &evgr).await; 474 event.timestamp = timest; 475 let msg_id = msg.msg_id(timest); 476 477 // Keep track of our own messages so we don't apply timestamp correction to them 478 // which messes up the msg id. 479 { 480 let mut seen = self.seen_msgs.lock().unwrap(); 481 seen.push(msg_id.clone(), true); 482 } 483 484 let mut arg_data = vec![]; 485 timest.encode_async(&mut arg_data).await.unwrap(); 486 msg_id.encode_async(&mut arg_data).await.unwrap(); 487 msg.nick.encode_async(&mut arg_data).await.unwrap(); 488 msg.msg.encode_async(&mut arg_data).await.unwrap(); 489 490 // Broadcast the msg 491 492 if let Err(e) = evgr.dag_insert(&[event.clone()]).await { 493 error!(target: "darkirc", "Failed inserting new event to DAG: {}", e); 494 } 495 496 self.p2p.broadcast(&EventPut(event)).await; 497 } 498 499 async fn apply_settings(self_: Arc<Self>, _: BatchGuardPtr) { 500 self_.settings.save_settings(); 501 502 let p2p_settings = self_.p2p.settings(); 503 let mut write_guard = p2p_settings.write().await; 504 self_.settings.update_p2p_settings(&mut write_guard); 505 } 506 507 async fn start(self: Arc<Self>, ex: ExecutorPtr) { 508 i!("Registering EventGraph P2P protocol"); 509 let event_graph_ = Arc::clone(&self.event_graph); 510 let registry = self.p2p.protocol_registry(); 511 registry 512 .register(SESSION_DEFAULT, move |channel, _| { 513 let event_graph_ = event_graph_.clone(); 514 async move { ProtocolEventGraph::init(event_graph_, channel).await.unwrap() } 515 }) 516 .await; 517 518 let me = Arc::downgrade(&self); 519 520 let node = &self.node.upgrade().unwrap(); 521 522 let method_sub = node.subscribe_method_call("send").unwrap(); 523 let me2 = me.clone(); 524 let send_method_task = 525 ex.spawn(async move { while Self::process_send(&me2, &method_sub).await {} }); 526 527 let mut on_modify = OnModify::new(ex.clone(), self.node.clone(), me.clone()); 528 async fn save_nick(self_: Arc<DarkIrc>, _batch: BatchGuardPtr) { 529 let _ = std::fs::write(nick_filename(), self_.nick.get()); 530 } 531 on_modify.when_change(self.nick.prop(), save_nick); 532 533 // `apply_settings` is triggered if any setting changes 534 for setting_node in self.settings.setting_root.get_children().iter() { 535 on_modify.when_change( 536 setting_node.get_property("value").clone().unwrap(), 537 Self::apply_settings, 538 ); 539 } 540 541 let ev_sub = self.event_graph.event_pub.clone().subscribe().await; 542 let ev_task = ex.spawn(self.clone().relay_events(ev_sub)); 543 544 // Sync the DAG 545 let channel_sub = self.p2p.hosts().subscribe_channel().await; 546 let dag_task = ex.spawn(self.clone().dag_sync(channel_sub)); 547 548 let mut tasks = vec![send_method_task, ev_task, dag_task]; 549 tasks.append(&mut on_modify.tasks); 550 self.tasks.set(tasks).unwrap(); 551 } 552 }