diff options
-rw-r--r-- | src/common.rs | 4 | ||||
-rw-r--r-- | src/interactive/app/eventloop.rs | 1 | ||||
-rw-r--r-- | src/traverse.rs | 435 |
3 files changed, 434 insertions, 6 deletions
diff --git a/src/common.rs b/src/common.rs index 014dc85..2a5947c 100644 --- a/src/common.rs +++ b/src/common.rs @@ -260,7 +260,7 @@ mod moonwalk { &self, root: &Path, root_device_id: u64, - update: impl Fn(std::io::Result<&mut DirEntry<'_>>, Option<usize>) -> FlowControl + update: impl FnMut(std::io::Result<&mut DirEntry<'_>>, Option<usize>) -> FlowControl + Send + Clone, needs_depth: bool, @@ -292,7 +292,7 @@ mod moonwalk { impl<CB> moonwalk::VisitorParallel for Delegate<CB> where - CB: for<'a> Fn(std::io::Result<&'a mut DirEntry>, Option<usize>) -> FlowControl + CB: for<'a> FnMut(std::io::Result<&'a mut DirEntry>, Option<usize>) -> FlowControl + Send + Clone, { diff --git a/src/interactive/app/eventloop.rs b/src/interactive/app/eventloop.rs index fb4faa6..62c0661 100644 --- a/src/interactive/app/eventloop.rs +++ b/src/interactive/app/eventloop.rs @@ -6,6 +6,7 @@ use crate::interactive::{ }; use anyhow::Result; use crosstermion::input::{input_channel, Event, Key}; +use dua::traverse::ThreadSafeTraversal; use dua::{ traverse::{Traversal, TreeIndex}, WalkOptions, WalkResult, diff --git a/src/traverse.rs b/src/traverse.rs index 2c8a2bc..a5b23ac 100644 --- a/src/traverse.rs +++ b/src/traverse.rs @@ -1,7 +1,13 @@ -use crate::{crossdev, get_size_or_panic, InodeFilter, Throttle, WalkOptions}; -use anyhow::Result; +use crate::{ + crossdev, file_size_on_disk, get_size_or_panic, FlowControl, InodeFilter, Throttle, WalkOptions, +}; use filesize::PathExt; +use moonwalk::DirEntry; +use parking_lot::Mutex; use petgraph::{graph::NodeIndex, stable_graph::StableGraph, Directed, Direction}; +use std::ffi::OsString; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::Arc; use std::{ fs::Metadata, io, @@ -44,8 +50,8 @@ impl Traversal { pub fn from_walk( mut walk_options: WalkOptions, input: Vec<PathBuf>, - mut update: impl FnMut(&mut Traversal) -> Result<bool>, - ) -> Result<Option<Traversal>> { + mut update: impl FnMut(&mut Traversal) -> anyhow::Result<bool>, + ) -> anyhow::Result<Option<Traversal>> { fn set_size_or_panic(tree: &mut Tree, node_idx: TreeIndex, current_size_at_depth: u64) { tree.node_weight_mut(node_idx) .expect("node for parent index we just retrieved") @@ -219,4 +225,425 @@ impl Traversal { .map(|idx| get_size_or_panic(&self.tree, idx)) .sum() } + + pub fn from_moonwalk( + mut walk_options: WalkOptions, + input: Vec<PathBuf>, + mut update: impl FnMut(&mut Traversal) -> anyhow::Result<bool>, + ) -> anyhow::Result<Option<Traversal>> { + fn set_size_or_panic(tree: &mut Tree, node_idx: TreeIndex, current_size_at_depth: u64) { + tree.node_weight_mut(node_idx) + .expect("node for parent index we just retrieved") + .size = current_size_at_depth; + } + fn parent_or_panic(tree: &mut Tree, parent_node_idx: TreeIndex) -> TreeIndex { + tree.neighbors_directed(parent_node_idx, Direction::Incoming) + .next() + .expect("every node in the iteration has a parent") + } + fn pop_or_panic(v: &mut Vec<u64>) -> u64 { + v.pop().expect("sizes per level to be in sync with graph") + } + + let mut t = { + let mut tree = Tree::new(); + let root_index = tree.add_node(EntryData::default()); + Traversal { + tree, + root_index, + ..Default::default() + } + }; + + let (mut previous_node_idx, mut parent_node_idx) = (t.root_index, t.root_index); + let mut sizes_per_depth_level = Vec::new(); + let mut current_size_at_depth: u64 = 0; + let mut previous_depth = 0; + let inodes = InodeFilter::default(); + + let throttle = Throttle::new(Duration::from_millis(100), None); + if walk_options.threads == 0 { + // avoid using the global rayon pool, as it will keep a lot of threads alive after we are done. + // Also means that we will spin up a bunch of threads per root path, instead of reusing them. + walk_options.threads = num_cpus::get(); + } + + #[cfg(not(windows))] + fn size_on_disk(_parent: &Path, name: &Path, meta: &Metadata) -> io::Result<u64> { + name.size_on_disk_fast(meta) + } + #[cfg(windows)] + fn size_on_disk(parent: &Path, name: &Path, meta: &Metadata) -> io::Result<u64> { + parent.join(name).size_on_disk_fast(meta) + } + + for path in input.into_iter() { + let (device_id, _meta) = match crossdev::init(path.as_ref()) { + Ok(id) => id, + Err(_) => { + t.io_errors += 1; + continue; + } + }; + + let rx = { + let (tx, rx) = std::sync::mpsc::channel(); + std::thread::spawn({ + let walk_options = walk_options.clone(); + let tx = tx.clone(); + let path = path.clone(); + move || { + walk_options.moonwalk_from_path( + path.as_ref(), + device_id, + { + move |entry, depth| { + let depth = depth.expect("BUG: set in options"); + if tx.send((OwnEntry::new(entry), depth)).is_err() { + FlowControl::Abort + } else { + FlowControl::Continue + } + } + }, + true, + ) + } + }); + rx + }; + + for (entry, depth) in rx { + t.entries_traversed += 1; + let mut data = EntryData::default(); + match entry { + Ok(entry) => { + data.name = if depth < 1 { + path.clone() + } else { + entry.file_name.into() + }; + let file_size = match entry.metadata.as_ref() { + Ok(m) + if !m.is_dir() + && (walk_options.count_hard_links + || inodes.is_first_moonwalk(m)) => + { + if walk_options.apparent_size { + m.len() + } else { + file_size_on_disk(m) + } + } + Ok(_) => 0, + Err(_) => { + t.io_errors += 1; + data.metadata_io_error = true; + 0 + } + }; + + match (depth, previous_depth) { + (n, p) if n > p => { + sizes_per_depth_level.push(current_size_at_depth); + current_size_at_depth = file_size; + parent_node_idx = previous_node_idx; + } + (n, p) if n < p => { + for _ in n..p { + set_size_or_panic( + &mut t.tree, + parent_node_idx, + current_size_at_depth, + ); + current_size_at_depth += + pop_or_panic(&mut sizes_per_depth_level); + parent_node_idx = parent_or_panic(&mut t.tree, parent_node_idx); + } + current_size_at_depth += file_size; + set_size_or_panic( + &mut t.tree, + parent_node_idx, + current_size_at_depth, + ); + } + _ => { + current_size_at_depth += file_size; + } + }; + + data.size = file_size; + let entry_index = t.tree.add_node(data); + + t.tree.add_edge(parent_node_idx, entry_index, ()); + previous_node_idx = entry_index; + previous_depth = depth; + } + Err(_) => { + if previous_depth == 0 { + data.name = path.clone(); + let entry_index = t.tree.add_node(data); + t.tree.add_edge(parent_node_idx, entry_index, ()); + } + + t.io_errors += 1 + } + } + + if throttle.can_update() && update(&mut t)? { + return Ok(None); + } + } + } + + sizes_per_depth_level.push(current_size_at_depth); + current_size_at_depth = 0; + for _ in 0..previous_depth { + current_size_at_depth += pop_or_panic(&mut sizes_per_depth_level); + set_size_or_panic(&mut t.tree, parent_node_idx, current_size_at_depth); + parent_node_idx = parent_or_panic(&mut t.tree, parent_node_idx); + } + let root_size = t.recompute_root_size(); + set_size_or_panic(&mut t.tree, t.root_index, root_size); + t.total_bytes = Some(root_size); + + Ok(Some(t)) + } +} + +struct OwnEntry { + file_name: OsString, + metadata: std::io::Result<moonwalk::Metadata>, +} + +impl OwnEntry { + fn new(dent: std::io::Result<&mut DirEntry<'_>>) -> std::io::Result<Self> { + dent.map(|dent| OwnEntry { + file_name: dent.file_name().to_owned(), + metadata: dent.metadata().map(ToOwned::to_owned), + }) + } +} + +/// The result of the previous filesystem traversal +#[derive(Default, Debug)] +pub struct ThreadSafeTraversal { + /// A tree representing the entire filestem traversal + pub tree: Arc<Mutex<Tree>>, + /// The top-level node of the tree. + pub root_index: TreeIndex, + /// Amount of files or directories we have seen during the filesystem traversal + pub entries_traversed: AtomicU64, + /// Total amount of IO errors encountered when traversing the filesystem + pub io_errors: AtomicU64, + /// Total amount of bytes seen during the traversal + pub total_bytes: AtomicU64, +} + +impl From<ThreadSafeTraversal> for Traversal { + fn from(t: ThreadSafeTraversal) -> Self { + Traversal { + tree: match Arc::try_unwrap(t.tree) { + Ok(v) => v.into_inner(), + Err(_) => unreachable!("BUG: no thread is using this anymore, single ownership"), + }, + root_index: t.root_index, + entries_traversed: t.entries_traversed.load(Ordering::Relaxed), + io_errors: t.io_errors.load(Ordering::Relaxed), + total_bytes: Some(t.total_bytes.load(Ordering::Relaxed)), + } + } +} + +impl ThreadSafeTraversal { + pub fn from_walk( + mut walk_options: WalkOptions, + input: Vec<PathBuf>, + update: impl FnMut(&ThreadSafeTraversal) -> anyhow::Result<bool> + Send + Clone, + ) -> anyhow::Result<Option<Traversal>> { + fn set_size_or_panic(tree: &mut Tree, node_idx: TreeIndex, current_size_at_depth: u64) { + tree.node_weight_mut(node_idx) + .expect("node for parent index we just retrieved") + .size = current_size_at_depth; + } + fn parent_or_panic(tree: &mut Tree, parent_node_idx: TreeIndex) -> TreeIndex { + tree.neighbors_directed(parent_node_idx, Direction::Incoming) + .next() + .expect("every node in the iteration has a parent") + } + fn pop_or_panic(v: &mut Vec<u64>) -> u64 { + v.pop().expect("sizes per level to be in sync with graph") + } + + let t = { + let mut tree = Tree::new(); + let root_index = tree.add_node(EntryData::default()); + ThreadSafeTraversal { + tree: Arc::new(tree.into()), + root_index, + ..Default::default() + } + }; + + let (mut previous_node_idx, mut parent_node_idx) = (t.root_index, t.root_index); + let sizes_per_depth_level = Arc::new(Mutex::new(Vec::new())); + let mut current_size_at_depth: u64 = 0; + let mut previous_depth = 0; + let inodes = Arc::new(InodeFilter::default()); + + let throttle = Throttle::new(Duration::from_millis(250), None); + if walk_options.threads == 0 { + // avoid using the global rayon pool, as it will keep a lot of threads alive after we are done. + // Also means that we will spin up a bunch of threads per root path, instead of reusing them. + walk_options.threads = num_cpus::get(); + } + + #[cfg(not(windows))] + fn size_on_disk(_parent: &Path, name: &Path, meta: &Metadata) -> io::Result<u64> { + name.size_on_disk_fast(meta) + } + #[cfg(windows)] + fn size_on_disk(parent: &Path, name: &Path, meta: &Metadata) -> io::Result<u64> { + parent.join(name).size_on_disk_fast(meta) + } + + for path in input.into_iter() { + let (device_id, _meta) = match crossdev::init(path.as_ref()) { + Ok(id) => id, + Err(_) => { + t.io_errors.fetch_add(1, Ordering::SeqCst); + continue; + } + }; + walk_options.moonwalk_from_path( + path.as_ref(), + device_id, + { + let t = &t; + let inodes = inodes.clone(); + let throttle = &throttle; + let count_hard_links = walk_options.count_hard_links; + let apparent_size = walk_options.apparent_size; + let path = path.clone(); + let mut update = update.clone(); + let sizes_per_depth_level = Arc::clone(&sizes_per_depth_level); + move |entry, depth| { + t.entries_traversed.fetch_add(1, Ordering::SeqCst); + let mut data = EntryData::default(); + let depth = depth.expect("BUG: depth configured but not present"); + match entry { + Ok(entry) => { + data.name = if depth < 1 { + path.clone() + } else { + entry.file_name().into() + }; + let file_size = match &entry.metadata() { + Ok(ref m) + if (count_hard_links || inodes.is_first_moonwalk(m)) => + { + if apparent_size { + m.len() + } else { + file_size_on_disk(m) + } + } + Ok(_) => 0, + Err(_) => { + t.io_errors.fetch_add(1, Ordering::SeqCst); + data.metadata_io_error = true; + 0 + } + }; + + match (depth, previous_depth) { + (n, p) if n > p => { + sizes_per_depth_level.lock().push(current_size_at_depth); + current_size_at_depth = file_size; + parent_node_idx = previous_node_idx; + } + (n, p) if n < p => { + for _ in n..p { + set_size_or_panic( + &mut t.tree.lock(), + parent_node_idx, + current_size_at_depth, + ); + current_size_at_depth += + pop_or_panic(&mut sizes_per_depth_level.lock()); + parent_node_idx = parent_or_panic( + &mut t.tree.lock(), + parent_node_idx, + ); + } + current_size_at_depth += file_size; + set_size_or_panic( + &mut t.tree.lock(), + parent_node_idx, + current_size_at_depth, + ); + } + _ => { + current_size_at_depth += file_size; + } + }; + + data.size = file_size; + let tree = &mut t.tree.lock(); + let entry_index = tree.add_node(data); + + tree.add_edge(parent_node_idx, entry_index, ()); + previous_node_idx = entry_index; + previous_depth = depth; + } + Err(_err) => { + if previous_depth == 0 { + data.name = path.clone(); + let tree = &mut t.tree.lock(); + let entry_index = tree.add_node(data); + tree.add_edge(parent_node_idx, entry_index, ()); + } + + t.io_errors.fetch_add(1, Ordering::SeqCst); + } + } + + if throttle.can_update() && update(t).unwrap_or(false) { + FlowControl::Abort + } else { + FlowControl::Continue + } + } + }, + true, + )?; + } + + { + let mut sizes_per_depth_level = match Arc::try_unwrap(sizes_per_depth_level) { + Ok(s) => s.into_inner(), + Err(_) => unreachable!("all threads have terminated by now"), + }; + sizes_per_depth_level.push(current_size_at_depth); + current_size_at_depth = 0; + let tree = &mut t.tree.lock(); + for _ in 0..previous_depth { + current_size_at_depth += pop_or_panic(&mut sizes_per_depth_level); + set_size_or_panic(tree, parent_node_idx, current_size_at_depth); + parent_node_idx = parent_or_panic(tree, parent_node_idx); + } + } + let root_size = t.recompute_root_size(); + set_size_or_panic(&mut t.tree.lock(), t.root_index, root_size); + t.total_bytes.store(root_size, Ordering::Relaxed); + + Ok(Some(t.into())) + } + + fn recompute_root_size(&self) -> u64 { + let tree = self.tree.lock(); + tree.neighbors_directed(self.root_index, Direction::Outgoing) + .map(|idx| get_size_or_panic(&tree, idx)) + .sum() + } } |