Diffstat (limited to 'src/fscache.rs')
 -rw-r--r--  src/fscache.rs | 268
 1 file changed, 186 insertions(+), 82 deletions(-)
diff --git a/src/fscache.rs b/src/fscache.rs
index 8fb991e..7f4ce24 100644
--- a/src/fscache.rs
+++ b/src/fscache.rs
@@ -2,7 +2,7 @@ use notify::{RecommendedWatcher, Watcher, DebouncedEvent, RecursiveMode};
use async_value::{Async, Stale};
-use std::sync::{Arc, RwLock};
+use std::sync::{Arc, RwLock, Weak};
use std::sync::mpsc::{channel, Sender, Receiver};
use std::collections::{HashMap, HashSet};
use std::time::Duration;
@@ -12,6 +12,8 @@ use crate::files::{Files, File, SortBy};
use crate::widget::Events;
use crate::fail::{HResult, HError, ErrorLog, Backtrace, ArcBacktrace};
+pub type CachedFiles = (Option<File>, Async<Files>);
+
#[derive(Debug, Clone)]
pub struct DirSettings {
@@ -64,6 +66,63 @@ impl std::fmt::Debug for FsCache {
}
}
+#[derive(Clone)]
+struct FsEventDispatcher {
+ targets: Arc<RwLock<HashMap<File, Vec<Weak<RwLock<Vec<FsEvent>>>>>>>
+}
+
+impl FsEventDispatcher {
+ fn new() -> Self {
+ FsEventDispatcher {
+ targets: Arc::new(RwLock::new(HashMap::new()))
+ }
+ }
+
+ fn add_target(&self,
+ dir: &File,
+ target: &Arc<RwLock<Vec<FsEvent>>>) -> HResult<()> {
+ let target = Arc::downgrade(target);
+
+ self.targets
+ .write()
+ .map(|mut targets| {
+ match targets.get_mut(dir) {
+ Some(targets) => targets.push(target),
+ None => { targets.insert(dir.clone(), vec![target]); }
+ }
+ })?;
+ Ok(())
+ }
+
+ fn remove_target(&self, dir: &File) -> HResult<()> {
+ self.targets
+ .write()?
+ .get_mut(dir)
+ .map(|targets| {
+ targets.retain(|t| t.upgrade().is_some());
+ });
+ Ok(())
+ }
+
+ fn dispatch(&self, events: HashMap<File, Vec<FsEvent>>) -> HResult<()> {
+ for (dir, events) in events {
+ for target_dirs in self.targets
+ .read()?
+ .get(&dir) {
+ for target in target_dirs {
+ if let Some(target) = target.upgrade() {
+ let events = events.clone();
+
+ target.write()?.extend(events)
+ }
+ }
+ }
+ }
+ Ok(())
+ }
+
+ // fn remove_unnecessary
+}
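
(Illustrative sketch, not part of the commit: a consumer shares a Vec<FsEvent> buffer with the dispatcher, which keeps only a Weak reference to it, so dropping the buffer effectively unsubscribes it. The function and the `pending` name are hypothetical; in this diff the real buffer is `files.pending_events`.)

    fn dispatcher_usage_sketch(dispatcher: &FsEventDispatcher) -> HResult<()> {
        // Hypothetical consumer-side buffer for one directory.
        let dir = File::new_from_path(std::path::Path::new("/tmp"), None)?;
        let pending: Arc<RwLock<Vec<FsEvent>>> = Arc::new(RwLock::new(vec![]));

        // The dispatcher downgrades this to a Weak reference internally.
        dispatcher.add_target(&dir, &pending)?;

        // Later, after the watcher thread has called dispatch(), the
        // consumer drains whatever accumulated for its directory.
        let drained: Vec<FsEvent> = pending.write()?.drain(..).collect();
        let _ = drained;
        Ok(())
    }
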
#[derive(Clone)]
pub struct FsCache {
@@ -71,14 +130,14 @@ pub struct FsCache {
pub tab_settings: Arc<RwLock<HashMap<File, TabSettings>>>,
watched_dirs: Arc<RwLock<HashSet<File>>>,
watcher: Arc<RwLock<RecommendedWatcher>>,
- pub fs_changes: Arc<RwLock<Vec<(File, Option<File>, Option<File>)>>>,
+ fs_event_dispatcher: FsEventDispatcher
}
impl FsCache {
pub fn new(sender: Sender<Events>) -> FsCache {
let (tx_fs_event, rx_fs_event) = channel();
let watcher = RecommendedWatcher::new(tx_fs_event,
- Duration::from_secs(2)).unwrap();
+ Duration::from_secs(2)).unwrap();
let fs_cache = FsCache {
@@ -86,12 +145,11 @@ impl FsCache {
tab_settings: Arc::new(RwLock::new(HashMap::new())),
watched_dirs: Arc::new(RwLock::new(HashSet::new())),
watcher: Arc::new(RwLock::new(watcher)),
- fs_changes: Arc::new(RwLock::new(vec![])),
+ fs_event_dispatcher: FsEventDispatcher::new()
};
watch_fs(rx_fs_event,
- fs_cache.files.clone(),
- fs_cache.fs_changes.clone(),
+ fs_cache.fs_event_dispatcher.clone(),
sender);
fs_cache
@@ -104,8 +162,6 @@ impl FsCache {
}
}
-pub type CachedFiles = (Option<File>, Async<Files>);
-
impl FsCache {
pub fn get_files(&self, dir: &File, stale: Stale) -> HResult<CachedFiles> {
if self.files.read()?.contains_key(dir) {
@@ -117,6 +173,8 @@ impl FsCache {
let files = Async::new(move |_| {
let mut files = Files::new_from_path_cancellable(&dir.path, stale)?;
cache.add_watch(&dir).log();
+ cache.fs_event_dispatcher.add_target(&dir,
+ &files.pending_events).log();
FsCache::apply_settingss(&cache, &mut files).ok();
Ok(files)
});
@@ -129,7 +187,6 @@ impl FsCache {
let mut files = files.run_sync()?;
FsCache::apply_settingss(&self, &mut files).ok();
let files = FsCache::ensure_not_empty(files)?;
- self.add_watch(&dir).log();
Ok(files)
}
@@ -144,28 +201,6 @@ impl FsCache {
Ok(())
}
- pub fn put_files(&self, files: &Files, selection: Option<File>) -> HResult<()> {
- let dir = files.directory.clone();
-
- let tab_settings = FsCache::extract_tab_settings(&files, selection);
-
- self.tab_settings.write()?.insert(dir.clone(), tab_settings);
-
- // let mut file_cache = self.files.write()?;
-
- // if file_cache.contains_key(&files.directory) {
- // if files.meta_updated {
- // let mut files = files.clone();
- // files.meta_updated = false;
- // file_cache.insert(dir, files);
- // }
- // } else {
- // file_cache.insert(dir, files.clone());
- // }
-
- Ok(())
- }
-
pub fn is_cached(&self, dir: &File) -> HResult<bool> {
Ok(self.files.read()?.contains_key(dir))
}
@@ -298,65 +333,134 @@ impl FsCache {
}
+#[derive(PartialEq, Eq, Hash, Clone, Debug)]
+pub enum FsEvent {
+ Create(File),
+ Change(File),
+ Rename(File, File),
+ Remove(File)
+}
+
+impl FsEvent {
+ pub fn file(&self) -> &File {
+ use FsEvent::*;
+ match self {
+ Create(event_file) |
+ Change(event_file) |
+ Remove(event_file) |
+ Rename(_, event_file) => &event_file
+ }
+ }
+
+ pub fn for_file(&self, file: &File) -> bool {
+ use FsEvent::*;
+ match self {
+ Create(event_file) |
+ Change(event_file) |
+ Remove(event_file) |
+ Rename(_, event_file) => event_file.path == file.path
+ }
+ }
+}
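
(Sketch only: folding a drained batch of FsEvents into a flat Vec<File>. Both parameters are hypothetical; in the diff the real consumer is Files, which receives events through its pending_events buffer.)

    fn apply_events_sketch(files: &mut Vec<File>, events: Vec<FsEvent>) {
        for event in events {
            match event {
                FsEvent::Create(new) => files.push(new),
                FsEvent::Change(new) => {
                    // Replace the entry with the same path, if present.
                    if let Some(old) = files.iter_mut().find(|f| f.path == new.path) {
                        *old = new;
                    }
                }
                FsEvent::Rename(old, new) => {
                    files.retain(|f| f.path != old.path);
                    files.push(new);
                }
                FsEvent::Remove(gone) => files.retain(|f| f.path != gone.path),
            }
        }
    }
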
+
+use std::convert::TryFrom;
+impl TryFrom<DebouncedEvent> for FsEvent {
+ type Error = HError;
+
+ fn try_from(event: DebouncedEvent) -> HResult<Self> {
+ let event = match event {
+ DebouncedEvent::Create(path)
+ => FsEvent::Create(File::new_from_path(&path, None)?),
+
+ DebouncedEvent::Remove(path)
+ => FsEvent::Remove(File::new_from_path(&path, None)?),
+
+ DebouncedEvent::Write(path) |
+ DebouncedEvent::Chmod(path)
+ => FsEvent::Change(File::new_from_path(&path, None)?),
+
+ DebouncedEvent::Rename(old_path, new_path)
+ => FsEvent::Rename(File::new_from_path(&old_path, None)?,
+ File::new_from_path(&new_path, None)?),
+
+ DebouncedEvent::Error(err, path)
+ => Err(HError::INotifyError(format!("{}, {:?}", err, path),
+ Backtrace::new_arced()))?,
+ DebouncedEvent::Rescan
+ => Err(HError::INotifyError("Need to rescan".to_string(),
+ Backtrace::new_arced()))?,
+ // Ignore NoticeRemove/NoticeWrite
+ _ => None?,
+ };
+
+ Ok(event)
+ }
+}
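
(Sketch: converting a raw notify event with the TryFrom impl above. The path is made up; NoticeWrite/NoticeRemove fall into the `_ => None?` arm, so try_from returns an Err that the watcher loop below simply drops via flatten().)

    use std::convert::TryFrom;

    let raw = DebouncedEvent::Write(std::path::PathBuf::from("/tmp/some_file"));
    match FsEvent::try_from(raw) {
        Ok(event) => println!("{:?}", event),
        Err(_) => { /* notice-only, error, or unreadable path; ignored */ }
    }
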
+
+
fn watch_fs(rx_fs_events: Receiver<DebouncedEvent>,
- fs_cache: Arc<RwLock<HashMap<File, Files>>>,
- fs_changes: Arc<RwLock<Vec<(File, Option<File>, Option<File>)>>>,
+ fs_event_dispatcher: FsEventDispatcher,
sender: Sender<Events>) {
std::thread::spawn(move || -> HResult<()> {
- for event in rx_fs_events.iter() {
- apply_event(&fs_cache, &fs_changes, event).log();
+ let transform_event =
+ move |event: DebouncedEvent| -> HResult<(File, FsEvent)> {
+ let path = event.get_source_path()?;
+ let dirpath = path.parent()
+ .map(|path| path)
+ .unwrap_or(std::path::Path::new("/"));
+ let dir = File::new_from_path(&dirpath, None)?;
+ let event = FsEvent::try_from(event)?;
+ Ok((dir, event))
+ };
+
+ let collect_events =
+ move || -> HResult<HashMap<File, Vec<FsEvent>>> {
+ let event = loop {
+ use DebouncedEvent::*;
+
+ let event = rx_fs_events.recv()?;
+ match event {
+ NoticeWrite(_) => continue,
+ NoticeRemove(_) => continue,
+ _ => break std::iter::once(event)
+ }
+ };
+
+ // Wait a bit to batch up more events
+ std::thread::sleep(std::time::Duration::from_millis(100));
- sender.send(Events::WidgetReady).ok();
+ // Batch up all other remaining events received so far
+ let events = event.chain(rx_fs_events.try_iter())
+ .map(transform_event)
+ .flatten()
+ .fold(HashMap::with_capacity(1000), |mut events, (dir, event)| {
+ events.entry(dir)
+ .or_insert(vec![])
+ .push(event);
+
+ events
+ });
+
+ Ok(events)
+ };
+
+
+ let dispatch_events =
+ move |events| -> HResult<()> {
+ fs_event_dispatcher.dispatch(events)?;
+ sender.send(Events::WidgetReady)?;
+ Ok(())
+ };
+
+ loop {
+ if let Ok(events) = collect_events().log_and() {
+ dispatch_events(events).log();
+ }
}
- Ok(())
});
}
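
(Rough end-to-end sketch, assuming Stale::new() is how async_value constructs a fresh staleness token; nothing here is prescribed by the diff itself.)

    fn cache_usage_sketch() -> HResult<()> {
        let (sender, _receiver) = channel();
        let cache = FsCache::new(sender);

        // get_files() registers the directory's pending_events buffer with
        // the FsEventDispatcher, so batched FsEvents land there and a
        // WidgetReady event wakes the UI once they are dispatched.
        let dir = File::new_from_path(std::path::Path::new("/tmp"), None)?;
        let (_selection, _files) = cache.get_files(&dir, Stale::new())?;
        Ok(())
    }
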
-fn apply_event(_fs_cache: &Arc<RwLock<HashMap<File, Files>>>,
- fs_changes: &Arc<RwLock<Vec<(File, Option<File>, Option<File>)>>>,
- event: DebouncedEvent)
- -> HResult<()> {
- let path = &event.get_source_path()?;
-
- let dirpath = path.parent()
- .map(|path| path.to_path_buf())
- .unwrap_or_else(|| PathBuf::from("/"));
- let dir = File::new_from_path(&dirpath, None)?;
-
- let old_file = File::new_from_path(&path, None)?;
- let mut new_file = match event {
- DebouncedEvent::Remove(_) => None,
- _ => Some(File::new_from_path(&path, None)?)
- };
-
- new_file.as_mut().map(|file| file.meta_sync());
-
- fs_changes.write()?.push((dir,
- Some(old_file),
- new_file));
-
- // for dir in fs_cache.write()?.values_mut() {
- // if dir.path_in_here(&path).unwrap_or(false) {
- // let old_file = dir.find_file_with_path(&path).cloned();
- // let dirty_meta = old_file
- // .as_ref()
- // .map(|f| f.dirty_meta.clone())
- // .unwrap_or(None);
- // let mut new_file = match event {
- // DebouncedEvent::Remove(_) => None,
- // _ => Some(File::new_from_path(&path, dirty_meta)?)
- // };
-
- // new_file.as_mut().map(|file| file.meta_sync());
- // dir.replace_file(old_file.as_ref(), new_file.clone()).log();
-
- // fs_changes.write()?.push((dir.directory.clone(),
- // old_file,
- // new_file));
- // }
- // }
- Ok(())
-}
+
trait PathFromEvent {
fn get_source_path(&self) -> HResult<&PathBuf>;