From f8e70294d5741391521f638b80ba786888896b94 Mon Sep 17 00:00:00 2001 From: Ed Page Date: Sat, 26 Oct 2019 19:02:10 -0600 Subject: ignore: allow post-processing at end-of-thread On top of the parallel-walk's closures, this provides a Visitor API. This clarifies the role of the two different closures in the `run` API and allows implementing `Drop` for post-processing once traversal is finished. The closure API is maintained not just for compatibility but also convenience for simple cases. Fixes #469, Closes #1430 --- ignore/src/lib.rs | 5 ++- ignore/src/walk.rs | 103 ++++++++++++++++++++++++++++++++++++++++++++--------- 2 files changed, 91 insertions(+), 17 deletions(-) diff --git a/ignore/src/lib.rs b/ignore/src/lib.rs index c78c5e00..6cbf4af2 100644 --- a/ignore/src/lib.rs +++ b/ignore/src/lib.rs @@ -65,7 +65,10 @@ use std::fmt; use std::io; use std::path::{Path, PathBuf}; -pub use walk::{DirEntry, Walk, WalkBuilder, WalkParallel, WalkState}; +pub use walk::{ + DirEntry, Walk, WalkBuilder, WalkParallel, WalkState, + ParallelVisitorBuilder, ParallelVisitor, +}; mod dir; pub mod gitignore; diff --git a/ignore/src/walk.rs b/ignore/src/walk.rs index 658c2dbb..365d049d 100644 --- a/ignore/src/walk.rs +++ b/ignore/src/walk.rs @@ -1068,10 +1068,60 @@ impl WalkState { } } +/// A builder for constructing a visitor when using +/// [`WalkParallel::visit`](struct.WalkParallel.html#method.visit). The builder +/// will be called for each thread started by `WalkParallel`. The visitor +/// returned from each builder is then called for every directory entry. +pub trait ParallelVisitorBuilder<'s> { + /// Create per-thread `ParallelVisitor`s for `WalkParallel`. + fn build(&mut self) -> Box<dyn ParallelVisitor + 's>; +} + +impl<'a, 's, P: ParallelVisitorBuilder<'s>> + ParallelVisitorBuilder<'s> for &'a mut P +{ + fn build(&mut self) -> Box<dyn ParallelVisitor + 's> { + (**self).build() + } +} + +/// Receives files and directories for the current thread. 
+/// +/// Setup for the traversal can be implemented as part of +/// [`ParallelVisitorBuilder::build`](trait.ParallelVisitorBuilder.html#tymethod.build). +/// Teardown when traversal finishes can be implemented by implementing the +/// `Drop` trait on your traversal type. +pub trait ParallelVisitor: Send { + /// Receives files and directories for the current thread. This is called + /// once for every directory entry visited by traversal. + fn visit(&mut self, entry: Result<DirEntry, Error>) -> WalkState; +} + +struct FnBuilder<F> { + builder: F, +} + +impl<'s, F: FnMut() -> FnVisitor<'s>> ParallelVisitorBuilder<'s> for FnBuilder<F> { + fn build(&mut self) -> Box<dyn ParallelVisitor + 's> { + let visitor = (self.builder)(); + Box::new(FnVisitorImp { visitor }) + } +} + type FnVisitor<'s> = Box< dyn FnMut(Result<DirEntry, Error>) -> WalkState + Send + 's >; +struct FnVisitorImp<'s> { + visitor: FnVisitor<'s>, +} + +impl<'s> ParallelVisitor for FnVisitorImp<'s> { + fn visit(&mut self, entry: Result<DirEntry, Error>) -> WalkState { + (self.visitor)(entry) + } +} + /// WalkParallel is a parallel recursive directory iterator over files paths /// in one or more directories. /// @@ -1095,10 +1145,31 @@ impl WalkParallel { /// Execute the parallel recursive directory iterator. `mkf` is called /// for each thread used for iteration. The function produced by `mkf` /// is then in turn called for each visited file path. - pub fn run<'s, F>(mut self, mut mkf: F) + pub fn run<'s, F>(self, mkf: F) where F: FnMut() -> FnVisitor<'s>, { + self.visit(&mut FnBuilder { builder: mkf }) + } + + /// Execute the parallel recursive directory iterator using a custom + /// visitor. + /// + /// The builder given is used to construct a visitor for every thread + /// used by this traversal. The visitor returned from each builder is then + /// called for every directory entry seen by that thread. + /// + /// Typically, creating a custom visitor is useful if you need to perform + /// some kind of cleanup once traversal is finished. 
This can be achieved + /// by implementing `Drop` for your builder (or for your visitor, if you + /// want to execute cleanup for every thread that is launched). + /// + /// For example, each visitor might build up a data structure of results + /// corresponding to the directory entries seen for each thread. Since each + /// visitor runs on only one thread, this build-up can be done without + /// synchronization. Then, once traversal is complete, all of the results + /// can be merged together into a single data structure. + pub fn visit(mut self, builder: &mut dyn ParallelVisitorBuilder) { let threads = self.threads(); // TODO: Figure out how to use a bounded channel here. With an // unbounded channel, the workers can run away and fill up memory @@ -1110,7 +1181,7 @@ impl WalkParallel { // to do this. let (tx, rx) = channel::unbounded(); { - let mut f = mkf(); + let mut visitor = builder.build(); let mut any_work = false; let mut paths = Vec::new().into_iter(); std::mem::swap(&mut paths, &mut self.paths); @@ -1128,7 +1199,7 @@ impl WalkParallel { Ok(root_device) => Some(root_device), Err(err) => { let err = Error::Io(err).with_path(path); - if f(Err(err)).is_quit() { + if visitor.visit(Err(err)).is_quit() { return; } continue; @@ -1140,7 +1211,7 @@ impl WalkParallel { (DirEntry::new_raw(dent, None), root_device) } Err(err) => { - if f(Err(err)).is_quit() { + if visitor.visit(Err(err)).is_quit() { return; } continue; @@ -1168,7 +1239,7 @@ impl WalkParallel { let mut handles = vec![]; for _ in 0..threads { let worker = Worker { - f: mkf(), + visitor: builder.build(), tx: tx.clone(), rx: rx.clone(), quit_now: quit_now.clone(), @@ -1282,7 +1353,7 @@ impl Work { /// Note that a worker is *both* a producer and a consumer. struct Worker<'s> { /// The caller's callback. - f: FnVisitor<'s>, + visitor: Box<dyn ParallelVisitor + 's>, /// The push side of our mpmc queue. tx: channel::Sender, /// The receive side of our mpmc queue. 
@@ -1326,14 +1397,14 @@ impl<'s> Worker<'s> { // If the work is not a directory, then we can just execute the // caller's callback immediately and move on. if work.is_symlink() || !work.is_dir() { - if (self.f)(Ok(work.dent)).is_quit() { + if self.visitor.visit(Ok(work.dent)).is_quit() { self.quit_now(); return; } continue; } if let Some(err) = work.add_parents() { - if (self.f)(Err(err)).is_quit() { + if self.visitor.visit(Err(err)).is_quit() { self.quit_now(); return; } @@ -1341,7 +1412,7 @@ impl<'s> Worker<'s> { let readdir = match work.read_dir() { Ok(readdir) => readdir, Err(err) => { - if (self.f)(Err(err)).is_quit() { + if self.visitor.visit(Err(err)).is_quit() { self.quit_now(); return; } @@ -1353,7 +1424,7 @@ impl<'s> Worker<'s> { Ok(true) => true, Ok(false) => false, Err(err) => { - if (self.f)(Err(err)).is_quit() { + if self.visitor.visit(Err(err)).is_quit() { self.quit_now(); return; } @@ -1365,7 +1436,7 @@ impl<'s> Worker<'s> { }; let depth = work.dent.depth(); - match (self.f)(Ok(work.dent)) { + match self.visitor.visit(Ok(work.dent)) { WalkState::Continue => {} WalkState::Skip => continue, WalkState::Quit => { @@ -1411,13 +1482,13 @@ impl<'s> Worker<'s> { let fs_dent = match result { Ok(fs_dent) => fs_dent, Err(err) => { - return (self.f)(Err(Error::from(err).with_depth(depth))); + return self.visitor.visit(Err(Error::from(err).with_depth(depth))); } }; let mut dent = match DirEntryRaw::from_entry(depth, &fs_dent) { Ok(dent) => DirEntry::new_raw(dent, None), Err(err) => { - return (self.f)(Err(err)); + return self.visitor.visit(Err(err)); } }; let is_symlink = dent.file_type().map_or(false, |ft| ft.is_symlink()); @@ -1426,19 +1497,19 @@ impl<'s> Worker<'s> { dent = match DirEntryRaw::from_path(depth, path, true) { Ok(dent) => DirEntry::new_raw(dent, None), Err(err) => { - return (self.f)(Err(err)); + return self.visitor.visit(Err(err)); } }; if dent.is_dir() { if let Err(err) = check_symlink_loop(ig, dent.path(), depth) { - return (self.f)(Err(err)); 
+ return self.visitor.visit(Err(err)); } } } if let Some(ref stdout) = self.skip { let is_stdout = match path_equals(&dent, stdout) { Ok(is_stdout) => is_stdout, - Err(err) => return (self.f)(Err(err)), + Err(err) => return self.visitor.visit(Err(err)), }; if is_stdout { return WalkState::Continue; -- cgit v1.2.3