path: root/ignore/src
author     zsugabubus <zsugabubus@users.noreply.github.com>  2019-08-02 13:56:06 +0200
committer  Andrew Gallant <jamslam@gmail.com>                2020-02-17 17:16:28 -0500
commit     3d59bd98aaea776092168f8feaf5daeeaf743cbf (patch)
tree       2d028e271d80180c7cff3dd9c516ba1adfddfb5a /ignore/src
parent     52d7f474206eb517c7284448777ed839c0cae8bb (diff)
ignore: rework inter-thread messaging
Change the meaning of the `Quit` message: it now means terminate. The final
"dance" is unnecessary because, by the time quitting begins, no thread will
ever spawn a new `Work`. The trick was to replace the heuristic spin-loop with
a blocking receive.

Closes #1337
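For readers skimming the diff below, here is a minimal standalone sketch of the protocol this commit switches to. It is an illustration, not the `ignore` crate's code: the toy `Message` enum, the `worker` function, and the work items are invented for the example, and it assumes the crossbeam-channel crate (which the `channel` alias in `walk.rs` refers to). Every worker that finds the queue empty decrements a shared running count; the last running worker knows that no new `Work` can ever be produced, so it sends a single `Quit`, and each worker that receives `Quit` re-sends it before exiting, waking the remaining blocked workers one after another.

    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;
    use std::thread;

    use crossbeam_channel::{self as channel, TryRecvError};

    enum Message {
        Work(u32),
        Quit,
    }

    fn worker(
        tx: channel::Sender<Message>,
        rx: channel::Receiver<Message>,
        num_running: Arc<AtomicUsize>,
    ) {
        loop {
            let msg = match rx.try_recv() {
                Ok(msg) => msg,
                Err(TryRecvError::Empty) => {
                    // Going idle: if we were the last running worker, no new
                    // `Work` can ever appear, so start the quit domino.
                    if num_running.fetch_sub(1, Ordering::SeqCst) == 1 {
                        tx.send(Message::Quit).unwrap();
                        return;
                    }
                    // Otherwise block. Our own `tx` keeps the channel
                    // connected, so `recv` cannot fail while we are alive.
                    let msg = rx.recv().expect("channel disconnected");
                    num_running.fetch_add(1, Ordering::SeqCst);
                    msg
                }
                Err(TryRecvError::Disconnected) => unreachable!(),
            };
            match msg {
                Message::Work(n) => {
                    // A real worker may enqueue more `Work` via `tx` here,
                    // but only while it is still counted in `num_running`.
                    println!("processed item {}", n);
                }
                Message::Quit => {
                    // Re-send `Quit` so the next sleeping worker also wakes
                    // up and quits, then terminate.
                    tx.send(Message::Quit).unwrap();
                    return;
                }
            }
        }
    }

    fn main() {
        let threads = 4;
        let (tx, rx) = channel::unbounded();
        for n in 0..16u32 {
            tx.send(Message::Work(n)).unwrap();
        }
        let num_running = Arc::new(AtomicUsize::new(threads));
        let handles: Vec<_> = (0..threads)
            .map(|_| {
                let (tx, rx) = (tx.clone(), rx.clone());
                let num_running = Arc::clone(&num_running);
                thread::spawn(move || worker(tx, rx, num_running))
            })
            .collect();
        drop(tx); // only workers hold senders from here on
        for handle in handles {
            handle.join().unwrap();
        }
    }

The key invariant is that a worker only enqueues new `Work` while it is still counted in `num_running`, so once the count drops to zero the queue is guaranteed to stay empty and the `Quit` chain can be started safely.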
Diffstat (limited to 'ignore/src')
-rw-r--r--  ignore/src/walk.rs  167
1 file changed, 49 insertions, 118 deletions
diff --git a/ignore/src/walk.rs b/ignore/src/walk.rs
index b430981f..ed321eed 100644
--- a/ignore/src/walk.rs
+++ b/ignore/src/walk.rs
@@ -6,11 +6,9 @@ use std::io;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
-use std::thread;
-use std::time::Duration;
use std::vec;
-use channel;
+use channel::{self, TryRecvError};
use same_file::Handle;
use walkdir::{self, WalkDir};
@@ -1242,8 +1240,7 @@ impl WalkParallel {
}
}
// Create the workers and then wait for them to finish.
- let num_waiting = Arc::new(AtomicUsize::new(0));
- let num_quitting = Arc::new(AtomicUsize::new(0));
+ let num_running = Arc::new(AtomicUsize::new(threads));
let quit_now = Arc::new(AtomicBool::new(false));
crossbeam_utils::thread::scope(|s| {
let mut handles = vec![];
@@ -1253,11 +1250,7 @@ impl WalkParallel {
tx: tx.clone(),
rx: rx.clone(),
quit_now: quit_now.clone(),
- is_waiting: false,
- is_quitting: false,
- num_waiting: num_waiting.clone(),
- num_quitting: num_quitting.clone(),
- threads: threads,
+ num_running: num_running.clone(),
max_depth: self.max_depth,
max_filesize: self.max_filesize,
follow_links: self.follow_links,
@@ -1270,8 +1263,7 @@ impl WalkParallel {
for handle in handles {
handle.join().unwrap();
}
- })
- .unwrap(); // Pass along panics from threads
+ }).unwrap(); // Pass along panics from threads
}
fn threads(&self) -> usize {
@@ -1289,7 +1281,7 @@ enum Message {
/// Work items for entries that should be skipped or ignored should not
/// be produced.
Work(Work),
- /// This instruction indicates that the worker should start quitting.
+ /// This instruction indicates that the worker should quit.
Quit,
}
@@ -1368,21 +1360,12 @@ struct Worker<'s> {
tx: channel::Sender<Message>,
/// The receive side of our mpmc queue.
rx: channel::Receiver<Message>,
- /// Whether all workers should quit at the next opportunity. Note that
- /// this is distinct from quitting because of exhausting the contents of
- /// a directory. Instead, this is used when the caller's callback indicates
- /// that the iterator should quit immediately.
+ /// Whether all workers should terminate at the next opportunity. Note
+ /// that we need this because we don't want other `Work` to be done after
+ /// we quit. We wouldn't need this if we had a priority channel.
quit_now: Arc<AtomicBool>,
- /// Whether this worker is waiting for more work.
- is_waiting: bool,
- /// Whether this worker has started to quit.
- is_quitting: bool,
/// The number of workers waiting for more work.
- num_waiting: Arc<AtomicUsize>,
- /// The number of workers waiting to quit.
- num_quitting: Arc<AtomicUsize>,
- /// The total number of workers.
- threads: usize,
+ num_running: Arc<AtomicUsize>,
/// The maximum depth of directories to descend. A value of `0` means no
/// descension at all.
max_depth: Option<usize>,
@@ -1403,20 +1386,19 @@ impl<'s> Worker<'s> {
/// The worker will call the caller's callback for all entries that aren't
/// skipped by the ignore matcher.
fn run(mut self) {
- while let Some(mut work) = self.get_work() {
+ 'get_work: while let Some(mut work) = self.get_work() {
// If the work is not a directory, then we can just execute the
// caller's callback immediately and move on.
if work.is_symlink() || !work.is_dir() {
if self.visitor.visit(Ok(work.dent)).is_quit() {
self.quit_now();
- return;
}
continue;
}
if let Some(err) = work.add_parents() {
if self.visitor.visit(Err(err)).is_quit() {
self.quit_now();
- return;
+ continue;
}
}
@@ -1427,7 +1409,7 @@ impl<'s> Worker<'s> {
Err(err) => {
if self.visitor.visit(Err(err)).is_quit() {
self.quit_now();
- return;
+ continue;
}
false
}
@@ -1449,7 +1431,7 @@ impl<'s> Worker<'s> {
WalkState::Skip => continue,
WalkState::Quit => {
self.quit_now();
- return;
+ continue;
}
}
if !descend {
@@ -1461,7 +1443,6 @@ impl<'s> Worker<'s> {
Err(err) => {
if self.visitor.visit(Err(err)).is_quit() {
self.quit_now();
- return;
}
continue;
}
@@ -1479,7 +1460,7 @@ impl<'s> Worker<'s> {
);
if state.is_quit() {
self.quit_now();
- return;
+ continue 'get_work;
}
}
}
@@ -1568,64 +1549,43 @@ impl<'s> Worker<'s> {
/// If all work has been exhausted, then this returns None. The worker
/// should then subsequently quit.
fn get_work(&mut self) -> Option<Work> {
+ let mut value = self.rx.try_recv();
loop {
+ // Simulate a priority channel: if the quit_now flag is set, we can
+ // receive only quit messages.
if self.is_quit_now() {
- return None;
+ value = Ok(Message::Quit)
}
- match self.rx.try_recv() {
+ match value {
Ok(Message::Work(work)) => {
- self.waiting(false);
- self.quitting(false);
return Some(work);
}
Ok(Message::Quit) => {
- // We can't just quit because a Message::Quit could be
- // spurious. For example, it's possible to observe that
- // all workers are waiting even if there's more work to
- // be done.
- //
- // Therefore, we do a bit of a dance to wait until all
- // workers have signaled that they're ready to quit before
- // actually quitting.
- //
- // If the Quit message turns out to be spurious, then the
- // loop below will break and we'll go back to looking for
- // more work.
- self.waiting(true);
- self.quitting(true);
- while !self.is_quit_now() {
- let nwait = self.num_waiting();
- let nquit = self.num_quitting();
- // If the number of waiting workers dropped, then
- // abort our attempt to quit.
- if nwait < self.threads {
- break;
- }
- // If all workers are in this quit loop, then we
- // can stop.
- if nquit == self.threads {
- return None;
- }
- // Otherwise, spin.
- }
+ // Repeat quit message to wake up sleeping threads, if
+ // any. The domino effect will ensure that every thread
+ // will quit.
+ self.waiting();
+ self.tx.send(Message::Quit).unwrap();
+ return None;
}
- Err(_) => {
- self.waiting(true);
- self.quitting(false);
- if self.num_waiting() == self.threads {
- for _ in 0..self.threads {
- self.tx.send(Message::Quit).unwrap();
- }
- } else {
- // You're right to consider this suspicious, but it's
- // a useful heuristic to permit producers to catch up
- // to consumers without burning the CPU. It is also
- // useful as a means to prevent burning the CPU if only
- // one worker is left doing actual work. It's not
- // perfect and it doesn't leave the CPU completely
- // idle, but it's not clear what else we can do. :-/
- thread::sleep(Duration::from_millis(1));
+ Err(TryRecvError::Empty) => {
+ // If it was the last running thread, then no more work can
+ // arrive, thus we can safely start quitting. Otherwise, a
+ // thread may spawn new work to be done.
+ if self.waiting() == 1 {
+ // Every other thread is blocked at the next recv().
+ // Send the initial quit message and quit.
+ self.tx.send(Message::Quit).unwrap();
+ return None;
}
+ // Wait for next `Work` or `Quit` message.
+ value = Ok(self.rx.recv().expect(
+ "channel disconnected while worker is alive",
+ ));
+ self.resume();
+ },
+ Err(TryRecvError::Disconnected) => {
+ unreachable!("channel disconnected while worker is alive");
}
}
}
@@ -1641,44 +1601,15 @@ impl<'s> Worker<'s> {
self.quit_now.load(Ordering::SeqCst)
}
- /// Returns the total number of workers waiting for work.
- fn num_waiting(&self) -> usize {
- self.num_waiting.load(Ordering::SeqCst)
- }
-
- /// Returns the total number of workers ready to quit.
- fn num_quitting(&self) -> usize {
- self.num_quitting.load(Ordering::SeqCst)
+ /// Sets this worker's "running" state to false. Returns the previous
+ /// number of running workers.
+ fn waiting(&self) -> usize {
+ self.num_running.fetch_sub(1, Ordering::SeqCst)
}
- /// Sets this worker's "quitting" state to the value of `yes`.
- fn quitting(&mut self, yes: bool) {
- if yes {
- if !self.is_quitting {
- self.is_quitting = true;
- self.num_quitting.fetch_add(1, Ordering::SeqCst);
- }
- } else {
- if self.is_quitting {
- self.is_quitting = false;
- self.num_quitting.fetch_sub(1, Ordering::SeqCst);
- }
- }
- }
-
- /// Sets this worker's "waiting" state to the value of `yes`.
- fn waiting(&mut self, yes: bool) {
- if yes {
- if !self.is_waiting {
- self.is_waiting = true;
- self.num_waiting.fetch_add(1, Ordering::SeqCst);
- }
- } else {
- if self.is_waiting {
- self.is_waiting = false;
- self.num_waiting.fetch_sub(1, Ordering::SeqCst);
- }
- }
+ /// Sets this worker's "running" state to true.
+ fn resume(&self) {
+ self.num_running.fetch_add(1, Ordering::SeqCst);
}
}
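
A closing note on the `return` → `continue` changes in `run()` above (including the labeled `continue 'get_work` from inside the inner loop over directory entries): after `self.quit_now()` sets the shared flag, the worker now falls through to the next `get_work()` call instead of returning directly. Because `get_work()` treats whatever is pending as a `Quit` once the flag is set (the simulated priority channel), the worker still exits through the normal quit path, decrementing the running count and re-sending `Quit`; a bare `return` here would skip both steps and could leave workers already blocked in `recv()` asleep forever. A small sketch of that check, reusing the `Message` enum from the earlier sketch (the helper name `next_message` is invented for illustration and is not part of the crate):

    use std::sync::atomic::{AtomicBool, Ordering};

    use crossbeam_channel::{Receiver, TryRecvError};

    // Once `quit_now` is set, every poll behaves as if a `Quit` message were
    // at the front of the queue, so queued `Work` is discarded rather than
    // executed, and the worker proceeds to re-send `Quit` and terminate.
    fn next_message(
        rx: &Receiver<Message>,
        quit_now: &AtomicBool,
    ) -> Result<Message, TryRecvError> {
        if quit_now.load(Ordering::SeqCst) {
            return Ok(Message::Quit);
        }
        rx.try_recv()
    }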