// bin/fud/fud/src/download.rs
  1  /* This file is part of DarkFi (https://dark.fi)
  2   *
  3   * Copyright (C) 2020-2025 Dyne.org foundation
  4   *
  5   * This program is free software: you can redistribute it and/or modify
  6   * it under the terms of the GNU Affero General Public License as
  7   * published by the Free Software Foundation, either version 3 of the
  8   * License, or (at your option) any later version.
  9   *
 10   * This program is distributed in the hope that it will be useful,
 11   * but WITHOUT ANY WARRANTY; without even the implied warranty of
 12   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 13   * GNU Affero General Public License for more details.
 14   *
 15   * You should have received a copy of the GNU Affero General Public License
 16   * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 17   */
 18  
 19  use std::{
 20      collections::HashSet,
 21      path::{Path, PathBuf},
 22      time::Instant,
 23  };
 24  
 25  use futures::{future::FutureExt, pin_mut, select};
 26  use rand::{
 27      prelude::{IteratorRandom, SliceRandom},
 28      rngs::OsRng,
 29  };
 30  use tracing::{error, info, warn};
 31  
 32  use darkfi::{
 33      dht::DhtNode,
 34      geode::{hash_to_string, ChunkedStorage},
 35      net::ChannelPtr,
 36      system::Subscription,
 37      Error, Result,
 38  };
 39  use darkfi_serial::serialize_async;
 40  
 41  use crate::{
 42      event::{self, notify_event, FudEvent},
 43      proto::{FudChunkReply, FudDirectoryReply, FudFileReply, FudFindRequest, FudNotFound},
 44      util::create_all_files,
 45      Fud, FudSeeder, ResourceStatus, ResourceType, Scrap,
 46  };
 47  
 48  /// Receive seeders from a subscription, and execute an async expression for
 49  /// each deduplicated seeder once (seeder order is random).
 50  /// It will keep going until the expression returns `Ok(())`, or there are
 51  /// no more seeders.
 52  /// It has an optional `favored_seeder` argument that will be tried first if
 53  /// specified.
 54  macro_rules! seeders_loop {
 55      ($seeders_sub:expr, $favored_seeder:expr, $code:expr) => {
 56          let mut queried_seeders: HashSet<blake3::Hash> = HashSet::new();
 57          let mut is_done = false;
 58  
 59          // Try favored seeder
 60          let favored_seeder: Option<FudSeeder> = $favored_seeder;
 61          if let Some(seeder) = favored_seeder {
 62              queried_seeders.insert(seeder.node.id());
 63              if $code(seeder).await.is_ok() {
 64                  is_done = true;
 65              }
 66          }
 67  
 68          // Try other seeders using the subscription
 69          while !is_done {
 70              let rep = $seeders_sub.receive().await;
 71              if rep.is_none() {
 72                  break; // None means the lookup is done
 73              }
 74              let seeders = rep.unwrap().clone();
 75              let mut shuffled_seeders = {
 76                  let mut vec: Vec<_> = seeders.iter().cloned().collect();
 77                  vec.shuffle(&mut OsRng);
 78                  vec
 79              };
 80              // Loop over seeders
 81              while let Some(seeder) = shuffled_seeders.pop() {
 82                  // Only use a seeder once
 83                  if queried_seeders.iter().any(|s| *s == seeder.node.id()) {
 84                      continue;
 85                  }
 86                  queried_seeders.insert(seeder.node.id());
 87  
 88                  if $code(seeder).await.is_err() {
 89                      continue;
 90                  }
 91  
 92                  is_done = true;
 93                  break;
 94              }
 95          }
 96      };
 97      ($seeders_sub:expr, $code:expr) => {
 98          seeders_loop!($seeders_sub, None, $code)
 99      };
100  }
101  
/// Outcome of a single chunk fetch attempt, telling the caller what to do next.
enum ChunkFetchControl {
    /// Keep using the same seeder and request another chunk
    NextChunk,
    /// Give up on the current seeder and move on to the next one
    NextSeeder,
    /// Stop fetching entirely (e.g. the resource was removed)
    Abort,
}
107  
/// Shared state threaded through the chunk-fetching helpers.
struct ChunkFetchContext<'a> {
    /// Fud instance
    fud: &'a Fud,
    /// Hash of the resource (file or directory) being downloaded
    hash: &'a blake3::Hash,
    /// Chunked storage the fetched chunks are written into
    chunked: &'a mut ChunkedStorage,
    /// Chunk hashes that are still missing
    chunks: &'a mut HashSet<blake3::Hash>,
}
114  
115  /// Fetch `chunks` for `chunked` (file or directory) from seeders in `seeders_sub`.
116  pub async fn fetch_chunks(
117      fud: &Fud,
118      hash: &blake3::Hash,
119      chunked: &mut ChunkedStorage,
120      seeders_sub: &Subscription<Option<Vec<FudSeeder>>>,
121      favored_seeder: Option<FudSeeder>,
122      chunks: &mut HashSet<blake3::Hash>,
123  ) -> Result<()> {
124      let mut ctx = ChunkFetchContext { fud, hash, chunked, chunks };
125  
126      seeders_loop!(seeders_sub, favored_seeder, async |seeder: FudSeeder| -> Result<()> {
127          let channel = match fud.dht.get_channel(&seeder.node, Some(*hash)).await {
128              Ok(channel) => channel,
129              Err(e) => {
130                  warn!(target: "fud::download::fetch_chunks()", "Could not get a channel for node {}: {e}", hash_to_string(&seeder.node.id()));
131                  return Err(e)
132              }
133          };
134          let mut chunks_to_query = ctx.chunks.clone();
135          info!(target: "fud::download::fetch_chunks()", "Requesting chunks from seeder {}", hash_to_string(&seeder.node.id()));
136  
137          loop {
138              // Loop over chunks
139              match fetch_chunk(&mut ctx, &channel, &seeder, &mut chunks_to_query).await {
140                  ChunkFetchControl::NextChunk => continue,
141                  ChunkFetchControl::NextSeeder => break,
142                  ChunkFetchControl::Abort => {
143                      fud.dht.cleanup_channel(channel).await;
144                      return Ok(())
145                  }
146              };
147          }
148  
149          fud.dht.cleanup_channel(channel).await;
150  
151          // Stop when there are no missing chunks
152          if ctx.chunks.is_empty() {
153              return Ok(())
154          }
155  
156          Err(().into())
157      });
158  
159      Ok(())
160  }
161  
162  /// Fetch a single chunk and return what should be done next
163  async fn fetch_chunk(
164      ctx: &mut ChunkFetchContext<'_>,
165      channel: &ChannelPtr,
166      seeder: &FudSeeder,
167      chunks_to_query: &mut HashSet<blake3::Hash>,
168  ) -> ChunkFetchControl {
169      // Select a chunk to request
170      let mut chunk = None;
171      if let Some(random_chunk) = chunks_to_query.iter().choose(&mut OsRng) {
172          chunk = Some(*random_chunk);
173      }
174  
175      if chunk.is_none() {
176          // No more chunks to request from this seeder
177          return ChunkFetchControl::NextSeeder;
178      }
179  
180      let chunk_hash = chunk.unwrap();
181      chunks_to_query.remove(&chunk_hash);
182  
183      let start_time = Instant::now();
184      let msg_subsystem = channel.message_subsystem();
185      msg_subsystem.add_dispatch::<FudChunkReply>().await;
186      msg_subsystem.add_dispatch::<FudNotFound>().await;
187      let msg_subscriber_chunk = channel.subscribe_msg::<FudChunkReply>().await.unwrap();
188      let msg_subscriber_notfound = channel.subscribe_msg::<FudNotFound>().await.unwrap();
189  
190      let send_res = channel.send(&FudFindRequest { info: Some(*ctx.hash), key: chunk_hash }).await;
191      if let Err(e) = send_res {
192          warn!(target: "fud::download::fetch_chunk()", "Error while sending FudFindRequest: {e}");
193          return ChunkFetchControl::NextSeeder;
194      }
195  
196      let chunk_recv = msg_subscriber_chunk.receive_with_timeout(ctx.fud.chunk_timeout).fuse();
197      let notfound_recv = msg_subscriber_notfound.receive_with_timeout(ctx.fud.chunk_timeout).fuse();
198  
199      pin_mut!(chunk_recv, notfound_recv);
200  
201      // Wait for a FudChunkReply or FudNotFound
202      select! {
203          chunk_reply = chunk_recv => {
204              msg_subscriber_chunk.unsubscribe().await;
205              msg_subscriber_notfound.unsubscribe().await;
206              if let Err(e) = chunk_reply {
207                  warn!(target: "fud::download::fetch_chunk()", "Error waiting for chunk reply: {e}");
208                  return ChunkFetchControl::NextSeeder;
209              }
210              let reply = chunk_reply.unwrap();
211              handle_chunk_reply(ctx, &chunk_hash, &reply, seeder, &start_time).await
212          }
213          notfound_reply = notfound_recv => {
214              msg_subscriber_chunk.unsubscribe().await;
215              msg_subscriber_notfound.unsubscribe().await;
216              if let Err(e) = notfound_reply {
217                  warn!(target: "fud::download::fetch_chunk()", "Error waiting for NOTFOUND reply: {e}");
218                  return ChunkFetchControl::NextSeeder;
219              }
220              info!(target: "fud::download::fetch_chunk()", "Received NOTFOUND {} from seeder {}", hash_to_string(&chunk_hash), hash_to_string(&seeder.node.id()));
221              notify_event!(ctx.fud, ChunkNotFound, { hash: *ctx.hash, chunk_hash });
222              ChunkFetchControl::NextChunk
223          }
224      }
225  }
226  
227  /// Processes an incoming chunk
228  async fn handle_chunk_reply(
229      ctx: &mut ChunkFetchContext<'_>,
230      chunk_hash: &blake3::Hash,
231      reply: &FudChunkReply,
232      seeder: &FudSeeder,
233      start_time: &Instant,
234  ) -> ChunkFetchControl {
235      let write_res = ctx.fud.geode.write_chunk(ctx.chunked, &reply.chunk).await;
236      if let Err(e) = write_res {
237          error!(target: "fud::download::handle_chunk_reply()", "Failed inserting chunk {} to Geode: {e}", hash_to_string(chunk_hash));
238          return ChunkFetchControl::NextChunk;
239      }
240      let (inserted_hash, bytes_written) = write_res.unwrap();
241      if inserted_hash != *chunk_hash {
242          warn!(target: "fud::download::handle_chunk_reply()", "Received chunk does not match requested chunk");
243          return ChunkFetchControl::NextChunk;
244      }
245  
246      info!(target: "fud::download::handle_chunk_reply()", "Received chunk {} from seeder {}", hash_to_string(chunk_hash), hash_to_string(&seeder.node.id()));
247  
248      // If we did not write the whole chunk to the filesystem,
249      // save the chunk in the scraps.
250      if bytes_written < reply.chunk.len() {
251          info!(target: "fud::download::handle_chunk_reply()", "Saving chunk {} as a scrap", hash_to_string(chunk_hash));
252          let chunk_written = ctx.fud.geode.get_chunk(ctx.chunked, chunk_hash).await;
253          if let Err(e) = chunk_written {
254              error!(target: "fud::download::handle_chunk_reply()", "Error getting chunk: {e}");
255              return ChunkFetchControl::NextChunk;
256          }
257          let scrap = Scrap {
258              chunk: reply.chunk.clone(),
259              hash_written: blake3::hash(&chunk_written.unwrap()),
260          };
261          if let Err(e) =
262              ctx.fud.scrap_tree.insert(chunk_hash.as_bytes(), serialize_async(&scrap).await)
263          {
264              error!(target: "fud::download::handle_chunk_reply()", "Failed to save chunk {} as a scrap: {e}", hash_to_string(chunk_hash));
265              return ChunkFetchControl::NextChunk;
266          }
267      }
268  
269      // Update the resource
270      let mut resources_write = ctx.fud.resources.write().await;
271      let resource = resources_write.get_mut(ctx.hash);
272      if resource.is_none() {
273          return ChunkFetchControl::Abort // Resource was removed
274      }
275      let resource = resource.unwrap();
276      resource.status = ResourceStatus::Downloading;
277      resource.total_chunks_downloaded += 1;
278      resource.target_chunks_downloaded += 1;
279  
280      resource.total_bytes_downloaded += reply.chunk.len() as u64;
281      resource.target_bytes_downloaded +=
282          resource.get_selected_bytes(ctx.chunked, &reply.chunk) as u64;
283      resource.speeds.push(reply.chunk.len() as f64 / start_time.elapsed().as_secs_f64());
284      if resource.speeds.len() > 12 {
285          resource.speeds = resource.speeds.split_off(resource.speeds.len() - 12); // Only keep the last few speeds
286      }
287  
288      // If we just fetched the last chunk of a file, compute
289      // `total_bytes_size` (and `target_bytes_size`) again,
290      // as `geode.write_chunk()` updated the FileSequence
291      // to the exact file size.
292      if let Some((last_chunk_hash, _)) = ctx.chunked.iter().last() {
293          if matches!(resource.rtype, ResourceType::File) && *last_chunk_hash == *chunk_hash {
294              resource.total_bytes_size = ctx.chunked.get_fileseq().len();
295              resource.target_bytes_size = resource.total_bytes_size;
296          }
297      }
298      let resource = resource.clone();
299      drop(resources_write);
300  
301      notify_event!(ctx.fud, ChunkDownloadCompleted, { hash: *ctx.hash, chunk_hash: *chunk_hash, resource });
302      ctx.chunks.remove(chunk_hash);
303      ChunkFetchControl::NextChunk
304  }
305  
/// The possible replies when fetching resource metadata from a seeder.
enum MetadataFetchReply {
    /// Directory metadata (file list and chunk hashes)
    Directory(FudDirectoryReply),
    /// File metadata (chunk hashes)
    File(FudFileReply),
    /// A full chunk, sent when the entire file fits in a single chunk
    Chunk(FudChunkReply),
}
311  
312  /// Fetch a single resource metadata from seeders received from `seeders_sub`.
313  /// If the resource is a file smaller than a single chunk then seeder can send the
314  /// chunk directly, and we will create the file from it on path `path`.
315  /// 1. Wait for seeders from the subscription
316  /// 2. Request the metadata from the seeders
317  /// 3. Insert metadata to geode using the reply
318  pub async fn fetch_metadata(
319      fud: &Fud,
320      hash: &blake3::Hash,
321      seeders_sub: &Subscription<Option<Vec<FudSeeder>>>,
322      path: &Path,
323  ) -> Result<FudSeeder> {
324      let mut result: Option<(FudSeeder, MetadataFetchReply)> = None;
325  
326      seeders_loop!(seeders_sub, async |seeder: FudSeeder| -> Result<()> {
327          let channel = fud.dht.get_channel(&seeder.node, Some(*hash)).await?;
328          let msg_subsystem = channel.message_subsystem();
329          msg_subsystem.add_dispatch::<FudChunkReply>().await;
330          msg_subsystem.add_dispatch::<FudFileReply>().await;
331          msg_subsystem.add_dispatch::<FudDirectoryReply>().await;
332          msg_subsystem.add_dispatch::<FudNotFound>().await;
333          let msg_subscriber_chunk = channel.subscribe_msg::<FudChunkReply>().await.unwrap();
334          let msg_subscriber_file = channel.subscribe_msg::<FudFileReply>().await.unwrap();
335          let msg_subscriber_dir = channel.subscribe_msg::<FudDirectoryReply>().await.unwrap();
336          let msg_subscriber_notfound = channel.subscribe_msg::<FudNotFound>().await.unwrap();
337  
338          let send_res = channel.send(&FudFindRequest { info: None, key: *hash }).await;
339          if let Err(e) = send_res {
340              warn!(target: "fud::download::fetch_metadata()", "Error while sending FudFindRequest: {e}");
341              msg_subscriber_chunk.unsubscribe().await;
342              msg_subscriber_file.unsubscribe().await;
343              msg_subscriber_dir.unsubscribe().await;
344              msg_subscriber_notfound.unsubscribe().await;
345              fud.dht.cleanup_channel(channel).await;
346              return Err(e)
347          }
348  
349          let chunk_recv = msg_subscriber_chunk.receive_with_timeout(fud.chunk_timeout).fuse();
350          let file_recv = msg_subscriber_file.receive_with_timeout(fud.chunk_timeout).fuse();
351          let dir_recv = msg_subscriber_dir.receive_with_timeout(fud.chunk_timeout).fuse();
352          let notfound_recv = msg_subscriber_notfound.receive_with_timeout(fud.chunk_timeout).fuse();
353  
354          pin_mut!(chunk_recv, file_recv, dir_recv, notfound_recv);
355  
356          let cleanup = async || {
357              msg_subscriber_chunk.unsubscribe().await;
358              msg_subscriber_file.unsubscribe().await;
359              msg_subscriber_dir.unsubscribe().await;
360              msg_subscriber_notfound.unsubscribe().await;
361              fud.dht.cleanup_channel(channel).await;
362          };
363  
364          // Wait for a FudChunkReply, FudFileReply, FudDirectoryReply, or FudNotFound
365          select! {
366              // Received a chunk while requesting metadata, this is allowed to
367              // optimize fetching files smaller than a single chunk
368              chunk_reply = chunk_recv => {
369                  cleanup().await;
370                  if let Err(e) = chunk_reply {
371                      warn!(target: "fud::download::fetch_metadata()", "Error waiting for chunk reply: {e}");
372                      return Err(e)
373                  }
374                  let reply = chunk_reply.unwrap();
375                  let chunk_hash = blake3::hash(&reply.chunk);
376                  // Check that this is the only chunk in the file
377                  if !fud.geode.verify_metadata(hash, &[chunk_hash], &[]) {
378                      warn!(target: "fud::download::fetch_metadata()", "Received a chunk while fetching metadata, but the chunk did not match the file hash");
379                      return Err(().into())
380                  }
381                  info!(target: "fud::download::fetch_metadata()", "Received chunk {} (for file {}) from seeder {}", hash_to_string(&chunk_hash), hash_to_string(hash), hash_to_string(&seeder.node.id()));
382                  result = Some((seeder, MetadataFetchReply::Chunk((*reply).clone())));
383                  Ok(())
384              }
385              file_reply = file_recv => {
386                  cleanup().await;
387                  if let Err(e) = file_reply {
388                      warn!(target: "fud::download::fetch_metadata()", "Error waiting for file reply: {e}");
389                      return Err(e)
390                  }
391                  let reply = file_reply.unwrap();
392                  if !fud.geode.verify_metadata(hash, &reply.chunk_hashes, &[]) {
393                      warn!(target: "fud::download::fetch_metadata()", "Received invalid file metadata");
394                      return Err(().into())
395                  }
396                  info!(target: "fud::download::fetch_metadata()", "Received file {} from seeder {}", hash_to_string(hash), hash_to_string(&seeder.node.id()));
397                  result = Some((seeder, MetadataFetchReply::File((*reply).clone())));
398                  Ok(())
399              }
400              dir_reply = dir_recv => {
401                  cleanup().await;
402                  if let Err(e) = dir_reply {
403                      warn!(target: "fud::download::fetch_metadata()", "Error waiting for directory reply: {e}");
404                      return Err(e)
405                  }
406                  let reply = dir_reply.unwrap();
407  
408                  // Convert all file paths from String to PathBuf
409                  let files: Vec<_> = reply.files.clone().into_iter()
410                      .map(|(path_str, size)| (PathBuf::from(path_str), size))
411                      .collect();
412  
413                  if !fud.geode.verify_metadata(hash, &reply.chunk_hashes, &files) {
414                      warn!(target: "fud::download::fetch_metadata()", "Received invalid directory metadata");
415                      return Err(().into())
416                  }
417                  info!(target: "fud::download::fetch_metadata()", "Received directory {} from seeder {}", hash_to_string(hash), hash_to_string(&seeder.node.id()));
418                  result = Some((seeder, MetadataFetchReply::Directory((*reply).clone())));
419                  Ok(())
420              }
421              notfound_reply = notfound_recv => {
422                  cleanup().await;
423                  if let Err(e) = notfound_reply {
424                      warn!(target: "fud::download::fetch_metadata()", "Error waiting for NOTFOUND reply: {e}");
425                      return Err(e)
426                  }
427                  info!(target: "fud::download::fetch_metadata()", "Received NOTFOUND {} from seeder {}", hash_to_string(hash), hash_to_string(&seeder.node.id()));
428                  Err(().into())
429              }
430          }
431      });
432  
433      // We did not find the resource
434      if result.is_none() {
435          return Err(Error::GeodeFileRouteNotFound)
436      }
437  
438      // Insert metadata to geode using the reply
439      // At this point the reply content is already verified
440      let (seeder, reply) = result.unwrap();
441      match reply {
442          MetadataFetchReply::Directory(FudDirectoryReply { files, chunk_hashes }) => {
443              // Convert all file paths from String to PathBuf
444              let mut files: Vec<_> =
445                  files.into_iter().map(|(path_str, size)| (PathBuf::from(path_str), size)).collect();
446  
447              fud.geode.sort_files(&mut files);
448              if let Err(e) = fud.geode.insert_metadata(hash, &chunk_hashes, &files).await {
449                  error!(target: "fud::download::fetch_metadata()", "Failed inserting directory {} to Geode: {e}", hash_to_string(hash));
450                  return Err(e)
451              }
452          }
453          MetadataFetchReply::File(FudFileReply { chunk_hashes }) => {
454              if let Err(e) = fud.geode.insert_metadata(hash, &chunk_hashes, &[]).await {
455                  error!(target: "fud::download::fetch_metadata()", "Failed inserting file {} to Geode: {e}", hash_to_string(hash));
456                  return Err(e)
457              }
458          }
459          // Looked for a file but got a chunk: the entire file fits in a single chunk
460          MetadataFetchReply::Chunk(FudChunkReply { chunk }) => {
461              info!(target: "fud::download::fetch_metadata()", "File fits in a single chunk");
462              let chunk_hash = blake3::hash(&chunk);
463              if let Err(e) = fud.geode.insert_metadata(hash, &[chunk_hash], &[]).await {
464                  error!(target: "fud::download::fetch_metadata()", "Failed inserting file {} to Geode (from single chunk): {e}", hash_to_string(hash));
465                  return Err(e)
466              }
467              create_all_files(&[path.to_path_buf()]).await?;
468              let mut chunked_file = ChunkedStorage::new(
469                  &[chunk_hash],
470                  &[(path.to_path_buf(), chunk.len() as u64)],
471                  false,
472              );
473              if let Err(e) = fud.geode.write_chunk(&mut chunked_file, &chunk).await {
474                  error!(target: "fud::download::fetch_metadata()", "Failed inserting chunk {} to Geode: {e}", hash_to_string(&chunk_hash));
475                  return Err(e)
476              };
477          }
478      };
479  
480      Ok(seeder)
481  }