summaryrefslogtreecommitdiffstats
path: root/ignore/src
diff options
context:
space:
mode:
authorEd Page <eopage@gmail.com>2019-10-26 19:02:10 -0600
committerAndrew Gallant <jamslam@gmail.com>2020-02-17 17:16:28 -0500
commit9f7c2ebc09caec2282065974725f9cd2326b2355 (patch)
tree1b6929dc1bbb85fc96ff0c3f630b1ea95208505c /ignore/src
parent5c1eac41a364c9bbf08abc1b3ca4698c4e8f929d (diff)
ignore: allow parallel walker to borrow data
This makes it so the caller can more easily refactor from single-threaded to multi-threaded walking. If they want to support both, this makes it easier to do so with a single initialization code-path. In particular, it side-steps the need to put everything into an `Arc`. This is not a breaking change because it strictly increases the number of allowed inputs to `WalkParallel::run`. Closes #1410, Closes #1432
Diffstat (limited to 'ignore/src')
-rw-r--r--ignore/src/walk.rs141
1 files changed, 77 insertions, 64 deletions
diff --git a/ignore/src/walk.rs b/ignore/src/walk.rs
index e00b29a5..658c2dbb 100644
--- a/ignore/src/walk.rs
+++ b/ignore/src/walk.rs
@@ -1068,6 +1068,10 @@ impl WalkState {
}
}
+type FnVisitor<'s> = Box<
+ dyn FnMut(Result<DirEntry, Error>) -> WalkState + Send + 's
+>;
+
/// WalkParallel is a parallel recursive directory iterator over files paths
/// in one or more directories.
///
@@ -1091,11 +1095,10 @@ impl WalkParallel {
/// Execute the parallel recursive directory iterator. `mkf` is called
/// for each thread used for iteration. The function produced by `mkf`
/// is then in turn called for each visited file path.
- pub fn run<F>(self, mut mkf: F)
+ pub fn run<'s, F>(mut self, mut mkf: F)
where
- F: FnMut() -> Box<dyn FnMut(Result<DirEntry, Error>) -> WalkState + Send + 'static>,
+ F: FnMut() -> FnVisitor<'s>,
{
- let mut f = mkf();
let threads = self.threads();
// TODO: Figure out how to use a bounded channel here. With an
// unbounded channel, the workers can run away and fill up memory
@@ -1106,21 +1109,37 @@ impl WalkParallel {
// this. The best case scenario would be finding a way to use rayon
// to do this.
let (tx, rx) = channel::unbounded();
- let mut any_work = false;
- // Send the initial set of root paths to the pool of workers.
- // Note that we only send directories. For files, we send to them the
- // callback directly.
- for path in self.paths {
- let (dent, root_device) = if path == Path::new("-") {
- (DirEntry::new_stdin(), None)
- } else {
- let root_device = if !self.same_file_system {
- None
+ {
+ let mut f = mkf();
+ let mut any_work = false;
+ let mut paths = Vec::new().into_iter();
+ std::mem::swap(&mut paths, &mut self.paths);
+ // Send the initial set of root paths to the pool of workers. Note
+ // that we only send directories. For files, we send to them the
+ // callback directly.
+ for path in paths {
+ let (dent, root_device) = if path == Path::new("-") {
+ (DirEntry::new_stdin(), None)
} else {
- match device_num(&path) {
- Ok(root_device) => Some(root_device),
+ let root_device = if !self.same_file_system {
+ None
+ } else {
+ match device_num(&path) {
+ Ok(root_device) => Some(root_device),
+ Err(err) => {
+ let err = Error::Io(err).with_path(path);
+ if f(Err(err)).is_quit() {
+ return;
+ }
+ continue;
+ }
+ }
+ };
+ match DirEntryRaw::from_path(0, path, false) {
+ Ok(dent) => {
+ (DirEntry::new_raw(dent, None), root_device)
+ }
Err(err) => {
- let err = Error::Io(err).with_path(path);
if f(Err(err)).is_quit() {
return;
}
@@ -1128,56 +1147,50 @@ impl WalkParallel {
}
}
};
- match DirEntryRaw::from_path(0, path, false) {
- Ok(dent) => (DirEntry::new_raw(dent, None), root_device),
- Err(err) => {
- if f(Err(err)).is_quit() {
- return;
- }
- continue;
- }
- }
- };
- tx.send(Message::Work(Work {
- dent: dent,
- ignore: self.ig_root.clone(),
- root_device: root_device,
- }))
- .unwrap();
- any_work = true;
- }
- // ... but there's no need to start workers if we don't need them.
- if !any_work {
- return;
+ tx.send(Message::Work(Work {
+ dent: dent,
+ ignore: self.ig_root.clone(),
+ root_device: root_device,
+ }))
+ .unwrap();
+ any_work = true;
+ }
+ // ... but there's no need to start workers if we don't need them.
+ if !any_work {
+ return;
+ }
}
// Create the workers and then wait for them to finish.
let num_waiting = Arc::new(AtomicUsize::new(0));
let num_quitting = Arc::new(AtomicUsize::new(0));
let quit_now = Arc::new(AtomicBool::new(false));
- let mut handles = vec![];
- for _ in 0..threads {
- let worker = Worker {
- f: mkf(),
- tx: tx.clone(),
- rx: rx.clone(),
- quit_now: quit_now.clone(),
- is_waiting: false,
- is_quitting: false,
- num_waiting: num_waiting.clone(),
- num_quitting: num_quitting.clone(),
- threads: threads,
- max_depth: self.max_depth,
- max_filesize: self.max_filesize,
- follow_links: self.follow_links,
- skip: self.skip.clone(),
- };
- handles.push(thread::spawn(|| worker.run()));
- }
- drop(tx);
- drop(rx);
- for handle in handles {
- handle.join().unwrap();
- }
+ crossbeam_utils::thread::scope(|s| {
+ let mut handles = vec![];
+ for _ in 0..threads {
+ let worker = Worker {
+ f: mkf(),
+ tx: tx.clone(),
+ rx: rx.clone(),
+ quit_now: quit_now.clone(),
+ is_waiting: false,
+ is_quitting: false,
+ num_waiting: num_waiting.clone(),
+ num_quitting: num_quitting.clone(),
+ threads: threads,
+ max_depth: self.max_depth,
+ max_filesize: self.max_filesize,
+ follow_links: self.follow_links,
+ skip: self.skip.clone(),
+ };
+ handles.push(s.spawn(|_| worker.run()));
+ }
+ drop(tx);
+ drop(rx);
+ for handle in handles {
+ handle.join().unwrap();
+ }
+ })
+ .unwrap(); // Pass along panics from threads
}
fn threads(&self) -> usize {
@@ -1267,9 +1280,9 @@ impl Work {
/// ignore matchers, producing new work and invoking the caller's callback.
///
/// Note that a worker is *both* a producer and a consumer.
-struct Worker {
+struct Worker<'s> {
/// The caller's callback.
- f: Box<dyn FnMut(Result<DirEntry, Error>) -> WalkState + Send + 'static>,
+ f: FnVisitor<'s>,
/// The push side of our mpmc queue.
tx: channel::Sender<Message>,
/// The receive side of our mpmc queue.
@@ -1303,7 +1316,7 @@ struct Worker {
skip: Option<Arc<Handle>>,
}
-impl Worker {
+impl<'s> Worker<'s> {
/// Runs this worker until there is no more work left to do.
///
/// The worker will call the caller's callback for all entries that aren't