diff options
-rw-r--r-- | Cargo.lock | 1 | ||||
-rw-r--r-- | crates/common/mqtt_channel/Cargo.toml | 1 | ||||
-rw-r--r-- | crates/core/c8y_smartrest/src/operations.rs | 51 | ||||
-rw-r--r-- | crates/core/tedge_mapper/Cargo.toml | 2 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/c8y/dynamic_discovery.rs | 100 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/c8y/mapper.rs | 2 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/core/error.rs | 3 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/core/mapper.rs | 70 |
8 files changed, 103 insertions, 127 deletions
@@ -1641,6 +1641,7 @@ dependencies = [ "mqtt_tests", "rumqttc", "serial_test", + "tedge_utils", "thiserror", "tokio", ] diff --git a/crates/common/mqtt_channel/Cargo.toml b/crates/common/mqtt_channel/Cargo.toml index c4a2e352..9c75e543 100644 --- a/crates/common/mqtt_channel/Cargo.toml +++ b/crates/common/mqtt_channel/Cargo.toml @@ -12,6 +12,7 @@ async-trait = "0.1" futures = "0.3" fastrand = "1.8" rumqttc = "0.10" +tedge_utils = { path = "../tedge_utils" } thiserror = "1.0" tokio = { version = "1.12", features = ["rt", "time"] } diff --git a/crates/core/c8y_smartrest/src/operations.rs b/crates/core/c8y_smartrest/src/operations.rs index 87e6aee9..ea0202b7 100644 --- a/crates/core/c8y_smartrest/src/operations.rs +++ b/crates/core/c8y_smartrest/src/operations.rs @@ -48,6 +48,16 @@ pub struct Operations { operations_by_trigger: HashMap<String, usize>, } +/// depending on which editor you use, temporary files could be created that contain the name of +/// the file. +/// this `operation_name_is_valid` fn will ensure that only files that do not contain +/// any special characters are allowed. +pub fn is_valid_operation_name(operation: &str) -> bool { + operation + .chars() + .all(|c| c.is_ascii_alphabetic() || c.is_numeric() || c.eq(&'_')) +} + impl Operations { pub fn add_operation(&mut self, operation: Operation) { if self.operations.iter().any(|o| o.name.eq(&operation.name)) { @@ -106,23 +116,28 @@ fn get_operations(dir: impl AsRef<Path>, cloud_name: &str) -> Result<Operations, .collect::<Vec<PathBuf>>(); for path in dir_entries { - let mut details = match fs::read(&path) { - Ok(bytes) => toml::from_slice::<Operation>(bytes.as_slice()) - .map_err(|e| OperationsError::TomlError(path.to_path_buf(), e))?, + if let Some(file_name) = path.file_name().and_then(|file_name| file_name.to_str()) { + if !is_valid_operation_name(file_name) { + continue; + } + + let mut details = match fs::read(&path) { + Ok(bytes) => toml::from_slice::<Operation>(bytes.as_slice()) + .map_err(|e| OperationsError::TomlError(path.to_path_buf(), e))?, - Err(err) => return Err(OperationsError::FromIo(err)), - }; + Err(err) => return Err(OperationsError::FromIo(err)), + }; - details.name = path - .file_name() - .and_then(|filename| filename.to_str()) - .ok_or_else(|| OperationsError::InvalidOperationName(path.to_owned()))? - .to_owned(); - operations.add_operation(details); + details.name = path + .file_name() + .and_then(|filename| filename.to_str()) + .ok_or_else(|| OperationsError::InvalidOperationName(path.to_owned()))? + .to_owned(); + operations.add_operation(details); + } } Ok(operations) } - pub fn get_operation(path: PathBuf) -> Result<Operation, OperationsError> { let mut details = match fs::read(&path) { Ok(bytes) => toml::from_slice::<Operation>(bytes.as_slice()) @@ -231,4 +246,16 @@ mod tests { assert_eq!(operations.operations.len(), ops_count); } + + #[test_case("file_a?", false)] + #[test_case("~file_b", false)] + #[test_case("c8y_Command", true)] + #[test_case("c8y_CommandA~", false)] + #[test_case(".c8y_CommandB", false)] + #[test_case("c8y_CommandD?", false)] + #[test_case("c8y_CommandE?!£$%^&*(", false)] + #[test_case("?!£$%^&*(c8y_CommandF?!£$%^&*(", false)] + fn operation_name_should_contain_only_alphabetic_chars(operation: &str, expected_result: bool) { + assert_eq!(is_valid_operation_name(operation), expected_result) + } } diff --git a/crates/core/tedge_mapper/Cargo.toml b/crates/core/tedge_mapper/Cargo.toml index fa5447fb..009d3cc6 100644 --- a/crates/core/tedge_mapper/Cargo.toml +++ b/crates/core/tedge_mapper/Cargo.toml @@ -47,7 +47,7 @@ serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" clap = { version = "3.2", features = ["cargo", "derive"] } tedge_config = { path = "../../common/tedge_config" } -tedge_utils = { path = "../../common/tedge_utils", features = ["logging"] } +tedge_utils = { path = "../../common/tedge_utils", features = ["logging", "fs-notify"] } thin_edge_json = { path = "../thin_edge_json" } thiserror = "1.0" time = "0.3" diff --git a/crates/core/tedge_mapper/src/c8y/dynamic_discovery.rs b/crates/core/tedge_mapper/src/c8y/dynamic_discovery.rs index 4eff35c4..afe0eb71 100644 --- a/crates/core/tedge_mapper/src/c8y/dynamic_discovery.rs +++ b/crates/core/tedge_mapper/src/c8y/dynamic_discovery.rs @@ -1,7 +1,8 @@ -use std::{ffi::OsString, path::PathBuf}; +use std::path::{Path, PathBuf}; -use inotify::{Event, EventMask, Inotify, WatchMask}; +use c8y_smartrest::operations::is_valid_operation_name; use serde::{Deserialize, Serialize}; +use tedge_utils::fs_notify::FileEvent; #[derive(Serialize, Deserialize, Debug)] pub enum EventType { @@ -19,81 +20,40 @@ pub struct DiscoverOp { #[derive(thiserror::Error, Debug)] #[allow(clippy::enum_variant_names)] pub enum DynamicDiscoverOpsError { - #[error("Failed to add watch to directory: {0}")] - FailedtoAddWatch(String), - - #[error("A non-UTF8 name cannot be used as an operation name: {0:?}")] - NotAnOperationName(OsString), + #[error("A non-UTF8 path cannot be parsed as an operation: {0:?}")] + NotAnOperation(PathBuf), #[error(transparent)] EventError(#[from] std::io::Error), } -pub fn create_inotify_watch(ops_dir: PathBuf) -> Result<Inotify, DynamicDiscoverOpsError> { - let mut inotify = Inotify::init()?; - inotify - .add_watch(ops_dir.clone(), WatchMask::CLOSE_WRITE | WatchMask::DELETE) - .map_err(|_| { - DynamicDiscoverOpsError::FailedtoAddWatch(ops_dir.to_string_lossy().to_string()) - })?; - Ok(inotify) -} - -pub fn create_inofity_event_stream( - ops_dir: PathBuf, -) -> Result<inotify::EventStream<[u8; 1024]>, DynamicDiscoverOpsError> { - let buffer = [0; 1024]; - let mut ino = create_inotify_watch(ops_dir)?; - Ok(ino.event_stream(buffer)?) -} - pub fn process_inotify_events( - ops_dir: PathBuf, - event: Event<OsString>, + path: &Path, + mask: FileEvent, ) -> Result<Option<DiscoverOp>, DynamicDiscoverOpsError> { - if let Some(ops_name) = event.clone().name { - let operation_name = ops_name - .to_str() - .ok_or_else(|| DynamicDiscoverOpsError::NotAnOperationName(ops_name.clone())); - - match operation_name { - Ok(ops_name) => match event.mask { - EventMask::DELETE => { - return Ok(Some(DiscoverOp { - ops_dir, - event_type: EventType::Remove, - operation_name: ops_name.to_string(), - })) - } - EventMask::CLOSE_WRITE => { - return Ok(Some(DiscoverOp { - ops_dir, - event_type: EventType::Add, - operation_name: ops_name.to_string(), - })) - } - _ => return Ok(None), - }, - Err(e) => return Err(e), + let operation_name = path + .file_name() + .and_then(|file_name| file_name.to_str()) + .ok_or_else(|| DynamicDiscoverOpsError::NotAnOperation(path.to_path_buf()))?; + + let parent_dir = path + .parent() + .ok_or_else(|| DynamicDiscoverOpsError::NotAnOperation(path.to_path_buf()))?; + + if is_valid_operation_name(operation_name) { + match mask { + FileEvent::Deleted => Ok(Some(DiscoverOp { + ops_dir: parent_dir.to_path_buf(), + event_type: EventType::Remove, + operation_name: operation_name.to_string(), + })), + FileEvent::Created | FileEvent::Modified => Ok(Some(DiscoverOp { + ops_dir: parent_dir.to_path_buf(), + event_type: EventType::Add, + operation_name: operation_name.to_string(), + })), } + } else { + Ok(None) } - Ok(None) -} - -#[cfg(test)] -#[test] -fn create_inotify_with_non_existing_dir() { - let err = create_inotify_watch("/tmp/discover_ops".into()).unwrap_err(); - assert_eq!( - err.to_string(), - "Failed to add watch to directory: /tmp/discover_ops" - ); -} - -#[test] -fn create_inotify_with_right_directory() { - use tedge_test_utils::fs::TempTedgeDir; - let dir = TempTedgeDir::new(); - let res = create_inotify_watch(dir.path().to_path_buf()); - assert!(res.is_ok()); } diff --git a/crates/core/tedge_mapper/src/c8y/mapper.rs b/crates/core/tedge_mapper/src/c8y/mapper.rs index 81068adb..24bce5cb 100644 --- a/crates/core/tedge_mapper/src/c8y/mapper.rs +++ b/crates/core/tedge_mapper/src/c8y/mapper.rs @@ -90,7 +90,7 @@ impl TEdgeComponent for CumulocityMapper { let ops_dir = PathBuf::from(format!("{}/operations/c8y", &config_dir)); mapper - .run(Some(ops_dir)) + .run(Some(&ops_dir)) .instrument(info_span!(CUMULOCITY_MAPPER_NAME)) .await?; diff --git a/crates/core/tedge_mapper/src/core/error.rs b/crates/core/tedge_mapper/src/core/error.rs index 2d2e0163..1932ab61 100644 --- a/crates/core/tedge_mapper/src/core/error.rs +++ b/crates/core/tedge_mapper/src/core/error.rs @@ -25,6 +25,9 @@ pub enum MapperError { #[error(transparent)] FromFlockfile(#[from] flockfile::FlockfileError), + + #[error(transparent)] + FromNotifyFs(#[from] tedge_utils::fs_notify::NotifyStreamError), } #[derive(Debug, thiserror::Error)] diff --git a/crates/core/tedge_mapper/src/core/mapper.rs b/crates/core/tedge_mapper/src/core/mapper.rs index a16a042d..b9144ea5 100644 --- a/crates/core/tedge_mapper/src/core/mapper.rs +++ b/crates/core/tedge_mapper/src/core/mapper.rs @@ -5,8 +5,9 @@ use mqtt_channel::{ UnboundedSender, }; use serde_json::json; -use std::path::PathBuf; +use std::path::Path; use std::{process, time::Duration}; +use tedge_utils::fs_notify::{fs_notify_stream, pin_mut, FileEvent}; use time::OffsetDateTime; use tracing::{error, info, instrument, warn}; const SYNC_WINDOW: Duration = Duration::from_secs(3); @@ -86,7 +87,7 @@ impl Mapper { } } - pub(crate) async fn run(&mut self, ops_dir: Option<PathBuf>) -> Result<(), MqttError> { + pub(crate) async fn run(&mut self, ops_dir: Option<&Path>) -> Result<(), MapperError> { info!("Running"); self.process_messages(ops_dir).await?; Ok(()) @@ -102,7 +103,7 @@ impl Mapper { } #[instrument(skip(self), name = "messages")] - async fn process_messages(&mut self, ops_dir: Option<PathBuf>) -> Result<(), MqttError> { + async fn process_messages(&mut self, ops_dir: Option<&Path>) -> Result<(), MapperError> { let init_messages = self.converter.init_messages(); for init_message in init_messages.into_iter() { let _ = self.output.send(init_message).await; @@ -122,16 +123,7 @@ impl Mapper { self.process_message(message).await; } - match ops_dir { - // Create inotify steam for capturing the inotify events. - Some(dir) => { - process_inotify_and_mqtt_messages(self, dir).await?; - } - None => { - // If there is no operation directory to watch, then continue processing only the mqtt messages - let _ = process_mqtt_messages(self).await; - } - } + process_messages(self, ops_dir).await?; Ok(()) } @@ -155,28 +147,26 @@ impl Mapper { } } -async fn process_inotify_and_mqtt_messages( - mapper: &mut Mapper, - dir: PathBuf, -) -> Result<(), MqttError> { - match create_inofity_event_stream(dir.clone()) { - Ok(mut inotify_events) => loop { +async fn process_messages(mapper: &mut Mapper, path: Option<&Path>) -> Result<(), MapperError> { + if let Some(path) = path { + let fs_notification_stream = fs_notify_stream(&[( + path, + None, + &[FileEvent::Created, FileEvent::Deleted, FileEvent::Modified], + )])?; + pin_mut!(fs_notification_stream); // needed for iteration + + loop { tokio::select! { - msg = mapper.input.next() => { - match msg { - Some(message) => { - mapper.process_message(message).await; - } None => { - break Ok(()); - } - } + Some(message) = mapper.input.next() => { + mapper.process_message(message).await; } - Some(event_or_error) = inotify_events.next() => { + Some(event_or_error) = fs_notification_stream.next() => { match event_or_error { - Ok(event) => { - match process_inotify_events(dir.clone(), event) { + Ok((path, mask)) => { + match process_inotify_events(&path, mask) { Ok(Some(discovered_ops)) => { - let _ = mapper.output.send(mapper.converter.process_operation_update_message(discovered_ops)).await; + let _ = mapper.output.send(mapper.converter.process_operation_update_message(discovered_ops)).await; } Ok(None) => {} Err(e) => {eprintln!("Processing inotify event failed due to {}", e);} @@ -185,24 +175,18 @@ async fn process_inotify_and_mqtt_messages( Err(error) => { eprintln!("Failed to extract event {}", error); } - } + } // On error continue to process only mqtt messages. } } - }, // On error continue to process only mqtt messages. - Err(e) => { - eprintln!("Failed to create the inotify stream due to {:?}. So, dynamic operation discovery not supported, please restart the mapper on Add/Removal of an operation", e); - process_mqtt_messages(mapper).await } + } else { + while let Some(message) = mapper.input.next().await { + mapper.process_message(message).await; + } + Ok(()) } } -async fn process_mqtt_messages(mapper: &mut Mapper) -> Result<(), MqttError> { - while let Some(message) = mapper.input.next().await { - mapper.process_message(message).await; - } - Ok(()) -} - #[cfg(test)] mod tests { use super::*; |