diff options
author | Lukasz Woznicki <75632179+makr11st@users.noreply.github.com> | 2021-12-07 21:42:25 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-12-07 21:42:25 +0000 |
commit | c2d8195b6a779752ae16628fa0a06d040066bc1c (patch) | |
tree | a6001216a3652ae149d0c1ae244712e317a792d1 /crates/core/tedge_mapper | |
parent | 13c6cd0195bbcf0ebc917d02d8dea4bf744635a7 (diff) |
#596 Declare supported operations (#652)
* Add operations to tedge_mapper
* Add operations directory on install, remove supported ops from sm-c8y mapper
* Update install command in postinst script to correctly create ops files
Signed-off-by: Lukasz Woznicki <lukasz.woznicki@softwareag.com>
Diffstat (limited to 'crates/core/tedge_mapper')
-rw-r--r-- | crates/core/tedge_mapper/src/c8y_converter.rs | 17 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/converter.rs | 24 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/error.rs | 12 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/main.rs | 1 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/mapper.rs | 27 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/operations.rs | 241 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs | 11 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/sm_c8y_mapper/tests.rs | 24 |
8 files changed, 314 insertions, 43 deletions
diff --git a/crates/core/tedge_mapper/src/c8y_converter.rs b/crates/core/tedge_mapper/src/c8y_converter.rs index f8b9b6c3..a0d7bf6d 100644 --- a/crates/core/tedge_mapper/src/c8y_converter.rs +++ b/crates/core/tedge_mapper/src/c8y_converter.rs @@ -1,6 +1,7 @@ -use crate::converter::*; use crate::error::*; use crate::size_threshold::SizeThreshold; +use crate::{converter::*, operations::Operations}; +use c8y_smartrest::smartrest_serializer::{SmartRestSerializer, SmartRestSetSupportedOperations}; use c8y_translator::json; use mqtt_client::{Message, Topic}; use std::collections::HashSet; @@ -77,6 +78,20 @@ impl Converter for CumulocityConverter { } Ok(vec) } + + fn try_init_messages(&self) -> Result<Vec<Message>, ConversionError> { + let ops = Operations::try_new("/etc/tedge/operations")?; + let ops = ops.get_operations_list("c8y"); + + if !ops.is_empty() { + let ops_msg = SmartRestSetSupportedOperations::new(&ops); + let topic = Topic::new_unchecked("c8y/s/us"); + let msg = Message::new(&topic, ops_msg.to_smartrest()?); + Ok(vec![msg]) + } else { + Ok(Vec::new()) + } + } } fn get_child_id_from_topic(topic: &str) -> Result<Option<String>, ConversionError> { diff --git a/crates/core/tedge_mapper/src/converter.rs b/crates/core/tedge_mapper/src/converter.rs index e2709f8b..3c9abd2e 100644 --- a/crates/core/tedge_mapper/src/converter.rs +++ b/crates/core/tedge_mapper/src/converter.rs @@ -21,7 +21,29 @@ pub trait Converter: Send + Sync { fn try_convert(&mut self, input: &Message) -> Result<Vec<Message>, Self::Error>; fn convert(&mut self, input: &Message) -> Vec<Message> { - match self.try_convert(input) { + let messages_or_err = self.try_convert(input); + self.wrap_error(messages_or_err) + } + + fn wrap_error(&self, messages_or_err: Result<Vec<Message>, Self::Error>) -> Vec<Message> { + match messages_or_err { + Ok(messages) => messages, + Err(error) => { + error!("Mapping error: {}", error); + vec![Message::new( + &self.get_mapper_config().errors_topic, + error.to_string(), + )] + } + } + } + + fn try_init_messages(&self) -> Result<Vec<Message>, Self::Error> { + Ok(vec![]) + } + + fn init_messages(&self) -> Vec<Message> { + match self.try_init_messages() { Ok(messages) => messages, Err(error) => { error!("Mapping error: {}", error); diff --git a/crates/core/tedge_mapper/src/error.rs b/crates/core/tedge_mapper/src/error.rs index 0f6f0dad..21ec16df 100644 --- a/crates/core/tedge_mapper/src/error.rs +++ b/crates/core/tedge_mapper/src/error.rs @@ -43,4 +43,16 @@ pub enum ConversionError { #[error(transparent)] FromMqttClient(#[from] MqttClientError), + + #[error(transparent)] + FromOperationsError(#[from] OperationsError), + + #[error(transparent)] + FromSmartRestSerializerError(#[from] c8y_smartrest::error::SmartRestSerializerError), +} + +#[derive(Debug, thiserror::Error)] +pub enum OperationsError { + #[error(transparent)] + FromIo(#[from] std::io::Error), } diff --git a/crates/core/tedge_mapper/src/main.rs b/crates/core/tedge_mapper/src/main.rs index 947dde03..d3324ef7 100644 --- a/crates/core/tedge_mapper/src/main.rs +++ b/crates/core/tedge_mapper/src/main.rs @@ -16,6 +16,7 @@ mod component; mod converter; mod error; mod mapper; +mod operations; mod size_threshold; mod sm_c8y_mapper; diff --git a/crates/core/tedge_mapper/src/mapper.rs b/crates/core/tedge_mapper/src/mapper.rs index f2d686e4..2f90e7bb 100644 --- a/crates/core/tedge_mapper/src/mapper.rs +++ b/crates/core/tedge_mapper/src/mapper.rs @@ -35,17 +35,6 @@ pub struct Mapper { } impl Mapper { - pub(crate) async fn run(&mut self) -> Result<(), MqttClientError> { - info!("Running"); - let errors_handle = self.subscribe_errors(); - let messages_handle = self.subscribe_messages(); - messages_handle.await?; - errors_handle - .await - .map_err(|_| MqttClientError::JoinError)?; - Ok(()) - } - pub fn new( client: mqtt_client::Client, converter: Box<dyn Converter<Error = ConversionError>>, @@ -58,6 +47,17 @@ impl Mapper { } } + pub(crate) async fn run(&mut self) -> Result<(), MqttClientError> { + info!("Running"); + let errors_handle = self.subscribe_errors(); + let messages_handle = self.subscribe_messages(); + messages_handle.await?; + errors_handle + .await + .map_err(|_| MqttClientError::JoinError)?; + Ok(()) + } + #[instrument(skip(self), name = "errors")] fn subscribe_errors(&self) -> JoinHandle<()> { let mut errors = self.client.subscribe_errors(); @@ -70,6 +70,11 @@ impl Mapper { #[instrument(skip(self), name = "messages")] async fn subscribe_messages(&mut self) -> Result<(), MqttClientError> { + let init_messages = self.converter.init_messages(); + for init_message in init_messages.into_iter() { + self.client.publish(init_message).await? + } + let mut messages = self .client .subscribe(self.converter.get_in_topic_filter().clone()) diff --git a/crates/core/tedge_mapper/src/operations.rs b/crates/core/tedge_mapper/src/operations.rs new file mode 100644 index 00000000..b4dbe2a7 --- /dev/null +++ b/crates/core/tedge_mapper/src/operations.rs @@ -0,0 +1,241 @@ +use std::{ + collections::{HashMap, HashSet}, + fs, + path::{Path, PathBuf}, +}; + +use crate::error::OperationsError; + +/// Operations are derived by reading files subdirectories per cloud /etc/tedge/operations directory +/// Each operation is a file name in one of the subdirectories +/// The file name is the operation name + +type Cloud = String; +type OperationName = String; +type Operation = HashSet<OperationName>; +type OperationsMap = HashMap<Cloud, Operation>; + +#[derive(Debug, Clone, PartialEq)] +pub struct Operations { + cloud: PathBuf, + operations: OperationsMap, +} + +impl Operations { + pub fn try_new(dir: impl AsRef<Path>) -> Result<Self, OperationsError> { + let operations = get_operations(dir.as_ref())?; + + Ok(Self { + cloud: dir.as_ref().to_path_buf(), + operations, + }) + } + + pub fn get_operations_list(&self, cloud: &str) -> Vec<&str> { + self.operations + .get(cloud) + .map(|operations| operations.iter().map(|k| k.as_str()).collect()) + .unwrap_or_default() + } +} + +fn get_clouds(dir: impl AsRef<Path>) -> Result<Vec<String>, OperationsError> { + Ok(fs::read_dir(dir)? + .map(|entry| entry.map(|e| e.path())) + .collect::<Result<Vec<PathBuf>, _>>()? + .into_iter() + .filter(|path| path.is_dir()) + .map(|path| { + let filename = path.file_name(); + filename.unwrap().to_str().unwrap().to_string() + }) + .collect()) +} + +fn get_operations(dir: impl AsRef<Path>) -> Result<OperationsMap, OperationsError> { + let mut operations = OperationsMap::new(); + for cloud in get_clouds(&dir)? { + let path = dir.as_ref().join(cloud.as_str()); + let operations_map = fs::read_dir(&path)? + .map(|entry| entry.map(|e| e.path())) + .collect::<Result<Vec<PathBuf>, _>>()? + .into_iter() + .filter(|path| path.is_file()) + .map(|path| { + let filename = path.file_name(); + filename.unwrap().to_str().unwrap().to_string() + }) + .collect(); + operations.insert(cloud, operations_map); + } + Ok(operations) +} + +#[cfg(test)] +mod tests { + use super::*; + use test_case::test_case; + + #[test_case(0, false)] + #[test_case(0, true)] + #[test_case(2, false)] + #[test_case(2, true)] + fn get_clouds_tests(clouds_count: usize, files: bool) { + let operations = TestOperations::builder().with_clouds(clouds_count); + + if files { + operations.with_random_file_in_clouds_directory(); + } + + let operations = operations.build(); + + let clouds = get_clouds(operations.temp_dir()).unwrap(); + + assert_eq!(clouds.len(), clouds_count); + } + + #[test_case(0, 0)] + #[test_case(1, 1)] + #[test_case(1, 5)] + #[test_case(2, 5)] + fn get_operations_all(clouds_count: usize, ops_count: usize) { + let test_operations = TestOperations::builder() + .with_clouds(clouds_count) + .with_operations(ops_count) + .build(); + + let operations = get_operations(test_operations.temp_dir()).unwrap(); + + assert_eq!(operations.len(), clouds_count); + assert_eq!( + operations.values().map(|ops| ops.len()).sum::<usize>(), + ops_count * clouds_count + ); + } + + // Structs for state change with the builder pattern + // Structs for Clouds + struct Clouds(Vec<PathBuf>); + struct NoClouds; + + // Structs for Operations + struct Ops(Vec<PathBuf>); + struct NoOps; + + struct TestOperationsBuilder<C, O> { + temp_dir: tempfile::TempDir, + clouds: C, + operations: O, + } + + impl TestOperationsBuilder<NoClouds, NoOps> { + fn new() -> Self { + Self { + temp_dir: tempfile::tempdir().unwrap(), + clouds: NoClouds, + operations: NoOps, + } + } + } + + impl<O> TestOperationsBuilder<NoClouds, O> { + fn with_clouds(self, clouds_count: usize) -> TestOperationsBuilder<Clouds, O> { + let Self { + temp_dir, + operations, + .. + } = self; + + let mut clouds = Vec::new(); + for i in 0..clouds_count { + let cloud = temp_dir.as_ref().join(format!("cloud{}", i)); + fs::create_dir(&cloud).unwrap(); + clouds.push(cloud); + } + + TestOperationsBuilder { + temp_dir, + clouds: Clouds(clouds), + operations, + } + } + } + + impl TestOperationsBuilder<Clouds, NoOps> { + fn with_operations(self, operations_count: usize) -> TestOperationsBuilder<Clouds, Ops> { + let Self { + temp_dir, clouds, .. + } = self; + + let mut operations = Vec::new(); + clouds.0.iter().for_each(|path| { + for i in 0..operations_count { + let file_path = path.join(format!("operation{}", i)); + fs::File::create(&file_path).unwrap(); + operations.push(file_path); + } + }); + + TestOperationsBuilder { + operations: Ops(operations), + temp_dir, + clouds, + } + } + + fn build(self) -> TestOperations { + let Self { + temp_dir, clouds, .. + } = self; + + TestOperations { + temp_dir, + clouds: clouds.0, + operations: Vec::new(), + } + } + } + + impl<C, O> TestOperationsBuilder<C, O> { + fn with_random_file_in_clouds_directory(&self) { + let path = self.temp_dir.as_ref().join("cloudfile"); + fs::File::create(path).unwrap(); + } + } + + impl TestOperationsBuilder<Clouds, Ops> { + fn build(self) -> TestOperations { + let Self { + temp_dir, + clouds, + operations, + } = self; + + TestOperations { + temp_dir, + clouds: clouds.0, + operations: operations.0, + } + } + } + + struct TestOperations { + temp_dir: tempfile::TempDir, + clouds: Vec<PathBuf>, + operations: Vec<PathBuf>, + } + + impl TestOperations { + fn builder() -> TestOperationsBuilder<NoClouds, NoOps> { + TestOperationsBuilder::new() + } + + fn temp_dir(&self) -> &tempfile::TempDir { + &self.temp_dir + } + + fn operations(&self) -> &Vec<PathBuf> { + &self.operations + } + } +} diff --git a/crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs b/crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs index 3c064e90..989718d1 100644 --- a/crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs +++ b/crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs @@ -11,7 +11,7 @@ use c8y_smartrest::{ smartrest_serializer::{ SmartRestGetPendingOperations, SmartRestSerializer, SmartRestSetOperationToExecuting, SmartRestSetOperationToFailed, SmartRestSetOperationToSuccessful, - SmartRestSetSupportedLogType, SmartRestSetSupportedOperations, + SmartRestSetSupportedLogType, }, }; use chrono::{DateTime, FixedOffset}; @@ -88,7 +88,6 @@ where let () = self.http_proxy.init().await?; info!("Running"); - let () = self.publish_supported_operations().await?; let () = self.publish_supported_log_types().await?; let () = self.publish_get_pending_operations().await?; let () = self.ask_software_list().await?; @@ -211,14 +210,6 @@ where Ok(()) } - async fn publish_supported_operations(&self) -> Result<(), SMCumulocityMapperError> { - let data = SmartRestSetSupportedOperations::default(); - let topic = OutgoingTopic::SmartRestResponse.to_topic()?; - let payload = data.to_smartrest()?; - let () = self.publish(&topic, payload).await?; - Ok(()) - } - async fn publish_get_pending_operations(&self) -> Result<(), SMCumulocityMapperError> { let data = SmartRestGetPendingOperations::default(); let topic = OutgoingTopic::SmartRestResponse.to_topic()?; diff --git a/crates/core/tedge_mapper/src/sm_c8y_mapper/tests.rs b/crates/core/tedge_mapper/src/sm_c8y_mapper/tests.rs index 7f471ef2..14cb2500 100644 --- a/crates/core/tedge_mapper/src/sm_c8y_mapper/tests.rs +++ b/crates/core/tedge_mapper/src/sm_c8y_mapper/tests.rs @@ -52,11 +52,7 @@ async fn mapper_publishes_a_supported_operation_and_a_pending_operations_onto_c8 mqtt_tests::assert_received( &mut messages, TEST_TIMEOUT_MS, - vec![ - "114,c8y_SoftwareUpdate,c8y_LogfileRequest,c8y_Restart\n", - "118,software-management\n", - "500\n", - ], + vec!["118,software-management\n", "500\n"], ) .await; sm_mapper.unwrap().abort(); @@ -119,11 +115,7 @@ async fn mapper_publishes_software_update_status_onto_c8y_topic() { mqtt_tests::assert_received( &mut messages, TEST_TIMEOUT_MS, - vec![ - "114,c8y_SoftwareUpdate,c8y_LogfileRequest,c8y_Restart\n", - "118,software-management\n", - "500\n", - ], + vec!["118,software-management\n", "500\n"], ) .await; @@ -184,11 +176,7 @@ async fn mapper_publishes_software_update_failed_status_onto_c8y_topic() { mqtt_tests::assert_received( &mut messages, TEST_TIMEOUT_MS, - vec![ - "114,c8y_SoftwareUpdate,c8y_LogfileRequest,c8y_Restart\n", - "118,software-management\n", - "500\n", - ], + vec!["118,software-management\n", "500\n"], ) .await; @@ -368,11 +356,7 @@ async fn mapper_publishes_software_update_request_with_wrong_action() { mqtt_tests::assert_received( &mut messages, TEST_TIMEOUT_MS, - vec![ - "114,c8y_SoftwareUpdate,c8y_LogfileRequest,c8y_Restart\n", - "118,software-management\n", - "500\n", - ], + vec!["118,software-management\n", "500\n"], ) .await; |