proto.rs
1 /* This file is part of DarkFi (https://dark.fi) 2 * 3 * Copyright (C) 2020-2025 Dyne.org foundation 4 * 5 * This program is free software: you can redistribute it and/or modify 6 * it under the terms of the GNU Affero General Public License as 7 * published by the Free Software Foundation, either version 3 of the 8 * License, or (at your option) any later version. 9 * 10 * This program is distributed in the hope that it will be useful, 11 * but WITHOUT ANY WARRANTY; without even the implied warranty of 12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 13 * GNU Affero General Public License for more details. 14 * 15 * You should have received a copy of the GNU Affero General Public License 16 * along with this program. If not, see <https://www.gnu.org/licenses/>. 17 */ 18 19 use async_trait::async_trait; 20 use smol::Executor; 21 use std::{path::StripPrefixError, sync::Arc}; 22 use tracing::{debug, error, info}; 23 24 use darkfi::{ 25 dht::DhtHandler, 26 geode::hash_to_string, 27 impl_p2p_message, 28 net::{ 29 metering::{MeteringConfiguration, DEFAULT_METERING_CONFIGURATION}, 30 ChannelPtr, Message, MessageSubscription, P2pPtr, ProtocolBase, ProtocolBasePtr, 31 ProtocolJobsManager, ProtocolJobsManagerPtr, 32 }, 33 Error, Result, 34 }; 35 use darkfi_sdk::crypto::schnorr::{SchnorrSecret, Signature}; 36 use darkfi_serial::{SerialDecodable, SerialEncodable}; 37 38 use crate::{ 39 dht::{FudNode, FudSeeder}, 40 Fud, 41 }; 42 43 /// Message representing a file reply from the network 44 #[derive(Debug, Clone, SerialEncodable, SerialDecodable)] 45 pub struct FudFileReply { 46 pub chunk_hashes: Vec<blake3::Hash>, 47 } 48 impl_p2p_message!(FudFileReply, "FudFileReply", 0, 0, DEFAULT_METERING_CONFIGURATION); 49 50 /// Message representing a directory reply from the network 51 #[derive(Debug, Clone, SerialEncodable, SerialDecodable)] 52 pub struct FudDirectoryReply { 53 pub chunk_hashes: Vec<blake3::Hash>, 54 pub files: Vec<(String, u64)>, // Vec of (file path, file size) 55 } 56 impl_p2p_message!(FudDirectoryReply, "FudDirectoryReply", 0, 0, DEFAULT_METERING_CONFIGURATION); 57 58 /// Message representing a node announcing a key on the network 59 #[derive(Debug, Clone, SerialEncodable, SerialDecodable)] 60 pub struct FudAnnounce { 61 pub key: blake3::Hash, 62 pub seeders: Vec<FudSeeder>, 63 } 64 impl_p2p_message!(FudAnnounce, "FudAnnounce", 0, 0, DEFAULT_METERING_CONFIGURATION); 65 66 /// Message representing a chunk reply from the network 67 #[derive(Debug, Clone, SerialEncodable, SerialDecodable)] 68 pub struct FudChunkReply { 69 // TODO: This should be a chunk-sized array, but then we need padding? 70 pub chunk: Vec<u8>, 71 } 72 impl_p2p_message!(FudChunkReply, "FudChunkReply", 0, 0, DEFAULT_METERING_CONFIGURATION); 73 74 /// Message representing a chunk reply when a file is not found 75 #[derive(Debug, Clone, SerialEncodable, SerialDecodable)] 76 pub struct FudNotFound; 77 impl_p2p_message!(FudNotFound, "FudNotFound", 0, 0, DEFAULT_METERING_CONFIGURATION); 78 79 /// Message representing a ping request on the network 80 #[derive(Debug, Clone, SerialEncodable, SerialDecodable)] 81 pub struct FudPingRequest { 82 pub random: u64, 83 } 84 impl_p2p_message!(FudPingRequest, "FudPingRequest", 0, 0, DEFAULT_METERING_CONFIGURATION); 85 86 /// Message representing a ping reply on the network 87 #[derive(Debug, Clone, SerialEncodable, SerialDecodable)] 88 pub struct FudPingReply { 89 pub node: FudNode, 90 /// Signature of the random u64 from the ping request 91 pub sig: Signature, 92 } 93 impl_p2p_message!(FudPingReply, "FudPingReply", 0, 0, DEFAULT_METERING_CONFIGURATION); 94 95 /// Message representing a find file/chunk request from the network 96 #[derive(Debug, Clone, SerialEncodable, SerialDecodable)] 97 pub struct FudFindRequest { 98 pub info: Option<blake3::Hash>, 99 pub key: blake3::Hash, 100 } 101 impl_p2p_message!(FudFindRequest, "FudFindRequest", 0, 0, DEFAULT_METERING_CONFIGURATION); 102 103 /// Message representing a find nodes request on the network 104 #[derive(Debug, Clone, SerialEncodable, SerialDecodable)] 105 pub struct FudFindNodesRequest { 106 pub key: blake3::Hash, 107 } 108 impl_p2p_message!(FudFindNodesRequest, "FudFindNodesRequest", 0, 0, DEFAULT_METERING_CONFIGURATION); 109 110 /// Message representing a find nodes reply on the network 111 #[derive(Debug, Clone, SerialEncodable, SerialDecodable)] 112 pub struct FudFindNodesReply { 113 pub nodes: Vec<FudNode>, 114 } 115 impl_p2p_message!(FudFindNodesReply, "FudFindNodesReply", 0, 0, DEFAULT_METERING_CONFIGURATION); 116 117 /// Message representing a find seeders request on the network 118 #[derive(Debug, Clone, SerialEncodable, SerialDecodable)] 119 pub struct FudFindSeedersRequest { 120 pub key: blake3::Hash, 121 } 122 impl_p2p_message!( 123 FudFindSeedersRequest, 124 "FudFindSeedersRequest", 125 0, 126 0, 127 DEFAULT_METERING_CONFIGURATION 128 ); 129 130 /// Message representing a find seeders reply on the network 131 #[derive(Debug, Clone, SerialEncodable, SerialDecodable)] 132 pub struct FudFindSeedersReply { 133 pub seeders: Vec<FudSeeder>, 134 pub nodes: Vec<FudNode>, 135 } 136 impl_p2p_message!(FudFindSeedersReply, "FudFindSeedersReply", 0, 0, DEFAULT_METERING_CONFIGURATION); 137 138 /// P2P protocol implementation for fud. 139 pub struct ProtocolFud { 140 channel: ChannelPtr, 141 ping_request_sub: MessageSubscription<FudPingRequest>, 142 find_request_sub: MessageSubscription<FudFindRequest>, 143 find_nodes_request_sub: MessageSubscription<FudFindNodesRequest>, 144 find_seeders_request_sub: MessageSubscription<FudFindSeedersRequest>, 145 announce_sub: MessageSubscription<FudAnnounce>, 146 fud: Arc<Fud>, 147 jobsman: ProtocolJobsManagerPtr, 148 } 149 150 impl ProtocolFud { 151 pub async fn init(fud: Arc<Fud>, channel: ChannelPtr, _: P2pPtr) -> Result<ProtocolBasePtr> { 152 debug!( 153 target: "fud::proto::ProtocolFud::init()", 154 "Adding ProtocolFud to the protocol registry" 155 ); 156 157 let msg_subsystem = channel.message_subsystem(); 158 msg_subsystem.add_dispatch::<FudPingRequest>().await; 159 msg_subsystem.add_dispatch::<FudFindRequest>().await; 160 msg_subsystem.add_dispatch::<FudFindNodesRequest>().await; 161 msg_subsystem.add_dispatch::<FudFindSeedersRequest>().await; 162 msg_subsystem.add_dispatch::<FudAnnounce>().await; 163 164 let ping_request_sub = channel.subscribe_msg::<FudPingRequest>().await?; 165 let find_request_sub = channel.subscribe_msg::<FudFindRequest>().await?; 166 let find_nodes_request_sub = channel.subscribe_msg::<FudFindNodesRequest>().await?; 167 let find_seeders_request_sub = channel.subscribe_msg::<FudFindSeedersRequest>().await?; 168 let announce_sub = channel.subscribe_msg::<FudAnnounce>().await?; 169 170 Ok(Arc::new(Self { 171 channel: channel.clone(), 172 ping_request_sub, 173 find_request_sub, 174 find_nodes_request_sub, 175 find_seeders_request_sub, 176 announce_sub, 177 fud, 178 jobsman: ProtocolJobsManager::new("ProtocolFud", channel.clone()), 179 })) 180 } 181 182 async fn handle_fud_ping_request(self: Arc<Self>) -> Result<()> { 183 debug!(target: "fud::ProtocolFud::handle_fud_ping_request()", "START"); 184 185 loop { 186 let ping_req = match self.ping_request_sub.receive().await { 187 Ok(v) => v, 188 Err(Error::ChannelStopped) => continue, 189 Err(e) => { 190 error!("{e}"); 191 continue 192 } 193 }; 194 info!(target: "fud::ProtocolFud::handle_fud_ping_request()", "Received PING REQUEST"); 195 196 let reply = FudPingReply { 197 node: self.fud.node().await, 198 sig: self.fud.secret_key.read().await.sign(&ping_req.random.to_be_bytes()), 199 }; 200 match self.channel.send(&reply).await { 201 Ok(()) => continue, 202 Err(_e) => continue, 203 } 204 } 205 } 206 207 async fn handle_fud_find_request(self: Arc<Self>) -> Result<()> { 208 debug!(target: "fud::ProtocolFud::handle_fud_find_request()", "START"); 209 210 loop { 211 let request = match self.find_request_sub.receive().await { 212 Ok(v) => v, 213 Err(Error::ChannelStopped) => continue, 214 Err(e) => { 215 error!("{e}"); 216 continue 217 } 218 }; 219 info!(target: "fud::ProtocolFud::handle_fud_find_request()", "Received FIND for {}", hash_to_string(&request.key)); 220 221 let node = self.fud.dht().get_node_from_channel(self.channel.info.id).await; 222 if let Some(node) = node { 223 self.fud.dht.update_node(&node).await; 224 } 225 226 if self.handle_fud_chunk_request(&request).await { 227 continue; 228 } 229 230 if self.handle_fud_metadata_request(&request).await { 231 continue; 232 } 233 234 // Request did not match anything we have 235 let reply = FudNotFound {}; 236 info!(target: "fud::ProtocolFud::handle_fud_find_request()", "We do not have {}", hash_to_string(&request.key)); 237 let _ = self.channel.send(&reply).await; 238 } 239 } 240 241 /// If the FudFindRequest matches a chunk we have, handle it. 242 /// Returns true if the chunk was found. 243 async fn handle_fud_chunk_request(&self, request: &FudFindRequest) -> bool { 244 let hash = request.info; 245 if hash.is_none() { 246 return false; 247 } 248 let hash = hash.unwrap(); 249 250 let path = self.fud.hash_to_path(&hash).ok().flatten(); 251 if path.is_none() { 252 return false; 253 } 254 let path = path.unwrap(); 255 256 let chunked = self.fud.geode.get(&hash, &path).await; 257 if chunked.is_err() { 258 return false; 259 } 260 261 let chunk = self.fud.geode.get_chunk(&mut chunked.unwrap(), &request.key).await; 262 if let Ok(chunk) = chunk { 263 if !self.fud.geode.verify_chunk(&request.key, &chunk) { 264 // TODO: Run geode GC 265 return false; 266 } 267 let reply = FudChunkReply { chunk }; 268 info!(target: "fud::ProtocolFud::handle_fud_chunk_request()", "Sending chunk {}", hash_to_string(&request.key)); 269 let _ = self.channel.send(&reply).await; 270 return true; 271 } 272 273 false 274 } 275 276 /// If the FudFindRequest matches a file we have, handle it 277 /// Returns true if the file was found. 278 async fn handle_fud_metadata_request(&self, request: &FudFindRequest) -> bool { 279 let path = self.fud.hash_to_path(&request.key).ok().flatten(); 280 if path.is_none() { 281 return false; 282 } 283 let path = path.unwrap(); 284 285 let chunked_file = self.fud.geode.get(&request.key, &path).await.ok(); 286 if chunked_file.is_none() { 287 return false; 288 } 289 let mut chunked_file = chunked_file.unwrap(); 290 291 // If it's a file with a single chunk, just reply with the chunk 292 if chunked_file.len() == 1 && !chunked_file.is_dir() { 293 let chunk_hash = chunked_file.get_chunks()[0].0; 294 let chunk = self.fud.geode.get_chunk(&mut chunked_file, &chunk_hash).await; 295 if let Ok(chunk) = chunk { 296 if blake3::hash(blake3::hash(&chunk).as_bytes()) != request.key { 297 // TODO: Run geode GC 298 return false; 299 } 300 let reply = FudChunkReply { chunk }; 301 info!(target: "fud::ProtocolFud::handle_fud_metadata_request()", "Sending chunk (file has a single chunk) {}", hash_to_string(&chunk_hash)); 302 let _ = self.channel.send(&reply).await; 303 return true; 304 } 305 return false; 306 } 307 308 // Otherwise reply with the metadata 309 match chunked_file.is_dir() { 310 false => { 311 let reply = FudFileReply { 312 chunk_hashes: chunked_file 313 .get_chunks() 314 .iter() 315 .map(|(chunk, _)| *chunk) 316 .collect(), 317 }; 318 info!(target: "fud::ProtocolFud::handle_fud_metadata_request()", "Sending file metadata {}", hash_to_string(&request.key)); 319 let _ = self.channel.send(&reply).await; 320 } 321 true => { 322 let files = chunked_file 323 .get_files() 324 .iter() 325 .map(|(file_path, size)| match file_path.strip_prefix(path.clone()) { 326 Ok(rel_path) => Ok((rel_path.to_string_lossy().to_string(), *size)), 327 Err(e) => Err(e), 328 }) 329 .collect::<std::result::Result<Vec<_>, StripPrefixError>>(); 330 if let Err(e) = files { 331 error!(target: "fud::ProtocolFud::handle_fud_metadata_request()", "Error parsing file paths before sending directory metadata: {e}"); 332 return false; 333 } 334 let reply = FudDirectoryReply { 335 chunk_hashes: chunked_file 336 .get_chunks() 337 .iter() 338 .map(|(chunk, _)| *chunk) 339 .collect(), 340 files: files.unwrap(), 341 }; 342 info!(target: "fud::ProtocolFud::handle_fud_metadata_request()", "Sending directory metadata {}", hash_to_string(&request.key)); 343 let _ = self.channel.send(&reply).await; 344 } 345 }; 346 347 true 348 } 349 350 async fn handle_fud_find_nodes_request(self: Arc<Self>) -> Result<()> { 351 debug!(target: "fud::ProtocolFud::handle_fud_find_nodes_request()", "START"); 352 353 loop { 354 let request = match self.find_nodes_request_sub.receive().await { 355 Ok(v) => v, 356 Err(Error::ChannelStopped) => continue, 357 Err(e) => { 358 error!("{e}"); 359 continue 360 } 361 }; 362 info!(target: "fud::ProtocolFud::handle_fud_find_nodes_request()", "Received FIND NODES for {}", hash_to_string(&request.key)); 363 364 let node = self.fud.dht().get_node_from_channel(self.channel.info.id).await; 365 if let Some(node) = node { 366 self.fud.dht.update_node(&node).await; 367 } 368 369 let reply = FudFindNodesReply { 370 nodes: self.fud.dht().find_neighbors(&request.key, self.fud.dht().settings.k).await, 371 }; 372 match self.channel.send(&reply).await { 373 Ok(()) => continue, 374 Err(_e) => continue, 375 } 376 } 377 } 378 379 async fn handle_fud_find_seeders_request(self: Arc<Self>) -> Result<()> { 380 debug!(target: "fud::ProtocolFud::handle_fud_find_seeders_request()", "START"); 381 382 loop { 383 let request = match self.find_seeders_request_sub.receive().await { 384 Ok(v) => v, 385 Err(Error::ChannelStopped) => continue, 386 Err(e) => { 387 error!("{e}"); 388 continue 389 } 390 }; 391 info!(target: "fud::ProtocolFud::handle_fud_find_seeders_request()", "Received FIND SEEDERS for {}", hash_to_string(&request.key)); 392 393 let node = self.fud.dht().get_node_from_channel(self.channel.info.id).await; 394 if let Some(node) = node { 395 self.fud.dht.update_node(&node).await; 396 } 397 398 let router = self.fud.dht.hash_table.read().await; 399 let peers = router.get(&request.key); 400 401 match peers { 402 Some(seeders) => { 403 let _ = self 404 .channel 405 .send(&FudFindSeedersReply { 406 seeders: seeders.to_vec(), 407 nodes: self 408 .fud 409 .dht() 410 .find_neighbors(&request.key, self.fud.dht().settings.k) 411 .await, 412 }) 413 .await; 414 } 415 None => { 416 let _ = self 417 .channel 418 .send(&FudFindSeedersReply { 419 seeders: vec![], 420 nodes: self 421 .fud 422 .dht() 423 .find_neighbors(&request.key, self.fud.dht().settings.k) 424 .await, 425 }) 426 .await; 427 } 428 }; 429 } 430 } 431 432 async fn handle_fud_announce(self: Arc<Self>) -> Result<()> { 433 debug!(target: "fud::ProtocolFud::handle_fud_announce()", "START"); 434 435 loop { 436 let request = match self.announce_sub.receive().await { 437 Ok(v) => v, 438 Err(Error::ChannelStopped) => continue, 439 Err(e) => { 440 error!("{e}"); 441 continue 442 } 443 }; 444 info!(target: "fud::ProtocolFud::handle_fud_announce()", "Received ANNOUNCE for {}", hash_to_string(&request.key)); 445 446 let node = self.fud.dht().get_node_from_channel(self.channel.info.id).await; 447 if let Some(node) = node { 448 self.fud.dht.update_node(&node).await; 449 } 450 451 let mut seeders = vec![]; 452 453 for seeder in request.seeders.clone() { 454 if seeder.node.addresses.is_empty() { 455 continue 456 } 457 // TODO: Verify each address 458 seeders.push(seeder); 459 } 460 461 self.fud.add_value(&request.key, &seeders).await; 462 } 463 } 464 } 465 466 #[async_trait] 467 impl ProtocolBase for ProtocolFud { 468 async fn start(self: Arc<Self>, executor: Arc<Executor<'_>>) -> Result<()> { 469 debug!(target: "fud::ProtocolFud::start()", "START"); 470 self.jobsman.clone().start(executor.clone()); 471 self.jobsman.clone().spawn(self.clone().handle_fud_ping_request(), executor.clone()).await; 472 self.jobsman.clone().spawn(self.clone().handle_fud_find_request(), executor.clone()).await; 473 self.jobsman 474 .clone() 475 .spawn(self.clone().handle_fud_find_nodes_request(), executor.clone()) 476 .await; 477 self.jobsman 478 .clone() 479 .spawn(self.clone().handle_fud_find_seeders_request(), executor.clone()) 480 .await; 481 self.jobsman.clone().spawn(self.clone().handle_fud_announce(), executor.clone()).await; 482 debug!(target: "fud::ProtocolFud::start()", "END"); 483 Ok(()) 484 } 485 486 fn name(&self) -> &'static str { 487 "ProtocolFud" 488 } 489 }