diff options
author | Sebastian Thiel <sebastian.thiel@icloud.com> | 2023-05-07 09:05:00 +0200 |
---|---|---|
committer | Sebastian Thiel <sebastian.thiel@icloud.com> | 2023-05-07 09:18:28 +0200 |
commit | a0a1dbd95fadae50fec894e50bb2c29c41a69bc5 (patch) | |
tree | 55f092205b8ea7bf9e8b4655d61d420a3582e49a | |
parent | b7fd4dedecb015681fe5a5162b0591c17a462550 (diff) |
refactor
Cleanup and improvements
-rw-r--r-- | src/aggregate.rs | 5 | ||||
-rw-r--r-- | src/common.rs | 57 | ||||
-rw-r--r-- | src/traverse.rs | 127 |
3 files changed, 70 insertions, 119 deletions
diff --git a/src/aggregate.rs b/src/aggregate.rs index 91a0c94..1cad08e 100644 --- a/src/aggregate.rs +++ b/src/aggregate.rs @@ -3,7 +3,6 @@ use crate::{crossdev, file_size_on_disk, FlowControl, Throttle, WalkOptions, Wal use anyhow::Result; use owo_colors::{AnsiColors as Color, OwoColorize}; use std::sync::atomic::{AtomicU64, Ordering}; -use std::sync::Arc; use std::time::Duration; use std::{io, path::Path}; @@ -26,7 +25,7 @@ pub fn aggregate( let mut total = 0; let mut num_roots = 0; let mut aggregates = Vec::new(); - let inodes = Arc::new(InodeFilter::default()); + let inodes = &InodeFilter::default(); let progress = Throttle::new(Duration::from_millis(100), Duration::from_secs(1).into()); for path in paths.into_iter() { @@ -49,7 +48,7 @@ pub fn aggregate( }; let num_errors = AtomicU64::default(); let count_size = { - let inodes = inodes.clone(); + let inodes = &inodes; let apparent_size = walk_options.apparent_size; let count_hard_links = walk_options.count_hard_links; let smallest_file_in_bytes = &smallest_file_in_bytes; diff --git a/src/common.rs b/src/common.rs index 5ee6358..0e89bd3 100644 --- a/src/common.rs +++ b/src/common.rs @@ -1,11 +1,10 @@ -use crate::crossdev; use crate::traverse::{EntryData, Tree, TreeIndex}; use byte_unit::{n_gb_bytes, n_gib_bytes, n_mb_bytes, n_mib_bytes, ByteUnit}; +use std::fmt; use std::path::PathBuf; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::Duration; -use std::{fmt, path::Path}; pub fn get_entry_or_panic(tree: &Tree, node_idx: TreeIndex) -> &EntryData { tree.node_weight(node_idx) @@ -172,60 +171,6 @@ pub struct WalkOptions { pub ignore_dirs: Vec<PathBuf>, } -type WalkDir = jwalk::WalkDirGeneric<((), Option<Result<std::fs::Metadata, jwalk::Error>>)>; - -impl WalkOptions { - pub fn iter_from_path(&self, root: &Path, root_device_id: u64) -> WalkDir { - WalkDir::new(root) - .follow_links(false) - .sort(match self.sorting { - TraversalSorting::None => false, - TraversalSorting::AlphabeticalByFileName => true, - }) - .skip_hidden(false) - .process_read_dir({ - let ignore_dirs = self.ignore_dirs.clone(); - let cross_filesystems = self.cross_filesystems; - move |_, _, _, dir_entry_results| { - dir_entry_results.iter_mut().for_each(|dir_entry_result| { - if let Ok(dir_entry) = dir_entry_result { - let metadata = dir_entry.metadata(); - - if dir_entry.file_type.is_file() || dir_entry.file_type().is_symlink() { - dir_entry.client_state = Some(metadata); - } else if dir_entry.file_type.is_dir() { - let ok_for_fs = cross_filesystems - || metadata - .as_ref() - .map(|m| crossdev::is_same_device(root_device_id, m)) - .unwrap_or(true); - if !ok_for_fs || ignore_dirs.contains(&dir_entry.path()) { - dir_entry.read_children_path = None; - } - } - } - }) - } - }) - .parallelism(match self.threads { - 0 => jwalk::Parallelism::RayonDefaultPool { - busy_timeout: std::time::Duration::from_secs(1), - }, - 1 => jwalk::Parallelism::Serial, - _ => jwalk::Parallelism::RayonExistingPool { - pool: jwalk::rayon::ThreadPoolBuilder::new() - .stack_size(128 * 1024) - .num_threads(self.threads) - .thread_name(|idx| format!("dua-fs-walk-{idx}")) - .build() - .expect("fields we set cannot fail") - .into(), - busy_timeout: None, - }, - }) - } -} - pub enum FlowControl { Continue, Abort, diff --git a/src/traverse.rs b/src/traverse.rs index 1d42e53..8093520 100644 --- a/src/traverse.rs +++ b/src/traverse.rs @@ -1,4 +1,6 @@ -use crate::{crossdev, file_size_on_disk, get_size_or_panic, InodeFilter, Throttle, WalkOptions}; +use crate::{ + crossdev, file_size_on_disk, get_size_or_panic, FlowControl, InodeFilter, Throttle, WalkOptions, +}; use ::moonwalk::{DirEntry, WalkState}; use parking_lot::Mutex; use petgraph::{graph::NodeIndex, stable_graph::StableGraph, Directed, Direction}; @@ -74,13 +76,13 @@ impl Traversal { }; #[derive(Clone)] - struct Delegate { + struct Delegate<'a> { tree: Arc<Mutex<Tree>>, - io_errors: Arc<AtomicU64>, + io_errors: &'a AtomicU64, results: std::sync::mpsc::Sender<()>, count_hard_links: bool, apparent_size: bool, - inodes: Arc<InodeFilter>, + inodes: &'a InodeFilter, } fn compute_file_size( @@ -100,7 +102,7 @@ impl Traversal { } } - impl ::moonwalk::VisitorParallel for Delegate { + impl<'b> ::moonwalk::VisitorParallel for Delegate<'b> { type State = (TreeIndex, AtomicU64); fn visit<'a>( @@ -122,7 +124,7 @@ impl Traversal { m, self.count_hard_links, self.apparent_size, - &self.inodes, + self.inodes, ), Err(_) => { self.io_errors.fetch_add(1, Ordering::SeqCst); @@ -171,8 +173,8 @@ impl Traversal { } } - let inodes = Arc::new(InodeFilter::default()); - let io_errors = Arc::new(AtomicU64::default()); + let inodes = InodeFilter::default(); + let io_errors = AtomicU64::default(); let throttle = Throttle::new(Duration::from_millis(50), None); if walk_options.threads == 0 { @@ -189,65 +191,70 @@ impl Traversal { continue; } }; - - let (rx, traversal_root_idx) = { - let (tx, rx) = std::sync::mpsc::channel(); - if !meta.is_dir() { - // moonwalk will fail to traverse non-dirs, so we have to fill in what it would do. - tx.send(()).ok(); - } - let traversal_root_idx = { - let tree = &mut t.tree.lock(); - let traversal_root_idx = tree.add_node(EntryData { - name: path.clone(), - size: compute_file_size( - &meta.into(), - walk_options.count_hard_links, - walk_options.apparent_size, - &inodes, - ), - ..Default::default() + let flow = std::thread::scope(|scope| -> anyhow::Result<_> { + let (rx, traversal_root_idx) = { + let (tx, rx) = std::sync::mpsc::channel(); + if !meta.is_dir() { + // moonwalk will fail to traverse non-dirs, so we have to fill in what it would do. + tx.send(()).ok(); + } + let traversal_root_idx = { + let tree = &mut t.tree.lock(); + let traversal_root_idx = tree.add_node(EntryData { + name: path.clone(), + size: compute_file_size( + &meta.into(), + walk_options.count_hard_links, + walk_options.apparent_size, + &inodes, + ), + ..Default::default() + }); + tree.add_edge(t.root_index, traversal_root_idx, ()); + traversal_root_idx + }; + + scope.spawn({ + let walk_options = walk_options.clone(); + let path = path.clone(); + let tree = t.tree.clone(); + let io_errors = &io_errors; + let inodes = &inodes; + move || { + walk_options.moonwalk_from_path_2( + path.as_ref(), + device_id, + Delegate { + tree, + io_errors, + results: tx, + apparent_size: walk_options.apparent_size, + count_hard_links: walk_options.count_hard_links, + inodes, + }, + (traversal_root_idx, 0.into()), + ) + } }); - tree.add_edge(t.root_index, traversal_root_idx, ()); - traversal_root_idx + (rx, traversal_root_idx) }; - std::thread::spawn({ - let walk_options = walk_options.clone(); - let tx = tx.clone(); - let path = path.clone(); - let tree = t.tree.clone(); - let io_errors = io_errors.clone(); - let inodes = inodes.clone(); - move || { - walk_options.moonwalk_from_path_2( - path.as_ref(), - device_id, - Delegate { - tree, - io_errors, - results: tx, - apparent_size: walk_options.apparent_size, - count_hard_links: walk_options.count_hard_links, - inodes: inodes.clone(), - }, - (traversal_root_idx, 0.into()), - ) + for () in rx { + t.entries_traversed += 1; + if throttle.can_update() && update(&mut t)? { + return Ok(FlowControl::Abort); } - }); - (rx, traversal_root_idx) - }; + } - for () in rx { - t.entries_traversed += 1; - if throttle.can_update() && update(&mut t)? { - return Ok(None); + let root_size = t.recompute_size_by_aggregating_children(traversal_root_idx); + if root_size != 0 { + set_size_or_panic(&mut t.tree.lock(), traversal_root_idx, root_size); } - } - let root_size = t.recompute_size_by_aggregating_children(traversal_root_idx); - if root_size != 0 { - set_size_or_panic(&mut t.tree.lock(), traversal_root_idx, root_size); + Ok(FlowControl::Continue) + })?; + if matches!(flow, FlowControl::Abort) { + return Ok(None); } } |