file_sequence.rs
1 /* This file is part of DarkFi (https://dark.fi) 2 * 3 * Copyright (C) 2020-2025 Dyne.org foundation 4 * 5 * This program is free software: you can redistribute it and/or modify 6 * it under the terms of the GNU Affero General Public License as 7 * published by the Free Software Foundation, either version 3 of the 8 * License, or (at your option) any later version. 9 * 10 * This program is distributed in the hope that it will be useful, 11 * but WITHOUT ANY WARRANTY; without even the implied warranty of 12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 13 * GNU Affero General Public License for more details. 14 * 15 * You should have received a copy of the GNU Affero General Public License 16 * along with this program. If not, see <https://www.gnu.org/licenses/>. 17 */ 18 19 use futures::{ 20 task::{Context, Poll}, 21 AsyncRead, AsyncSeek, AsyncWrite, 22 }; 23 use smol::{ 24 fs::{File, OpenOptions}, 25 io::{self, AsyncReadExt, AsyncSeekExt, AsyncWriteExt, SeekFrom}, 26 }; 27 use std::{collections::HashSet, path::PathBuf, pin::Pin}; 28 29 /// `FileSequence` is an object that implements `AsyncRead`, `AsyncSeek`, and 30 /// `AsyncWrite` for an ordered list of (file path, file size). 31 /// 32 /// You can use it to read and write from/to a list a file, without having to 33 /// manage individual file operations explicitly. 34 /// 35 /// This allows seamless handling of multiple files as if they were a single 36 /// continuous file. It automatically opens the next file in the list when the 37 /// current file is exhausted. 38 /// 39 /// It's also made so that files in `files` that do not exist on the filesystem 40 /// will get skipped, without returning an Error. All files you want to read, 41 /// write, and seek to should be created before using the FileSequence. 42 #[derive(Debug)] 43 pub struct FileSequence { 44 /// List of (file path, file size). File sizes are not the sizes of the 45 /// files as they currently are on the file system, but the sizes we want 46 files: Vec<(PathBuf, u64)>, 47 /// Currently opened file 48 current_file: Option<File>, 49 /// Index of the currently opened file in the `files` vector 50 current_file_index: Option<usize>, 51 52 position: u64, 53 /// Set to `true` to automatically set the length of the file on the 54 /// filesystem to it's size as defined in the `files` vector, after a write 55 auto_set_len: bool, 56 } 57 58 impl FileSequence { 59 pub fn new(files: &[(PathBuf, u64)], auto_set_len: bool) -> Self { 60 Self { 61 files: files.to_vec(), 62 current_file: None, 63 current_file_index: None, 64 position: 0, 65 auto_set_len, 66 } 67 } 68 69 /// Update a single file size. 70 pub fn set_file_size(&mut self, file_index: usize, file_size: u64) { 71 self.files[file_index].1 = file_size; 72 } 73 74 /// Return `current_file`. 75 pub fn get_current_file(&self) -> &Option<File> { 76 &self.current_file 77 } 78 79 /// Return `files`. 80 pub fn get_files(&self) -> &Vec<(PathBuf, u64)> { 81 &self.files 82 } 83 84 /// Return the combined file size of all files. 85 pub fn len(&self) -> u64 { 86 self.files.iter().map(|(_, size)| size).sum() 87 } 88 89 /// Return `true` if the `FileSequence` contains no file. 90 pub fn is_empty(&self) -> bool { 91 self.files.is_empty() 92 } 93 94 /// Return the combined file size of all files. 95 pub fn subset_len(&self, files: HashSet<PathBuf>) -> u64 { 96 self.files.iter().filter(|(path, _)| files.contains(path)).map(|(_, size)| size).sum() 97 } 98 99 /// Compute the starting position of the file (in bytes) by suming up 100 /// the size of the previous files. 101 pub fn get_file_position(&self, file_index: usize) -> u64 { 102 let mut pos = 0; 103 for i in 0..file_index { 104 pos += self.files[i].1; 105 } 106 pos 107 } 108 109 /// Open the file at (`current_file_index` + 1). 110 /// If no file is currently open (`current_file_index` is None), it opens 111 /// the first file. 112 async fn open_next_file(&mut self) -> io::Result<()> { 113 self.current_file = None; 114 self.current_file_index = match self.current_file_index { 115 Some(i) => Some(i + 1), 116 None => Some(0), 117 }; 118 if self.current_file_index.unwrap() >= self.files.len() { 119 return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "No more files to open")) 120 } 121 let file = OpenOptions::new() 122 .read(true) 123 .write(true) 124 .create(false) 125 .open(self.files[self.current_file_index.unwrap()].0.clone()) 126 .await?; 127 self.current_file = Some(file); 128 Ok(()) 129 } 130 131 /// Open the file at `file_index`. 132 async fn open_file(&mut self, file_index: usize) -> io::Result<()> { 133 self.current_file = None; 134 self.current_file_index = Some(file_index); 135 let file = OpenOptions::new() 136 .read(true) 137 .write(true) 138 .create(false) 139 .open(self.files[file_index].0.clone()) 140 .await?; 141 self.current_file = Some(file); 142 Ok(()) 143 } 144 } 145 146 impl AsyncRead for FileSequence { 147 fn poll_read( 148 self: Pin<&mut Self>, 149 _: &mut Context<'_>, 150 buf: &mut [u8], 151 ) -> Poll<io::Result<usize>> { 152 let this = self.get_mut(); 153 let mut total_read = 0; 154 155 while total_read < buf.len() { 156 if this.current_file.is_none() { 157 if let Some(file_index) = this.current_file_index { 158 // Stop if there are no more files to read 159 if file_index >= this.files.len() - 1 { 160 return Poll::Ready(Ok(total_read)); 161 } 162 let start_pos = this.get_file_position(file_index); 163 let file_size = this.files[file_index].1 as usize; 164 let file_pos = this.position - start_pos; 165 let space_left = file_size - file_pos as usize; 166 let skip_bytes = (buf.len() - total_read).min(space_left); 167 this.position += skip_bytes as u64; 168 total_read += skip_bytes; 169 } 170 171 // Open the next file 172 match smol::block_on(this.open_next_file()) { 173 Ok(_) => {} 174 Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => { 175 return Poll::Ready(Ok(total_read)); 176 } 177 Err(e) if e.kind() == io::ErrorKind::NotFound => { 178 this.current_file = None; 179 continue; // Skip to next file 180 } 181 Err(e) => return Poll::Ready(Err(e)), 182 } 183 } 184 185 // Read from the current file 186 let file = this.current_file.as_mut().unwrap(); 187 match smol::block_on(file.read(&mut buf[total_read..])) { 188 Ok(bytes_read) => { 189 if bytes_read == 0 { 190 this.current_file = None; // Move to the next file 191 } else { 192 total_read += bytes_read; 193 this.position += bytes_read as u64; 194 } 195 } 196 Err(e) => return Poll::Ready(Err(e)), 197 } 198 } 199 200 Poll::Ready(Ok(total_read)) 201 } 202 } 203 204 impl AsyncSeek for FileSequence { 205 fn poll_seek( 206 self: Pin<&mut Self>, 207 _: &mut Context<'_>, 208 pos: SeekFrom, 209 ) -> Poll<io::Result<u64>> { 210 let this = self.get_mut(); 211 212 let abs_pos = match pos { 213 SeekFrom::Start(offset) => offset, 214 _ => todo!(), // TODO 215 }; 216 217 // Determine which file to seek in 218 let mut file_index = 0; 219 let mut bytes_offset = 0; 220 221 while file_index < this.files.len() { 222 if bytes_offset + this.files[file_index].1 >= abs_pos { 223 break; 224 } 225 bytes_offset += this.files[file_index].1; 226 file_index += 1; 227 } 228 229 if file_index >= this.files.len() { 230 return Poll::Ready(Err(io::Error::new( 231 io::ErrorKind::InvalidInput, 232 "Seek position out of bounds", 233 ))) 234 } 235 236 this.position = abs_pos; // Update FileSequence position 237 238 // Open the file 239 if this.current_file.is_none() || 240 this.current_file_index.is_some() && this.current_file_index.unwrap() != file_index 241 { 242 match smol::block_on(this.open_file(file_index)) { 243 Ok(_) => {} 244 Err(e) if e.kind() == io::ErrorKind::NotFound => { 245 // If the file does not exist, return without actually seeking it 246 return Poll::Ready(Ok(this.position)); 247 } 248 Err(e) => return Poll::Ready(Err(e)), 249 }; 250 } 251 252 let file = this.current_file.as_mut().unwrap(); 253 let file_pos = abs_pos - bytes_offset; 254 255 // Seek in the current file 256 match smol::block_on(file.seek(SeekFrom::Start(file_pos))) { 257 Ok(_) => Poll::Ready(Ok(this.position)), 258 Err(e) => Poll::Ready(Err(e)), 259 } 260 } 261 } 262 263 impl AsyncWrite for FileSequence { 264 fn poll_write( 265 self: Pin<&mut Self>, 266 _: &mut Context<'_>, 267 buf: &[u8], 268 ) -> Poll<io::Result<usize>> { 269 let this = self.get_mut(); 270 let mut total_bytes_written = 0; 271 let mut remaining_buf = buf; 272 let auto_set_len = this.auto_set_len; 273 274 let finalize_current_file = |file: &mut File, max_size: u64| { 275 if auto_set_len { 276 smol::block_on(file.set_len(max_size))?; 277 } 278 smol::block_on(file.flush())?; 279 Ok(()) 280 }; 281 282 loop { 283 // Ensure the current file is open 284 if this.current_file.is_none() { 285 if let Some(file_index) = this.current_file_index { 286 if file_index >= this.files.len() - 1 { 287 break; // No more files 288 } 289 if remaining_buf.is_empty() { 290 break; // No more data to write 291 } 292 let start_pos = this.get_file_position(file_index); 293 let file_size = this.files[file_index].1 as usize; 294 let file_pos = this.position - start_pos; 295 let space_left = file_size - file_pos as usize; 296 let skip_bytes = remaining_buf.len().min(space_left); 297 this.position += skip_bytes as u64; 298 remaining_buf = &remaining_buf[skip_bytes..]; // Update the remaining buffer 299 } 300 301 // Switch to the next file 302 match smol::block_on(this.open_next_file()) { 303 Ok(_) => {} 304 Err(e) if e.kind() == io::ErrorKind::NotFound => { 305 this.current_file = None; 306 continue; // Skip to next file 307 } 308 Err(e) => return Poll::Ready(Err(e)), 309 } 310 } 311 312 let file = this.current_file.as_mut().unwrap(); 313 let max_size = this.files[this.current_file_index.unwrap()].1; 314 315 // Check how much space is left in the current file 316 let current_position = smol::block_on(file.seek(io::SeekFrom::Current(0)))?; 317 let space_left = max_size - current_position; 318 let bytes_to_write = remaining_buf.len().min(space_left as usize); 319 320 if bytes_to_write == 0 { 321 // Continue to the next iteration to check the new file 322 if let Err(e) = finalize_current_file(file, max_size) { 323 return Poll::Ready(Err(e)); 324 } 325 this.current_file = None; 326 continue; 327 } 328 329 // Write to the current file 330 match smol::block_on(file.write(&remaining_buf[..bytes_to_write])) { 331 Ok(bytes_written) => { 332 total_bytes_written += bytes_written; 333 this.position += bytes_written as u64; 334 remaining_buf = &remaining_buf[bytes_written..]; // Update the remaining buffer 335 if remaining_buf.is_empty() { 336 if let Err(e) = finalize_current_file(file, max_size) { 337 return Poll::Ready(Err(e)); 338 } 339 break; // No more data to write 340 } 341 342 // We wrote to the end of this file, use new file on next iteration 343 if bytes_written == bytes_to_write { 344 if let Err(e) = finalize_current_file(file, max_size) { 345 return Poll::Ready(Err(e)); 346 } 347 this.current_file = None; 348 } 349 } 350 Err(e) => return Poll::Ready(Err(e)), // Return error if write fails 351 } 352 } 353 354 Poll::Ready(Ok(total_bytes_written)) 355 } 356 357 fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> { 358 Poll::Ready(Ok(())) // TODO 359 } 360 361 fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> { 362 let this = self.get_mut(); 363 if let Some(file) = this.current_file.take() { 364 match smol::block_on(file.sync_all()) { 365 Ok(()) => Poll::Ready(Ok(())), 366 Err(e) => Poll::Ready(Err(e)), 367 } 368 } else { 369 Poll::Ready(Ok(())) // No file to close 370 } 371 } 372 }