// download.rs
1 /* This file is part of DarkFi (https://dark.fi) 2 * 3 * Copyright (C) 2020-2025 Dyne.org foundation 4 * 5 * This program is free software: you can redistribute it and/or modify 6 * it under the terms of the GNU Affero General Public License as 7 * published by the Free Software Foundation, either version 3 of the 8 * License, or (at your option) any later version. 9 * 10 * This program is distributed in the hope that it will be useful, 11 * but WITHOUT ANY WARRANTY; without even the implied warranty of 12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 13 * GNU Affero General Public License for more details. 14 * 15 * You should have received a copy of the GNU Affero General Public License 16 * along with this program. If not, see <https://www.gnu.org/licenses/>. 17 */ 18 19 use std::{ 20 collections::HashSet, 21 path::{Path, PathBuf}, 22 time::Instant, 23 }; 24 25 use futures::{future::FutureExt, pin_mut, select}; 26 use rand::{ 27 prelude::{IteratorRandom, SliceRandom}, 28 rngs::OsRng, 29 }; 30 use tracing::{error, info, warn}; 31 32 use darkfi::{ 33 dht::DhtNode, 34 geode::{hash_to_string, ChunkedStorage}, 35 net::ChannelPtr, 36 system::Subscription, 37 Error, Result, 38 }; 39 use darkfi_serial::serialize_async; 40 41 use crate::{ 42 event::{self, notify_event, FudEvent}, 43 proto::{FudChunkReply, FudDirectoryReply, FudFileReply, FudFindRequest, FudNotFound}, 44 util::create_all_files, 45 Fud, FudSeeder, ResourceStatus, ResourceType, Scrap, 46 }; 47 48 /// Receive seeders from a subscription, and execute an async expression for 49 /// each deduplicated seeder once (seeder order is random). 50 /// It will keep going until the expression returns `Ok(())`, or there are 51 /// no more seeders. 52 /// It has an optional `favored_seeder` argument that will be tried first if 53 /// specified. 54 macro_rules! 
seeders_loop { 55 ($seeders_sub:expr, $favored_seeder:expr, $code:expr) => { 56 let mut queried_seeders: HashSet<blake3::Hash> = HashSet::new(); 57 let mut is_done = false; 58 59 // Try favored seeder 60 let favored_seeder: Option<FudSeeder> = $favored_seeder; 61 if let Some(seeder) = favored_seeder { 62 queried_seeders.insert(seeder.node.id()); 63 if $code(seeder).await.is_ok() { 64 is_done = true; 65 } 66 } 67 68 // Try other seeders using the subscription 69 while !is_done { 70 let rep = $seeders_sub.receive().await; 71 if rep.is_none() { 72 break; // None means the lookup is done 73 } 74 let seeders = rep.unwrap().clone(); 75 let mut shuffled_seeders = { 76 let mut vec: Vec<_> = seeders.iter().cloned().collect(); 77 vec.shuffle(&mut OsRng); 78 vec 79 }; 80 // Loop over seeders 81 while let Some(seeder) = shuffled_seeders.pop() { 82 // Only use a seeder once 83 if queried_seeders.iter().any(|s| *s == seeder.node.id()) { 84 continue; 85 } 86 queried_seeders.insert(seeder.node.id()); 87 88 if $code(seeder).await.is_err() { 89 continue; 90 } 91 92 is_done = true; 93 break; 94 } 95 } 96 }; 97 ($seeders_sub:expr, $code:expr) => { 98 seeders_loop!($seeders_sub, None, $code) 99 }; 100 } 101 102 enum ChunkFetchControl { 103 NextChunk, 104 NextSeeder, 105 Abort, 106 } 107 108 struct ChunkFetchContext<'a> { 109 fud: &'a Fud, 110 hash: &'a blake3::Hash, 111 chunked: &'a mut ChunkedStorage, 112 chunks: &'a mut HashSet<blake3::Hash>, 113 } 114 115 /// Fetch `chunks` for `chunked` (file or directory) from seeders in `seeders_sub`. 
116 pub async fn fetch_chunks( 117 fud: &Fud, 118 hash: &blake3::Hash, 119 chunked: &mut ChunkedStorage, 120 seeders_sub: &Subscription<Option<Vec<FudSeeder>>>, 121 favored_seeder: Option<FudSeeder>, 122 chunks: &mut HashSet<blake3::Hash>, 123 ) -> Result<()> { 124 let mut ctx = ChunkFetchContext { fud, hash, chunked, chunks }; 125 126 seeders_loop!(seeders_sub, favored_seeder, async |seeder: FudSeeder| -> Result<()> { 127 let channel = match fud.dht.get_channel(&seeder.node, Some(*hash)).await { 128 Ok(channel) => channel, 129 Err(e) => { 130 warn!(target: "fud::download::fetch_chunks()", "Could not get a channel for node {}: {e}", hash_to_string(&seeder.node.id())); 131 return Err(e) 132 } 133 }; 134 let mut chunks_to_query = ctx.chunks.clone(); 135 info!(target: "fud::download::fetch_chunks()", "Requesting chunks from seeder {}", hash_to_string(&seeder.node.id())); 136 137 loop { 138 // Loop over chunks 139 match fetch_chunk(&mut ctx, &channel, &seeder, &mut chunks_to_query).await { 140 ChunkFetchControl::NextChunk => continue, 141 ChunkFetchControl::NextSeeder => break, 142 ChunkFetchControl::Abort => { 143 fud.dht.cleanup_channel(channel).await; 144 return Ok(()) 145 } 146 }; 147 } 148 149 fud.dht.cleanup_channel(channel).await; 150 151 // Stop when there are no missing chunks 152 if ctx.chunks.is_empty() { 153 return Ok(()) 154 } 155 156 Err(().into()) 157 }); 158 159 Ok(()) 160 } 161 162 /// Fetch a single chunk and return what should be done next 163 async fn fetch_chunk( 164 ctx: &mut ChunkFetchContext<'_>, 165 channel: &ChannelPtr, 166 seeder: &FudSeeder, 167 chunks_to_query: &mut HashSet<blake3::Hash>, 168 ) -> ChunkFetchControl { 169 // Select a chunk to request 170 let mut chunk = None; 171 if let Some(random_chunk) = chunks_to_query.iter().choose(&mut OsRng) { 172 chunk = Some(*random_chunk); 173 } 174 175 if chunk.is_none() { 176 // No more chunks to request from this seeder 177 return ChunkFetchControl::NextSeeder; 178 } 179 180 let chunk_hash = 
chunk.unwrap(); 181 chunks_to_query.remove(&chunk_hash); 182 183 let start_time = Instant::now(); 184 let msg_subsystem = channel.message_subsystem(); 185 msg_subsystem.add_dispatch::<FudChunkReply>().await; 186 msg_subsystem.add_dispatch::<FudNotFound>().await; 187 let msg_subscriber_chunk = channel.subscribe_msg::<FudChunkReply>().await.unwrap(); 188 let msg_subscriber_notfound = channel.subscribe_msg::<FudNotFound>().await.unwrap(); 189 190 let send_res = channel.send(&FudFindRequest { info: Some(*ctx.hash), key: chunk_hash }).await; 191 if let Err(e) = send_res { 192 warn!(target: "fud::download::fetch_chunk()", "Error while sending FudFindRequest: {e}"); 193 return ChunkFetchControl::NextSeeder; 194 } 195 196 let chunk_recv = msg_subscriber_chunk.receive_with_timeout(ctx.fud.chunk_timeout).fuse(); 197 let notfound_recv = msg_subscriber_notfound.receive_with_timeout(ctx.fud.chunk_timeout).fuse(); 198 199 pin_mut!(chunk_recv, notfound_recv); 200 201 // Wait for a FudChunkReply or FudNotFound 202 select! 
{ 203 chunk_reply = chunk_recv => { 204 msg_subscriber_chunk.unsubscribe().await; 205 msg_subscriber_notfound.unsubscribe().await; 206 if let Err(e) = chunk_reply { 207 warn!(target: "fud::download::fetch_chunk()", "Error waiting for chunk reply: {e}"); 208 return ChunkFetchControl::NextSeeder; 209 } 210 let reply = chunk_reply.unwrap(); 211 handle_chunk_reply(ctx, &chunk_hash, &reply, seeder, &start_time).await 212 } 213 notfound_reply = notfound_recv => { 214 msg_subscriber_chunk.unsubscribe().await; 215 msg_subscriber_notfound.unsubscribe().await; 216 if let Err(e) = notfound_reply { 217 warn!(target: "fud::download::fetch_chunk()", "Error waiting for NOTFOUND reply: {e}"); 218 return ChunkFetchControl::NextSeeder; 219 } 220 info!(target: "fud::download::fetch_chunk()", "Received NOTFOUND {} from seeder {}", hash_to_string(&chunk_hash), hash_to_string(&seeder.node.id())); 221 notify_event!(ctx.fud, ChunkNotFound, { hash: *ctx.hash, chunk_hash }); 222 ChunkFetchControl::NextChunk 223 } 224 } 225 } 226 227 /// Processes an incoming chunk 228 async fn handle_chunk_reply( 229 ctx: &mut ChunkFetchContext<'_>, 230 chunk_hash: &blake3::Hash, 231 reply: &FudChunkReply, 232 seeder: &FudSeeder, 233 start_time: &Instant, 234 ) -> ChunkFetchControl { 235 let write_res = ctx.fud.geode.write_chunk(ctx.chunked, &reply.chunk).await; 236 if let Err(e) = write_res { 237 error!(target: "fud::download::handle_chunk_reply()", "Failed inserting chunk {} to Geode: {e}", hash_to_string(chunk_hash)); 238 return ChunkFetchControl::NextChunk; 239 } 240 let (inserted_hash, bytes_written) = write_res.unwrap(); 241 if inserted_hash != *chunk_hash { 242 warn!(target: "fud::download::handle_chunk_reply()", "Received chunk does not match requested chunk"); 243 return ChunkFetchControl::NextChunk; 244 } 245 246 info!(target: "fud::download::handle_chunk_reply()", "Received chunk {} from seeder {}", hash_to_string(chunk_hash), hash_to_string(&seeder.node.id())); 247 248 // If we did not write the 
whole chunk to the filesystem, 249 // save the chunk in the scraps. 250 if bytes_written < reply.chunk.len() { 251 info!(target: "fud::download::handle_chunk_reply()", "Saving chunk {} as a scrap", hash_to_string(chunk_hash)); 252 let chunk_written = ctx.fud.geode.get_chunk(ctx.chunked, chunk_hash).await; 253 if let Err(e) = chunk_written { 254 error!(target: "fud::download::handle_chunk_reply()", "Error getting chunk: {e}"); 255 return ChunkFetchControl::NextChunk; 256 } 257 let scrap = Scrap { 258 chunk: reply.chunk.clone(), 259 hash_written: blake3::hash(&chunk_written.unwrap()), 260 }; 261 if let Err(e) = 262 ctx.fud.scrap_tree.insert(chunk_hash.as_bytes(), serialize_async(&scrap).await) 263 { 264 error!(target: "fud::download::handle_chunk_reply()", "Failed to save chunk {} as a scrap: {e}", hash_to_string(chunk_hash)); 265 return ChunkFetchControl::NextChunk; 266 } 267 } 268 269 // Update the resource 270 let mut resources_write = ctx.fud.resources.write().await; 271 let resource = resources_write.get_mut(ctx.hash); 272 if resource.is_none() { 273 return ChunkFetchControl::Abort // Resource was removed 274 } 275 let resource = resource.unwrap(); 276 resource.status = ResourceStatus::Downloading; 277 resource.total_chunks_downloaded += 1; 278 resource.target_chunks_downloaded += 1; 279 280 resource.total_bytes_downloaded += reply.chunk.len() as u64; 281 resource.target_bytes_downloaded += 282 resource.get_selected_bytes(ctx.chunked, &reply.chunk) as u64; 283 resource.speeds.push(reply.chunk.len() as f64 / start_time.elapsed().as_secs_f64()); 284 if resource.speeds.len() > 12 { 285 resource.speeds = resource.speeds.split_off(resource.speeds.len() - 12); // Only keep the last few speeds 286 } 287 288 // If we just fetched the last chunk of a file, compute 289 // `total_bytes_size` (and `target_bytes_size`) again, 290 // as `geode.write_chunk()` updated the FileSequence 291 // to the exact file size. 
292 if let Some((last_chunk_hash, _)) = ctx.chunked.iter().last() { 293 if matches!(resource.rtype, ResourceType::File) && *last_chunk_hash == *chunk_hash { 294 resource.total_bytes_size = ctx.chunked.get_fileseq().len(); 295 resource.target_bytes_size = resource.total_bytes_size; 296 } 297 } 298 let resource = resource.clone(); 299 drop(resources_write); 300 301 notify_event!(ctx.fud, ChunkDownloadCompleted, { hash: *ctx.hash, chunk_hash: *chunk_hash, resource }); 302 ctx.chunks.remove(chunk_hash); 303 ChunkFetchControl::NextChunk 304 } 305 306 enum MetadataFetchReply { 307 Directory(FudDirectoryReply), 308 File(FudFileReply), 309 Chunk(FudChunkReply), 310 } 311 312 /// Fetch a single resource metadata from seeders received from `seeders_sub`. 313 /// If the resource is a file smaller than a single chunk then seeder can send the 314 /// chunk directly, and we will create the file from it on path `path`. 315 /// 1. Wait for seeders from the subscription 316 /// 2. Request the metadata from the seeders 317 /// 3. 
/// Insert metadata to geode using the reply
pub async fn fetch_metadata(
    fud: &Fud,
    hash: &blake3::Hash,
    seeders_sub: &Subscription<Option<Vec<FudSeeder>>>,
    path: &Path,
) -> Result<FudSeeder> {
    // Filled in by the seeders loop below once a seeder answers with
    // verified metadata (or a single chunk, for one-chunk files)
    let mut result: Option<(FudSeeder, MetadataFetchReply)> = None;

    seeders_loop!(seeders_sub, async |seeder: FudSeeder| -> Result<()> {
        // Open a channel to the seeder and register all reply types we may get
        let channel = fud.dht.get_channel(&seeder.node, Some(*hash)).await?;
        let msg_subsystem = channel.message_subsystem();
        msg_subsystem.add_dispatch::<FudChunkReply>().await;
        msg_subsystem.add_dispatch::<FudFileReply>().await;
        msg_subsystem.add_dispatch::<FudDirectoryReply>().await;
        msg_subsystem.add_dispatch::<FudNotFound>().await;
        let msg_subscriber_chunk = channel.subscribe_msg::<FudChunkReply>().await.unwrap();
        let msg_subscriber_file = channel.subscribe_msg::<FudFileReply>().await.unwrap();
        let msg_subscriber_dir = channel.subscribe_msg::<FudDirectoryReply>().await.unwrap();
        let msg_subscriber_notfound = channel.subscribe_msg::<FudNotFound>().await.unwrap();

        // Request the resource metadata (`info: None` means we are asking
        // for the resource itself, not a chunk of it)
        let send_res = channel.send(&FudFindRequest { info: None, key: *hash }).await;
        if let Err(e) = send_res {
            warn!(target: "fud::download::fetch_metadata()", "Error while sending FudFindRequest: {e}");
            // `cleanup` is defined further down (it captures the pinned
            // subscribers), so this early-exit path tears down manually
            msg_subscriber_chunk.unsubscribe().await;
            msg_subscriber_file.unsubscribe().await;
            msg_subscriber_dir.unsubscribe().await;
            msg_subscriber_notfound.unsubscribe().await;
            fud.dht.cleanup_channel(channel).await;
            return Err(e)
        }

        let chunk_recv = msg_subscriber_chunk.receive_with_timeout(fud.chunk_timeout).fuse();
        let file_recv = msg_subscriber_file.receive_with_timeout(fud.chunk_timeout).fuse();
        let dir_recv = msg_subscriber_dir.receive_with_timeout(fud.chunk_timeout).fuse();
        let notfound_recv = msg_subscriber_notfound.receive_with_timeout(fud.chunk_timeout).fuse();

        pin_mut!(chunk_recv, file_recv, dir_recv, notfound_recv);

        // Shared teardown for all four reply arms below
        let cleanup = async || {
            msg_subscriber_chunk.unsubscribe().await;
            msg_subscriber_file.unsubscribe().await;
            msg_subscriber_dir.unsubscribe().await;
            msg_subscriber_notfound.unsubscribe().await;
            fud.dht.cleanup_channel(channel).await;
        };

        // Wait for a FudChunkReply, FudFileReply, FudDirectoryReply, or FudNotFound
        select! {
            // Received a chunk while requesting metadata, this is allowed to
            // optimize fetching files smaller than a single chunk
            chunk_reply = chunk_recv => {
                cleanup().await;
                if let Err(e) = chunk_reply {
                    warn!(target: "fud::download::fetch_metadata()", "Error waiting for chunk reply: {e}");
                    return Err(e)
                }
                let reply = chunk_reply.unwrap();
                let chunk_hash = blake3::hash(&reply.chunk);
                // Check that this is the only chunk in the file
                if !fud.geode.verify_metadata(hash, &[chunk_hash], &[]) {
                    warn!(target: "fud::download::fetch_metadata()", "Received a chunk while fetching metadata, but the chunk did not match the file hash");
                    return Err(().into())
                }
                info!(target: "fud::download::fetch_metadata()", "Received chunk {} (for file {}) from seeder {}", hash_to_string(&chunk_hash), hash_to_string(hash), hash_to_string(&seeder.node.id()));
                result = Some((seeder, MetadataFetchReply::Chunk((*reply).clone())));
                Ok(())
            }
            file_reply = file_recv => {
                cleanup().await;
                if let Err(e) = file_reply {
                    warn!(target: "fud::download::fetch_metadata()", "Error waiting for file reply: {e}");
                    return Err(e)
                }
                let reply = file_reply.unwrap();
                // Verify the advertised chunk list actually hashes to `hash`
                if !fud.geode.verify_metadata(hash, &reply.chunk_hashes, &[]) {
                    warn!(target: "fud::download::fetch_metadata()", "Received invalid file metadata");
                    return Err(().into())
                }
                info!(target: "fud::download::fetch_metadata()", "Received file {} from seeder {}", hash_to_string(hash), hash_to_string(&seeder.node.id()));
                result = Some((seeder, MetadataFetchReply::File((*reply).clone())));
                Ok(())
            }
            dir_reply = dir_recv => {
                cleanup().await;
                if let Err(e) = dir_reply {
                    warn!(target: "fud::download::fetch_metadata()", "Error waiting for directory reply: {e}");
                    return Err(e)
                }
                let reply = dir_reply.unwrap();

                // Convert all file paths from String to PathBuf
                let files: Vec<_> = reply.files.clone().into_iter()
                    .map(|(path_str, size)| (PathBuf::from(path_str), size))
                    .collect();

                if !fud.geode.verify_metadata(hash, &reply.chunk_hashes, &files) {
                    warn!(target: "fud::download::fetch_metadata()", "Received invalid directory metadata");
                    return Err(().into())
                }
                info!(target: "fud::download::fetch_metadata()", "Received directory {} from seeder {}", hash_to_string(hash), hash_to_string(&seeder.node.id()));
                result = Some((seeder, MetadataFetchReply::Directory((*reply).clone())));
                Ok(())
            }
            notfound_reply = notfound_recv => {
                cleanup().await;
                if let Err(e) = notfound_reply {
                    warn!(target: "fud::download::fetch_metadata()", "Error waiting for NOTFOUND reply: {e}");
                    return Err(e)
                }
                info!(target: "fud::download::fetch_metadata()", "Received NOTFOUND {} from seeder {}", hash_to_string(hash), hash_to_string(&seeder.node.id()));
                // This seeder does not have the resource; try the next one
                Err(().into())
            }
        }
    });

    // We did not find the resource
    if result.is_none() {
        return Err(Error::GeodeFileRouteNotFound)
    }

    // Insert metadata to geode using the reply
    // At this point the reply content is already verified
    let (seeder, reply) = result.unwrap();
    match reply {
        MetadataFetchReply::Directory(FudDirectoryReply { files, chunk_hashes }) => {
            // Convert all file paths from String to PathBuf
            let mut files: Vec<_> =
                files.into_iter().map(|(path_str, size)| (PathBuf::from(path_str), size)).collect();

            fud.geode.sort_files(&mut files);
            if let Err(e) = fud.geode.insert_metadata(hash, &chunk_hashes, &files).await {
                error!(target: "fud::download::fetch_metadata()", "Failed inserting directory {} to Geode: {e}", hash_to_string(hash));
                return Err(e)
            }
        }
        MetadataFetchReply::File(FudFileReply { chunk_hashes }) => {
            if let Err(e) = fud.geode.insert_metadata(hash, &chunk_hashes, &[]).await {
                error!(target: "fud::download::fetch_metadata()", "Failed inserting file {} to Geode: {e}", hash_to_string(hash));
                return Err(e)
            }
        }
        // Looked for a file but got a chunk: the entire file fits in a single chunk
        MetadataFetchReply::Chunk(FudChunkReply { chunk }) => {
            info!(target: "fud::download::fetch_metadata()", "File fits in a single chunk");
            let chunk_hash = blake3::hash(&chunk);
            if let Err(e) = fud.geode.insert_metadata(hash, &[chunk_hash], &[]).await {
                error!(target: "fud::download::fetch_metadata()", "Failed inserting file {} to Geode (from single chunk): {e}", hash_to_string(hash));
                return Err(e)
            }
            // Create the target file and write the single chunk into it
            create_all_files(&[path.to_path_buf()]).await?;
            let mut chunked_file = ChunkedStorage::new(
                &[chunk_hash],
                &[(path.to_path_buf(), chunk.len() as u64)],
                false,
            );
            if let Err(e) = fud.geode.write_chunk(&mut chunked_file, &chunk).await {
                error!(target: "fud::download::fetch_metadata()", "Failed inserting chunk {} to Geode: {e}", hash_to_string(&chunk_hash));
                return Err(e)
            };
        }
    };

    Ok(seeder)
}