diff options
Diffstat (limited to 'src/fscache.rs')
-rw-r--r-- | src/fscache.rs | 268 |
1 files changed, 186 insertions, 82 deletions
diff --git a/src/fscache.rs b/src/fscache.rs index 8fb991e..7f4ce24 100644 --- a/src/fscache.rs +++ b/src/fscache.rs @@ -2,7 +2,7 @@ use notify::{RecommendedWatcher, Watcher, DebouncedEvent, RecursiveMode}; use async_value::{Async, Stale}; -use std::sync::{Arc, RwLock}; +use std::sync::{Arc, RwLock, Weak}; use std::sync::mpsc::{channel, Sender, Receiver}; use std::collections::{HashMap, HashSet}; use std::time::Duration; @@ -12,6 +12,8 @@ use crate::files::{Files, File, SortBy}; use crate::widget::Events; use crate::fail::{HResult, HError, ErrorLog, Backtrace, ArcBacktrace}; +pub type CachedFiles = (Option<File>, Async<Files>); + #[derive(Debug, Clone)] pub struct DirSettings { @@ -64,6 +66,63 @@ impl std::fmt::Debug for FsCache { } } +#[derive(Clone)] +struct FsEventDispatcher { + targets: Arc<RwLock<HashMap<File, Vec<Weak<RwLock<Vec<FsEvent>>>>>>> +} + +impl FsEventDispatcher { + fn new() -> Self { + FsEventDispatcher { + targets: Arc::new(RwLock::new(HashMap::new())) + } + } + + fn add_target(&self, + dir: &File, + target: &Arc<RwLock<Vec<FsEvent>>>) -> HResult<()> { + let target = Arc::downgrade(target); + + self.targets + .write() + .map(|mut targets| { + match targets.get_mut(dir) { + Some(targets) => targets.push(target), + None => { targets.insert(dir.clone(), vec![target]); } + } + })?; + Ok(()) + } + + fn remove_target(&self, dir: &File) -> HResult<()> { + self.targets + .write()? + .get_mut(dir) + .map(|targets| { + targets.retain(|t| t.upgrade().is_some()); + }); + Ok(()) + } + + fn dispatch(&self, events: HashMap<File, Vec<FsEvent>>) -> HResult<()> { + for (dir, events) in events { + for target_dirs in self.targets + .read()? + .get(&dir) { + for target in target_dirs { + if let Some(target) = target.upgrade() { + let events = events.clone(); + + target.write()?.extend(events) + } + } + } + } + Ok(()) + } + + // fn remove_unnecessary +} #[derive(Clone)] pub struct FsCache { @@ -71,14 +130,14 @@ pub struct FsCache { pub tab_settings: Arc<RwLock<HashMap<File, TabSettings>>>, watched_dirs: Arc<RwLock<HashSet<File>>>, watcher: Arc<RwLock<RecommendedWatcher>>, - pub fs_changes: Arc<RwLock<Vec<(File, Option<File>, Option<File>)>>>, + fs_event_dispatcher: FsEventDispatcher } impl FsCache { pub fn new(sender: Sender<Events>) -> FsCache { let (tx_fs_event, rx_fs_event) = channel(); let watcher = RecommendedWatcher::new(tx_fs_event, - Duration::from_secs(2)).unwrap(); + Duration::from_secs(2)).unwrap(); let fs_cache = FsCache { @@ -86,12 +145,11 @@ impl FsCache { tab_settings: Arc::new(RwLock::new(HashMap::new())), watched_dirs: Arc::new(RwLock::new(HashSet::new())), watcher: Arc::new(RwLock::new(watcher)), - fs_changes: Arc::new(RwLock::new(vec![])), + fs_event_dispatcher: FsEventDispatcher::new() }; watch_fs(rx_fs_event, - fs_cache.files.clone(), - fs_cache.fs_changes.clone(), + fs_cache.fs_event_dispatcher.clone(), sender); fs_cache @@ -104,8 +162,6 @@ impl FsCache { } } -pub type CachedFiles = (Option<File>, Async<Files>); - impl FsCache { pub fn get_files(&self, dir: &File, stale: Stale) -> HResult<CachedFiles> { if self.files.read()?.contains_key(dir) { @@ -117,6 +173,8 @@ impl FsCache { let files = Async::new(move |_| { let mut files = Files::new_from_path_cancellable(&dir.path, stale)?; cache.add_watch(&dir).log(); + cache.fs_event_dispatcher.add_target(&dir, + &files.pending_events).log(); FsCache::apply_settingss(&cache, &mut files).ok(); Ok(files) }); @@ -129,7 +187,6 @@ impl FsCache { let mut files = files.run_sync()?; FsCache::apply_settingss(&self, &mut files).ok(); let files = FsCache::ensure_not_empty(files)?; - self.add_watch(&dir).log(); Ok(files) } @@ -144,28 +201,6 @@ impl FsCache { Ok(()) } - pub fn put_files(&self, files: &Files, selection: Option<File>) -> HResult<()> { - let dir = files.directory.clone(); - - let tab_settings = FsCache::extract_tab_settings(&files, selection); - - self.tab_settings.write()?.insert(dir.clone(), tab_settings); - - // let mut file_cache = self.files.write()?; - - // if file_cache.contains_key(&files.directory) { - // if files.meta_updated { - // let mut files = files.clone(); - // files.meta_updated = false; - // file_cache.insert(dir, files); - // } - // } else { - // file_cache.insert(dir, files.clone()); - // } - - Ok(()) - } - pub fn is_cached(&self, dir: &File) -> HResult<bool> { Ok(self.files.read()?.contains_key(dir)) } @@ -298,65 +333,134 @@ impl FsCache { } +#[derive(PartialEq, Eq, Hash, Clone, Debug)] +pub enum FsEvent { + Create(File), + Change(File), + Rename(File, File), + Remove(File) +} + +impl FsEvent { + pub fn file(&self) -> &File { + use FsEvent::*; + match self { + Create(event_file) | + Change(event_file) | + Remove(event_file) | + Rename(_, event_file) => &event_file + } + } + + pub fn for_file(&self, file: &File) -> bool { + use FsEvent::*; + match self { + Create(event_file) | + Change(event_file) | + Remove(event_file) | + Rename(_, event_file) => event_file.path == file.path + } + } +} + +use std::convert::TryFrom; +impl TryFrom<DebouncedEvent> for FsEvent { + type Error = HError; + + fn try_from(event: DebouncedEvent) -> HResult<Self> { + let event = match event { + DebouncedEvent::Create(path) + => FsEvent::Create(File::new_from_path(&path, None)?), + + DebouncedEvent::Remove(path) + => FsEvent::Remove(File::new_from_path(&path, None)?), + + DebouncedEvent::Write(path) | + DebouncedEvent::Chmod(path) + => FsEvent::Change(File::new_from_path(&path, None)?), + + DebouncedEvent::Rename(old_path, new_path) + => FsEvent::Rename(File::new_from_path(&old_path, None)?, + File::new_from_path(&new_path, None)?), + + DebouncedEvent::Error(err, path) + => Err(HError::INotifyError(format!("{}, {:?}", err, path), + Backtrace::new_arced()))?, + DebouncedEvent::Rescan + => Err(HError::INotifyError("Need to rescan".to_string(), + Backtrace::new_arced()))?, + // Ignore NoticeRemove/NoticeWrite + _ => None?, + }; + + Ok(event) + } +} + + fn watch_fs(rx_fs_events: Receiver<DebouncedEvent>, - fs_cache: Arc<RwLock<HashMap<File, Files>>>, - fs_changes: Arc<RwLock<Vec<(File, Option<File>, Option<File>)>>>, + fs_event_dispatcher: FsEventDispatcher, sender: Sender<Events>) { std::thread::spawn(move || -> HResult<()> { - for event in rx_fs_events.iter() { - apply_event(&fs_cache, &fs_changes, event).log(); + let transform_event = + move |event: DebouncedEvent| -> HResult<(File, FsEvent)> { + let path = event.get_source_path()?; + let dirpath = path.parent() + .map(|path| path) + .unwrap_or(std::path::Path::new("/")); + let dir = File::new_from_path(&dirpath, None)?; + let event = FsEvent::try_from(event)?; + Ok((dir, event)) + }; + + let collect_events = + move || -> HResult<HashMap<File, Vec<FsEvent>>> { + let event = loop { + use DebouncedEvent::*; + + let event = rx_fs_events.recv()?; + match event { + NoticeWrite(_) => continue, + NoticeRemove(_) => continue, + _ => break std::iter::once(event) + } + }; + + // Wait a bit to batch up more events + std::thread::sleep(std::time::Duration::from_millis(100)); - sender.send(Events::WidgetReady).ok(); + // Batch up all other remaining events received so far + let events = event.chain(rx_fs_events.try_iter()) + .map(transform_event) + .flatten() + .fold(HashMap::with_capacity(1000), |mut events, (dir, event)| { + events.entry(dir) + .or_insert(vec![]) + .push(event); + + events + }); + + Ok(events) + }; + + + let dispatch_events = + move |events| -> HResult<()> { + fs_event_dispatcher.dispatch(events)?; + sender.send(Events::WidgetReady)?; + Ok(()) + }; + + loop { + if let Ok(events) = collect_events().log_and() { + dispatch_events(events).log(); + } } - Ok(()) }); } -fn apply_event(_fs_cache: &Arc<RwLock<HashMap<File, Files>>>, - fs_changes: &Arc<RwLock<Vec<(File, Option<File>, Option<File>)>>>, - event: DebouncedEvent) - -> HResult<()> { - let path = &event.get_source_path()?; - - let dirpath = path.parent() - .map(|path| path.to_path_buf()) - .unwrap_or_else(|| PathBuf::from("/")); - let dir = File::new_from_path(&dirpath, None)?; - - let old_file = File::new_from_path(&path, None)?; - let mut new_file = match event { - DebouncedEvent::Remove(_) => None, - _ => Some(File::new_from_path(&path, None)?) - }; - - new_file.as_mut().map(|file| file.meta_sync()); - - fs_changes.write()?.push((dir, - Some(old_file), - new_file)); - - // for dir in fs_cache.write()?.values_mut() { - // if dir.path_in_here(&path).unwrap_or(false) { - // let old_file = dir.find_file_with_path(&path).cloned(); - // let dirty_meta = old_file - // .as_ref() - // .map(|f| f.dirty_meta.clone()) - // .unwrap_or(None); - // let mut new_file = match event { - // DebouncedEvent::Remove(_) => None, - // _ => Some(File::new_from_path(&path, dirty_meta)?) - // }; - - // new_file.as_mut().map(|file| file.meta_sync()); - // dir.replace_file(old_file.as_ref(), new_file.clone()).log(); - - // fs_changes.write()?.push((dir.directory.clone(), - // old_file, - // new_file)); - // } - // } - Ok(()) -} + trait PathFromEvent { fn get_source_path(&self) -> HResult<&PathBuf>; |