summaryrefslogtreecommitdiffstats
path: root/crates/core/c8y_smartrest/src
diff options
context:
space:
mode:
Diffstat (limited to 'crates/core/c8y_smartrest/src')
-rw-r--r--crates/core/c8y_smartrest/src/error.rs70
-rw-r--r--crates/core/c8y_smartrest/src/lib.rs2
-rw-r--r--crates/core/c8y_smartrest/src/operations.rs215
-rw-r--r--crates/core/c8y_smartrest/src/smartrest_deserializer.rs79
-rw-r--r--crates/core/c8y_smartrest/src/topic.rs160
5 files changed, 525 insertions, 1 deletions
diff --git a/crates/core/c8y_smartrest/src/error.rs b/crates/core/c8y_smartrest/src/error.rs
index ee52f984..9531f372 100644
--- a/crates/core/c8y_smartrest/src/error.rs
+++ b/crates/core/c8y_smartrest/src/error.rs
@@ -1,4 +1,5 @@
use agent_interface::SoftwareUpdateResponse;
+use std::path::PathBuf;
#[derive(thiserror::Error, Debug)]
pub enum SmartRestSerializerError {
@@ -39,3 +40,72 @@ pub enum SmartRestDeserializerError {
#[error("Empty request")]
EmptyRequest,
}
+
+#[derive(Debug, thiserror::Error)]
+pub enum OperationsError {
+ #[error(transparent)]
+ FromIo(#[from] std::io::Error),
+
+ #[error("Cannot extract the operation name from the path: {0}")]
+ InvalidOperationName(PathBuf),
+
+ #[error("Error while parsing operation file: '{0}': {1}.")]
+ TomlError(PathBuf, #[source] toml::de::Error),
+}
+
+#[derive(thiserror::Error, Debug)]
+pub enum SMCumulocityMapperError {
+ #[error("Invalid MQTT Message.")]
+ InvalidMqttMessage,
+
+ #[error(transparent)]
+ InvalidTopicError(#[from] agent_interface::TopicError),
+
+ #[error(transparent)]
+ InvalidThinEdgeJson(#[from] agent_interface::SoftwareError),
+
+ #[error(transparent)]
+ FromElapsed(#[from] tokio::time::error::Elapsed),
+
+ #[error(transparent)]
+ FromMqttClient(#[from] mqtt_channel::MqttError),
+
+ #[error(transparent)]
+ FromReqwest(#[from] reqwest::Error),
+
+ #[error(transparent)]
+ FromSmartRestSerializer(#[from] SmartRestSerializerError),
+
+ #[error(transparent)]
+ FromSmartRestDeserializer(#[from] SmartRestDeserializerError),
+
+ #[error(transparent)]
+ FromTedgeConfig(#[from] tedge_config::ConfigSettingError),
+
+ #[error(transparent)]
+ FromLoadTedgeConfigError(#[from] tedge_config::TEdgeConfigError),
+
+ #[error("Invalid date in file name: {0}")]
+ InvalidDateInFileName(String),
+
+ #[error("Invalid path. Not UTF-8.")]
+ InvalidUtf8Path,
+
+ #[error(transparent)]
+ FromIo(#[from] std::io::Error),
+
+ #[error("Request timed out")]
+ RequestTimeout,
+
+ #[error("Operation execution failed: {0}")]
+ ExecuteFailed(String),
+
+ #[error("An unknown operation template: {0}")]
+ UnknownOperation(String),
+
+ #[error(transparent)]
+ FromTimeFormat(#[from] time::error::Format),
+
+ #[error(transparent)]
+ FromTimeParse(#[from] time::error::Parse),
+}
diff --git a/crates/core/c8y_smartrest/src/lib.rs b/crates/core/c8y_smartrest/src/lib.rs
index 53976e8d..b6338b06 100644
--- a/crates/core/c8y_smartrest/src/lib.rs
+++ b/crates/core/c8y_smartrest/src/lib.rs
@@ -1,5 +1,7 @@
pub mod alarm;
pub mod error;
pub mod event;
+pub mod operations;
pub mod smartrest_deserializer;
pub mod smartrest_serializer;
+pub mod topic;
diff --git a/crates/core/c8y_smartrest/src/operations.rs b/crates/core/c8y_smartrest/src/operations.rs
new file mode 100644
index 00000000..b3909aac
--- /dev/null
+++ b/crates/core/c8y_smartrest/src/operations.rs
@@ -0,0 +1,215 @@
+use std::{
+ collections::{HashMap, HashSet},
+ fs,
+ path::{Path, PathBuf},
+};
+
+use serde::Deserialize;
+
+use crate::error::OperationsError;
+
+/// Operations are derived by reading files subdirectories per cloud /etc/tedge/operations directory
+/// Each operation is a file name in one of the subdirectories
+/// The file name is the operation name
+
+#[derive(Debug, Clone, Deserialize, PartialEq)]
+#[serde(rename_all = "snake_case")]
+pub struct OnMessageExec {
+ command: Option<String>,
+ on_message: Option<String>,
+ topic: Option<String>,
+ user: Option<String>,
+}
+
+#[derive(Debug, Clone, Deserialize, PartialEq)]
+#[serde(rename_all = "lowercase")]
+pub struct Operation {
+ #[serde(skip)]
+ name: String,
+ exec: Option<OnMessageExec>,
+}
+
+impl Operation {
+ pub fn exec(&self) -> Option<&OnMessageExec> {
+ self.exec.as_ref()
+ }
+
+ pub fn command(&self) -> Option<String> {
+ self.exec().and_then(|exec| exec.command.clone())
+ }
+
+ pub fn topic(&self) -> Option<String> {
+ self.exec().and_then(|exec| exec.topic.clone())
+ }
+}
+
+#[derive(Debug, Clone)]
+pub struct Operations {
+ operations: Vec<Operation>,
+ operations_by_trigger: HashMap<String, usize>,
+}
+
+impl Operations {
+ pub fn new() -> Self {
+ Self {
+ operations: vec![],
+ operations_by_trigger: HashMap::new(),
+ }
+ }
+
+ pub fn add(&mut self, operation: Operation) {
+ if let Some(detail) = operation.exec() {
+ if let Some(on_message) = &detail.on_message {
+ self.operations_by_trigger
+ .insert(on_message.clone(), self.operations.len());
+ }
+ }
+ self.operations.push(operation);
+ }
+
+ pub fn try_new(dir: impl AsRef<Path>, cloud_name: &str) -> Result<Self, OperationsError> {
+ get_operations(dir.as_ref(), cloud_name)
+ }
+
+ pub fn get_operations_list(&self) -> Vec<String> {
+ self.operations
+ .iter()
+ .map(|operation| operation.name.clone())
+ .collect::<Vec<String>>()
+ }
+
+ pub fn matching_smartrest_template(&self, operation_template: &str) -> Option<&Operation> {
+ self.operations_by_trigger
+ .get(operation_template)
+ .and_then(|index| self.operations.get(*index))
+ }
+
+ pub fn topics_for_operations(&self) -> HashSet<String> {
+ self.operations
+ .iter()
+ .filter_map(|operation| operation.topic())
+ .collect::<HashSet<String>>()
+ }
+}
+
+fn get_operations(dir: impl AsRef<Path>, cloud_name: &str) -> Result<Operations, OperationsError> {
+ let mut operations = Operations::new();
+
+ let path = dir.as_ref().join(&cloud_name);
+ let dir_entries = fs::read_dir(&path)?
+ .map(|entry| entry.map(|e| e.path()))
+ .collect::<Result<Vec<PathBuf>, _>>()?
+ .into_iter()
+ .filter(|path| path.is_file())
+ .collect::<Vec<PathBuf>>();
+
+ for path in dir_entries {
+ let mut details = match fs::read(&path) {
+ Ok(bytes) => toml::from_slice::<Operation>(bytes.as_slice())
+ .map_err(|e| OperationsError::TomlError(path.to_path_buf(), e))?,
+
+ Err(err) => return Err(OperationsError::FromIo(err)),
+ };
+
+ details.name = path
+ .file_name()
+ .and_then(|filename| filename.to_str())
+ .ok_or_else(|| OperationsError::InvalidOperationName(path.to_owned()))?
+ .to_owned();
+
+ operations.add(details);
+ }
+ Ok(operations)
+}
+
+#[cfg(test)]
+mod tests {
+ use std::io::Write;
+
+ use super::*;
+ use test_case::test_case;
+
+ // Structs for state change with the builder pattern
+ // Structs for Operations
+ struct Ops(Vec<PathBuf>);
+ struct NoOps;
+
+ struct TestOperationsBuilder<O> {
+ temp_dir: tempfile::TempDir,
+ operations: O,
+ }
+
+ impl TestOperationsBuilder<NoOps> {
+ fn new() -> Self {
+ Self {
+ temp_dir: tempfile::tempdir().unwrap(),
+ operations: NoOps,
+ }
+ }
+ }
+
+ impl TestOperationsBuilder<NoOps> {
+ fn with_operations(self, operations_count: usize) -> TestOperationsBuilder<Ops> {
+ let Self { temp_dir, .. } = self;
+
+ let mut operations = Vec::new();
+ for i in 0..operations_count {
+ let file_path = temp_dir.path().join(format!("operation{}", i));
+ let mut file = fs::File::create(&file_path).unwrap();
+ file.write_all(
+ br#"[exec]
+ command = "echo"
+ on_message = "511""#,
+ )
+ .unwrap();
+ operations.push(file_path);
+ }
+
+ TestOperationsBuilder {
+ operations: Ops(operations),
+ temp_dir,
+ }
+ }
+ }
+
+ impl TestOperationsBuilder<Ops> {
+ fn build(self) -> TestOperations {
+ let Self {
+ temp_dir,
+ operations,
+ } = self;
+
+ TestOperations {
+ temp_dir,
+ operations: operations.0,
+ }
+ }
+ }
+
+ struct TestOperations {
+ temp_dir: tempfile::TempDir,
+ operations: Vec<PathBuf>,
+ }
+
+ impl TestOperations {
+ fn builder() -> TestOperationsBuilder<NoOps> {
+ TestOperationsBuilder::new()
+ }
+
+ fn temp_dir(&self) -> &tempfile::TempDir {
+ &self.temp_dir
+ }
+ }
+
+ #[test_case(0)]
+ #[test_case(1)]
+ #[test_case(5)]
+ fn get_operations_all(ops_count: usize) {
+ let test_operations = TestOperations::builder().with_operations(ops_count).build();
+
+ let operations = get_operations(test_operations.temp_dir(), "").unwrap();
+ dbg!(&operations);
+
+ assert_eq!(operations.operations.len(), ops_count);
+ }
+}
diff --git a/crates/core/c8y_smartrest/src/smartrest_deserializer.rs b/crates/core/c8y_smartrest/src/smartrest_deserializer.rs
index c995c7d1..fea01538 100644
--- a/crates/core/c8y_smartrest/src/smartrest_deserializer.rs
+++ b/crates/core/c8y_smartrest/src/smartrest_deserializer.rs
@@ -1,10 +1,11 @@
-use crate::error::SmartRestDeserializerError;
+use crate::error::{SMCumulocityMapperError, SmartRestDeserializerError};
use agent_interface::{SoftwareModule, SoftwareModuleUpdate, SoftwareUpdateRequest};
use csv::ReaderBuilder;
use download::DownloadInfo;
use serde::de::Error;
use serde::{Deserialize, Deserializer, Serialize};
use std::convert::{TryFrom, TryInto};
+use std::path::PathBuf;
use time::{format_description, OffsetDateTime};
#[derive(Debug)]
@@ -287,12 +288,50 @@ impl SmartRestJwtResponse {
}
}
+/// Returns a date time object from a file path or file-path-like string
+/// a typical file stem looks like this: "software-list-2021-10-27T10:29:58Z"
+///
+/// # Examples:
+/// ```
+/// use std::path::PathBuf;
+/// use crate::c8y_smartrest::smartrest_deserializer::get_datetime_from_file_path;
+///
+/// let mut path = PathBuf::new();
+/// path.push("/path/to/file/with/date/in/path-2021-10-27T10:29:58Z");
+/// let path_bufdate_time = get_datetime_from_file_path(&path).unwrap();
+/// ```
+pub fn get_datetime_from_file_path(
+ log_path: &PathBuf,
+) -> Result<OffsetDateTime, SMCumulocityMapperError> {
+ if let Some(stem_string) = log_path.file_stem().and_then(|s| s.to_str()) {
+ // a typical file stem looks like this: software-list-2021-10-27T10:29:58Z.
+ // to extract the date, rsplit string on "-" and take (last) 3
+ let mut stem_string_vec = stem_string.rsplit('-').take(3).collect::<Vec<_>>();
+ // reverse back the order (because of rsplit)
+ stem_string_vec.reverse();
+ // join on '-' to get the date string
+ let date_string = stem_string_vec.join("-");
+ let dt = OffsetDateTime::parse(&date_string, &format_description::well_known::Rfc3339)?;
+
+ return Ok(dt);
+ }
+ match log_path.to_str() {
+ Some(path) => Err(SMCumulocityMapperError::InvalidDateInFileName(
+ path.to_string(),
+ ))?,
+ None => Err(SMCumulocityMapperError::InvalidUtf8Path)?,
+ }
+}
+
#[cfg(test)]
mod tests {
use super::*;
use agent_interface::*;
use assert_json_diff::*;
use serde_json::json;
+ use std::fs::File;
+ use std::io::Write;
+ use std::str::FromStr;
use test_case::test_case;
// To avoid using an ID randomly generated, which is not convenient for testing.
@@ -568,4 +607,42 @@ mod tests {
let log = SmartRestRestartRequest::from_smartrest(&smartrest);
assert!(log.is_ok());
}
+
+ #[test_case("/path/to/software-list-2021-10-27T10:44:44Z.log")]
+ #[test_case("/path/to/tedge/agent/software-update-2021-10-25T07:45:41Z.log")]
+ #[test_case("/path/to/another-variant-2021-10-25T07:45:41Z.log")]
+ #[test_case("/yet-another-variant-2021-10-25T07:45:41Z.log")]
+ fn test_datetime_parsing_from_path(file_path: &str) {
+ // checking that `get_date_from_file_path` unwraps a `chrono::NaiveDateTime` object.
+ // this should return an Ok Result.
+ let path_buf = PathBuf::from_str(file_path).unwrap();
+ let path_buf_datetime = get_datetime_from_file_path(&path_buf);
+ assert!(path_buf_datetime.is_ok());
+ }
+
+ #[test_case("/path/to/software-list-2021-10-27-10:44:44Z.log")]
+ #[test_case("/path/to/tedge/agent/software-update-10-25-2021T07:45:41Z.log")]
+ #[test_case("/path/to/another-variant-07:45:41Z-2021-10-25T.log")]
+ #[test_case("/yet-another-variant-2021-10-25T07:45Z.log")]
+ fn test_datetime_parsing_from_path_fail(file_path: &str) {
+ // checking that `get_date_from_file_path` unwraps a `chrono::NaiveDateTime` object.
+ // this should return an err.
+ let path_buf = PathBuf::from_str(file_path).unwrap();
+ let path_buf_datetime = get_datetime_from_file_path(&path_buf);
+ assert!(path_buf_datetime.is_err());
+ }
+
+ fn parse_file_names_from_log_content(log_content: &str) -> [&str; 5] {
+ let mut files: Vec<&str> = vec![];
+ for line in log_content.lines() {
+ if line.contains("filename: ") {
+ let filename: &str = line.split("filename: ").last().unwrap();
+ files.push(filename);
+ }
+ }
+ match files.try_into() {
+ Ok(arr) => arr,
+ Err(_) => panic!("Could not convert to Array &str, size 5"),
+ }
+ }
}
diff --git a/crates/core/c8y_smartrest/src/topic.rs b/crates/core/c8y_smartrest/src/topic.rs
new file mode 100644
index 00000000..491c7a4b
--- /dev/null
+++ b/crates/core/c8y_smartrest/src/topic.rs
@@ -0,0 +1,160 @@
+use agent_interface::topic::ResponseTopic;
+use agent_interface::TopicError;
+use mqtt_channel::Topic;
+use mqtt_channel::{Message, MqttError};
+
+use crate::error::SMCumulocityMapperError;
+use crate::smartrest_serializer::{
+ CumulocitySupportedOperations, SmartRestSerializer, SmartRestSetOperationToExecuting,
+ SmartRestSetOperationToSuccessful,
+};
+
+#[derive(Debug, Clone, PartialEq)]
+pub enum C8yTopic {
+ SmartRestRequest,
+ SmartRestResponse,
+ OperationTopic(String),
+}
+
+impl C8yTopic {
+ pub fn as_str(&self) -> &str {
+ match self {
+ Self::SmartRestRequest => r#"c8y/s/ds"#,
+ Self::SmartRestResponse => r#"c8y/s/us"#,
+ Self::OperationTopic(name) => name.as_str(),
+ }
+ }
+
+ pub fn to_topic(&self) -> Result<Topic, MqttError> {
+ Ok(Topic::new(self.as_str())?)
+ }
+}
+
+impl TryFrom<String> for C8yTopic {
+ type Error = TopicError;
+
+ fn try_from(value: String) -> Result<Self, Self::Error> {
+ match value.as_str() {
+ r#"c8y/s/ds"# => Ok(C8yTopic::SmartRestRequest),
+ r#"c8y/s/us"# => Ok(C8yTopic::SmartRestResponse),
+ topic_name => {
+ if topic_name[..3].contains("c8y") {
+ Ok(C8yTopic::OperationTopic(topic_name.to_string()))
+ } else {
+ Err(TopicError::UnknownTopic {
+ topic: topic_name.to_string(),
+ })
+ }
+ }
+ }
+ }
+}
+impl TryFrom<&str> for C8yTopic {
+ type Error = TopicError;
+
+ fn try_from(value: &str) -> Result<Self, Self::Error> {
+ Self::try_from(value.to_string())
+ }
+}
+
+impl TryFrom<Topic> for C8yTopic {
+ type Error = TopicError;
+
+ fn try_from(value: Topic) -> Result<Self, Self::Error> {
+ value.name.try_into()
+ }
+}
+
+#[derive(Debug, Clone, PartialEq)]
+pub enum MapperSubscribeTopic {
+ C8yTopic(C8yTopic),
+ ResponseTopic(ResponseTopic),
+}
+
+impl TryFrom<String> for MapperSubscribeTopic {
+ type Error = TopicError;
+ fn try_from(value: String) -> Result<Self, Self::Error> {
+ match ResponseTopic::try_from(value.clone()) {
+ Ok(response_topic) => Ok(MapperSubscribeTopic::ResponseTopic(response_topic)),
+ Err(_) => match C8yTopic::try_from(value) {
+ Ok(smart_rest_request) => Ok(MapperSubscribeTopic::C8yTopic(smart_rest_request)),
+ Err(err) => Err(err),
+ },
+ }
+ }
+}
+
+impl TryFrom<&str> for MapperSubscribeTopic {
+ type Error = TopicError;
+
+ fn try_from(value: &str) -> Result<Self, Self::Error> {
+ Self::try_from(value.to_string())
+ }
+}
+
+impl TryFrom<Topic> for MapperSubscribeTopic {
+ type Error = TopicError;
+
+ fn try_from(value: Topic) -> Result<Self, Self::Error> {
+ value.name.try_into()
+ }
+}
+
+/// returns a c8y message specifying to set log status to executing.
+///
+/// example message: '501,c8y_LogfileRequest'
+pub async fn get_log_file_request_executing() -> Result<Message, SMCumulocityMapperError> {
+ let topic = C8yTopic::SmartRestResponse.to_topic()?;
+ let smartrest_set_operation_status =
+ SmartRestSetOperationToExecuting::new(CumulocitySupportedOperations::C8yLogFileRequest)
+ .to_smartrest()?;
+ Ok(Message::new(&topic, smartrest_set_operation_status))
+}
+
+/// returns a c8y message specifying to set log status to successful.
+///
+/// example message: '503,c8y_LogfileRequest,https://{c8y.url}/etc...'
+pub async fn get_log_file_request_done_message(
+ binary_upload_event_url: &str,
+) -> Result<Message, SMCumulocityMapperError> {
+ let topic = C8yTopic::SmartRestResponse.to_topic()?;
+ let smartrest_set_operation_status =
+ SmartRestSetOperationToSuccessful::new(CumulocitySupportedOperations::C8yLogFileRequest)
+ .with_response_parameter(binary_upload_event_url)
+ .to_smartrest()?;
+
+ Ok(Message::new(&topic, smartrest_set_operation_status))
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use std::convert::TryInto;
+
+ #[test]
+ fn convert_c8y_topic_to_str() {
+ assert_eq!(C8yTopic::SmartRestRequest.as_str(), "c8y/s/ds");
+ assert_eq!(C8yTopic::SmartRestResponse.as_str(), "c8y/s/us");
+ }
+
+ #[test]
+ fn convert_str_into_c8y_topic() {
+ let c8y_req: C8yTopic = "c8y/s/ds".try_into().unwrap();
+ assert_eq!(c8y_req, C8yTopic::SmartRestRequest);
+ let c8y_resp: C8yTopic = "c8y/s/us".try_into().unwrap();
+ assert_eq!(c8y_resp, C8yTopic::SmartRestResponse);
+ let error: Result<C8yTopic, TopicError> = "test".try_into();
+ assert!(error.is_err());
+ }
+
+ #[test]
+ fn convert_topic_into_c8y_topic() {
+ let c8y_req: C8yTopic = Topic::new("c8y/s/ds").unwrap().try_into().unwrap();
+ assert_eq!(c8y_req, C8yTopic::SmartRestRequest);
+
+ let c8y_resp: C8yTopic = Topic::new("c8y/s/us").unwrap().try_into().unwrap();
+ assert_eq!(c8y_resp, C8yTopic::SmartRestResponse);
+ let error: Result<C8yTopic, TopicError> = Topic::new("test").unwrap().try_into();
+ assert!(error.is_err());
+ }
+}