diff options
author | initard <solo@softwareag.com> | 2022-02-01 17:49:53 +0100 |
---|---|---|
committer | initard <solo@softwareag.com> | 2022-02-15 11:43:49 +0000 |
commit | 2411743f17e14fa6031d4940d6a0135abdc00baf (patch) | |
tree | ddc7782f8e72a4a3b4531585ec93779bfaf0c05c | |
parent | ae402f67527d022a3cedd60f049d73724079850a (diff) |
c8y_api lib, tedge config, mqtt/http utilities (#790)
Preparing the repo for the log request plugin. Restructuring
folders, moving code out of sm_c8y_mapper and into c8y_api,
c8y_smartrest or tedge_config.
Signed-off-by: initard <solo@softwareag.com>
18 files changed, 574 insertions, 311 deletions
@@ -330,6 +330,40 @@ dependencies = [ ] [[package]] +name = "c8y_api" +version = "0.1.0" +dependencies = [ + "agent_interface", + "async-trait", + "batcher", + "c8y_smartrest", + "c8y_translator", + "chrono", + "clock", + "csv", + "download", + "flockfile", + "futures", + "mockall", + "mqtt_channel", + "reqwest", + "serde", + "serde_json", + "structopt", + "tedge_config", + "tedge_users", + "tedge_utils", + "tempfile", + "test-case", + "thin_edge_json", + "thiserror", + "time 0.3.5", + "tokio", + "toml", + "tracing", +] + +[[package]] name = "c8y_smartrest" version = "0.5.2" dependencies = [ @@ -339,12 +373,18 @@ dependencies = [ "assert_matches", "csv", "download", + "mqtt_channel", + "reqwest", "serde", "serde_json", + "tedge_config", + "tempfile", "test-case", "thin_edge_json", "thiserror", "time 0.3.5", + "tokio", + "toml", ] [[package]] @@ -424,6 +464,7 @@ dependencies = [ "libc", "num-integer", "num-traits", + "time 0.1.43", "winapi", ] @@ -2700,6 +2741,7 @@ dependencies = [ "assert_matches", "async-trait", "batcher", + "c8y_api", "c8y_smartrest", "c8y_translator", "clock", @@ -2753,13 +2795,13 @@ dependencies = [ [[package]] name = "tempfile" -version = "3.2.0" +version = "3.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dac1c663cfc93810f88aed9b8941d48cabf856a1b111c29a40439018d870eb22" +checksum = "5cdb1ef4eaeeaddc8fbd371e5017057064af0911902ef36b39801f67cc6d79e4" dependencies = [ "cfg-if 1.0.0", + "fastrand", "libc", - "rand", "redox_syscall", "remove_dir_all", "winapi", @@ -6,7 +6,7 @@ members = [ "crates/tests/*", "plugins/tedge_apt_plugin", "plugins/tedge_dummy_plugin", - "plugins/tedge_apama_plugin" + "plugins/tedge_apama_plugin", ] [profile.release] diff --git a/crates/common/tedge_config/src/tedge_config.rs b/crates/common/tedge_config/src/tedge_config.rs index 52e0178e..32587a15 100644 --- a/crates/common/tedge_config/src/tedge_config.rs +++ b/crates/common/tedge_config/src/tedge_config.rs @@ -2,6 +2,13 @@ use crate::*; use certificate::{CertificateError, PemCertificate}; use std::convert::{TryFrom, TryInto}; +/// loads tedge config from system default +pub fn get_tedge_config() -> Result<TEdgeConfig, TEdgeConfigError> { + let tedge_config_location = TEdgeConfigLocation::from_default_system_location(); + let config_repository = TEdgeConfigRepository::new(tedge_config_location); + Ok(config_repository.load()?) +} + /// Represents the complete configuration of a thin edge device. /// This configuration is a wrapper over the device specific configurations /// as well as the IoT cloud provider specific configurations. diff --git a/crates/core/c8y_api/Cargo.toml b/crates/core/c8y_api/Cargo.toml new file mode 100644 index 00000000..1e3cb0f5 --- /dev/null +++ b/crates/core/c8y_api/Cargo.toml @@ -0,0 +1,38 @@ +[package] +name = "c8y_api" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +agent_interface = { path = "../agent_interface"} +async-trait = "0.1" +batcher = { path = "../../common/batcher" } +c8y_smartrest = { path = "../c8y_smartrest" } +c8y_translator = { path = "../c8y_translator" } +chrono = "0.4" +clock = { path = "../../common/clock" } +csv = "1.1" +download = { path = "../../common/download" } +flockfile = { path = "../../common/flockfile" } +futures = "0.3" +mockall = "0.10" +mqtt_channel = { path = "../../common/mqtt_channel" } +reqwest = { version = "0.11", default-features = false, features = ["json", "rustls-tls"] } +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" +structopt = "0.3" +tedge_config = { path = "../../common/tedge_config" } +tedge_users = { path = "../../common/tedge_users" } +tedge_utils = { path = "../../common/tedge_utils", features = ["logging"] } +thin_edge_json = { path = "../thin_edge_json" } +thiserror = "1.0" +time = { version = "0.3", features = ["formatting"] } +tokio = { version = "1.8", features = ["rt", "sync", "time"] } +toml = "0.5" +tracing = { version = "0.1", features = ["attributes", "log"] } + +[dev-dependencies] +tempfile = "3.3" +test-case = "1.2" diff --git a/crates/core/c8y_api/src/error.rs b/crates/core/c8y_api/src/error.rs new file mode 100644 index 00000000..8b137891 --- /dev/null +++ b/crates/core/c8y_api/src/error.rs @@ -0,0 +1 @@ + diff --git a/crates/core/tedge_mapper/src/sm_c8y_mapper/http_proxy.rs b/crates/core/c8y_api/src/http_proxy.rs index 4013bc0a..2a56946a 100644 --- a/crates/core/tedge_mapper/src/sm_c8y_mapper/http_proxy.rs +++ b/crates/core/c8y_api/src/http_proxy.rs @@ -1,22 +1,51 @@ -use crate::sm_c8y_mapper::error::SMCumulocityMapperError; -use crate::sm_c8y_mapper::json_c8y::{ +use crate::json_c8y::{ C8yCreateEvent, C8yManagedObject, C8yUpdateSoftwareListResponse, InternalIdResponse, }; -use crate::sm_c8y_mapper::mapper::SmartRestLogEvent; use async_trait::async_trait; -use c8y_smartrest::smartrest_deserializer::SmartRestJwtResponse; +use c8y_smartrest::{ + error::SMCumulocityMapperError, smartrest_deserializer::SmartRestJwtResponse, topic::C8yTopic, +}; use mqtt_channel::{Connection, PubChannel, StreamExt, Topic, TopicFilter}; use reqwest::Url; use std::time::Duration; use tedge_config::{ - C8yUrlSetting, ConfigSettingAccessor, ConfigSettingAccessorStringExt, DeviceIdSetting, - MqttPortSetting, TEdgeConfig, + get_tedge_config, C8yUrlSetting, ConfigSettingAccessor, ConfigSettingAccessorStringExt, + DeviceIdSetting, MqttPortSetting, TEdgeConfig, }; use time::{format_description, OffsetDateTime}; + +use serde::{Deserialize, Serialize}; use tracing::{error, info, instrument}; const RETRY_TIMEOUT_SECS: u64 = 60; +/// creates an mqtt client with a given `session_name` +pub async fn create_mqtt_client( + session_name: &str, +) -> Result<mqtt_channel::Connection, SMCumulocityMapperError> { + let tedge_config = get_tedge_config()?; + let mqtt_port = tedge_config.query(MqttPortSetting)?.into(); + let mqtt_config = mqtt_channel::Config::default() + .with_port(mqtt_port) + .with_session_name(session_name) + .with_subscriptions(mqtt_channel::TopicFilter::new_unchecked( + C8yTopic::SmartRestResponse.as_str(), + )); + + let mqtt_client = mqtt_channel::Connection::new(&mqtt_config).await?; + Ok(mqtt_client) +} + +/// creates an http client with a given `session_name` +pub async fn create_http_client( + session_name: &str, +) -> Result<JwtAuthHttpProxy, SMCumulocityMapperError> { + let config = get_tedge_config()?; + let mut http_proxy = JwtAuthHttpProxy::try_new(&config, &session_name).await?; + let () = http_proxy.init().await?; + Ok(http_proxy) +} + /// An HttpProxy handles http requests to C8y on behalf of the device. #[async_trait] pub trait C8YHttpProxy { @@ -117,6 +146,13 @@ impl C8yEndPoint { } } +#[derive(Debug, Deserialize, Serialize, PartialEq)] +#[serde(rename_all = "camelCase")] +/// used to retrieve the id of a log event +pub struct SmartRestLogEvent { + pub id: String, +} + /// An HttpProxy that uses MQTT to retrieve JWT tokens and authenticate the device /// /// - Keep the connection info to c8y and the internal Id of the device @@ -147,6 +183,7 @@ impl JwtAuthHttpProxy { pub async fn try_new( tedge_config: &TEdgeConfig, + session_name: &str, ) -> Result<JwtAuthHttpProxy, SMCumulocityMapperError> { let c8y_host = tedge_config.query_string(C8yUrlSetting)?; let device_id = tedge_config.query_string(DeviceIdSetting)?; @@ -157,7 +194,7 @@ impl JwtAuthHttpProxy { let mqtt_config = mqtt_channel::Config::default() .with_port(mqtt_port) .with_clean_session(true) - .with_session_name("JWT-Requester") + .with_session_name(session_name) .with_subscriptions(topic); let mut mqtt_con = Connection::new(&mqtt_config).await?; @@ -233,6 +270,7 @@ impl JwtAuthHttpProxy { .build()?; let response = self.http_con.execute(request).await?; + dbg!(&response); let event_response_body = response.json::<SmartRestLogEvent>().await?; Ok(event_response_body.id) diff --git a/crates/core/tedge_mapper/src/sm_c8y_mapper/json_c8y.rs b/crates/core/c8y_api/src/json_c8y.rs index 8566a84c..8566a84c 100644 --- a/crates/core/tedge_mapper/src/sm_c8y_mapper/json_c8y.rs +++ b/crates/core/c8y_api/src/json_c8y.rs diff --git a/crates/core/c8y_api/src/lib.rs b/crates/core/c8y_api/src/lib.rs new file mode 100644 index 00000000..2a0286e0 --- /dev/null +++ b/crates/core/c8y_api/src/lib.rs @@ -0,0 +1,12 @@ +pub mod error; +pub mod http_proxy; +pub mod json_c8y; + +#[cfg(test)] +mod tests { + #[test] + fn it_works() { + let result = 2 + 2; + assert_eq!(result, 4); + } +} diff --git a/crates/core/c8y_smartrest/Cargo.toml b/crates/core/c8y_smartrest/Cargo.toml index 03f575bb..a25ef0da 100644 --- a/crates/core/c8y_smartrest/Cargo.toml +++ b/crates/core/c8y_smartrest/Cargo.toml @@ -9,14 +9,20 @@ rust-version = "1.58.1" agent_interface = { path = "../agent_interface" } csv = "1.1" download = { path = "../../common/download" } +mqtt_channel = { path = "../../common/mqtt_channel" } +reqwest = { version = "0.11", default-features = false, features = ["json", "rustls-tls"] } serde = { version = "1.0", features = ["derive"] } +tedge_config = { path = "../../common/tedge_config" } thin_edge_json = { path = "../thin_edge_json" } thiserror = "1.0" time = { version = "0.3", features = ["formatting", "macros", "parsing", "serde"] } +tokio = { version = "1.8", features = ["rt", "sync", "time"] } +toml = "0.5" [dev-dependencies] anyhow = "1.0" assert_matches = "1.5" assert-json-diff = "2.0" serde_json = "1.0" +tempfile = "3.3" test-case = "1.2.1" diff --git a/crates/core/c8y_smartrest/src/error.rs b/crates/core/c8y_smartrest/src/error.rs index ee52f984..9531f372 100644 --- a/crates/core/c8y_smartrest/src/error.rs +++ b/crates/core/c8y_smartrest/src/error.rs @@ -1,4 +1,5 @@ use agent_interface::SoftwareUpdateResponse; +use std::path::PathBuf; #[derive(thiserror::Error, Debug)] pub enum SmartRestSerializerError { @@ -39,3 +40,72 @@ pub enum SmartRestDeserializerError { #[error("Empty request")] EmptyRequest, } + +#[derive(Debug, thiserror::Error)] +pub enum OperationsError { + #[error(transparent)] + FromIo(#[from] std::io::Error), + + #[error("Cannot extract the operation name from the path: {0}")] + InvalidOperationName(PathBuf), + + #[error("Error while parsing operation file: '{0}': {1}.")] + TomlError(PathBuf, #[source] toml::de::Error), +} + +#[derive(thiserror::Error, Debug)] +pub enum SMCumulocityMapperError { + #[error("Invalid MQTT Message.")] + InvalidMqttMessage, + + #[error(transparent)] + InvalidTopicError(#[from] agent_interface::TopicError), + + #[error(transparent)] + InvalidThinEdgeJson(#[from] agent_interface::SoftwareError), + + #[error(transparent)] + FromElapsed(#[from] tokio::time::error::Elapsed), + + #[error(transparent)] + FromMqttClient(#[from] mqtt_channel::MqttError), + + #[error(transparent)] + FromReqwest(#[from] reqwest::Error), + + #[error(transparent)] + FromSmartRestSerializer(#[from] SmartRestSerializerError), + + #[error(transparent)] + FromSmartRestDeserializer(#[from] SmartRestDeserializerError), + + #[error(transparent)] + FromTedgeConfig(#[from] tedge_config::ConfigSettingError), + + #[error(transparent)] + FromLoadTedgeConfigError(#[from] tedge_config::TEdgeConfigError), + + #[error("Invalid date in file name: {0}")] + InvalidDateInFileName(String), + + #[error("Invalid path. Not UTF-8.")] + InvalidUtf8Path, + + #[error(transparent)] + FromIo(#[from] std::io::Error), + + #[error("Request timed out")] + RequestTimeout, + + #[error("Operation execution failed: {0}")] + ExecuteFailed(String), + + #[error("An unknown operation template: {0}")] + UnknownOperation(String), + + #[error(transparent)] + FromTimeFormat(#[from] time::error::Format), + + #[error(transparent)] + FromTimeParse(#[from] time::error::Parse), +} diff --git a/crates/core/c8y_smartrest/src/lib.rs b/crates/core/c8y_smartrest/src/lib.rs index 53976e8d..b6338b06 100644 --- a/crates/core/c8y_smartrest/src/lib.rs +++ b/crates/core/c8y_smartrest/src/lib.rs @@ -1,5 +1,7 @@ pub mod alarm; pub mod error; pub mod event; +pub mod operations; pub mod smartrest_deserializer; pub mod smartrest_serializer; +pub mod topic; diff --git a/crates/core/c8y_smartrest/src/operations.rs b/crates/core/c8y_smartrest/src/operations.rs new file mode 100644 index 00000000..b3909aac --- /dev/null +++ b/crates/core/c8y_smartrest/src/operations.rs @@ -0,0 +1,215 @@ +use std::{ + collections::{HashMap, HashSet}, + fs, + path::{Path, PathBuf}, +}; + +use serde::Deserialize; + +use crate::error::OperationsError; + +/// Operations are derived by reading files subdirectories per cloud /etc/tedge/operations directory +/// Each operation is a file name in one of the subdirectories +/// The file name is the operation name + +#[derive(Debug, Clone, Deserialize, PartialEq)] +#[serde(rename_all = "snake_case")] +pub struct OnMessageExec { + command: Option<String>, + on_message: Option<String>, + topic: Option<String>, + user: Option<String>, +} + +#[derive(Debug, Clone, Deserialize, PartialEq)] +#[serde(rename_all = "lowercase")] +pub struct Operation { + #[serde(skip)] + name: String, + exec: Option<OnMessageExec>, +} + +impl Operation { + pub fn exec(&self) -> Option<&OnMessageExec> { + self.exec.as_ref() + } + + pub fn command(&self) -> Option<String> { + self.exec().and_then(|exec| exec.command.clone()) + } + + pub fn topic(&self) -> Option<String> { + self.exec().and_then(|exec| exec.topic.clone()) + } +} + +#[derive(Debug, Clone)] +pub struct Operations { + operations: Vec<Operation>, + operations_by_trigger: HashMap<String, usize>, +} + +impl Operations { + pub fn new() -> Self { + Self { + operations: vec![], + operations_by_trigger: HashMap::new(), + } + } + + pub fn add(&mut self, operation: Operation) { + if let Some(detail) = operation.exec() { + if let Some(on_message) = &detail.on_message { + self.operations_by_trigger + .insert(on_message.clone(), self.operations.len()); + } + } + self.operations.push(operation); + } + + pub fn try_new(dir: impl AsRef<Path>, cloud_name: &str) -> Result<Self, OperationsError> { + get_operations(dir.as_ref(), cloud_name) + } + + pub fn get_operations_list(&self) -> Vec<String> { + self.operations + .iter() + .map(|operation| operation.name.clone()) + .collect::<Vec<String>>() + } + + pub fn matching_smartrest_template(&self, operation_template: &str) -> Option<&Operation> { + self.operations_by_trigger + .get(operation_template) + .and_then(|index| self.operations.get(*index)) + } + + pub fn topics_for_operations(&self) -> HashSet<String> { + self.operations + .iter() + .filter_map(|operation| operation.topic()) + .collect::<HashSet<String>>() + } +} + +fn get_operations(dir: impl AsRef<Path>, cloud_name: &str) -> Result<Operations, OperationsError> { + let mut operations = Operations::new(); + + let path = dir.as_ref().join(&cloud_name); + let dir_entries = fs::read_dir(&path)? + .map(|entry| entry.map(|e| e.path())) + .collect::<Result<Vec<PathBuf>, _>>()? + .into_iter() + .filter(|path| path.is_file()) + .collect::<Vec<PathBuf>>(); + + for path in dir_entries { + let mut details = match fs::read(&path) { + Ok(bytes) => toml::from_slice::<Operation>(bytes.as_slice()) + .map_err(|e| OperationsError::TomlError(path.to_path_buf(), e))?, + + Err(err) => return Err(OperationsError::FromIo(err)), + }; + + details.name = path + .file_name() + .and_then(|filename| filename.to_str()) + .ok_or_else(|| OperationsError::InvalidOperationName(path.to_owned()))? + .to_owned(); + + operations.add(details); + } + Ok(operations) +} + +#[cfg(test)] +mod tests { + use std::io::Write; + + use super::*; + use test_case::test_case; + + // Structs for state change with the builder pattern + // Structs for Operations + struct Ops(Vec<PathBuf>); + struct NoOps; + + struct TestOperationsBuilder<O> { + temp_dir: tempfile::TempDir, + operations: O, + } + + impl TestOperationsBuilder<NoOps> { + fn new() -> Self { + Self { + temp_dir: tempfile::tempdir().unwrap(), + operations: NoOps, + } + } + } + + impl TestOperationsBuilder<NoOps> { + fn with_operations(self, operations_count: usize) -> TestOperationsBuilder<Ops> { + let Self { temp_dir, .. } = self; + + let mut operations = Vec::new(); + for i in 0..operations_count { + let file_path = temp_dir.path().join(format!("operation{}", i)); + let mut file = fs::File::create(&file_path).unwrap(); + file.write_all( + br#"[exec] + command = "echo" + on_message = "511""#, + ) + .unwrap(); + operations.push(file_path); + } + + TestOperationsBuilder { + operations: Ops(operations), + temp_dir, + } + } + } + + impl TestOperationsBuilder<Ops> { + fn build(self) -> TestOperations { + let Self { + temp_dir, + operations, + } = self; + + TestOperations { + temp_dir, + operations: operations.0, + } + } + } + + struct TestOperations { + temp_dir: tempfile::TempDir, + operations: Vec<PathBuf>, + } + + impl TestOperations { + fn builder() -> TestOperationsBuilder<NoOps> { + TestOperationsBuilder::new() + } + + fn temp_dir(&self) -> &tempfile::TempDir { + &self.temp_dir + } + } + + #[test_case(0)] + #[test_case(1)] + #[test_case(5)] + fn get_operations_all(ops_count: usize) { + let test_operations = TestOperations::builder().with_operations(ops_count).build(); + + let operations = get_operations(test_operations.temp_dir(), "").unwrap(); + dbg!(&operations); + + assert_eq!(operations.operations.len(), ops_count); + } +} diff --git a/crates/core/c8y_smartrest/src/smartrest_deserializer.rs b/crates/core/c8y_smartrest/src/smartrest_deserializer.rs index c995c7d1..fea01538 100644 --- a/crates/core/c8y_smartrest/src/smartrest_deserializer.rs +++ b/crates/core/c8y_smartrest/src/smartrest_deserializer.rs @@ -1,10 +1,11 @@ -use crate::error::SmartRestDeserializerError; +use crate::error::{SMCumulocityMapperError, SmartRestDeserializerError}; use agent_interface::{SoftwareModule, SoftwareModuleUpdate, SoftwareUpdateRequest}; use csv::ReaderBuilder; use download::DownloadInfo; use serde::de::Error; use serde::{Deserialize, Deserializer, Serialize}; use std::convert::{TryFrom, TryInto}; +use std::path::PathBuf; use time::{format_description, OffsetDateTime}; #[derive(Debug)] @@ -287,12 +288,50 @@ impl SmartRestJwtResponse { } } +/// Returns a date time object from a file path or file-path-like string +/// a typical file stem looks like this: "software-list-2021-10-27T10:29:58Z" +/// +/// # Examples: +/// ``` +/// use std::path::PathBuf; +/// use crate::c8y_smartrest::smartrest_deserializer::get_datetime_from_file_path; +/// +/// let mut path = PathBuf::new(); +/// path.push("/path/to/file/with/date/in/path-2021-10-27T10:29:58Z"); +/// let path_bufdate_time = get_datetime_from_file_path(&path).unwrap(); +/// ``` +pub fn get_datetime_from_file_path( + log_path: &PathBuf, +) -> Result<OffsetDateTime, SMCumulocityMapperError> { + if let Some(stem_string) = log_path.file_stem().and_then(|s| s.to_str()) { + // a typical file stem looks like this: software-list-2021-10-27T10:29:58Z. + // to extract the date, rsplit string on "-" and take (last) 3 + let mut stem_string_vec = stem_string.rsplit('-').take(3).collect::<Vec<_>>(); + // reverse back the order (because of rsplit) + stem_string_vec.reverse(); + // join on '-' to get the date string + let date_string = stem_string_vec.join("-"); + let dt = OffsetDateTime::parse(&date_string, &format_description::well_known::Rfc3339)?; + + return Ok(dt); + } + match log_path.to_str() { + Some(path) => Err(SMCumulocityMapperError::InvalidDateInFileName( + path.to_string(), + ))?, + None => Err(SMCumulocityMapperError::InvalidUtf8Path)?, + } +} + #[cfg(test)] mod tests { use super::*; use agent_interface::*; use assert_json_diff::*; use serde_json::json; + use std::fs::File; + use std::io::Write; + use std::str::FromStr; use test_case::test_case; // To avoid using an ID randomly generated, which is not convenient for testing. @@ -568,4 +607,42 @@ mod tests { let log = SmartRestRestartRequest::from_smartrest(&smartrest); assert!(log.is_ok()); } + + #[test_case("/path/to/software-list-2021-10-27T10:44:44Z.log")] + #[test_case("/path/to/tedge/agent/software-update-2021-10-25T07:45:41Z.log")] + #[test_case("/path/to/another-variant-2021-10-25T07:45:41Z.log")] + #[test_case("/yet-another-variant-2021-10-25T07:45:41Z.log")] + fn test_datetime_parsing_from_path(file_path: &str) { + // checking that `get_date_from_file_path` unwraps a `chrono::NaiveDateTime` object. + // this should return an Ok Result. + let path_buf = PathBuf::from_str(file_path).unwrap(); + let path_buf_datetime = get_datetime_from_file_path(&path_buf); + assert!(path_buf_datetime.is_ok()); + } + + #[test_case("/path/to/software-list-2021-10-27-10:44:44Z.log")] + #[test_case("/path/to/tedge/agent/software-update-10-25-2021T07:45:41Z.log")] + #[test_case("/path/to/another-variant-07:45:41Z-2021-10-25T.log")] + #[test_case("/yet-another-variant-2021-10-25T07:45Z.log")] + fn test_datetime_parsing_from_path_fail(file_path: &str) { + // checking that `get_date_from_file_path` unwraps a `chrono::NaiveDateTime` object. + // this should return an err. + let path_buf = PathBuf::from_str(file_path).unwrap(); + let path_buf_datetime = get_datetime_from_file_path(&path_buf); + assert!(path_buf_datetime.is_err()); + } + + fn parse_file_names_from_log_content(log_content: &str) -> [&str; 5] { + let mut files: Vec<&str> = vec![]; + for line in log_content.lines() { + if line.contains("filename: ") { + let filename: &str = line.split("filename: ").last().unwrap(); + files.push(filename); + } + } + match files.try_into() { + Ok(arr) => arr, + Err(_) => panic!("Could not convert to Array &str, size 5"), + } + } } diff --git a/crates/core/tedge_mapper/src/sm_c8y_mapper/topic.rs b/crates/core/c8y_smartrest/src/topic.rs index 4fe0069e..491c7a4b 100644 --- a/crates/core/tedge_mapper/src/sm_c8y_mapper/topic.rs +++ b/crates/core/c8y_smartrest/src/topic.rs @@ -1,6 +1,13 @@ -use agent_interface::{error::*, topic::ResponseTopic}; -use mqtt_channel::{MqttError, Topic}; -use std::convert::{TryFrom, TryInto}; +use agent_interface::topic::ResponseTopic; +use agent_interface::TopicError; +use mqtt_channel::Topic; +use mqtt_channel::{Message, MqttError}; + +use crate::error::SMCumulocityMapperError; +use crate::smartrest_serializer::{ + CumulocitySupportedOperations, SmartRestSerializer, SmartRestSetOperationToExecuting, + SmartRestSetOperationToSuccessful, +}; #[derive(Debug, Clone, PartialEq)] pub enum C8yTopic { @@ -93,6 +100,32 @@ impl TryFrom<Topic> for MapperSubscribeTopic { } } +/// returns a c8y message specifying to set log status to executing. +/// +/// example message: '501,c8y_LogfileRequest' +pub async fn get_log_file_request_executing() -> Result<Message, SMCumulocityMapperError> { + let topic = C8yTopic::SmartRestResponse.to_topic()?; + let smartrest_set_operation_status = + SmartRestSetOperationToExecuting::new(CumulocitySupportedOperations::C8yLogFileRequest) + .to_smartrest()?; + Ok(Message::new(&topic, smartrest_set_operation_status)) +} + +/// returns a c8y message specifying to set log status to successful. +/// +/// example message: '503,c8y_LogfileRequest,https://{c8y.url}/etc...' +pub async fn get_log_file_request_done_message( + binary_upload_event_url: &str, +) -> Result<Message, SMCumulocityMapperError> { + let topic = C8yTopic::SmartRestResponse.to_topic()?; + let smartrest_set_operation_status = + SmartRestSetOperationToSuccessful::new(CumulocitySupportedOperations::C8yLogFileRequest) + .with_response_parameter(binary_upload_event_url) |