/ src / list.rs
list.rs
  1  use anyhow::{Context as _, Result};
  2  
  3  use crate::{
  4      config::CONFIG,
  5      util::{config_path, get_hostname, home, paths_equal, rerun_with_root_args, system_path},
  6  };
  7  use std::{
  8      fs::{self},
  9      iter,
 10      path::{Path, PathBuf},
 11      sync::{
 12          Mutex,
 13          atomic::{AtomicUsize, Ordering},
 14      },
 15      thread,
 16  };
 17  
 18  #[derive(Default)]
 19  struct PendingPaths {
 20      queue: Mutex<Vec<PathBuf>>,
 21      /// the len of the queue
 22      len: AtomicUsize,
 23      /// the amount of threads currently waiting to lock the queue
 24      waiting: AtomicUsize,
 25  }
 26  impl PendingPaths {
 27      /// Push to the queue.
 28      /// Note that this may block the current thread
 29      #[expect(clippy::expect_used)] // We only panic if another thread already did
 30      fn push(&self, value: PathBuf) {
 31          self.queue
 32              .lock()
 33              .expect("No other threads should panic")
 34              .push(value);
 35          self.len.fetch_add(1, Ordering::AcqRel);
 36      }
 37      /// Pop from the queue.
 38      /// Note that this may block the current thread
 39      #[expect(clippy::expect_used)] // We only panic if another thread already did
 40      fn pop(&self) -> Option<PathBuf> {
 41          self.queue
 42              .lock()
 43              .expect("No other threads should panic")
 44              .pop()
 45              .inspect(|_| {
 46                  // successful pop -> decrement self.len
 47                  self.len.fetch_sub(1, Ordering::AcqRel);
 48              })
 49      }
 50      fn len(&self) -> usize {
 51          self.len.load(Ordering::Acquire)
 52      }
 53      fn waiting(&self) -> usize {
 54          self.waiting.load(Ordering::Acquire)
 55      }
 56      fn start_waiting(&self) {
 57          self.waiting.fetch_add(1, Ordering::AcqRel);
 58      }
 59      fn stop_waiting(&self) {
 60          self.waiting.fetch_sub(1, Ordering::AcqRel);
 61      }
 62  }
 63  
 64  /// Prints all symlinks on the system, that are probably made by dots
 65  #[expect(clippy::unwrap_used)] // Cant really handle errors in worker threads, we'd unwrap them at some point anyways
 66  pub fn list(rooted: bool, copy: Option<Vec<String>>) -> Result<()> {
 67      if let Some(items) = copy {
 68          return list_copy(items);
 69      }
 70  
 71      // Rerun with root if required
 72      if CONFIG.root && !rooted {
 73          rerun_with_root_args(&["--rooted"]);
 74      }
 75  
 76      let threads = thread::available_parallelism().map_or(12, Into::into);
 77  
 78      // Set up pending paths
 79      // Each thread has its own vec, to which it will push new paths
 80      // If a thread's own vec is empty, it will try to get items from another thread's vec
 81      // We keep an external atomic len for each vec, so the threads dont have to lock the mutex to see if there are any elements
 82      // Additionally we keep a waiting field, so that threads can choose the least waited for vec
 83      let pending_paths: Vec<_> = iter::repeat_with(PendingPaths::default)
 84          .take(threads)
 85          .collect();
 86      for (index, path) in CONFIG.list_paths.iter().enumerate() {
 87          pending_paths[index].push(path.into());
 88      }
 89  
 90      let pending = AtomicUsize::new(0);
 91  
 92      // The borrow checker wont let us just capture i in 'for _ in ...', so we have to do this
 93      let index = AtomicUsize::new(0);
 94  
 95      thread::scope(|scope| {
 96          for _ in 0..threads {
 97              scope.spawn(|| {
 98                  let my_index = index.fetch_add(1, Ordering::Relaxed); // We dont care about ordering
 99  
100                  loop {
101                      // Try our own queue
102                      if let Some(path) = pending_paths[my_index]
103                          .pop()
104                          // Or try stealing a path from another thread's queue
105                          .or_else(|| try_steal_path(&pending_paths, my_index))
106                      {
107                          process_path(&pending_paths, &pending, my_index, &path)
108                              .with_context(|| format!("Failed to process path {}", path.display()))
109                              .unwrap();
110                          continue;
111                      }
112  
113                      // If no work is left, break
114                      if pending.load(Ordering::Acquire) == 0 {
115                          break;
116                      }
117  
118                      // Avoid busy-looping
119                      thread::yield_now();
120                  }
121              });
122          }
123      });
124  
125      Ok(())
126  }
127  
128  /// Try to steal a pending path from another thread.
129  fn try_steal_path(pending_paths: &[PendingPaths], my_index: usize) -> Option<PathBuf> {
130      let mut candidate: Option<(usize, usize)> = None; // (thread_index, waiting)
131  
132      // For all other threads
133      for (index, pending_paths) in pending_paths.iter().enumerate() {
134          // Skip ourselves
135          if index == my_index {
136              continue;
137          }
138  
139          // If the other thread's queue has items
140          if pending_paths.len() > 0 {
141              let waiting = pending_paths.waiting();
142  
143              let this_candidate = Some((index, waiting));
144  
145              // If no one is currently waiting
146              if waiting == 0 {
147                  // Immediately choose this thread
148                  candidate = this_candidate;
149                  break;
150              }
151  
152              // Otherwise, choose the candidate with the smallest waiting count
153              candidate = match candidate {
154                  None => this_candidate,
155                  Some((_, current_waiting)) if waiting < current_waiting => this_candidate,
156                  other => other,
157              };
158          }
159      }
160  
161      if let Some((other_index, _)) = candidate {
162          let other_pending_paths = &pending_paths[other_index];
163  
164          // start waiting -> pop -> stop waiting
165          other_pending_paths.start_waiting();
166          let stolen = other_pending_paths.pop();
167          other_pending_paths.stop_waiting();
168  
169          stolen
170      } else {
171          None
172      }
173  }
174  
175  fn process_path(
176      pending_paths: &[PendingPaths],
177      pending: &AtomicUsize,
178      thread_index: usize,
179      path: &Path,
180  ) -> Result<()> {
181      // Add ourselves to pending
182      pending.fetch_add(1, Ordering::AcqRel);
183  
184      if let Ok(read_dir) = fs::read_dir(path) {
185          // Ignore errors with .flatten()
186          for dir_entry in read_dir.flatten() {
187              let entry_path = dir_entry.path();
188  
189              // Get the file type
190              let file_type = dir_entry.file_type().with_context(|| {
191                  format!("Failed to get file type of '{}'", entry_path.display())
192              })?;
193  
194              if file_type.is_symlink() {
195                  // get the entries target
196                  // Dont panic on failure
197                  if let Ok(target) = fs::read_link(&entry_path) {
198                      // If the target is in the files/ dir...
199                      if let Ok(stripped) = target.strip_prefix(&CONFIG.files_path)
200                  // ...and was plausibly created by dots...
201                  && system_path(stripped)? == dir_entry.path()
202                      {
203                          // Convert to a string, so strip_prefix() doesnt remove leading slashes
204                          if let Some(str) = stripped.to_str() {
205                              let str = str.replace(&home()?, "/{home}");
206  
207                              let formatted = str
208                                  .strip_prefix(&CONFIG.default_subdir) // If the subdir is the default one, remove it
209                                  .map(Into::into)
210                                  // If the subdir is the current hostname, replace it with {hostname}
211                                  .or_else(|| {
212                                      str.strip_prefix(&get_hostname().ok()?)
213                                          .map(|str| format!("{{hostname}}{str}"))
214                                  })
215                                  .unwrap_or(str);
216  
217                              println!("{formatted}");
218                          }
219                      }
220                  }
221              } else if file_type.is_dir() {
222                  let path = dir_entry.path();
223  
224                  // Filter out ignored paths
225                  if !CONFIG.ignore_paths.contains(&path) {
226                      // Recurse into the dir
227                      pending_paths[thread_index].push(path);
228                  }
229              }
230          }
231      }
232  
233      // Remove ourselves from pending
234      pending.fetch_sub(1, Ordering::AcqRel);
235  
236      Ok(())
237  }
238  
239  fn list_copy(items: Vec<String>) -> Result<()> {
240      for item in items {
241          let path = Path::new(&item);
242  
243          let config_path = config_path(path)?;
244          let system_path = system_path(path)?;
245  
246          // If path exists on the system
247          if fs::exists(path).with_context(
248              || format!("checking if the path {} already exists", path.display()),
249              // And is equal to the one in the config
250          )? && paths_equal(&config_path, &system_path).is_ok()
251          {
252              // Print it
253              println!("{item}");
254          }
255      }
256  
257      Ok(())
258  }