/ src / geode / file_sequence.rs
file_sequence.rs
  1  /* This file is part of DarkFi (https://dark.fi)
  2   *
  3   * Copyright (C) 2020-2025 Dyne.org foundation
  4   *
  5   * This program is free software: you can redistribute it and/or modify
  6   * it under the terms of the GNU Affero General Public License as
  7   * published by the Free Software Foundation, either version 3 of the
  8   * License, or (at your option) any later version.
  9   *
 10   * This program is distributed in the hope that it will be useful,
 11   * but WITHOUT ANY WARRANTY; without even the implied warranty of
 12   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 13   * GNU Affero General Public License for more details.
 14   *
 15   * You should have received a copy of the GNU Affero General Public License
 16   * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 17   */
 18  
 19  use futures::{
 20      task::{Context, Poll},
 21      AsyncRead, AsyncSeek, AsyncWrite,
 22  };
 23  use smol::{
 24      fs::{File, OpenOptions},
 25      io::{self, AsyncReadExt, AsyncSeekExt, AsyncWriteExt, SeekFrom},
 26  };
 27  use std::{collections::HashSet, path::PathBuf, pin::Pin};
 28  
/// `FileSequence` is an object that implements `AsyncRead`, `AsyncSeek`, and
/// `AsyncWrite` for an ordered list of (file path, file size).
///
/// You can use it to read from and write to a list of files, without having
/// to manage individual file operations explicitly.
///
/// This allows seamless handling of multiple files as if they were a single
/// continuous file. It automatically opens the next file in the list when the
/// current file is exhausted.
///
/// It's also made so that files in `files` that do not exist on the filesystem
/// will get skipped, without returning an Error. All files you want to read,
/// write, and seek to should be created before using the FileSequence.
#[derive(Debug)]
pub struct FileSequence {
    /// List of (file path, file size). File sizes are not the sizes of the
    /// files as they currently are on the file system, but the sizes we want
    /// them to have. The position of each file inside the sequence is
    /// computed from these declared sizes.
    files: Vec<(PathBuf, u64)>,
    /// Currently opened file
    current_file: Option<File>,
    /// Index of the currently opened file in the `files` vector
    current_file_index: Option<usize>,

    /// Current position (in bytes) in the sequence, counted from the start
    /// of the first file
    position: u64,
    /// Set to `true` to automatically set the length of the file on the
    /// filesystem to its size as defined in the `files` vector, after a write
    auto_set_len: bool,
}
 57  
 58  impl FileSequence {
 59      pub fn new(files: &[(PathBuf, u64)], auto_set_len: bool) -> Self {
 60          Self {
 61              files: files.to_vec(),
 62              current_file: None,
 63              current_file_index: None,
 64              position: 0,
 65              auto_set_len,
 66          }
 67      }
 68  
 69      /// Update a single file size.
 70      pub fn set_file_size(&mut self, file_index: usize, file_size: u64) {
 71          self.files[file_index].1 = file_size;
 72      }
 73  
 74      /// Return `current_file`.
 75      pub fn get_current_file(&self) -> &Option<File> {
 76          &self.current_file
 77      }
 78  
 79      /// Return `files`.
 80      pub fn get_files(&self) -> &Vec<(PathBuf, u64)> {
 81          &self.files
 82      }
 83  
 84      /// Return the combined file size of all files.
 85      pub fn len(&self) -> u64 {
 86          self.files.iter().map(|(_, size)| size).sum()
 87      }
 88  
 89      /// Return `true` if the `FileSequence` contains no file.
 90      pub fn is_empty(&self) -> bool {
 91          self.files.is_empty()
 92      }
 93  
 94      /// Return the combined file size of all files.
 95      pub fn subset_len(&self, files: HashSet<PathBuf>) -> u64 {
 96          self.files.iter().filter(|(path, _)| files.contains(path)).map(|(_, size)| size).sum()
 97      }
 98  
 99      /// Compute the starting position of the file (in bytes) by suming up
100      /// the size of the previous files.
101      pub fn get_file_position(&self, file_index: usize) -> u64 {
102          let mut pos = 0;
103          for i in 0..file_index {
104              pos += self.files[i].1;
105          }
106          pos
107      }
108  
109      /// Open the file at (`current_file_index` + 1).
110      /// If no file is currently open (`current_file_index` is None), it opens
111      /// the first file.
112      async fn open_next_file(&mut self) -> io::Result<()> {
113          self.current_file = None;
114          self.current_file_index = match self.current_file_index {
115              Some(i) => Some(i + 1),
116              None => Some(0),
117          };
118          if self.current_file_index.unwrap() >= self.files.len() {
119              return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "No more files to open"))
120          }
121          let file = OpenOptions::new()
122              .read(true)
123              .write(true)
124              .create(false)
125              .open(self.files[self.current_file_index.unwrap()].0.clone())
126              .await?;
127          self.current_file = Some(file);
128          Ok(())
129      }
130  
131      /// Open the file at `file_index`.
132      async fn open_file(&mut self, file_index: usize) -> io::Result<()> {
133          self.current_file = None;
134          self.current_file_index = Some(file_index);
135          let file = OpenOptions::new()
136              .read(true)
137              .write(true)
138              .create(false)
139              .open(self.files[file_index].0.clone())
140              .await?;
141          self.current_file = Some(file);
142          Ok(())
143      }
144  }
145  
146  impl AsyncRead for FileSequence {
147      fn poll_read(
148          self: Pin<&mut Self>,
149          _: &mut Context<'_>,
150          buf: &mut [u8],
151      ) -> Poll<io::Result<usize>> {
152          let this = self.get_mut();
153          let mut total_read = 0;
154  
155          while total_read < buf.len() {
156              if this.current_file.is_none() {
157                  if let Some(file_index) = this.current_file_index {
158                      // Stop if there are no more files to read
159                      if file_index >= this.files.len() - 1 {
160                          return Poll::Ready(Ok(total_read));
161                      }
162                      let start_pos = this.get_file_position(file_index);
163                      let file_size = this.files[file_index].1 as usize;
164                      let file_pos = this.position - start_pos;
165                      let space_left = file_size - file_pos as usize;
166                      let skip_bytes = (buf.len() - total_read).min(space_left);
167                      this.position += skip_bytes as u64;
168                      total_read += skip_bytes;
169                  }
170  
171                  // Open the next file
172                  match smol::block_on(this.open_next_file()) {
173                      Ok(_) => {}
174                      Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => {
175                          return Poll::Ready(Ok(total_read));
176                      }
177                      Err(e) if e.kind() == io::ErrorKind::NotFound => {
178                          this.current_file = None;
179                          continue; // Skip to next file
180                      }
181                      Err(e) => return Poll::Ready(Err(e)),
182                  }
183              }
184  
185              // Read from the current file
186              let file = this.current_file.as_mut().unwrap();
187              match smol::block_on(file.read(&mut buf[total_read..])) {
188                  Ok(bytes_read) => {
189                      if bytes_read == 0 {
190                          this.current_file = None; // Move to the next file
191                      } else {
192                          total_read += bytes_read;
193                          this.position += bytes_read as u64;
194                      }
195                  }
196                  Err(e) => return Poll::Ready(Err(e)),
197              }
198          }
199  
200          Poll::Ready(Ok(total_read))
201      }
202  }
203  
204  impl AsyncSeek for FileSequence {
205      fn poll_seek(
206          self: Pin<&mut Self>,
207          _: &mut Context<'_>,
208          pos: SeekFrom,
209      ) -> Poll<io::Result<u64>> {
210          let this = self.get_mut();
211  
212          let abs_pos = match pos {
213              SeekFrom::Start(offset) => offset,
214              _ => todo!(), // TODO
215          };
216  
217          // Determine which file to seek in
218          let mut file_index = 0;
219          let mut bytes_offset = 0;
220  
221          while file_index < this.files.len() {
222              if bytes_offset + this.files[file_index].1 >= abs_pos {
223                  break;
224              }
225              bytes_offset += this.files[file_index].1;
226              file_index += 1;
227          }
228  
229          if file_index >= this.files.len() {
230              return Poll::Ready(Err(io::Error::new(
231                  io::ErrorKind::InvalidInput,
232                  "Seek position out of bounds",
233              )))
234          }
235  
236          this.position = abs_pos; // Update FileSequence position
237  
238          // Open the file
239          if this.current_file.is_none() ||
240              this.current_file_index.is_some() && this.current_file_index.unwrap() != file_index
241          {
242              match smol::block_on(this.open_file(file_index)) {
243                  Ok(_) => {}
244                  Err(e) if e.kind() == io::ErrorKind::NotFound => {
245                      // If the file does not exist, return without actually seeking it
246                      return Poll::Ready(Ok(this.position));
247                  }
248                  Err(e) => return Poll::Ready(Err(e)),
249              };
250          }
251  
252          let file = this.current_file.as_mut().unwrap();
253          let file_pos = abs_pos - bytes_offset;
254  
255          // Seek in the current file
256          match smol::block_on(file.seek(SeekFrom::Start(file_pos))) {
257              Ok(_) => Poll::Ready(Ok(this.position)),
258              Err(e) => Poll::Ready(Err(e)),
259          }
260      }
261  }
262  
263  impl AsyncWrite for FileSequence {
264      fn poll_write(
265          self: Pin<&mut Self>,
266          _: &mut Context<'_>,
267          buf: &[u8],
268      ) -> Poll<io::Result<usize>> {
269          let this = self.get_mut();
270          let mut total_bytes_written = 0;
271          let mut remaining_buf = buf;
272          let auto_set_len = this.auto_set_len;
273  
274          let finalize_current_file = |file: &mut File, max_size: u64| {
275              if auto_set_len {
276                  smol::block_on(file.set_len(max_size))?;
277              }
278              smol::block_on(file.flush())?;
279              Ok(())
280          };
281  
282          loop {
283              // Ensure the current file is open
284              if this.current_file.is_none() {
285                  if let Some(file_index) = this.current_file_index {
286                      if file_index >= this.files.len() - 1 {
287                          break; // No more files
288                      }
289                      if remaining_buf.is_empty() {
290                          break; // No more data to write
291                      }
292                      let start_pos = this.get_file_position(file_index);
293                      let file_size = this.files[file_index].1 as usize;
294                      let file_pos = this.position - start_pos;
295                      let space_left = file_size - file_pos as usize;
296                      let skip_bytes = remaining_buf.len().min(space_left);
297                      this.position += skip_bytes as u64;
298                      remaining_buf = &remaining_buf[skip_bytes..]; // Update the remaining buffer
299                  }
300  
301                  // Switch to the next file
302                  match smol::block_on(this.open_next_file()) {
303                      Ok(_) => {}
304                      Err(e) if e.kind() == io::ErrorKind::NotFound => {
305                          this.current_file = None;
306                          continue; // Skip to next file
307                      }
308                      Err(e) => return Poll::Ready(Err(e)),
309                  }
310              }
311  
312              let file = this.current_file.as_mut().unwrap();
313              let max_size = this.files[this.current_file_index.unwrap()].1;
314  
315              // Check how much space is left in the current file
316              let current_position = smol::block_on(file.seek(io::SeekFrom::Current(0)))?;
317              let space_left = max_size - current_position;
318              let bytes_to_write = remaining_buf.len().min(space_left as usize);
319  
320              if bytes_to_write == 0 {
321                  // Continue to the next iteration to check the new file
322                  if let Err(e) = finalize_current_file(file, max_size) {
323                      return Poll::Ready(Err(e));
324                  }
325                  this.current_file = None;
326                  continue;
327              }
328  
329              // Write to the current file
330              match smol::block_on(file.write(&remaining_buf[..bytes_to_write])) {
331                  Ok(bytes_written) => {
332                      total_bytes_written += bytes_written;
333                      this.position += bytes_written as u64;
334                      remaining_buf = &remaining_buf[bytes_written..]; // Update the remaining buffer
335                      if remaining_buf.is_empty() {
336                          if let Err(e) = finalize_current_file(file, max_size) {
337                              return Poll::Ready(Err(e));
338                          }
339                          break; // No more data to write
340                      }
341  
342                      // We wrote to the end of this file, use new file on next iteration
343                      if bytes_written == bytes_to_write {
344                          if let Err(e) = finalize_current_file(file, max_size) {
345                              return Poll::Ready(Err(e));
346                          }
347                          this.current_file = None;
348                      }
349                  }
350                  Err(e) => return Poll::Ready(Err(e)), // Return error if write fails
351              }
352          }
353  
354          Poll::Ready(Ok(total_bytes_written))
355      }
356  
357      fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
358          Poll::Ready(Ok(())) // TODO
359      }
360  
361      fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
362          let this = self.get_mut();
363          if let Some(file) = this.current_file.take() {
364              match smol::block_on(file.sync_all()) {
365                  Ok(()) => Poll::Ready(Ok(())),
366                  Err(e) => Poll::Ready(Err(e)),
367              }
368          } else {
369              Poll::Ready(Ok(())) // No file to close
370          }
371      }
372  }