summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAndrew Gallant <jamslam@gmail.com>2020-02-20 15:08:37 -0500
committerAndrew Gallant <jamslam@gmail.com>2020-02-20 16:07:51 -0500
commitf314b0d55f2fa7c9f19b677624a19a7ba24c7cf4 (patch)
treeb9f9cf80d7ac7ed9540f336eac8daa821637d1c1
parentfab5c812f31627ea5d57d5710cd3ccbd73244a2b (diff)
ignore: fix parallel traversal
It turns out that the previous version wasn't quite correct. Namely, it was possible for the following sequence to occur: 1. Consider that all workers, except for one, are `waiting`. 2. The last remaining worker finds one more job to do and sends it on the channel. 3. One of the previously `waiting` workers wakes up from the job that the last running worker sent, but `self.resume()` has not been called yet. 4. The last worker, from (2), calls `get_work` and sees that the channel has nothing on it, so it executes `self.waiting() == 1`. Since the worker in (3) hasn't called `self.resume()` yet, `self.waiting() == 1` evaluates to true. 5. This sets off a chain reaction that stops all workers, despite that fact that (3) got more work (which could itself spawn more work). The end result is that the traversal may terminate while their are still outstanding work items to process. This problem was observed through spurious failures in CI. I was not actually able to reproduce the bug locally. We fix this by changing our strategy to detect termination using a counter. Namely, we increment the counter just before sending new work and decrement the counter just after finishing work. In this way, we guarantee that the counter only ever reaches 0 once there is no more work to process. See #1337 for more discussion. Many thanks to @zsugabubus for helping me work through this.
-rw-r--r--crates/ignore/src/walk.rs193
1 files changed, 99 insertions, 94 deletions
diff --git a/crates/ignore/src/walk.rs b/crates/ignore/src/walk.rs
index b2063cde..ecf87b8d 100644
--- a/crates/ignore/src/walk.rs
+++ b/crates/ignore/src/walk.rs
@@ -1073,6 +1073,10 @@ pub enum WalkState {
}
impl WalkState {
+ fn is_continue(&self) -> bool {
+ *self == WalkState::Continue
+ }
+
fn is_quit(&self) -> bool {
*self == WalkState::Quit
}
@@ -1191,6 +1195,7 @@ impl WalkParallel {
// this. The best case scenario would be finding a way to use rayon
// to do this.
let (tx, rx) = channel::unbounded();
+ let num_pending = Arc::new(AtomicUsize::new(0));
{
let mut visitor = builder.build();
let mut any_work = false;
@@ -1229,6 +1234,7 @@ impl WalkParallel {
}
}
};
+ num_pending.fetch_add(1, Ordering::SeqCst);
tx.send(Message::Work(Work {
dent: dent,
ignore: self.ig_root.clone(),
@@ -1243,7 +1249,6 @@ impl WalkParallel {
}
}
// Create the workers and then wait for them to finish.
- let num_running = Arc::new(AtomicUsize::new(threads));
let quit_now = Arc::new(AtomicBool::new(false));
crossbeam_utils::thread::scope(|s| {
let mut handles = vec![];
@@ -1253,7 +1258,7 @@ impl WalkParallel {
tx: tx.clone(),
rx: rx.clone(),
quit_now: quit_now.clone(),
- num_running: num_running.clone(),
+ num_pending: num_pending.clone(),
max_depth: self.max_depth,
max_filesize: self.max_filesize,
follow_links: self.follow_links,
@@ -1368,8 +1373,8 @@ struct Worker<'s> {
/// that we need this because we don't want other `Work` to be done after
/// we quit. We wouldn't need this if have a priority channel.
quit_now: Arc<AtomicBool>,
- /// The number of workers waiting for more work.
- num_running: Arc<AtomicUsize>,
+ /// The number of outstanding work items.
+ num_pending: Arc<AtomicUsize>,
/// The maximum depth of directories to descend. A value of `0` means no
/// descension at all.
max_depth: Option<usize>,
@@ -1390,91 +1395,89 @@ impl<'s> Worker<'s> {
/// The worker will call the caller's callback for all entries that aren't
/// skipped by the ignore matcher.
fn run(mut self) {
- 'get_work: while let Some(mut work) = self.get_work() {
- // If the work is not a directory, then we can just execute the
- // caller's callback immediately and move on.
- if work.is_symlink() || !work.is_dir() {
- if self.visitor.visit(Ok(work.dent)).is_quit() {
- self.quit_now();
- }
- continue;
- }
- if let Some(err) = work.add_parents() {
- if self.visitor.visit(Err(err)).is_quit() {
- self.quit_now();
- continue;
- }
+ while let Some(work) = self.get_work() {
+ if let WalkState::Quit = self.run_one(work) {
+ self.quit_now();
}
+ self.work_done();
+ }
+ }
- let descend = if let Some(root_device) = work.root_device {
- match is_same_file_system(root_device, work.dent.path()) {
- Ok(true) => true,
- Ok(false) => false,
- Err(err) => {
- if self.visitor.visit(Err(err)).is_quit() {
- self.quit_now();
- continue;
- }
- false
- }
- }
- } else {
- true
- };
-
- // Try to read the directory first before we transfer ownership
- // to the provided closure. Do not unwrap it immediately, though,
- // as we may receive an `Err` value e.g. in the case when we do not
- // have sufficient read permissions to list the directory.
- // In that case we still want to provide the closure with a valid
- // entry before passing the error value.
- let readdir = work.read_dir();
- let depth = work.dent.depth();
- match self.visitor.visit(Ok(work.dent)) {
- WalkState::Continue => {}
- WalkState::Skip => continue,
- WalkState::Quit => {
- self.quit_now();
- continue;
- }
- }
- if !descend {
- continue;
+ fn run_one(&mut self, mut work: Work) -> WalkState {
+ // If the work is not a directory, then we can just execute the
+ // caller's callback immediately and move on.
+ if work.is_symlink() || !work.is_dir() {
+ return self.visitor.visit(Ok(work.dent));
+ }
+ if let Some(err) = work.add_parents() {
+ let state = self.visitor.visit(Err(err));
+ if state.is_quit() {
+ return state;
}
+ }
- let readdir = match readdir {
- Ok(readdir) => readdir,
+ let descend = if let Some(root_device) = work.root_device {
+ match is_same_file_system(root_device, work.dent.path()) {
+ Ok(true) => true,
+ Ok(false) => false,
Err(err) => {
- if self.visitor.visit(Err(err)).is_quit() {
- self.quit_now();
+ let state = self.visitor.visit(Err(err));
+ if state.is_quit() {
+ return state;
}
- continue;
+ false
}
- };
+ }
+ } else {
+ true
+ };
- if self.max_depth.map_or(false, |max| depth >= max) {
- continue;
+ // Try to read the directory first before we transfer ownership
+ // to the provided closure. Do not unwrap it immediately, though,
+ // as we may receive an `Err` value e.g. in the case when we do not
+ // have sufficient read permissions to list the directory.
+ // In that case we still want to provide the closure with a valid
+ // entry before passing the error value.
+ let readdir = work.read_dir();
+ let depth = work.dent.depth();
+ let state = self.visitor.visit(Ok(work.dent));
+ if !state.is_continue() {
+ return state;
+ }
+ if !descend {
+ return WalkState::Skip;
+ }
+
+ let readdir = match readdir {
+ Ok(readdir) => readdir,
+ Err(err) => {
+ return self.visitor.visit(Err(err));
}
- for result in readdir {
- let state = self.run_one(
- &work.ignore,
- depth + 1,
- work.root_device,
- result,
- );
- if state.is_quit() {
- self.quit_now();
- continue 'get_work;
- }
+ };
+
+ if self.max_depth.map_or(false, |max| depth >= max) {
+ return WalkState::Skip;
+ }
+ for result in readdir {
+ let state = self.generate_work(
+ &work.ignore,
+ depth + 1,
+ work.root_device,
+ result,
+ );
+ if state.is_quit() {
+ return state;
}
}
+ WalkState::Continue
}
- /// Runs the worker on a single entry from a directory iterator.
+ /// Decides whether to submit the given directory entry as a file to
+ /// search.
///
/// If the entry is a path that should be ignored, then this is a no-op.
/// Otherwise, the entry is pushed on to the queue. (The actual execution
- /// of the callback happens in `run`.)
+ /// of the callback happens in `run_one`.)
///
/// If an error occurs while reading the entry, then it is sent to the
/// caller's callback.
@@ -1482,7 +1485,7 @@ impl<'s> Worker<'s> {
/// `ig` is the `Ignore` matcher for the parent directory. `depth` should
/// be the depth of this entry. `result` should be the item yielded by
/// a directory iterator.
- fn run_one(
+ fn generate_work(
&mut self,
ig: &Ignore,
depth: usize,
@@ -1540,13 +1543,7 @@ impl<'s> Worker<'s> {
};
if !should_skip_path && !should_skip_filesize {
- self.tx
- .send(Message::Work(Work {
- dent: dent,
- ignore: ig.clone(),
- root_device: root_device,
- }))
- .unwrap();
+ self.send(Work { dent, ignore: ig.clone(), root_device });
}
WalkState::Continue
}
@@ -1571,15 +1568,19 @@ impl<'s> Worker<'s> {
// Repeat quit message to wake up sleeping threads, if
// any. The domino effect will ensure that every thread
// will quit.
- self.waiting();
self.tx.send(Message::Quit).unwrap();
return None;
}
Err(TryRecvError::Empty) => {
- // If it was the last running thread, then no more work can
- // arrive, thus we can safely start quitting. Otherwise, a
- // thread may spawn new work to be done.
- if self.waiting() == 1 {
+ // Once num_pending reaches 0, it is impossible for it to
+ // ever increase again. Namely, it only reaches 0 once
+ // all jobs have run such that no jobs have produced more
+ // work. We have this guarantee because num_pending is
+ // always incremented before each job is submitted and only
+ // decremented once each job is completely finished.
+ // Therefore, if this reaches zero, then there can be no
+ // other job running.
+ if self.num_pending() == 0 {
// Every other thread is blocked at the next recv().
// Send the initial quit message and quit.
self.tx.send(Message::Quit).unwrap();
@@ -1590,7 +1591,6 @@ impl<'s> Worker<'s> {
.rx
.recv()
.expect("channel disconnected while worker is alive"));
- self.resume();
}
Err(TryRecvError::Disconnected) => {
unreachable!("channel disconnected while worker is alive");
@@ -1609,15 +1609,20 @@ impl<'s> Worker<'s> {
self.quit_now.load(Ordering::SeqCst)
}
- /// Sets this worker's "running" state to false. Returns the previous
- /// number of running workers.
- fn waiting(&self) -> usize {
- self.num_running.fetch_sub(1, Ordering::SeqCst)
+ /// Returns the number of pending jobs.
+ fn num_pending(&self) -> usize {
+ self.num_pending.load(Ordering::SeqCst)
+ }
+
+ /// Send work.
+ fn send(&self, work: Work) {
+ self.num_pending.fetch_add(1, Ordering::SeqCst);
+ self.tx.send(Message::Work(work)).unwrap();
}
- /// Sets this worker's "running" state to true.
- fn resume(&self) {
- self.num_running.fetch_add(1, Ordering::SeqCst);
+ /// Signal that work has been received.
+ fn work_done(&self) {
+ self.num_pending.fetch_sub(1, Ordering::SeqCst);
}
}