diff options
Diffstat (limited to 'src/file_sum/sum_computation.rs')
-rw-r--r-- | src/file_sum/sum_computation.rs | 344 |
1 file changed, 186 insertions, 158 deletions
diff --git a/src/file_sum/sum_computation.rs b/src/file_sum/sum_computation.rs index 84e5072..28ce930 100644 --- a/src/file_sum/sum_computation.rs +++ b/src/file_sum/sum_computation.rs @@ -7,7 +7,6 @@ use { }, ahash::AHashMap, crossbeam::channel, - lazy_static::lazy_static, rayon::{ThreadPool, ThreadPoolBuilder}, std::{ convert::TryInto, @@ -29,6 +28,12 @@ use { }, }; + +struct DirSummer { + thread_count: usize, + thread_pool: ThreadPool, +} + /// a node id, taking the device into account to be sure to discriminate /// nodes with the same inode but on different devices #[derive(Debug, Clone, Copy, PartialEq, Hash, Eq)] @@ -39,10 +44,6 @@ struct NodeId { dev: u64, } -// threads used by one computation -const THREADS_COUNT: usize = 6; - - #[inline(always)] fn is_ignored(path: &Path, special_paths: &[SpecialPath]) -> bool { match special_paths.find(path) { @@ -51,189 +52,216 @@ fn is_ignored(path: &Path, special_paths: &[SpecialPath]) -> bool { } } -/// compute the consolidated numbers for a directory, with implementation -/// varying depending on the OS: -/// On unix, the computation is done on blocks of 512 bytes -/// see https://doc.rust-lang.org/std/os/unix/fs/trait.MetadataExt.html#tymethod.blocks -pub fn compute_dir_sum( - path: &Path, - cache: &mut AHashMap<PathBuf, FileSum>, - dam: &Dam, - con: &AppContext, -) -> Option<FileSum> { - //debug!("compute size of dir {:?} --------------- ", path); - - if is_ignored(path, &con.special_paths) { - return Some(FileSum::zero()); - } - - lazy_static! 
{ - static ref THREAD_POOL: ThreadPool = ThreadPoolBuilder::new() - .num_threads(THREADS_COUNT * 2) +impl DirSummer { + pub fn new(thread_count: usize) -> Self { + let thread_pool = ThreadPoolBuilder::new() + .num_threads(thread_count) .build() .unwrap(); + Self { + thread_count, + thread_pool, + } } + /// compute the consolidated numbers for a directory, with implementation + /// varying depending on the OS: + /// On unix, the computation is done on blocks of 512 bytes + /// see https://doc.rust-lang.org/std/os/unix/fs/trait.MetadataExt.html#tymethod.blocks + pub fn compute_dir_sum( + &mut self, + path: &Path, + cache: &mut AHashMap<PathBuf, FileSum>, + dam: &Dam, + con: &AppContext, + ) -> Option<FileSum> { + let threads_count = self.thread_count; + + if is_ignored(path, &con.special_paths) { + return Some(FileSum::zero()); + } - // to avoid counting twice a node, we store their id in a set - #[cfg(unix)] - let nodes = Arc::new(Mutex::new(FnvHashSet::<NodeId>::default())); - - // busy is the number of directories which are either being processed or queued - // We use this count to determine when threads can stop waiting for tasks - let mut busy = 0; - let mut sum = compute_file_sum(path); - - // this MPMC channel contains the directory paths which must be handled. - // A None means there's nothing left and the thread may send its result and stop - let (dirs_sender, dirs_receiver) = channel::unbounded(); - - let special_paths: Vec<SpecialPath> = con.special_paths.iter() - .filter(|sp| sp.can_have_matches_in(path)) - .cloned() - .collect(); - - // the first level is managed a little differently: we look at the cache - // before adding. 
This enables faster computations in two cases: - // - for the root line (assuming it's computed after the content) - // - when we navigate up the tree - if let Ok(entries) = fs::read_dir(path) { - for e in entries.flatten() { - if let Ok(md) = e.metadata() { - if md.is_dir() { - let entry_path = e.path(); - - if is_ignored(&entry_path, &special_paths) { - debug!("not summing special path {:?}", entry_path); - continue; - } + // to avoid counting twice a node, we store their id in a set + #[cfg(unix)] + let nodes = Arc::new(Mutex::new(FnvHashSet::<NodeId>::default())); + + // busy is the number of directories which are either being processed or queued + // We use this count to determine when threads can stop waiting for tasks + let mut busy = 0; + let mut sum = compute_file_sum(path); + + // this MPMC channel contains the directory paths which must be handled. + // A None means there's nothing left and the thread may send its result and stop + let (dirs_sender, dirs_receiver) = channel::unbounded(); + + let special_paths: Vec<SpecialPath> = con.special_paths.iter() + .filter(|sp| sp.can_have_matches_in(path)) + .cloned() + .collect(); + + // the first level is managed a little differently: we look at the cache + // before adding. 
This enables faster computations in two cases: + // - for the root line (assuming it's computed after the content) + // - when we navigate up the tree + if let Ok(entries) = fs::read_dir(path) { + for e in entries.flatten() { + if let Ok(md) = e.metadata() { + if md.is_dir() { + let entry_path = e.path(); + + if is_ignored(&entry_path, &special_paths) { + debug!("not summing special path {:?}", entry_path); + continue; + } - // we check the cache - if let Some(entry_sum) = cache.get(&entry_path) { - sum += *entry_sum; - continue; - } - // we add the directory to the channel of dirs needing - // processing - busy += 1; - dirs_sender.send(Some(entry_path)).unwrap(); - } else { - - #[cfg(unix)] - if md.nlink() > 1 { - let mut nodes = nodes.lock().unwrap(); - let node_id = NodeId { - inode: md.ino(), - dev: md.dev(), - }; - if !nodes.insert(node_id) { - // it was already in the set + // we check the cache + if let Some(entry_sum) = cache.get(&entry_path) { + sum += *entry_sum; continue; } - } + // we add the directory to the channel of dirs needing + // processing + busy += 1; + dirs_sender.send(Some(entry_path)).unwrap(); + } else { + + #[cfg(unix)] + if md.nlink() > 1 { + let mut nodes = nodes.lock().unwrap(); + let node_id = NodeId { + inode: md.ino(), + dev: md.dev(), + }; + if !nodes.insert(node_id) { + // it was already in the set + continue; + } + } + } + sum += md_sum(&md); } - sum += md_sum(&md); } } - } - - if busy == 0 { - return Some(sum); - } - let busy = Arc::new(AtomicIsize::new(busy)); - - // this MPMC channel is here for the threads to send their results - // at end of computation - let (thread_sum_sender, thread_sum_receiver) = channel::bounded(THREADS_COUNT); + if busy == 0 { + return Some(sum); + } + let busy = Arc::new(AtomicIsize::new(busy)); - // Each thread does a summation without merge and the data are merged - // at the end (this avoids waiting for a mutex during computation) - for _ in 0..THREADS_COUNT { - let busy = Arc::clone(&busy); - let 
(dirs_sender, dirs_receiver) = (dirs_sender.clone(), dirs_receiver.clone()); + // this MPMC channel is here for the threads to send their results + // at end of computation + let (thread_sum_sender, thread_sum_receiver) = channel::bounded(threads_count); - #[cfg(unix)] - let nodes = nodes.clone(); - - let special_paths = special_paths.clone(); - - let observer = dam.observer(); - let thread_sum_sender = thread_sum_sender.clone(); - THREAD_POOL.spawn(move || { - let mut thread_sum = FileSum::zero(); - loop { - let o = dirs_receiver.recv(); - if let Ok(Some(open_dir)) = o { - if let Ok(entries) = fs::read_dir(&open_dir) { - for e in entries.flatten() { - if let Ok(md) = e.metadata() { - if md.is_dir() { - - let path = e.path(); - - if is_ignored(&path, &special_paths) { - debug!("not summing (deep) special path {:?}", path); - continue; - } - // we add the directory to the channel of dirs needing - // processing - busy.fetch_add(1, Ordering::Relaxed); - dirs_sender.send(Some(path)).unwrap(); - } else { + // Each thread does a summation without merge and the data are merged + // at the end (this avoids waiting for a mutex during computation) + for _ in 0..threads_count { + let busy = Arc::clone(&busy); + let (dirs_sender, dirs_receiver) = (dirs_sender.clone(), dirs_receiver.clone()); - #[cfg(unix)] - if md.nlink() > 1 { - let mut nodes = nodes.lock().unwrap(); - let node_id = NodeId { - inode: md.ino(), - dev: md.dev(), - }; - if !nodes.insert(node_id) { - // it was already in the set + #[cfg(unix)] + let nodes = nodes.clone(); + + let special_paths = special_paths.clone(); + + let observer = dam.observer(); + let thread_sum_sender = thread_sum_sender.clone(); + self.thread_pool.spawn(move || { + let mut thread_sum = FileSum::zero(); + loop { + let o = dirs_receiver.recv(); + if let Ok(Some(open_dir)) = o { + if let Ok(entries) = fs::read_dir(&open_dir) { + for e in entries.flatten() { + if let Ok(md) = e.metadata() { + if md.is_dir() { + + let path = e.path(); + + if 
is_ignored(&path, &special_paths) { + debug!("not summing (deep) special path {:?}", path); continue; } - } + // we add the directory to the channel of dirs needing + // processing + busy.fetch_add(1, Ordering::Relaxed); + dirs_sender.send(Some(path)).unwrap(); + } else { + + #[cfg(unix)] + if md.nlink() > 1 { + let mut nodes = nodes.lock().unwrap(); + let node_id = NodeId { + inode: md.ino(), + dev: md.dev(), + }; + if !nodes.insert(node_id) { + // it was already in the set + continue; + } + } + + } + thread_sum += md_sum(&md); + } else { + // we can't measure much but we can count the file + thread_sum.incr(); } - thread_sum += md_sum(&md); - } else { - // we can't measure much but we can count the file - thread_sum.incr(); } } + busy.fetch_sub(1, Ordering::Relaxed); + } + if observer.has_event() { + dirs_sender.send(None).unwrap(); // to unlock the next waiting thread + break; + } + if busy.load(Ordering::Relaxed) < 1 { + dirs_sender.send(None).unwrap(); // to unlock the next waiting thread + break; } - busy.fetch_sub(1, Ordering::Relaxed); } - if observer.has_event() { - dirs_sender.send(None).unwrap(); // to unlock the next waiting thread - break; + thread_sum_sender.send(thread_sum).unwrap(); + }); + } + // Wait for the threads to finish and consolidate their results + for _ in 0..threads_count { + match thread_sum_receiver.recv() { + Ok(thread_sum) => { + sum += thread_sum; } - if busy.load(Ordering::Relaxed) < 1 { - dirs_sender.send(None).unwrap(); // to unlock the next waiting thread - break; + Err(e) => { + warn!("Error while recv summing thread result : {:?}", e); } } - thread_sum_sender.send(thread_sum).unwrap(); - }); - } - // Wait for the threads to finish and consolidate their results - for _ in 0..THREADS_COUNT { - match thread_sum_receiver.recv() { - Ok(thread_sum) => { - sum += thread_sum; - } - Err(e) => { - warn!("Error while recv summing thread result : {:?}", e); - } } + if dam.has_event() { + return None; + } + Some(sum) } - if 
dam.has_event() { - return None; - } - Some(sum) +} + + +/// compute the consolidated numbers for a directory, with implementation +/// varying depending on the OS: +/// On unix, the computation is done on blocks of 512 bytes +/// see https://doc.rust-lang.org/std/os/unix/fs/trait.MetadataExt.html#tymethod.blocks +pub fn compute_dir_sum( + path: &Path, + cache: &mut AHashMap<PathBuf, FileSum>, + dam: &Dam, + con: &AppContext, +) -> Option<FileSum> { + use once_cell::sync::OnceCell; + static DIR_SUMMER: OnceCell<Mutex<DirSummer>> = OnceCell::new(); + DIR_SUMMER + .get_or_init(|| { + Mutex::new(DirSummer::new(con.file_sum_threads_count)) + }) + .lock().unwrap() + .compute_dir_sum(path, cache, dam, con) } /// compute the sum for a regular file (not a folder) |