summaryrefslogtreecommitdiffstats
path: root/crates/common/tedge_utils/src/fs_notify.rs
diff options
context:
space:
mode:
Diffstat (limited to 'crates/common/tedge_utils/src/fs_notify.rs')
-rw-r--r--crates/common/tedge_utils/src/fs_notify.rs574
1 files changed, 574 insertions, 0 deletions
diff --git a/crates/common/tedge_utils/src/fs_notify.rs b/crates/common/tedge_utils/src/fs_notify.rs
new file mode 100644
index 00000000..00003fda
--- /dev/null
+++ b/crates/common/tedge_utils/src/fs_notify.rs
@@ -0,0 +1,574 @@
+use std::{
+ collections::HashMap,
+ hash::Hash,
+ path::{Path, PathBuf},
+};
+
+use async_stream::try_stream;
+pub use futures::{pin_mut, Stream, StreamExt};
+use inotify::{EventMask, Inotify, WatchMask};
+use std::collections::BTreeSet;
+use strum_macros::Display;
+use try_traits::default::TryDefault;
+
+#[derive(Debug, Display, PartialEq, Eq, Clone, Hash, PartialOrd, Ord, Copy)]
+pub enum FileEvent {
+ Modified,
+ Deleted,
+ Created,
+}
+
+impl From<FileEvent> for WatchMask {
+ fn from(value: FileEvent) -> Self {
+ match value {
+ FileEvent::Modified => WatchMask::MODIFY,
+ FileEvent::Deleted => WatchMask::DELETE,
+ FileEvent::Created => WatchMask::CREATE,
+ }
+ }
+}
+
+impl TryFrom<EventMask> for FileEvent {
+ type Error = NotifyStreamError;
+
+ fn try_from(value: EventMask) -> Result<Self, Self::Error> {
+ match value {
+ EventMask::MODIFY => Ok(FileEvent::Modified),
+ EventMask::DELETE => Ok(FileEvent::Deleted),
+ EventMask::CREATE => Ok(FileEvent::Created),
+ _ => Err(NotifyStreamError::UnsupportedEventMask { mask: value }),
+ }
+ }
+}
+
+#[derive(Debug, thiserror::Error)]
+pub enum NotifyStreamError {
+ #[error(transparent)]
+ FromIOError(#[from] std::io::Error),
+
+ #[error("Error creating event stream")]
+ FailedToCreateStream,
+
+ #[error("Error normalising path for: {path:?}")]
+ FailedToNormalisePath { path: PathBuf },
+
+ #[error("Unsupported mask: {mask:?}")]
+ UnsupportedWatchMask { mask: WatchMask },
+
+ #[error("Unsupported mask: {mask:?}")]
+ UnsupportedEventMask { mask: EventMask },
+
+ #[error("Expected watch directory to be: {expected:?} but was: {actual:?}")]
+ WrongParentDirectory { expected: PathBuf, actual: PathBuf },
+
+ #[error("Watcher: {mask} is duplicated for file: {path:?}")]
+ DuplicateWatcher { mask: FileEvent, path: PathBuf },
+}
+
+#[derive(Debug, Default, Clone)]
+struct WatchDescriptor {
+ description: HashMap<PathBuf, HashMap<Option<String>, Vec<FileEvent>>>,
+}
+
+impl WatchDescriptor {
+ #[cfg(test)]
+ #[cfg(feature = "fs-notify")]
+ pub fn get_watch_descriptor(
+ &self,
+ ) -> &HashMap<PathBuf, HashMap<Option<String>, Vec<FileEvent>>> {
+ &self.description
+ }
+
+ /// inserts new values in `self.description`. this takes care of inserting
+ /// - new keys (dir_path, file_name)
+ /// - inserting or appending new masks
+ /// NOTE: though it is not a major concern, the `masks` entry is unordered
+ /// vec![Masks::Deleted, Masks::Modified] does not equal vec![Masks::Modified, Masks::Deleted]
+ fn insert(&mut self, dir_path: PathBuf, file_name: Option<String>, masks: Vec<FileEvent>) {
+ let root_directory_entry = self
+ .description
+ .entry(dir_path)
+ .or_insert_with(HashMap::new);
+ let file_entry = root_directory_entry
+ .entry(file_name)
+ .or_insert_with(|| masks.clone());
+ // if `file_entry` does not contain a `mask`, insert it.
+ let mut ext = masks
+ .into_iter()
+ .filter(|mask| !file_entry.contains(mask))
+ .collect::<Vec<FileEvent>>();
+
+ file_entry.append(&mut ext);
+ }
+
+ /// get a set of `Masks` for a given `dir_path`
+ fn get_mask_set_for_directory<P: AsRef<Path>>(&mut self, dir_path: P) -> Vec<FileEvent> {
+ let hash_map = self
+ .description
+ .entry(dir_path.as_ref().to_path_buf())
+ .or_insert_with(HashMap::new);
+
+ let set = hash_map
+ .values()
+ .flat_map(|masks| masks.iter())
+ .map(|mask| mask.to_owned())
+ .collect::<BTreeSet<_>>();
+ Vec::from_iter(set)
+ }
+}
+
+pub struct NotifyStream {
+ buffer: [u8; 1024],
+ inotify: Inotify,
+ watchers: WatchDescriptor,
+}
+
+impl TryDefault for NotifyStream {
+ type Error = NotifyStreamError;
+
+ fn try_default() -> Result<Self, Self::Error> {
+ let inotify = Inotify::init()?;
+ let buffer = [0; 1024];
+
+ Ok(Self {
+ buffer,
+ inotify,
+ watchers: WatchDescriptor::default(),
+ })
+ }
+}
+
+/// normalisation step joining `candidate_watch_dir` and `candidate_file` and computing the parent of `candidate_file`.
+///
+/// this is useful in situations where:
+/// `candidate_watch_dir` = /path/to/a/directory
+/// `candidate_file` = continued/path/to/a/file
+///
+/// this function will concatenate the two, into:
+/// `/path/to/a/directory/continued/path/to/a/file`
+/// and will return:
+/// `/path/to/a/directory/continued/path/to/a/` and `file`
+fn normalising_watch_dir_and_file(
+ candidate_watch_dir: &Path,
+ candidate_file: Option<String>,
+) -> Result<(PathBuf, Option<String>), NotifyStreamError> {
+ match candidate_file {
+ Some(file_name) => {
+ let full_path = candidate_watch_dir.join(file_name);
+ let parent =
+ full_path
+ .parent()
+ .ok_or_else(|| NotifyStreamError::FailedToNormalisePath {
+ path: full_path.to_path_buf(),
+ })?;
+ let file = full_path
+ .file_name()
+ .and_then(|f| f.to_str())
+ .ok_or_else(|| NotifyStreamError::FailedToNormalisePath {
+ path: full_path.to_path_buf(),
+ })?;
+ Ok((parent.to_path_buf(), Some(file.to_string())))
+ }
+ None => Ok((candidate_watch_dir.to_path_buf(), None)),
+ }
+}
+
+/// to allow notify to watch for multiple events (CLOSE_WRITE, CREATE, MODIFY, etc...)
+/// our internal enum `Masks` needs to be converted into a single `WatchMask` via bitwise OR
+/// operations. (Note, our `Masks` type is an enum, `WatchMask` is a bitflag)
+pub(crate) fn pipe_masks_into_watch_mask(masks: &[FileEvent]) -> WatchMask {
+ let mut watch_mask = WatchMask::empty();
+ for mask in masks {
+ watch_mask |= mask.clone().into()
+ }
+ watch_mask
+}
+
+impl NotifyStream {
+ /// add a watcher to a file or to a directory
+ ///
+ /// this is implemented as a directory watcher regardless if a file is desired
+ /// to be watched or if a directory. There is an internal data structure that
+ /// keeps track of what is being watched - `self.watchers`
+ /// The `stream` method determines whether the incoming event matches what is
+ /// expected in `self.watchers`.
+ ///
+ /// # Watching directories
+ ///
+ /// ```rust
+ /// use tedge_utils::fs_notify::{NotifyStream, FileEvent};
+ /// use try_traits::default::TryDefault;
+ /// use std::path::Path;
+ ///
+ /// let dir_path_a = Path::new("/tmp");
+ /// let dir_path_b = Path::new("/etc/tedge/c8y");
+ ///
+ /// let mut fs_notification_stream = NotifyStream::try_default().unwrap();
+ /// fs_notification_stream.add_watcher(dir_path_a, None, &[FileEvent::Created]).unwrap();
+ /// fs_notification_stream.add_watcher(dir_path_b, None, &[FileEvent::Created, FileEvent::Deleted]).unwrap();
+ /// ```
+ ///
+ /// # Watching files
+ ///
+ /// ```rust
+ /// use tedge_utils::fs_notify::{NotifyStream, FileEvent};
+ /// use tedge_test_utils::fs::TempTedgeDir;
+ /// use try_traits::default::TryDefault;
+ ///
+ /// let ttd = TempTedgeDir::new(); // created a new tmp directory
+ /// let file_a = ttd.file("file_a");
+ /// let file_b = ttd.file("file_b");
+ ///
+ /// let mut fs_notification_stream = NotifyStream::try_default().unwrap();
+ /// fs_notification_stream.add_watcher(ttd.path(), Some(String::from("file_a")), &[FileEvent::Modified]).unwrap();
+ /// fs_notification_stream.add_watcher(ttd.path(), Some(String::from("file_b")), &[FileEvent::Created, FileEvent::Deleted]).unwrap();
+ /// ```
+ /// NOTE:
+ /// in this last example, the root directory is the same: `ttd.path()`
+ /// but the files watched and masks are different. In the background,
+ /// the `add_watcher` fn will add a watch on `ttd.path()` with masks:
+ /// Created, Modified and Deleted. and will update `self.watchers`
+ /// with two entries, one for file_a and one for file_b.
+ ///
+ /// The `stream` method will check that events coming from
+ /// `ttd.path()` match `self.watchers`
+ pub fn add_watcher(
+ &mut self,
+ dir_path: &Path,
+ file: Option<String>,
+ masks: &[FileEvent],
+ ) -> Result<(), NotifyStreamError> {
+ let (dir_path, file) = normalising_watch_dir_and_file(dir_path, file)?;
+ let dir_path = dir_path.as_path();
+
+ if self.watchers.description.is_empty() {
+ let watch_mask = pipe_masks_into_watch_mask(masks);
+ let _ = self.inotify.add_watch(dir_path, watch_mask);
+ let mut wd = WatchDescriptor::default();
+ wd.insert(dir_path.to_path_buf(), file, masks.to_vec());
+ self.watchers = wd;
+ } else {
+ self.watchers
+ .insert(dir_path.to_path_buf(), file, masks.to_vec());
+ let masks = self.watchers.get_mask_set_for_directory(dir_path);
+
+ let watch_mask = pipe_masks_into_watch_mask(&masks);
+ let _ = self.inotify.add_watch(dir_path, watch_mask);
+ }
+ Ok(())
+ }
+
+ //// create an fs notification event stream
+ pub fn stream(mut self) -> impl Stream<Item = Result<(PathBuf, FileEvent), NotifyStreamError>> {
+ try_stream! {
+ let mut notify_service = self.inotify.event_stream(self.buffer)?;
+ while let Some(event_or_error) = notify_service.next().await {
+ match event_or_error {
+ Ok(event) => {
+ let event_mask: FileEvent = event.mask.try_into()?;
+ // because watching a file or watching a direcotry is implemented as
+ // watching a directory, we can ignore the case where &event.name is None
+ if let Some(event_name) = &event.name {
+ let notify_file_name = event_name.to_str().ok_or_else(|| NotifyStreamError::FailedToCreateStream)?;
+ // inotify triggered for a file named `notify_file_name`. Next we need
+ // to see if we have a matching entry WITH a matching flag/mask in `self.watchers`
+ for (dir_path, key) in &self.watchers.description {
+ for (maybe_file_name, flags) in key {
+ for flag in flags {
+ // There are two cases:
+ // 1. we added a file watch
+ // 2. we added a directory watch
+ //
+ // for case 1. our input could have been something like:
+ // ...
+ // notify_service.add_watcher(
+ // "/path/to/some/place",
+ // Some("file_name"), <------ note file name is given
+ // &[Masks::Created]
+ // )
+ // here the file we are watching is *given* - so we can yield events with the
+ // corresponding `event_name` and mask.
+ if let Some(file_name) = maybe_file_name {
+ if file_name.eq(notify_file_name) && event_mask.eq(flag) {
+ let full_path = dir_path.join(file_name);
+ yield (full_path, event_mask)
+ }
+ }
+ else {
+ // for case 2. our input could have been something like:
+ // notify_service.add_watcher(
+ // "/path/to/some/place",
+ // None, <------ note the file name is not given
+ // &[Masks::Created]
+ // )
+ // here the file we are watching is not known to us, so we match only on event mask
+ if event_mask.eq(flag) {
+ let full_path = dir_path.join(notify_file_name);
+ yield (full_path, event_mask)
+ }
+ }
+ }
+ }
+
+ }
+ }
+ // there should never be an "if let None = &event.name" because add_watcher
+ // will always add a watcher as a directory
+ },
+ Err(error) => {
+ // any error comming out of `notify_service.next()` will be
+ // an std::Io error: https://docs.rs/inotify/latest/src/inotify/stream.rs.html#48
+ yield Err(NotifyStreamError::FromIOError(error))?;
+ }
+ }
+ }
+ }
+ }
+}
+
+/// utility function to return an fs notify stream:
+///
+/// this supports both file wathes and directory watches:
+///
+/// # Example
+/// ```rust
+/// use tedge_utils::fs_notify::{fs_notify_stream, FileEvent};
+/// use tedge_test_utils::fs::TempTedgeDir;
+///
+/// // created a new tmp directory with some files and directories
+/// let ttd = TempTedgeDir::new();
+/// let file_a = ttd.file("file_a");
+/// let file_b = ttd.file("file_b");
+/// let file_c = ttd.dir("some_directory").file("file_c");
+///
+///
+/// let fs_notification_stream = fs_notify_stream(&[
+/// (ttd.path(), Some(String::from("file_a")), &[FileEvent::Created]),
+/// (ttd.path(), Some(String::from("file_b")), &[FileEvent::Modified, FileEvent::Created]),
+/// (ttd.path(), Some(String::from("some_directory/file_c")), &[FileEvent::Deleted])
+/// ]
+/// ).unwrap();
+/// ```
+pub fn fs_notify_stream(
+ input: &[(&Path, Option<String>, &[FileEvent])],
+) -> Result<impl Stream<Item = Result<(PathBuf, FileEvent), NotifyStreamError>>, NotifyStreamError>
+{
+ let mut fs_notification_service = NotifyStream::try_default()?;
+ for (dir_path, watch, flags) in input {
+ fs_notification_service.add_watcher(dir_path, watch.to_owned(), flags)?;
+ }
+ Ok(fs_notification_service.stream())
+}
+
+#[cfg(test)]
+#[cfg(feature = "fs-notify")]
+mod tests {
+ use std::{collections::HashMap, path::PathBuf, sync::Arc};
+
+ use futures::{pin_mut, Stream, StreamExt};
+
+ use maplit::hashmap;
+ use tedge_test_utils::fs::TempTedgeDir;
+ use try_traits::default::TryDefault;
+
+ use crate::fs_notify::FileEvent;
+
+ use super::{fs_notify_stream, NotifyStream, NotifyStreamError, WatchDescriptor};
+
+ #[test]
+ /// this test checks the underlying data structure `WatchDescriptor.description`
+ /// three files are created:
+ /// - file_a, file_b at root level of `TempTedgeDir`
+ /// - file_c, at level: `TempTedgeDir`/new_dir
+ fn test_watch_descriptor_data_field() {
+ let ttd = TempTedgeDir::new();
+ let new_dir = ttd.dir("new_dir");
+ ttd.file("file_a");
+ ttd.file("file_b");
+ new_dir.file("file_c");
+
+ let expected_data_structure = hashmap! {
+ ttd.path().to_path_buf() => hashmap! {
+ Some(String::from("file_a")) => vec![FileEvent::Created, FileEvent::Deleted],
+ Some(String::from("file_b")) => vec![FileEvent::Created, FileEvent::Modified]
+ },
+ new_dir.path().to_path_buf() => hashmap! {
+ Some(String::from("file_c")) => vec![FileEvent::Modified]
+ }
+
+ };
+ let expected_hash_set_for_root_dir =
+ vec![FileEvent::Modified, FileEvent::Deleted, FileEvent::Created];
+ let expected_hash_set_for_new_dir = vec![FileEvent::Modified];
+
+ let mut actual_data_structure = WatchDescriptor::default();
+ actual_data_structure.insert(
+ ttd.path().to_path_buf(),
+ Some(String::from("file_a")),
+ vec![FileEvent::Created],
+ );
+ actual_data_structure.insert(
+ ttd.path().to_path_buf(),
+ Some(String::from("file_b")),
+ vec![FileEvent::Created, FileEvent::Modified],
+ );
+ actual_data_structure.insert(
+ new_dir.path().to_path_buf(),
+ Some(String::from("file_c")),
+ vec![FileEvent::Modified],
+ );
+ // NOTE: re-adding `file_a` with an extra mask
+ actual_data_structure.insert(
+ ttd.path().to_path_buf(),
+ Some(String::from("file_a")),
+ vec![FileEvent::Deleted],
+ );
+ assert!(actual_data_structure
+ .get_watch_descriptor()
+ .eq(&expected_data_structure));
+
+ assert_eq!(
+ actual_data_structure.get_mask_set_for_directory(ttd.path()),
+ expected_hash_set_for_root_dir
+ );
+
+ assert_eq!(
+ actual_data_structure.get_mask_set_for_directory(new_dir.path()),
+ expected_hash_set_for_new_dir
+ );
+ }
+
+ #[test]
+ fn test_add_watcher() {
+ let ttd = TempTedgeDir::new();
+ let new_dir = ttd.dir("new_dir");
+ ttd.file("file_a");
+ ttd.file("file_b");
+ new_dir.file("file_c");
+
+ let mut notify_service = NotifyStream::try_default().unwrap();
+ notify_service
+ .add_watcher(
+ ttd.path(),
+ Some(String::from("file_a")),
+ &[FileEvent::Created],
+ )
+ .unwrap();
+ notify_service
+ .add_watcher(
+ ttd.path(),
+ Some(String::from("file_a")),
+ &[FileEvent::Created, FileEvent::Deleted],
+ )
+ .unwrap();
+ notify_service
+ .add_watcher(
+ ttd.path(),
+ Some(String::from("file_b")),
+ &[FileEvent::Modified],
+ )
+ .unwrap();
+ notify_service
+ .add_watcher(
+ new_dir.path(),
+ Some(String::from("file_c")),
+ &[FileEvent::Deleted],
+ )
+ .unwrap();
+ }
+
+ async fn assert_stream(
+ mut inputs: HashMap<String, Vec<FileEvent>>,
+ stream: Result<
+ impl Stream<Item = Result<(PathBuf, FileEvent), NotifyStreamError>>,
+ NotifyStreamError,
+ >,
+ ) {
+ let stream = stream.unwrap();
+ pin_mut!(stream);
+ while let Some(Ok((path, flag))) = stream.next().await {
+ let file_name = String::from(path.file_name().unwrap().to_str().unwrap());
+ let mut values = inputs.get_mut(&file_name).unwrap().to_vec();
+ let index = values.iter().position(|x| *x == flag).unwrap();
+ values.remove(index);
+
+ if values.is_empty() {
+ inputs.remove(&file_name);
+ } else {
+ inputs.insert(file_name, values);
+ }
+
+ if inputs.is_empty() {
+ break;
+ }
+ }
+ }
+
+ #[tokio::test]
+ async fn test_multiple_known_files_watched() {
+ let ttd = Arc::new(TempTedgeDir::new());
+ let ttd_clone = ttd.clone();
+
+ let expected_events = hashmap! {
+ String::from("file_a") => vec![FileEvent::Created],
+ String::from("file_b") => vec![FileEvent::Created, FileEvent::Modified]
+ };
+
+ let stream = fs_notify_stream(&[
+ (
+ ttd.path(),
+ Some(String::from("file_a")),
+ &[FileEvent::Created],
+ ),
+ (
+ ttd.path(),
+ Some(String::from("file_b")),
+ &[FileEvent::Created, FileEvent::Modified],
+ ),
+ ]);
+
+ let fs_notify_handler = tokio::task::spawn(async move {
+ assert_stream(expected_events, stream).await;
+ });
+
+ let file_handler = tokio::task::spawn(async move {
+ ttd_clone.file("file_a").with_raw_content("content");
+ ttd_clone.file("file_b").with_raw_content("content");
+ });
+
+ let () = fs_notify_handler.await.unwrap();
+ let () = file_handler.await.unwrap();
+ }
+
+ #[tokio::test]
+ async fn test_multiple_unknown_files_watched() {
+ let ttd = Arc::new(TempTedgeDir::new());
+ ttd.file("file_b"); // creating this file before the fs notify service
+ let ttd_clone = ttd.clone();
+
+ let expected_events = hashmap! {
+ String::from("file_a") => vec![FileEvent::Created],
+ String::from("file_b") => vec![FileEvent::Modified],
+ String::from("file_c") => vec![FileEvent::Created, FileEvent::Deleted]
+ };
+
+ let stream = fs_notify_stream(&[(
+ ttd.path(),
+ None,
+ &[FileEvent::Created, FileEvent::Modified, FileEvent::Deleted],
+ )]);
+
+ let fs_notify_handler = tokio::task::spawn(async move {
+ assert_stream(expected_events, stream).await;
+ });
+
+ let file_handler = tokio::task::spawn(async move {
+ ttd_clone.file("file_a"); // should match CREATE
+ ttd_clone.file("file_b").with_raw_content("content"); // should match MODIFY
+ ttd_clone.file("file_c").delete(); // should match CREATE, DELETE
+ });
+
+ let () = fs_notify_handler.await.unwrap();
+ let () = file_handler.await.unwrap();
+ }
+}