diff options
-rw-r--r-- | Cargo.lock | 1 | ||||
-rw-r--r-- | Cargo.toml | 1 | ||||
-rw-r--r-- | src/interactive/app/eventloop.rs | 77 | ||||
-rw-r--r-- | src/interactive/app/input.rs | 32 | ||||
-rw-r--r-- | src/interactive/app/mod.rs | 1 | ||||
-rw-r--r-- | src/traverse.rs | 252 |
6 files changed, 227 insertions, 137 deletions
@@ -356,6 +356,7 @@ dependencies = [ "byte-unit", "chrono", "clap", + "crossbeam", "crosstermion", "filesize", "gix-glob", @@ -45,6 +45,7 @@ bstr = "1.8.0" simplelog = "0.12.1" log = "0.4.20" log-panics = { version = "2", features = ["with-backtrace"]} +crossbeam = "0.8" [[bin]] name="dua" diff --git a/src/interactive/app/eventloop.rs b/src/interactive/app/eventloop.rs index 36f60e0..cec4506 100644 --- a/src/interactive/app/eventloop.rs +++ b/src/interactive/app/eventloop.rs @@ -6,8 +6,9 @@ use crate::interactive::{ SortMode, }; use anyhow::Result; +use crossbeam::channel::Receiver; use crosstermion::crossterm::event::{KeyCode, KeyEvent, KeyEventKind, KeyModifiers}; -use crosstermion::input::{input_channel, Event}; +use crosstermion::input::Event; use dua::{ traverse::{EntryData, Traversal}, WalkOptions, WalkResult, @@ -16,6 +17,7 @@ use std::path::PathBuf; use tui::backend::Backend; use tui_react::Terminal; +use super::input::input_channel; use super::tree_view::TreeView; #[derive(Default, Copy, Clone, PartialEq)] @@ -343,7 +345,7 @@ pub struct TerminalApp { pub window: MainWindow, } -type KeyboardInputAndApp = (std::sync::mpsc::Receiver<Event>, TerminalApp); +type KeyboardInputAndApp = (crossbeam::channel::Receiver<Event>, TerminalApp); impl TerminalApp { pub fn refresh_view<B>(&mut self, terminal: &mut Terminal<B>) @@ -397,56 +399,63 @@ impl TerminalApp { let mut window = MainWindow::default(); let keys_rx = match mode { Interaction::None => { - let (_, keys_rx) = std::sync::mpsc::channel(); + let (_, keys_rx) = crossbeam::channel::unbounded(); keys_rx } Interaction::Full => input_channel(), }; - let fetch_buffered_key_events = || { + #[inline] + fn fetch_buffered_key_events(keys_rx: &Receiver<Event>) -> Vec<Event> { let mut keys = Vec::new(); while let Ok(key) = keys_rx.try_recv() { keys.push(key); } keys - }; + } let mut state = AppState { is_scanning: true, ..Default::default() }; let mut received_events = false; - let traversal = Traversal::from_walk(options, input_paths, |traversal| { - if !received_events { - state.navigation_mut().view_root = traversal.root_index; - } - state.entries = sorted_entries( - &traversal.tree, - state.navigation().view_root, - state.sorting, - state.glob_root(), - ); - if !received_events { - state.navigation_mut().selected = state.entries.first().map(|b| b.index); - } - state.reset_message(); // force "scanning" to appear - - let events = fetch_buffered_key_events(); - received_events |= !events.is_empty(); + let traversal = + Traversal::from_walk(options, input_paths, &keys_rx, |traversal, event| { + if !received_events { + state.navigation_mut().view_root = traversal.root_index; + } + state.entries = sorted_entries( + &traversal.tree, + state.navigation().view_root, + state.sorting, + state.glob_root(), + ); + if !received_events { + state.navigation_mut().selected = state.entries.first().map(|b| b.index); + } + state.reset_message(); // force "scanning" to appear - let should_exit = match state.process_events( - &mut window, - traversal, - &mut display, - terminal, - events.into_iter(), - )? { - ProcessingResult::ExitRequested(_) => true, - ProcessingResult::Finished(_) => false, - }; + let mut events = fetch_buffered_key_events(&keys_rx); + if let Some(event) = event { + // This update is triggered by a user event, insert it + // before any events fetched later. + events.insert(0, event); + } + received_events |= !events.is_empty(); + + let should_exit = match state.process_events( + &mut window, + traversal, + &mut display, + terminal, + events.into_iter(), + )? { + ProcessingResult::ExitRequested(_) => true, + ProcessingResult::Finished(_) => false, + }; - Ok(should_exit) - })?; + Ok(should_exit) + })?; let traversal = match traversal { Some(t) => t, diff --git a/src/interactive/app/input.rs b/src/interactive/app/input.rs new file mode 100644 index 0000000..3dc1e54 --- /dev/null +++ b/src/interactive/app/input.rs @@ -0,0 +1,32 @@ +use crossbeam::channel::Receiver; +pub use crosstermion::crossterm::event::Event; + +enum Action<T> { + Continue, + Result(Result<T, std::io::Error>), +} + +fn continue_on_interrupt<T>(result: Result<T, std::io::Error>) -> Action<T> { + match result { + Ok(v) => Action::Result(Ok(v)), + Err(err) if err.kind() == std::io::ErrorKind::Interrupted => Action::Continue, + Err(err) => Action::Result(Err(err)), + } +} + +pub fn input_channel() -> Receiver<Event> { + let (key_send, key_receive) = crossbeam::channel::bounded(0); + std::thread::spawn(move || -> Result<(), std::io::Error> { + loop { + let event = match continue_on_interrupt(crosstermion::crossterm::event::read()) { + Action::Continue => continue, + Action::Result(res) => res?, + }; + if key_send.send(event).is_err() { + break; + } + } + Ok(()) + }); + key_receive +} diff --git a/src/interactive/app/mod.rs b/src/interactive/app/mod.rs index 7794b40..f1e2925 100644 --- a/src/interactive/app/mod.rs +++ b/src/interactive/app/mod.rs @@ -2,6 +2,7 @@ mod bytevis; mod common; mod eventloop; mod handlers; +mod input; mod navigation; pub mod tree_view; diff --git a/src/traverse.rs b/src/traverse.rs index c6026b1..69c1639 100644 --- a/src/traverse.rs +++ b/src/traverse.rs @@ -7,6 +7,7 @@ use std::{ fs::Metadata, io, path::{Path, PathBuf}, + sync::Arc, time::{Duration, SystemTime, UNIX_EPOCH}, }; @@ -70,10 +71,11 @@ pub struct Traversal { } impl Traversal { - pub fn from_walk( + pub fn from_walk<T>( mut walk_options: WalkOptions, input: Vec<PathBuf>, - mut update: impl FnMut(&mut Traversal) -> Result<bool>, + waker_rx: &crossbeam::channel::Receiver<T>, + mut update: impl FnMut(&mut Traversal, Option<T>) -> Result<bool>, ) -> Result<Option<Traversal>> { #[derive(Default, Copy, Clone)] struct EntryInfo { @@ -149,129 +151,173 @@ impl Traversal { parent.join(name).size_on_disk_fast(meta) } - for path in input.into_iter() { - let device_id = match crossdev::init(path.as_ref()) { - Ok(id) => id, - Err(_) => { - t.io_errors += 1; - continue; - } - }; - for entry in walk_options - .iter_from_path(path.as_ref(), device_id) - .into_iter() - { - t.entries_traversed += 1; - let mut data = EntryData::default(); - match entry { - Ok(entry) => { - data.name = if entry.depth < 1 { - path.clone() - } else { - entry.file_name.into() + let (entry_tx, entry_rx) = crossbeam::channel::bounded(100); + std::thread::Builder::new() + .name("dua-fs-walk-dispatcher".to_string()) + .spawn({ + let walk_options = walk_options.clone(); + move || { + for root_path in input.into_iter() { + let device_id = match crossdev::init(root_path.as_ref()) { + Ok(id) => id, + Err(_) => { + t.io_errors += 1; + continue; + } }; - let mut file_size = 0u128; - let mut mtime: SystemTime = UNIX_EPOCH; - match &entry.client_state { - Some(Ok(ref m)) => { - if !m.is_dir() - && (walk_options.count_hard_links || inodes.add(m)) - && (walk_options.cross_filesystems - || crossdev::is_same_device(device_id, m)) - { - if walk_options.apparent_size { - file_size = m.len() as u128; + let root_path = Arc::new(root_path); + for entry in walk_options + .iter_from_path(root_path.as_ref(), device_id) + .into_iter() + { + if entry_tx + .send((entry, Arc::clone(&root_path), device_id)) + .is_err() + { + // The channel is closed, this means the user has + // requested to quit the app. Abort the walking. + return; + } + } + } + } + })?; + + loop { + crossbeam::select! { + recv(entry_rx) -> entry => { + let Ok((entry, root_path, device_id)) = entry else { + break; + }; + + t.entries_traversed += 1; + let mut data = EntryData::default(); + match entry { + Ok(entry) => { + data.name = if entry.depth < 1 { + (*root_path).clone() + } else { + entry.file_name.into() + }; + + let mut file_size = 0u128; + let mut mtime: SystemTime = UNIX_EPOCH; + match &entry.client_state { + Some(Ok(ref m)) => { + if !m.is_dir() + && (walk_options.count_hard_links || inodes.add(m)) + && (walk_options.cross_filesystems + || crossdev::is_same_device(device_id, m)) + { + if walk_options.apparent_size { + file_size = m.len() as u128; + } else { + file_size = size_on_disk(&entry.parent_path, &data.name, m) + .unwrap_or_else(|_| { + t.io_errors += 1; + data.metadata_io_error = true; + 0 + }) + as u128; + } } else { - file_size = size_on_disk(&entry.parent_path, &data.name, m) - .unwrap_or_else(|_| { - t.io_errors += 1; - data.metadata_io_error = true; - 0 - }) - as u128; + data.entry_count = Some(0); + data.is_dir = true; } - } else { - data.entry_count = Some(0); - data.is_dir = true; - } - match m.modified() { - Ok(modified) => { - mtime = modified; - } - Err(_) => { - t.io_errors += 1; - data.metadata_io_error = true; + match m.modified() { + Ok(modified) => { + mtime = modified; + } + Err(_) => { + t.io_errors += 1; + data.metadata_io_error = true; + } } } + Some(Err(_)) => { + t.io_errors += 1; + data.metadata_io_error = true; + } + None => {} } - Some(Err(_)) => { - t.io_errors += 1; - data.metadata_io_error = true; - } - None => {} - } - match (entry.depth, previous_depth) { - (n, p) if n > p => { - directory_info_per_depth_level.push(current_directory_at_depth); - current_directory_at_depth = EntryInfo { - size: file_size, - entries_count: Some(1), - }; - parent_node_idx = previous_node_idx; - } - (n, p) if n < p => { - for _ in n..p { + match (entry.depth, previous_depth) { + (n, p) if n > p => { + directory_info_per_depth_level.push(current_directory_at_depth); + current_directory_at_depth = EntryInfo { + size: file_size, + entries_count: Some(1), + }; + parent_node_idx = previous_node_idx; + } + (n, p) if n < p => { + for _ in n..p { + set_entry_info_or_panic( + &mut t.tree, + parent_node_idx, + current_directory_at_depth, + ); + let dir_info = + pop_or_panic(&mut directory_info_per_depth_level); + + current_directory_at_depth.size += dir_info.size; + current_directory_at_depth.add_count(&dir_info); + + parent_node_idx = parent_or_panic(&mut t.tree, parent_node_idx); + } + current_directory_at_depth.size += file_size; + *current_directory_at_depth.entries_count.get_or_insert(0) += 1; set_entry_info_or_panic( &mut t.tree, parent_node_idx, current_directory_at_depth, ); - let dir_info = - pop_or_panic(&mut directory_info_per_depth_level); - - current_directory_at_depth.size += dir_info.size; - current_directory_at_depth.add_count(&dir_info); - - parent_node_idx = parent_or_panic(&mut t.tree, parent_node_idx); } - current_directory_at_depth.size += file_size; - *current_directory_at_depth.entries_count.get_or_insert(0) += 1; - set_entry_info_or_panic( - &mut t.tree, - parent_node_idx, - current_directory_at_depth, - ); - } - _ => { - current_directory_at_depth.size += file_size; - *current_directory_at_depth.entries_count.get_or_insert(0) += 1; - } - }; - - data.mtime = mtime; - data.size = file_size; - let entry_index = t.tree.add_node(data); + _ => { + current_directory_at_depth.size += file_size; + *current_directory_at_depth.entries_count.get_or_insert(0) += 1; + } + }; - t.tree.add_edge(parent_node_idx, entry_index, ()); - previous_node_idx = entry_index; - previous_depth = entry.depth; - } - Err(_) => { - if previous_depth == 0 { - data.name = path.clone(); + data.mtime = mtime; + data.size = file_size; let entry_index = t.tree.add_node(data); + t.tree.add_edge(parent_node_idx, entry_index, ()); + previous_node_idx = entry_index; + previous_depth = entry.depth; } + Err(_) => { + if previous_depth == 0 { + data.name = (*root_path).clone(); + let entry_index = t.tree.add_node(data); + t.tree.add_edge(parent_node_idx, entry_index, ()); + } - t.io_errors += 1 + t.io_errors += 1 + } } - } - if throttle.can_update() && update(&mut t)? { - return Ok(None); + if throttle.can_update() && update(&mut t, None)? { + return Ok(None); + } + }, + recv(waker_rx) -> waker_value => { + let Ok(waker_value) = waker_value else { + continue; + }; + if update(&mut t, Some(waker_value))? { + return Ok(None); + } + }, + default(Duration::from_millis(250)) => { + // No events or new entries received, but we still need + // to keep updating the status message regularly. + if update(&mut t, None)? { + return Ok(None); + } } } } |