summary | refs | log | tree | commit | diff | stats
path: root/src/traverse.rs
diff options
context:
space:
mode:
author Piotr Wach <pwach@bloomberg.net> 2024-01-07 19:36:43 +0000
committer Piotr Wach <pwach@bloomberg.net> 2024-01-07 19:36:43 +0000
commit 51b67ff9d009a56272448d1fee1951f30b1de678 (patch)
tree bbfd954803961d338426f105d468c75a2c5b2b9a /src/traverse.rs
parent 13c381bebc6a64e553ec11793ec8880f868e712c (diff)
wip
Diffstat (limited to 'src/traverse.rs')
-rw-r--r-- src/traverse.rs 521
1 files changed, 251 insertions, 270 deletions
diff --git a/src/traverse.rs b/src/traverse.rs
index 69c1639..8838438 100644
--- a/src/traverse.rs
+++ b/src/traverse.rs
@@ -71,283 +71,254 @@ pub struct Traversal {
}
impl Traversal {
- pub fn from_walk<T>(
- mut walk_options: WalkOptions,
- input: Vec<PathBuf>,
- waker_rx: &crossbeam::channel::Receiver<T>,
- mut update: impl FnMut(&mut Traversal, Option<T>) -> Result<bool>,
- ) -> Result<Option<Traversal>> {
- #[derive(Default, Copy, Clone)]
- struct EntryInfo {
- size: u128,
- entries_count: Option<u64>,
- }
- impl EntryInfo {
- fn add_count(&mut self, other: &Self) {
- self.entries_count = match (self.entries_count, other.entries_count) {
- (Some(a), Some(b)) => Some(a + b),
- (None, Some(b)) => Some(b),
- (Some(a), None) => Some(a),
- (None, None) => None,
- };
- }
- }
- fn set_entry_info_or_panic(
- tree: &mut Tree,
- node_idx: TreeIndex,
- EntryInfo {
- size,
- entries_count,
- }: EntryInfo,
- ) {
- let node = tree
- .node_weight_mut(node_idx)
- .expect("node for parent index we just retrieved");
- node.size = size;
- node.entry_count = entries_count;
- }
- fn parent_or_panic(tree: &mut Tree, parent_node_idx: TreeIndex) -> TreeIndex {
- tree.neighbors_directed(parent_node_idx, Direction::Incoming)
- .next()
- .expect("every node in the iteration has a parent")
- }
- fn pop_or_panic(v: &mut Vec<EntryInfo>) -> EntryInfo {
- v.pop().expect("sizes per level to be in sync with graph")
- }
+ // pub fn from_walk<T>(
+ // mut walk_options: WalkOptions,
+ // input: Vec<PathBuf>,
+ // mut update: impl FnMut(&mut Traversal, Option<T>) -> Result<bool>,
+ // ) -> Result<Option<Traversal>> {
+
- let mut t = {
- let mut tree = Tree::new();
- let root_index = tree.add_node(EntryData::default());
- Traversal {
- tree,
- root_index,
- entries_traversed: 0,
- start: std::time::Instant::now(),
- elapsed: None,
- io_errors: 0,
- total_bytes: None,
- }
- };
-
- let (mut previous_node_idx, mut parent_node_idx) = (t.root_index, t.root_index);
- let mut directory_info_per_depth_level = Vec::new();
- let mut current_directory_at_depth = EntryInfo::default();
- let mut previous_depth = 0;
- let mut inodes = InodeFilter::default();
-
- let throttle = Throttle::new(Duration::from_millis(250), None);
- if walk_options.threads == 0 {
- // avoid using the global rayon pool, as it will keep a lot of threads alive after we are done.
- // Also means that we will spin up a bunch of threads per root path, instead of reusing them.
- walk_options.threads = num_cpus::get();
- }
+ // let mut t = {
+ // let mut tree = Tree::new();
+ // let root_index = tree.add_node(EntryData::default());
+ // Traversal {
+ // tree,
+ // root_index,
+ // entries_traversed: 0,
+ // start: std::time::Instant::now(),
+ // elapsed: None,
+ // io_errors: 0,
+ // total_bytes: None,
+ // }
+ // };
- #[cfg(not(windows))]
- fn size_on_disk(_parent: &Path, name: &Path, meta: &Metadata) -> io::Result<u64> {
- name.size_on_disk_fast(meta)
- }
- #[cfg(windows)]
- fn size_on_disk(parent: &Path, name: &Path, meta: &Metadata) -> io::Result<u64> {
- parent.join(name).size_on_disk_fast(meta)
- }
+ // let (mut previous_node_idx, mut parent_node_idx) = (t.root_index, t.root_index);
+ // let mut directory_info_per_depth_level = Vec::new();
+ // let mut current_directory_at_depth = EntryInfo::default();
+ // let mut previous_depth = 0;
+ // let mut inodes = InodeFilter::default();
- let (entry_tx, entry_rx) = crossbeam::channel::bounded(100);
- std::thread::Builder::new()
- .name("dua-fs-walk-dispatcher".to_string())
- .spawn({
- let walk_options = walk_options.clone();
- move || {
- for root_path in input.into_iter() {
- let device_id = match crossdev::init(root_path.as_ref()) {
- Ok(id) => id,
- Err(_) => {
- t.io_errors += 1;
- continue;
- }
- };
-
- let root_path = Arc::new(root_path);
- for entry in walk_options
- .iter_from_path(root_path.as_ref(), device_id)
- .into_iter()
- {
- if entry_tx
- .send((entry, Arc::clone(&root_path), device_id))
- .is_err()
- {
- // The channel is closed, this means the user has
- // requested to quit the app. Abort the walking.
- return;
- }
- }
- }
- }
- })?;
-
- loop {
- crossbeam::select! {
- recv(entry_rx) -> entry => {
- let Ok((entry, root_path, device_id)) = entry else {
- break;
- };
-
- t.entries_traversed += 1;
- let mut data = EntryData::default();
- match entry {
- Ok(entry) => {
- data.name = if entry.depth < 1 {
- (*root_path).clone()
- } else {
- entry.file_name.into()
- };
-
- let mut file_size = 0u128;
- let mut mtime: SystemTime = UNIX_EPOCH;
- match &entry.client_state {
- Some(Ok(ref m)) => {
- if !m.is_dir()
- && (walk_options.count_hard_links || inodes.add(m))
- && (walk_options.cross_filesystems
- || crossdev::is_same_device(device_id, m))
- {
- if walk_options.apparent_size {
- file_size = m.len() as u128;
- } else {
- file_size = size_on_disk(&entry.parent_path, &data.name, m)
- .unwrap_or_else(|_| {
- t.io_errors += 1;
- data.metadata_io_error = true;
- 0
- })
- as u128;
- }
- } else {
- data.entry_count = Some(0);
- data.is_dir = true;
- }
-
- match m.modified() {
- Ok(modified) => {
- mtime = modified;
- }
- Err(_) => {
- t.io_errors += 1;
- data.metadata_io_error = true;
- }
- }
- }
- Some(Err(_)) => {
- t.io_errors += 1;
- data.metadata_io_error = true;
- }
- None => {}
- }
-
- match (entry.depth, previous_depth) {
- (n, p) if n > p => {
- directory_info_per_depth_level.push(current_directory_at_depth);
- current_directory_at_depth = EntryInfo {
- size: file_size,
- entries_count: Some(1),
- };
- parent_node_idx = previous_node_idx;
- }
- (n, p) if n < p => {
- for _ in n..p {
- set_entry_info_or_panic(
- &mut t.tree,
- parent_node_idx,
- current_directory_at_depth,
- );
- let dir_info =
- pop_or_panic(&mut directory_info_per_depth_level);
-
- current_directory_at_depth.size += dir_info.size;
- current_directory_at_depth.add_count(&dir_info);
-
- parent_node_idx = parent_or_panic(&mut t.tree, parent_node_idx);
- }
- current_directory_at_depth.size += file_size;
- *current_directory_at_depth.entries_count.get_or_insert(0) += 1;
- set_entry_info_or_panic(
- &mut t.tree,
- parent_node_idx,
- current_directory_at_depth,
- );
- }
- _ => {
- current_directory_at_depth.size += file_size;
- *current_directory_at_depth.entries_count.get_or_insert(0) += 1;
- }
- };
-
- data.mtime = mtime;
- data.size = file_size;
- let entry_index = t.tree.add_node(data);
-
- t.tree.add_edge(parent_node_idx, entry_index, ());
- previous_node_idx = entry_index;
- previous_depth = entry.depth;
- }
- Err(_) => {
- if previous_depth == 0 {
- data.name = (*root_path).clone();
- let entry_index = t.tree.add_node(data);
- t.tree.add_edge(parent_node_idx, entry_index, ());
- }
-
- t.io_errors += 1
- }
- }
-
- if throttle.can_update() && update(&mut t, None)? {
- return Ok(None);
- }
- },
- recv(waker_rx) -> waker_value => {
- let Ok(waker_value) = waker_value else {
- continue;
- };
- if update(&mut t, Some(waker_value))? {
- return Ok(None);
- }
- },
- default(Duration::from_millis(250)) => {
- // No events or new entries received, but we still need
- // to keep updating the status message regularly.
- if update(&mut t, None)? {
- return Ok(None);
- }
- }
- }
- }
+ // let throttle = Throttle::new(Duration::from_millis(250), None);
+ // // if walk_options.threads == 0 {
+ // // // avoid using the global rayon pool, as it will keep a lot of threads alive after we are done.
+ // // // Also means that we will spin up a bunch of threads per root path, instead of reusing them.
+ // // walk_options.threads = num_cpus::get();
+ // // }
- directory_info_per_depth_level.push(current_directory_at_depth);
- current_directory_at_depth = EntryInfo::default();
- for _ in 0..previous_depth {
- let dir_info = pop_or_panic(&mut directory_info_per_depth_level);
- current_directory_at_depth.size += dir_info.size;
- current_directory_at_depth.add_count(&dir_info);
+ // // #[cfg(not(windows))]
+ // // fn size_on_disk(_parent: &Path, name: &Path, meta: &Metadata) -> io::Result<u64> {
+ // // name.size_on_disk_fast(meta)
+ // // }
+ // // #[cfg(windows)]
+ // // fn size_on_disk(parent: &Path, name: &Path, meta: &Metadata) -> io::Result<u64> {
+ // // parent.join(name).size_on_disk_fast(meta)
+ // // }
- set_entry_info_or_panic(&mut t.tree, parent_node_idx, current_directory_at_depth);
- parent_node_idx = parent_or_panic(&mut t.tree, parent_node_idx);
- }
- let root_size = t.recompute_root_size();
- set_entry_info_or_panic(
- &mut t.tree,
- t.root_index,
- EntryInfo {
- size: root_size,
- entries_count: (t.entries_traversed > 0).then_some(t.entries_traversed),
- },
- );
- t.total_bytes = Some(root_size);
+ // // enum TraversalEvents {
+ // // Entry(Result<jwalk::DirEntry<((), Option<Result<std::fs::Metadata, jwalk::Error>>)>, jwalk::Error>, Arc<PathBuf>, u64),
+ // // Finished,
+ // // }
- t.elapsed = Some(t.start.elapsed());
- Ok(Some(t))
- }
+ // let (entry_tx, entry_rx) = crossbeam::channel::bounded(100);
+ // std::thread::Builder::new()
+ // .name("dua-fs-walk-dispatcher".to_string())
+ // .spawn({
+ // let walk_options = walk_options.clone();
+ // move || {
+ // for root_path in input.into_iter() {
+ // let device_id = match crossdev::init(root_path.as_ref()) {
+ // Ok(id) => id,
+ // Err(_) => {
+ // t.io_errors += 1;
+ // continue;
+ // }
+ // };
+
+ // let root_path = Arc::new(root_path);
+ // for entry in walk_options
+ // .iter_from_path(root_path.as_ref(), device_id)
+ // .into_iter()
+ // {
+ // if entry_tx
+ // .send(TraversalEvents::Entry(entry, Arc::clone(&root_path), device_id))
+ // .is_err()
+ // {
+ // // The channel is closed, this means the user has
+ // // requested to quit the app. Abort the walking.
+ // return;
+ // }
+ // }
+ // }
+ // if entry_tx.send(TraversalEvents::Finished).is_err() {
+ // log::error!("Failed to send TraversalEvents::Finished event");
+ // }
+ // }
+ // })?;
+
+ // // loop {
+ // // crossbeam::select! {
+ // // recv(entry_rx) -> entry => {
+ // // let Ok((entry, root_path, device_id)) = entry else {
+ // // break;
+ // // };
+
+ // // t.entries_traversed += 1;
+ // // let mut data = EntryData::default();
+ // // match entry {
+ // // Ok(entry) => {
+ // // data.name = if entry.depth < 1 {
+ // // (*root_path).clone()
+ // // } else {
+ // // entry.file_name.into()
+ // // };
+
+ // // let mut file_size = 0u128;
+ // // let mut mtime: SystemTime = UNIX_EPOCH;
+ // // match &entry.client_state {
+ // // Some(Ok(ref m)) => {
+ // // if !m.is_dir()
+ // // && (walk_options.count_hard_links || inodes.add(m))
+ // // && (walk_options.cross_filesystems
+ // // || crossdev::is_same_device(device_id, m))
+ // // {
+ // // if walk_options.apparent_size {
+ // // file_size = m.len() as u128;
+ // // } else {
+ // // file_size = size_on_disk(&entry.parent_path, &data.name, m)
+ // // .unwrap_or_else(|_| {
+ // // t.io_errors += 1;
+ // // data.metadata_io_error = true;
+ // // 0
+ // // })
+ // // as u128;
+ // // }
+ // // } else {
+ // // data.entry_count = Some(0);
+ // // data.is_dir = true;
+ // // }
- fn recompute_root_size(&self) -> u128 {
+ // // match m.modified() {
+ // // Ok(modified) => {
+ // // mtime = modified;
+ // // }
+ // // Err(_) => {
+ // // t.io_errors += 1;
+ // // data.metadata_io_error = true;
+ // // }
+ // // }
+ // // }
+ // // Some(Err(_)) => {
+ // // t.io_errors += 1;
+ // // data.metadata_io_error = true;
+ // // }
+ // // None => {}
+ // // }
+
+ // // match (entry.depth, previous_depth) {
+ // // (n, p) if n > p => {
+ // // directory_info_per_depth_level.push(current_directory_at_depth);
+ // // current_directory_at_depth = EntryInfo {
+ // // size: file_size,
+ // // entries_count: Some(1),
+ // // };
+ // // parent_node_idx = previous_node_idx;
+ // // }
+ // // (n, p) if n < p => {
+ // // for _ in n..p {
+ // // set_entry_info_or_panic(
+ // // &mut t.tree,
+ // // parent_node_idx,
+ // // current_directory_at_depth,
+ // // );
+ // // let dir_info =
+ // // pop_or_panic(&mut directory_info_per_depth_level);
+
+ // // current_directory_at_depth.size += dir_info.size;
+ // // current_directory_at_depth.add_count(&dir_info);
+
+ // // parent_node_idx = parent_or_panic(&mut t.tree, parent_node_idx);
+ // // }
+ // // current_directory_at_depth.size += file_size;
+ // // *current_directory_at_depth.entries_count.get_or_insert(0) += 1;
+ // // set_entry_info_or_panic(
+ // // &mut t.tree,
+ // // parent_node_idx,
+ // // current_directory_at_depth,
+ // // );
+ // // }
+ // // _ => {
+ // // current_directory_at_depth.size += file_size;
+ // // *current_directory_at_depth.entries_count.get_or_insert(0) += 1;
+ // // }
+ // // };
+
+ // // data.mtime = mtime;
+ // // data.size = file_size;
+ // // let entry_index = t.tree.add_node(data);
+
+ // // t.tree.add_edge(parent_node_idx, entry_index, ());
+ // // previous_node_idx = entry_index;
+ // // previous_depth = entry.depth;
+ // // }
+ // // Err(_) => {
+ // // if previous_depth == 0 {
+ // // data.name = (*root_path).clone();
+ // // let entry_index = t.tree.add_node(data);
+ // // t.tree.add_edge(parent_node_idx, entry_index, ());
+ // // }
+
+ // // t.io_errors += 1
+ // // }
+ // // }
+
+ // // if throttle.can_update() && update(&mut t, None)? {
+ // // return Ok(None);
+ // // }
+ // // },
+ // // recv(waker_rx) -> waker_value => {
+ // // let Ok(waker_value) = waker_value else {
+ // // continue;
+ // // };
+ // // if update(&mut t, Some(waker_value))? {
+ // // return Ok(None);
+ // // }
+ // // },
+ // // default(Duration::from_millis(250)) => {
+ // // // No events or new entries received, but we still need
+ // // // to keep updating the status message regularly.
+ // // if update(&mut t, None)? {
+ // // return Ok(None);
+ // // }
+ // // }
+ // // }
+ // // }
+
+ // directory_info_per_depth_level.push(current_directory_at_depth);
+ // current_directory_at_depth = EntryInfo::default();
+ // for _ in 0..previous_depth {
+ // let dir_info = pop_or_panic(&mut directory_info_per_depth_level);
+ // current_directory_at_depth.size += dir_info.size;
+ // current_directory_at_depth.add_count(&dir_info);
+
+ // set_entry_info_or_panic(&mut t.tree, parent_node_idx, current_directory_at_depth);
+ // parent_node_idx = parent_or_panic(&mut t.tree, parent_node_idx);
+ // }
+ // let root_size = t.recompute_root_size();
+ // set_entry_info_or_panic(
+ // &mut t.tree,
+ // t.root_index,
+ // EntryInfo {
+ // size: root_size,
+ // entries_count: (t.entries_traversed > 0).then_some(t.entries_traversed),
+ // },
+ // );
+ // t.total_bytes = Some(root_size);
+
+ // t.elapsed = Some(t.start.elapsed());
+ // Ok(Some(t))
+ // }
+
+ pub fn recompute_root_size(&self) -> u128 {
self.tree
.neighbors_directed(self.root_index, Direction::Outgoing)
.map(|idx| get_size_or_panic(&self.tree, idx))
@@ -355,6 +326,16 @@ impl Traversal {
}
}
+#[cfg(not(windows))] // variant compiled on every target except Windows
+pub fn size_on_disk(_parent: &Path, name: &Path, meta: &Metadata) -> io::Result<u64> { // returns whatever `size_on_disk_fast` reports for `name`
+ name.size_on_disk_fast(meta) // `_parent` is intentionally unused here: only `name` and `meta` are consulted
+}
+
+#[cfg(windows)] // variant compiled only on Windows targets
+pub fn size_on_disk(parent: &Path, name: &Path, meta: &Metadata) -> io::Result<u64> { // same signature as the non-Windows variant
+ parent.join(name).size_on_disk_fast(meta) // unlike the non-Windows variant, the query goes through the joined `parent`/`name` path
+}
+
#[cfg(test)]
mod tests {
use super::*;