// src/processor.rs
  1  use anyhow::Result;
  2  use dashmap::DashMap;
  3  use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
  4  use rayon::iter::IntoParallelRefMutIterator;
  5  use rayon::prelude::{IntoParallelIterator, ParallelIterator};
  6  use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
  7  use std::sync::{Arc, Mutex, TryLockError, TryLockResult};
  8  use std::time::Duration;
  9  use unicode_segmentation::UnicodeSegmentation;
 10  
 11  use crate::fileinfo::FileInfo;
 12  use crate::params::Params;
 13  
 14  pub struct Processor {}
 15  
 16  impl Processor {
 17      pub fn hashwise(
 18          app_args: Arc<Params>,
 19          sw_store: Arc<DashMap<u64, Vec<FileInfo>>>,
 20          hw_store: Arc<DashMap<u128, Vec<FileInfo>>>,
 21          progress_bar_box: Arc<MultiProgress>,
 22          max_file_size: Arc<AtomicU64>,
 23          seed: i64,
 24          sw_sorting_finished: Arc<AtomicBool>,
 25      ) -> Result<()> {
 26          let progress_bar = match app_args.progress {
 27              true => progress_bar_box.add(ProgressBar::new_spinner()),
 28              false => ProgressBar::hidden(),
 29          };
 30  
 31          let progress_style = ProgressStyle::with_template("[{elapsed_precise}] {pos:>7} {msg}")?;
 32          progress_bar.set_style(progress_style);
 33          progress_bar.enable_steady_tick(Duration::from_millis(50));
 34          progress_bar.set_message("files grouped by hash.");
 35  
 36          loop {
 37              let keys: Vec<u64> = sw_store
 38                  .clone()
 39                  .iter()
 40                  .filter(|i| !i.value().iter().all(|x| x.is_sw_processed()))
 41                  .filter(|i| i.value().len() > 1)
 42                  .map(|i| *i.key())
 43                  .collect();
 44  
 45              if keys.is_empty() {
 46                  match sw_sorting_finished.load(std::sync::atomic::Ordering::Relaxed) {
 47                      true => {
 48                          progress_bar.finish_with_message("files grouped by hash.");
 49                          break Ok(());
 50                      }
 51                      false => continue,
 52                  }
 53              } else {
 54                  keys.into_par_iter().for_each(|key| {
 55                      let mut group: Vec<FileInfo> = sw_store.get(&key).unwrap().to_vec();
 56                      if group.len() > 1 {
 57                          group.par_iter_mut().for_each(|file| {
 58                              progress_bar.inc(1);
 59                              file.sw_processed();
 60  
 61                              let fhash = match app_args.strict {
 62                                  true => file.hash(seed).expect("hashing file failed."),
 63                                  false => file.initpages_hash(seed).expect("hashing file failed."),
 64                              };
 65  
 66                              Self::compare_and_update_max_path_len(
 67                                  max_file_size.clone(),
 68                                  file.path.to_string_lossy().graphemes(true).count() as u64,
 69                              );
 70  
 71                              hw_store
 72                                  .entry(fhash)
 73                                  .and_modify(|fileset| fileset.push(file.clone()))
 74                                  .or_insert_with(|| vec![file.clone()]);
 75                          });
 76                      };
 77                  });
 78              }
 79          }
 80      }
 81  
 82      pub fn compare_and_update_max_path_len(current: Arc<AtomicU64>, next: u64) {
 83          if current.load(Ordering::Relaxed) < next {
 84              current.store(next, Ordering::Release);
 85          }
 86      }
 87  
 88      pub fn sizewise(
 89          app_args: Arc<Params>,
 90          scanner_finished: Arc<AtomicBool>,
 91          store: Arc<DashMap<u64, Vec<FileInfo>>>,
 92          files: Arc<Mutex<Vec<FileInfo>>>,
 93          progress_bar_box: Arc<MultiProgress>,
 94      ) -> Result<()> {
 95          let progress_bar = match app_args.progress {
 96              true => progress_bar_box.add(ProgressBar::new_spinner()),
 97              false => ProgressBar::hidden(),
 98          };
 99  
100          let progress_style = ProgressStyle::with_template("[{elapsed_precise}] {pos:>7} {msg}")?;
101          progress_bar.set_style(progress_style);
102          progress_bar.enable_steady_tick(Duration::from_millis(50));
103          progress_bar.set_message("files grouped by size");
104  
105          loop {
106              let fileopt: Option<FileInfo> = {
107                  match files.try_lock() {
108                      Ok(mut flist) => flist.pop(),
109                      TryLockResult::Err(TryLockError::WouldBlock) => None,
110                      _ => None,
111                  }
112              };
113  
114              match fileopt {
115                  Some(file) => {
116                      progress_bar.inc(1);
117                      store
118                          .entry(file.size)
119                          .and_modify(|fileset| fileset.push(file.clone()))
120                          .or_insert_with(|| vec![file]);
121                      continue;
122                  }
123                  None => match scanner_finished.load(std::sync::atomic::Ordering::Relaxed) {
124                      true => {
125                          progress_bar.finish_with_message("files grouped by size");
126                          break Ok(());
127                      }
128                      false => continue,
129                  },
130              }
131          }
132      }
133  }
134  
#[cfg(test)]
mod tests {
    use anyhow::Result;
    use dashmap::DashMap;
    use indicatif::MultiProgress;
    use rand::Rng;
    use std::fs::File;
    use std::io::Write;
    use std::sync::atomic::{AtomicBool, AtomicU64};
    use std::sync::{Arc, Mutex};
    use tempfile::TempDir;

    use crate::{fileinfo::FileInfo, params::Params};

    use super::Processor;

    /// Size buckets as filled by `Processor::sizewise`.
    type SizeStore = Arc<DashMap<u64, Vec<FileInfo>>>;
    /// Hash buckets as filled by `Processor::hashwise`.
    type HashStore = Arc<DashMap<u128, Vec<FileInfo>>>;

    /// Returns `size` random bytes.
    fn generate_bytes(size: usize) -> Vec<u8> {
        let mut rng = rand::rng();
        (0..size).map(|_| rng.random::<u8>()).collect()
    }

    /// Two equally sized buffers sharing their first 16384 bytes, followed by independent
    /// random tails (so they differ with overwhelming probability).
    fn shared_prefix_pair() -> (Vec<u8>, Vec<u8>) {
        let prefix = generate_bytes(16384);
        let mut first = prefix.clone();
        let mut second = prefix;
        first.extend(generate_bytes(1720320));
        second.extend(generate_bytes(1720320));
        (first, second)
    }

    /// Writes each `(name, content)` pair below `root` and returns one `FileInfo` per file.
    fn write_files(root: &TempDir, files: &[(&str, Vec<u8>)]) -> Result<Vec<FileInfo>> {
        files
            .iter()
            .map(|(name, content)| -> Result<FileInfo> {
                let path = root.path().join(name);
                File::create_new(&path)?.write_all(content)?;
                Ok(FileInfo::new(path).unwrap())
            })
            .collect()
    }

    /// Runs the size pass over `queue` into `store`, with the scanner already finished.
    fn size_pass(store: &SizeStore, queue: Vec<FileInfo>) -> Result<()> {
        Processor::sizewise(
            Arc::new(Params::default()),
            Arc::new(AtomicBool::new(true)),
            Arc::clone(store),
            Arc::new(Mutex::new(queue)),
            Arc::new(MultiProgress::new()),
        )
    }

    /// Runs the hash pass over `sizes` into `hashes`, with the size pass already finished.
    fn hash_pass(args: Params, sizes: &SizeStore, hashes: &HashStore) -> Result<()> {
        Processor::hashwise(
            Arc::new(args),
            Arc::clone(sizes),
            Arc::clone(hashes),
            Arc::new(MultiProgress::new()),
            Arc::new(AtomicU64::new(32)),
            300,
            Arc::new(AtomicBool::new(true)),
        )
    }

    /// Total number of file entries across all buckets; catches files inserted twice.
    fn entry_count<K: Eq + std::hash::Hash>(store: &DashMap<K, Vec<FileInfo>>) -> usize {
        store.iter().map(|bucket| bucket.value().len()).sum()
    }

    #[test]
    fn hashwise_sorting_two_files_with_identical_init_pages_only_strict_mode() -> Result<()> {
        let root = TempDir::new()?;
        let (content_x, content_y) = shared_prefix_pair();
        let queue = write_files(
            &root,
            &[("fileone.bin", content_x), ("filetwo.bin", content_y)],
        )?;

        let sizes: SizeStore = Arc::new(DashMap::new());
        let hashes: HashStore = Arc::new(DashMap::new());
        size_pass(&sizes, queue)?;

        let strict = Params {
            strict: true,
            ..Default::default()
        };
        hash_pass(strict, &sizes, &hashes)?;

        // Strict mode hashes whole files, so the differing tails separate them.
        assert_eq!(hashes.len(), 2);
        assert_eq!(entry_count(&hashes), 2);
        Ok(())
    }

    #[test]
    fn hashwise_sorting_two_files_with_identical_init_pages_only_fast_mode() -> Result<()> {
        let root = TempDir::new()?;
        let (content_x, content_y) = shared_prefix_pair();
        let queue = write_files(
            &root,
            &[("fileone.bin", content_x), ("filetwo.bin", content_y)],
        )?;

        let sizes: SizeStore = Arc::new(DashMap::new());
        let hashes: HashStore = Arc::new(DashMap::new());
        size_pass(&sizes, queue)?;
        hash_pass(Params::default(), &sizes, &hashes)?;

        // Fast mode uses `initpages_hash`; this test assumes it only covers the shared prefix.
        assert_eq!(hashes.len(), 1);
        assert_eq!(entry_count(&hashes), 2);
        Ok(())
    }

    #[test]
    fn hashwise_sorting_two_files_with_identical_data() -> Result<()> {
        let root = TempDir::new()?;
        let content = generate_bytes(282624);
        let queue = write_files(
            &root,
            &[("fileone.bin", content.clone()), ("filetwo.bin", content)],
        )?;

        let sizes: SizeStore = Arc::new(DashMap::new());
        let hashes: HashStore = Arc::new(DashMap::new());
        size_pass(&sizes, queue)?;
        hash_pass(Params::default(), &sizes, &hashes)?;

        assert_eq!(hashes.len(), 1);
        assert_eq!(entry_count(&hashes), 2);
        Ok(())
    }

    #[test]
    fn sizewise_sorting_two_files_of_different_sizes() -> Result<()> {
        let root = TempDir::new()?;
        let queue = write_files(
            &root,
            &[
                ("fileone.bin", generate_bytes(282624)),
                ("filetwo.bin", generate_bytes(1720320)),
            ],
        )?;

        let sizes: SizeStore = Arc::new(DashMap::new());
        size_pass(&sizes, queue)?;

        assert_eq!(sizes.len(), 2);
        assert_eq!(entry_count(&sizes), 2);
        Ok(())
    }

    #[test]
    fn sizewise_sorting_two_files_of_same_size() -> Result<()> {
        let root = TempDir::new()?;
        let queue = write_files(
            &root,
            &[
                ("fileone.bin", generate_bytes(282624)),
                ("filetwo.bin", generate_bytes(282624)),
            ],
        )?;

        let sizes: SizeStore = Arc::new(DashMap::new());
        size_pass(&sizes, queue)?;

        assert_eq!(sizes.len(), 1);
        assert_eq!(entry_count(&sizes), 2);
        Ok(())
    }
}