diff options
author | Piotr Wach <pwach@bloomberg.net> | 2024-01-07 19:36:43 +0000 |
---|---|---|
committer | Piotr Wach <pwach@bloomberg.net> | 2024-01-07 19:36:43 +0000 |
commit | 51b67ff9d009a56272448d1fee1951f30b1de678 (patch) | |
tree | bbfd954803961d338426f105d468c75a2c5b2b9a /src/traverse.rs | |
parent | 13c381bebc6a64e553ec11793ec8880f868e712c (diff) |
wip
Diffstat (limited to 'src/traverse.rs')
-rw-r--r-- | src/traverse.rs | 521 |
1 files changed, 251 insertions, 270 deletions
diff --git a/src/traverse.rs b/src/traverse.rs index 69c1639..8838438 100644 --- a/src/traverse.rs +++ b/src/traverse.rs @@ -71,283 +71,254 @@ pub struct Traversal { } impl Traversal { - pub fn from_walk<T>( - mut walk_options: WalkOptions, - input: Vec<PathBuf>, - waker_rx: &crossbeam::channel::Receiver<T>, - mut update: impl FnMut(&mut Traversal, Option<T>) -> Result<bool>, - ) -> Result<Option<Traversal>> { - #[derive(Default, Copy, Clone)] - struct EntryInfo { - size: u128, - entries_count: Option<u64>, - } - impl EntryInfo { - fn add_count(&mut self, other: &Self) { - self.entries_count = match (self.entries_count, other.entries_count) { - (Some(a), Some(b)) => Some(a + b), - (None, Some(b)) => Some(b), - (Some(a), None) => Some(a), - (None, None) => None, - }; - } - } - fn set_entry_info_or_panic( - tree: &mut Tree, - node_idx: TreeIndex, - EntryInfo { - size, - entries_count, - }: EntryInfo, - ) { - let node = tree - .node_weight_mut(node_idx) - .expect("node for parent index we just retrieved"); - node.size = size; - node.entry_count = entries_count; - } - fn parent_or_panic(tree: &mut Tree, parent_node_idx: TreeIndex) -> TreeIndex { - tree.neighbors_directed(parent_node_idx, Direction::Incoming) - .next() - .expect("every node in the iteration has a parent") - } - fn pop_or_panic(v: &mut Vec<EntryInfo>) -> EntryInfo { - v.pop().expect("sizes per level to be in sync with graph") - } + // pub fn from_walk<T>( + // mut walk_options: WalkOptions, + // input: Vec<PathBuf>, + // mut update: impl FnMut(&mut Traversal, Option<T>) -> Result<bool>, + // ) -> Result<Option<Traversal>> { + - let mut t = { - let mut tree = Tree::new(); - let root_index = tree.add_node(EntryData::default()); - Traversal { - tree, - root_index, - entries_traversed: 0, - start: std::time::Instant::now(), - elapsed: None, - io_errors: 0, - total_bytes: None, - } - }; - - let (mut previous_node_idx, mut parent_node_idx) = (t.root_index, t.root_index); - let mut directory_info_per_depth_level = Vec::new(); - let mut current_directory_at_depth = EntryInfo::default(); - let mut previous_depth = 0; - let mut inodes = InodeFilter::default(); - - let throttle = Throttle::new(Duration::from_millis(250), None); - if walk_options.threads == 0 { - // avoid using the global rayon pool, as it will keep a lot of threads alive after we are done. - // Also means that we will spin up a bunch of threads per root path, instead of reusing them. - walk_options.threads = num_cpus::get(); - } + // let mut t = { + // let mut tree = Tree::new(); + // let root_index = tree.add_node(EntryData::default()); + // Traversal { + // tree, + // root_index, + // entries_traversed: 0, + // start: std::time::Instant::now(), + // elapsed: None, + // io_errors: 0, + // total_bytes: None, + // } + // }; - #[cfg(not(windows))] - fn size_on_disk(_parent: &Path, name: &Path, meta: &Metadata) -> io::Result<u64> { - name.size_on_disk_fast(meta) - } - #[cfg(windows)] - fn size_on_disk(parent: &Path, name: &Path, meta: &Metadata) -> io::Result<u64> { - parent.join(name).size_on_disk_fast(meta) - } + // let (mut previous_node_idx, mut parent_node_idx) = (t.root_index, t.root_index); + // let mut directory_info_per_depth_level = Vec::new(); + // let mut current_directory_at_depth = EntryInfo::default(); + // let mut previous_depth = 0; + // let mut inodes = InodeFilter::default(); - let (entry_tx, entry_rx) = crossbeam::channel::bounded(100); - std::thread::Builder::new() - .name("dua-fs-walk-dispatcher".to_string()) - .spawn({ - let walk_options = walk_options.clone(); - move || { - for root_path in input.into_iter() { - let device_id = match crossdev::init(root_path.as_ref()) { - Ok(id) => id, - Err(_) => { - t.io_errors += 1; - continue; - } - }; - - let root_path = Arc::new(root_path); - for entry in walk_options - .iter_from_path(root_path.as_ref(), device_id) - .into_iter() - { - if entry_tx - .send((entry, Arc::clone(&root_path), device_id)) - .is_err() - { - // The channel is closed, this means the user has - // requested to quit the app. Abort the walking. - return; - } - } - } - } - })?; - - loop { - crossbeam::select! { - recv(entry_rx) -> entry => { - let Ok((entry, root_path, device_id)) = entry else { - break; - }; - - t.entries_traversed += 1; - let mut data = EntryData::default(); - match entry { - Ok(entry) => { - data.name = if entry.depth < 1 { - (*root_path).clone() - } else { - entry.file_name.into() - }; - - let mut file_size = 0u128; - let mut mtime: SystemTime = UNIX_EPOCH; - match &entry.client_state { - Some(Ok(ref m)) => { - if !m.is_dir() - && (walk_options.count_hard_links || inodes.add(m)) - && (walk_options.cross_filesystems - || crossdev::is_same_device(device_id, m)) - { - if walk_options.apparent_size { - file_size = m.len() as u128; - } else { - file_size = size_on_disk(&entry.parent_path, &data.name, m) - .unwrap_or_else(|_| { - t.io_errors += 1; - data.metadata_io_error = true; - 0 - }) - as u128; - } - } else { - data.entry_count = Some(0); - data.is_dir = true; - } - - match m.modified() { - Ok(modified) => { - mtime = modified; - } - Err(_) => { - t.io_errors += 1; - data.metadata_io_error = true; - } - } - } - Some(Err(_)) => { - t.io_errors += 1; - data.metadata_io_error = true; - } - None => {} - } - - match (entry.depth, previous_depth) { - (n, p) if n > p => { - directory_info_per_depth_level.push(current_directory_at_depth); - current_directory_at_depth = EntryInfo { - size: file_size, - entries_count: Some(1), - }; - parent_node_idx = previous_node_idx; - } - (n, p) if n < p => { - for _ in n..p { - set_entry_info_or_panic( - &mut t.tree, - parent_node_idx, - current_directory_at_depth, - ); - let dir_info = - pop_or_panic(&mut directory_info_per_depth_level); - - current_directory_at_depth.size += dir_info.size; - current_directory_at_depth.add_count(&dir_info); - - parent_node_idx = parent_or_panic(&mut t.tree, parent_node_idx); - } - current_directory_at_depth.size += file_size; - *current_directory_at_depth.entries_count.get_or_insert(0) += 1; - set_entry_info_or_panic( - &mut t.tree, - parent_node_idx, - current_directory_at_depth, - ); - } - _ => { - current_directory_at_depth.size += file_size; - *current_directory_at_depth.entries_count.get_or_insert(0) += 1; - } - }; - - data.mtime = mtime; - data.size = file_size; - let entry_index = t.tree.add_node(data); - - t.tree.add_edge(parent_node_idx, entry_index, ()); - previous_node_idx = entry_index; - previous_depth = entry.depth; - } - Err(_) => { - if previous_depth == 0 { - data.name = (*root_path).clone(); - let entry_index = t.tree.add_node(data); - t.tree.add_edge(parent_node_idx, entry_index, ()); - } - - t.io_errors += 1 - } - } - - if throttle.can_update() && update(&mut t, None)? { - return Ok(None); - } - }, - recv(waker_rx) -> waker_value => { - let Ok(waker_value) = waker_value else { - continue; - }; - if update(&mut t, Some(waker_value))? { - return Ok(None); - } - }, - default(Duration::from_millis(250)) => { - // No events or new entries received, but we still need - // to keep updating the status message regularly. - if update(&mut t, None)? { - return Ok(None); - } - } - } - } + // let throttle = Throttle::new(Duration::from_millis(250), None); + // // if walk_options.threads == 0 { + // // // avoid using the global rayon pool, as it will keep a lot of threads alive after we are done. + // // // Also means that we will spin up a bunch of threads per root path, instead of reusing them. + // // walk_options.threads = num_cpus::get(); + // // } - directory_info_per_depth_level.push(current_directory_at_depth); - current_directory_at_depth = EntryInfo::default(); - for _ in 0..previous_depth { - let dir_info = pop_or_panic(&mut directory_info_per_depth_level); - current_directory_at_depth.size += dir_info.size; - current_directory_at_depth.add_count(&dir_info); + // // #[cfg(not(windows))] + // // fn size_on_disk(_parent: &Path, name: &Path, meta: &Metadata) -> io::Result<u64> { + // // name.size_on_disk_fast(meta) + // // } + // // #[cfg(windows)] + // // fn size_on_disk(parent: &Path, name: &Path, meta: &Metadata) -> io::Result<u64> { + // // parent.join(name).size_on_disk_fast(meta) + // // } - set_entry_info_or_panic(&mut t.tree, parent_node_idx, current_directory_at_depth); - parent_node_idx = parent_or_panic(&mut t.tree, parent_node_idx); - } - let root_size = t.recompute_root_size(); - set_entry_info_or_panic( - &mut t.tree, - t.root_index, - EntryInfo { - size: root_size, - entries_count: (t.entries_traversed > 0).then_some(t.entries_traversed), - }, - ); - t.total_bytes = Some(root_size); + // // enum TraversalEvents { + // // Entry(Result<jwalk::DirEntry<((), Option<Result<std::fs::Metadata, jwalk::Error>>)>, jwalk::Error>, Arc<PathBuf>, u64), + // // Finished, + // // } - t.elapsed = Some(t.start.elapsed()); - Ok(Some(t)) - } + // let (entry_tx, entry_rx) = crossbeam::channel::bounded(100); + // std::thread::Builder::new() + // .name("dua-fs-walk-dispatcher".to_string()) + // .spawn({ + // let walk_options = walk_options.clone(); + // move || { + // for root_path in input.into_iter() { + // let device_id = match crossdev::init(root_path.as_ref()) { + // Ok(id) => id, + // Err(_) => { + // t.io_errors += 1; + // continue; + // } + // }; + + // let root_path = Arc::new(root_path); + // for entry in walk_options + // .iter_from_path(root_path.as_ref(), device_id) + // .into_iter() + // { + // if entry_tx + // .send(TraversalEvents::Entry(entry, Arc::clone(&root_path), device_id)) + // .is_err() + // { + // // The channel is closed, this means the user has + // // requested to quit the app. Abort the walking. + // return; + // } + // } + // } + // if entry_tx.send(TraversalEvents::Finished).is_err() { + // log::error!("Failed to send TraversalEvents::Finished event"); + // } + // } + // })?; + + // // loop { + // // crossbeam::select! { + // // recv(entry_rx) -> entry => { + // // let Ok((entry, root_path, device_id)) = entry else { + // // break; + // // }; + + // // t.entries_traversed += 1; + // // let mut data = EntryData::default(); + // // match entry { + // // Ok(entry) => { + // // data.name = if entry.depth < 1 { + // // (*root_path).clone() + // // } else { + // // entry.file_name.into() + // // }; + + // // let mut file_size = 0u128; + // // let mut mtime: SystemTime = UNIX_EPOCH; + // // match &entry.client_state { + // // Some(Ok(ref m)) => { + // // if !m.is_dir() + // // && (walk_options.count_hard_links || inodes.add(m)) + // // && (walk_options.cross_filesystems + // // || crossdev::is_same_device(device_id, m)) + // // { + // // if walk_options.apparent_size { + // // file_size = m.len() as u128; + // // } else { + // // file_size = size_on_disk(&entry.parent_path, &data.name, m) + // // .unwrap_or_else(|_| { + // // t.io_errors += 1; + // // data.metadata_io_error = true; + // // 0 + // // }) + // // as u128; + // // } + // // } else { + // // data.entry_count = Some(0); + // // data.is_dir = true; + // // } - fn recompute_root_size(&self) -> u128 { + // // match m.modified() { + // // Ok(modified) => { + // // mtime = modified; + // // } + // // Err(_) => { + // // t.io_errors += 1; + // // data.metadata_io_error = true; + // // } + // // } + // // } + // // Some(Err(_)) => { + // // t.io_errors += 1; + // // data.metadata_io_error = true; + // // } + // // None => {} + // // } + + // // match (entry.depth, previous_depth) { + // // (n, p) if n > p => { + // // directory_info_per_depth_level.push(current_directory_at_depth); + // // current_directory_at_depth = EntryInfo { + // // size: file_size, + // // entries_count: Some(1), + // // }; + // // parent_node_idx = previous_node_idx; + // // } + // // (n, p) if n < p => { + // // for _ in n..p { + // // set_entry_info_or_panic( + // // &mut t.tree, + // // parent_node_idx, + // // current_directory_at_depth, + // // ); + // // let dir_info = + // // pop_or_panic(&mut directory_info_per_depth_level); + + // // current_directory_at_depth.size += dir_info.size; + // // current_directory_at_depth.add_count(&dir_info); + + // // parent_node_idx = parent_or_panic(&mut t.tree, parent_node_idx); + // // } + // // current_directory_at_depth.size += file_size; + // // *current_directory_at_depth.entries_count.get_or_insert(0) += 1; + // // set_entry_info_or_panic( + // // &mut t.tree, + // // parent_node_idx, + // // current_directory_at_depth, + // // ); + // // } + // // _ => { + // // current_directory_at_depth.size += file_size; + // // *current_directory_at_depth.entries_count.get_or_insert(0) += 1; + // // } + // // }; + + // // data.mtime = mtime; + // // data.size = file_size; + // // let entry_index = t.tree.add_node(data); + + // // t.tree.add_edge(parent_node_idx, entry_index, ()); + // // previous_node_idx = entry_index; + // // previous_depth = entry.depth; + // // } + // // Err(_) => { + // // if previous_depth == 0 { + // // data.name = (*root_path).clone(); + // // let entry_index = t.tree.add_node(data); + // // t.tree.add_edge(parent_node_idx, entry_index, ()); + // // } + + // // t.io_errors += 1 + // // } + // // } + + // // if throttle.can_update() && update(&mut t, None)? { + // // return Ok(None); + // // } + // // }, + // // recv(waker_rx) -> waker_value => { + // // let Ok(waker_value) = waker_value else { + // // continue; + // // }; + // // if update(&mut t, Some(waker_value))? { + // // return Ok(None); + // // } + // // }, + // // default(Duration::from_millis(250)) => { + // // // No events or new entries received, but we still need + // // // to keep updating the status message regularly. + // // if update(&mut t, None)? { + // // return Ok(None); + // // } + // // } + // // } + // // } + + // directory_info_per_depth_level.push(current_directory_at_depth); + // current_directory_at_depth = EntryInfo::default(); + // for _ in 0..previous_depth { + // let dir_info = pop_or_panic(&mut directory_info_per_depth_level); + // current_directory_at_depth.size += dir_info.size; + // current_directory_at_depth.add_count(&dir_info); + + // set_entry_info_or_panic(&mut t.tree, parent_node_idx, current_directory_at_depth); + // parent_node_idx = parent_or_panic(&mut t.tree, parent_node_idx); + // } + // let root_size = t.recompute_root_size(); + // set_entry_info_or_panic( + // &mut t.tree, + // t.root_index, + // EntryInfo { + // size: root_size, + // entries_count: (t.entries_traversed > 0).then_some(t.entries_traversed), + // }, + // ); + // t.total_bytes = Some(root_size); + + // t.elapsed = Some(t.start.elapsed()); + // Ok(Some(t)) + // } + + pub fn recompute_root_size(&self) -> u128 { self.tree .neighbors_directed(self.root_index, Direction::Outgoing) .map(|idx| get_size_or_panic(&self.tree, idx)) @@ -355,6 +326,16 @@ impl Traversal { } } +#[cfg(not(windows))] +pub fn size_on_disk(_parent: &Path, name: &Path, meta: &Metadata) -> io::Result<u64> { + name.size_on_disk_fast(meta) +} + +#[cfg(windows)] +pub fn size_on_disk(parent: &Path, name: &Path, meta: &Metadata) -> io::Result<u64> { + parent.join(name).size_on_disk_fast(meta) +} + #[cfg(test)] mod tests { use super::*; |