-rw-r--r--   src/common.rs                       4
-rw-r--r--   src/interactive/app/eventloop.rs    1
-rw-r--r--   src/traverse.rs                   435
3 files changed, 434 insertions(+), 6 deletions(-)
diff --git a/src/common.rs b/src/common.rs
index 014dc85..2a5947c 100644
--- a/src/common.rs
+++ b/src/common.rs
@@ -260,7 +260,7 @@ mod moonwalk {
&self,
root: &Path,
root_device_id: u64,
- update: impl Fn(std::io::Result<&mut DirEntry<'_>>, Option<usize>) -> FlowControl
+ update: impl FnMut(std::io::Result<&mut DirEntry<'_>>, Option<usize>) -> FlowControl
+ Send
+ Clone,
needs_depth: bool,
@@ -292,7 +292,7 @@ mod moonwalk {
impl<CB> moonwalk::VisitorParallel for Delegate<CB>
where
- CB: for<'a> Fn(std::io::Result<&'a mut DirEntry>, Option<usize>) -> FlowControl
+ CB: for<'a> FnMut(std::io::Result<&'a mut DirEntry>, Option<usize>) -> FlowControl
+ Send
+ Clone,
{
diff --git a/src/interactive/app/eventloop.rs b/src/interactive/app/eventloop.rs
index fb4faa6..62c0661 100644
--- a/src/interactive/app/eventloop.rs
+++ b/src/interactive/app/eventloop.rs
@@ -6,6 +6,7 @@ use crate::interactive::{
};
use anyhow::Result;
use crosstermion::input::{input_channel, Event, Key};
+use dua::traverse::ThreadSafeTraversal;
use dua::{
traverse::{Traversal, TreeIndex},
WalkOptions, WalkResult,
diff --git a/src/traverse.rs b/src/traverse.rs
index 2c8a2bc..a5b23ac 100644
--- a/src/traverse.rs
+++ b/src/traverse.rs
@@ -1,7 +1,13 @@
-use crate::{crossdev, get_size_or_panic, InodeFilter, Throttle, WalkOptions};
-use anyhow::Result;
+use crate::{
+ crossdev, file_size_on_disk, get_size_or_panic, FlowControl, InodeFilter, Throttle, WalkOptions,
+};
use filesize::PathExt;
+use moonwalk::DirEntry;
+use parking_lot::Mutex;
use petgraph::{graph::NodeIndex, stable_graph::StableGraph, Directed, Direction};
+use std::ffi::OsString;
+use std::sync::atomic::{AtomicU64, Ordering};
+use std::sync::Arc;
use std::{
fs::Metadata,
io,
@@ -44,8 +50,8 @@ impl Traversal {
pub fn from_walk(
mut walk_options: WalkOptions,
input: Vec<PathBuf>,
- mut update: impl FnMut(&mut Traversal) -> Result<bool>,
- ) -> Result<Option<Traversal>> {
+ mut update: impl FnMut(&mut Traversal) -> anyhow::Result<bool>,
+ ) -> anyhow::Result<Option<Traversal>> {
fn set_size_or_panic(tree: &mut Tree, node_idx: TreeIndex, current_size_at_depth: u64) {
tree.node_weight_mut(node_idx)
.expect("node for parent index we just retrieved")
@@ -219,4 +225,425 @@ impl Traversal {
.map(|idx| get_size_or_panic(&self.tree, idx))
.sum()
}
+
+ pub fn from_moonwalk(
+ mut walk_options: WalkOptions,
+ input: Vec<PathBuf>,
+ mut update: impl FnMut(&mut Traversal) -> anyhow::Result<bool>,
+ ) -> anyhow::Result<Option<Traversal>> {
+ fn set_size_or_panic(tree: &mut Tree, node_idx: TreeIndex, current_size_at_depth: u64) {
+ tree.node_weight_mut(node_idx)
+ .expect("node for parent index we just retrieved")
+ .size = current_size_at_depth;
+ }
+ fn parent_or_panic(tree: &mut Tree, parent_node_idx: TreeIndex) -> TreeIndex {
+ tree.neighbors_directed(parent_node_idx, Direction::Incoming)
+ .next()
+ .expect("every node in the iteration has a parent")
+ }
+ fn pop_or_panic(v: &mut Vec<u64>) -> u64 {
+ v.pop().expect("sizes per level to be in sync with graph")
+ }
+
+ let mut t = {
+ let mut tree = Tree::new();
+ let root_index = tree.add_node(EntryData::default());
+ Traversal {
+ tree,
+ root_index,
+ ..Default::default()
+ }
+ };
+
+ let (mut previous_node_idx, mut parent_node_idx) = (t.root_index, t.root_index);
+ let mut sizes_per_depth_level = Vec::new();
+ let mut current_size_at_depth: u64 = 0;
+ let mut previous_depth = 0;
+ let inodes = InodeFilter::default();
+
+ let throttle = Throttle::new(Duration::from_millis(100), None);
+ if walk_options.threads == 0 {
+ // avoid using the global rayon pool, as it will keep a lot of threads alive after we are done.
+ // Also means that we will spin up a bunch of threads per root path, instead of reusing them.
+ walk_options.threads = num_cpus::get();
+ }
+
+ #[cfg(not(windows))]
+ fn size_on_disk(_parent: &Path, name: &Path, meta: &Metadata) -> io::Result<u64> {
+ name.size_on_disk_fast(meta)
+ }
+ #[cfg(windows)]
+ fn size_on_disk(parent: &Path, name: &Path, meta: &Metadata) -> io::Result<u64> {
+ parent.join(name).size_on_disk_fast(meta)
+ }
+
+ for path in input.into_iter() {
+ let (device_id, _meta) = match crossdev::init(path.as_ref()) {
+ Ok(id) => id,
+ Err(_) => {
+ t.io_errors += 1;
+ continue;
+ }
+ };
+
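+ // Note: the walker runs on a separate thread and streams each entry, together with its
+ // depth, back over an mpsc channel; the tree itself is only ever built on this thread.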
+ let rx = {
+ let (tx, rx) = std::sync::mpsc::channel();
+ std::thread::spawn({
+ let walk_options = walk_options.clone();
+ let tx = tx.clone();
+ let path = path.clone();
+ move || {
+ walk_options.moonwalk_from_path(
+ path.as_ref(),
+ device_id,
+ {
+ move |entry, depth| {
+ let depth = depth.expect("BUG: set in options");
+ if tx.send((OwnEntry::new(entry), depth)).is_err() {
+ FlowControl::Abort
+ } else {
+ FlowControl::Continue
+ }
+ }
+ },
+ true,
+ )
+ }
+ });
+ rx
+ };
+
+ for (entry, depth) in rx {
+ t.entries_traversed += 1;
+ let mut data = EntryData::default();
+ match entry {
+ Ok(entry) => {
+ data.name = if depth < 1 {
+ path.clone()
+ } else {
+ entry.file_name.into()
+ };
+ let file_size = match entry.metadata.as_ref() {
+ Ok(m)
+ if !m.is_dir()
+ && (walk_options.count_hard_links
+ || inodes.is_first_moonwalk(m)) =>
+ {
+ if walk_options.apparent_size {
+ m.len()
+ } else {
+ file_size_on_disk(m)
+ }
+ }
+ Ok(_) => 0,
+ Err(_) => {
+ t.io_errors += 1;
+ data.metadata_io_error = true;
+ 0
+ }
+ };
+
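+ // Bookkeeping for aggregated directory sizes: descending a level stashes the running
+ // total on a stack, while ascending pops it and writes the accumulated size into each
+ // parent node that is being left behind.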
+ match (depth, previous_depth) {
+ (n, p) if n > p => {
+ sizes_per_depth_level.push(current_size_at_depth);
+ current_size_at_depth = file_size;
+ parent_node_idx = previous_node_idx;
+ }
+ (n, p) if n < p => {
+ for _ in n..p {
+ set_size_or_panic(
+ &mut t.tree,
+ parent_node_idx,
+ current_size_at_depth,
+ );
+ current_size_at_depth +=
+ pop_or_panic(&mut sizes_per_depth_level);
+ parent_node_idx = parent_or_panic(&mut t.tree, parent_node_idx);
+ }
+ current_size_at_depth += file_size;
+ set_size_or_panic(
+ &mut t.tree,
+ parent_node_idx,
+ current_size_at_depth,
+ );
+ }
+ _ => {
+ current_size_at_depth += file_size;
+ }
+ };
+
+ data.size = file_size;
+ let entry_index = t.tree.add_node(data);
+
+ t.tree.add_edge(parent_node_idx, entry_index, ());
+ previous_node_idx = entry_index;
+ previous_depth = depth;
+ }
+ Err(_) => {
+ if previous_depth == 0 {
+ data.name = path.clone();
+ let entry_index = t.tree.add_node(data);
+ t.tree.add_edge(parent_node_idx, entry_index, ());
+ }
+
+ t.io_errors += 1
+ }
+ }
+
+ if throttle.can_update() && update(&mut t)? {
+ return Ok(None);
+ }
+ }
+ }
+
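+ // The loop may finish while still nested below the root, so unwind the remaining depth
+ // levels and propagate their sizes upwards before totalling the root.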
+ sizes_per_depth_level.push(current_size_at_depth);
+ current_size_at_depth = 0;
+ for _ in 0..previous_depth {
+ current_size_at_depth += pop_or_panic(&mut sizes_per_depth_level);
+ set_size_or_panic(&mut t.tree, parent_node_idx, current_size_at_depth);
+ parent_node_idx = parent_or_panic(&mut t.tree, parent_node_idx);
+ }
+ let root_size = t.recompute_root_size();
+ set_size_or_panic(&mut t.tree, t.root_index, root_size);
+ t.total_bytes = Some(root_size);
+
+ Ok(Some(t))
+ }
+}
+
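+ // `DirEntry` borrows from the walker's internal buffers, so an owned copy of the fields
+ // we need (file name and metadata) is what gets sent through the channel.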
+struct OwnEntry {
+ file_name: OsString,
+ metadata: std::io::Result<moonwalk::Metadata>,
+}
+
+impl OwnEntry {
+ fn new(dent: std::io::Result<&mut DirEntry<'_>>) -> std::io::Result<Self> {
+ dent.map(|dent| OwnEntry {
+ file_name: dent.file_name().to_owned(),
+ metadata: dent.metadata().map(ToOwned::to_owned),
+ })
+ }
+}
+
+/// The result of the previous filesystem traversal
+#[derive(Default, Debug)]
+pub struct ThreadSafeTraversal {
+ /// A tree representing the entire filesystem traversal
+ pub tree: Arc<Mutex<Tree>>,
+ /// The top-level node of the tree.
+ pub root_index: TreeIndex,
+ /// Amount of files or directories we have seen during the filesystem traversal
+ pub entries_traversed: AtomicU64,
+ /// Total amount of IO errors encountered when traversing the filesystem
+ pub io_errors: AtomicU64,
+ /// Total amount of bytes seen during the traversal
+ pub total_bytes: AtomicU64,
+}
+
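+ // Once the walk is done the Arc has a single owner again, so the mutex can be unwrapped
+ // and the result converted into a plain `Traversal` for the rest of the program.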
+impl From<ThreadSafeTraversal> for Traversal {
+ fn from(t: ThreadSafeTraversal) -> Self {
+ Traversal {
+ tree: match Arc::try_unwrap(t.tree) {
+ Ok(v) => v.into_inner(),
+ Err(_) => unreachable!("BUG: no thread is using this anymore, single ownership"),
+ },
+ root_index: t.root_index,
+ entries_traversed: t.entries_traversed.load(Ordering::Relaxed),
+ io_errors: t.io_errors.load(Ordering::Relaxed),
+ total_bytes: Some(t.total_bytes.load(Ordering::Relaxed)),
+ }
+ }
+}
+
+impl ThreadSafeTraversal {
+ pub fn from_walk(
+ mut walk_options: WalkOptions,
+ input: Vec<PathBuf>,
+ update: impl FnMut(&ThreadSafeTraversal) -> anyhow::Result<bool> + Send + Clone,
+ ) -> anyhow::Result<Option<Traversal>> {
+ fn set_size_or_panic(tree: &mut Tree, node_idx: TreeIndex, current_size_at_depth: u64) {
+ tree.node_weight_mut(node_idx)
+ .expect("node for parent index we just retrieved")
+ .size = current_size_at_depth;
+ }
+ fn parent_or_panic(tree: &mut Tree, parent_node_idx: TreeIndex) -> TreeIndex {
+ tree.neighbors_directed(parent_node_idx, Direction::Incoming)
+ .next()
+ .expect("every node in the iteration has a parent")
+ }
+ fn pop_or_panic(v: &mut Vec<u64>) -> u64 {
+ v.pop().expect("sizes per level to be in sync with graph")
+ }
+
+ let t = {
+ let mut tree = Tree::new();
+ let root_index = tree.add_node(EntryData::default());
+ ThreadSafeTraversal {
+ tree: Arc::new(tree.into()),
+ root_index,
+ ..Default::default()
+ }
+ };
+
+ let (mut previous_node_idx, mut parent_node_idx) = (t.root_index, t.root_index);
+ let sizes_per_depth_level = Arc::new(Mutex::new(Vec::new()));
+ let mut current_size_at_depth: u64 = 0;
+ let mut previous_depth = 0;
+ let inodes = Arc::new(InodeFilter::default());
+
+ let throttle = Throttle::new(Duration::from_millis(250), None);
+ if walk_options.threads == 0 {
+ // avoid using the global rayon pool, as it will keep a lot of threads alive after we are done.
+ // Also means that we will spin up a bunch of threads per root path, instead of reusing them.
+ walk_options.threads = num_cpus::get();
+ }
+
+ #[cfg(not(windows))]
+ fn size_on_disk(_parent: &Path, name: &Path, meta: &Metadata) -> io::Result<u64> {
+ name.size_on_disk_fast(meta)
+ }
+ #[cfg(windows)]
+ fn size_on_disk(parent: &Path, name: &Path, meta: &Metadata) -> io::Result<u64> {
+ parent.join(name).size_on_disk_fast(meta)
+ }
+
+ for path in input.into_iter() {
+ let (device_id, _meta) = match crossdev::init(path.as_ref()) {
+ Ok(id) => id,
+ Err(_) => {
+ t.io_errors.fetch_add(1, Ordering::SeqCst);
+ continue;
+ }
+ };
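+ // Unlike `from_moonwalk`, the callback below is invoked by the walker directly and may
+ // be cloned across worker threads (hence the `Send + Clone` bound), so counters are
+ // atomics and the tree lives behind a Mutex shared via Arc.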
+ walk_options.moonwalk_from_path(
+ path.as_ref(),
+ device_id,
+ {
+ let t = &t;
+ let inodes = inodes.clone();
+ let throttle = &throttle;
+ let count_hard_links = walk_options.count_hard_links;
+ let apparent_size = walk_options.apparent_size;
+ let path = path.clone();
+ let mut update = update.clone();
+ let sizes_per_depth_level = Arc::clone(&sizes_per_depth_level);
+ move |entry, depth| {
+ t.entries_traversed.fetch_add(1, Ordering::SeqCst);
+ let mut data = EntryData::default();
+ let depth = depth.expect("BUG: depth configured but not present");
+ match entry {
+ Ok(entry) => {
+ data.name = if depth < 1 {
+ path.clone()
+ } else {
+ entry.file_name().into()
+ };
+ let file_size = match &entry.metadata() {
+ Ok(ref m)
+ if (count_hard_links || inodes.is_first_moonwalk(m)) =>
+ {
+ if apparent_size {
+ m.len()
+ } else {
+ file_size_on_disk(m)
+ }
+ }
+ Ok(_) => 0,
+ Err(_) => {
+ t.io_errors.fetch_add(1, Ordering::SeqCst);
+ data.metadata_io_error = true;
+ 0
+ }
+ };
+
+ match (depth, previous_depth) {
+ (n, p) if n > p => {
+ sizes_per_depth_level.lock().push(current_size_at_depth);
+ current_size_at_depth = file_size;
+ parent_node_idx = previous_node_idx;
+ }
+ (n, p) if n < p => {
+ for _ in n..p {
+ set_size_or_panic(
+ &mut t.tree.lock(),
+ parent_node_idx,
+ current_size_at_depth,
+ );
+ current_size_at_depth +=
+ pop_or_panic(&mut sizes_per_depth_level.lock());
+ parent_node_idx = parent_or_panic(
+ &mut t.tree.lock(),
+ parent_node_idx,
+ );
+ }
+ current_size_at_depth += file_size;
+ set_size_or_panic(
+ &mut t.tree.lock(),
+ parent_node_idx,
+ current_size_at_depth,
+ );
+ }
+ _ => {
+ current_size_at_depth += file_size;
+ }
+ };
+
+ data.size = file_size;
+ let tree = &mut t.tree.lock();
+ let entry_index = tree.add_node(data);
+
+ tree.add_edge(parent_node_idx, entry_index, ());
+ previous_node_idx = entry_index;
+ previous_depth = depth;
+ }
+ Err(_err) => {
+ if previous_depth == 0 {
+ data.name = path.clone();
+ let tree = &mut t.tree.lock();
+ let entry_index = tree.add_node(data);
+ tree.add_edge(parent_node_idx, entry_index, ());
+ }
+
+ t.io_errors.fetch_add(1, Ordering::SeqCst);
+ }
+ }
+
+ if throttle.can_update() && update(t).unwrap_or(false) {
+ FlowControl::Abort
+ } else {
+ FlowControl::Continue
+ }
+ }
+ },
+ true,
+ )?;
+ }
+
+ {
+ let mut sizes_per_depth_level = match Arc::try_unwrap(sizes_per_depth_level) {
+ Ok(s) => s.into_inner(),
+ Err(_) => unreachable!("all threads have terminated by now"),
+ };
+ sizes_per_depth_level.push(current_size_at_depth);
+ current_size_at_depth = 0;
+ let tree = &mut t.tree.lock();
+ for _ in 0..previous_depth {
+ current_size_at_depth += pop_or_panic(&mut sizes_per_depth_level);
+ set_size_or_panic(tree, parent_node_idx, current_size_at_depth);
+ parent_node_idx = parent_or_panic(tree, parent_node_idx);
+ }
+ }
+ let root_size = t.recompute_root_size();
+ set_size_or_panic(&mut t.tree.lock(), t.root_index, root_size);
+ t.total_bytes.store(root_size, Ordering::Relaxed);
+
+ Ok(Some(t.into()))
+ }
+
+ fn recompute_root_size(&self) -> u64 {
+ let tree = self.tree.lock();
+ tree.neighbors_directed(self.root_index, Direction::Outgoing)
+ .map(|idx| get_size_or_panic(&tree, idx))
+ .sum()
+ }
}