server.rs
1 use std::sync::atomic::{AtomicBool, AtomicU64}; 2 use std::sync::{Arc, Mutex}; 3 4 use crate::processor::Processor; 5 use crate::scanner::Scanner; 6 use anyhow::Result; 7 use dashmap::DashMap; 8 use indicatif::{MultiProgress, ProgressDrawTarget}; 9 use rand::Rng; 10 use threadpool::ThreadPool; 11 12 use crate::fileinfo::FileInfo; 13 use crate::params::Params; 14 15 pub struct Server { 16 filequeue: Arc<Mutex<Vec<FileInfo>>>, 17 sw_duplicate_set: Arc<DashMap<u64, Vec<FileInfo>>>, 18 pub hw_duplicate_set: Arc<DashMap<u128, Vec<FileInfo>>>, 19 threadpool: ThreadPool, 20 app_args: Arc<Params>, 21 pub max_file_path_len: Arc<AtomicU64>, 22 } 23 24 impl Server { 25 pub fn new(opts: Params) -> Self { 26 Self { 27 filequeue: Arc::new(Mutex::new(Vec::new())), 28 sw_duplicate_set: Arc::new(DashMap::new()), 29 hw_duplicate_set: Arc::new(DashMap::new()), 30 threadpool: ThreadPool::new(4), 31 app_args: Arc::new(opts), 32 max_file_path_len: Arc::new(AtomicU64::new(0)), 33 } 34 } 35 36 pub fn start(&self) -> Result<()> { 37 let progbarbox = Arc::new(MultiProgress::new()); 38 let mut rng = rand::rng(); 39 let seed: i64 = rng.random(); 40 41 if !self.app_args.progress { 42 progbarbox.set_draw_target(ProgressDrawTarget::hidden()); 43 } 44 45 let (app_args_sc, app_args_sw, app_args_hw) = ( 46 Arc::clone(&self.app_args), 47 Arc::clone(&self.app_args), 48 Arc::clone(&self.app_args), 49 ); 50 let (file_queue_sc, file_queue_pr) = ( 51 Arc::clone(&self.filequeue), 52 Arc::clone(&self.filequeue), 53 ); 54 let scanner_finished = Arc::new(AtomicBool::new(false)); 55 let sw_sort_finished = Arc::new(AtomicBool::new(false)); 56 let (sfin_sc, sfin_pr) = ( 57 Arc::clone(&scanner_finished), 58 Arc::clone(&scanner_finished), 59 ); 60 let (swfin_pr_sw, swfin_pr_hw) = ( 61 Arc::clone(&sw_sort_finished), 62 Arc::clone(&sw_sort_finished), 63 ); 64 let (store_sw, store_sw2, store_hw) = ( 65 Arc::clone(&self.sw_duplicate_set), 66 Arc::clone(&self.sw_duplicate_set), 67 Arc::clone(&self.hw_duplicate_set), 68 ); 69 let max_file_path_len = Arc::clone(&self.max_file_path_len); 70 let (prog_sc, prog_sw, prog_hw) = ( 71 Arc::clone(&progbarbox), 72 Arc::clone(&progbarbox), 73 Arc::clone(&progbarbox), 74 ); 75 76 self.threadpool.execute(move || { 77 Scanner::new(app_args_sc) 78 .expect("unable to initialize scanner.") 79 .scan(file_queue_sc, prog_sc) 80 .expect("scanner failed."); 81 82 sfin_sc.store(true, std::sync::atomic::Ordering::Relaxed); 83 }); 84 85 self.threadpool.execute(move || { 86 Processor::sizewise( 87 app_args_sw, 88 sfin_pr, 89 store_sw, 90 file_queue_pr, 91 prog_sw, 92 ) 93 .expect("sizewise scanner failed."); 94 95 swfin_pr_sw.store(true, std::sync::atomic::Ordering::Relaxed); 96 }); 97 98 self.threadpool.execute(move || { 99 Processor::hashwise( 100 app_args_hw, 101 store_sw2, 102 store_hw, 103 prog_hw, 104 max_file_path_len, 105 seed, 106 swfin_pr_hw, 107 ) 108 .expect("sizewise scanner failed."); 109 }); 110 111 progbarbox.clear()?; 112 113 self.threadpool.join(); 114 115 Ok(()) 116 } 117 }