summary | refs | log | tree | commit | diff | stats
path: root/src/file_sum/sum_computation.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/file_sum/sum_computation.rs')
-rw-r--r--  src/file_sum/sum_computation.rs | 344
1 file changed, 186 insertions(+), 158 deletions(-)
diff --git a/src/file_sum/sum_computation.rs b/src/file_sum/sum_computation.rs
index 84e5072..28ce930 100644
--- a/src/file_sum/sum_computation.rs
+++ b/src/file_sum/sum_computation.rs
@@ -7,7 +7,6 @@ use {
},
ahash::AHashMap,
crossbeam::channel,
- lazy_static::lazy_static,
rayon::{ThreadPool, ThreadPoolBuilder},
std::{
convert::TryInto,
@@ -29,6 +28,12 @@ use {
},
};
+
+struct DirSummer {
+ thread_count: usize,
+ thread_pool: ThreadPool,
+}
+
/// a node id, taking the device into account to be sure to discriminate
/// nodes with the same inode but on different devices
#[derive(Debug, Clone, Copy, PartialEq, Hash, Eq)]
@@ -39,10 +44,6 @@ struct NodeId {
dev: u64,
}
-// threads used by one computation
-const THREADS_COUNT: usize = 6;
-
-
#[inline(always)]
fn is_ignored(path: &Path, special_paths: &[SpecialPath]) -> bool {
match special_paths.find(path) {
@@ -51,189 +52,216 @@ fn is_ignored(path: &Path, special_paths: &[SpecialPath]) -> bool {
}
}
-/// compute the consolidated numbers for a directory, with implementation
-/// varying depending on the OS:
-/// On unix, the computation is done on blocks of 512 bytes
-/// see https://doc.rust-lang.org/std/os/unix/fs/trait.MetadataExt.html#tymethod.blocks
-pub fn compute_dir_sum(
- path: &Path,
- cache: &mut AHashMap<PathBuf, FileSum>,
- dam: &Dam,
- con: &AppContext,
-) -> Option<FileSum> {
- //debug!("compute size of dir {:?} --------------- ", path);
-
- if is_ignored(path, &con.special_paths) {
- return Some(FileSum::zero());
- }
-
- lazy_static! {
- static ref THREAD_POOL: ThreadPool = ThreadPoolBuilder::new()
- .num_threads(THREADS_COUNT * 2)
+impl DirSummer {
+ pub fn new(thread_count: usize) -> Self {
+ let thread_pool = ThreadPoolBuilder::new()
+ .num_threads(thread_count)
.build()
.unwrap();
+ Self {
+ thread_count,
+ thread_pool,
+ }
}
+ /// compute the consolidated numbers for a directory, with implementation
+ /// varying depending on the OS:
+ /// On unix, the computation is done on blocks of 512 bytes
+ /// see https://doc.rust-lang.org/std/os/unix/fs/trait.MetadataExt.html#tymethod.blocks
+ pub fn compute_dir_sum(
+ &mut self,
+ path: &Path,
+ cache: &mut AHashMap<PathBuf, FileSum>,
+ dam: &Dam,
+ con: &AppContext,
+ ) -> Option<FileSum> {
+ let threads_count = self.thread_count;
+
+ if is_ignored(path, &con.special_paths) {
+ return Some(FileSum::zero());
+ }
- // to avoid counting twice a node, we store their id in a set
- #[cfg(unix)]
- let nodes = Arc::new(Mutex::new(FnvHashSet::<NodeId>::default()));
-
- // busy is the number of directories which are either being processed or queued
- // We use this count to determine when threads can stop waiting for tasks
- let mut busy = 0;
- let mut sum = compute_file_sum(path);
-
- // this MPMC channel contains the directory paths which must be handled.
- // A None means there's nothing left and the thread may send its result and stop
- let (dirs_sender, dirs_receiver) = channel::unbounded();
-
- let special_paths: Vec<SpecialPath> = con.special_paths.iter()
- .filter(|sp| sp.can_have_matches_in(path))
- .cloned()
- .collect();
-
- // the first level is managed a little differently: we look at the cache
- // before adding. This enables faster computations in two cases:
- // - for the root line (assuming it's computed after the content)
- // - when we navigate up the tree
- if let Ok(entries) = fs::read_dir(path) {
- for e in entries.flatten() {
- if let Ok(md) = e.metadata() {
- if md.is_dir() {
- let entry_path = e.path();
-
- if is_ignored(&entry_path, &special_paths) {
- debug!("not summing special path {:?}", entry_path);
- continue;
- }
+ // to avoid counting twice a node, we store their id in a set
+ #[cfg(unix)]
+ let nodes = Arc::new(Mutex::new(FnvHashSet::<NodeId>::default()));
+
+ // busy is the number of directories which are either being processed or queued
+ // We use this count to determine when threads can stop waiting for tasks
+ let mut busy = 0;
+ let mut sum = compute_file_sum(path);
+
+ // this MPMC channel contains the directory paths which must be handled.
+ // A None means there's nothing left and the thread may send its result and stop
+ let (dirs_sender, dirs_receiver) = channel::unbounded();
+
+ let special_paths: Vec<SpecialPath> = con.special_paths.iter()
+ .filter(|sp| sp.can_have_matches_in(path))
+ .cloned()
+ .collect();
+
+ // the first level is managed a little differently: we look at the cache
+ // before adding. This enables faster computations in two cases:
+ // - for the root line (assuming it's computed after the content)
+ // - when we navigate up the tree
+ if let Ok(entries) = fs::read_dir(path) {
+ for e in entries.flatten() {
+ if let Ok(md) = e.metadata() {
+ if md.is_dir() {
+ let entry_path = e.path();
+
+ if is_ignored(&entry_path, &special_paths) {
+ debug!("not summing special path {:?}", entry_path);
+ continue;
+ }
- // we check the cache
- if let Some(entry_sum) = cache.get(&entry_path) {
- sum += *entry_sum;
- continue;
- }
- // we add the directory to the channel of dirs needing
- // processing
- busy += 1;
- dirs_sender.send(Some(entry_path)).unwrap();
- } else {
-
- #[cfg(unix)]
- if md.nlink() > 1 {
- let mut nodes = nodes.lock().unwrap();
- let node_id = NodeId {
- inode: md.ino(),
- dev: md.dev(),
- };
- if !nodes.insert(node_id) {
- // it was already in the set
+ // we check the cache
+ if let Some(entry_sum) = cache.get(&entry_path) {
+ sum += *entry_sum;
continue;
}
- }
+ // we add the directory to the channel of dirs needing
+ // processing
+ busy += 1;
+ dirs_sender.send(Some(entry_path)).unwrap();
+ } else {
+
+ #[cfg(unix)]
+ if md.nlink() > 1 {
+ let mut nodes = nodes.lock().unwrap();
+ let node_id = NodeId {
+ inode: md.ino(),
+ dev: md.dev(),
+ };
+ if !nodes.insert(node_id) {
+ // it was already in the set
+ continue;
+ }
+ }
+ }
+ sum += md_sum(&md);
}
- sum += md_sum(&md);
}
}
- }
-
- if busy == 0 {
- return Some(sum);
- }
- let busy = Arc::new(AtomicIsize::new(busy));
-
- // this MPMC channel is here for the threads to send their results
- // at end of computation
- let (thread_sum_sender, thread_sum_receiver) = channel::bounded(THREADS_COUNT);
+ if busy == 0 {
+ return Some(sum);
+ }
+ let busy = Arc::new(AtomicIsize::new(busy));
- // Each thread does a summation without merge and the data are merged
- // at the end (this avoids waiting for a mutex during computation)
- for _ in 0..THREADS_COUNT {
- let busy = Arc::clone(&busy);
- let (dirs_sender, dirs_receiver) = (dirs_sender.clone(), dirs_receiver.clone());
+ // this MPMC channel is here for the threads to send their results
+ // at end of computation
+ let (thread_sum_sender, thread_sum_receiver) = channel::bounded(threads_count);
- #[cfg(unix)]
- let nodes = nodes.clone();
-
- let special_paths = special_paths.clone();
-
- let observer = dam.observer();
- let thread_sum_sender = thread_sum_sender.clone();
- THREAD_POOL.spawn(move || {
- let mut thread_sum = FileSum::zero();
- loop {
- let o = dirs_receiver.recv();
- if let Ok(Some(open_dir)) = o {
- if let Ok(entries) = fs::read_dir(&open_dir) {
- for e in entries.flatten() {
- if let Ok(md) = e.metadata() {
- if md.is_dir() {
-
- let path = e.path();
-
- if is_ignored(&path, &special_paths) {
- debug!("not summing (deep) special path {:?}", path);
- continue;
- }
- // we add the directory to the channel of dirs needing
- // processing
- busy.fetch_add(1, Ordering::Relaxed);
- dirs_sender.send(Some(path)).unwrap();
- } else {
+ // Each thread does a summation without merge and the data are merged
+ // at the end (this avoids waiting for a mutex during computation)
+ for _ in 0..threads_count {
+ let busy = Arc::clone(&busy);
+ let (dirs_sender, dirs_receiver) = (dirs_sender.clone(), dirs_receiver.clone());
- #[cfg(unix)]
- if md.nlink() > 1 {
- let mut nodes = nodes.lock().unwrap();
- let node_id = NodeId {
- inode: md.ino(),
- dev: md.dev(),
- };
- if !nodes.insert(node_id) {
- // it was already in the set
+ #[cfg(unix)]
+ let nodes = nodes.clone();
+
+ let special_paths = special_paths.clone();
+
+ let observer = dam.observer();
+ let thread_sum_sender = thread_sum_sender.clone();
+ self.thread_pool.spawn(move || {
+ let mut thread_sum = FileSum::zero();
+ loop {
+ let o = dirs_receiver.recv();
+ if let Ok(Some(open_dir)) = o {
+ if let Ok(entries) = fs::read_dir(&open_dir) {
+ for e in entries.flatten() {
+ if let Ok(md) = e.metadata() {
+ if md.is_dir() {
+
+ let path = e.path();
+
+ if is_ignored(&path, &special_paths) {
+ debug!("not summing (deep) special path {:?}", path);
continue;
}
- }
+ // we add the directory to the channel of dirs needing
+ // processing
+ busy.fetch_add(1, Ordering::Relaxed);
+ dirs_sender.send(Some(path)).unwrap();
+ } else {
+
+ #[cfg(unix)]
+ if md.nlink() > 1 {
+ let mut nodes = nodes.lock().unwrap();
+ let node_id = NodeId {
+ inode: md.ino(),
+ dev: md.dev(),
+ };
+ if !nodes.insert(node_id) {
+ // it was already in the set
+ continue;
+ }
+ }
+
+ }
+ thread_sum += md_sum(&md);
+ } else {
+ // we can't measure much but we can count the file
+ thread_sum.incr();
}
- thread_sum += md_sum(&md);
- } else {
- // we can't measure much but we can count the file
- thread_sum.incr();
}
}
+ busy.fetch_sub(1, Ordering::Relaxed);
+ }
+ if observer.has_event() {
+ dirs_sender.send(None).unwrap(); // to unlock the next waiting thread
+ break;
+ }
+ if busy.load(Ordering::Relaxed) < 1 {
+ dirs_sender.send(None).unwrap(); // to unlock the next waiting thread
+ break;
}
- busy.fetch_sub(1, Ordering::Relaxed);
}
- if observer.has_event() {
- dirs_sender.send(None).unwrap(); // to unlock the next waiting thread
- break;
+ thread_sum_sender.send(thread_sum).unwrap();
+ });
+ }
+ // Wait for the threads to finish and consolidate their results
+ for _ in 0..threads_count {
+ match thread_sum_receiver.recv() {
+ Ok(thread_sum) => {
+ sum += thread_sum;
}
- if busy.load(Ordering::Relaxed) < 1 {
- dirs_sender.send(None).unwrap(); // to unlock the next waiting thread
- break;
+ Err(e) => {
+ warn!("Error while recv summing thread result : {:?}", e);
}
}
- thread_sum_sender.send(thread_sum).unwrap();
- });
- }
- // Wait for the threads to finish and consolidate their results
- for _ in 0..THREADS_COUNT {
- match thread_sum_receiver.recv() {
- Ok(thread_sum) => {
- sum += thread_sum;
- }
- Err(e) => {
- warn!("Error while recv summing thread result : {:?}", e);
- }
}
+ if dam.has_event() {
+ return None;
+ }
+ Some(sum)
}
- if dam.has_event() {
- return None;
- }
- Some(sum)
+}
+
+
+/// compute the consolidated numbers for a directory, with implementation
+/// varying depending on the OS:
+/// On unix, the computation is done on blocks of 512 bytes
+/// see https://doc.rust-lang.org/std/os/unix/fs/trait.MetadataExt.html#tymethod.blocks
+pub fn compute_dir_sum(
+ path: &Path,
+ cache: &mut AHashMap<PathBuf, FileSum>,
+ dam: &Dam,
+ con: &AppContext,
+) -> Option<FileSum> {
+ use once_cell::sync::OnceCell;
+ static DIR_SUMMER: OnceCell<Mutex<DirSummer>> = OnceCell::new();
+ DIR_SUMMER
+ .get_or_init(|| {
+ Mutex::new(DirSummer::new(con.file_sum_threads_count))
+ })
+ .lock().unwrap()
+ .compute_dir_sum(path, cache, dam, con)
}
/// compute the sum for a regular file (not a folder)