diff options
Diffstat (limited to 'src/aggregate.rs')
-rw-r--r-- | src/aggregate.rs | 88 |
1 files changed, 59 insertions, 29 deletions
diff --git a/src/aggregate.rs b/src/aggregate.rs index 9d0f266..2b621d9 100644 --- a/src/aggregate.rs +++ b/src/aggregate.rs @@ -3,24 +3,71 @@ use anyhow::Result; use colored::{Color, Colorize}; use filesize::PathExt; use std::{borrow::Cow, io, path::Path}; -#[cfg(feature = "aggregate-scan-progress")] use std::{ sync::{ - atomic::{AtomicUsize, Ordering}, + atomic::{AtomicBool, Ordering}, Arc, }, thread, time::Duration, }; +/// Throttle access to an optional `io::Write` to the specified `Duration` +#[derive(Debug)] +struct ThrottleWriter<W> { + out: Option<W>, + trigger: Arc<AtomicBool>, +} + +impl<W> ThrottleWriter<W> +where + W: io::Write, +{ + fn new(out: Option<W>, duration: Duration) -> Self { + let writer = Self { + out, + trigger: Default::default(), + }; + + if writer.out.is_some() { + let trigger = Arc::downgrade(&writer.trigger); + thread::spawn(move || { + thread::sleep(Duration::from_secs(1)); + while let Some(t) = trigger.upgrade() { + t.store(true, Ordering::Relaxed); + thread::sleep(duration); + } + }); + } + + writer + } + + fn throttled<F>(&mut self, f: F) + where + F: FnOnce(&mut W), + { + if self.trigger.swap(false, Ordering::Relaxed) { + self.unthrottled(f); + } + } + + fn unthrottled<F>(&mut self, f: F) + where + F: FnOnce(&mut W), + { + if let Some(ref mut out) = self.out { + f(out); + } + } +} + /// Aggregate the given `paths` and write information about them to `out` in a human-readable format. /// If `compute_total` is set, it will write an additional line with the total size across all given `paths`. /// If `sort_by_size_in_bytes` is set, we will sort all sizes (ascending) before outputting them. pub fn aggregate( mut out: impl io::Write, - #[cfg_attr(not(feature = "aggregate-scan-progress"), allow(unused_variables))] err: Option< - impl io::Write + Send + 'static, - >, + err: Option<impl io::Write>, walk_options: WalkOptions, compute_total: bool, sort_by_size_in_bytes: bool, @@ -35,28 +82,7 @@ pub fn aggregate( let mut num_roots = 0; let mut aggregates = Vec::new(); let mut inodes = InodeFilter::default(); - - #[cfg(feature = "aggregate-scan-progress")] - let shared_count = Arc::new(AtomicUsize::new(0)); - - #[cfg(feature = "aggregate-scan-progress")] - if let Some(mut out) = err { - thread::spawn({ - let shared_count = Arc::clone(&shared_count); - move || { - thread::sleep(Duration::from_secs(1)); - loop { - thread::sleep(Duration::from_millis(100)); - write!( - out, - "Enumerating {} entries\r", - shared_count.load(Ordering::Acquire) - ) - .ok(); - } - } - }); - } + let mut progress = ThrottleWriter::new(err, Duration::from_millis(100)); for path in paths.into_iter() { num_roots += 1; @@ -65,8 +91,9 @@ pub fn aggregate( let device_id = crossdev::init(path.as_ref())?; for entry in walk_options.iter_from_path(path.as_ref()) { stats.entries_traversed += 1; - #[cfg(feature = "aggregate-scan-progress")] - shared_count.fetch_add(1, Ordering::Relaxed); + progress.throttled(|out| { + write!(out, "Enumerating {} entries\r", stats.entries_traversed).ok(); + }); match entry { Ok(entry) => { let file_size = match entry.client_state { @@ -99,6 +126,9 @@ pub fn aggregate( Err(_) => num_errors += 1, } } + progress.unthrottled(|out| { + write!(out, "\x1b[2K\r").ok(); + }); if sort_by_size_in_bytes { aggregates.push((path.as_ref().to_owned(), num_bytes, num_errors)); |