diff options
Diffstat (limited to 'crates/core/c8y_smartrest/src')
-rw-r--r-- | crates/core/c8y_smartrest/src/error.rs | 70 | ||||
-rw-r--r-- | crates/core/c8y_smartrest/src/lib.rs | 2 | ||||
-rw-r--r-- | crates/core/c8y_smartrest/src/operations.rs | 215 | ||||
-rw-r--r-- | crates/core/c8y_smartrest/src/smartrest_deserializer.rs | 79 | ||||
-rw-r--r-- | crates/core/c8y_smartrest/src/topic.rs | 160 |
5 files changed, 525 insertions, 1 deletions
diff --git a/crates/core/c8y_smartrest/src/error.rs b/crates/core/c8y_smartrest/src/error.rs index ee52f984..9531f372 100644 --- a/crates/core/c8y_smartrest/src/error.rs +++ b/crates/core/c8y_smartrest/src/error.rs @@ -1,4 +1,5 @@ use agent_interface::SoftwareUpdateResponse; +use std::path::PathBuf; #[derive(thiserror::Error, Debug)] pub enum SmartRestSerializerError { @@ -39,3 +40,72 @@ pub enum SmartRestDeserializerError { #[error("Empty request")] EmptyRequest, } + +#[derive(Debug, thiserror::Error)] +pub enum OperationsError { + #[error(transparent)] + FromIo(#[from] std::io::Error), + + #[error("Cannot extract the operation name from the path: {0}")] + InvalidOperationName(PathBuf), + + #[error("Error while parsing operation file: '{0}': {1}.")] + TomlError(PathBuf, #[source] toml::de::Error), +} + +#[derive(thiserror::Error, Debug)] +pub enum SMCumulocityMapperError { + #[error("Invalid MQTT Message.")] + InvalidMqttMessage, + + #[error(transparent)] + InvalidTopicError(#[from] agent_interface::TopicError), + + #[error(transparent)] + InvalidThinEdgeJson(#[from] agent_interface::SoftwareError), + + #[error(transparent)] + FromElapsed(#[from] tokio::time::error::Elapsed), + + #[error(transparent)] + FromMqttClient(#[from] mqtt_channel::MqttError), + + #[error(transparent)] + FromReqwest(#[from] reqwest::Error), + + #[error(transparent)] + FromSmartRestSerializer(#[from] SmartRestSerializerError), + + #[error(transparent)] + FromSmartRestDeserializer(#[from] SmartRestDeserializerError), + + #[error(transparent)] + FromTedgeConfig(#[from] tedge_config::ConfigSettingError), + + #[error(transparent)] + FromLoadTedgeConfigError(#[from] tedge_config::TEdgeConfigError), + + #[error("Invalid date in file name: {0}")] + InvalidDateInFileName(String), + + #[error("Invalid path. Not UTF-8.")] + InvalidUtf8Path, + + #[error(transparent)] + FromIo(#[from] std::io::Error), + + #[error("Request timed out")] + RequestTimeout, + + #[error("Operation execution failed: {0}")] + ExecuteFailed(String), + + #[error("An unknown operation template: {0}")] + UnknownOperation(String), + + #[error(transparent)] + FromTimeFormat(#[from] time::error::Format), + + #[error(transparent)] + FromTimeParse(#[from] time::error::Parse), +} diff --git a/crates/core/c8y_smartrest/src/lib.rs b/crates/core/c8y_smartrest/src/lib.rs index 53976e8d..b6338b06 100644 --- a/crates/core/c8y_smartrest/src/lib.rs +++ b/crates/core/c8y_smartrest/src/lib.rs @@ -1,5 +1,7 @@ pub mod alarm; pub mod error; pub mod event; +pub mod operations; pub mod smartrest_deserializer; pub mod smartrest_serializer; +pub mod topic; diff --git a/crates/core/c8y_smartrest/src/operations.rs b/crates/core/c8y_smartrest/src/operations.rs new file mode 100644 index 00000000..b3909aac --- /dev/null +++ b/crates/core/c8y_smartrest/src/operations.rs @@ -0,0 +1,215 @@ +use std::{ + collections::{HashMap, HashSet}, + fs, + path::{Path, PathBuf}, +}; + +use serde::Deserialize; + +use crate::error::OperationsError; + +/// Operations are derived by reading files subdirectories per cloud /etc/tedge/operations directory +/// Each operation is a file name in one of the subdirectories +/// The file name is the operation name + +#[derive(Debug, Clone, Deserialize, PartialEq)] +#[serde(rename_all = "snake_case")] +pub struct OnMessageExec { + command: Option<String>, + on_message: Option<String>, + topic: Option<String>, + user: Option<String>, +} + +#[derive(Debug, Clone, Deserialize, PartialEq)] +#[serde(rename_all = "lowercase")] +pub struct Operation { + #[serde(skip)] + name: String, + exec: Option<OnMessageExec>, +} + +impl Operation { + pub fn exec(&self) -> Option<&OnMessageExec> { + self.exec.as_ref() + } + + pub fn command(&self) -> Option<String> { + self.exec().and_then(|exec| exec.command.clone()) + } + + pub fn topic(&self) -> Option<String> { + self.exec().and_then(|exec| exec.topic.clone()) + } +} + +#[derive(Debug, Clone)] +pub struct Operations { + operations: Vec<Operation>, + operations_by_trigger: HashMap<String, usize>, +} + +impl Operations { + pub fn new() -> Self { + Self { + operations: vec![], + operations_by_trigger: HashMap::new(), + } + } + + pub fn add(&mut self, operation: Operation) { + if let Some(detail) = operation.exec() { + if let Some(on_message) = &detail.on_message { + self.operations_by_trigger + .insert(on_message.clone(), self.operations.len()); + } + } + self.operations.push(operation); + } + + pub fn try_new(dir: impl AsRef<Path>, cloud_name: &str) -> Result<Self, OperationsError> { + get_operations(dir.as_ref(), cloud_name) + } + + pub fn get_operations_list(&self) -> Vec<String> { + self.operations + .iter() + .map(|operation| operation.name.clone()) + .collect::<Vec<String>>() + } + + pub fn matching_smartrest_template(&self, operation_template: &str) -> Option<&Operation> { + self.operations_by_trigger + .get(operation_template) + .and_then(|index| self.operations.get(*index)) + } + + pub fn topics_for_operations(&self) -> HashSet<String> { + self.operations + .iter() + .filter_map(|operation| operation.topic()) + .collect::<HashSet<String>>() + } +} + +fn get_operations(dir: impl AsRef<Path>, cloud_name: &str) -> Result<Operations, OperationsError> { + let mut operations = Operations::new(); + + let path = dir.as_ref().join(&cloud_name); + let dir_entries = fs::read_dir(&path)? + .map(|entry| entry.map(|e| e.path())) + .collect::<Result<Vec<PathBuf>, _>>()? + .into_iter() + .filter(|path| path.is_file()) + .collect::<Vec<PathBuf>>(); + + for path in dir_entries { + let mut details = match fs::read(&path) { + Ok(bytes) => toml::from_slice::<Operation>(bytes.as_slice()) + .map_err(|e| OperationsError::TomlError(path.to_path_buf(), e))?, + + Err(err) => return Err(OperationsError::FromIo(err)), + }; + + details.name = path + .file_name() + .and_then(|filename| filename.to_str()) + .ok_or_else(|| OperationsError::InvalidOperationName(path.to_owned()))? + .to_owned(); + + operations.add(details); + } + Ok(operations) +} + +#[cfg(test)] +mod tests { + use std::io::Write; + + use super::*; + use test_case::test_case; + + // Structs for state change with the builder pattern + // Structs for Operations + struct Ops(Vec<PathBuf>); + struct NoOps; + + struct TestOperationsBuilder<O> { + temp_dir: tempfile::TempDir, + operations: O, + } + + impl TestOperationsBuilder<NoOps> { + fn new() -> Self { + Self { + temp_dir: tempfile::tempdir().unwrap(), + operations: NoOps, + } + } + } + + impl TestOperationsBuilder<NoOps> { + fn with_operations(self, operations_count: usize) -> TestOperationsBuilder<Ops> { + let Self { temp_dir, .. } = self; + + let mut operations = Vec::new(); + for i in 0..operations_count { + let file_path = temp_dir.path().join(format!("operation{}", i)); + let mut file = fs::File::create(&file_path).unwrap(); + file.write_all( + br#"[exec] + command = "echo" + on_message = "511""#, + ) + .unwrap(); + operations.push(file_path); + } + + TestOperationsBuilder { + operations: Ops(operations), + temp_dir, + } + } + } + + impl TestOperationsBuilder<Ops> { + fn build(self) -> TestOperations { + let Self { + temp_dir, + operations, + } = self; + + TestOperations { + temp_dir, + operations: operations.0, + } + } + } + + struct TestOperations { + temp_dir: tempfile::TempDir, + operations: Vec<PathBuf>, + } + + impl TestOperations { + fn builder() -> TestOperationsBuilder<NoOps> { + TestOperationsBuilder::new() + } + + fn temp_dir(&self) -> &tempfile::TempDir { + &self.temp_dir + } + } + + #[test_case(0)] + #[test_case(1)] + #[test_case(5)] + fn get_operations_all(ops_count: usize) { + let test_operations = TestOperations::builder().with_operations(ops_count).build(); + + let operations = get_operations(test_operations.temp_dir(), "").unwrap(); + dbg!(&operations); + + assert_eq!(operations.operations.len(), ops_count); + } +} diff --git a/crates/core/c8y_smartrest/src/smartrest_deserializer.rs b/crates/core/c8y_smartrest/src/smartrest_deserializer.rs index c995c7d1..fea01538 100644 --- a/crates/core/c8y_smartrest/src/smartrest_deserializer.rs +++ b/crates/core/c8y_smartrest/src/smartrest_deserializer.rs @@ -1,10 +1,11 @@ -use crate::error::SmartRestDeserializerError; +use crate::error::{SMCumulocityMapperError, SmartRestDeserializerError}; use agent_interface::{SoftwareModule, SoftwareModuleUpdate, SoftwareUpdateRequest}; use csv::ReaderBuilder; use download::DownloadInfo; use serde::de::Error; use serde::{Deserialize, Deserializer, Serialize}; use std::convert::{TryFrom, TryInto}; +use std::path::PathBuf; use time::{format_description, OffsetDateTime}; #[derive(Debug)] @@ -287,12 +288,50 @@ impl SmartRestJwtResponse { } } +/// Returns a date time object from a file path or file-path-like string +/// a typical file stem looks like this: "software-list-2021-10-27T10:29:58Z" +/// +/// # Examples: +/// ``` +/// use std::path::PathBuf; +/// use crate::c8y_smartrest::smartrest_deserializer::get_datetime_from_file_path; +/// +/// let mut path = PathBuf::new(); +/// path.push("/path/to/file/with/date/in/path-2021-10-27T10:29:58Z"); +/// let path_bufdate_time = get_datetime_from_file_path(&path).unwrap(); +/// ``` +pub fn get_datetime_from_file_path( + log_path: &PathBuf, +) -> Result<OffsetDateTime, SMCumulocityMapperError> { + if let Some(stem_string) = log_path.file_stem().and_then(|s| s.to_str()) { + // a typical file stem looks like this: software-list-2021-10-27T10:29:58Z. + // to extract the date, rsplit string on "-" and take (last) 3 + let mut stem_string_vec = stem_string.rsplit('-').take(3).collect::<Vec<_>>(); + // reverse back the order (because of rsplit) + stem_string_vec.reverse(); + // join on '-' to get the date string + let date_string = stem_string_vec.join("-"); + let dt = OffsetDateTime::parse(&date_string, &format_description::well_known::Rfc3339)?; + + return Ok(dt); + } + match log_path.to_str() { + Some(path) => Err(SMCumulocityMapperError::InvalidDateInFileName( + path.to_string(), + ))?, + None => Err(SMCumulocityMapperError::InvalidUtf8Path)?, + } +} + #[cfg(test)] mod tests { use super::*; use agent_interface::*; use assert_json_diff::*; use serde_json::json; + use std::fs::File; + use std::io::Write; + use std::str::FromStr; use test_case::test_case; // To avoid using an ID randomly generated, which is not convenient for testing. @@ -568,4 +607,42 @@ mod tests { let log = SmartRestRestartRequest::from_smartrest(&smartrest); assert!(log.is_ok()); } + + #[test_case("/path/to/software-list-2021-10-27T10:44:44Z.log")] + #[test_case("/path/to/tedge/agent/software-update-2021-10-25T07:45:41Z.log")] + #[test_case("/path/to/another-variant-2021-10-25T07:45:41Z.log")] + #[test_case("/yet-another-variant-2021-10-25T07:45:41Z.log")] + fn test_datetime_parsing_from_path(file_path: &str) { + // checking that `get_date_from_file_path` unwraps a `chrono::NaiveDateTime` object. + // this should return an Ok Result. + let path_buf = PathBuf::from_str(file_path).unwrap(); + let path_buf_datetime = get_datetime_from_file_path(&path_buf); + assert!(path_buf_datetime.is_ok()); + } + + #[test_case("/path/to/software-list-2021-10-27-10:44:44Z.log")] + #[test_case("/path/to/tedge/agent/software-update-10-25-2021T07:45:41Z.log")] + #[test_case("/path/to/another-variant-07:45:41Z-2021-10-25T.log")] + #[test_case("/yet-another-variant-2021-10-25T07:45Z.log")] + fn test_datetime_parsing_from_path_fail(file_path: &str) { + // checking that `get_date_from_file_path` unwraps a `chrono::NaiveDateTime` object. + // this should return an err. + let path_buf = PathBuf::from_str(file_path).unwrap(); + let path_buf_datetime = get_datetime_from_file_path(&path_buf); + assert!(path_buf_datetime.is_err()); + } + + fn parse_file_names_from_log_content(log_content: &str) -> [&str; 5] { + let mut files: Vec<&str> = vec![]; + for line in log_content.lines() { + if line.contains("filename: ") { + let filename: &str = line.split("filename: ").last().unwrap(); + files.push(filename); + } + } + match files.try_into() { + Ok(arr) => arr, + Err(_) => panic!("Could not convert to Array &str, size 5"), + } + } } diff --git a/crates/core/c8y_smartrest/src/topic.rs b/crates/core/c8y_smartrest/src/topic.rs new file mode 100644 index 00000000..491c7a4b --- /dev/null +++ b/crates/core/c8y_smartrest/src/topic.rs @@ -0,0 +1,160 @@ +use agent_interface::topic::ResponseTopic; +use agent_interface::TopicError; +use mqtt_channel::Topic; +use mqtt_channel::{Message, MqttError}; + +use crate::error::SMCumulocityMapperError; +use crate::smartrest_serializer::{ + CumulocitySupportedOperations, SmartRestSerializer, SmartRestSetOperationToExecuting, + SmartRestSetOperationToSuccessful, +}; + +#[derive(Debug, Clone, PartialEq)] +pub enum C8yTopic { + SmartRestRequest, + SmartRestResponse, + OperationTopic(String), +} + +impl C8yTopic { + pub fn as_str(&self) -> &str { + match self { + Self::SmartRestRequest => r#"c8y/s/ds"#, + Self::SmartRestResponse => r#"c8y/s/us"#, + Self::OperationTopic(name) => name.as_str(), + } + } + + pub fn to_topic(&self) -> Result<Topic, MqttError> { + Ok(Topic::new(self.as_str())?) + } +} + +impl TryFrom<String> for C8yTopic { + type Error = TopicError; + + fn try_from(value: String) -> Result<Self, Self::Error> { + match value.as_str() { + r#"c8y/s/ds"# => Ok(C8yTopic::SmartRestRequest), + r#"c8y/s/us"# => Ok(C8yTopic::SmartRestResponse), + topic_name => { + if topic_name[..3].contains("c8y") { + Ok(C8yTopic::OperationTopic(topic_name.to_string())) + } else { + Err(TopicError::UnknownTopic { + topic: topic_name.to_string(), + }) + } + } + } + } +} +impl TryFrom<&str> for C8yTopic { + type Error = TopicError; + + fn try_from(value: &str) -> Result<Self, Self::Error> { + Self::try_from(value.to_string()) + } +} + +impl TryFrom<Topic> for C8yTopic { + type Error = TopicError; + + fn try_from(value: Topic) -> Result<Self, Self::Error> { + value.name.try_into() + } +} + +#[derive(Debug, Clone, PartialEq)] +pub enum MapperSubscribeTopic { + C8yTopic(C8yTopic), + ResponseTopic(ResponseTopic), +} + +impl TryFrom<String> for MapperSubscribeTopic { + type Error = TopicError; + fn try_from(value: String) -> Result<Self, Self::Error> { + match ResponseTopic::try_from(value.clone()) { + Ok(response_topic) => Ok(MapperSubscribeTopic::ResponseTopic(response_topic)), + Err(_) => match C8yTopic::try_from(value) { + Ok(smart_rest_request) => Ok(MapperSubscribeTopic::C8yTopic(smart_rest_request)), + Err(err) => Err(err), + }, + } + } +} + +impl TryFrom<&str> for MapperSubscribeTopic { + type Error = TopicError; + + fn try_from(value: &str) -> Result<Self, Self::Error> { + Self::try_from(value.to_string()) + } +} + +impl TryFrom<Topic> for MapperSubscribeTopic { + type Error = TopicError; + + fn try_from(value: Topic) -> Result<Self, Self::Error> { + value.name.try_into() + } +} + +/// returns a c8y message specifying to set log status to executing. +/// +/// example message: '501,c8y_LogfileRequest' +pub async fn get_log_file_request_executing() -> Result<Message, SMCumulocityMapperError> { + let topic = C8yTopic::SmartRestResponse.to_topic()?; + let smartrest_set_operation_status = + SmartRestSetOperationToExecuting::new(CumulocitySupportedOperations::C8yLogFileRequest) + .to_smartrest()?; + Ok(Message::new(&topic, smartrest_set_operation_status)) +} + +/// returns a c8y message specifying to set log status to successful. +/// +/// example message: '503,c8y_LogfileRequest,https://{c8y.url}/etc...' +pub async fn get_log_file_request_done_message( + binary_upload_event_url: &str, +) -> Result<Message, SMCumulocityMapperError> { + let topic = C8yTopic::SmartRestResponse.to_topic()?; + let smartrest_set_operation_status = + SmartRestSetOperationToSuccessful::new(CumulocitySupportedOperations::C8yLogFileRequest) + .with_response_parameter(binary_upload_event_url) + .to_smartrest()?; + + Ok(Message::new(&topic, smartrest_set_operation_status)) +} + +#[cfg(test)] +mod tests { + use super::*; + use std::convert::TryInto; + + #[test] + fn convert_c8y_topic_to_str() { + assert_eq!(C8yTopic::SmartRestRequest.as_str(), "c8y/s/ds"); + assert_eq!(C8yTopic::SmartRestResponse.as_str(), "c8y/s/us"); + } + + #[test] + fn convert_str_into_c8y_topic() { + let c8y_req: C8yTopic = "c8y/s/ds".try_into().unwrap(); + assert_eq!(c8y_req, C8yTopic::SmartRestRequest); + let c8y_resp: C8yTopic = "c8y/s/us".try_into().unwrap(); + assert_eq!(c8y_resp, C8yTopic::SmartRestResponse); + let error: Result<C8yTopic, TopicError> = "test".try_into(); + assert!(error.is_err()); + } + + #[test] + fn convert_topic_into_c8y_topic() { + let c8y_req: C8yTopic = Topic::new("c8y/s/ds").unwrap().try_into().unwrap(); + assert_eq!(c8y_req, C8yTopic::SmartRestRequest); + + let c8y_resp: C8yTopic = Topic::new("c8y/s/us").unwrap().try_into().unwrap(); + assert_eq!(c8y_resp, C8yTopic::SmartRestResponse); + let error: Result<C8yTopic, TopicError> = Topic::new("test").unwrap().try_into(); + assert!(error.is_err()); + } +} |