/ src / gatherer.rs
gatherer.rs
//! The Gatherer manages threads which walk directories. For each entry found, a custom processor
//! function is called which may add directories back to the list of directories to process and
//! pass found entries to an output queue.
  3  use std::io;
  4  use std::sync::{Arc, Weak};
  5  use std::thread;
  6  use std::ops::DerefMut;
  7  use std::path::Path;
  8  use std::ffi::OsStr;
  9  
 10  use mpmcpq::*;
 11  use crossbeam_channel::{bounded, Receiver, Sender};
 12  use parking_lot::Mutex;
 13  
 14  #[allow(unused_imports)]
 15  use crate::{debug, error, info, trace, warn};
 16  use crate::*;
 17  
 18  /// The type of the user supplied closure/function to process entries.  Takes a GathererHandle
 19  /// which defines the API for pushing things back on the Gatherers queues and a ProcessMessage
 20  /// which wraps the things to be handled.
 21  // PLANNED: Dir will become part of ObjectPath
 22  pub type ProcessFn = dyn Fn(GathererHandle, ProcessMessage) + Send + Sync;
 23  
 24  /// The input to the ProcessFn which dispatches actions on the queues via the GathererHandle.
 25  pub enum ProcessMessage {
 26      /// Either an openat::Entry for iterated directory entries or an error.
 27      Result(io::Result<openat::Entry>, ObjectPath),
 28      /// A notification that a ObjectPath processing is finished.
 29      ObjectDone(ObjectPath),
 30      /// The ProcessFn is called with this after all entries of an directory are processed (but
 31      /// not its subdirectories). This is used to notify that no more entries of the saied
 32      /// directory are to be expected. Note that ObjectDone messages may still appear.
 33      EndOfDirectory(ObjectPath),
 34  }
 35  
/// An mpmcpq `Stash` for `DirectoryGatherMessage`s with `u64` priorities, used together with
/// `send_batched`/`sync` on the Gatherer's `dirs_queue`. Every gather thread owns one;
/// `load_dir_recursive` uses the shared, mutex-protected `kickoff_stash`.
pub(crate) type GathererStash<'a> = Stash<'a, DirectoryGatherMessage, u64>;
 37  
/// The Gatherer manages the queues and threads traversing directories.
/// There should be only one 'Gatherer' around as it is used to merge hardlinks
/// and needs to have a global picture of all indexed files.
///
/// This is a thin handle around an `Arc<GathererInner>`: every gather thread holds its own
/// `Gatherer` pointing to the same shared inner state.
pub struct Gatherer(Arc<GathererInner>);
 42  
 43  impl Gatherer {
 44      /// Creates a gatherer builder used to configure the gatherer. Uses conservative defaults,
 45      /// 16 threads and 64k backlog.
 46      #[must_use = "configure the Gatherer and finally call .start()"]
 47      pub fn build() -> GathererBuilder {
 48          GathererBuilder::new()
 49      }
 50  
 51      /// Try to upgrade a Weak reference into a Gatherer.
 52      pub(crate) fn upgrade(weak: &Weak<GathererInner>) -> Option<Gatherer> {
 53          weak.upgrade().map(Gatherer)
 54      }
 55  
 56      /// Get a Weak reference from a Gatherer.
 57      pub(crate) fn downgrade(&self) -> Weak<GathererInner> {
 58          Arc::downgrade(&self.0)
 59      }
 60  
 61      /// Access to the inner data.
 62      pub(crate) fn inner(&self) -> &GathererInner {
 63          &*self.0
 64      }
 65  
 66      /// Returns the an Arc of the receiver side of output channel 'n'.
 67      pub fn channel(&self, n: usize) -> Arc<Receiver<InventoryEntryMessage>> {
 68          self.0.output_channels[n].1.clone()
 69      }
 70  
 71      /// Returns the number of output channels.
 72      pub fn num_channels(&self) -> usize {
 73          self.0.output_channels.len() as usize
 74      }
 75  
 76      /// Returns a Vec with all receiving sides of the output channels.
 77      pub fn channels_as_vec(&self) -> Vec<Arc<Receiver<InventoryEntryMessage>>> {
 78          self.0
 79              .output_channels
 80              .iter()
 81              .map(|(_, r)| r.clone())
 82              .collect()
 83      }
 84  
 85      /// Adds a name to the interned names, returns that.
 86      pub fn name_interning(&self, name: &OsStr) -> InternedName {
 87          self.0.names.interning(name)
 88      }
 89  
 90      /// Adds a directory to the processing queue of the inventory. This is the main function
 91      /// to initiate a directory traversal.
 92      // pub fn load_dir_recursive(&self, path: ObjectPath) {
 93      pub fn load_dir_recursive<P: AsRef<Path>>(&self, path: P, watch: bool) {
 94          let path = ObjectPath::new(path, self);
 95          if let Ok(dir) = path.dir() {
 96              self.0.dirs_lru.preserve(dir);
 97          };
 98  
 99          path.watch(watch);
100  
101          let mut stash = self.0.kickoff_stash.lock();
102          self.0.send_dir(
103              DirectoryGatherMessage::new_dir(path),
104              u64::MAX, /* initial message priority instead depth/inode calculation, added
105                         * directories are processed at the lowest priority */
106              Some(stash.deref_mut()),
107          );
108          self.0.dirs_queue.sync(stash.deref_mut());
109      }
110  
111      // TODO: fn shutdown, there is currently no way to free a Gatherer as the threads keep it alive
112  
113      /// called when a watched ObjectPath's strong_count becomes dropped equal or below 2.
114      pub(crate) fn notify_path_dropped(&self, path: ObjectPath) {
115          (self.0.processor)(
116              GathererHandle::new(
117                  // infallible since caller checked already
118                  &path.gatherer().unwrap(),
119                  None,
120              ),
121              ProcessMessage::ObjectDone(path),
122          );
123      }
124  
125      /// Spawns a single gatherer thread
126      fn spawn_gather_thread(&self, n: usize) -> io::Result<thread::JoinHandle<()>> {
127          thread::Builder::new().name(format!("gather/{}", n)).spawn({
128              let gatherer = Gatherer(self.0.clone());
129              move || {
130                  debug!("thread started");
131                  let stash: GathererStash = Stash::new(&gatherer.0.dirs_queue);
132                  loop {
133                      use DirectoryGatherMessage::*;
134  
135                      // TODO: messages for dir enter/leave on the ouput queue
136                      match gatherer.0.dirs_queue.recv_guard().message() {
137                          mpmcpq::Message::Msg(TraverseDirectory { path }, prio) => {
138                              gatherer.0.dirs_lru.expire_until(gatherer.0.lru_batch, &|| {
139                                  used_handles() < gatherer.0.fd_limit
140                              });
141  
142                              path.dir()
143                                  .map(|dir| {
144                                      trace!(
145                                          "opened fd {:?}: for {:?}: depth {}",
146                                          dir,
147                                          path.to_pathbuf(),
148                                          path.depth()
149                                      );
150  
151                                      gatherer.0.dirs_lru.preserve(dir.clone());
152  
153                                      // Clone the path to increment the refcount to trigger notification reliably
154                                      let path = path.clone();
155                                      dir.list_self()
156                                          .map(|dir_iter| {
157                                              dir_iter.for_each(|entry| {
158                                                  (gatherer.0.processor)(
159                                                      GathererHandle::new(&gatherer, Some(&stash)),
160                                                      ProcessMessage::Result(entry, path.clone()),
161                                                  );
162                                              });
163  
164                                              (gatherer.0.processor)(
165                                                  GathererHandle::new(&gatherer, Some(&stash)),
166                                                  ProcessMessage::EndOfDirectory(path.clone()),
167                                              );
168  
169                                              gatherer.0.dirs_queue.sync(&stash);
170                                              crate::dirhandle::dec_handles();
171                                          })
172                                          .map_err(|err| {
173                                              if err.raw_os_error() == Some(libc::EMFILE) {
174                                                  gatherer.0.resend_dir(
175                                                      TraverseDirectory { path: path.clone() },
176                                                      *prio,
177                                                      &stash,
178                                                  );
179                                              } else {
180                                                  (gatherer.0.processor)(
181                                                      GathererHandle::new(&gatherer, Some(&stash)),
182                                                      ProcessMessage::Result(Err(err), path.clone()),
183                                                  );
184                                              }
185                                          })
186                                  })
187                                  .map_err(|err| {
188                                      if err.raw_os_error() == Some(libc::EMFILE) {
189                                          warn!("filehandles exhausted");
190                                          gatherer.0.dirs_lru.expire_all();
191                                          gatherer.0.resend_dir(
192                                              TraverseDirectory { path: path.clone() },
193                                              *prio,
194                                              &stash,
195                                          );
196                                      } else {
197                                          (gatherer.0.processor)(
198                                              GathererHandle::new(&gatherer, Some(&stash)),
199                                              ProcessMessage::Result(Err(err), path.clone()),
200                                          );
201                                      }
202                                  })
203                                  .ok();
204                          }
205                          mpmcpq::Message::Drained => {
206                              trace!("drained!!!");
207                              (0..gatherer.0.output_channels.len()).for_each(|n| {
208                                  gatherer.0.send_entry(n, InventoryEntryMessage::Done)
209                              });
210                          }
211                          _ => unimplemented!(),
212                      }
213                  }
214              }
215          })
216      }
217  }
218  
219  pub(crate) struct GathererInner {
220      /// All file/dir names are interned here
221      names: InternedNames<32>,
222  
223      /// The processing function
224      processor: Box<ProcessFn>,
225  
226      // message queues
227      /// The input PriorityQueue fed with directories to be processed
228      dirs_queue:      PriorityQueue<DirectoryGatherMessage, u64>,
229      /// The output channels where the results are send to.
230      #[allow(clippy::type_complexity)]
231      output_channels: Vec<(
232          Sender<InventoryEntryMessage>,
233          Arc<Receiver<InventoryEntryMessage>>,
234      )>,
235  
236      /// Sending an initial directory requires an stash.
237      // PLANNED: Also used when one wants to push multiple directories.
238      kickoff_stash: Mutex<GathererStash<'static>>,
239  
240      /// The maximum number of file descriptors this Gatherer may use.
241      fd_limit: usize,
242  
243      /// LruList for keeping dir handles alive.
244      dirs_lru:  LruList<Dir>,
245      lru_batch: usize,
246  
247      /// Number of DirectoryGather Messages batched together
248      message_batch: usize,
249  }
250  
251  impl GathererInner {
252      /// put a DirectoryGatherMessage on the input queue (traverse sub directories).
253      pub(crate) fn send_dir(
254          &self,
255          message: DirectoryGatherMessage,
256          prio: u64,
257          stash: Option<&GathererStash>,
258      ) {
259          if let Some(stash) = stash {
260              self.dirs_queue
261                  .send_batched(message, prio, self.message_batch, stash);
262          } else {
263              self.dirs_queue.send_nostash(message, prio);
264          }
265      }
266  
267      // resend a dir after 5ms pause to recover from filehandle depletion
268      fn resend_dir(&self, message: DirectoryGatherMessage, prio: u64, stash: &GathererStash) {
269          self.send_dir(message, prio, Some(stash));
270          thread::sleep(std::time::Duration::from_millis(5));
271      }
272  
273      /// Put a message on an output channel. The channels are used modulo the
274      /// output_channels.len(), thus can never overflow and a user may use a hash/larger number
275      /// than available.
276      pub(crate) fn send_entry(&self, channel: usize, message: InventoryEntryMessage) {
277          // Ignore result, the user may have dropped the receiver, but there is nothing we
278          // should do about it.
279          let _ = unsafe {
280              self.output_channels
281                  .get_unchecked(channel % self.output_channels.len())
282                  .0
283                  .send(message)
284          };
285      }
286  }
287  
/// Configures a Gatherer. Obtained from `Gatherer::build()`, adjusted with the `with_*`
/// methods and finally turned into a running Gatherer with `start()`.
#[derive(Debug, Clone)]
pub struct GathererBuilder {
    /// Number of threads traversing directories.
    num_gather_threads:  usize,
    /// Number of output channels.
    num_output_channels: usize,
    /// Capacity of each output channel; 0 selects 4096 per gather thread.
    inventory_backlog:   usize,
    /// Maximum number of file descriptors the Gatherer should use.
    fd_limit:            usize,
    /// Batch size for expiring directory handles from the LRU.
    lru_batch:           usize,
    /// Number of DirectoryGatherMessages batched together.
    message_batch:       usize,
}
297  
298  impl Default for GathererBuilder {
299      fn default() -> Self {
300          Self::new()
301      }
302  }
303  
304  impl GathererBuilder {
305      fn new() -> Self {
306          GathererBuilder {
307              num_gather_threads:  16,
308              num_output_channels: 1,
309              inventory_backlog:   0,
310              fd_limit:            512,
311              lru_batch:           4,
312              message_batch:       512,
313          }
314      }
315  
316      /// Starts the Gatherer. Takes the user defined processing function as argument. This
317      /// function is used to process every directory entry seen. It should be small and fast
318      /// selecting which sub directories to be traversed and which entries to pass to the
319      /// output channels. Any more work should be done on the output then.
320      pub fn start(&self, processor: Box<ProcessFn>) -> io::Result<Gatherer> {
321          let output_channels = (0..self.num_output_channels)
322              .map(|_| {
323                  let (sender, receiver) = bounded(
324                      // when inventory backlog is not set, set it automatically to 4k per thread
325                      if self.inventory_backlog == 0 {
326                          4096 * self.num_gather_threads
327                      } else {
328                          self.inventory_backlog
329                      },
330                  );
331                  (sender, Arc::new(receiver))
332              })
333              .collect();
334  
335          let gatherer = Gatherer(Arc::new(GathererInner {
336              names: InternedNames::new(),
337              dirs_queue: PriorityQueue::new(),
338              output_channels,
339              processor,
340              kickoff_stash: Mutex::new(GathererStash::new_without_priority_queue()),
341              fd_limit: self.fd_limit,
342              dirs_lru: LruList::new(),
343              lru_batch: self.lru_batch,
344              message_batch: self.message_batch,
345          }));
346  
347          (0..self.num_gather_threads).try_for_each(|n| -> io::Result<()> {
348              gatherer.spawn_gather_thread(n)?;
349              Ok(())
350          })?;
351  
352          debug!("created gatherer");
353          Ok(gatherer)
354      }
355  
356      /// Sets the number of threads which traverse the directories. These are IO-bound
357      /// operations and the more threads are used the better are the opportunities for the
358      /// kernel to optimize IO-Requests. Tests have shown that on fast SSD's and cached data
359      /// thread numbers in the hundrededs still show some benefits (at high resource
360      /// costs). For general operation and on slower HDD's / non cached data 8-64 threads
361      /// should be good enough. Default is 16 threads.
362      #[must_use = "GathererBuilder must be used, call .start()"]
363      pub fn with_gather_threads(mut self, num_threads: usize) -> Self {
364          assert!(num_threads > 0, "Must at least use one thread");
365          self.num_gather_threads = num_threads;
366          self
367      }
368  
369      /// Sets the number of threads which traverse the directories. These are IO-bound
370      /// operations and the more threads are used the better are the opportunities for the
371      /// kernel to optimize IO-Requests. Tests have shown that on fast SSD's and cached data
372      /// thread numbers in the hundrededs still show some benefits (at high resource
373      /// costs). For general operation and on slower HDD's / non cached data 8-64 threads
374      /// should be good enough. Default is 16 threads.
375      #[must_use = "GathererBuilder must be used, call .start()"]
376      pub fn with_output_channels(mut self, num_channels: usize) -> Self {
377          assert!(num_channels > 0, "Must at least use one channel");
378          self.num_output_channels = num_channels;
379          self
380      }
381  
382      /// Sets the amount of messages the output channels can hold. For cached and readahead data,
383      /// the kernel can send bursts entries to the gatherer threads at very high speeds, since
384      /// we don't want to stall the gathering, the is adds some output buffering. Usually
385      /// values from 64k to 512k should be fine here. When zero (the default) 4k per thread are
386      /// used.
387      #[must_use = "GathererBuilder must be used, call .start()"]
388      pub fn with_inventory_backlog(mut self, backlog_size: usize) -> Self {
389          self.inventory_backlog = backlog_size;
390          self
391      }
392  
393      /// Sets the maximum number of directory handles the Gatherer may use. The Gatherer has a
394      /// build-in strategy handle fd exhaustion when this happens earlier, but keep in mind
395      /// that then there are no fd's for the other parts of the application
396      /// available. Constraining the number of file handles too much will make its slow and
397      /// eventually deadlock. Limit them to no less than num_threads+100 handles! The limits
398      /// are not enforced since the actual amount needed depends a lot factors. Defaults to 512
399      /// fd's which should be plenty for most cases.
400      #[must_use = "GathererBuilder must be used, call .start()"]
401      pub fn with_fd_limit(mut self, fd_limit: usize) -> Self {
402          self.fd_limit = fd_limit;
403          self
404      }
405  
406      /// Sets size of message batched together. This reduces contention on the priority
407      /// lock. While it won't improve performance it can reduce the CPU load (often
408      /// insignificantly). Defaults to 512, shouldn't need adjustments except for benchmarking.
409      #[must_use = "GathererBuilder must be used, call .start()"]
410      pub fn with_message_batch(mut self, message_batch: usize) -> Self {
411          self.message_batch = message_batch;
412          self
413      }
414  
415      /// Sets size of dir lru expires batched together.
416      #[must_use = "GathererBuilder must be used, call .start()"]
417      pub fn with_lru_batch(mut self, lru_batch: usize) -> Self {
418          self.lru_batch = lru_batch;
419          self
420      }
421  }
422  
#[cfg(test)]
mod test {
    use std::io::Write;
    use std::os::unix::ffi::OsStrExt;

    use super::*;

    /// Drains output channel 0 of `gatherer` up to the first `Done` message, writing every
    /// path to stdout (one per line). Error messages are logged when `log_errors` is set.
    fn print_paths(gatherer: &Gatherer, log_errors: bool) {
        let mut stdout = std::io::stdout();

        gatherer
            .channel(0)
            .iter()
            .take_while(|msg| !matches!(msg, InventoryEntryMessage::Done))
            .for_each(|msg| {
                if let Some(path) = msg.path() {
                    let _ = stdout.write_all(path.to_pathbuf().as_os_str().as_bytes());
                    let _ = stdout.write_all(b"\n");
                } else if log_errors && msg.is_error() {
                    error!("{:?}", msg)
                }
            });
    }

    // tests
    #[test]
    fn smoke() {
        crate::test::init_env_logging();
        // Starting must succeed; previously the result was discarded so this could not fail.
        Gatherer::build()
            .with_gather_threads(1)
            .start(Box::new(
                |_gatherer: GathererHandle, _entry: ProcessMessage| {},
            ))
            .expect("starting the gatherer failed");
    }

    #[test]
    fn dir_smoke() {
        crate::test::init_env_logging();

        let gatherer = Gatherer::build()
            .with_gather_threads(4)
            .start(Box::new(
                |gatherer: GathererHandle, entry: ProcessMessage| match entry {
                    ProcessMessage::Result(Ok(entry), parent_path) => match entry.simple_type() {
                        Some(openat::SimpleType::Dir) => {
                            trace!("{:?}/{:?}", parent_path.to_pathbuf(), entry.file_name());
                            gatherer.traverse_dir(&entry, parent_path.clone(), false);
                        }
                        _ => {
                            trace!("{:?}/{:?}", parent_path.to_pathbuf(), entry.file_name());
                            gatherer.output_entry(0, &entry, parent_path, false);
                        }
                    },
                    ProcessMessage::Result(Err(err), parent_path) => {
                        gatherer.output_error(0, Box::new(err), parent_path);
                    }
                    _ => {}
                },
            ))
            .unwrap();

        gatherer.load_dir_recursive("src", false);

        gatherer
            .channel(0)
            .iter()
            .take_while(|msg| !matches!(msg, InventoryEntryMessage::Done))
            .for_each(|msg| {
                if let Some(path) = msg.path() {
                    trace!("{:?}", path.to_pathbuf());
                } else if msg.is_error() {
                    error!("{:?}", msg)
                }
            });
    }

    #[test]
    #[ignore]
    fn load_dir() {
        crate::test::init_env_logging();

        let gatherer = Gatherer::build()
            .with_gather_threads(64)
            .with_fd_limit(768)
            .start(Box::new(
                |gatherer: GathererHandle, entry: ProcessMessage| match entry {
                    ProcessMessage::Result(Ok(entry), parent_path) => match entry.simple_type() {
                        Some(openat::SimpleType::Dir) => {
                            gatherer.traverse_dir(&entry, parent_path.clone(), false);
                            gatherer.output_entry(0, &entry, parent_path.clone(), false);
                        }
                        _ => {
                            gatherer.output_entry(0, &entry, parent_path, false);
                        }
                    },
                    ProcessMessage::Result(Err(err), parent_path) => {
                        gatherer.output_error(0, Box::new(err), parent_path);
                    }
                    _ => {}
                },
            ))
            .unwrap();

        gatherer.load_dir_recursive(".", false);

        print_paths(&gatherer, true);
    }

    #[test]
    fn entry_messages() {
        crate::test::init_env_logging();

        let gatherer = Gatherer::build()
            .start(Box::new(
                |gatherer: GathererHandle, entry: ProcessMessage| match entry {
                    ProcessMessage::Result(Ok(entry), parent_path) => match entry.simple_type() {
                        Some(openat::SimpleType::Dir) => {
                            gatherer.traverse_dir(&entry, parent_path, false);
                        }
                        _ => {
                            gatherer.output_entry(0, &entry, parent_path, false);
                        }
                    },
                    ProcessMessage::Result(Err(err), parent_path) => {
                        gatherer.output_error(0, Box::new(err), parent_path);
                    }
                    _ => {}
                },
            ))
            .unwrap();

        gatherer.load_dir_recursive("src", false);

        print_paths(&gatherer, false);
    }

    #[test]
    fn metadata_messages() {
        crate::test::init_env_logging();

        let gatherer = Gatherer::build()
            .start(Box::new(
                |gatherer: GathererHandle, entry: ProcessMessage| match entry {
                    ProcessMessage::Result(Ok(entry), parent_path) => match entry.simple_type() {
                        Some(openat::SimpleType::Dir) => {
                            gatherer.traverse_dir(&entry, parent_path.clone(), false);
                        }
                        _ => match parent_path.dir().unwrap().metadata(entry.file_name()) {
                            Ok(metadata) => {
                                gatherer.output_metadata(0, &entry, parent_path, metadata, false);
                            }
                            Err(err) => {
                                gatherer.output_error(0, Box::new(err), parent_path);
                            }
                        },
                    },
                    ProcessMessage::Result(Err(err), parent_path) => {
                        gatherer.output_error(0, Box::new(err), parent_path);
                    }
                    _ => {}
                },
            ))
            .unwrap();
        gatherer.load_dir_recursive("src", false);

        print_paths(&gatherer, false);
    }

    #[test]
    #[ignore]
    fn done_notifier() {
        crate::test::init_env_logging();

        let gatherer = Gatherer::build()
            .start(Box::new(
                |gatherer: GathererHandle, entry: ProcessMessage| match entry {
                    ProcessMessage::Result(Ok(entry), parent_path) => match entry.simple_type() {
                        Some(openat::SimpleType::Dir) => {
                            gatherer.traverse_dir(&entry, parent_path, true);
                        }
                        _ => {
                            gatherer.output_entry(0, &entry, parent_path, true);
                        }
                    },
                    ProcessMessage::Result(Err(err), parent_path) => {
                        gatherer.output_error(0, Box::new(err), parent_path);
                    }
                    ProcessMessage::ObjectDone(path) => {
                        trace!("got notified: {:?}", path);
                        gatherer.output_object_done(0, path);
                    }
                    _ => {}
                },
            ))
            .unwrap();

        gatherer.load_dir_recursive("src", true);

        // NOTE: the output channel is never closed while the Gatherer lives, so this loop
        // does not terminate on its own; the test is ignored by default.
        gatherer.channel(0).iter().for_each(|msg| {
            trace!("output: {:?}", msg);
        });
    }
}