diff options
author | Thomas Hurst <tom@hur.st> | 2022-03-19 19:40:02 +0000 |
---|---|---|
committer | Thomas Hurst <tom@hur.st> | 2022-03-19 19:40:02 +0000 |
commit | 7d83f965d620ccebeda9a7451cdbb2e40ed88c24 (patch) | |
tree | 739720d07a1e1e6f204b3dad11e7942cc9f4da55 | |
parent | 9a1da6bc4e964912a521b2f0de0bdf6124749ccd (diff) |
Improve aggregate progress reporting
Previously, aggregate mode progress reports were handled by an
infinitely-looping thread carrying a 64-bit atomic of the current count,
which it would print periodically.
This resulted in #99 - breaking on platforms without 64-bit atomics,
for which a feature was added to disable it.
It also implied a race condition, where the "Enumerating ..." message
could be printed after results had been gathered but before dua exited.
Additionally, part of the status message could be left on the display if
the first line of a report was too short to cover it.
This commit should resolve these:
* The 64-bit atomic counter is replaced with an 8-bit AtomicBool
* All printing is controlled from the main thread
* The first line is cleared prior to printing a report
The only notable drawback I see with this approach is that progress
reporting can sometimes be delayed, since the display is only evaluated
for update during periods the aggregation loop makes progress. The
practical difference appears relatively minor.
Since this should resolve #99, the aggregate-scan-progress feature is
removed.
-rw-r--r-- | Cargo.toml | 3 | ||||
-rw-r--r-- | Makefile | 1 | ||||
-rw-r--r-- | src/aggregate.rs | 88 |
3 files changed, 60 insertions, 32 deletions
@@ -10,13 +10,12 @@ license = "MIT" include = ["src/**/*", "Cargo.*", "LICENSE", "README.md", "CHANGELOG.md", "!**/tests/*"] [features] -default = ["tui-crossplatform", "trash-move", "aggregate-scan-progress"] +default = ["tui-crossplatform", "trash-move"] tui-unix = ["crosstermion/tui-react-termion", "tui-shared"] tui-crossplatform = ["crosstermion/tui-react-crossterm", "tui-shared"] tui-shared = ["tui", "tui-react", "open", "unicode-segmentation"] trash-move = ["trash"] -aggregate-scan-progress = [] [dependencies] clap = { version = "3.0", features = ["derive"] } @@ -38,7 +38,6 @@ check:## run cargo-check with various features cargo check --no-default-features --features tui-unix cargo check --no-default-features --features tui-crossplatform cargo check --no-default-features --features trash-move - cargo check --no-default-features --features aggregate-scan-progress unit-tests: ## run all unit tests cargo test --all diff --git a/src/aggregate.rs b/src/aggregate.rs index 9d0f266..2b621d9 100644 --- a/src/aggregate.rs +++ b/src/aggregate.rs @@ -3,24 +3,71 @@ use anyhow::Result; use colored::{Color, Colorize}; use filesize::PathExt; use std::{borrow::Cow, io, path::Path}; -#[cfg(feature = "aggregate-scan-progress")] use std::{ sync::{ - atomic::{AtomicUsize, Ordering}, + atomic::{AtomicBool, Ordering}, Arc, }, thread, time::Duration, }; +/// Throttle access to an optional `io::Write` to the specified `Duration` +#[derive(Debug)] +struct ThrottleWriter<W> { + out: Option<W>, + trigger: Arc<AtomicBool>, +} + +impl<W> ThrottleWriter<W> +where + W: io::Write, +{ + fn new(out: Option<W>, duration: Duration) -> Self { + let writer = Self { + out, + trigger: Default::default(), + }; + + if writer.out.is_some() { + let trigger = Arc::downgrade(&writer.trigger); + thread::spawn(move || { + thread::sleep(Duration::from_secs(1)); + while let Some(t) = trigger.upgrade() { + t.store(true, Ordering::Relaxed); + thread::sleep(duration); + } + }); + } + + writer + } + + fn throttled<F>(&mut self, f: F) + where + F: FnOnce(&mut W), + { + if self.trigger.swap(false, Ordering::Relaxed) { + self.unthrottled(f); + } + } + + fn unthrottled<F>(&mut self, f: F) + where + F: FnOnce(&mut W), + { + if let Some(ref mut out) = self.out { + f(out); + } + } +} + /// Aggregate the given `paths` and write information about them to `out` in a human-readable format. /// If `compute_total` is set, it will write an additional line with the total size across all given `paths`. /// If `sort_by_size_in_bytes` is set, we will sort all sizes (ascending) before outputting them. pub fn aggregate( mut out: impl io::Write, - #[cfg_attr(not(feature = "aggregate-scan-progress"), allow(unused_variables))] err: Option< - impl io::Write + Send + 'static, - >, + err: Option<impl io::Write>, walk_options: WalkOptions, compute_total: bool, sort_by_size_in_bytes: bool, @@ -35,28 +82,7 @@ pub fn aggregate( let mut num_roots = 0; let mut aggregates = Vec::new(); let mut inodes = InodeFilter::default(); - - #[cfg(feature = "aggregate-scan-progress")] - let shared_count = Arc::new(AtomicUsize::new(0)); - - #[cfg(feature = "aggregate-scan-progress")] - if let Some(mut out) = err { - thread::spawn({ - let shared_count = Arc::clone(&shared_count); - move || { - thread::sleep(Duration::from_secs(1)); - loop { - thread::sleep(Duration::from_millis(100)); - write!( - out, - "Enumerating {} entries\r", - shared_count.load(Ordering::Acquire) - ) - .ok(); - } - } - }); - } + let mut progress = ThrottleWriter::new(err, Duration::from_millis(100)); for path in paths.into_iter() { num_roots += 1; @@ -65,8 +91,9 @@ pub fn aggregate( let device_id = crossdev::init(path.as_ref())?; for entry in walk_options.iter_from_path(path.as_ref()) { stats.entries_traversed += 1; - #[cfg(feature = "aggregate-scan-progress")] - shared_count.fetch_add(1, Ordering::Relaxed); + progress.throttled(|out| { + write!(out, "Enumerating {} entries\r", stats.entries_traversed).ok(); + }); match entry { Ok(entry) => { let file_size = match entry.client_state { @@ -99,6 +126,9 @@ pub fn aggregate( Err(_) => num_errors += 1, } } + progress.unthrottled(|out| { + write!(out, "\x1b[2K\r").ok(); + }); if sort_by_size_in_bytes { aggregates.push((path.as_ref().to_owned(), num_bytes, num_errors)); |