summaryrefslogtreecommitdiffstats
path: root/crates
diff options
context:
space:
mode:
authorLukasz Woznicki <75632179+makr11st@users.noreply.github.com>2021-12-07 21:42:25 +0000
committerGitHub <noreply@github.com>2021-12-07 21:42:25 +0000
commitc2d8195b6a779752ae16628fa0a06d040066bc1c (patch)
treea6001216a3652ae149d0c1ae244712e317a792d1 /crates
parent13c6cd0195bbcf0ebc917d02d8dea4bf744635a7 (diff)
#596 Declare supported operations (#652)
* Add operations to tedge_mapper * Add operations directory on install, remove supported ops from sm-c8y mapper * Update install command in postinst script to correctly create ops files Signed-off-by: Lukasz Woznicki <lukasz.woznicki@softwareag.com>
Diffstat (limited to 'crates')
-rw-r--r--crates/core/c8y_smartrest/src/smartrest_serializer.rs35
-rw-r--r--crates/core/tedge_mapper/src/c8y_converter.rs17
-rw-r--r--crates/core/tedge_mapper/src/converter.rs24
-rw-r--r--crates/core/tedge_mapper/src/error.rs12
-rw-r--r--crates/core/tedge_mapper/src/main.rs1
-rw-r--r--crates/core/tedge_mapper/src/mapper.rs27
-rw-r--r--crates/core/tedge_mapper/src/operations.rs241
-rw-r--r--crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs11
-rw-r--r--crates/core/tedge_mapper/src/sm_c8y_mapper/tests.rs24
9 files changed, 330 insertions, 62 deletions
diff --git a/crates/core/c8y_smartrest/src/smartrest_serializer.rs b/crates/core/c8y_smartrest/src/smartrest_serializer.rs
index ce48c1a3..e7934d7f 100644
--- a/crates/core/c8y_smartrest/src/smartrest_serializer.rs
+++ b/crates/core/c8y_smartrest/src/smartrest_serializer.rs
@@ -41,7 +41,7 @@ impl Default for SmartRestSetSupportedLogType {
fn default() -> Self {
Self {
message_id: "118",
- supported_operations: vec!["software-management".into()],
+ supported_operations: vec!["software-management"],
}
}
}
@@ -49,25 +49,25 @@ impl Default for SmartRestSetSupportedLogType {
impl<'a> SmartRestSerializer<'a> for SmartRestSetSupportedLogType {}
#[derive(Debug, Deserialize, Serialize, PartialEq)]
-pub struct SmartRestSetSupportedOperations {
+pub struct SmartRestSetSupportedOperations<'a> {
pub message_id: &'static str,
- pub supported_operations: Vec<&'static str>,
+ pub supported_operations: Vec<&'a str>,
}
-impl Default for SmartRestSetSupportedOperations {
- fn default() -> Self {
+impl<'a> SmartRestSetSupportedOperations<'a> {
+ pub fn new(supported_operations: &[&'a str]) -> Self {
Self {
message_id: "114",
- supported_operations: vec![
- CumulocitySupportedOperations::C8ySoftwareUpdate.into(),
- CumulocitySupportedOperations::C8yLogFileRequest.into(),
- CumulocitySupportedOperations::C8yRestartRequest.into(),
- ],
+ supported_operations: supported_operations.into(),
}
}
+
+ pub fn add_operation(&mut self, operation: &'a str) {
+ self.supported_operations.push(operation);
+ }
}
-impl<'a> SmartRestSerializer<'a> for SmartRestSetSupportedOperations {}
+impl<'a> SmartRestSerializer<'a> for SmartRestSetSupportedOperations<'a> {}
#[derive(Debug, Deserialize, Serialize, PartialEq)]
pub struct SmartRestSoftwareModuleItem {
@@ -211,15 +211,12 @@ mod tests {
use json_sm::*;
#[test]
- // NOTE: this test always needs changing when a new operation is added
fn serialize_smartrest_supported_operations() {
- let smartrest = SmartRestSetSupportedOperations::default()
- .to_smartrest()
- .unwrap();
- assert_eq!(
- smartrest,
- "114,c8y_SoftwareUpdate,c8y_LogfileRequest,c8y_Restart\n"
- );
+ let smartrest =
+ SmartRestSetSupportedOperations::new(&["c8y_SoftwareUpdate", "c8y_LogfileRequest"])
+ .to_smartrest()
+ .unwrap();
+ assert_eq!(smartrest, "114,c8y_SoftwareUpdate,c8y_LogfileRequest\n");
}
#[test]
diff --git a/crates/core/tedge_mapper/src/c8y_converter.rs b/crates/core/tedge_mapper/src/c8y_converter.rs
index f8b9b6c3..a0d7bf6d 100644
--- a/crates/core/tedge_mapper/src/c8y_converter.rs
+++ b/crates/core/tedge_mapper/src/c8y_converter.rs
@@ -1,6 +1,7 @@
-use crate::converter::*;
use crate::error::*;
use crate::size_threshold::SizeThreshold;
+use crate::{converter::*, operations::Operations};
+use c8y_smartrest::smartrest_serializer::{SmartRestSerializer, SmartRestSetSupportedOperations};
use c8y_translator::json;
use mqtt_client::{Message, Topic};
use std::collections::HashSet;
@@ -77,6 +78,20 @@ impl Converter for CumulocityConverter {
}
Ok(vec)
}
+
+ fn try_init_messages(&self) -> Result<Vec<Message>, ConversionError> {
+ let ops = Operations::try_new("/etc/tedge/operations")?;
+ let ops = ops.get_operations_list("c8y");
+
+ if !ops.is_empty() {
+ let ops_msg = SmartRestSetSupportedOperations::new(&ops);
+ let topic = Topic::new_unchecked("c8y/s/us");
+ let msg = Message::new(&topic, ops_msg.to_smartrest()?);
+ Ok(vec![msg])
+ } else {
+ Ok(Vec::new())
+ }
+ }
}
fn get_child_id_from_topic(topic: &str) -> Result<Option<String>, ConversionError> {
diff --git a/crates/core/tedge_mapper/src/converter.rs b/crates/core/tedge_mapper/src/converter.rs
index e2709f8b..3c9abd2e 100644
--- a/crates/core/tedge_mapper/src/converter.rs
+++ b/crates/core/tedge_mapper/src/converter.rs
@@ -21,7 +21,29 @@ pub trait Converter: Send + Sync {
fn try_convert(&mut self, input: &Message) -> Result<Vec<Message>, Self::Error>;
fn convert(&mut self, input: &Message) -> Vec<Message> {
- match self.try_convert(input) {
+ let messages_or_err = self.try_convert(input);
+ self.wrap_error(messages_or_err)
+ }
+
+ fn wrap_error(&self, messages_or_err: Result<Vec<Message>, Self::Error>) -> Vec<Message> {
+ match messages_or_err {
+ Ok(messages) => messages,
+ Err(error) => {
+ error!("Mapping error: {}", error);
+ vec![Message::new(
+ &self.get_mapper_config().errors_topic,
+ error.to_string(),
+ )]
+ }
+ }
+ }
+
+ fn try_init_messages(&self) -> Result<Vec<Message>, Self::Error> {
+ Ok(vec![])
+ }
+
+ fn init_messages(&self) -> Vec<Message> {
+ match self.try_init_messages() {
Ok(messages) => messages,
Err(error) => {
error!("Mapping error: {}", error);
diff --git a/crates/core/tedge_mapper/src/error.rs b/crates/core/tedge_mapper/src/error.rs
index 0f6f0dad..21ec16df 100644
--- a/crates/core/tedge_mapper/src/error.rs
+++ b/crates/core/tedge_mapper/src/error.rs
@@ -43,4 +43,16 @@ pub enum ConversionError {
#[error(transparent)]
FromMqttClient(#[from] MqttClientError),
+
+ #[error(transparent)]
+ FromOperationsError(#[from] OperationsError),
+
+ #[error(transparent)]
+ FromSmartRestSerializerError(#[from] c8y_smartrest::error::SmartRestSerializerError),
+}
+
+#[derive(Debug, thiserror::Error)]
+pub enum OperationsError {
+ #[error(transparent)]
+ FromIo(#[from] std::io::Error),
}
diff --git a/crates/core/tedge_mapper/src/main.rs b/crates/core/tedge_mapper/src/main.rs
index 947dde03..d3324ef7 100644
--- a/crates/core/tedge_mapper/src/main.rs
+++ b/crates/core/tedge_mapper/src/main.rs
@@ -16,6 +16,7 @@ mod component;
mod converter;
mod error;
mod mapper;
+mod operations;
mod size_threshold;
mod sm_c8y_mapper;
diff --git a/crates/core/tedge_mapper/src/mapper.rs b/crates/core/tedge_mapper/src/mapper.rs
index f2d686e4..2f90e7bb 100644
--- a/crates/core/tedge_mapper/src/mapper.rs
+++ b/crates/core/tedge_mapper/src/mapper.rs
@@ -35,17 +35,6 @@ pub struct Mapper {
}
impl Mapper {
- pub(crate) async fn run(&mut self) -> Result<(), MqttClientError> {
- info!("Running");
- let errors_handle = self.subscribe_errors();
- let messages_handle = self.subscribe_messages();
- messages_handle.await?;
- errors_handle
- .await
- .map_err(|_| MqttClientError::JoinError)?;
- Ok(())
- }
-
pub fn new(
client: mqtt_client::Client,
converter: Box<dyn Converter<Error = ConversionError>>,
@@ -58,6 +47,17 @@ impl Mapper {
}
}
+ pub(crate) async fn run(&mut self) -> Result<(), MqttClientError> {
+ info!("Running");
+ let errors_handle = self.subscribe_errors();
+ let messages_handle = self.subscribe_messages();
+ messages_handle.await?;
+ errors_handle
+ .await
+ .map_err(|_| MqttClientError::JoinError)?;
+ Ok(())
+ }
+
#[instrument(skip(self), name = "errors")]
fn subscribe_errors(&self) -> JoinHandle<()> {
let mut errors = self.client.subscribe_errors();
@@ -70,6 +70,11 @@ impl Mapper {
#[instrument(skip(self), name = "messages")]
async fn subscribe_messages(&mut self) -> Result<(), MqttClientError> {
+ let init_messages = self.converter.init_messages();
+ for init_message in init_messages.into_iter() {
+ self.client.publish(init_message).await?
+ }
+
let mut messages = self
.client
.subscribe(self.converter.get_in_topic_filter().clone())
diff --git a/crates/core/tedge_mapper/src/operations.rs b/crates/core/tedge_mapper/src/operations.rs
new file mode 100644
index 00000000..b4dbe2a7
--- /dev/null
+++ b/crates/core/tedge_mapper/src/operations.rs
@@ -0,0 +1,241 @@
+use std::{
+ collections::{HashMap, HashSet},
+ fs,
+ path::{Path, PathBuf},
+};
+
+use crate::error::OperationsError;
+
+/// Operations are derived by reading files subdirectories per cloud /etc/tedge/operations directory
+/// Each operation is a file name in one of the subdirectories
+/// The file name is the operation name
+
+type Cloud = String;
+type OperationName = String;
+type Operation = HashSet<OperationName>;
+type OperationsMap = HashMap<Cloud, Operation>;
+
+#[derive(Debug, Clone, PartialEq)]
+pub struct Operations {
+ cloud: PathBuf,
+ operations: OperationsMap,
+}
+
+impl Operations {
+ pub fn try_new(dir: impl AsRef<Path>) -> Result<Self, OperationsError> {
+ let operations = get_operations(dir.as_ref())?;
+
+ Ok(Self {
+ cloud: dir.as_ref().to_path_buf(),
+ operations,
+ })
+ }
+
+ pub fn get_operations_list(&self, cloud: &str) -> Vec<&str> {
+ self.operations
+ .get(cloud)
+ .map(|operations| operations.iter().map(|k| k.as_str()).collect())
+ .unwrap_or_default()
+ }
+}
+
+fn get_clouds(dir: impl AsRef<Path>) -> Result<Vec<String>, OperationsError> {
+ Ok(fs::read_dir(dir)?
+ .map(|entry| entry.map(|e| e.path()))
+ .collect::<Result<Vec<PathBuf>, _>>()?
+ .into_iter()
+ .filter(|path| path.is_dir())
+ .map(|path| {
+ let filename = path.file_name();
+ filename.unwrap().to_str().unwrap().to_string()
+ })
+ .collect())
+}
+
+fn get_operations(dir: impl AsRef<Path>) -> Result<OperationsMap, OperationsError> {
+ let mut operations = OperationsMap::new();
+ for cloud in get_clouds(&dir)? {
+ let path = dir.as_ref().join(cloud.as_str());
+ let operations_map = fs::read_dir(&path)?
+ .map(|entry| entry.map(|e| e.path()))
+ .collect::<Result<Vec<PathBuf>, _>>()?
+ .into_iter()
+ .filter(|path| path.is_file())
+ .map(|path| {
+ let filename = path.file_name();
+ filename.unwrap().to_str().unwrap().to_string()
+ })
+ .collect();
+ operations.insert(cloud, operations_map);
+ }
+ Ok(operations)
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use test_case::test_case;
+
+ #[test_case(0, false)]
+ #[test_case(0, true)]
+ #[test_case(2, false)]
+ #[test_case(2, true)]
+ fn get_clouds_tests(clouds_count: usize, files: bool) {
+ let operations = TestOperations::builder().with_clouds(clouds_count);
+
+ if files {
+ operations.with_random_file_in_clouds_directory();
+ }
+
+ let operations = operations.build();
+
+ let clouds = get_clouds(operations.temp_dir()).unwrap();
+
+ assert_eq!(clouds.len(), clouds_count);
+ }
+
+ #[test_case(0, 0)]
+ #[test_case(1, 1)]
+ #[test_case(1, 5)]
+ #[test_case(2, 5)]
+ fn get_operations_all(clouds_count: usize, ops_count: usize) {
+ let test_operations = TestOperations::builder()
+ .with_clouds(clouds_count)
+ .with_operations(ops_count)
+ .build();
+
+ let operations = get_operations(test_operations.temp_dir()).unwrap();
+
+ assert_eq!(operations.len(), clouds_count);
+ assert_eq!(
+ operations.values().map(|ops| ops.len()).sum::<usize>(),
+ ops_count * clouds_count
+ );
+ }
+
+ // Structs for state change with the builder pattern
+ // Structs for Clouds
+ struct Clouds(Vec<PathBuf>);
+ struct NoClouds;
+
+ // Structs for Operations
+ struct Ops(Vec<PathBuf>);
+ struct NoOps;
+
+ struct TestOperationsBuilder<C, O> {
+ temp_dir: tempfile::TempDir,
+ clouds: C,
+ operations: O,
+ }
+
+ impl TestOperationsBuilder<NoClouds, NoOps> {
+ fn new() -> Self {
+ Self {
+ temp_dir: tempfile::tempdir().unwrap(),
+ clouds: NoClouds,
+ operations: NoOps,
+ }
+ }
+ }
+
+ impl<O> TestOperationsBuilder<NoClouds, O> {
+ fn with_clouds(self, clouds_count: usize) -> TestOperationsBuilder<Clouds, O> {
+ let Self {
+ temp_dir,
+ operations,
+ ..
+ } = self;
+
+ let mut clouds = Vec::new();
+ for i in 0..clouds_count {
+ let cloud = temp_dir.as_ref().join(format!("cloud{}", i));
+ fs::create_dir(&cloud).unwrap();
+ clouds.push(cloud);
+ }
+
+ TestOperationsBuilder {
+ temp_dir,
+ clouds: Clouds(clouds),
+ operations,
+ }
+ }
+ }
+
+ impl TestOperationsBuilder<Clouds, NoOps> {
+ fn with_operations(self, operations_count: usize) -> TestOperationsBuilder<Clouds, Ops> {
+ let Self {
+ temp_dir, clouds, ..
+ } = self;
+
+ let mut operations = Vec::new();
+ clouds.0.iter().for_each(|path| {
+ for i in 0..operations_count {
+ let file_path = path.join(format!("operation{}", i));
+ fs::File::create(&file_path).unwrap();
+ operations.push(file_path);
+ }
+ });
+
+ TestOperationsBuilder {
+ operations: Ops(operations),
+ temp_dir,
+ clouds,
+ }
+ }
+
+ fn build(self) -> TestOperations {
+ let Self {
+ temp_dir, clouds, ..
+ } = self;
+
+ TestOperations {
+ temp_dir,
+ clouds: clouds.0,
+ operations: Vec::new(),
+ }
+ }
+ }
+
+ impl<C, O> TestOperationsBuilder<C, O> {
+ fn with_random_file_in_clouds_directory(&self) {
+ let path = self.temp_dir.as_ref().join("cloudfile");
+ fs::File::create(path).unwrap();
+ }
+ }
+
+ impl TestOperationsBuilder<Clouds, Ops> {
+ fn build(self) -> TestOperations {
+ let Self {
+ temp_dir,
+ clouds,
+ operations,
+ } = self;
+
+ TestOperations {
+ temp_dir,
+ clouds: clouds.0,
+ operations: operations.0,
+ }
+ }
+ }
+
+ struct TestOperations {
+ temp_dir: tempfile::TempDir,
+ clouds: Vec<PathBuf>,
+ operations: Vec<PathBuf>,
+ }
+
+ impl TestOperations {
+ fn builder() -> TestOperationsBuilder<NoClouds, NoOps> {
+ TestOperationsBuilder::new()
+ }
+
+ fn temp_dir(&self) -> &tempfile::TempDir {
+ &self.temp_dir
+ }
+
+ fn operations(&self) -> &Vec<PathBuf> {
+ &self.operations
+ }
+ }
+}
diff --git a/crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs b/crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs
index 3c064e90..989718d1 100644
--- a/crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs
+++ b/crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs
@@ -11,7 +11,7 @@ use c8y_smartrest::{
smartrest_serializer::{
SmartRestGetPendingOperations, SmartRestSerializer, SmartRestSetOperationToExecuting,
SmartRestSetOperationToFailed, SmartRestSetOperationToSuccessful,
- SmartRestSetSupportedLogType, SmartRestSetSupportedOperations,
+ SmartRestSetSupportedLogType,
},
};
use chrono::{DateTime, FixedOffset};
@@ -88,7 +88,6 @@ where
let () = self.http_proxy.init().await?;
info!("Running");
- let () = self.publish_supported_operations().await?;
let () = self.publish_supported_log_types().await?;
let () = self.publish_get_pending_operations().await?;
let () = self.ask_software_list().await?;
@@ -211,14 +210,6 @@ where
Ok(())
}
- async fn publish_supported_operations(&self) -> Result<(), SMCumulocityMapperError> {
- let data = SmartRestSetSupportedOperations::default();
- let topic = OutgoingTopic::SmartRestResponse.to_topic()?;
- let payload = data.to_smartrest()?;
- let () = self.publish(&topic, payload).await?;
- Ok(())
- }
-
async fn publish_get_pending_operations(&self) -> Result<(), SMCumulocityMapperError> {
let data = SmartRestGetPendingOperations::default();
let topic = OutgoingTopic::SmartRestResponse.to_topic()?;
diff --git a/crates/core/tedge_mapper/src/sm_c8y_mapper/tests.rs b/crates/core/tedge_mapper/src/sm_c8y_mapper/tests.rs
index 7f471ef2..14cb2500 100644
--- a/crates/core/tedge_mapper/src/sm_c8y_mapper/tests.rs
+++ b/crates/core/tedge_mapper/src/sm_c8y_mapper/tests.rs
@@ -52,11 +52,7 @@ async fn mapper_publishes_a_supported_operation_and_a_pending_operations_onto_c8
mqtt_tests::assert_received(
&mut messages,
TEST_TIMEOUT_MS,
- vec![
- "114,c8y_SoftwareUpdate,c8y_LogfileRequest,c8y_Restart\n",
- "118,software-management\n",
- "500\n",
- ],
+ vec!["118,software-management\n", "500\n"],
)
.await;
sm_mapper.unwrap().abort();
@@ -119,11 +115,7 @@ async fn mapper_publishes_software_update_status_onto_c8y_topic() {
mqtt_tests::assert_received(
&mut messages,
TEST_TIMEOUT_MS,
- vec![
- "114,c8y_SoftwareUpdate,c8y_LogfileRequest,c8y_Restart\n",
- "118,software-management\n",
- "500\n",
- ],
+ vec!["118,software-management\n", "500\n"],
)
.await;
@@ -184,11 +176,7 @@ async fn mapper_publishes_software_update_failed_status_onto_c8y_topic() {
mqtt_tests::assert_received(
&mut messages,
TEST_TIMEOUT_MS,
- vec![
- "114,c8y_SoftwareUpdate,c8y_LogfileRequest,c8y_Restart\n",
- "118,software-management\n",
- "500\n",
- ],
+ vec!["118,software-management\n", "500\n"],
)
.await;
@@ -368,11 +356,7 @@ async fn mapper_publishes_software_update_request_with_wrong_action() {
mqtt_tests::assert_received(
&mut messages,
TEST_TIMEOUT_MS,
- vec![
- "114,c8y_SoftwareUpdate,c8y_LogfileRequest,c8y_Restart\n",
- "118,software-management\n",
- "500\n",
- ],
+ vec!["118,software-management\n", "500\n"],
)
.await;