diff options
author | PradeepKiruvale <pradeepkumar.kj@softwareag.com> | 2022-06-01 21:19:51 +0530 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-06-01 21:19:51 +0530 |
commit | 4f8f5ecddd7e835cbf79faf9c5e060530f86d8ae (patch) | |
tree | 55dd1026354f3e1a65a4e02a10195ee229a50f95 /crates/core/tedge_mapper | |
parent | a19b1e462287186c6cf357e0c8a794fbc2e93195 (diff) |
Dynamic discovery of new operations (#1140)
* Closes #612 discover operations dynamically
Signed-off-by: Pradeep Kumar K J <pradeepkumar.kj@softwareag.com>
* select! on async ops
Signed-off-by: Pradeep Kumar K J <pradeepkumar.kj@softwareag.com>
* update operations document
Signed-off-by: Pradeep Kumar K J <pradeepkumar.kj@softwareag.com>
* move inotify/mqtt message processing to separate fn
* move mqtt process code to a separate fn
Diffstat (limited to 'crates/core/tedge_mapper')
-rw-r--r-- | crates/core/tedge_mapper/Cargo.toml | 1 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/az/mapper.rs | 2 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/c8y/converter.rs | 32 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/c8y/dynamic_discovery.rs | 101 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/c8y/error.rs | 3 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/c8y/mapper.rs | 17 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/c8y/mod.rs | 1 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/c8y/tests.rs | 2 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/core/converter.rs | 20 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/core/mapper.rs | 89 |
10 files changed, 236 insertions, 32 deletions
diff --git a/crates/core/tedge_mapper/Cargo.toml b/crates/core/tedge_mapper/Cargo.toml index cb7c1c3d..c3ee3aa1 100644 --- a/crates/core/tedge_mapper/Cargo.toml +++ b/crates/core/tedge_mapper/Cargo.toml @@ -37,6 +37,7 @@ csv = "1.1" download = { path = "../../common/download" } flockfile = { path = "../../common/flockfile" } futures = "0.3" +inotify = "0.10.0" logged_command = { path = "../../common/logged_command" } mockall = "0.11" mqtt_channel = { path = "../../common/mqtt_channel" } diff --git a/crates/core/tedge_mapper/src/az/mapper.rs b/crates/core/tedge_mapper/src/az/mapper.rs index 7f608f08..73ab05a9 100644 --- a/crates/core/tedge_mapper/src/az/mapper.rs +++ b/crates/core/tedge_mapper/src/az/mapper.rs @@ -58,7 +58,7 @@ impl TEdgeComponent for AzureMapper { let mut mapper = create_mapper(AZURE_MAPPER_NAME, mqtt_host, mqtt_port, converter).await?; mapper - .run() + .run(None) .instrument(info_span!(AZURE_MAPPER_NAME)) .await?; diff --git a/crates/core/tedge_mapper/src/c8y/converter.rs b/crates/core/tedge_mapper/src/c8y/converter.rs index dc407100..a7468ebe 100644 --- a/crates/core/tedge_mapper/src/c8y/converter.rs +++ b/crates/core/tedge_mapper/src/c8y/converter.rs @@ -1,3 +1,4 @@ +use crate::c8y::dynamic_discovery::*; use crate::core::{converter::*, error::*, size_threshold::SizeThreshold}; use agent_interface::{ topic::{RequestTopic, ResponseTopic}, @@ -13,7 +14,7 @@ use c8y_smartrest::smartrest_deserializer::SmartRestRequestGeneric; use c8y_smartrest::{ alarm, error::SmartRestDeserializerError, - operations::Operations, + operations::{get_operation, Operations}, smartrest_deserializer::{SmartRestRestartRequest, SmartRestUpdateSoftware}, smartrest_serializer::{ CumulocitySupportedOperations, SmartRestGetPendingOperations, SmartRestSerializer, @@ -35,6 +36,7 @@ use std::{ use tedge_config::{get_tedge_config, ConfigSettingAccessor, LogPathSetting}; use thin_edge_json::{alarm::ThinEdgeAlarm, event::ThinEdgeEvent}; use time::format_description::well_known::Rfc3339; + use tracing::{debug, info, log::error}; use super::{ @@ -68,7 +70,7 @@ where device_name: String, device_type: String, alarm_converter: AlarmConverter, - operations: Operations, + pub operations: Operations, operation_logs: OperationLogs, http_proxy: Proxy, } @@ -349,6 +351,24 @@ where self.alarm_converter = AlarmConverter::Synced; sync_messages } + + fn try_process_operation_update_message( + &mut self, + message: &DiscoverOp, + ) -> Result<Option<Message>, ConversionError> { + match message.event_type { + EventType::ADD => { + let ops_dir = message.ops_dir.clone(); + let op_name = message.operation_name.clone(); + let op = get_operation(ops_dir.join(op_name))?; + self.operations.add_operation(op); + } + EventType::REMOVE => { + self.operations.remove_operation(&message.operation_name); + } + } + Ok(Some(create_supported_operations_fragments_message()?)) + } } async fn parse_c8y_topics( @@ -359,7 +379,7 @@ async fn parse_c8y_topics( ) -> Result<Vec<Message>, ConversionError> { match process_smartrest( message.payload_str()?, - operations, + &operations, http_proxy, operation_logs, ) @@ -701,7 +721,7 @@ async fn process_smartrest( match message_id { "528" => forward_software_request(payload, http_proxy).await, "510" => forward_restart_request(payload), - template => forward_operation_request(payload, template, operations, operation_logs).await, + template => forward_operation_request(payload, template, &operations, operation_logs).await, } } @@ -762,9 +782,7 @@ async fn forward_operation_request( } Ok(vec![]) } - None => Err(CumulocityMapperError::UnknownOperation( - template.to_string(), - )), + None => Ok(vec![]), } } diff --git a/crates/core/tedge_mapper/src/c8y/dynamic_discovery.rs b/crates/core/tedge_mapper/src/c8y/dynamic_discovery.rs new file mode 100644 index 00000000..535f396b --- /dev/null +++ b/crates/core/tedge_mapper/src/c8y/dynamic_discovery.rs @@ -0,0 +1,101 @@ +use std::{ffi::OsString, path::PathBuf}; + +use inotify::{Event, EventMask, Inotify, WatchMask}; +use serde::{Deserialize, Serialize}; + +#[derive(Serialize, Deserialize, Debug)] +pub enum EventType { + ADD, + REMOVE, +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct DiscoverOp { + pub ops_dir: PathBuf, + pub event_type: EventType, + pub operation_name: String, +} + +#[derive(thiserror::Error, Debug)] +#[allow(clippy::enum_variant_names)] +pub enum DynamicDiscoverOpsError { + #[error("Failed to add watch to directory: {0}")] + FailedtoAddWatch(String), + + #[error("A non-UTF8 name cannot be used as an operation name: {0:?}")] + NotAnOperationName(OsString), + + #[error(transparent)] + EventError(#[from] std::io::Error), +} + +pub fn create_inotify_watch(ops_dir: PathBuf) -> Result<Inotify, DynamicDiscoverOpsError> { + let mut inotify = Inotify::init()?; + inotify + .add_watch(ops_dir.clone(), WatchMask::CLOSE_WRITE | WatchMask::DELETE) + .map_err(|_| { + DynamicDiscoverOpsError::FailedtoAddWatch(ops_dir.to_string_lossy().to_string()) + })?; + Ok(inotify) +} + +pub fn create_inofity_event_stream( + ops_dir: PathBuf, +) -> Result<inotify::EventStream<[u8; 1024]>, DynamicDiscoverOpsError> { + let buffer = [0; 1024]; + let mut ino = create_inotify_watch(ops_dir)?; + Ok(ino.event_stream(buffer)?) +} + +pub fn process_inotify_events( + ops_dir: PathBuf, + event: Event<OsString>, +) -> Result<Option<DiscoverOp>, DynamicDiscoverOpsError> { + if let Some(ops_name) = event.clone().name { + let operation_name = ops_name + .to_str() + .ok_or(DynamicDiscoverOpsError::NotAnOperationName( + ops_name.clone(), + )); + + match operation_name { + Ok(ops_name) => match event.mask { + EventMask::DELETE => { + return Ok(Some(DiscoverOp { + ops_dir, + event_type: EventType::REMOVE, + operation_name: ops_name.to_string(), + })) + } + EventMask::CLOSE_WRITE => { + return Ok(Some(DiscoverOp { + ops_dir, + event_type: EventType::ADD, + operation_name: ops_name.to_string(), + })) + } + _ => return Ok(None), + }, + Err(e) => return Err(e), + } + } + Ok(None) +} + +#[cfg(test)] +#[test] +fn create_inotify_with_non_existing_dir() { + let err = create_inotify_watch("/tmp/discover_ops".into()).unwrap_err(); + assert_eq!( + err.to_string(), + "Failed to add watch to directory: /tmp/discover_ops" + ); +} + +#[test] +fn create_inotify_with_right_directory() { + use tempfile::TempDir; + let dir = TempDir::new().unwrap().into_path(); + let res = create_inotify_watch(dir); + assert!(res.is_ok()); +} diff --git a/crates/core/tedge_mapper/src/c8y/error.rs b/crates/core/tedge_mapper/src/c8y/error.rs index 6a4f0a71..e388084d 100644 --- a/crates/core/tedge_mapper/src/c8y/error.rs +++ b/crates/core/tedge_mapper/src/c8y/error.rs @@ -49,9 +49,6 @@ pub enum CumulocityMapperError { operation_name: String, }, - #[error("An unknown operation template: {0}")] - UnknownOperation(String), - #[error(transparent)] FromOperationLogs(#[from] OperationLogsError), diff --git a/crates/core/tedge_mapper/src/c8y/mapper.rs b/crates/core/tedge_mapper/src/c8y/mapper.rs index 6b5a8b3a..24dfde84 100644 --- a/crates/core/tedge_mapper/src/c8y/mapper.rs +++ b/crates/core/tedge_mapper/src/c8y/mapper.rs @@ -1,4 +1,4 @@ -use std::path::Path; +use std::path::{Path, PathBuf}; use crate::{ c8y::converter::CumulocityConverter, @@ -79,11 +79,18 @@ impl TEdgeComponent for CumulocityMapper { http_proxy, )?); - let mut mapper = - create_mapper(CUMULOCITY_MAPPER_NAME, mqtt_host, mqtt_port, converter).await?; + let mut mapper = create_mapper( + CUMULOCITY_MAPPER_NAME, + mqtt_host.clone(), + mqtt_port, + converter, + ) + .await?; + + let ops_dir = PathBuf::from(format!("{}/operations/c8y", &config_dir)); mapper - .run() + .run(Some(ops_dir)) .instrument(info_span!(CUMULOCITY_MAPPER_NAME)) .await?; @@ -210,7 +217,7 @@ mod tests { // run tedge_mapper in background tokio::spawn(async move { mapper - .run() + .run(None) .instrument(info_span!(CUMULOCITY_MAPPER_NAME_TEST)) .await .unwrap(); diff --git a/crates/core/tedge_mapper/src/c8y/mod.rs b/crates/core/tedge_mapper/src/c8y/mod.rs index e428b51d..87b994ea 100644 --- a/crates/core/tedge_mapper/src/c8y/mod.rs +++ b/crates/core/tedge_mapper/src/c8y/mod.rs @@ -1,4 +1,5 @@ pub mod converter; +pub mod dynamic_discovery; pub mod error; mod fragments; pub mod mapper; diff --git a/crates/core/tedge_mapper/src/c8y/tests.rs b/crates/core/tedge_mapper/src/c8y/tests.rs index 117bdced..8596f594 100644 --- a/crates/core/tedge_mapper/src/c8y/tests.rs +++ b/crates/core/tedge_mapper/src/c8y/tests.rs @@ -924,7 +924,7 @@ async fn start_c8y_mapper(mqtt_port: u16) -> Result<JoinHandle<()>, anyhow::Erro .await?; let mapper_task = tokio::spawn(async move { - let _ = mapper.run().await; + let _ = mapper.run(None).await; }); Ok(mapper_task) } diff --git a/crates/core/tedge_mapper/src/core/converter.rs b/crates/core/tedge_mapper/src/core/converter.rs index 32e36b72..3fca735b 100644 --- a/crates/core/tedge_mapper/src/core/converter.rs +++ b/crates/core/tedge_mapper/src/core/converter.rs @@ -1,3 +1,4 @@ +use crate::c8y::dynamic_discovery::DiscoverOp; use async_trait::async_trait; use mqtt_channel::{Message, Topic, TopicFilter}; use std::fmt::Display; @@ -67,6 +68,25 @@ pub trait Converter: Send + Sync { fn sync_messages(&mut self) -> Vec<Message> { vec![] } + + fn try_process_operation_update_message( + &mut self, + _input: &DiscoverOp, + ) -> Result<Option<Message>, Self::Error> { + Ok(None) + } + + fn process_operation_update_message(&mut self, message: DiscoverOp) -> Message { + let message_or_err = self.try_process_operation_update_message(&message); + match message_or_err { + Ok(Some(msg)) => msg, + Ok(None) => Message::new( + &self.get_mapper_config().errors_topic, + "No operation update required", + ), + Err(err) => self.new_error_message(err), + } + } } pub fn make_valid_topic_or_panic(topic_name: &str) -> Topic { diff --git a/crates/core/tedge_mapper/src/core/mapper.rs b/crates/core/tedge_mapper/src/core/mapper.rs index fb5713f4..d4f0da23 100644 --- a/crates/core/tedge_mapper/src/core/mapper.rs +++ b/crates/core/tedge_mapper/src/core/mapper.rs @@ -1,16 +1,16 @@ -use std::{process, time::Duration}; - +use crate::c8y::dynamic_discovery::*; use crate::core::{converter::*, error::*}; - use mqtt_channel::{ Connection, Message, MqttError, SinkExt, StreamExt, Topic, TopicFilter, UnboundedReceiver, UnboundedSender, }; use serde_json::json; +use std::path::PathBuf; +use std::{process, time::Duration}; use time::OffsetDateTime; -use tracing::{error, info, instrument}; - +use tracing::{error, info, instrument, warn}; const SYNC_WINDOW: Duration = Duration::from_secs(3); +use std::result::Result::Ok; pub async fn create_mapper( app_name: &str, @@ -86,9 +86,9 @@ impl Mapper { } } - pub(crate) async fn run(&mut self) -> Result<(), MqttError> { + pub(crate) async fn run(&mut self, ops_dir: Option<PathBuf>) -> Result<(), MqttError> { info!("Running"); - self.process_messages().await?; + self.process_messages(ops_dir).await?; Ok(()) } @@ -102,7 +102,7 @@ impl Mapper { } #[instrument(skip(self), name = "messages")] - async fn process_messages(&mut self) -> Result<(), MqttError> { + async fn process_messages(&mut self, ops_dir: Option<PathBuf>) -> Result<(), MqttError> { let init_messages = self.converter.init_messages(); for init_message in init_messages.into_iter() { let _ = self.output.send(init_message).await; @@ -122,11 +122,16 @@ impl Mapper { self.process_message(message).await; } - // Continue processing messages after the sync period - while let Some(message) = self.input.next().await { - self.process_message(message).await; + match ops_dir { + // Create inotify steam for capturing the inotify events. + Some(dir) => { + process_inotify_and_mqtt_messages(self, dir).await?; + } + None => { + // If there is no operation directory to watch, then continue processing only the mqtt messages + let _ = process_mqtt_messages(self).await; + } } - Ok(()) } @@ -142,6 +147,7 @@ impl Mapper { let _ = self.output.send(health_message).await; } else { let converted_messages = self.converter.convert(&message).await; + for converted_message in converted_messages.into_iter() { let _ = self.output.send(converted_message).await; } @@ -149,6 +155,59 @@ impl Mapper { } } +async fn process_inotify_and_mqtt_messages( + mapper: &mut Mapper, + dir: PathBuf, +) -> Result<(), MqttError> { + match create_inofity_event_stream(dir.clone()) { + Ok(mut inotify_events) => loop { + tokio::select! { + msg = mapper.input.next() => { + match msg { + Some(message) => { + mapper.process_message(message).await; + } None => { + break Ok(()); + } + } + } + event = inotify_events.next() => { + match event { + Some(ev) => { + match ev { + Ok(ev_string) => { + + match process_inotify_events(dir.clone(), ev_string) { + Ok(Some(discovered_ops)) => { + let _ = mapper.output.send(mapper.converter.process_operation_update_message(discovered_ops)).await; + } + Ok(None) => {} + Err(e) => {eprintln!("Processing inotify event failed due to {}", e);} + } + + + } Err(e) => {eprintln!("Failed to extract event {}", e);} + } + } + None => {} + } + } + } + }, // On error continue to process only mqtt messages. + Err(e) => { + eprintln!("Failed to create the inotify stream due to {:?}. So, dynamic operation discovery not supported, please restart the mapper on Add/Removal of an operation", e); + process_mqtt_messages(mapper).await + } + } +} + +async fn process_mqtt_messages(mapper: &mut Mapper) -> Result<(), MqttError> { + while let Some(message) = mapper.input.next().await { + mapper.process_message(message).await; + } + Ok(()) +} + #[cfg(test)] mod tests { use super::*; @@ -177,7 +236,7 @@ mod tests { // Let's run the mapper in the background tokio::spawn(async move { - let _ = mapper.run().await; + let _ = mapper.run(None).await; }); sleep(Duration::from_secs(1)).await; @@ -222,7 +281,7 @@ mod tests { // Let's run the mapper in the background tokio::spawn(async move { - let _ = mapper.run().await; + let _ = mapper.run(None).await; }); sleep(Duration::from_secs(1)).await; @@ -294,7 +353,7 @@ mod tests { &self.mapper_config.out_topic, input.to_uppercase(), )]; - Ok(msg) + anyhow::Result::Ok(msg) } else { Err(UppercaseConverter::conversion_error()) } |