diff options
author | zsugabubus <zsugabubus@users.noreply.github.com> | 2019-08-02 13:56:06 +0200 |
---|---|---|
committer | Andrew Gallant <jamslam@gmail.com> | 2020-02-17 17:16:28 -0500 |
commit | 3d59bd98aaea776092168f8feaf5daeeaf743cbf (patch) | |
tree | 2d028e271d80180c7cff3dd9c516ba1adfddfb5a /ignore/src | |
parent | 52d7f474206eb517c7284448777ed839c0cae8bb (diff) |
ignore: rework inter-thread messaging
Change the meaning of the `Quit` message. Now it means terminate. The final
"dance" is unnecessary, because by the time quitting begins, no thread
will ever spawn a new `Work`. The trick was to replace the heuristic
spin-loop with a blocking receive.
Closes #1337
Diffstat (limited to 'ignore/src')
-rw-r--r-- | ignore/src/walk.rs | 167 |
1 file changed, 49 insertions, 118 deletions
diff --git a/ignore/src/walk.rs b/ignore/src/walk.rs index b430981f..ed321eed 100644 --- a/ignore/src/walk.rs +++ b/ignore/src/walk.rs @@ -6,11 +6,9 @@ use std::io; use std::path::{Path, PathBuf}; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::Arc; -use std::thread; -use std::time::Duration; use std::vec; -use channel; +use channel::{self, TryRecvError}; use same_file::Handle; use walkdir::{self, WalkDir}; @@ -1242,8 +1240,7 @@ impl WalkParallel { } } // Create the workers and then wait for them to finish. - let num_waiting = Arc::new(AtomicUsize::new(0)); - let num_quitting = Arc::new(AtomicUsize::new(0)); + let num_running = Arc::new(AtomicUsize::new(threads)); let quit_now = Arc::new(AtomicBool::new(false)); crossbeam_utils::thread::scope(|s| { let mut handles = vec![]; @@ -1253,11 +1250,7 @@ impl WalkParallel { tx: tx.clone(), rx: rx.clone(), quit_now: quit_now.clone(), - is_waiting: false, - is_quitting: false, - num_waiting: num_waiting.clone(), - num_quitting: num_quitting.clone(), - threads: threads, + num_running: num_running.clone(), max_depth: self.max_depth, max_filesize: self.max_filesize, follow_links: self.follow_links, @@ -1270,8 +1263,7 @@ impl WalkParallel { for handle in handles { handle.join().unwrap(); } - }) - .unwrap(); // Pass along panics from threads + }).unwrap(); // Pass along panics from threads } fn threads(&self) -> usize { @@ -1289,7 +1281,7 @@ enum Message { /// Work items for entries that should be skipped or ignored should not /// be produced. Work(Work), - /// This instruction indicates that the worker should start quitting. + /// This instruction indicates that the worker should quit. Quit, } @@ -1368,21 +1360,12 @@ struct Worker<'s> { tx: channel::Sender<Message>, /// The receive side of our mpmc queue. rx: channel::Receiver<Message>, - /// Whether all workers should quit at the next opportunity. Note that - /// this is distinct from quitting because of exhausting the contents of - /// a directory. 
Instead, this is used when the caller's callback indicates - /// that the iterator should quit immediately. + /// Whether all workers should terminate at the next opportunity. Note + /// that we need this because we don't want other `Work` to be done after + /// we quit. We wouldn't need this if we had a priority channel. quit_now: Arc<AtomicBool>, - /// Whether this worker is waiting for more work. - is_waiting: bool, - /// Whether this worker has started to quit. - is_quitting: bool, /// The number of workers waiting for more work. - num_waiting: Arc<AtomicUsize>, - /// The number of workers waiting to quit. - num_quitting: Arc<AtomicUsize>, - /// The total number of workers. - threads: usize, + num_running: Arc<AtomicUsize>, /// The maximum depth of directories to descend. A value of `0` means no /// descension at all. max_depth: Option<usize>, @@ -1403,20 +1386,19 @@ impl<'s> Worker<'s> { /// The worker will call the caller's callback for all entries that aren't /// skipped by the ignore matcher. fn run(mut self) { - while let Some(mut work) = self.get_work() { + 'get_work: while let Some(mut work) = self.get_work() { // If the work is not a directory, then we can just execute the // caller's callback immediately and move on. 
if work.is_symlink() || !work.is_dir() { if self.visitor.visit(Ok(work.dent)).is_quit() { self.quit_now(); - return; } continue; } if let Some(err) = work.add_parents() { if self.visitor.visit(Err(err)).is_quit() { self.quit_now(); - return; + continue; } } @@ -1427,7 +1409,7 @@ impl<'s> Worker<'s> { Err(err) => { if self.visitor.visit(Err(err)).is_quit() { self.quit_now(); - return; + continue; } false } @@ -1449,7 +1431,7 @@ impl<'s> Worker<'s> { WalkState::Skip => continue, WalkState::Quit => { self.quit_now(); - return; + continue; } } if !descend { @@ -1461,7 +1443,6 @@ impl<'s> Worker<'s> { Err(err) => { if self.visitor.visit(Err(err)).is_quit() { self.quit_now(); - return; } continue; } @@ -1479,7 +1460,7 @@ impl<'s> Worker<'s> { ); if state.is_quit() { self.quit_now(); - return; + continue 'get_work; } } } @@ -1568,64 +1549,43 @@ impl<'s> Worker<'s> { /// If all work has been exhausted, then this returns None. The worker /// should then subsequently quit. fn get_work(&mut self) -> Option<Work> { + let mut value = self.rx.try_recv(); loop { + // Simulate a priority channel: If quit_now flag is set, we can + // receive only quit messages. if self.is_quit_now() { - return None; + value = Ok(Message::Quit) } - match self.rx.try_recv() { + match value { Ok(Message::Work(work)) => { - self.waiting(false); - self.quitting(false); return Some(work); } Ok(Message::Quit) => { - // We can't just quit because a Message::Quit could be - // spurious. For example, it's possible to observe that - // all workers are waiting even if there's more work to - // be done. - // - // Therefore, we do a bit of a dance to wait until all - // workers have signaled that they're ready to quit before - // actually quitting. - // - // If the Quit message turns out to be spurious, then the - // loop below will break and we'll go back to looking for - // more work. 
- self.waiting(true); - self.quitting(true); - while !self.is_quit_now() { - let nwait = self.num_waiting(); - let nquit = self.num_quitting(); - // If the number of waiting workers dropped, then - // abort our attempt to quit. - if nwait < self.threads { - break; - } - // If all workers are in this quit loop, then we - // can stop. - if nquit == self.threads { - return None; - } - // Otherwise, spin. - } + // Repeat quit message to wake up sleeping threads, if + // any. The domino effect will ensure that every thread + // will quit. + self.waiting(); + self.tx.send(Message::Quit).unwrap(); + return None; } - Err(_) => { - self.waiting(true); - self.quitting(false); - if self.num_waiting() == self.threads { - for _ in 0..self.threads { - self.tx.send(Message::Quit).unwrap(); - } - } else { - // You're right to consider this suspicious, but it's - // a useful heuristic to permit producers to catch up - // to consumers without burning the CPU. It is also - // useful as a means to prevent burning the CPU if only - // one worker is left doing actual work. It's not - // perfect and it doesn't leave the CPU completely - // idle, but it's not clear what else we can do. :-/ - thread::sleep(Duration::from_millis(1)); + Err(TryRecvError::Empty) => { + // If it was the last running thread, then no more work can + // arrive, thus we can safely start quitting. Otherwise, a + // thread may spawn new work to be done. + if self.waiting() == 1 { + // Every other thread is blocked at the next recv(). + // Send the initial quit message and quit. + self.tx.send(Message::Quit).unwrap(); + return None; } + // Wait for next `Work` or `Quit` message. 
+ value = Ok(self.rx.recv().expect( + "channel disconnected while worker is alive", + )); + self.resume(); + }, + Err(TryRecvError::Disconnected) => { + unreachable!("channel disconnected while worker is alive"); } } } @@ -1641,44 +1601,15 @@ impl<'s> Worker<'s> { self.quit_now.load(Ordering::SeqCst) } - /// Returns the total number of workers waiting for work. - fn num_waiting(&self) -> usize { - self.num_waiting.load(Ordering::SeqCst) - } - - /// Returns the total number of workers ready to quit. - fn num_quitting(&self) -> usize { - self.num_quitting.load(Ordering::SeqCst) + /// Sets this worker's "running" state to false. Returns the previous + /// number of running workers. + fn waiting(&self) -> usize { + self.num_running.fetch_sub(1, Ordering::SeqCst) } - /// Sets this worker's "quitting" state to the value of `yes`. - fn quitting(&mut self, yes: bool) { - if yes { - if !self.is_quitting { - self.is_quitting = true; - self.num_quitting.fetch_add(1, Ordering::SeqCst); - } - } else { - if self.is_quitting { - self.is_quitting = false; - self.num_quitting.fetch_sub(1, Ordering::SeqCst); - } - } - } - - /// Sets this worker's "waiting" state to the value of `yes`. - fn waiting(&mut self, yes: bool) { - if yes { - if !self.is_waiting { - self.is_waiting = true; - self.num_waiting.fetch_add(1, Ordering::SeqCst); - } - } else { - if self.is_waiting { - self.is_waiting = false; - self.num_waiting.fetch_sub(1, Ordering::SeqCst); - } - } + /// Sets this worker's "running" state to true. + fn resume(&self) { + self.num_running.fetch_add(1, Ordering::SeqCst); } } |