summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTavian Barnes <tavianator@tavianator.com>2021-12-10 13:56:28 -0500
committerGitHub <noreply@github.com>2021-12-10 13:56:28 -0500
commit81669f4c10a70a1c105ae487fa20cb2d686d0d1a (patch)
tree8c1a9e24e51a75b2455d7f6fab6fcdbe0a13e175
parent5ea7cb7a05db0712f8d7073f0aa98a16bdcf7f7d (diff)
parent4baefb2f5a4c09f2e6043b451214abb656a3dee6 (diff)
Merge pull request #902 from tavianator/quit-senders
Quit senders more aggressively
-rw-r--r--src/output.rs17
-rw-r--r--src/walk.rs61
2 files changed, 44 insertions, 34 deletions
diff --git a/src/output.rs b/src/output.rs
index f51d41f..bcb9505 100644
--- a/src/output.rs
+++ b/src/output.rs
@@ -1,7 +1,6 @@
use std::borrow::Cow;
use std::io::{self, Write};
use std::path::Path;
-use std::sync::atomic::{AtomicBool, Ordering};
use lscolors::{Indicator, LsColors, Style};
@@ -15,12 +14,7 @@ fn replace_path_separator(path: &str, new_path_separator: &str) -> String {
}
// TODO: this function is performance critical and can probably be optimized
-pub fn print_entry<W: Write>(
- stdout: &mut W,
- entry: &Path,
- config: &Config,
- wants_to_quit: &AtomicBool,
-) {
+pub fn print_entry<W: Write>(stdout: &mut W, entry: &Path, config: &Config) {
let path = if config.strip_cwd_prefix {
strip_current_dir(entry)
} else {
@@ -28,7 +22,7 @@ pub fn print_entry<W: Write>(
};
let r = if let Some(ref ls_colors) = config.ls_colors {
- print_entry_colorized(stdout, path, config, ls_colors, wants_to_quit)
+ print_entry_colorized(stdout, path, config, ls_colors)
} else {
print_entry_uncolorized(stdout, path, config)
};
@@ -50,7 +44,6 @@ fn print_entry_colorized<W: Write>(
path: &Path,
config: &Config,
ls_colors: &LsColors,
- wants_to_quit: &AtomicBool,
) -> io::Result<()> {
// Split the path between the parent and the last component
let mut offset = 0;
@@ -92,12 +85,6 @@ fn print_entry_colorized<W: Write>(
writeln!(stdout)?;
}
- if wants_to_quit.load(Ordering::Relaxed) {
- // Ignore any errors on flush, because we're about to exit anyway
- let _ = stdout.flush();
- ExitCode::KilledBySigint.exit();
- }
-
Ok(())
}
diff --git a/src/walk.rs b/src/walk.rs
index 151d568..d69c813 100644
--- a/src/walk.rs
+++ b/src/walk.rs
@@ -134,11 +134,19 @@ pub fn scan(path_vec: &[PathBuf], pattern: Arc<Regex>, config: Arc<Config>) -> R
let parallel_walker = walker.threads(config.threads).build_parallel();
- let wants_to_quit = Arc::new(AtomicBool::new(false));
+ // Flag for cleanly shutting down the parallel walk
+ let quit_flag = Arc::new(AtomicBool::new(false));
+ // Flag specifically for quitting due to ^C
+ let interrupt_flag = Arc::new(AtomicBool::new(false));
+
if config.ls_colors.is_some() && config.command.is_none() {
- let wq = Arc::clone(&wants_to_quit);
+ let quit_flag = Arc::clone(&quit_flag);
+ let interrupt_flag = Arc::clone(&interrupt_flag);
+
ctrlc::set_handler(move || {
- if wq.fetch_or(true, Ordering::Relaxed) {
+ quit_flag.store(true, Ordering::Relaxed);
+
+ if interrupt_flag.fetch_or(true, Ordering::Relaxed) {
// Ctrl-C has been pressed twice, exit NOW
ExitCode::KilledBySigint.exit();
}
@@ -147,15 +155,15 @@ pub fn scan(path_vec: &[PathBuf], pattern: Arc<Regex>, config: Arc<Config>) -> R
}
// Spawn the thread that receives all results through the channel.
- let receiver_thread = spawn_receiver(&config, &wants_to_quit, rx);
+ let receiver_thread = spawn_receiver(&config, &quit_flag, &interrupt_flag, rx);
// Spawn the sender threads.
- spawn_senders(&config, &wants_to_quit, pattern, parallel_walker, tx);
+ spawn_senders(&config, &quit_flag, pattern, parallel_walker, tx);
// Wait for the receiver thread to print out all results.
let exit_code = receiver_thread.join().unwrap();
- if wants_to_quit.load(Ordering::Relaxed) {
+ if interrupt_flag.load(Ordering::Relaxed) {
Ok(ExitCode::KilledBySigint)
} else {
Ok(exit_code)
@@ -166,8 +174,10 @@ pub fn scan(path_vec: &[PathBuf], pattern: Arc<Regex>, config: Arc<Config>) -> R
struct ReceiverBuffer<W> {
/// The configuration.
config: Arc<Config>,
+ /// For shutting down the senders.
+ quit_flag: Arc<AtomicBool>,
/// The ^C notifier.
- wants_to_quit: Arc<AtomicBool>,
+ interrupt_flag: Arc<AtomicBool>,
/// Receiver for worker results.
rx: Receiver<WorkerResult>,
/// Standard output.
@@ -186,7 +196,8 @@ impl<W: Write> ReceiverBuffer<W> {
/// Create a new receiver buffer.
fn new(
config: Arc<Config>,
- wants_to_quit: Arc<AtomicBool>,
+ quit_flag: Arc<AtomicBool>,
+ interrupt_flag: Arc<AtomicBool>,
rx: Receiver<WorkerResult>,
stdout: W,
) -> Self {
@@ -195,7 +206,8 @@ impl<W: Write> ReceiverBuffer<W> {
Self {
config,
- wants_to_quit,
+ quit_flag,
+ interrupt_flag,
rx,
stdout,
mode: ReceiverMode::Buffering,
@@ -209,6 +221,7 @@ impl<W: Write> ReceiverBuffer<W> {
fn process(&mut self) -> ExitCode {
loop {
if let Err(ec) = self.poll() {
+ self.quit_flag.store(true, Ordering::Relaxed);
return ec;
}
}
@@ -244,7 +257,7 @@ impl<W: Write> ReceiverBuffer<W> {
}
}
ReceiverMode::Streaming => {
- self.print(&path);
+ self.print(&path)?;
self.flush()?;
}
}
@@ -273,8 +286,16 @@ impl<W: Write> ReceiverBuffer<W> {
}
/// Output a path.
- fn print(&mut self, path: &Path) {
- output::print_entry(&mut self.stdout, path, &self.config, &self.wants_to_quit)
+ fn print(&mut self, path: &Path) -> Result<(), ExitCode> {
+ output::print_entry(&mut self.stdout, path, &self.config);
+
+ if self.interrupt_flag.load(Ordering::Relaxed) {
+ // Ignore any errors on flush, because we're about to exit anyway
+ let _ = self.flush();
+ return Err(ExitCode::KilledBySigint);
+ }
+
+ Ok(())
}
/// Switch ourselves into streaming mode.
@@ -283,7 +304,7 @@ impl<W: Write> ReceiverBuffer<W> {
let buffer = mem::take(&mut self.buffer);
for path in buffer {
- self.print(&path);
+ self.print(&path)?;
}
self.flush()
@@ -315,11 +336,13 @@ impl<W: Write> ReceiverBuffer<W> {
fn spawn_receiver(
config: &Arc<Config>,
- wants_to_quit: &Arc<AtomicBool>,
+ quit_flag: &Arc<AtomicBool>,
+ interrupt_flag: &Arc<AtomicBool>,
rx: Receiver<WorkerResult>,
) -> thread::JoinHandle<ExitCode> {
let config = Arc::clone(config);
- let wants_to_quit = Arc::clone(wants_to_quit);
+ let quit_flag = Arc::clone(quit_flag);
+ let interrupt_flag = Arc::clone(interrupt_flag);
let show_filesystem_errors = config.show_filesystem_errors;
let threads = config.threads;
@@ -375,7 +398,7 @@ fn spawn_receiver(
let stdout = stdout.lock();
let stdout = io::BufWriter::new(stdout);
- let mut rxbuffer = ReceiverBuffer::new(config, wants_to_quit, rx, stdout);
+ let mut rxbuffer = ReceiverBuffer::new(config, quit_flag, interrupt_flag, rx, stdout);
rxbuffer.process()
}
})
@@ -439,7 +462,7 @@ impl DirEntry {
fn spawn_senders(
config: &Arc<Config>,
- wants_to_quit: &Arc<AtomicBool>,
+ quit_flag: &Arc<AtomicBool>,
pattern: Arc<Regex>,
parallel_walker: ignore::WalkParallel,
tx: Sender<WorkerResult>,
@@ -448,10 +471,10 @@ fn spawn_senders(
let config = Arc::clone(config);
let pattern = Arc::clone(&pattern);
let tx_thread = tx.clone();
- let wants_to_quit = Arc::clone(wants_to_quit);
+ let quit_flag = Arc::clone(quit_flag);
Box::new(move |entry_o| {
- if wants_to_quit.load(Ordering::Relaxed) {
+ if quit_flag.load(Ordering::Relaxed) {
return ignore::WalkState::Quit;
}