diff options
Diffstat (limited to 'crates/core/tedge_mapper/src/sm_c8y_mapper')
-rw-r--r-- | crates/core/tedge_mapper/src/sm_c8y_mapper/error.rs | 52 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/sm_c8y_mapper/json_c8y.rs | 278 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs | 867 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/sm_c8y_mapper/mod.rs | 6 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/sm_c8y_mapper/tests.rs | 442 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/sm_c8y_mapper/topic.rs | 154 |
6 files changed, 1799 insertions, 0 deletions
diff --git a/crates/core/tedge_mapper/src/sm_c8y_mapper/error.rs b/crates/core/tedge_mapper/src/sm_c8y_mapper/error.rs new file mode 100644 index 00000000..5cca31b4 --- /dev/null +++ b/crates/core/tedge_mapper/src/sm_c8y_mapper/error.rs @@ -0,0 +1,52 @@ +use c8y_smartrest::error::{SmartRestDeserializerError, SmartRestSerializerError}; + +#[derive(thiserror::Error, Debug)] +pub(crate) enum MapperTopicError { + #[error("Topic {topic} is unknown.")] + UnknownTopic { topic: String }, +} + +#[derive(thiserror::Error, Debug)] +pub(crate) enum SMCumulocityMapperError { + #[error("Invalid MQTT Message.")] + InvalidMqttMessage, + + #[error(transparent)] + InvalidTopicError(#[from] MapperTopicError), + + #[error(transparent)] + InvalidThinEdgeJson(#[from] json_sm::SoftwareError), + + #[error(transparent)] + FromElapsed(#[from] tokio::time::error::Elapsed), + + #[error(transparent)] + FromMqttClient(#[from] mqtt_client::MqttClientError), + + #[error(transparent)] + FromReqwest(#[from] reqwest::Error), + + #[error(transparent)] + FromSmartRestSerializer(#[from] SmartRestSerializerError), + + #[error(transparent)] + FromSmartRestDeserializer(#[from] SmartRestDeserializerError), + + #[error(transparent)] + FromTedgeConfig(#[from] tedge_config::ConfigSettingError), + + #[error("Invalid date in file name: {0}")] + InvalidDateInFileName(String), + + #[error("Invalid path. Not UTF-8.")] + InvalidUtf8Path, + + #[error(transparent)] + FromChronoParse(#[from] chrono::ParseError), + + #[error(transparent)] + FromIo(#[from] std::io::Error), + + #[error("Request timed out")] + RequestTimeout, +} diff --git a/crates/core/tedge_mapper/src/sm_c8y_mapper/json_c8y.rs b/crates/core/tedge_mapper/src/sm_c8y_mapper/json_c8y.rs new file mode 100644 index 00000000..f440ec7e --- /dev/null +++ b/crates/core/tedge_mapper/src/sm_c8y_mapper/json_c8y.rs @@ -0,0 +1,278 @@ +use json_sm::{ + DownloadInfo, Jsonify, SoftwareListResponse, SoftwareModule, SoftwareType, SoftwareVersion, +}; +use serde::{Deserialize, Serialize}; + +const EMPTY_STRING: &str = ""; + +#[derive(Debug, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct C8yCreateEvent { + source: C8yManagedObject, + #[serde(rename = "type")] + event_type: String, + time: String, + text: String, +} + +#[derive(Debug, Serialize, Deserialize, Clone)] +#[serde(rename_all = "camelCase")] +pub struct C8yManagedObject { + pub id: String, +} + +#[derive(Debug, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct InternalIdResponse { + managed_object: C8yManagedObject, + external_id: String, +} + +impl InternalIdResponse { + pub fn id(&self) -> String { + self.managed_object.id.clone() + } +} + +#[derive(Debug, Deserialize, Serialize, PartialEq)] +pub struct C8ySoftwareModuleItem { + pub name: String, + pub version: Option<String>, + pub url: Option<DownloadInfo>, +} + +impl<'a> Jsonify<'a> for C8ySoftwareModuleItem {} + +impl From<SoftwareModule> for C8ySoftwareModuleItem { + fn from(module: SoftwareModule) -> Self { + let url = if module.url.is_none() { + Some(EMPTY_STRING.into()) + } else { + module.url + }; + + Self { + name: module.name, + version: Option::from(combine_version_and_type( + &module.version, + &module.module_type, + )), + url, + } + } +} + +#[derive(Debug, Serialize, Deserialize, PartialEq)] +#[serde(rename_all = "camelCase")] +pub struct C8yUpdateSoftwareListResponse { + #[serde(rename = "c8y_SoftwareList")] + c8y_software_list: Option<Vec<C8ySoftwareModuleItem>>, +} + +impl<'a> Jsonify<'a> for C8yUpdateSoftwareListResponse {} + +impl From<&SoftwareListResponse> for C8yUpdateSoftwareListResponse { + fn from(list: &SoftwareListResponse) -> Self { + let mut new_list: Vec<C8ySoftwareModuleItem> = Vec::new(); + list.modules().into_iter().for_each(|software_module| { + let c8y_software_module: C8ySoftwareModuleItem = software_module.into(); + new_list.push(c8y_software_module); + }); + + Self { + c8y_software_list: Some(new_list), + } + } +} + +impl C8yCreateEvent { + pub fn new(source: C8yManagedObject, event_type: &str, time: &str, text: &str) -> Self { + Self { + source, + event_type: event_type.into(), + time: time.into(), + text: text.into(), + } + } +} + +impl<'a> Jsonify<'a> for C8yCreateEvent {} + +fn combine_version_and_type( + version: &Option<SoftwareVersion>, + module_type: &Option<SoftwareType>, +) -> String { + match module_type { + Some(m) => { + if m.is_empty() { + match version { + Some(v) => v.into(), + None => EMPTY_STRING.into(), + } + } else { + match version { + Some(v) => format!("{}::{}", v, m), + None => format!("::{}", m), + } + } + } + None => match version { + Some(v) => { + if v.contains("::") { + format!("{}::", v) + } else { + v.into() + } + } + None => EMPTY_STRING.into(), + }, + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn from_software_module_to_c8y_software_module_item() { + let software_module = SoftwareModule { + module_type: Some("a".into()), + name: "b".into(), + version: Some("c".into()), + url: Some("".into()), + file_path: None, + }; + + let expected_c8y_item = C8ySoftwareModuleItem { + name: "b".into(), + version: Some("c::a".into()), + url: Some("".into()), + }; + + let converted: C8ySoftwareModuleItem = software_module.into(); + + assert_eq!(converted, expected_c8y_item); + } + + #[test] + fn from_thin_edge_json_to_c8y_set_software_list() { + let input_json = r#"{ + "id":"1", + "status":"successful", + "currentSoftwareList":[ + {"type":"debian", "modules":[ + {"name":"a"}, + {"name":"b","version":"1.0"}, + {"name":"c","url":"https://foobar.io/c.deb"}, + {"name":"d","version":"beta","url":"https://foobar.io/d.deb"} + ]}, + {"type":"apama","modules":[ + {"name":"m","url":"https://foobar.io/m.epl"} + ]} + ]}"#; + + let json_obj = &SoftwareListResponse::from_json(input_json).unwrap(); + + let c8y_software_list: C8yUpdateSoftwareListResponse = json_obj.into(); + + let expected_struct = C8yUpdateSoftwareListResponse { + c8y_software_list: Some(vec![ + C8ySoftwareModuleItem { + name: "a".into(), + version: Some("::debian".into()), + url: Some("".into()), + }, + C8ySoftwareModuleItem { + name: "b".into(), + version: Some("1.0::debian".into()), + url: Some("".into()), + }, + C8ySoftwareModuleItem { + name: "c".into(), + version: Some("::debian".into()), + url: Some("https://foobar.io/c.deb".into()), + }, + C8ySoftwareModuleItem { + name: "d".into(), + version: Some("beta::debian".into()), + url: Some("https://foobar.io/d.deb".into()), + }, + C8ySoftwareModuleItem { + name: "m".into(), + version: Some("::apama".into()), + url: Some("https://foobar.io/m.epl".into()), + }, + ]), + }; + + let expected_json = r#"{"c8y_SoftwareList":[{"name":"a","version":"::debian","url":{"url":""}},{"name":"b","version":"1.0::debian","url":{"url":""}},{"name":"c","version":"::debian","url":{"url":"https://foobar.io/c.deb"}},{"name":"d","version":"beta::debian","url":{"url":"https://foobar.io/d.deb"}},{"name":"m","version":"::apama","url":{"url":"https://foobar.io/m.epl"}}]}"#; + + assert_eq!(c8y_software_list, expected_struct); + assert_eq!(c8y_software_list.to_json().unwrap(), expected_json); + } + + #[test] + fn empty_to_c8y_set_software_list() { + let input_json = r#"{ + "id":"1", + "status":"successful", + "currentSoftwareList":[] + }"#; + + let json_obj = &SoftwareListResponse::from_json(input_json).unwrap(); + let c8y_software_list: C8yUpdateSoftwareListResponse = json_obj.into(); + + let expected_struct = C8yUpdateSoftwareListResponse { + c8y_software_list: Some(vec![]), + }; + let expected_json = r#"{"c8y_SoftwareList":[]}"#; + + assert_eq!(c8y_software_list, expected_struct); + assert_eq!(c8y_software_list.to_json().unwrap(), expected_json); + } + + #[test] + fn get_id_from_c8y_response() { + let managed_object = C8yManagedObject { id: "12345".into() }; + let response = InternalIdResponse { + managed_object, + external_id: "test".into(), + }; + + assert_eq!(response.id(), "12345".to_string()); + } + + #[test] + fn verify_combine_version_and_type() { + let some_version: Option<SoftwareVersion> = Some("1.0".to_string()); + let some_version_with_colon: Option<SoftwareVersion> = Some("1.0.0::1".to_string()); + let none_version: Option<SoftwareVersion> = None; + let some_module_type: Option<SoftwareType> = Some("debian".to_string()); + let none_module_type: Option<SoftwareType> = None; + + assert_eq!( + combine_version_and_type(&some_version, &some_module_type), + "1.0::debian" + ); + assert_eq!( + combine_version_and_type(&some_version, &none_module_type), + "1.0" + ); + assert_eq!( + combine_version_and_type(&some_version_with_colon, &some_module_type), + "1.0.0::1::debian" + ); + assert_eq!( + combine_version_and_type(&some_version_with_colon, &none_module_type), + "1.0.0::1::" + ); + assert_eq!( + combine_version_and_type(&none_version, &some_module_type), + "::debian" + ); + assert_eq!( + combine_version_and_type(&none_version, &none_module_type), + EMPTY_STRING + ); + } +} diff --git a/crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs b/crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs new file mode 100644 index 00000000..beadef5d --- /dev/null +++ b/crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs @@ -0,0 +1,867 @@ +use crate::mapper::mqtt_config; +use crate::sm_c8y_mapper::json_c8y::{C8yCreateEvent, C8yManagedObject}; +use crate::sm_c8y_mapper::{error::*, json_c8y::C8yUpdateSoftwareListResponse, topic::*}; +use crate::{component::TEdgeComponent, sm_c8y_mapper::json_c8y::InternalIdResponse}; +use async_trait::async_trait; +use c8y_smartrest::smartrest_deserializer::SmartRestLogRequest; +use c8y_smartrest::smartrest_serializer::CumulocitySupportedOperations; +use c8y_smartrest::{ + error::SmartRestDeserializerError, + smartrest_deserializer::{SmartRestJwtResponse, SmartRestUpdateSoftware}, + smartrest_serializer::{ + SmartRestGetPendingOperations, SmartRestSerializer, SmartRestSetOperationToExecuting, + SmartRestSetOperationToFailed, SmartRestSetOperationToSuccessful, + SmartRestSetSupportedLogType, SmartRestSetSupportedOperations, + }, +}; +use chrono::{DateTime, FixedOffset, Local}; +use json_sm::{ + Auth, DownloadInfo, Jsonify, SoftwareListRequest, SoftwareListResponse, + SoftwareOperationStatus, SoftwareUpdateResponse, +}; +use mqtt_client::{Client, MqttClient, MqttClientError, MqttMessageStream, Topic, TopicFilter}; +use reqwest::Url; +use serde::{Deserialize, Serialize}; +use std::path::PathBuf; +use std::{convert::TryInto, time::Duration}; +use tedge_config::{C8yUrlSetting, ConfigSettingAccessorStringExt, DeviceIdSetting, TEdgeConfig}; +use tokio::time::Instant; +use tracing::{debug, error, info, instrument}; + +const AGENT_LOG_DIR: &str = "/var/log/tedge/agent"; + +const RETRY_TIMEOUT_SECS: u64 = 60; + +pub struct CumulocitySoftwareManagementMapper {} + +impl CumulocitySoftwareManagementMapper { + pub fn new() -> Self { + Self {} + } +} + +#[async_trait] +impl TEdgeComponent for CumulocitySoftwareManagementMapper { + #[instrument(skip(self, tedge_config), name = "sm-c8y-mapper")] + async fn start(&self, tedge_config: TEdgeConfig) -> Result<(), anyhow::Error> { + let mqtt_config = mqtt_config(&tedge_config)?; + let mqtt_client = Client::connect("SM-C8Y-Mapper", &mqtt_config).await?; + + let mut sm_mapper = CumulocitySoftwareManagement::new(mqtt_client, tedge_config); + + let messages = sm_mapper.subscribe().await?; + let () = sm_mapper.init().await?; + let () = sm_mapper.run(messages).await?; + + Ok(()) + } +} + +#[derive(Debug)] +pub struct CumulocitySoftwareManagement { + pub client: Client, + config: TEdgeConfig, + c8y_internal_id: String, +} + +impl CumulocitySoftwareManagement { + pub fn new(client: Client, config: TEdgeConfig) -> Self { + Self { + client, + config, + c8y_internal_id: "".into(), + } + } + + pub async fn subscribe(&self) -> Result<Box<dyn MqttMessageStream>, anyhow::Error> { + let mut topic_filter = TopicFilter::new(IncomingTopic::SoftwareListResponse.as_str())?; + topic_filter.add(IncomingTopic::SoftwareUpdateResponse.as_str())?; + topic_filter.add(IncomingTopic::SmartRestRequest.as_str())?; + let messages = self.client.subscribe(topic_filter).await?; + + Ok(messages) + } + + #[instrument(skip(self), name = "init")] + async fn init(&mut self) -> Result<(), anyhow::Error> { + info!("Initialisation"); + while self.c8y_internal_id.is_empty() { + if let Err(error) = self.try_get_and_set_internal_id().await { + error!( + "An error ocurred while retrieving internal Id, operation will retry in {} seconds and mapper will reinitialise.\n Error: {:?}", + RETRY_TIMEOUT_SECS, error + ); + + tokio::time::sleep_until(Instant::now() + Duration::from_secs(RETRY_TIMEOUT_SECS)) + .await; + continue; + }; + } + info!("Initialisation done."); + + Ok(()) + } + + pub async fn run(&self, mut messages: Box<dyn MqttMessageStream>) -> Result<(), anyhow::Error> { + info!("Running"); + let () = self.publish_supported_operations().await?; + let () = self.publish_supported_log_types().await?; + let () = self.publish_get_pending_operations().await?; + let () = self.ask_software_list().await?; + + while let Err(err) = self.subscribe_messages_runtime(&mut messages).await { + if let SMCumulocityMapperError::FromSmartRestDeserializer( + SmartRestDeserializerError::InvalidParameter { operation, .. }, + ) = &err + { + let topic = OutgoingTopic::SmartRestResponse.to_topic()?; + // publish the operation status as `executing` + let () = self.publish(&topic, format!("501,{}", operation)).await?; + // publish the operation status as `failed` + let () = self + .publish( + &topic, + format!("502,{},\"{}\"", operation, &err.to_string()), + ) + .await?; + } + error!("{}", err); + } + + Ok(()) + } + + async fn process_smartrest(&self, payload: &str) -> Result<(), SMCumulocityMapperError> { + let message_id: &str = &payload[..3]; + match message_id { + "528" => { + let () = self.forward_software_request(payload).await?; + } + "522" => { + let () = self.forward_log_request(payload).await?; + } + _ => { + return Err(SMCumulocityMapperError::InvalidMqttMessage); + } + } + + Ok(()) + } + + #[instrument(skip(self, messages), name = "main-loop")] + async fn subscribe_messages_runtime( + &self, + messages: &mut Box<dyn MqttMessageStream>, + ) -> Result<(), SMCumulocityMapperError> { + while let Some(message) = messages.next().await { + debug!("Topic {:?}", message.topic.name); + debug!("Mapping {:?}", message.payload_str()); + + let incoming_topic = message.topic.clone().try_into()?; + match incoming_topic { + IncomingTopic::SoftwareListResponse => { + debug!("Software list"); + let () = self + .validate_and_publish_software_list(message.payload_str()?) + .await?; + } + IncomingTopic::SoftwareUpdateResponse => { + debug!("Software update"); + let () = self + .publish_operation_status(message.payload_str()?) + .await?; + } + IncomingTopic::SmartRestRequest => { + debug!("Cumulocity"); + let () = self.process_smartrest(message.payload_str()?).await?; + } + } + } + Ok(()) + } + + #[instrument(skip(self), name = "software-list")] + async fn ask_software_list(&self) -> Result<(), SMCumulocityMapperError> { + let request = SoftwareListRequest::new(); + let topic = OutgoingTopic::SoftwareListRequest.to_topic()?; + let json_list_request = request.to_json()?; + let () = self.publish(&topic, json_list_request).await?; + + Ok(()) + } + + #[instrument(skip(self), name = "software-update")] + async fn validate_and_publish_software_list( + &self, + json_response: &str, + ) -> Result<(), SMCumulocityMapperError> { + let response = SoftwareListResponse::from_json(json_response)?; + + match response.status() { + SoftwareOperationStatus::Successful => { + let () = self.send_software_list_http(&response).await?; + } + + SoftwareOperationStatus::Failed => { + error!("Received a failed software response: {}", json_response); + } + + SoftwareOperationStatus::Executing => {} // C8Y doesn't expect any message to be published + } + + Ok(()) + } + + async fn publish_supported_log_types(&self) -> Result<(), SMCumulocityMapperError> { + let payload = SmartRestSetSupportedLogType::default().to_smartrest()?; + let topic = OutgoingTopic::SmartRestResponse.to_topic()?; + let () = self.publish(&topic, payload).await?; + Ok(()) + } + + async fn publish_supported_operations(&self) -> Result<(), SMCumulocityMapperError> { + let data = SmartRestSetSupportedOperations::default(); + let topic = OutgoingTopic::SmartRestResponse.to_topic()?; + let payload = data.to_smartrest()?; + let () = self.publish(&topic, payload).await?; + Ok(()) + } + + async fn publish_get_pending_operations(&self) -> Result<(), SMCumulocityMapperError> { + let data = SmartRestGetPendingOperations::default(); + let topic = OutgoingTopic::SmartRestResponse.to_topic()?; + let payload = data.to_smartrest()?; + let () = self.publish(&topic, payload).await?; + Ok(()) + } + + async fn publish_operation_status( + &self, + json_response: &str, + ) -> Result<(), SMCumulocityMapperError> { + let response = SoftwareUpdateResponse::from_json(json_response)?; + let topic = OutgoingTopic::SmartRestResponse.to_topic()?; + match response.status() { + SoftwareOperationStatus::Executing => { + let smartrest_set_operation_status = + SmartRestSetOperationToExecuting::from_thin_edge_json(response)? + .to_smartrest()?; + let () = self.publish(&topic, smartrest_set_operation_status).await?; + } + SoftwareOperationStatus::Successful => { + let smartrest_set_operation = + SmartRestSetOperationToSuccessful::from_thin_edge_json(response)? + .to_smartrest()?; + let () = self.publish(&topic, smartrest_set_operation).await?; + let () = self + .validate_and_publish_software_list(json_response) + .await?; + } + SoftwareOperationStatus::Failed => { + let smartrest_set_operation = + SmartRestSetOperationToFailed::from_thin_edge_json(response)?.to_smartrest()?; + let () = self.publish(&topic, smartrest_set_operation).await?; + let () = self + .validate_and_publish_software_list(json_response) + .await?; + } + }; + Ok(()) + } + + async fn set_log_file_request_executing(&self) -> Result<(), SMCumulocityMapperError> { + let topic = OutgoingTopic::SmartRestResponse.to_topic()?; + let smartrest_set_operation_status = + SmartRestSetOperationToExecuting::new(CumulocitySupportedOperations::C8yLogFileRequest) + .to_smartrest()?; + + let () = self.publish(&topic, smartrest_set_operation_status).await?; + Ok(()) + } + + async fn set_log_file_request_done( + &self, + binary_upload_event_url: &str, + ) -> Result<(), SMCumulocityMapperError> { + let topic = OutgoingTopic::SmartRestResponse.to_topic()?; + let smartrest_set_operation_status = SmartRestSetOperationToSuccessful::new( + CumulocitySupportedOperations::C8yLogFileRequest, + ) + .with_response_parameter(binary_upload_event_url) + .to_smartrest()?; + + let () = self.publish(&topic, smartrest_set_operation_status).await?; + Ok(()) + } + + async fn forward_log_request(&self, payload: &str) -> Result<(), SMCumulocityMapperError> { + // retrieve smartrest object from payload + let smartrest_obj = SmartRestLogRequest::from_smartrest(&payload)?; + + // 1. set log file request to executing + let () = self.set_log_file_request_executing().await?; + + // 2. read logs + let log_output = read_tedge_logs(&smartrest_obj, AGENT_LOG_DIR)?; + + // 3. create log event + let token = get_jwt_token(&self.client).await?; + let url_host = self.config.query_string(C8yUrlSetting)?; + + let c8y_managed_object = C8yManagedObject { + id: self.c8y_internal_id.clone(), + }; + let event_response_id = create_log_event(&url_host, &c8y_managed_object, &token).await?; + + // 4. upload log file + let binary_upload_event_url = + get_url_for_event_binary_upload(&url_host, &event_response_id); + + let () = upload_log_binary(&token, &binary_upload_event_url, &log_output.as_str()).await?; + + // 5. set log file request to done + let () = self + .set_log_file_request_done(&binary_upload_event_url) + .await?; + + info!("Log file request uploaded"); + + Ok(()) + } + + async fn forward_software_request( + &self, + smartrest: &str, + ) -> Result<(), SMCumulocityMapperError> { + let topic = OutgoingTopic::SoftwareUpdateRequest.to_topic()?; + let update_software = SmartRestUpdateSoftware::new(); + let mut software_update_request = update_software + .from_smartrest(smartrest)? + .to_thin_edge_json()?; + + let token = get_jwt_token(&self.client).await?; + let tenant_uri = self.config.query_string(C8yUrlSetting)?; + + software_update_request + .update_list + .iter_mut() + .for_each(|modules| { + modules.modules.iter_mut().for_each(|module| { + if let Some(url) = &module.url { + if url_is_in_my_tenant_domain(url.url(), &tenant_uri) { + module.url = module.url.as_ref().map(|s| { + DownloadInfo::new(&s.url) + .with_auth(Auth::new_bearer(&token.token())) + }); + } else { + module.url = module.url.as_ref().map(|s| DownloadInfo::new(&s.url)); + } + } + }); + }); + + let () = self + .publish(&topic, software_update_request.to_json()?) + .await?; + + Ok(()) + } + + async fn publish(&self, topic: &Topic, payload: String) -> Result<(), MqttClientError> { + let () = self + .client + .publish(mqtt_client::Message::new(topic, payload)) + .await?; + Ok(()) + } + + async fn send_software_list_http( + &self, + json_response: &SoftwareListResponse, + ) -> Result<(), SMCumulocityMapperError> { + let token = get_jwt_token(&self.client).await?; + + let reqwest_client = reqwest::ClientBuilder::new().build()?; + + let url_host = self.config.query_string(C8yUrlSetting)?; + let url = get_url_for_sw_list(&url_host, &self.c8y_internal_id); + + let c8y_software_list: C8yUpdateSoftwareListResponse = json_response.into(); + + let _published = + publish_software_list_http(&reqwest_client, &url, &token.token(), &c8y_software_list) + .await?; + + Ok(()) + } + + async fn try_get_and_set_internal_id(&mut self) -> Result<(), SMCumulocityMapperError> { + let token = get_jwt_token(&self.client).await?; + let reqwest_client = reqwest::ClientBuilder::new().build()?; + + let url_host = self.config.query_string(C8yUrlSetting)?; + let device_id = self.config.query_string(DeviceIdSetting)?; + let url_get_id = get_url_for_get_id(&url_host, &device_id); + + self.c8y_internal_id = + try_get_internal_id(&reqwest_client, &url_get_id, &token.token()).await?; + + Ok(()) + } +} + +async fn upload_log_binary( + token: &SmartRestJwtResponse, + binary_upload_event_url: &str, + log_content: &str, +) -> Result<(), SMCumulocityMapperError> { + let client = reqwest::ClientBuilder::new().build()?; + + let request = client + .post(binary_upload_event_url) + .header("Accept", "application/json") + .header("Content-Type", "text/plain") + .body(log_content.to_string()) + .bearer_auth(token.token()) + .timeout(Duration::from_millis(10000)) + .build()?; + + let _response = client.execute(request).await?; + Ok(()) +} + +#[derive(Debug, Deserialize, Serialize, PartialEq)] +#[serde(rename_all = "camelCase")] +/// used to retrieve the id of a log event +pub struct SmartRestLogEvent { + pub id: String, +} + +/// Make a POST request to /event/events and return the event id from response body. +/// The event id is used to upload the binary. +async fn create_log_event( + url_host: &str, + c8y_managed_object: &C8yManagedObject, + token: &SmartRestJwtResponse, +) -> Result<String, SMCumulocityMapperError> { + let client = reqwest::ClientBuilder::new().build()?; + + let create_event_url = get_url_for_create_event(&url_host); + + let local: DateTime<Local> = Local::now(); + + let c8y_log_event = C8yCreateEvent::new( + c8y_managed_object.to_owned(), + "c8y_Logfile", + &local.format("%Y-%m-%dT%H:%M:%SZ").to_string(), + "software-management", + ); + + let request = client + .post(create_event_url) + .json(&c8y_log_event) + .bearer_auth(token.token()) + .header("Accept", "application/json") + .timeout(Duration::from_millis(10000)) + .build()?; + + let response = client.execute(request).await?; + let event_response_body = response.json::<SmartRestLogEvent>().await?; + + Ok(event_response_body.id) +} + +/// Returns a date time object from a file path or file-path-like string +/// a typical file stem looks like this: "software-list-2021-10-27T10:29:58Z" +/// +/// # Examples: +/// ``` +/// let path_buf = PathBuf::fromStr("/path/to/file/with/date/in/path").unwrap(); +/// let path_bufdate_time = get_datetime_from_file_path(&path_buf).unwrap(); +/// ``` +fn get_datetime_from_file_path( + log_path: &PathBuf, +) -> Result<DateTime<FixedOffset>, SMCumulocityMapperError> { + if let Some(stem_string) = log_path.file_stem().and_then(|s| s.to_str()) { + // a typical file stem looks like this: software-list-2021-10-27T10:29:58Z. + // to extract the date, rsplit string on "-" and take (last) 3 + let mut stem_string_vec = stem_string.rsplit('-').take(3).collect::<Vec<_>>(); + // reverse back the order (because of rsplit) + stem_string_vec.reverse(); + // join on '-' to get the date string + let date_string = stem_string_vec.join("-"); + let dt = DateTime::parse_from_rfc3339(&date_string)?; + + return Ok(dt); + } + match log_path.to_str() { + Some(path) => Err(SMCumulocityMapperError::InvalidDateInFileName( + path.to_string(), + )), + None => Err(SMCumulocityMapperError::InvalidUtf8Path), + } +} + +/// Reads tedge logs according to `SmartRestLogRequest`. +/// +/// If needed, logs are concatenated. +/// +/// Logs are sorted alphanumerically from oldest to newest. +/// +/// # Examples +/// +/// ``` +/// let smartrest_obj = SmartRestLogRequest::from_smartrest( +/// "522,DeviceSerial,syslog,2021-01-01T00:00:00+0200,2021-01-10T00:00:00+0200,,1000", +/// ) +/// .unwrap(); +/// +/// let log = read_tedge_system_logs(&smartrest_obj, "/var/log/tedge").unwrap(); +/// ``` +fn read_tedge_logs( + smartrest_obj: &SmartRestLogRequest, + logs_dir: &str, +) -> Result<String, SMCumulocityMapperError> { + |