summaryrefslogtreecommitdiffstats
path: root/crates
diff options
context:
space:
mode:
authorPradeepKiruvale <pradeepkumar.kj@softwareag.com>2022-06-01 21:19:51 +0530
committerGitHub <noreply@github.com>2022-06-01 21:19:51 +0530
commit4f8f5ecddd7e835cbf79faf9c5e060530f86d8ae (patch)
tree55dd1026354f3e1a65a4e02a10195ee229a50f95 /crates
parenta19b1e462287186c6cf357e0c8a794fbc2e93195 (diff)
Dynamic discovery of new operations (#1140)
* Closes #612 discover operations dynamically Signed-off-by: Pradeep Kumar K J <pradeepkumar.kj@softwareag.com> * select! on async ops Signed-off-by: Pradeep Kumar K J <pradeepkumar.kj@softwareag.com> * update operations document Signed-off-by: Pradeep Kumar K J <pradeepkumar.kj@softwareag.com> * move inotify/mqtt message processing to separate fn * move mqtt process code to a separate fn
Diffstat (limited to 'crates')
-rw-r--r--crates/core/c8y_smartrest/Cargo.toml1
-rw-r--r--crates/core/c8y_smartrest/src/operations.rs43
-rw-r--r--crates/core/tedge_mapper/Cargo.toml1
-rw-r--r--crates/core/tedge_mapper/src/az/mapper.rs2
-rw-r--r--crates/core/tedge_mapper/src/c8y/converter.rs32
-rw-r--r--crates/core/tedge_mapper/src/c8y/dynamic_discovery.rs101
-rw-r--r--crates/core/tedge_mapper/src/c8y/error.rs3
-rw-r--r--crates/core/tedge_mapper/src/c8y/mapper.rs17
-rw-r--r--crates/core/tedge_mapper/src/c8y/mod.rs1
-rw-r--r--crates/core/tedge_mapper/src/c8y/tests.rs2
-rw-r--r--crates/core/tedge_mapper/src/core/converter.rs20
-rw-r--r--crates/core/tedge_mapper/src/core/mapper.rs89
12 files changed, 270 insertions, 42 deletions
diff --git a/crates/core/c8y_smartrest/Cargo.toml b/crates/core/c8y_smartrest/Cargo.toml
index 78d7a9bf..df288f79 100644
--- a/crates/core/c8y_smartrest/Cargo.toml
+++ b/crates/core/c8y_smartrest/Cargo.toml
@@ -18,6 +18,7 @@ thiserror = "1.0"
time = { version = "0.3", features = ["formatting", "macros", "parsing", "serde"] }
tokio = { version = "1.8", features = ["rt", "sync", "time"] }
toml = "0.5"
+tracing = { version = "0.1", features = ["attributes", "log"] }
[dev-dependencies]
anyhow = "1.0"
diff --git a/crates/core/c8y_smartrest/src/operations.rs b/crates/core/c8y_smartrest/src/operations.rs
index c7bc7c59..5304b8b2 100644
--- a/crates/core/c8y_smartrest/src/operations.rs
+++ b/crates/core/c8y_smartrest/src/operations.rs
@@ -4,9 +4,8 @@ use std::{
path::{Path, PathBuf},
};
-use serde::Deserialize;
-
use crate::error::OperationsError;
+use serde::Deserialize;
/// Operations are derived by reading files subdirectories per cloud /etc/tedge/operations directory
/// Each operation is a file name in one of the subdirectories
@@ -59,14 +58,22 @@ impl Default for Operations {
}
impl Operations {
- pub fn add(&mut self, operation: Operation) {
- if let Some(detail) = operation.exec() {
- if let Some(on_message) = &detail.on_message {
- self.operations_by_trigger
- .insert(on_message.clone(), self.operations.len());
+ pub fn add_operation(&mut self, operation: Operation) {
+ if self.operations.iter().any(|o| o.name.eq(&operation.name)) {
+ return;
+ } else {
+ if let Some(detail) = operation.exec() {
+ if let Some(on_message) = &detail.on_message {
+ self.operations_by_trigger
+ .insert(on_message.clone(), self.operations.len());
+ }
}
+ self.operations.push(operation);
}
- self.operations.push(operation);
+ }
+
+ pub fn remove_operation(&mut self, op_name: &str) {
+ self.operations.retain(|x| x.name.ne(&op_name));
}
pub fn try_new(dir: impl AsRef<Path>, cloud_name: &str) -> Result<Self, OperationsError> {
@@ -121,12 +128,28 @@ fn get_operations(dir: impl AsRef<Path>, cloud_name: &str) -> Result<Operations,
.and_then(|filename| filename.to_str())
.ok_or_else(|| OperationsError::InvalidOperationName(path.to_owned()))?
.to_owned();
-
- operations.add(details);
+ operations.add_operation(details);
}
Ok(operations)
}
+pub fn get_operation(path: PathBuf) -> Result<Operation, OperationsError> {
+ let mut details = match fs::read(&path) {
+ Ok(bytes) => toml::from_slice::<Operation>(bytes.as_slice())
+ .map_err(|e| OperationsError::TomlError(path.to_path_buf(), e))?,
+
+ Err(err) => return Err(OperationsError::FromIo(err)),
+ };
+
+ details.name = path
+ .file_name()
+ .and_then(|filename| filename.to_str())
+ .ok_or_else(|| OperationsError::InvalidOperationName(path.to_owned()))?
+ .to_owned();
+
+ Ok(details)
+}
+
#[cfg(test)]
mod tests {
use std::io::Write;
diff --git a/crates/core/tedge_mapper/Cargo.toml b/crates/core/tedge_mapper/Cargo.toml
index cb7c1c3d..c3ee3aa1 100644
--- a/crates/core/tedge_mapper/Cargo.toml
+++ b/crates/core/tedge_mapper/Cargo.toml
@@ -37,6 +37,7 @@ csv = "1.1"
download = { path = "../../common/download" }
flockfile = { path = "../../common/flockfile" }
futures = "0.3"
+inotify = "0.10.0"
logged_command = { path = "../../common/logged_command" }
mockall = "0.11"
mqtt_channel = { path = "../../common/mqtt_channel" }
diff --git a/crates/core/tedge_mapper/src/az/mapper.rs b/crates/core/tedge_mapper/src/az/mapper.rs
index 7f608f08..73ab05a9 100644
--- a/crates/core/tedge_mapper/src/az/mapper.rs
+++ b/crates/core/tedge_mapper/src/az/mapper.rs
@@ -58,7 +58,7 @@ impl TEdgeComponent for AzureMapper {
let mut mapper = create_mapper(AZURE_MAPPER_NAME, mqtt_host, mqtt_port, converter).await?;
mapper
- .run()
+ .run(None)
.instrument(info_span!(AZURE_MAPPER_NAME))
.await?;
diff --git a/crates/core/tedge_mapper/src/c8y/converter.rs b/crates/core/tedge_mapper/src/c8y/converter.rs
index dc407100..a7468ebe 100644
--- a/crates/core/tedge_mapper/src/c8y/converter.rs
+++ b/crates/core/tedge_mapper/src/c8y/converter.rs
@@ -1,3 +1,4 @@
+use crate::c8y::dynamic_discovery::*;
use crate::core::{converter::*, error::*, size_threshold::SizeThreshold};
use agent_interface::{
topic::{RequestTopic, ResponseTopic},
@@ -13,7 +14,7 @@ use c8y_smartrest::smartrest_deserializer::SmartRestRequestGeneric;
use c8y_smartrest::{
alarm,
error::SmartRestDeserializerError,
- operations::Operations,
+ operations::{get_operation, Operations},
smartrest_deserializer::{SmartRestRestartRequest, SmartRestUpdateSoftware},
smartrest_serializer::{
CumulocitySupportedOperations, SmartRestGetPendingOperations, SmartRestSerializer,
@@ -35,6 +36,7 @@ use std::{
use tedge_config::{get_tedge_config, ConfigSettingAccessor, LogPathSetting};
use thin_edge_json::{alarm::ThinEdgeAlarm, event::ThinEdgeEvent};
use time::format_description::well_known::Rfc3339;
+
use tracing::{debug, info, log::error};
use super::{
@@ -68,7 +70,7 @@ where
device_name: String,
device_type: String,
alarm_converter: AlarmConverter,
- operations: Operations,
+ pub operations: Operations,
operation_logs: OperationLogs,
http_proxy: Proxy,
}
@@ -349,6 +351,24 @@ where
self.alarm_converter = AlarmConverter::Synced;
sync_messages
}
+
+ fn try_process_operation_update_message(
+ &mut self,
+ message: &DiscoverOp,
+ ) -> Result<Option<Message>, ConversionError> {
+ match message.event_type {
+ EventType::ADD => {
+ let ops_dir = message.ops_dir.clone();
+ let op_name = message.operation_name.clone();
+ let op = get_operation(ops_dir.join(op_name))?;
+ self.operations.add_operation(op);
+ }
+ EventType::REMOVE => {
+ self.operations.remove_operation(&message.operation_name);
+ }
+ }
+ Ok(Some(create_supported_operations_fragments_message()?))
+ }
}
async fn parse_c8y_topics(
@@ -359,7 +379,7 @@ async fn parse_c8y_topics(
) -> Result<Vec<Message>, ConversionError> {
match process_smartrest(
message.payload_str()?,
- operations,
+ &operations,
http_proxy,
operation_logs,
)
@@ -701,7 +721,7 @@ async fn process_smartrest(
match message_id {
"528" => forward_software_request(payload, http_proxy).await,
"510" => forward_restart_request(payload),
- template => forward_operation_request(payload, template, operations, operation_logs).await,
+ template => forward_operation_request(payload, template, &operations, operation_logs).await,
}
}
@@ -762,9 +782,7 @@ async fn forward_operation_request(
}
Ok(vec![])
}
- None => Err(CumulocityMapperError::UnknownOperation(
- template.to_string(),
- )),
+ None => Ok(vec![]),
}
}
diff --git a/crates/core/tedge_mapper/src/c8y/dynamic_discovery.rs b/crates/core/tedge_mapper/src/c8y/dynamic_discovery.rs
new file mode 100644
index 00000000..535f396b
--- /dev/null
+++ b/crates/core/tedge_mapper/src/c8y/dynamic_discovery.rs
@@ -0,0 +1,101 @@
+use std::{ffi::OsString, path::PathBuf};
+
+use inotify::{Event, EventMask, Inotify, WatchMask};
+use serde::{Deserialize, Serialize};
+
+#[derive(Serialize, Deserialize, Debug)]
+pub enum EventType {
+ ADD,
+ REMOVE,
+}
+
+#[derive(Serialize, Deserialize, Debug)]
+pub struct DiscoverOp {
+ pub ops_dir: PathBuf,
+ pub event_type: EventType,
+ pub operation_name: String,
+}
+
+#[derive(thiserror::Error, Debug)]
+#[allow(clippy::enum_variant_names)]
+pub enum DynamicDiscoverOpsError {
+ #[error("Failed to add watch to directory: {0}")]
+ FailedtoAddWatch(String),
+
+ #[error("A non-UTF8 name cannot be used as an operation name: {0:?}")]
+ NotAnOperationName(OsString),
+
+ #[error(transparent)]
+ EventError(#[from] std::io::Error),
+}
+
+pub fn create_inotify_watch(ops_dir: PathBuf) -> Result<Inotify, DynamicDiscoverOpsError> {
+ let mut inotify = Inotify::init()?;
+ inotify
+ .add_watch(ops_dir.clone(), WatchMask::CLOSE_WRITE | WatchMask::DELETE)
+ .map_err(|_| {
+ DynamicDiscoverOpsError::FailedtoAddWatch(ops_dir.to_string_lossy().to_string())
+ })?;
+ Ok(inotify)
+}
+
+pub fn create_inofity_event_stream(
+ ops_dir: PathBuf,
+) -> Result<inotify::EventStream<[u8; 1024]>, DynamicDiscoverOpsError> {
+ let buffer = [0; 1024];
+ let mut ino = create_inotify_watch(ops_dir)?;
+ Ok(ino.event_stream(buffer)?)
+}
+
+pub fn process_inotify_events(
+ ops_dir: PathBuf,
+ event: Event<OsString>,
+) -> Result<Option<DiscoverOp>, DynamicDiscoverOpsError> {
+ if let Some(ops_name) = event.clone().name {
+ let operation_name = ops_name
+ .to_str()
+ .ok_or(DynamicDiscoverOpsError::NotAnOperationName(
+ ops_name.clone(),
+ ));
+
+ match operation_name {
+ Ok(ops_name) => match event.mask {
+ EventMask::DELETE => {
+ return Ok(Some(DiscoverOp {
+ ops_dir,
+ event_type: EventType::REMOVE,
+ operation_name: ops_name.to_string(),
+ }))
+ }
+ EventMask::CLOSE_WRITE => {
+ return Ok(Some(DiscoverOp {
+ ops_dir,
+ event_type: EventType::ADD,
+ operation_name: ops_name.to_string(),
+ }))
+ }
+ _ => return Ok(None),
+ },
+ Err(e) => return Err(e),
+ }
+ }
+ Ok(None)
+}
+
+#[cfg(test)]
+#[test]
+fn create_inotify_with_non_existing_dir() {
+ let err = create_inotify_watch("/tmp/discover_ops".into()).unwrap_err();
+ assert_eq!(
+ err.to_string(),
+ "Failed to add watch to directory: /tmp/discover_ops"
+ );
+}
+
+#[test]
+fn create_inotify_with_right_directory() {
+ use tempfile::TempDir;
+ let dir = TempDir::new().unwrap().into_path();
+ let res = create_inotify_watch(dir);
+ assert!(res.is_ok());
+}
diff --git a/crates/core/tedge_mapper/src/c8y/error.rs b/crates/core/tedge_mapper/src/c8y/error.rs
index 6a4f0a71..e388084d 100644
--- a/crates/core/tedge_mapper/src/c8y/error.rs
+++ b/crates/core/tedge_mapper/src/c8y/error.rs
@@ -49,9 +49,6 @@ pub enum CumulocityMapperError {
operation_name: String,
},
- #[error("An unknown operation template: {0}")]
- UnknownOperation(String),
-
#[error(transparent)]
FromOperationLogs(#[from] OperationLogsError),
diff --git a/crates/core/tedge_mapper/src/c8y/mapper.rs b/crates/core/tedge_mapper/src/c8y/mapper.rs
index 6b5a8b3a..24dfde84 100644
--- a/crates/core/tedge_mapper/src/c8y/mapper.rs
+++ b/crates/core/tedge_mapper/src/c8y/mapper.rs
@@ -1,4 +1,4 @@
-use std::path::Path;
+use std::path::{Path, PathBuf};
use crate::{
c8y::converter::CumulocityConverter,
@@ -79,11 +79,18 @@ impl TEdgeComponent for CumulocityMapper {
http_proxy,
)?);
- let mut mapper =
- create_mapper(CUMULOCITY_MAPPER_NAME, mqtt_host, mqtt_port, converter).await?;
+ let mut mapper = create_mapper(
+ CUMULOCITY_MAPPER_NAME,
+ mqtt_host.clone(),
+ mqtt_port,
+ converter,
+ )
+ .await?;
+
+ let ops_dir = PathBuf::from(format!("{}/operations/c8y", &config_dir));
mapper
- .run()
+ .run(Some(ops_dir))
.instrument(info_span!(CUMULOCITY_MAPPER_NAME))
.await?;
@@ -210,7 +217,7 @@ mod tests {
// run tedge_mapper in background
tokio::spawn(async move {
mapper
- .run()
+ .run(None)
.instrument(info_span!(CUMULOCITY_MAPPER_NAME_TEST))
.await
.unwrap();
diff --git a/crates/core/tedge_mapper/src/c8y/mod.rs b/crates/core/tedge_mapper/src/c8y/mod.rs
index e428b51d..87b994ea 100644
--- a/crates/core/tedge_mapper/src/c8y/mod.rs
+++ b/crates/core/tedge_mapper/src/c8y/mod.rs
@@ -1,4 +1,5 @@
pub mod converter;
+pub mod dynamic_discovery;
pub mod error;
mod fragments;
pub mod mapper;
diff --git a/crates/core/tedge_mapper/src/c8y/tests.rs b/crates/core/tedge_mapper/src/c8y/tests.rs
index 117bdced..8596f594 100644
--- a/crates/core/tedge_mapper/src/c8y/tests.rs
+++ b/crates/core/tedge_mapper/src/c8y/tests.rs
@@ -924,7 +924,7 @@ async fn start_c8y_mapper(mqtt_port: u16) -> Result<JoinHandle<()>, anyhow::Erro
.await?;
let mapper_task = tokio::spawn(async move {
- let _ = mapper.run().await;
+ let _ = mapper.run(None).await;
});
Ok(mapper_task)
}
diff --git a/crates/core/tedge_mapper/src/core/converter.rs b/crates/core/tedge_mapper/src/core/converter.rs
index 32e36b72..3fca735b 100644
--- a/crates/core/tedge_mapper/src/core/converter.rs
+++ b/crates/core/tedge_mapper/src/core/converter.rs
@@ -1,3 +1,4 @@
+use crate::c8y::dynamic_discovery::DiscoverOp;
use async_trait::async_trait;
use mqtt_channel::{Message, Topic, TopicFilter};
use std::fmt::Display;
@@ -67,6 +68,25 @@ pub trait Converter: Send + Sync {
fn sync_messages(&mut self) -> Vec<Message> {
vec![]
}
+
+ fn try_process_operation_update_message(
+ &mut self,
+ _input: &DiscoverOp,
+ ) -> Result<Option<Message>, Self::Error> {
+ Ok(None)
+ }
+
+ fn process_operation_update_message(&mut self, message: DiscoverOp) -> Message {
+ let message_or_err = self.try_process_operation_update_message(&message);
+ match message_or_err {
+ Ok(Some(msg)) => msg,
+ Ok(None) => Message::new(
+ &self.get_mapper_config().errors_topic,
+ "No operation update required",
+ ),
+ Err(err) => self.new_error_message(err),
+ }
+ }
}
pub fn make_valid_topic_or_panic(topic_name: &str) -> Topic {
diff --git a/crates/core/tedge_mapper/src/core/mapper.rs b/crates/core/tedge_mapper/src/core/mapper.rs
index fb5713f4..d4f0da23 100644
--- a/crates/core/tedge_mapper/src/core/mapper.rs
+++ b/crates/core/tedge_mapper/src/core/mapper.rs
@@ -1,16 +1,16 @@
-use std::{process, time::Duration};
-
+use crate::c8y::dynamic_discovery::*;
use crate::core::{converter::*, error::*};
-
use mqtt_channel::{
Connection, Message, MqttError, SinkExt, StreamExt, Topic, TopicFilter, UnboundedReceiver,
UnboundedSender,
};
use serde_json::json;
+use std::path::PathBuf;
+use std::{process, time::Duration};
use time::OffsetDateTime;
-use tracing::{error, info, instrument};
-
+use tracing::{error, info, instrument, warn};
const SYNC_WINDOW: Duration = Duration::from_secs(3);
+use std::result::Result::Ok;
pub async fn create_mapper(
app_name: &str,
@@ -86,9 +86,9 @@ impl Mapper {
}
}
- pub(crate) async fn run(&mut self) -> Result<(), MqttError> {
+ pub(crate) async fn run(&mut self, ops_dir: Option<PathBuf>) -> Result<(), MqttError> {
info!("Running");
- self.process_messages().await?;
+ self.process_messages(ops_dir).await?;
Ok(())
}
@@ -102,7 +102,7 @@ impl Mapper {
}
#[instrument(skip(self), name = "messages")]
- async fn process_messages(&mut self) -> Result<(), MqttError> {
+ async fn process_messages(&mut self, ops_dir: Option<PathBuf>) -> Result<(), MqttError> {
let init_messages = self.converter.init_messages();
for init_message in init_messages.into_iter() {
let _ = self.output.send(init_message).await;
@@ -122,11 +122,16 @@ impl Mapper {
self.process_message(message).await;
}
- // Continue processing messages after the sync period
- while let Some(message) = self.input.next().await {
- self.process_message(message).await;
+ match ops_dir {
+ // Create inotify steam for capturing the inotify events.
+ Some(dir) => {
+ process_inotify_and_mqtt_messages(self, dir).await?;
+ }
+ None => {
+ // If there is no operation directory to watch, then continue processing only the mqtt messages
+ let _ = process_mqtt_messages(self).await;
+ }
}
-
Ok(())
}
@@ -142,6 +147,7 @@ impl Mapper {
let _ = self.output.send(health_message).await;
} else {
let converted_messages = self.converter.convert(&message).await;
+
for converted_message in converted_messages.into_iter() {
let _ = self.output.send(converted_message).await;
}
@@ -149,6 +155,59 @@ impl Mapper {
}
}
+async fn process_inotify_and_mqtt_messages(
+ mapper: &mut Mapper,
+ dir: PathBuf,
+) -> Result<(), MqttError> {
+ match create_inofity_event_stream(dir.clone()) {
+ Ok(mut inotify_events) => loop {
+ tokio::select! {
+ msg = mapper.input.next() => {
+ match msg {
+ Some(message) => {
+ mapper.process_message(message).await;
+ } None => {
+ break Ok(());
+ }
+ }
+ }
+ event = inotify_events.next() => {
+ match event {
+ Some(ev) => {
+ match ev {
+ Ok(ev_string) => {
+
+ match process_inotify_events(dir.clone(), ev_string) {
+ Ok(Some(discovered_ops)) => {
+ let _ = mapper.output.send(mapper.converter.process_operation_update_message(discovered_ops)).await;
+ }
+ Ok(None) => {}
+ Err(e) => {eprintln!("Processing inotify event failed due to {}", e);}
+ }
+
+
+ } Err(e) => {eprintln!("Failed to extract event {}", e);}
+ }
+ }
+ None => {}
+ }
+ }
+ }
+ }, // On error continue to process only mqtt messages.
+ Err(e) => {
+ eprintln!("Failed to create the inotify stream due to {:?}. So, dynamic operation discovery not supported, please restart the mapper on Add/Removal of an operation", e);
+ process_mqtt_messages(mapper).await
+ }
+ }
+}
+
+async fn process_mqtt_messages(mapper: &mut Mapper) -> Result<(), MqttError> {
+ while let Some(message) = mapper.input.next().await {
+ mapper.process_message(message).await;
+ }
+ Ok(())
+}
+
#[cfg(test)]
mod tests {
use super::*;
@@ -177,7 +236,7 @@ mod tests {
// Let's run the mapper in the background
tokio::spawn(async move {
- let _ = mapper.run().await;
+ let _ = mapper.run(None).await;
});
sleep(Duration::from_secs(1)).await;
@@ -222,7 +281,7 @@ mod tests {
// Let's run the mapper in the background
tokio::spawn(async move {
- let _ = mapper.run().await;
+ let _ = mapper.run(None).await;
});
sleep(Duration::from_secs(1)).await;
@@ -294,7 +353,7 @@ mod tests {
&self.mapper_config.out_topic,
input.to_uppercase(),
)];
- Ok(msg)
+ anyhow::Result::Ok(msg)
} else {
Err(UppercaseConverter::conversion_error())
}