list.rs
1 use anyhow::{Context as _, Result}; 2 3 use crate::{ 4 config::CONFIG, 5 util::{config_path, get_hostname, home, paths_equal, rerun_with_root_args, system_path}, 6 }; 7 use std::{ 8 fs::{self}, 9 iter, 10 path::{Path, PathBuf}, 11 sync::{ 12 Mutex, 13 atomic::{AtomicUsize, Ordering}, 14 }, 15 thread, 16 }; 17 18 #[derive(Default)] 19 struct PendingPaths { 20 queue: Mutex<Vec<PathBuf>>, 21 /// the len of the queue 22 len: AtomicUsize, 23 /// the amount of threads currently waiting to lock the queue 24 waiting: AtomicUsize, 25 } 26 impl PendingPaths { 27 /// Push to the queue. 28 /// Note that this may block the current thread 29 #[expect(clippy::expect_used)] // We only panic if another thread already did 30 fn push(&self, value: PathBuf) { 31 self.queue 32 .lock() 33 .expect("No other threads should panic") 34 .push(value); 35 self.len.fetch_add(1, Ordering::AcqRel); 36 } 37 /// Pop from the queue. 38 /// Note that this may block the current thread 39 #[expect(clippy::expect_used)] // We only panic if another thread already did 40 fn pop(&self) -> Option<PathBuf> { 41 self.queue 42 .lock() 43 .expect("No other threads should panic") 44 .pop() 45 .inspect(|_| { 46 // successful pop -> decrement self.len 47 self.len.fetch_sub(1, Ordering::AcqRel); 48 }) 49 } 50 fn len(&self) -> usize { 51 self.len.load(Ordering::Acquire) 52 } 53 fn waiting(&self) -> usize { 54 self.waiting.load(Ordering::Acquire) 55 } 56 fn start_waiting(&self) { 57 self.waiting.fetch_add(1, Ordering::AcqRel); 58 } 59 fn stop_waiting(&self) { 60 self.waiting.fetch_sub(1, Ordering::AcqRel); 61 } 62 } 63 64 /// Prints all symlinks on the system, that are probably made by dots 65 #[expect(clippy::unwrap_used)] // Cant really handle errors in worker threads, we'd unwrap them at some point anyways 66 pub fn list(rooted: bool, copy: Option<Vec<String>>) -> Result<()> { 67 if let Some(items) = copy { 68 return list_copy(items); 69 } 70 71 // Rerun with root if required 72 if CONFIG.root && !rooted { 73 rerun_with_root_args(&["--rooted"]); 74 } 75 76 let threads = thread::available_parallelism().map_or(12, Into::into); 77 78 // Set up pending paths 79 // Each thread has its own vec, to which it will push new paths 80 // If a thread's own vec is empty, it will try to get items from another thread's vec 81 // We keep an external atomic len for each vec, so the threads dont have to lock the mutex to see if there are any elements 82 // Additionally we keep a waiting field, so that threads can choose the least waited for vec 83 let pending_paths: Vec<_> = iter::repeat_with(PendingPaths::default) 84 .take(threads) 85 .collect(); 86 for (index, path) in CONFIG.list_paths.iter().enumerate() { 87 pending_paths[index].push(path.into()); 88 } 89 90 let pending = AtomicUsize::new(0); 91 92 // The borrow checker wont let us just capture i in 'for _ in ...', so we have to do this 93 let index = AtomicUsize::new(0); 94 95 thread::scope(|scope| { 96 for _ in 0..threads { 97 scope.spawn(|| { 98 let my_index = index.fetch_add(1, Ordering::Relaxed); // We dont care about ordering 99 100 loop { 101 // Try our own queue 102 if let Some(path) = pending_paths[my_index] 103 .pop() 104 // Or try stealing a path from another thread's queue 105 .or_else(|| try_steal_path(&pending_paths, my_index)) 106 { 107 process_path(&pending_paths, &pending, my_index, &path) 108 .with_context(|| format!("Failed to process path {}", path.display())) 109 .unwrap(); 110 continue; 111 } 112 113 // If no work is left, break 114 if pending.load(Ordering::Acquire) == 0 { 115 break; 116 } 117 118 // Avoid busy-looping 119 thread::yield_now(); 120 } 121 }); 122 } 123 }); 124 125 Ok(()) 126 } 127 128 /// Try to steal a pending path from another thread. 129 fn try_steal_path(pending_paths: &[PendingPaths], my_index: usize) -> Option<PathBuf> { 130 let mut candidate: Option<(usize, usize)> = None; // (thread_index, waiting) 131 132 // For all other threads 133 for (index, pending_paths) in pending_paths.iter().enumerate() { 134 // Skip ourselves 135 if index == my_index { 136 continue; 137 } 138 139 // If the other thread's queue has items 140 if pending_paths.len() > 0 { 141 let waiting = pending_paths.waiting(); 142 143 let this_candidate = Some((index, waiting)); 144 145 // If no one is currently waiting 146 if waiting == 0 { 147 // Immediately choose this thread 148 candidate = this_candidate; 149 break; 150 } 151 152 // Otherwise, choose the candidate with the smallest waiting count 153 candidate = match candidate { 154 None => this_candidate, 155 Some((_, current_waiting)) if waiting < current_waiting => this_candidate, 156 other => other, 157 }; 158 } 159 } 160 161 if let Some((other_index, _)) = candidate { 162 let other_pending_paths = &pending_paths[other_index]; 163 164 // start waiting -> pop -> stop waiting 165 other_pending_paths.start_waiting(); 166 let stolen = other_pending_paths.pop(); 167 other_pending_paths.stop_waiting(); 168 169 stolen 170 } else { 171 None 172 } 173 } 174 175 fn process_path( 176 pending_paths: &[PendingPaths], 177 pending: &AtomicUsize, 178 thread_index: usize, 179 path: &Path, 180 ) -> Result<()> { 181 // Add ourselves to pending 182 pending.fetch_add(1, Ordering::AcqRel); 183 184 if let Ok(read_dir) = fs::read_dir(path) { 185 // Ignore errors with .flatten() 186 for dir_entry in read_dir.flatten() { 187 let entry_path = dir_entry.path(); 188 189 // Get the file type 190 let file_type = dir_entry.file_type().with_context(|| { 191 format!("Failed to get file type of '{}'", entry_path.display()) 192 })?; 193 194 if file_type.is_symlink() { 195 // get the entries target 196 // Dont panic on failure 197 if let Ok(target) = fs::read_link(&entry_path) { 198 // If the target is in the files/ dir... 199 if let Ok(stripped) = target.strip_prefix(&CONFIG.files_path) 200 // ...and was plausibly created by dots... 201 && system_path(stripped)? == dir_entry.path() 202 { 203 // Convert to a string, so strip_prefix() doesnt remove leading slashes 204 if let Some(str) = stripped.to_str() { 205 let str = str.replace(&home()?, "/{home}"); 206 207 let formatted = str 208 .strip_prefix(&CONFIG.default_subdir) // If the subdir is the default one, remove it 209 .map(Into::into) 210 // If the subdir is the current hostname, replace it with {hostname} 211 .or_else(|| { 212 str.strip_prefix(&get_hostname().ok()?) 213 .map(|str| format!("{{hostname}}{str}")) 214 }) 215 .unwrap_or(str); 216 217 println!("{formatted}"); 218 } 219 } 220 } 221 } else if file_type.is_dir() { 222 let path = dir_entry.path(); 223 224 // Filter out ignored paths 225 if !CONFIG.ignore_paths.contains(&path) { 226 // Recurse into the dir 227 pending_paths[thread_index].push(path); 228 } 229 } 230 } 231 } 232 233 // Remove ourselves from pending 234 pending.fetch_sub(1, Ordering::AcqRel); 235 236 Ok(()) 237 } 238 239 fn list_copy(items: Vec<String>) -> Result<()> { 240 for item in items { 241 let path = Path::new(&item); 242 243 let config_path = config_path(path)?; 244 let system_path = system_path(path)?; 245 246 // If path exists on the system 247 if fs::exists(path).with_context( 248 || format!("checking if the path {} already exists", path.display()), 249 // And is equal to the one in the config 250 )? && paths_equal(&config_path, &system_path).is_ok() 251 { 252 // Print it 253 println!("{item}"); 254 } 255 } 256 257 Ok(()) 258 }