processor.rs
1 use anyhow::Result; 2 use dashmap::DashMap; 3 use indicatif::{MultiProgress, ProgressBar, ProgressStyle}; 4 use rayon::iter::IntoParallelRefMutIterator; 5 use rayon::prelude::{IntoParallelIterator, ParallelIterator}; 6 use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; 7 use std::sync::{Arc, Mutex, TryLockError, TryLockResult}; 8 use std::time::Duration; 9 use unicode_segmentation::UnicodeSegmentation; 10 11 use crate::fileinfo::FileInfo; 12 use crate::params::Params; 13 14 pub struct Processor {} 15 16 impl Processor { 17 pub fn hashwise( 18 app_args: Arc<Params>, 19 sw_store: Arc<DashMap<u64, Vec<FileInfo>>>, 20 hw_store: Arc<DashMap<u128, Vec<FileInfo>>>, 21 progress_bar_box: Arc<MultiProgress>, 22 max_file_size: Arc<AtomicU64>, 23 seed: i64, 24 sw_sorting_finished: Arc<AtomicBool>, 25 ) -> Result<()> { 26 let progress_bar = match app_args.progress { 27 true => progress_bar_box.add(ProgressBar::new_spinner()), 28 false => ProgressBar::hidden(), 29 }; 30 31 let progress_style = ProgressStyle::with_template("[{elapsed_precise}] {pos:>7} {msg}")?; 32 progress_bar.set_style(progress_style); 33 progress_bar.enable_steady_tick(Duration::from_millis(50)); 34 progress_bar.set_message("files grouped by hash."); 35 36 loop { 37 let keys: Vec<u64> = sw_store 38 .clone() 39 .iter() 40 .filter(|i| !i.value().iter().all(|x| x.is_sw_processed())) 41 .filter(|i| i.value().len() > 1) 42 .map(|i| *i.key()) 43 .collect(); 44 45 if keys.is_empty() { 46 match sw_sorting_finished.load(std::sync::atomic::Ordering::Relaxed) { 47 true => { 48 progress_bar.finish_with_message("files grouped by hash."); 49 break Ok(()); 50 } 51 false => continue, 52 } 53 } else { 54 keys.into_par_iter().for_each(|key| { 55 let mut group: Vec<FileInfo> = sw_store.get(&key).unwrap().to_vec(); 56 if group.len() > 1 { 57 group.par_iter_mut().for_each(|file| { 58 progress_bar.inc(1); 59 file.sw_processed(); 60 61 let fhash = match app_args.strict { 62 true => file.hash(seed).expect("hashing file failed."), 63 false => file.initpages_hash(seed).expect("hashing file failed."), 64 }; 65 66 Self::compare_and_update_max_path_len( 67 max_file_size.clone(), 68 file.path.to_string_lossy().graphemes(true).count() as u64, 69 ); 70 71 hw_store 72 .entry(fhash) 73 .and_modify(|fileset| fileset.push(file.clone())) 74 .or_insert_with(|| vec![file.clone()]); 75 }); 76 }; 77 }); 78 } 79 } 80 } 81 82 pub fn compare_and_update_max_path_len(current: Arc<AtomicU64>, next: u64) { 83 if current.load(Ordering::Relaxed) < next { 84 current.store(next, Ordering::Release); 85 } 86 } 87 88 pub fn sizewise( 89 app_args: Arc<Params>, 90 scanner_finished: Arc<AtomicBool>, 91 store: Arc<DashMap<u64, Vec<FileInfo>>>, 92 files: Arc<Mutex<Vec<FileInfo>>>, 93 progress_bar_box: Arc<MultiProgress>, 94 ) -> Result<()> { 95 let progress_bar = match app_args.progress { 96 true => progress_bar_box.add(ProgressBar::new_spinner()), 97 false => ProgressBar::hidden(), 98 }; 99 100 let progress_style = ProgressStyle::with_template("[{elapsed_precise}] {pos:>7} {msg}")?; 101 progress_bar.set_style(progress_style); 102 progress_bar.enable_steady_tick(Duration::from_millis(50)); 103 progress_bar.set_message("files grouped by size"); 104 105 loop { 106 let fileopt: Option<FileInfo> = { 107 match files.try_lock() { 108 Ok(mut flist) => flist.pop(), 109 TryLockResult::Err(TryLockError::WouldBlock) => None, 110 _ => None, 111 } 112 }; 113 114 match fileopt { 115 Some(file) => { 116 progress_bar.inc(1); 117 store 118 .entry(file.size) 119 .and_modify(|fileset| fileset.push(file.clone())) 120 .or_insert_with(|| vec![file]); 121 continue; 122 } 123 None => match scanner_finished.load(std::sync::atomic::Ordering::Relaxed) { 124 true => { 125 progress_bar.finish_with_message("files grouped by size"); 126 break Ok(()); 127 } 128 false => continue, 129 }, 130 } 131 } 132 } 133 } 134 135 #[cfg(test)] 136 mod tests { 137 use anyhow::Result; 138 use dashmap::DashMap; 139 use indicatif::MultiProgress; 140 use rand::Rng; 141 use std::fs::File; 142 use std::io::Write; 143 use std::sync::atomic::{AtomicBool, AtomicU64}; 144 use std::sync::{Arc, Mutex}; 145 use tempfile::TempDir; 146 147 use crate::{fileinfo::FileInfo, params::Params}; 148 149 use super::Processor; 150 151 fn generate_bytes(size: usize) -> Vec<u8> { 152 let mut rng = rand::rng(); 153 (0..size).map(|_| rng.random::<u8>()).collect::<Vec<u8>>() 154 } 155 156 #[test] 157 fn hashwise_sorting_two_files_with_identical_init_pages_only_strict_mode() -> Result<()> { 158 let root = TempDir::new()?; 159 let content = generate_bytes(16384); 160 161 let mut content_x = content.clone(); 162 let mut content_y = content.clone(); 163 164 content_x.extend(generate_bytes(1720320)); 165 content_y.extend(generate_bytes(1720320)); 166 167 let files = [ 168 (root.path().join("fileone.bin"), content_x), 169 (root.path().join("filetwo.bin"), content_y), 170 ]; 171 172 for (fpath, content) in files.iter() { 173 let mut f = File::create_new(fpath)?; 174 f.write_all(content)?; 175 } 176 177 let dupstore = Arc::new(DashMap::new()); 178 let file_queue = Arc::new(Mutex::new( 179 files 180 .iter() 181 .map(|f| FileInfo::new(f.0.clone()).unwrap()) 182 .collect::<Vec<FileInfo>>(), 183 )); 184 185 let hw_dupstore = Arc::new(DashMap::new()); 186 Processor::sizewise( 187 Arc::new(Params::default()), 188 Arc::new(AtomicBool::new(true)), 189 dupstore.clone(), 190 file_queue, 191 Arc::new(MultiProgress::new()), 192 )?; 193 194 let args = Params { 195 strict: true, 196 ..Default::default() 197 }; 198 199 Processor::hashwise( 200 Arc::new(args), 201 dupstore.clone(), 202 hw_dupstore.clone(), 203 Arc::new(MultiProgress::new()), 204 Arc::new(AtomicU64::new(32)), 205 300, 206 Arc::new(AtomicBool::new(true)), 207 )?; 208 209 assert_eq!(hw_dupstore.len(), 2); 210 211 Ok(()) 212 } 213 214 #[test] 215 fn hashwise_sorting_two_files_with_identical_init_pages_only_fast_mode() -> Result<()> { 216 let root = TempDir::new()?; 217 let content = generate_bytes(16384); 218 219 let mut content_x = content.clone(); 220 let mut content_y = content.clone(); 221 222 content_x.extend(generate_bytes(1720320)); 223 content_y.extend(generate_bytes(1720320)); 224 225 let files = [ 226 (root.path().join("fileone.bin"), content_x), 227 (root.path().join("filetwo.bin"), content_y), 228 ]; 229 230 for (fpath, content) in files.iter() { 231 let mut f = File::create_new(fpath)?; 232 f.write_all(content)?; 233 } 234 235 let dupstore = Arc::new(DashMap::new()); 236 let file_queue = Arc::new(Mutex::new( 237 files 238 .iter() 239 .map(|f| FileInfo::new(f.0.clone()).unwrap()) 240 .collect::<Vec<FileInfo>>(), 241 )); 242 243 let hw_dupstore = Arc::new(DashMap::new()); 244 Processor::sizewise( 245 Arc::new(Params::default()), 246 Arc::new(AtomicBool::new(true)), 247 dupstore.clone(), 248 file_queue, 249 Arc::new(MultiProgress::new()), 250 )?; 251 252 Processor::hashwise( 253 Arc::new(Params::default()), 254 dupstore.clone(), 255 hw_dupstore.clone(), 256 Arc::new(MultiProgress::new()), 257 Arc::new(AtomicU64::new(32)), 258 300, 259 Arc::new(AtomicBool::new(true)), 260 )?; 261 262 assert_eq!(hw_dupstore.len(), 1); 263 264 Ok(()) 265 } 266 267 #[test] 268 fn hashwise_sorting_two_files_with_identical_data() -> Result<()> { 269 let root = TempDir::new()?; 270 let content = generate_bytes(282624); 271 let files = [ 272 (root.path().join("fileone.bin"), content.clone()), 273 (root.path().join("filetwo.bin"), content.clone()), 274 ]; 275 276 for (fpath, content) in files.iter() { 277 let mut f = File::create_new(fpath)?; 278 f.write_all(content)?; 279 } 280 281 let dupstore = Arc::new(DashMap::new()); 282 let file_queue = Arc::new(Mutex::new( 283 files 284 .iter() 285 .map(|f| FileInfo::new(f.0.clone()).unwrap()) 286 .collect::<Vec<FileInfo>>(), 287 )); 288 289 let hw_dupstore = Arc::new(DashMap::new()); 290 Processor::sizewise( 291 Arc::new(Params::default()), 292 Arc::new(AtomicBool::new(true)), 293 dupstore.clone(), 294 file_queue, 295 Arc::new(MultiProgress::new()), 296 )?; 297 298 Processor::hashwise( 299 Arc::new(Params::default()), 300 dupstore.clone(), 301 hw_dupstore.clone(), 302 Arc::new(MultiProgress::new()), 303 Arc::new(AtomicU64::new(32)), 304 300, 305 Arc::new(AtomicBool::new(true)), 306 )?; 307 308 assert_eq!(hw_dupstore.len(), 1); 309 310 Ok(()) 311 } 312 313 #[test] 314 fn sizewise_sorting_two_files_of_different_sizes() -> Result<()> { 315 let root = TempDir::new()?; 316 let files = [ 317 (root.path().join("fileone.bin"), generate_bytes(282624)), 318 (root.path().join("filetwo.bin"), generate_bytes(1720320)), 319 ]; 320 321 for (fpath, content) in files.iter() { 322 let mut f = File::create_new(fpath)?; 323 f.write_all(content)?; 324 } 325 326 let file_queue = Arc::new(Mutex::new( 327 files 328 .iter() 329 .map(|f| FileInfo::new(f.0.clone()).unwrap()) 330 .collect::<Vec<FileInfo>>(), 331 )); 332 333 let dupstore = Arc::new(DashMap::new()); 334 335 Processor::sizewise( 336 Arc::new(Params::default()), 337 Arc::new(AtomicBool::new(true)), 338 dupstore.clone(), 339 file_queue, 340 Arc::new(MultiProgress::new()), 341 )?; 342 343 assert_eq!(dupstore.len(), 2); 344 345 Ok(()) 346 } 347 348 #[test] 349 fn sizewise_sorting_two_files_of_same_size() -> Result<()> { 350 let root = TempDir::new()?; 351 let files = [ 352 (root.path().join("fileone.bin"), generate_bytes(282624)), 353 (root.path().join("filetwo.bin"), generate_bytes(282624)), 354 ]; 355 356 for (fpath, content) in files.iter() { 357 let mut f = File::create_new(fpath)?; 358 f.write_all(content)?; 359 } 360 361 let file_queue = Arc::new(Mutex::new( 362 files 363 .iter() 364 .map(|f| FileInfo::new(f.0.clone()).unwrap()) 365 .collect::<Vec<FileInfo>>(), 366 )); 367 368 let dupstore = Arc::new(DashMap::new()); 369 370 Processor::sizewise( 371 Arc::new(Params::default()), 372 Arc::new(AtomicBool::new(true)), 373 dupstore.clone(), 374 file_queue, 375 Arc::new(MultiProgress::new()), 376 )?; 377 378 assert_eq!(dupstore.len(), 1); 379 380 Ok(()) 381 } 382 }