summary refs log tree commit diff stats
diff options
context:
space:
mode:
author Sebastian Thiel <sebastian.thiel@icloud.com> 2023-05-07 09:05:00 +0200
committer Sebastian Thiel <sebastian.thiel@icloud.com> 2023-05-07 09:18:28 +0200
commit a0a1dbd95fadae50fec894e50bb2c29c41a69bc5 (patch)
tree 55f092205b8ea7bf9e8b4655d61d420a3582e49a
parent b7fd4dedecb015681fe5a5162b0591c17a462550 (diff)
refactor
Cleanup and improvements
-rw-r--r-- src/aggregate.rs 5
-rw-r--r-- src/common.rs 57
-rw-r--r-- src/traverse.rs 127
3 files changed, 70 insertions, 119 deletions
diff --git a/src/aggregate.rs b/src/aggregate.rs
index 91a0c94..1cad08e 100644
--- a/src/aggregate.rs
+++ b/src/aggregate.rs
@@ -3,7 +3,6 @@ use crate::{crossdev, file_size_on_disk, FlowControl, Throttle, WalkOptions, Wal
use anyhow::Result;
use owo_colors::{AnsiColors as Color, OwoColorize};
use std::sync::atomic::{AtomicU64, Ordering};
-use std::sync::Arc;
use std::time::Duration;
use std::{io, path::Path};
@@ -26,7 +25,7 @@ pub fn aggregate(
let mut total = 0;
let mut num_roots = 0;
let mut aggregates = Vec::new();
- let inodes = Arc::new(InodeFilter::default());
+ let inodes = &InodeFilter::default();
let progress = Throttle::new(Duration::from_millis(100), Duration::from_secs(1).into());
for path in paths.into_iter() {
@@ -49,7 +48,7 @@ pub fn aggregate(
};
let num_errors = AtomicU64::default();
let count_size = {
- let inodes = inodes.clone();
+ let inodes = &inodes;
let apparent_size = walk_options.apparent_size;
let count_hard_links = walk_options.count_hard_links;
let smallest_file_in_bytes = &smallest_file_in_bytes;
diff --git a/src/common.rs b/src/common.rs
index 5ee6358..0e89bd3 100644
--- a/src/common.rs
+++ b/src/common.rs
@@ -1,11 +1,10 @@
-use crate::crossdev;
use crate::traverse::{EntryData, Tree, TreeIndex};
use byte_unit::{n_gb_bytes, n_gib_bytes, n_mb_bytes, n_mib_bytes, ByteUnit};
+use std::fmt;
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
-use std::{fmt, path::Path};
pub fn get_entry_or_panic(tree: &Tree, node_idx: TreeIndex) -> &EntryData {
tree.node_weight(node_idx)
@@ -172,60 +171,6 @@ pub struct WalkOptions {
pub ignore_dirs: Vec<PathBuf>,
}
-type WalkDir = jwalk::WalkDirGeneric<((), Option<Result<std::fs::Metadata, jwalk::Error>>)>;
-
-impl WalkOptions {
- pub fn iter_from_path(&self, root: &Path, root_device_id: u64) -> WalkDir {
- WalkDir::new(root)
- .follow_links(false)
- .sort(match self.sorting {
- TraversalSorting::None => false,
- TraversalSorting::AlphabeticalByFileName => true,
- })
- .skip_hidden(false)
- .process_read_dir({
- let ignore_dirs = self.ignore_dirs.clone();
- let cross_filesystems = self.cross_filesystems;
- move |_, _, _, dir_entry_results| {
- dir_entry_results.iter_mut().for_each(|dir_entry_result| {
- if let Ok(dir_entry) = dir_entry_result {
- let metadata = dir_entry.metadata();
-
- if dir_entry.file_type.is_file() || dir_entry.file_type().is_symlink() {
- dir_entry.client_state = Some(metadata);
- } else if dir_entry.file_type.is_dir() {
- let ok_for_fs = cross_filesystems
- || metadata
- .as_ref()
- .map(|m| crossdev::is_same_device(root_device_id, m))
- .unwrap_or(true);
- if !ok_for_fs || ignore_dirs.contains(&dir_entry.path()) {
- dir_entry.read_children_path = None;
- }
- }
- }
- })
- }
- })
- .parallelism(match self.threads {
- 0 => jwalk::Parallelism::RayonDefaultPool {
- busy_timeout: std::time::Duration::from_secs(1),
- },
- 1 => jwalk::Parallelism::Serial,
- _ => jwalk::Parallelism::RayonExistingPool {
- pool: jwalk::rayon::ThreadPoolBuilder::new()
- .stack_size(128 * 1024)
- .num_threads(self.threads)
- .thread_name(|idx| format!("dua-fs-walk-{idx}"))
- .build()
- .expect("fields we set cannot fail")
- .into(),
- busy_timeout: None,
- },
- })
- }
-}
-
pub enum FlowControl {
Continue,
Abort,
diff --git a/src/traverse.rs b/src/traverse.rs
index 1d42e53..8093520 100644
--- a/src/traverse.rs
+++ b/src/traverse.rs
@@ -1,4 +1,6 @@
-use crate::{crossdev, file_size_on_disk, get_size_or_panic, InodeFilter, Throttle, WalkOptions};
+use crate::{
+ crossdev, file_size_on_disk, get_size_or_panic, FlowControl, InodeFilter, Throttle, WalkOptions,
+};
use ::moonwalk::{DirEntry, WalkState};
use parking_lot::Mutex;
use petgraph::{graph::NodeIndex, stable_graph::StableGraph, Directed, Direction};
@@ -74,13 +76,13 @@ impl Traversal {
};
#[derive(Clone)]
- struct Delegate {
+ struct Delegate<'a> {
tree: Arc<Mutex<Tree>>,
- io_errors: Arc<AtomicU64>,
+ io_errors: &'a AtomicU64,
results: std::sync::mpsc::Sender<()>,
count_hard_links: bool,
apparent_size: bool,
- inodes: Arc<InodeFilter>,
+ inodes: &'a InodeFilter,
}
fn compute_file_size(
@@ -100,7 +102,7 @@ impl Traversal {
}
}
- impl ::moonwalk::VisitorParallel for Delegate {
+ impl<'b> ::moonwalk::VisitorParallel for Delegate<'b> {
type State = (TreeIndex, AtomicU64);
fn visit<'a>(
@@ -122,7 +124,7 @@ impl Traversal {
m,
self.count_hard_links,
self.apparent_size,
- &self.inodes,
+ self.inodes,
),
Err(_) => {
self.io_errors.fetch_add(1, Ordering::SeqCst);
@@ -171,8 +173,8 @@ impl Traversal {
}
}
- let inodes = Arc::new(InodeFilter::default());
- let io_errors = Arc::new(AtomicU64::default());
+ let inodes = InodeFilter::default();
+ let io_errors = AtomicU64::default();
let throttle = Throttle::new(Duration::from_millis(50), None);
if walk_options.threads == 0 {
@@ -189,65 +191,70 @@ impl Traversal {
continue;
}
};
-
- let (rx, traversal_root_idx) = {
- let (tx, rx) = std::sync::mpsc::channel();
- if !meta.is_dir() {
- // moonwalk will fail to traverse non-dirs, so we have to fill in what it would do.
- tx.send(()).ok();
- }
- let traversal_root_idx = {
- let tree = &mut t.tree.lock();
- let traversal_root_idx = tree.add_node(EntryData {
- name: path.clone(),
- size: compute_file_size(
- &meta.into(),
- walk_options.count_hard_links,
- walk_options.apparent_size,
- &inodes,
- ),
- ..Default::default()
+ let flow = std::thread::scope(|scope| -> anyhow::Result<_> {
+ let (rx, traversal_root_idx) = {
+ let (tx, rx) = std::sync::mpsc::channel();
+ if !meta.is_dir() {
+ // moonwalk will fail to traverse non-dirs, so we have to fill in what it would do.
+ tx.send(()).ok();
+ }
+ let traversal_root_idx = {
+ let tree = &mut t.tree.lock();
+ let traversal_root_idx = tree.add_node(EntryData {
+ name: path.clone(),
+ size: compute_file_size(
+ &meta.into(),
+ walk_options.count_hard_links,
+ walk_options.apparent_size,
+ &inodes,
+ ),
+ ..Default::default()
+ });
+ tree.add_edge(t.root_index, traversal_root_idx, ());
+ traversal_root_idx
+ };
+
+ scope.spawn({
+ let walk_options = walk_options.clone();
+ let path = path.clone();
+ let tree = t.tree.clone();
+ let io_errors = &io_errors;
+ let inodes = &inodes;
+ move || {
+ walk_options.moonwalk_from_path_2(
+ path.as_ref(),
+ device_id,
+ Delegate {
+ tree,
+ io_errors,
+ results: tx,
+ apparent_size: walk_options.apparent_size,
+ count_hard_links: walk_options.count_hard_links,
+ inodes,
+ },
+ (traversal_root_idx, 0.into()),
+ )
+ }
});
- tree.add_edge(t.root_index, traversal_root_idx, ());
- traversal_root_idx
+ (rx, traversal_root_idx)
};
- std::thread::spawn({
- let walk_options = walk_options.clone();
- let tx = tx.clone();
- let path = path.clone();
- let tree = t.tree.clone();
- let io_errors = io_errors.clone();
- let inodes = inodes.clone();
- move || {
- walk_options.moonwalk_from_path_2(
- path.as_ref(),
- device_id,
- Delegate {
- tree,
- io_errors,
- results: tx,
- apparent_size: walk_options.apparent_size,
- count_hard_links: walk_options.count_hard_links,
- inodes: inodes.clone(),
- },
- (traversal_root_idx, 0.into()),
- )
+ for () in rx {
+ t.entries_traversed += 1;
+ if throttle.can_update() && update(&mut t)? {
+ return Ok(FlowControl::Abort);
}
- });
- (rx, traversal_root_idx)
- };
+ }
- for () in rx {
- t.entries_traversed += 1;
- if throttle.can_update() && update(&mut t)? {
- return Ok(None);
+ let root_size = t.recompute_size_by_aggregating_children(traversal_root_idx);
+ if root_size != 0 {
+ set_size_or_panic(&mut t.tree.lock(), traversal_root_idx, root_size);
}
- }
- let root_size = t.recompute_size_by_aggregating_children(traversal_root_idx);
- if root_size != 0 {
- set_size_or_panic(&mut t.tree.lock(), traversal_root_idx, root_size);
+ Ok(FlowControl::Continue)
+ })?;
+ if matches!(flow, FlowControl::Abort) {
+ return Ok(None);
}
}