diff options
Diffstat (limited to 'crates/core')
-rw-r--r-- | crates/core/c8y_api/Cargo.toml | 38 | ||||
-rw-r--r-- | crates/core/c8y_api/src/error.rs | 1 | ||||
-rw-r--r-- | crates/core/c8y_api/src/http_proxy.rs (renamed from crates/core/tedge_mapper/src/sm_c8y_mapper/http_proxy.rs) | 52 | ||||
-rw-r--r-- | crates/core/c8y_api/src/json_c8y.rs (renamed from crates/core/tedge_mapper/src/sm_c8y_mapper/json_c8y.rs) | 0 | ||||
-rw-r--r-- | crates/core/c8y_api/src/lib.rs | 12 | ||||
-rw-r--r-- | crates/core/c8y_smartrest/Cargo.toml | 6 | ||||
-rw-r--r-- | crates/core/c8y_smartrest/src/error.rs | 70 | ||||
-rw-r--r-- | crates/core/c8y_smartrest/src/lib.rs | 2 | ||||
-rw-r--r-- | crates/core/c8y_smartrest/src/operations.rs | 215 | ||||
-rw-r--r-- | crates/core/c8y_smartrest/src/smartrest_deserializer.rs | 79 | ||||
-rw-r--r-- | crates/core/c8y_smartrest/src/topic.rs (renamed from crates/core/tedge_mapper/src/sm_c8y_mapper/topic.rs) | 39 | ||||
-rw-r--r-- | crates/core/tedge_mapper/Cargo.toml | 1 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs | 299 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/sm_c8y_mapper/mod.rs | 4 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/sm_c8y_mapper/tests.rs | 10 |
15 files changed, 521 insertions, 307 deletions
diff --git a/crates/core/c8y_api/Cargo.toml b/crates/core/c8y_api/Cargo.toml new file mode 100644 index 00000000..1e3cb0f5 --- /dev/null +++ b/crates/core/c8y_api/Cargo.toml @@ -0,0 +1,38 @@ +[package] +name = "c8y_api" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +agent_interface = { path = "../agent_interface"} +async-trait = "0.1" +batcher = { path = "../../common/batcher" } +c8y_smartrest = { path = "../c8y_smartrest" } +c8y_translator = { path = "../c8y_translator" } +chrono = "0.4" +clock = { path = "../../common/clock" } +csv = "1.1" +download = { path = "../../common/download" } +flockfile = { path = "../../common/flockfile" } +futures = "0.3" +mockall = "0.10" +mqtt_channel = { path = "../../common/mqtt_channel" } +reqwest = { version = "0.11", default-features = false, features = ["json", "rustls-tls"] } +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" +structopt = "0.3" +tedge_config = { path = "../../common/tedge_config" } +tedge_users = { path = "../../common/tedge_users" } +tedge_utils = { path = "../../common/tedge_utils", features = ["logging"] } +thin_edge_json = { path = "../thin_edge_json" } +thiserror = "1.0" +time = { version = "0.3", features = ["formatting"] } +tokio = { version = "1.8", features = ["rt", "sync", "time"] } +toml = "0.5" +tracing = { version = "0.1", features = ["attributes", "log"] } + +[dev-dependencies] +tempfile = "3.3" +test-case = "1.2" diff --git a/crates/core/c8y_api/src/error.rs b/crates/core/c8y_api/src/error.rs new file mode 100644 index 00000000..8b137891 --- /dev/null +++ b/crates/core/c8y_api/src/error.rs @@ -0,0 +1 @@ + diff --git a/crates/core/tedge_mapper/src/sm_c8y_mapper/http_proxy.rs b/crates/core/c8y_api/src/http_proxy.rs index 4013bc0a..2a56946a 100644 --- a/crates/core/tedge_mapper/src/sm_c8y_mapper/http_proxy.rs +++ b/crates/core/c8y_api/src/http_proxy.rs @@ -1,22 +1,51 @@ -use crate::sm_c8y_mapper::error::SMCumulocityMapperError; -use crate::sm_c8y_mapper::json_c8y::{ +use crate::json_c8y::{ C8yCreateEvent, C8yManagedObject, C8yUpdateSoftwareListResponse, InternalIdResponse, }; -use crate::sm_c8y_mapper::mapper::SmartRestLogEvent; use async_trait::async_trait; -use c8y_smartrest::smartrest_deserializer::SmartRestJwtResponse; +use c8y_smartrest::{ + error::SMCumulocityMapperError, smartrest_deserializer::SmartRestJwtResponse, topic::C8yTopic, +}; use mqtt_channel::{Connection, PubChannel, StreamExt, Topic, TopicFilter}; use reqwest::Url; use std::time::Duration; use tedge_config::{ - C8yUrlSetting, ConfigSettingAccessor, ConfigSettingAccessorStringExt, DeviceIdSetting, - MqttPortSetting, TEdgeConfig, + get_tedge_config, C8yUrlSetting, ConfigSettingAccessor, ConfigSettingAccessorStringExt, + DeviceIdSetting, MqttPortSetting, TEdgeConfig, }; use time::{format_description, OffsetDateTime}; + +use serde::{Deserialize, Serialize}; use tracing::{error, info, instrument}; const RETRY_TIMEOUT_SECS: u64 = 60; +/// creates an mqtt client with a given `session_name` +pub async fn create_mqtt_client( + session_name: &str, +) -> Result<mqtt_channel::Connection, SMCumulocityMapperError> { + let tedge_config = get_tedge_config()?; + let mqtt_port = tedge_config.query(MqttPortSetting)?.into(); + let mqtt_config = mqtt_channel::Config::default() + .with_port(mqtt_port) + .with_session_name(session_name) + .with_subscriptions(mqtt_channel::TopicFilter::new_unchecked( + C8yTopic::SmartRestResponse.as_str(), + )); + + let mqtt_client = mqtt_channel::Connection::new(&mqtt_config).await?; + Ok(mqtt_client) +} + +/// creates an http client with a given `session_name` +pub async fn create_http_client( + session_name: &str, +) -> Result<JwtAuthHttpProxy, SMCumulocityMapperError> { + let config = get_tedge_config()?; + let mut http_proxy = JwtAuthHttpProxy::try_new(&config, &session_name).await?; + let () = http_proxy.init().await?; + Ok(http_proxy) +} + /// An HttpProxy handles http requests to C8y on behalf of the device. #[async_trait] pub trait C8YHttpProxy { @@ -117,6 +146,13 @@ impl C8yEndPoint { } } +#[derive(Debug, Deserialize, Serialize, PartialEq)] +#[serde(rename_all = "camelCase")] +/// used to retrieve the id of a log event +pub struct SmartRestLogEvent { + pub id: String, +} + /// An HttpProxy that uses MQTT to retrieve JWT tokens and authenticate the device /// /// - Keep the connection info to c8y and the internal Id of the device @@ -147,6 +183,7 @@ impl JwtAuthHttpProxy { pub async fn try_new( tedge_config: &TEdgeConfig, + session_name: &str, ) -> Result<JwtAuthHttpProxy, SMCumulocityMapperError> { let c8y_host = tedge_config.query_string(C8yUrlSetting)?; let device_id = tedge_config.query_string(DeviceIdSetting)?; @@ -157,7 +194,7 @@ impl JwtAuthHttpProxy { let mqtt_config = mqtt_channel::Config::default() .with_port(mqtt_port) .with_clean_session(true) - .with_session_name("JWT-Requester") + .with_session_name(session_name) .with_subscriptions(topic); let mut mqtt_con = Connection::new(&mqtt_config).await?; @@ -233,6 +270,7 @@ impl JwtAuthHttpProxy { .build()?; let response = self.http_con.execute(request).await?; + dbg!(&response); let event_response_body = response.json::<SmartRestLogEvent>().await?; Ok(event_response_body.id) diff --git a/crates/core/tedge_mapper/src/sm_c8y_mapper/json_c8y.rs b/crates/core/c8y_api/src/json_c8y.rs index 8566a84c..8566a84c 100644 --- a/crates/core/tedge_mapper/src/sm_c8y_mapper/json_c8y.rs +++ b/crates/core/c8y_api/src/json_c8y.rs diff --git a/crates/core/c8y_api/src/lib.rs b/crates/core/c8y_api/src/lib.rs new file mode 100644 index 00000000..2a0286e0 --- /dev/null +++ b/crates/core/c8y_api/src/lib.rs @@ -0,0 +1,12 @@ +pub mod error; +pub mod http_proxy; +pub mod json_c8y; + +#[cfg(test)] +mod tests { + #[test] + fn it_works() { + let result = 2 + 2; + assert_eq!(result, 4); + } +} diff --git a/crates/core/c8y_smartrest/Cargo.toml b/crates/core/c8y_smartrest/Cargo.toml index 03f575bb..a25ef0da 100644 --- a/crates/core/c8y_smartrest/Cargo.toml +++ b/crates/core/c8y_smartrest/Cargo.toml @@ -9,14 +9,20 @@ rust-version = "1.58.1" agent_interface = { path = "../agent_interface" } csv = "1.1" download = { path = "../../common/download" } +mqtt_channel = { path = "../../common/mqtt_channel" } +reqwest = { version = "0.11", default-features = false, features = ["json", "rustls-tls"] } serde = { version = "1.0", features = ["derive"] } +tedge_config = { path = "../../common/tedge_config" } thin_edge_json = { path = "../thin_edge_json" } thiserror = "1.0" time = { version = "0.3", features = ["formatting", "macros", "parsing", "serde"] } +tokio = { version = "1.8", features = ["rt", "sync", "time"] } +toml = "0.5" [dev-dependencies] anyhow = "1.0" assert_matches = "1.5" assert-json-diff = "2.0" serde_json = "1.0" +tempfile = "3.3" test-case = "1.2.1" diff --git a/crates/core/c8y_smartrest/src/error.rs b/crates/core/c8y_smartrest/src/error.rs index ee52f984..9531f372 100644 --- a/crates/core/c8y_smartrest/src/error.rs +++ b/crates/core/c8y_smartrest/src/error.rs @@ -1,4 +1,5 @@ use agent_interface::SoftwareUpdateResponse; +use std::path::PathBuf; #[derive(thiserror::Error, Debug)] pub enum SmartRestSerializerError { @@ -39,3 +40,72 @@ pub enum SmartRestDeserializerError { #[error("Empty request")] EmptyRequest, } + +#[derive(Debug, thiserror::Error)] +pub enum OperationsError { + #[error(transparent)] + FromIo(#[from] std::io::Error), + + #[error("Cannot extract the operation name from the path: {0}")] + InvalidOperationName(PathBuf), + + #[error("Error while parsing operation file: '{0}': {1}.")] + TomlError(PathBuf, #[source] toml::de::Error), +} + +#[derive(thiserror::Error, Debug)] +pub enum SMCumulocityMapperError { + #[error("Invalid MQTT Message.")] + InvalidMqttMessage, + + #[error(transparent)] + InvalidTopicError(#[from] agent_interface::TopicError), + + #[error(transparent)] + InvalidThinEdgeJson(#[from] agent_interface::SoftwareError), + + #[error(transparent)] + FromElapsed(#[from] tokio::time::error::Elapsed), + + #[error(transparent)] + FromMqttClient(#[from] mqtt_channel::MqttError), + + #[error(transparent)] + FromReqwest(#[from] reqwest::Error), + + #[error(transparent)] + FromSmartRestSerializer(#[from] SmartRestSerializerError), + + #[error(transparent)] + FromSmartRestDeserializer(#[from] SmartRestDeserializerError), + + #[error(transparent)] + FromTedgeConfig(#[from] tedge_config::ConfigSettingError), + + #[error(transparent)] + FromLoadTedgeConfigError(#[from] tedge_config::TEdgeConfigError), + + #[error("Invalid date in file name: {0}")] + InvalidDateInFileName(String), + + #[error("Invalid path. Not UTF-8.")] + InvalidUtf8Path, + + #[error(transparent)] + FromIo(#[from] std::io::Error), + + #[error("Request timed out")] + RequestTimeout, + + #[error("Operation execution failed: {0}")] + ExecuteFailed(String), + + #[error("An unknown operation template: {0}")] + UnknownOperation(String), + + #[error(transparent)] + FromTimeFormat(#[from] time::error::Format), + + #[error(transparent)] + FromTimeParse(#[from] time::error::Parse), +} diff --git a/crates/core/c8y_smartrest/src/lib.rs b/crates/core/c8y_smartrest/src/lib.rs index 53976e8d..b6338b06 100644 --- a/crates/core/c8y_smartrest/src/lib.rs +++ b/crates/core/c8y_smartrest/src/lib.rs @@ -1,5 +1,7 @@ pub mod alarm; pub mod error; pub mod event; +pub mod operations; pub mod smartrest_deserializer; pub mod smartrest_serializer; +pub mod topic; diff --git a/crates/core/c8y_smartrest/src/operations.rs b/crates/core/c8y_smartrest/src/operations.rs new file mode 100644 index 00000000..b3909aac --- /dev/null +++ b/crates/core/c8y_smartrest/src/operations.rs @@ -0,0 +1,215 @@ +use std::{ + collections::{HashMap, HashSet}, + fs, + path::{Path, PathBuf}, +}; + +use serde::Deserialize; + +use crate::error::OperationsError; + +/// Operations are derived by reading files subdirectories per cloud /etc/tedge/operations directory +/// Each operation is a file name in one of the subdirectories +/// The file name is the operation name + +#[derive(Debug, Clone, Deserialize, PartialEq)] +#[serde(rename_all = "snake_case")] +pub struct OnMessageExec { + command: Option<String>, + on_message: Option<String>, + topic: Option<String>, + user: Option<String>, +} + +#[derive(Debug, Clone, Deserialize, PartialEq)] +#[serde(rename_all = "lowercase")] +pub struct Operation { + #[serde(skip)] + name: String, + exec: Option<OnMessageExec>, +} + +impl Operation { + pub fn exec(&self) -> Option<&OnMessageExec> { + self.exec.as_ref() + } + + pub fn command(&self) -> Option<String> { + self.exec().and_then(|exec| exec.command.clone()) + } + + pub fn topic(&self) -> Option<String> { + self.exec().and_then(|exec| exec.topic.clone()) + } +} + +#[derive(Debug, Clone)] +pub struct Operations { + operations: Vec<Operation>, + operations_by_trigger: HashMap<String, usize>, +} + +impl Operations { + pub fn new() -> Self { + Self { + operations: vec![], + operations_by_trigger: HashMap::new(), + } + } + + pub fn add(&mut self, operation: Operation) { + if let Some(detail) = operation.exec() { + if let Some(on_message) = &detail.on_message { + self.operations_by_trigger + .insert(on_message.clone(), self.operations.len()); + } + } + self.operations.push(operation); + } + + pub fn try_new(dir: impl AsRef<Path>, cloud_name: &str) -> Result<Self, OperationsError> { + get_operations(dir.as_ref(), cloud_name) + } + + pub fn get_operations_list(&self) -> Vec<String> { + self.operations + .iter() + .map(|operation| operation.name.clone()) + .collect::<Vec<String>>() + } + + pub fn matching_smartrest_template(&self, operation_template: &str) -> Option<&Operation> { + self.operations_by_trigger + .get(operation_template) + .and_then(|index| self.operations.get(*index)) + } + + pub fn topics_for_operations(&self) -> HashSet<String> { + self.operations + .iter() + .filter_map(|operation| operation.topic()) + .collect::<HashSet<String>>() + } +} + +fn get_operations(dir: impl AsRef<Path>, cloud_name: &str) -> Result<Operations, OperationsError> { + let mut operations = Operations::new(); + + let path = dir.as_ref().join(&cloud_name); + let dir_entries = fs::read_dir(&path)? + .map(|entry| entry.map(|e| e.path())) + .collect::<Result<Vec<PathBuf>, _>>()? + .into_iter() + .filter(|path| path.is_file()) + .collect::<Vec<PathBuf>>(); + + for path in dir_entries { + let mut details = match fs::read(&path) { + Ok(bytes) => toml::from_slice::<Operation>(bytes.as_slice()) + .map_err(|e| OperationsError::TomlError(path.to_path_buf(), e))?, + + Err(err) => return Err(OperationsError::FromIo(err)), + }; + + details.name = path + .file_name() + .and_then(|filename| filename.to_str()) + .ok_or_else(|| OperationsError::InvalidOperationName(path.to_owned()))? + .to_owned(); + + operations.add(details); + } + Ok(operations) +} + +#[cfg(test)] +mod tests { + use std::io::Write; + + use super::*; + use test_case::test_case; + + // Structs for state change with the builder pattern + // Structs for Operations + struct Ops(Vec<PathBuf>); + struct NoOps; + + struct TestOperationsBuilder<O> { + temp_dir: tempfile::TempDir, + operations: O, + } + + impl TestOperationsBuilder<NoOps> { + fn new() -> Self { + Self { + temp_dir: tempfile::tempdir().unwrap(), + operations: NoOps, + } + } + } + + impl TestOperationsBuilder<NoOps> { + fn with_operations(self, operations_count: usize) -> TestOperationsBuilder<Ops> { + let Self { temp_dir, .. } = self; + + let mut operations = Vec::new(); + for i in 0..operations_count { + let file_path = temp_dir.path().join(format!("operation{}", i)); + let mut file = fs::File::create(&file_path).unwrap(); + file.write_all( + br#"[exec] + command = "echo" + on_message = "511""#, + ) + .unwrap(); + operations.push(file_path); + } + + TestOperationsBuilder { + operations: Ops(operations), + temp_dir, + } + } + } + + impl TestOperationsBuilder<Ops> { + fn build(self) -> TestOperations { + let Self { + temp_dir, + operations, + } = self; + + TestOperations { + temp_dir, + operations: operations.0, + } + } + } + + struct TestOperations { + temp_dir: tempfile::TempDir, + operations: Vec<PathBuf>, + } + + impl TestOperations { + fn builder() -> TestOperationsBuilder<NoOps> { + TestOperationsBuilder::new() + } + + fn temp_dir(&self) -> &tempfile::TempDir { + &self.temp_dir + } + } + + #[test_case(0)] + #[test_case(1)] + #[test_case(5)] + fn get_operations_all(ops_count: usize) { + let test_operations = TestOperations::builder().with_operations(ops_count).build(); + + let operations = get_operations(test_operations.temp_dir(), "").unwrap(); + dbg!(&operations); + + assert_eq!(operations.operations.len(), ops_count); + } +} diff --git a/crates/core/c8y_smartrest/src/smartrest_deserializer.rs b/crates/core/c8y_smartrest/src/smartrest_deserializer.rs index c995c7d1..fea01538 100644 --- a/crates/core/c8y_smartrest/src/smartrest_deserializer.rs +++ b/crates/core/c8y_smartrest/src/smartrest_deserializer.rs @@ -1,10 +1,11 @@ -use crate::error::SmartRestDeserializerError; +use crate::error::{SMCumulocityMapperError, SmartRestDeserializerError}; use agent_interface::{SoftwareModule, SoftwareModuleUpdate, SoftwareUpdateRequest}; use csv::ReaderBuilder; use download::DownloadInfo; use serde::de::Error; use serde::{Deserialize, Deserializer, Serialize}; use std::convert::{TryFrom, TryInto}; +use std::path::PathBuf; use time::{format_description, OffsetDateTime}; #[derive(Debug)] @@ -287,12 +288,50 @@ impl SmartRestJwtResponse { } } +/// Returns a date time object from a file path or file-path-like string +/// a typical file stem looks like this: "software-list-2021-10-27T10:29:58Z" +/// +/// # Examples: +/// ``` +/// use std::path::PathBuf; +/// use crate::c8y_smartrest::smartrest_deserializer::get_datetime_from_file_path; +/// +/// let mut path = PathBuf::new(); +/// path.push("/path/to/file/with/date/in/path-2021-10-27T10:29:58Z"); +/// let path_bufdate_time = get_datetime_from_file_path(&path).unwrap(); +/// ``` +pub fn get_datetime_from_file_path( + log_path: &PathBuf, +) -> Result<OffsetDateTime, SMCumulocityMapperError> { + if let Some(stem_string) = log_path.file_stem().and_then(|s| s.to_str()) { + // a typical file stem looks like this: software-list-2021-10-27T10:29:58Z. + // to extract the date, rsplit string on "-" and take (last) 3 + let mut stem_string_vec = stem_string.rsplit('-').take(3).collect::<Vec<_>>(); + // reverse back the order (because of rsplit) + stem_string_vec.reverse(); + // join on '-' to get the date string + let date_string = stem_string_vec.join("-"); + let dt = OffsetDateTime::parse(&date_string, &format_description::well_known::Rfc3339)?; + + return Ok(dt); + } + match log_path.to_str() { + Some(path) => Err(SMCumulocityMapperError::InvalidDateInFileName( + path.to_string(), + ))?, + None => Err(SMCumulocityMapperError::InvalidUtf8Path)?, + } +} + #[cfg(test)] mod tests { use super::*; use agent_interface::*; use assert_json_diff::*; use serde_json::json; + use std::fs::File; + use std::io::Write; + use std::str::FromStr; use test_case::test_case; // To avoid using an ID randomly generated, which is not convenient for testing. @@ -568,4 +607,42 @@ mod tests { let log = SmartRestRestartRequest::from_smartrest(&smartrest); assert!(log.is_ok()); } + + #[test_case("/path/to/software-list-2021-10-27T10:44:44Z.log")] + #[test_case("/path/to/tedge/agent/software-update-2021-10-25T07:45:41Z.log")] + #[test_case("/path/to/another-variant-2021-10-25T07:45:41Z.log")] + #[test_case("/yet-another-variant-2021-10-25T07:45:41Z.log")] + fn test_datetime_parsing_from_path(file_path: &str) { + // checking that `get_date_from_file_path` unwraps a `chrono::NaiveDateTime` object. + // this should return an Ok Result. + let path_buf = PathBuf::from_str(file_path).unwrap(); + let path_buf_datetime = get_datetime_from_file_path(&path_buf); + assert!(path_buf_datetime.is_ok()); + } + + #[test_case("/path/to/software-list-2021-10-27-10:44:44Z.log")] + #[test_case("/path/to/tedge/agent/software-update-10-25-2021T07:45:41Z.log")] + #[test_case("/path/to/another-variant-07:45:41Z-2021-10-25T.log")] + #[test_case("/yet-another-variant-2021-10-25T07:45Z.log")] + fn test_datetime_parsing_from_path_fail(file_path: &str) { + // checking that `get_date_from_file_path` unwraps a `chrono::NaiveDateTime` object. + // this should return an err. + let path_buf = PathBuf::from_str(file_path).unwrap(); + let path_buf_datetime = get_datetime_from_file_path(&path_buf); + assert!(path_buf_datetime.is_err()); + } + + fn parse_file_names_from_log_content(log_content: &str) -> [&str; 5] { + let mut files: Vec<&str> = vec![]; + for line in log_content.lines() { + if line.contains("filename: ") { + let filename: &str = line.split("filename: ").last().unwrap(); + files.push(filename); + } + } + match files.try_into() { + Ok(arr) => arr, + Err(_) => panic!("Could not convert to Array &str, size 5"), + } + } } diff --git a/crates/core/tedge_mapper/src/sm_c8y_mapper/topic.rs b/crates/core/c8y_smartrest/src/topic.rs index 4fe0069e..491c7a4b 100644 --- a/crates/core/tedge_mapper/src/sm_c8y_mapper/topic.rs +++ b/crates/core/c8y_smartrest/src/topic.rs @@ -1,6 +1,13 @@ -use agent_interface::{error::*, topic::ResponseTopic}; -use mqtt_channel::{MqttError, Topic}; -use std::convert::{TryFrom, TryInto}; +use agent_interface::topic::ResponseTopic; +use agent_interface::TopicError; +use mqtt_channel::Topic; +use mqtt_channel::{Message, MqttError}; + +use crate::error::SMCumulocityMapperError; +use crate::smartrest_serializer::{ + CumulocitySupportedOperations, SmartRestSerializer, SmartRestSetOperationToExecuting, + SmartRestSetOperationToSuccessful, +}; #[derive(Debug, Clone, PartialEq)] pub enum C8yTopic { @@ -93,6 +100,32 @@ impl TryFrom<Topic> for MapperSubscribeTopic { } } +/// returns a c8y message specifying to set log status to executing. +/// +/// example message: '501,c8y_LogfileRequest' +pub async fn get_log_file_request_executing() -> Result<Message, SMCumulocityMapperError> { + let topic = C8yTopic::SmartRestResponse.to_topic()?; + let smartrest_set_operation_status = + SmartRestSetOperationToExecuting::new(CumulocitySupportedOperations::C8yLogFileRequest) + .to_smartrest()?; + Ok(Message::new(&topic, smartrest_set_operation_status)) +} + +/// returns a c8y message specifying to set log status to successful. +/// +/// example message: '503,c8y_LogfileRequest,https://{c8y.url}/etc...' +pub async fn get_log_file_request_done_message( + binary_upload_event_url: &str, +) -> Result<Message, SMCumulocityMapperError> { + let topic = C8yTopic::SmartRestResponse.to_topic()?; + let smartrest_set_operation_status = + SmartRestSetOperationToSuccessful::new(CumulocitySupportedOperations::C8yLogFileRequest) + .with_response_parameter(binary_upload_event_url) + .to_smartrest()?; + + Ok(Message::new(&topic, smartrest_set_operation_status)) +} + #[cfg(test)] mod tests { use super::*; diff --git a/crates/core/tedge_mapper/Cargo.toml b/crates/core/tedge_mapper/Cargo.toml index baf8403f..5f518ce4 100644 --- a/crates/core/tedge_mapper/Cargo.toml +++ b/crates/core/tedge_mapper/Cargo.toml @@ -30,6 +30,7 @@ agent_interface = { path = "../agent_interface"} anyhow = "1.0" async-trait = "0.1" batcher = { path = "../../common/batcher" } +c8y_api = { path = "../c8y_api" } c8y_smartrest = { path = "../c8y_smartrest" } c8y_translator = { path = "../c8y_translator" } clock = { path = "../../common/clock" } diff --git a/crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs b/crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs index 7a76df52..703e3cad 100644 --- a/crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs +++ b/crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs @@ -1,42 +1,34 @@ -use crate::{ - component::TEdgeComponent, - operations::Operations, - sm_c8y_mapper::{ - error::*, - http_proxy::{C8YHttpProxy, JwtAuthHttpProxy}, - json_c8y::C8yUpdateSoftwareListResponse, - topic::*, - }, -}; +use crate::component::TEdgeComponent; use agent_interface::{ topic::*, Jsonify, OperationStatus, RestartOperationRequest, RestartOperationResponse, SoftwareListRequest, SoftwareListResponse, SoftwareUpdateResponse, }; use async_trait::async_trait; -use c8y_smartrest::smartrest_deserializer::{SmartRestLogRequest, SmartRestRestartRequest}; +use c8y_api::{ + http_proxy::{C8YHttpProxy, JwtAuthHttpProxy}, + json_c8y::C8yUpdateSoftwareListResponse, +}; +use c8y_smartrest::smartrest_deserializer::SmartRestRestartRequest; use c8y_smartrest::smartrest_serializer::CumulocitySupportedOperations; use c8y_smartrest::{ - error::SmartRestDeserializerError, + error::{SMCumulocityMapperError, SmartRestDeserializerError}, + operations::Operations, smartrest_deserializer::SmartRestUpdateSoftware, smartrest_serializer::{ SmartRestGetPendingOperations, SmartRestSerializer, SmartRestSetOperationToExecuting, SmartRestSetOperationToFailed, SmartRestSetOperationToSuccessful, SmartRestSetSupportedLogType, }, + topic::*, }; use download::{Auth, DownloadInfo}; use mqtt_channel::{Config, Connection, MqttError, SinkExt, StreamExt, Topic, TopicFilter}; -use serde::{Deserialize, Serialize}; -use std::path::PathBuf; use std::{convert::TryInto, process::Stdio}; use tedge_config::{ConfigSettingAccessor, MqttPortSetting, TEdgeConfig}; -use time::{format_description, OffsetDateTime}; use tracing::{debug, error, info, instrument}; -const AGENT_LOG_DIR: &str = "/var/log/tedge/agent"; const SM_MAPPER: &str = "SM-C8Y-Mapper"; - pub struct CumulocitySoftwareManagementMapper {} impl CumulocitySoftwareManagementMapper { @@ -87,7 +79,8 @@ impl TEdgeComponent for CumulocitySoftwareManagementMapper { #[instrument(skip(self, tedge_config), name = "sm-c8y-mapper")] async fn start(&self, tedge_config: TEdgeConfig) -> Result<(), anyhow::Error> { let operations = Operations::try_new("/etc/tedge/operations", "c8y")?; - let http_proxy = JwtAuthHttpProxy::try_new(&tedge_config).await?; + let session_name = "SM-Tedge-Mapper"; + let http_proxy = JwtAuthHttpProxy::try_new(&tedge_config, &session_name).await?; let mut sm_mapper = CumulocitySoftwareManagement::try_new(&tedge_config, http_proxy, operations).await?; @@ -173,9 +166,6 @@ where "528" => { let () = self.forward_software_request(payload).await?; } - "522" => { - let () = self.forward_log_request(payload).await?; - } "510" => { let () = self.forward_restart_request(payload).await?; } @@ -342,58 +332,6 @@ where } Ok(()) } - - async fn set_log_file_request_executing(&mut self) -> Result<(), SMCumulocityMapperError> { - let topic = C8yTopic::SmartRestResponse.to_topic()?; - let smartrest_set_operation_status = - SmartRestSetOperationToExecuting::new(CumulocitySupportedOperations::C8yLogFileRequest) - .to_smartrest()?; - - let () = self.publish(&topic, smartrest_set_operation_status).await?; - Ok(()) - } - - async fn set_log_file_request_done( - &mut self, - binary_upload_event_url: &str, - ) -> Result<(), SMCumulocityMapperError> { - let topic = C8yTopic::SmartRestResponse.to_topic()?; - let smartrest_set_operation_status = SmartRestSetOperationToSuccessful::new( - CumulocitySupportedOperations::C8yLogFileRequest, - ) - .with_response_parameter(binary_upload_event_url) - .to_smartrest()?; - - let () = self.publish(&topic, smartrest_set_operation_status).await?; - Ok(()) - } - - async fn forward_log_request(&mut self, payload: &str) -> Result<(), SMCumulocityMapperError> { - // retrieve smartrest object from payload - let smartrest_obj = SmartRestLogRequest::from_smartrest(&payl |