diff options
author | Ed Page <eopage@gmail.com> | 2019-10-26 19:02:10 -0600 |
---|---|---|
committer | Andrew Gallant <jamslam@gmail.com> | 2020-02-17 17:16:28 -0500 |
commit | 9f7c2ebc09caec2282065974725f9cd2326b2355 (patch) | |
tree | 1b6929dc1bbb85fc96ff0c3f630b1ea95208505c /ignore/src | |
parent | 5c1eac41a364c9bbf08abc1b3ca4698c4e8f929d (diff) |
ignore: allow parallel walker to borrow data
This lets the visitor closures borrow data from the caller instead of
requiring them to be `'static`, so the caller can more easily refactor from
single-threaded to multi-threaded walking. A caller that wants to support
both can now do so with a single initialization code-path. In particular,
it side-steps the need to put everything into an `Arc`.
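This is not a breaking change because it strictly increases the number
of allowed inputs to `WalkParallel::run`: any visitor that satisfied the
old `'static` bound still satisfies the new, shorter-lived `'s` bound.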
Closes #1410, Closes #1432
Diffstat (limited to 'ignore/src')
-rw-r--r-- | ignore/src/walk.rs | 141 |
1 file changed, 77 insertions, 64 deletions
diff --git a/ignore/src/walk.rs b/ignore/src/walk.rs index e00b29a5..658c2dbb 100644 --- a/ignore/src/walk.rs +++ b/ignore/src/walk.rs @@ -1068,6 +1068,10 @@ impl WalkState { } } +type FnVisitor<'s> = Box< + dyn FnMut(Result<DirEntry, Error>) -> WalkState + Send + 's +>; + /// WalkParallel is a parallel recursive directory iterator over files paths /// in one or more directories. /// @@ -1091,11 +1095,10 @@ impl WalkParallel { /// Execute the parallel recursive directory iterator. `mkf` is called /// for each thread used for iteration. The function produced by `mkf` /// is then in turn called for each visited file path. - pub fn run<F>(self, mut mkf: F) + pub fn run<'s, F>(mut self, mut mkf: F) where - F: FnMut() -> Box<dyn FnMut(Result<DirEntry, Error>) -> WalkState + Send + 'static>, + F: FnMut() -> FnVisitor<'s>, { - let mut f = mkf(); let threads = self.threads(); // TODO: Figure out how to use a bounded channel here. With an // unbounded channel, the workers can run away and fill up memory @@ -1106,21 +1109,37 @@ impl WalkParallel { // this. The best case scenario would be finding a way to use rayon // to do this. let (tx, rx) = channel::unbounded(); - let mut any_work = false; - // Send the initial set of root paths to the pool of workers. - // Note that we only send directories. For files, we send to them the - // callback directly. - for path in self.paths { - let (dent, root_device) = if path == Path::new("-") { - (DirEntry::new_stdin(), None) - } else { - let root_device = if !self.same_file_system { - None + { + let mut f = mkf(); + let mut any_work = false; + let mut paths = Vec::new().into_iter(); + std::mem::swap(&mut paths, &mut self.paths); + // Send the initial set of root paths to the pool of workers. Note + // that we only send directories. For files, we send to them the + // callback directly. 
+ for path in paths { + let (dent, root_device) = if path == Path::new("-") { + (DirEntry::new_stdin(), None) } else { - match device_num(&path) { - Ok(root_device) => Some(root_device), + let root_device = if !self.same_file_system { + None + } else { + match device_num(&path) { + Ok(root_device) => Some(root_device), + Err(err) => { + let err = Error::Io(err).with_path(path); + if f(Err(err)).is_quit() { + return; + } + continue; + } + } + }; + match DirEntryRaw::from_path(0, path, false) { + Ok(dent) => { + (DirEntry::new_raw(dent, None), root_device) + } Err(err) => { - let err = Error::Io(err).with_path(path); if f(Err(err)).is_quit() { return; } @@ -1128,56 +1147,50 @@ impl WalkParallel { } } }; - match DirEntryRaw::from_path(0, path, false) { - Ok(dent) => (DirEntry::new_raw(dent, None), root_device), - Err(err) => { - if f(Err(err)).is_quit() { - return; - } - continue; - } - } - }; - tx.send(Message::Work(Work { - dent: dent, - ignore: self.ig_root.clone(), - root_device: root_device, - })) - .unwrap(); - any_work = true; - } - // ... but there's no need to start workers if we don't need them. - if !any_work { - return; + tx.send(Message::Work(Work { + dent: dent, + ignore: self.ig_root.clone(), + root_device: root_device, + })) + .unwrap(); + any_work = true; + } + // ... but there's no need to start workers if we don't need them. + if !any_work { + return; + } } // Create the workers and then wait for them to finish. 
let num_waiting = Arc::new(AtomicUsize::new(0)); let num_quitting = Arc::new(AtomicUsize::new(0)); let quit_now = Arc::new(AtomicBool::new(false)); - let mut handles = vec![]; - for _ in 0..threads { - let worker = Worker { - f: mkf(), - tx: tx.clone(), - rx: rx.clone(), - quit_now: quit_now.clone(), - is_waiting: false, - is_quitting: false, - num_waiting: num_waiting.clone(), - num_quitting: num_quitting.clone(), - threads: threads, - max_depth: self.max_depth, - max_filesize: self.max_filesize, - follow_links: self.follow_links, - skip: self.skip.clone(), - }; - handles.push(thread::spawn(|| worker.run())); - } - drop(tx); - drop(rx); - for handle in handles { - handle.join().unwrap(); - } + crossbeam_utils::thread::scope(|s| { + let mut handles = vec![]; + for _ in 0..threads { + let worker = Worker { + f: mkf(), + tx: tx.clone(), + rx: rx.clone(), + quit_now: quit_now.clone(), + is_waiting: false, + is_quitting: false, + num_waiting: num_waiting.clone(), + num_quitting: num_quitting.clone(), + threads: threads, + max_depth: self.max_depth, + max_filesize: self.max_filesize, + follow_links: self.follow_links, + skip: self.skip.clone(), + }; + handles.push(s.spawn(|_| worker.run())); + } + drop(tx); + drop(rx); + for handle in handles { + handle.join().unwrap(); + } + }) + .unwrap(); // Pass along panics from threads } fn threads(&self) -> usize { @@ -1267,9 +1280,9 @@ impl Work { /// ignore matchers, producing new work and invoking the caller's callback. /// /// Note that a worker is *both* a producer and a consumer. -struct Worker { +struct Worker<'s> { /// The caller's callback. - f: Box<dyn FnMut(Result<DirEntry, Error>) -> WalkState + Send + 'static>, + f: FnVisitor<'s>, /// The push side of our mpmc queue. tx: channel::Sender<Message>, /// The receive side of our mpmc queue. @@ -1303,7 +1316,7 @@ struct Worker { skip: Option<Arc<Handle>>, } -impl Worker { +impl<'s> Worker<'s> { /// Runs this worker until there is no more work left to do. 
/// /// The worker will call the caller's callback for all entries that aren't |