summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorinitard <alex.solomes@softwareag.com>2022-07-26 07:54:44 +0100
committerGitHub <noreply@github.com>2022-07-26 07:54:44 +0100
commitf39825fbed5772511340f1f6aca3ed927eea837b (patch)
tree0c8d97b4f5cdf1e33da04d074af60f3c813f35ef
parent87ed970eab78f8c2dc043ec2259d104da2352cbd (diff)
parent8cd372ead92ce8aa4cc8c4dce7a40f6c028f817f (diff)
Merge pull request #1250 from initard/improvement/1173/inotify-tedge-mapper-dynamic-discovery-of-operations
tedge mapper dynamic discovery of operations
-rw-r--r--Cargo.lock1
-rw-r--r--crates/common/mqtt_channel/Cargo.toml1
-rw-r--r--crates/core/c8y_smartrest/src/operations.rs51
-rw-r--r--crates/core/tedge_mapper/Cargo.toml2
-rw-r--r--crates/core/tedge_mapper/src/c8y/dynamic_discovery.rs100
-rw-r--r--crates/core/tedge_mapper/src/c8y/mapper.rs2
-rw-r--r--crates/core/tedge_mapper/src/core/error.rs3
-rw-r--r--crates/core/tedge_mapper/src/core/mapper.rs70
8 files changed, 103 insertions, 127 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 51299c3b..9dbf4aec 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1641,6 +1641,7 @@ dependencies = [
"mqtt_tests",
"rumqttc",
"serial_test",
+ "tedge_utils",
"thiserror",
"tokio",
]
diff --git a/crates/common/mqtt_channel/Cargo.toml b/crates/common/mqtt_channel/Cargo.toml
index c4a2e352..9c75e543 100644
--- a/crates/common/mqtt_channel/Cargo.toml
+++ b/crates/common/mqtt_channel/Cargo.toml
@@ -12,6 +12,7 @@ async-trait = "0.1"
futures = "0.3"
fastrand = "1.8"
rumqttc = "0.10"
+tedge_utils = { path = "../tedge_utils" }
thiserror = "1.0"
tokio = { version = "1.12", features = ["rt", "time"] }
diff --git a/crates/core/c8y_smartrest/src/operations.rs b/crates/core/c8y_smartrest/src/operations.rs
index 87e6aee9..ea0202b7 100644
--- a/crates/core/c8y_smartrest/src/operations.rs
+++ b/crates/core/c8y_smartrest/src/operations.rs
@@ -48,6 +48,16 @@ pub struct Operations {
operations_by_trigger: HashMap<String, usize>,
}
+/// depending on which editor you use, temporary files could be created that contain the name of
+/// the file.
+/// this `operation_name_is_valid` fn will ensure that only files that do not contain
+/// any special characters are allowed.
+pub fn is_valid_operation_name(operation: &str) -> bool {
+ operation
+ .chars()
+ .all(|c| c.is_ascii_alphabetic() || c.is_numeric() || c.eq(&'_'))
+}
+
impl Operations {
pub fn add_operation(&mut self, operation: Operation) {
if self.operations.iter().any(|o| o.name.eq(&operation.name)) {
@@ -106,23 +116,28 @@ fn get_operations(dir: impl AsRef<Path>, cloud_name: &str) -> Result<Operations,
.collect::<Vec<PathBuf>>();
for path in dir_entries {
- let mut details = match fs::read(&path) {
- Ok(bytes) => toml::from_slice::<Operation>(bytes.as_slice())
- .map_err(|e| OperationsError::TomlError(path.to_path_buf(), e))?,
+ if let Some(file_name) = path.file_name().and_then(|file_name| file_name.to_str()) {
+ if !is_valid_operation_name(file_name) {
+ continue;
+ }
+
+ let mut details = match fs::read(&path) {
+ Ok(bytes) => toml::from_slice::<Operation>(bytes.as_slice())
+ .map_err(|e| OperationsError::TomlError(path.to_path_buf(), e))?,
- Err(err) => return Err(OperationsError::FromIo(err)),
- };
+ Err(err) => return Err(OperationsError::FromIo(err)),
+ };
- details.name = path
- .file_name()
- .and_then(|filename| filename.to_str())
- .ok_or_else(|| OperationsError::InvalidOperationName(path.to_owned()))?
- .to_owned();
- operations.add_operation(details);
+ details.name = path
+ .file_name()
+ .and_then(|filename| filename.to_str())
+ .ok_or_else(|| OperationsError::InvalidOperationName(path.to_owned()))?
+ .to_owned();
+ operations.add_operation(details);
+ }
}
Ok(operations)
}
-
pub fn get_operation(path: PathBuf) -> Result<Operation, OperationsError> {
let mut details = match fs::read(&path) {
Ok(bytes) => toml::from_slice::<Operation>(bytes.as_slice())
@@ -231,4 +246,16 @@ mod tests {
assert_eq!(operations.operations.len(), ops_count);
}
+
+ #[test_case("file_a?", false)]
+ #[test_case("~file_b", false)]
+ #[test_case("c8y_Command", true)]
+ #[test_case("c8y_CommandA~", false)]
+ #[test_case(".c8y_CommandB", false)]
+ #[test_case("c8y_CommandD?", false)]
+ #[test_case("c8y_CommandE?!£$%^&*(", false)]
+ #[test_case("?!£$%^&*(c8y_CommandF?!£$%^&*(", false)]
+ fn operation_name_should_contain_only_alphabetic_chars(operation: &str, expected_result: bool) {
+ assert_eq!(is_valid_operation_name(operation), expected_result)
+ }
}
diff --git a/crates/core/tedge_mapper/Cargo.toml b/crates/core/tedge_mapper/Cargo.toml
index fa5447fb..009d3cc6 100644
--- a/crates/core/tedge_mapper/Cargo.toml
+++ b/crates/core/tedge_mapper/Cargo.toml
@@ -47,7 +47,7 @@ serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
clap = { version = "3.2", features = ["cargo", "derive"] }
tedge_config = { path = "../../common/tedge_config" }
-tedge_utils = { path = "../../common/tedge_utils", features = ["logging"] }
+tedge_utils = { path = "../../common/tedge_utils", features = ["logging", "fs-notify"] }
thin_edge_json = { path = "../thin_edge_json" }
thiserror = "1.0"
time = "0.3"
diff --git a/crates/core/tedge_mapper/src/c8y/dynamic_discovery.rs b/crates/core/tedge_mapper/src/c8y/dynamic_discovery.rs
index 4eff35c4..afe0eb71 100644
--- a/crates/core/tedge_mapper/src/c8y/dynamic_discovery.rs
+++ b/crates/core/tedge_mapper/src/c8y/dynamic_discovery.rs
@@ -1,7 +1,8 @@
-use std::{ffi::OsString, path::PathBuf};
+use std::path::{Path, PathBuf};
-use inotify::{Event, EventMask, Inotify, WatchMask};
+use c8y_smartrest::operations::is_valid_operation_name;
use serde::{Deserialize, Serialize};
+use tedge_utils::fs_notify::FileEvent;
#[derive(Serialize, Deserialize, Debug)]
pub enum EventType {
@@ -19,81 +20,40 @@ pub struct DiscoverOp {
#[derive(thiserror::Error, Debug)]
#[allow(clippy::enum_variant_names)]
pub enum DynamicDiscoverOpsError {
- #[error("Failed to add watch to directory: {0}")]
- FailedtoAddWatch(String),
-
- #[error("A non-UTF8 name cannot be used as an operation name: {0:?}")]
- NotAnOperationName(OsString),
+ #[error("A non-UTF8 path cannot be parsed as an operation: {0:?}")]
+ NotAnOperation(PathBuf),
#[error(transparent)]
EventError(#[from] std::io::Error),
}
-pub fn create_inotify_watch(ops_dir: PathBuf) -> Result<Inotify, DynamicDiscoverOpsError> {
- let mut inotify = Inotify::init()?;
- inotify
- .add_watch(ops_dir.clone(), WatchMask::CLOSE_WRITE | WatchMask::DELETE)
- .map_err(|_| {
- DynamicDiscoverOpsError::FailedtoAddWatch(ops_dir.to_string_lossy().to_string())
- })?;
- Ok(inotify)
-}
-
-pub fn create_inofity_event_stream(
- ops_dir: PathBuf,
-) -> Result<inotify::EventStream<[u8; 1024]>, DynamicDiscoverOpsError> {
- let buffer = [0; 1024];
- let mut ino = create_inotify_watch(ops_dir)?;
- Ok(ino.event_stream(buffer)?)
-}
-
pub fn process_inotify_events(
- ops_dir: PathBuf,
- event: Event<OsString>,
+ path: &Path,
+ mask: FileEvent,
) -> Result<Option<DiscoverOp>, DynamicDiscoverOpsError> {
- if let Some(ops_name) = event.clone().name {
- let operation_name = ops_name
- .to_str()
- .ok_or_else(|| DynamicDiscoverOpsError::NotAnOperationName(ops_name.clone()));
-
- match operation_name {
- Ok(ops_name) => match event.mask {
- EventMask::DELETE => {
- return Ok(Some(DiscoverOp {
- ops_dir,
- event_type: EventType::Remove,
- operation_name: ops_name.to_string(),
- }))
- }
- EventMask::CLOSE_WRITE => {
- return Ok(Some(DiscoverOp {
- ops_dir,
- event_type: EventType::Add,
- operation_name: ops_name.to_string(),
- }))
- }
- _ => return Ok(None),
- },
- Err(e) => return Err(e),
+ let operation_name = path
+ .file_name()
+ .and_then(|file_name| file_name.to_str())
+ .ok_or_else(|| DynamicDiscoverOpsError::NotAnOperation(path.to_path_buf()))?;
+
+ let parent_dir = path
+ .parent()
+ .ok_or_else(|| DynamicDiscoverOpsError::NotAnOperation(path.to_path_buf()))?;
+
+ if is_valid_operation_name(operation_name) {
+ match mask {
+ FileEvent::Deleted => Ok(Some(DiscoverOp {
+ ops_dir: parent_dir.to_path_buf(),
+ event_type: EventType::Remove,
+ operation_name: operation_name.to_string(),
+ })),
+ FileEvent::Created | FileEvent::Modified => Ok(Some(DiscoverOp {
+ ops_dir: parent_dir.to_path_buf(),
+ event_type: EventType::Add,
+ operation_name: operation_name.to_string(),
+ })),
}
+ } else {
+ Ok(None)
}
- Ok(None)
-}
-
-#[cfg(test)]
-#[test]
-fn create_inotify_with_non_existing_dir() {
- let err = create_inotify_watch("/tmp/discover_ops".into()).unwrap_err();
- assert_eq!(
- err.to_string(),
- "Failed to add watch to directory: /tmp/discover_ops"
- );
-}
-
-#[test]
-fn create_inotify_with_right_directory() {
- use tedge_test_utils::fs::TempTedgeDir;
- let dir = TempTedgeDir::new();
- let res = create_inotify_watch(dir.path().to_path_buf());
- assert!(res.is_ok());
}
diff --git a/crates/core/tedge_mapper/src/c8y/mapper.rs b/crates/core/tedge_mapper/src/c8y/mapper.rs
index 81068adb..24bce5cb 100644
--- a/crates/core/tedge_mapper/src/c8y/mapper.rs
+++ b/crates/core/tedge_mapper/src/c8y/mapper.rs
@@ -90,7 +90,7 @@ impl TEdgeComponent for CumulocityMapper {
let ops_dir = PathBuf::from(format!("{}/operations/c8y", &config_dir));
mapper
- .run(Some(ops_dir))
+ .run(Some(&ops_dir))
.instrument(info_span!(CUMULOCITY_MAPPER_NAME))
.await?;
diff --git a/crates/core/tedge_mapper/src/core/error.rs b/crates/core/tedge_mapper/src/core/error.rs
index 2d2e0163..1932ab61 100644
--- a/crates/core/tedge_mapper/src/core/error.rs
+++ b/crates/core/tedge_mapper/src/core/error.rs
@@ -25,6 +25,9 @@ pub enum MapperError {
#[error(transparent)]
FromFlockfile(#[from] flockfile::FlockfileError),
+
+ #[error(transparent)]
+ FromNotifyFs(#[from] tedge_utils::fs_notify::NotifyStreamError),
}
#[derive(Debug, thiserror::Error)]
diff --git a/crates/core/tedge_mapper/src/core/mapper.rs b/crates/core/tedge_mapper/src/core/mapper.rs
index a16a042d..b9144ea5 100644
--- a/crates/core/tedge_mapper/src/core/mapper.rs
+++ b/crates/core/tedge_mapper/src/core/mapper.rs
@@ -5,8 +5,9 @@ use mqtt_channel::{
UnboundedSender,
};
use serde_json::json;
-use std::path::PathBuf;
+use std::path::Path;
use std::{process, time::Duration};
+use tedge_utils::fs_notify::{fs_notify_stream, pin_mut, FileEvent};
use time::OffsetDateTime;
use tracing::{error, info, instrument, warn};
const SYNC_WINDOW: Duration = Duration::from_secs(3);
@@ -86,7 +87,7 @@ impl Mapper {
}
}
- pub(crate) async fn run(&mut self, ops_dir: Option<PathBuf>) -> Result<(), MqttError> {
+ pub(crate) async fn run(&mut self, ops_dir: Option<&Path>) -> Result<(), MapperError> {
info!("Running");
self.process_messages(ops_dir).await?;
Ok(())
@@ -102,7 +103,7 @@ impl Mapper {
}
#[instrument(skip(self), name = "messages")]
- async fn process_messages(&mut self, ops_dir: Option<PathBuf>) -> Result<(), MqttError> {
+ async fn process_messages(&mut self, ops_dir: Option<&Path>) -> Result<(), MapperError> {
let init_messages = self.converter.init_messages();
for init_message in init_messages.into_iter() {
let _ = self.output.send(init_message).await;
@@ -122,16 +123,7 @@ impl Mapper {
self.process_message(message).await;
}
- match ops_dir {
- // Create inotify steam for capturing the inotify events.
- Some(dir) => {
- process_inotify_and_mqtt_messages(self, dir).await?;
- }
- None => {
- // If there is no operation directory to watch, then continue processing only the mqtt messages
- let _ = process_mqtt_messages(self).await;
- }
- }
+ process_messages(self, ops_dir).await?;
Ok(())
}
@@ -155,28 +147,26 @@ impl Mapper {
}
}
-async fn process_inotify_and_mqtt_messages(
- mapper: &mut Mapper,
- dir: PathBuf,
-) -> Result<(), MqttError> {
- match create_inofity_event_stream(dir.clone()) {
- Ok(mut inotify_events) => loop {
+async fn process_messages(mapper: &mut Mapper, path: Option<&Path>) -> Result<(), MapperError> {
+ if let Some(path) = path {
+ let fs_notification_stream = fs_notify_stream(&[(
+ path,
+ None,
+ &[FileEvent::Created, FileEvent::Deleted, FileEvent::Modified],
+ )])?;
+ pin_mut!(fs_notification_stream); // needed for iteration
+
+ loop {
tokio::select! {
- msg = mapper.input.next() => {
- match msg {
- Some(message) => {
- mapper.process_message(message).await;
- } None => {
- break Ok(());
- }
- }
+ Some(message) = mapper.input.next() => {
+ mapper.process_message(message).await;
}
- Some(event_or_error) = inotify_events.next() => {
+ Some(event_or_error) = fs_notification_stream.next() => {
match event_or_error {
- Ok(event) => {
- match process_inotify_events(dir.clone(), event) {
+ Ok((path, mask)) => {
+ match process_inotify_events(&path, mask) {
Ok(Some(discovered_ops)) => {
- let _ = mapper.output.send(mapper.converter.process_operation_update_message(discovered_ops)).await;
+ let _ = mapper.output.send(mapper.converter.process_operation_update_message(discovered_ops)).await;
}
Ok(None) => {}
Err(e) => {eprintln!("Processing inotify event failed due to {}", e);}
@@ -185,24 +175,18 @@ async fn process_inotify_and_mqtt_messages(
Err(error) => {
eprintln!("Failed to extract event {}", error);
}
- }
+ } // On error continue to process only mqtt messages.
}
}
- }, // On error continue to process only mqtt messages.
- Err(e) => {
- eprintln!("Failed to create the inotify stream due to {:?}. So, dynamic operation discovery not supported, please restart the mapper on Add/Removal of an operation", e);
- process_mqtt_messages(mapper).await
}
+ } else {
+ while let Some(message) = mapper.input.next().await {
+ mapper.process_message(message).await;
+ }
+ Ok(())
}
}
-async fn process_mqtt_messages(mapper: &mut Mapper) -> Result<(), MqttError> {
- while let Some(message) = mapper.input.next().await {
- mapper.process_message(message).await;
- }
- Ok(())
-}
-
#[cfg(test)]
mod tests {
use super::*;