/ bin / fud / fud / src / proto.rs
proto.rs
  1  /* This file is part of DarkFi (https://dark.fi)
  2   *
  3   * Copyright (C) 2020-2025 Dyne.org foundation
  4   *
  5   * This program is free software: you can redistribute it and/or modify
  6   * it under the terms of the GNU Affero General Public License as
  7   * published by the Free Software Foundation, either version 3 of the
  8   * License, or (at your option) any later version.
  9   *
 10   * This program is distributed in the hope that it will be useful,
 11   * but WITHOUT ANY WARRANTY; without even the implied warranty of
 12   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 13   * GNU Affero General Public License for more details.
 14   *
 15   * You should have received a copy of the GNU Affero General Public License
 16   * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 17   */
 18  
 19  use async_trait::async_trait;
 20  use smol::Executor;
 21  use std::{path::StripPrefixError, sync::Arc};
 22  use tracing::{debug, error, info};
 23  
 24  use darkfi::{
 25      dht::DhtHandler,
 26      geode::hash_to_string,
 27      impl_p2p_message,
 28      net::{
 29          metering::{MeteringConfiguration, DEFAULT_METERING_CONFIGURATION},
 30          ChannelPtr, Message, MessageSubscription, P2pPtr, ProtocolBase, ProtocolBasePtr,
 31          ProtocolJobsManager, ProtocolJobsManagerPtr,
 32      },
 33      Error, Result,
 34  };
 35  use darkfi_sdk::crypto::schnorr::{SchnorrSecret, Signature};
 36  use darkfi_serial::{SerialDecodable, SerialEncodable};
 37  
 38  use crate::{
 39      dht::{FudNode, FudSeeder},
 40      Fud,
 41  };
 42  
/// Message representing a file reply from the network.
///
/// Sent by `ProtocolFud::handle_fud_metadata_request` when the requested key
/// resolves to a resource that is not a directory and is not answered
/// directly with its single chunk.
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct FudFileReply {
    /// Hashes of the file's chunks, in the order yielded by `get_chunks()`.
    pub chunk_hashes: Vec<blake3::Hash>,
}
impl_p2p_message!(FudFileReply, "FudFileReply", 0, 0, DEFAULT_METERING_CONFIGURATION);
 49  
/// Message representing a directory reply from the network.
///
/// Sent by `ProtocolFud::handle_fud_metadata_request` when the requested key
/// resolves to a directory.
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct FudDirectoryReply {
    /// Hashes of the directory's chunks, in the order yielded by `get_chunks()`.
    pub chunk_hashes: Vec<blake3::Hash>,
    /// One `(file path, file size)` entry per file. The sender strips the
    /// directory's own path from each file path and converts it lossily to a
    /// `String`.
    pub files: Vec<(String, u64)>, // Vec of (file path, file size)
}
impl_p2p_message!(FudDirectoryReply, "FudDirectoryReply", 0, 0, DEFAULT_METERING_CONFIGURATION);
 57  
/// Message representing a node announcing a key on the network.
///
/// On reception, `ProtocolFud::handle_fud_announce` drops the seeders whose
/// node has no addresses and hands the remaining ones to `Fud::add_value`
/// under `key`.
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct FudAnnounce {
    /// Key being announced.
    pub key: blake3::Hash,
    /// Seeders announced for `key`.
    pub seeders: Vec<FudSeeder>,
}
impl_p2p_message!(FudAnnounce, "FudAnnounce", 0, 0, DEFAULT_METERING_CONFIGURATION);
 65  
/// Message representing a chunk reply from the network.
///
/// Sent in answer to a `FudFindRequest`, either for a requested chunk or for
/// a non-directory file that consists of a single chunk.
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct FudChunkReply {
    // TODO: This should be a chunk-sized array, but then we need padding?
    /// Raw bytes of the chunk.
    pub chunk: Vec<u8>,
}
impl_p2p_message!(FudChunkReply, "FudChunkReply", 0, 0, DEFAULT_METERING_CONFIGURATION);
 73  
/// Reply sent when a `FudFindRequest` matched neither a chunk nor a
/// file/directory held by the replying node.
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct FudNotFound;
impl_p2p_message!(FudNotFound, "FudNotFound", 0, 0, DEFAULT_METERING_CONFIGURATION);
 78  
/// Message representing a ping request on the network.
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct FudPingRequest {
    /// Value chosen by the requester. The responder signs its big-endian
    /// bytes and returns the signature in `FudPingReply::sig`.
    pub random: u64,
}
impl_p2p_message!(FudPingRequest, "FudPingRequest", 0, 0, DEFAULT_METERING_CONFIGURATION);
 85  
/// Message representing a ping reply on the network.
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct FudPingReply {
    /// The replying node, as returned by its `Fud::node()`.
    pub node: FudNode,
    /// Signature of the random u64 from the ping request, computed over the
    /// value's big-endian bytes with the replying node's secret key.
    pub sig: Signature,
}
impl_p2p_message!(FudPingReply, "FudPingReply", 0, 0, DEFAULT_METERING_CONFIGURATION);
 94  
/// Message representing a find file/chunk request from the network.
///
/// The receiver first tries to serve it as a chunk request (only possible
/// when `info` is set), then as a file/directory metadata request for `key`,
/// and finally answers with `FudNotFound`.
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct FudFindRequest {
    /// When set, the hash of the resource that `key` is looked up in as a
    /// chunk hash. When `None`, the request is never treated as a chunk request.
    pub info: Option<blake3::Hash>,
    /// Hash being looked for: a chunk hash (together with `info`) or the hash
    /// of a file/directory.
    pub key: blake3::Hash,
}
impl_p2p_message!(FudFindRequest, "FudFindRequest", 0, 0, DEFAULT_METERING_CONFIGURATION);
102  
/// Message representing a find nodes request on the network.
///
/// Answered with a `FudFindNodesReply`.
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct FudFindNodesRequest {
    /// Key passed to the responder's DHT `find_neighbors` lookup.
    pub key: blake3::Hash,
}
impl_p2p_message!(FudFindNodesRequest, "FudFindNodesRequest", 0, 0, DEFAULT_METERING_CONFIGURATION);
109  
/// Message representing a find nodes reply on the network.
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct FudFindNodesReply {
    /// Nodes returned by the responder's `find_neighbors` call for the
    /// requested key, limited by its DHT setting `k`.
    pub nodes: Vec<FudNode>,
}
impl_p2p_message!(FudFindNodesReply, "FudFindNodesReply", 0, 0, DEFAULT_METERING_CONFIGURATION);
116  
/// Message representing a find seeders request on the network.
///
/// Answered with a `FudFindSeedersReply`.
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct FudFindSeedersRequest {
    /// Key whose seeders are being looked for.
    pub key: blake3::Hash,
}
impl_p2p_message!(
    FudFindSeedersRequest,
    "FudFindSeedersRequest",
    0,
    0,
    DEFAULT_METERING_CONFIGURATION
);
129  
/// Message representing a find seeders reply on the network.
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct FudFindSeedersReply {
    /// Seeders the responder has stored for the requested key. Empty when it
    /// has no entry for that key.
    pub seeders: Vec<FudSeeder>,
    /// Nodes returned by the responder's `find_neighbors` call for the
    /// requested key, limited by its DHT setting `k`.
    pub nodes: Vec<FudNode>,
}
impl_p2p_message!(FudFindSeedersReply, "FudFindSeedersReply", 0, 0, DEFAULT_METERING_CONFIGURATION);
137  
/// P2P protocol implementation for fud.
///
/// One instance is created per channel by `ProtocolFud::init`. Each
/// subscription below is consumed by its own handler loop, spawned in
/// `ProtocolBase::start`.
pub struct ProtocolFud {
    /// Channel this protocol instance is bound to; all replies are sent on it.
    channel: ChannelPtr,
    /// Incoming `FudPingRequest` messages.
    ping_request_sub: MessageSubscription<FudPingRequest>,
    /// Incoming `FudFindRequest` messages.
    find_request_sub: MessageSubscription<FudFindRequest>,
    /// Incoming `FudFindNodesRequest` messages.
    find_nodes_request_sub: MessageSubscription<FudFindNodesRequest>,
    /// Incoming `FudFindSeedersRequest` messages.
    find_seeders_request_sub: MessageSubscription<FudFindSeedersRequest>,
    /// Incoming `FudAnnounce` messages.
    announce_sub: MessageSubscription<FudAnnounce>,
    /// Shared `Fud` instance used to answer requests (node info, secret key,
    /// DHT and geode access).
    fud: Arc<Fud>,
    /// Jobs manager on which the handler loops are spawned.
    jobsman: ProtocolJobsManagerPtr,
}
149  
150  impl ProtocolFud {
151      pub async fn init(fud: Arc<Fud>, channel: ChannelPtr, _: P2pPtr) -> Result<ProtocolBasePtr> {
152          debug!(
153              target: "fud::proto::ProtocolFud::init()",
154              "Adding ProtocolFud to the protocol registry"
155          );
156  
157          let msg_subsystem = channel.message_subsystem();
158          msg_subsystem.add_dispatch::<FudPingRequest>().await;
159          msg_subsystem.add_dispatch::<FudFindRequest>().await;
160          msg_subsystem.add_dispatch::<FudFindNodesRequest>().await;
161          msg_subsystem.add_dispatch::<FudFindSeedersRequest>().await;
162          msg_subsystem.add_dispatch::<FudAnnounce>().await;
163  
164          let ping_request_sub = channel.subscribe_msg::<FudPingRequest>().await?;
165          let find_request_sub = channel.subscribe_msg::<FudFindRequest>().await?;
166          let find_nodes_request_sub = channel.subscribe_msg::<FudFindNodesRequest>().await?;
167          let find_seeders_request_sub = channel.subscribe_msg::<FudFindSeedersRequest>().await?;
168          let announce_sub = channel.subscribe_msg::<FudAnnounce>().await?;
169  
170          Ok(Arc::new(Self {
171              channel: channel.clone(),
172              ping_request_sub,
173              find_request_sub,
174              find_nodes_request_sub,
175              find_seeders_request_sub,
176              announce_sub,
177              fud,
178              jobsman: ProtocolJobsManager::new("ProtocolFud", channel.clone()),
179          }))
180      }
181  
182      async fn handle_fud_ping_request(self: Arc<Self>) -> Result<()> {
183          debug!(target: "fud::ProtocolFud::handle_fud_ping_request()", "START");
184  
185          loop {
186              let ping_req = match self.ping_request_sub.receive().await {
187                  Ok(v) => v,
188                  Err(Error::ChannelStopped) => continue,
189                  Err(e) => {
190                      error!("{e}");
191                      continue
192                  }
193              };
194              info!(target: "fud::ProtocolFud::handle_fud_ping_request()", "Received PING REQUEST");
195  
196              let reply = FudPingReply {
197                  node: self.fud.node().await,
198                  sig: self.fud.secret_key.read().await.sign(&ping_req.random.to_be_bytes()),
199              };
200              match self.channel.send(&reply).await {
201                  Ok(()) => continue,
202                  Err(_e) => continue,
203              }
204          }
205      }
206  
207      async fn handle_fud_find_request(self: Arc<Self>) -> Result<()> {
208          debug!(target: "fud::ProtocolFud::handle_fud_find_request()", "START");
209  
210          loop {
211              let request = match self.find_request_sub.receive().await {
212                  Ok(v) => v,
213                  Err(Error::ChannelStopped) => continue,
214                  Err(e) => {
215                      error!("{e}");
216                      continue
217                  }
218              };
219              info!(target: "fud::ProtocolFud::handle_fud_find_request()", "Received FIND for {}", hash_to_string(&request.key));
220  
221              let node = self.fud.dht().get_node_from_channel(self.channel.info.id).await;
222              if let Some(node) = node {
223                  self.fud.dht.update_node(&node).await;
224              }
225  
226              if self.handle_fud_chunk_request(&request).await {
227                  continue;
228              }
229  
230              if self.handle_fud_metadata_request(&request).await {
231                  continue;
232              }
233  
234              // Request did not match anything we have
235              let reply = FudNotFound {};
236              info!(target: "fud::ProtocolFud::handle_fud_find_request()", "We do not have {}", hash_to_string(&request.key));
237              let _ = self.channel.send(&reply).await;
238          }
239      }
240  
241      /// If the FudFindRequest matches a chunk we have, handle it.
242      /// Returns true if the chunk was found.
243      async fn handle_fud_chunk_request(&self, request: &FudFindRequest) -> bool {
244          let hash = request.info;
245          if hash.is_none() {
246              return false;
247          }
248          let hash = hash.unwrap();
249  
250          let path = self.fud.hash_to_path(&hash).ok().flatten();
251          if path.is_none() {
252              return false;
253          }
254          let path = path.unwrap();
255  
256          let chunked = self.fud.geode.get(&hash, &path).await;
257          if chunked.is_err() {
258              return false;
259          }
260  
261          let chunk = self.fud.geode.get_chunk(&mut chunked.unwrap(), &request.key).await;
262          if let Ok(chunk) = chunk {
263              if !self.fud.geode.verify_chunk(&request.key, &chunk) {
264                  // TODO: Run geode GC
265                  return false;
266              }
267              let reply = FudChunkReply { chunk };
268              info!(target: "fud::ProtocolFud::handle_fud_chunk_request()", "Sending chunk {}", hash_to_string(&request.key));
269              let _ = self.channel.send(&reply).await;
270              return true;
271          }
272  
273          false
274      }
275  
276      /// If the FudFindRequest matches a file we have, handle it
277      /// Returns true if the file was found.
278      async fn handle_fud_metadata_request(&self, request: &FudFindRequest) -> bool {
279          let path = self.fud.hash_to_path(&request.key).ok().flatten();
280          if path.is_none() {
281              return false;
282          }
283          let path = path.unwrap();
284  
285          let chunked_file = self.fud.geode.get(&request.key, &path).await.ok();
286          if chunked_file.is_none() {
287              return false;
288          }
289          let mut chunked_file = chunked_file.unwrap();
290  
291          // If it's a file with a single chunk, just reply with the chunk
292          if chunked_file.len() == 1 && !chunked_file.is_dir() {
293              let chunk_hash = chunked_file.get_chunks()[0].0;
294              let chunk = self.fud.geode.get_chunk(&mut chunked_file, &chunk_hash).await;
295              if let Ok(chunk) = chunk {
296                  if blake3::hash(blake3::hash(&chunk).as_bytes()) != request.key {
297                      // TODO: Run geode GC
298                      return false;
299                  }
300                  let reply = FudChunkReply { chunk };
301                  info!(target: "fud::ProtocolFud::handle_fud_metadata_request()", "Sending chunk (file has a single chunk) {}", hash_to_string(&chunk_hash));
302                  let _ = self.channel.send(&reply).await;
303                  return true;
304              }
305              return false;
306          }
307  
308          // Otherwise reply with the metadata
309          match chunked_file.is_dir() {
310              false => {
311                  let reply = FudFileReply {
312                      chunk_hashes: chunked_file
313                          .get_chunks()
314                          .iter()
315                          .map(|(chunk, _)| *chunk)
316                          .collect(),
317                  };
318                  info!(target: "fud::ProtocolFud::handle_fud_metadata_request()", "Sending file metadata {}", hash_to_string(&request.key));
319                  let _ = self.channel.send(&reply).await;
320              }
321              true => {
322                  let files = chunked_file
323                      .get_files()
324                      .iter()
325                      .map(|(file_path, size)| match file_path.strip_prefix(path.clone()) {
326                          Ok(rel_path) => Ok((rel_path.to_string_lossy().to_string(), *size)),
327                          Err(e) => Err(e),
328                      })
329                      .collect::<std::result::Result<Vec<_>, StripPrefixError>>();
330                  if let Err(e) = files {
331                      error!(target: "fud::ProtocolFud::handle_fud_metadata_request()", "Error parsing file paths before sending directory metadata: {e}");
332                      return false;
333                  }
334                  let reply = FudDirectoryReply {
335                      chunk_hashes: chunked_file
336                          .get_chunks()
337                          .iter()
338                          .map(|(chunk, _)| *chunk)
339                          .collect(),
340                      files: files.unwrap(),
341                  };
342                  info!(target: "fud::ProtocolFud::handle_fud_metadata_request()", "Sending directory metadata {}", hash_to_string(&request.key));
343                  let _ = self.channel.send(&reply).await;
344              }
345          };
346  
347          true
348      }
349  
350      async fn handle_fud_find_nodes_request(self: Arc<Self>) -> Result<()> {
351          debug!(target: "fud::ProtocolFud::handle_fud_find_nodes_request()", "START");
352  
353          loop {
354              let request = match self.find_nodes_request_sub.receive().await {
355                  Ok(v) => v,
356                  Err(Error::ChannelStopped) => continue,
357                  Err(e) => {
358                      error!("{e}");
359                      continue
360                  }
361              };
362              info!(target: "fud::ProtocolFud::handle_fud_find_nodes_request()", "Received FIND NODES for {}", hash_to_string(&request.key));
363  
364              let node = self.fud.dht().get_node_from_channel(self.channel.info.id).await;
365              if let Some(node) = node {
366                  self.fud.dht.update_node(&node).await;
367              }
368  
369              let reply = FudFindNodesReply {
370                  nodes: self.fud.dht().find_neighbors(&request.key, self.fud.dht().settings.k).await,
371              };
372              match self.channel.send(&reply).await {
373                  Ok(()) => continue,
374                  Err(_e) => continue,
375              }
376          }
377      }
378  
379      async fn handle_fud_find_seeders_request(self: Arc<Self>) -> Result<()> {
380          debug!(target: "fud::ProtocolFud::handle_fud_find_seeders_request()", "START");
381  
382          loop {
383              let request = match self.find_seeders_request_sub.receive().await {
384                  Ok(v) => v,
385                  Err(Error::ChannelStopped) => continue,
386                  Err(e) => {
387                      error!("{e}");
388                      continue
389                  }
390              };
391              info!(target: "fud::ProtocolFud::handle_fud_find_seeders_request()", "Received FIND SEEDERS for {}", hash_to_string(&request.key));
392  
393              let node = self.fud.dht().get_node_from_channel(self.channel.info.id).await;
394              if let Some(node) = node {
395                  self.fud.dht.update_node(&node).await;
396              }
397  
398              let router = self.fud.dht.hash_table.read().await;
399              let peers = router.get(&request.key);
400  
401              match peers {
402                  Some(seeders) => {
403                      let _ = self
404                          .channel
405                          .send(&FudFindSeedersReply {
406                              seeders: seeders.to_vec(),
407                              nodes: self
408                                  .fud
409                                  .dht()
410                                  .find_neighbors(&request.key, self.fud.dht().settings.k)
411                                  .await,
412                          })
413                          .await;
414                  }
415                  None => {
416                      let _ = self
417                          .channel
418                          .send(&FudFindSeedersReply {
419                              seeders: vec![],
420                              nodes: self
421                                  .fud
422                                  .dht()
423                                  .find_neighbors(&request.key, self.fud.dht().settings.k)
424                                  .await,
425                          })
426                          .await;
427                  }
428              };
429          }
430      }
431  
432      async fn handle_fud_announce(self: Arc<Self>) -> Result<()> {
433          debug!(target: "fud::ProtocolFud::handle_fud_announce()", "START");
434  
435          loop {
436              let request = match self.announce_sub.receive().await {
437                  Ok(v) => v,
438                  Err(Error::ChannelStopped) => continue,
439                  Err(e) => {
440                      error!("{e}");
441                      continue
442                  }
443              };
444              info!(target: "fud::ProtocolFud::handle_fud_announce()", "Received ANNOUNCE for {}", hash_to_string(&request.key));
445  
446              let node = self.fud.dht().get_node_from_channel(self.channel.info.id).await;
447              if let Some(node) = node {
448                  self.fud.dht.update_node(&node).await;
449              }
450  
451              let mut seeders = vec![];
452  
453              for seeder in request.seeders.clone() {
454                  if seeder.node.addresses.is_empty() {
455                      continue
456                  }
457                  // TODO: Verify each address
458                  seeders.push(seeder);
459              }
460  
461              self.fud.add_value(&request.key, &seeders).await;
462          }
463      }
464  }
465  
466  #[async_trait]
467  impl ProtocolBase for ProtocolFud {
468      async fn start(self: Arc<Self>, executor: Arc<Executor<'_>>) -> Result<()> {
469          debug!(target: "fud::ProtocolFud::start()", "START");
470          self.jobsman.clone().start(executor.clone());
471          self.jobsman.clone().spawn(self.clone().handle_fud_ping_request(), executor.clone()).await;
472          self.jobsman.clone().spawn(self.clone().handle_fud_find_request(), executor.clone()).await;
473          self.jobsman
474              .clone()
475              .spawn(self.clone().handle_fud_find_nodes_request(), executor.clone())
476              .await;
477          self.jobsman
478              .clone()
479              .spawn(self.clone().handle_fud_find_seeders_request(), executor.clone())
480              .await;
481          self.jobsman.clone().spawn(self.clone().handle_fud_announce(), executor.clone()).await;
482          debug!(target: "fud::ProtocolFud::start()", "END");
483          Ok(())
484      }
485  
486      fn name(&self) -> &'static str {
487          "ProtocolFud"
488      }
489  }