// gatherer.rs
//! The Gatherer manages threads which walk directories. For each element found a custom
//! processor function is called which may add directories back to the list of directories
//! to process and found entries to an output queue.
use std::io;
use std::sync::{Arc, Weak};
use std::thread;
use std::ops::DerefMut;
use std::path::Path;
use std::ffi::OsStr;

use mpmcpq::*;
use crossbeam_channel::{bounded, Receiver, Sender};
use parking_lot::Mutex;

#[allow(unused_imports)]
use crate::{debug, error, info, trace, warn};
use crate::*;

/// The type of the user supplied closure/function to process entries. Takes a GathererHandle
/// which defines the API for pushing things back on the Gatherer's queues and a ProcessMessage
/// which wraps the things to be handled.
// PLANNED: Dir will become part of ObjectPath
pub type ProcessFn = dyn Fn(GathererHandle, ProcessMessage) + Send + Sync;

/// The input to the ProcessFn which dispatches actions on the queues via the GathererHandle.
pub enum ProcessMessage {
    /// Either an openat::Entry for iterated directory entries or an error.
    Result(io::Result<openat::Entry>, ObjectPath),
    /// A notification that an ObjectPath's processing is finished.
    ObjectDone(ObjectPath),
    /// The ProcessFn is called with this after all entries of a directory are processed (but
    /// not its subdirectories). This is used to notify that no more entries of the said
    /// directory are to be expected. Note that ObjectDone messages may still appear.
    EndOfDirectory(ObjectPath),
}

/// Stash used to batch DirectoryGatherMessages locally before they are pushed to the
/// shared priority queue (reduces lock contention).
pub(crate) type GathererStash<'a> = Stash<'a, DirectoryGatherMessage, u64>;

/// The Gatherer manages the queues and threads traversing directories.
/// There should be only one 'Gatherer' around as it is used to merge hardlinks
/// and needs to have a global picture of all indexed files.
41 pub struct Gatherer(Arc<GathererInner>); 42 43 impl Gatherer { 44 /// Creates a gatherer builder used to configure the gatherer. Uses conservative defaults, 45 /// 16 threads and 64k backlog. 46 #[must_use = "configure the Gatherer and finally call .start()"] 47 pub fn build() -> GathererBuilder { 48 GathererBuilder::new() 49 } 50 51 /// Try to upgrade a Weak reference into a Gatherer. 52 pub(crate) fn upgrade(weak: &Weak<GathererInner>) -> Option<Gatherer> { 53 weak.upgrade().map(Gatherer) 54 } 55 56 /// Get a Weak reference from a Gatherer. 57 pub(crate) fn downgrade(&self) -> Weak<GathererInner> { 58 Arc::downgrade(&self.0) 59 } 60 61 /// Access to the inner data. 62 pub(crate) fn inner(&self) -> &GathererInner { 63 &*self.0 64 } 65 66 /// Returns the an Arc of the receiver side of output channel 'n'. 67 pub fn channel(&self, n: usize) -> Arc<Receiver<InventoryEntryMessage>> { 68 self.0.output_channels[n].1.clone() 69 } 70 71 /// Returns the number of output channels. 72 pub fn num_channels(&self) -> usize { 73 self.0.output_channels.len() as usize 74 } 75 76 /// Returns a Vec with all receiving sides of the output channels. 77 pub fn channels_as_vec(&self) -> Vec<Arc<Receiver<InventoryEntryMessage>>> { 78 self.0 79 .output_channels 80 .iter() 81 .map(|(_, r)| r.clone()) 82 .collect() 83 } 84 85 /// Adds a name to the interned names, returns that. 86 pub fn name_interning(&self, name: &OsStr) -> InternedName { 87 self.0.names.interning(name) 88 } 89 90 /// Adds a directory to the processing queue of the inventory. This is the main function 91 /// to initiate a directory traversal. 
92 // pub fn load_dir_recursive(&self, path: ObjectPath) { 93 pub fn load_dir_recursive<P: AsRef<Path>>(&self, path: P, watch: bool) { 94 let path = ObjectPath::new(path, self); 95 if let Ok(dir) = path.dir() { 96 self.0.dirs_lru.preserve(dir); 97 }; 98 99 path.watch(watch); 100 101 let mut stash = self.0.kickoff_stash.lock(); 102 self.0.send_dir( 103 DirectoryGatherMessage::new_dir(path), 104 u64::MAX, /* initial message priority instead depth/inode calculation, added 105 * directories are processed at the lowest priority */ 106 Some(stash.deref_mut()), 107 ); 108 self.0.dirs_queue.sync(stash.deref_mut()); 109 } 110 111 // TODO: fn shutdown, there is currently no way to free a Gatherer as the threads keep it alive 112 113 /// called when a watched ObjectPath's strong_count becomes dropped equal or below 2. 114 pub(crate) fn notify_path_dropped(&self, path: ObjectPath) { 115 (self.0.processor)( 116 GathererHandle::new( 117 // infallible since caller checked already 118 &path.gatherer().unwrap(), 119 None, 120 ), 121 ProcessMessage::ObjectDone(path), 122 ); 123 } 124 125 /// Spawns a single gatherer thread 126 fn spawn_gather_thread(&self, n: usize) -> io::Result<thread::JoinHandle<()>> { 127 thread::Builder::new().name(format!("gather/{}", n)).spawn({ 128 let gatherer = Gatherer(self.0.clone()); 129 move || { 130 debug!("thread started"); 131 let stash: GathererStash = Stash::new(&gatherer.0.dirs_queue); 132 loop { 133 use DirectoryGatherMessage::*; 134 135 // TODO: messages for dir enter/leave on the ouput queue 136 match gatherer.0.dirs_queue.recv_guard().message() { 137 mpmcpq::Message::Msg(TraverseDirectory { path }, prio) => { 138 gatherer.0.dirs_lru.expire_until(gatherer.0.lru_batch, &|| { 139 used_handles() < gatherer.0.fd_limit 140 }); 141 142 path.dir() 143 .map(|dir| { 144 trace!( 145 "opened fd {:?}: for {:?}: depth {}", 146 dir, 147 path.to_pathbuf(), 148 path.depth() 149 ); 150 151 gatherer.0.dirs_lru.preserve(dir.clone()); 152 153 // Clone the path to 
increment the refcount to trigger notification reliably 154 let path = path.clone(); 155 dir.list_self() 156 .map(|dir_iter| { 157 dir_iter.for_each(|entry| { 158 (gatherer.0.processor)( 159 GathererHandle::new(&gatherer, Some(&stash)), 160 ProcessMessage::Result(entry, path.clone()), 161 ); 162 }); 163 164 (gatherer.0.processor)( 165 GathererHandle::new(&gatherer, Some(&stash)), 166 ProcessMessage::EndOfDirectory(path.clone()), 167 ); 168 169 gatherer.0.dirs_queue.sync(&stash); 170 crate::dirhandle::dec_handles(); 171 }) 172 .map_err(|err| { 173 if err.raw_os_error() == Some(libc::EMFILE) { 174 gatherer.0.resend_dir( 175 TraverseDirectory { path: path.clone() }, 176 *prio, 177 &stash, 178 ); 179 } else { 180 (gatherer.0.processor)( 181 GathererHandle::new(&gatherer, Some(&stash)), 182 ProcessMessage::Result(Err(err), path.clone()), 183 ); 184 } 185 }) 186 }) 187 .map_err(|err| { 188 if err.raw_os_error() == Some(libc::EMFILE) { 189 warn!("filehandles exhausted"); 190 gatherer.0.dirs_lru.expire_all(); 191 gatherer.0.resend_dir( 192 TraverseDirectory { path: path.clone() }, 193 *prio, 194 &stash, 195 ); 196 } else { 197 (gatherer.0.processor)( 198 GathererHandle::new(&gatherer, Some(&stash)), 199 ProcessMessage::Result(Err(err), path.clone()), 200 ); 201 } 202 }) 203 .ok(); 204 } 205 mpmcpq::Message::Drained => { 206 trace!("drained!!!"); 207 (0..gatherer.0.output_channels.len()).for_each(|n| { 208 gatherer.0.send_entry(n, InventoryEntryMessage::Done) 209 }); 210 } 211 _ => unimplemented!(), 212 } 213 } 214 } 215 }) 216 } 217 } 218 219 pub(crate) struct GathererInner { 220 /// All file/dir names are interned here 221 names: InternedNames<32>, 222 223 /// The processing function 224 processor: Box<ProcessFn>, 225 226 // message queues 227 /// The input PriorityQueue fed with directories to be processed 228 dirs_queue: PriorityQueue<DirectoryGatherMessage, u64>, 229 /// The output channels where the results are send to. 
230 #[allow(clippy::type_complexity)] 231 output_channels: Vec<( 232 Sender<InventoryEntryMessage>, 233 Arc<Receiver<InventoryEntryMessage>>, 234 )>, 235 236 /// Sending an initial directory requires an stash. 237 // PLANNED: Also used when one wants to push multiple directories. 238 kickoff_stash: Mutex<GathererStash<'static>>, 239 240 /// The maximum number of file descriptors this Gatherer may use. 241 fd_limit: usize, 242 243 /// LruList for keeping dir handles alive. 244 dirs_lru: LruList<Dir>, 245 lru_batch: usize, 246 247 /// Number of DirectoryGather Messages batched together 248 message_batch: usize, 249 } 250 251 impl GathererInner { 252 /// put a DirectoryGatherMessage on the input queue (traverse sub directories). 253 pub(crate) fn send_dir( 254 &self, 255 message: DirectoryGatherMessage, 256 prio: u64, 257 stash: Option<&GathererStash>, 258 ) { 259 if let Some(stash) = stash { 260 self.dirs_queue 261 .send_batched(message, prio, self.message_batch, stash); 262 } else { 263 self.dirs_queue.send_nostash(message, prio); 264 } 265 } 266 267 // resend a dir after 5ms pause to recover from filehandle depletion 268 fn resend_dir(&self, message: DirectoryGatherMessage, prio: u64, stash: &GathererStash) { 269 self.send_dir(message, prio, Some(stash)); 270 thread::sleep(std::time::Duration::from_millis(5)); 271 } 272 273 /// Put a message on an output channel. The channels are used modulo the 274 /// output_channels.len(), thus can never overflow and a user may use a hash/larger number 275 /// than available. 276 pub(crate) fn send_entry(&self, channel: usize, message: InventoryEntryMessage) { 277 // Ignore result, the user may have dropped the receiver, but there is nothing we 278 // should do about it. 
279 let _ = unsafe { 280 self.output_channels 281 .get_unchecked(channel % self.output_channels.len()) 282 .0 283 .send(message) 284 }; 285 } 286 } 287 288 /// Configures a Gatherer 289 pub struct GathererBuilder { 290 num_gather_threads: usize, 291 num_output_channels: usize, 292 inventory_backlog: usize, 293 fd_limit: usize, 294 lru_batch: usize, 295 message_batch: usize, 296 } 297 298 impl Default for GathererBuilder { 299 fn default() -> Self { 300 Self::new() 301 } 302 } 303 304 impl GathererBuilder { 305 fn new() -> Self { 306 GathererBuilder { 307 num_gather_threads: 16, 308 num_output_channels: 1, 309 inventory_backlog: 0, 310 fd_limit: 512, 311 lru_batch: 4, 312 message_batch: 512, 313 } 314 } 315 316 /// Starts the Gatherer. Takes the user defined processing function as argument. This 317 /// function is used to process every directory entry seen. It should be small and fast 318 /// selecting which sub directories to be traversed and which entries to pass to the 319 /// output channels. Any more work should be done on the output then. 
320 pub fn start(&self, processor: Box<ProcessFn>) -> io::Result<Gatherer> { 321 let output_channels = (0..self.num_output_channels) 322 .map(|_| { 323 let (sender, receiver) = bounded( 324 // when inventory backlog is not set, set it automatically to 4k per thread 325 if self.inventory_backlog == 0 { 326 4096 * self.num_gather_threads 327 } else { 328 self.inventory_backlog 329 }, 330 ); 331 (sender, Arc::new(receiver)) 332 }) 333 .collect(); 334 335 let gatherer = Gatherer(Arc::new(GathererInner { 336 names: InternedNames::new(), 337 dirs_queue: PriorityQueue::new(), 338 output_channels, 339 processor, 340 kickoff_stash: Mutex::new(GathererStash::new_without_priority_queue()), 341 fd_limit: self.fd_limit, 342 dirs_lru: LruList::new(), 343 lru_batch: self.lru_batch, 344 message_batch: self.message_batch, 345 })); 346 347 (0..self.num_gather_threads).try_for_each(|n| -> io::Result<()> { 348 gatherer.spawn_gather_thread(n)?; 349 Ok(()) 350 })?; 351 352 debug!("created gatherer"); 353 Ok(gatherer) 354 } 355 356 /// Sets the number of threads which traverse the directories. These are IO-bound 357 /// operations and the more threads are used the better are the opportunities for the 358 /// kernel to optimize IO-Requests. Tests have shown that on fast SSD's and cached data 359 /// thread numbers in the hundrededs still show some benefits (at high resource 360 /// costs). For general operation and on slower HDD's / non cached data 8-64 threads 361 /// should be good enough. Default is 16 threads. 362 #[must_use = "GathererBuilder must be used, call .start()"] 363 pub fn with_gather_threads(mut self, num_threads: usize) -> Self { 364 assert!(num_threads > 0, "Must at least use one thread"); 365 self.num_gather_threads = num_threads; 366 self 367 } 368 369 /// Sets the number of threads which traverse the directories. These are IO-bound 370 /// operations and the more threads are used the better are the opportunities for the 371 /// kernel to optimize IO-Requests. 
Tests have shown that on fast SSD's and cached data 372 /// thread numbers in the hundrededs still show some benefits (at high resource 373 /// costs). For general operation and on slower HDD's / non cached data 8-64 threads 374 /// should be good enough. Default is 16 threads. 375 #[must_use = "GathererBuilder must be used, call .start()"] 376 pub fn with_output_channels(mut self, num_channels: usize) -> Self { 377 assert!(num_channels > 0, "Must at least use one channel"); 378 self.num_output_channels = num_channels; 379 self 380 } 381 382 /// Sets the amount of messages the output channels can hold. For cached and readahead data, 383 /// the kernel can send bursts entries to the gatherer threads at very high speeds, since 384 /// we don't want to stall the gathering, the is adds some output buffering. Usually 385 /// values from 64k to 512k should be fine here. When zero (the default) 4k per thread are 386 /// used. 387 #[must_use = "GathererBuilder must be used, call .start()"] 388 pub fn with_inventory_backlog(mut self, backlog_size: usize) -> Self { 389 self.inventory_backlog = backlog_size; 390 self 391 } 392 393 /// Sets the maximum number of directory handles the Gatherer may use. The Gatherer has a 394 /// build-in strategy handle fd exhaustion when this happens earlier, but keep in mind 395 /// that then there are no fd's for the other parts of the application 396 /// available. Constraining the number of file handles too much will make its slow and 397 /// eventually deadlock. Limit them to no less than num_threads+100 handles! The limits 398 /// are not enforced since the actual amount needed depends a lot factors. Defaults to 512 399 /// fd's which should be plenty for most cases. 400 #[must_use = "GathererBuilder must be used, call .start()"] 401 pub fn with_fd_limit(mut self, fd_limit: usize) -> Self { 402 self.fd_limit = fd_limit; 403 self 404 } 405 406 /// Sets size of message batched together. This reduces contention on the priority 407 /// lock. 
While it won't improve performance it can reduce the CPU load (often 408 /// insignificantly). Defaults to 512, shouldn't need adjustments except for benchmarking. 409 #[must_use = "GathererBuilder must be used, call .start()"] 410 pub fn with_message_batch(mut self, message_batch: usize) -> Self { 411 self.message_batch = message_batch; 412 self 413 } 414 415 /// Sets size of dir lru expires batched together. 416 #[must_use = "GathererBuilder must be used, call .start()"] 417 pub fn with_lru_batch(mut self, lru_batch: usize) -> Self { 418 self.lru_batch = lru_batch; 419 self 420 } 421 } 422 423 #[cfg(test)] 424 mod test { 425 use std::io::Write; 426 use std::os::unix::ffi::OsStrExt; 427 428 use super::*; 429 430 // tests 431 #[test] 432 fn smoke() { 433 crate::test::init_env_logging(); 434 let _ = Gatherer::build().with_gather_threads(1).start(Box::new( 435 |_gatherer: GathererHandle, _entry: ProcessMessage| {}, 436 )); 437 } 438 439 #[test] 440 fn dir_smoke() { 441 crate::test::init_env_logging(); 442 443 let gatherer = Gatherer::build() 444 .with_gather_threads(4) 445 .start(Box::new( 446 |gatherer: GathererHandle, entry: ProcessMessage| match entry { 447 ProcessMessage::Result(Ok(entry), parent_path) => match entry.simple_type() { 448 Some(openat::SimpleType::Dir) => { 449 trace!("{:?}/{:?}", parent_path.to_pathbuf(), entry.file_name()); 450 gatherer.traverse_dir(&entry, parent_path.clone(), false); 451 } 452 _ => { 453 trace!("{:?}/{:?}", parent_path.to_pathbuf(), entry.file_name()); 454 gatherer.output_entry(0, &entry, parent_path, false); 455 } 456 }, 457 ProcessMessage::Result(Err(err), parent_path) => { 458 gatherer.output_error(0, Box::new(err), parent_path); 459 } 460 _ => {} 461 }, 462 )) 463 .unwrap(); 464 465 gatherer.load_dir_recursive("src", false); 466 467 gatherer 468 .channel(0) 469 .iter() 470 .take_while(|msg| !matches!(msg, InventoryEntryMessage::Done)) 471 .for_each(|msg| { 472 if let Some(path) = msg.path() { 473 trace!("{:?}", 
path.to_pathbuf()); 474 } else if msg.is_error() { 475 error!("{:?}", msg) 476 } 477 }); 478 } 479 480 #[test] 481 #[ignore] 482 fn load_dir() { 483 crate::test::init_env_logging(); 484 485 let gatherer = Gatherer::build() 486 .with_gather_threads(64) 487 .with_fd_limit(768) 488 .start(Box::new( 489 |gatherer: GathererHandle, entry: ProcessMessage| match entry { 490 ProcessMessage::Result(Ok(entry), parent_path) => match entry.simple_type() { 491 Some(openat::SimpleType::Dir) => { 492 gatherer.traverse_dir(&entry, parent_path.clone(), false); 493 gatherer.output_entry(0, &entry, parent_path.clone(), false); 494 } 495 _ => { 496 gatherer.output_entry(0, &entry, parent_path, false); 497 } 498 }, 499 ProcessMessage::Result(Err(err), parent_path) => { 500 gatherer.output_error(0, Box::new(err), parent_path); 501 } 502 _ => {} 503 }, 504 )) 505 .unwrap(); 506 507 gatherer.load_dir_recursive(".", false); 508 509 let mut stdout = std::io::stdout(); 510 511 gatherer 512 .channel(0) 513 .iter() 514 .take_while(|msg| !matches!(msg, InventoryEntryMessage::Done)) 515 .for_each(|msg| { 516 if let Some(path) = msg.path() { 517 let _ = stdout.write_all(path.to_pathbuf().as_os_str().as_bytes()); 518 let _ = stdout.write_all(b"\n"); 519 } else if msg.is_error() { 520 error!("{:?}", msg) 521 } 522 }); 523 } 524 525 #[test] 526 fn entry_messages() { 527 crate::test::init_env_logging(); 528 529 let gatherer = Gatherer::build() 530 .start(Box::new( 531 |gatherer: GathererHandle, entry: ProcessMessage| match entry { 532 ProcessMessage::Result(Ok(entry), parent_path) => match entry.simple_type() { 533 Some(openat::SimpleType::Dir) => { 534 gatherer.traverse_dir(&entry, parent_path, false); 535 } 536 _ => { 537 gatherer.output_entry(0, &entry, parent_path, false); 538 } 539 }, 540 ProcessMessage::Result(Err(err), parent_path) => { 541 gatherer.output_error(0, Box::new(err), parent_path); 542 } 543 _ => {} 544 }, 545 )) 546 .unwrap(); 547 548 gatherer.load_dir_recursive("src", false); 549 
550 let mut stdout = std::io::stdout(); 551 552 gatherer 553 .channel(0) 554 .iter() 555 .take_while(|msg| !matches!(msg, InventoryEntryMessage::Done)) 556 .for_each(|msg| { 557 if let Some(path) = msg.path() { 558 let _ = stdout.write_all(path.to_pathbuf().as_os_str().as_bytes()); 559 let _ = stdout.write_all(b"\n"); 560 } 561 }); 562 } 563 564 #[test] 565 fn metadata_messages() { 566 crate::test::init_env_logging(); 567 568 let gatherer = Gatherer::build() 569 .start(Box::new( 570 |gatherer: GathererHandle, entry: ProcessMessage| match entry { 571 ProcessMessage::Result(Ok(entry), parent_path) => match entry.simple_type() { 572 Some(openat::SimpleType::Dir) => { 573 gatherer.traverse_dir(&entry, parent_path.clone(), false); 574 } 575 _ => match parent_path.dir().unwrap().metadata(entry.file_name()) { 576 Ok(metadata) => { 577 gatherer.output_metadata(0, &entry, parent_path, metadata, false); 578 } 579 Err(err) => { 580 gatherer.output_error(0, Box::new(err), parent_path); 581 } 582 }, 583 }, 584 ProcessMessage::Result(Err(err), parent_path) => { 585 gatherer.output_error(0, Box::new(err), parent_path); 586 } 587 _ => {} 588 }, 589 )) 590 .unwrap(); 591 gatherer.load_dir_recursive("src", false); 592 593 let mut stdout = std::io::stdout(); 594 595 gatherer 596 .channel(0) 597 .iter() 598 .take_while(|msg| !matches!(msg, InventoryEntryMessage::Done)) 599 .for_each(|msg| { 600 if let Some(path) = msg.path() { 601 let _ = stdout.write_all(path.to_pathbuf().as_os_str().as_bytes()); 602 let _ = stdout.write_all(b"\n"); 603 } 604 }); 605 } 606 607 #[test] 608 #[ignore] 609 fn done_notifier() { 610 crate::test::init_env_logging(); 611 612 let gatherer = Gatherer::build() 613 .start(Box::new( 614 |gatherer: GathererHandle, entry: ProcessMessage| match entry { 615 ProcessMessage::Result(Ok(entry), parent_path) => match entry.simple_type() { 616 Some(openat::SimpleType::Dir) => { 617 gatherer.traverse_dir(&entry, parent_path, true); 618 } 619 _ => { 620 
gatherer.output_entry(0, &entry, parent_path, true); 621 } 622 }, 623 ProcessMessage::Result(Err(err), parent_path) => { 624 gatherer.output_error(0, Box::new(err), parent_path); 625 } 626 ProcessMessage::ObjectDone(path) => { 627 trace!("got notified: {:?}", path); 628 gatherer.output_object_done(0, path); 629 } 630 _ => {} 631 }, 632 )) 633 .unwrap(); 634 635 gatherer.load_dir_recursive("src", true); 636 637 gatherer.channel(0).iter().for_each(|msg| { 638 trace!("output: {:?}", msg); 639 }); 640 } 641 }