summaryrefslogtreecommitdiffstats
path: root/crates/core/tedge_mapper/src/sm_c8y_mapper
diff options
context:
space:
mode:
Diffstat (limited to 'crates/core/tedge_mapper/src/sm_c8y_mapper')
-rw-r--r--crates/core/tedge_mapper/src/sm_c8y_mapper/error.rs52
-rw-r--r--crates/core/tedge_mapper/src/sm_c8y_mapper/json_c8y.rs278
-rw-r--r--crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs867
-rw-r--r--crates/core/tedge_mapper/src/sm_c8y_mapper/mod.rs6
-rw-r--r--crates/core/tedge_mapper/src/sm_c8y_mapper/tests.rs442
-rw-r--r--crates/core/tedge_mapper/src/sm_c8y_mapper/topic.rs154
6 files changed, 1799 insertions, 0 deletions
diff --git a/crates/core/tedge_mapper/src/sm_c8y_mapper/error.rs b/crates/core/tedge_mapper/src/sm_c8y_mapper/error.rs
new file mode 100644
index 00000000..5cca31b4
--- /dev/null
+++ b/crates/core/tedge_mapper/src/sm_c8y_mapper/error.rs
@@ -0,0 +1,52 @@
+use c8y_smartrest::error::{SmartRestDeserializerError, SmartRestSerializerError};
+
+#[derive(thiserror::Error, Debug)]
+pub(crate) enum MapperTopicError {
+ #[error("Topic {topic} is unknown.")]
+ UnknownTopic { topic: String },
+}
+
+#[derive(thiserror::Error, Debug)]
+pub(crate) enum SMCumulocityMapperError {
+ #[error("Invalid MQTT Message.")]
+ InvalidMqttMessage,
+
+ #[error(transparent)]
+ InvalidTopicError(#[from] MapperTopicError),
+
+ #[error(transparent)]
+ InvalidThinEdgeJson(#[from] json_sm::SoftwareError),
+
+ #[error(transparent)]
+ FromElapsed(#[from] tokio::time::error::Elapsed),
+
+ #[error(transparent)]
+ FromMqttClient(#[from] mqtt_client::MqttClientError),
+
+ #[error(transparent)]
+ FromReqwest(#[from] reqwest::Error),
+
+ #[error(transparent)]
+ FromSmartRestSerializer(#[from] SmartRestSerializerError),
+
+ #[error(transparent)]
+ FromSmartRestDeserializer(#[from] SmartRestDeserializerError),
+
+ #[error(transparent)]
+ FromTedgeConfig(#[from] tedge_config::ConfigSettingError),
+
+ #[error("Invalid date in file name: {0}")]
+ InvalidDateInFileName(String),
+
+ #[error("Invalid path. Not UTF-8.")]
+ InvalidUtf8Path,
+
+ #[error(transparent)]
+ FromChronoParse(#[from] chrono::ParseError),
+
+ #[error(transparent)]
+ FromIo(#[from] std::io::Error),
+
+ #[error("Request timed out")]
+ RequestTimeout,
+}
diff --git a/crates/core/tedge_mapper/src/sm_c8y_mapper/json_c8y.rs b/crates/core/tedge_mapper/src/sm_c8y_mapper/json_c8y.rs
new file mode 100644
index 00000000..f440ec7e
--- /dev/null
+++ b/crates/core/tedge_mapper/src/sm_c8y_mapper/json_c8y.rs
@@ -0,0 +1,278 @@
+use json_sm::{
+ DownloadInfo, Jsonify, SoftwareListResponse, SoftwareModule, SoftwareType, SoftwareVersion,
+};
+use serde::{Deserialize, Serialize};
+
+const EMPTY_STRING: &str = "";
+
+#[derive(Debug, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct C8yCreateEvent {
+ source: C8yManagedObject,
+ #[serde(rename = "type")]
+ event_type: String,
+ time: String,
+ text: String,
+}
+
+#[derive(Debug, Serialize, Deserialize, Clone)]
+#[serde(rename_all = "camelCase")]
+pub struct C8yManagedObject {
+ pub id: String,
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct InternalIdResponse {
+ managed_object: C8yManagedObject,
+ external_id: String,
+}
+
+impl InternalIdResponse {
+ pub fn id(&self) -> String {
+ self.managed_object.id.clone()
+ }
+}
+
+#[derive(Debug, Deserialize, Serialize, PartialEq)]
+pub struct C8ySoftwareModuleItem {
+ pub name: String,
+ pub version: Option<String>,
+ pub url: Option<DownloadInfo>,
+}
+
+impl<'a> Jsonify<'a> for C8ySoftwareModuleItem {}
+
+impl From<SoftwareModule> for C8ySoftwareModuleItem {
+ fn from(module: SoftwareModule) -> Self {
+ let url = if module.url.is_none() {
+ Some(EMPTY_STRING.into())
+ } else {
+ module.url
+ };
+
+ Self {
+ name: module.name,
+ version: Option::from(combine_version_and_type(
+ &module.version,
+ &module.module_type,
+ )),
+ url,
+ }
+ }
+}
+
+#[derive(Debug, Serialize, Deserialize, PartialEq)]
+#[serde(rename_all = "camelCase")]
+pub struct C8yUpdateSoftwareListResponse {
+ #[serde(rename = "c8y_SoftwareList")]
+ c8y_software_list: Option<Vec<C8ySoftwareModuleItem>>,
+}
+
+impl<'a> Jsonify<'a> for C8yUpdateSoftwareListResponse {}
+
+impl From<&SoftwareListResponse> for C8yUpdateSoftwareListResponse {
+ fn from(list: &SoftwareListResponse) -> Self {
+ let mut new_list: Vec<C8ySoftwareModuleItem> = Vec::new();
+ list.modules().into_iter().for_each(|software_module| {
+ let c8y_software_module: C8ySoftwareModuleItem = software_module.into();
+ new_list.push(c8y_software_module);
+ });
+
+ Self {
+ c8y_software_list: Some(new_list),
+ }
+ }
+}
+
+impl C8yCreateEvent {
+ pub fn new(source: C8yManagedObject, event_type: &str, time: &str, text: &str) -> Self {
+ Self {
+ source,
+ event_type: event_type.into(),
+ time: time.into(),
+ text: text.into(),
+ }
+ }
+}
+
+impl<'a> Jsonify<'a> for C8yCreateEvent {}
+
+fn combine_version_and_type(
+ version: &Option<SoftwareVersion>,
+ module_type: &Option<SoftwareType>,
+) -> String {
+ match module_type {
+ Some(m) => {
+ if m.is_empty() {
+ match version {
+ Some(v) => v.into(),
+ None => EMPTY_STRING.into(),
+ }
+ } else {
+ match version {
+ Some(v) => format!("{}::{}", v, m),
+ None => format!("::{}", m),
+ }
+ }
+ }
+ None => match version {
+ Some(v) => {
+ if v.contains("::") {
+ format!("{}::", v)
+ } else {
+ v.into()
+ }
+ }
+ None => EMPTY_STRING.into(),
+ },
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn from_software_module_to_c8y_software_module_item() {
+ let software_module = SoftwareModule {
+ module_type: Some("a".into()),
+ name: "b".into(),
+ version: Some("c".into()),
+ url: Some("".into()),
+ file_path: None,
+ };
+
+ let expected_c8y_item = C8ySoftwareModuleItem {
+ name: "b".into(),
+ version: Some("c::a".into()),
+ url: Some("".into()),
+ };
+
+ let converted: C8ySoftwareModuleItem = software_module.into();
+
+ assert_eq!(converted, expected_c8y_item);
+ }
+
+ #[test]
+ fn from_thin_edge_json_to_c8y_set_software_list() {
+ let input_json = r#"{
+ "id":"1",
+ "status":"successful",
+ "currentSoftwareList":[
+ {"type":"debian", "modules":[
+ {"name":"a"},
+ {"name":"b","version":"1.0"},
+ {"name":"c","url":"https://foobar.io/c.deb"},
+ {"name":"d","version":"beta","url":"https://foobar.io/d.deb"}
+ ]},
+ {"type":"apama","modules":[
+ {"name":"m","url":"https://foobar.io/m.epl"}
+ ]}
+ ]}"#;
+
+ let json_obj = &SoftwareListResponse::from_json(input_json).unwrap();
+
+ let c8y_software_list: C8yUpdateSoftwareListResponse = json_obj.into();
+
+ let expected_struct = C8yUpdateSoftwareListResponse {
+ c8y_software_list: Some(vec![
+ C8ySoftwareModuleItem {
+ name: "a".into(),
+ version: Some("::debian".into()),
+ url: Some("".into()),
+ },
+ C8ySoftwareModuleItem {
+ name: "b".into(),
+ version: Some("1.0::debian".into()),
+ url: Some("".into()),
+ },
+ C8ySoftwareModuleItem {
+ name: "c".into(),
+ version: Some("::debian".into()),
+ url: Some("https://foobar.io/c.deb".into()),
+ },
+ C8ySoftwareModuleItem {
+ name: "d".into(),
+ version: Some("beta::debian".into()),
+ url: Some("https://foobar.io/d.deb".into()),
+ },
+ C8ySoftwareModuleItem {
+ name: "m".into(),
+ version: Some("::apama".into()),
+ url: Some("https://foobar.io/m.epl".into()),
+ },
+ ]),
+ };
+
+ let expected_json = r#"{"c8y_SoftwareList":[{"name":"a","version":"::debian","url":{"url":""}},{"name":"b","version":"1.0::debian","url":{"url":""}},{"name":"c","version":"::debian","url":{"url":"https://foobar.io/c.deb"}},{"name":"d","version":"beta::debian","url":{"url":"https://foobar.io/d.deb"}},{"name":"m","version":"::apama","url":{"url":"https://foobar.io/m.epl"}}]}"#;
+
+ assert_eq!(c8y_software_list, expected_struct);
+ assert_eq!(c8y_software_list.to_json().unwrap(), expected_json);
+ }
+
+ #[test]
+ fn empty_to_c8y_set_software_list() {
+ let input_json = r#"{
+ "id":"1",
+ "status":"successful",
+ "currentSoftwareList":[]
+ }"#;
+
+ let json_obj = &SoftwareListResponse::from_json(input_json).unwrap();
+ let c8y_software_list: C8yUpdateSoftwareListResponse = json_obj.into();
+
+ let expected_struct = C8yUpdateSoftwareListResponse {
+ c8y_software_list: Some(vec![]),
+ };
+ let expected_json = r#"{"c8y_SoftwareList":[]}"#;
+
+ assert_eq!(c8y_software_list, expected_struct);
+ assert_eq!(c8y_software_list.to_json().unwrap(), expected_json);
+ }
+
+ #[test]
+ fn get_id_from_c8y_response() {
+ let managed_object = C8yManagedObject { id: "12345".into() };
+ let response = InternalIdResponse {
+ managed_object,
+ external_id: "test".into(),
+ };
+
+ assert_eq!(response.id(), "12345".to_string());
+ }
+
+ #[test]
+ fn verify_combine_version_and_type() {
+ let some_version: Option<SoftwareVersion> = Some("1.0".to_string());
+ let some_version_with_colon: Option<SoftwareVersion> = Some("1.0.0::1".to_string());
+ let none_version: Option<SoftwareVersion> = None;
+ let some_module_type: Option<SoftwareType> = Some("debian".to_string());
+ let none_module_type: Option<SoftwareType> = None;
+
+ assert_eq!(
+ combine_version_and_type(&some_version, &some_module_type),
+ "1.0::debian"
+ );
+ assert_eq!(
+ combine_version_and_type(&some_version, &none_module_type),
+ "1.0"
+ );
+ assert_eq!(
+ combine_version_and_type(&some_version_with_colon, &some_module_type),
+ "1.0.0::1::debian"
+ );
+ assert_eq!(
+ combine_version_and_type(&some_version_with_colon, &none_module_type),
+ "1.0.0::1::"
+ );
+ assert_eq!(
+ combine_version_and_type(&none_version, &some_module_type),
+ "::debian"
+ );
+ assert_eq!(
+ combine_version_and_type(&none_version, &none_module_type),
+ EMPTY_STRING
+ );
+ }
+}
diff --git a/crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs b/crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs
new file mode 100644
index 00000000..beadef5d
--- /dev/null
+++ b/crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs
@@ -0,0 +1,867 @@
+use crate::mapper::mqtt_config;
+use crate::sm_c8y_mapper::json_c8y::{C8yCreateEvent, C8yManagedObject};
+use crate::sm_c8y_mapper::{error::*, json_c8y::C8yUpdateSoftwareListResponse, topic::*};
+use crate::{component::TEdgeComponent, sm_c8y_mapper::json_c8y::InternalIdResponse};
+use async_trait::async_trait;
+use c8y_smartrest::smartrest_deserializer::SmartRestLogRequest;
+use c8y_smartrest::smartrest_serializer::CumulocitySupportedOperations;
+use c8y_smartrest::{
+ error::SmartRestDeserializerError,
+ smartrest_deserializer::{SmartRestJwtResponse, SmartRestUpdateSoftware},
+ smartrest_serializer::{
+ SmartRestGetPendingOperations, SmartRestSerializer, SmartRestSetOperationToExecuting,
+ SmartRestSetOperationToFailed, SmartRestSetOperationToSuccessful,
+ SmartRestSetSupportedLogType, SmartRestSetSupportedOperations,
+ },
+};
+use chrono::{DateTime, FixedOffset, Local};
+use json_sm::{
+ Auth, DownloadInfo, Jsonify, SoftwareListRequest, SoftwareListResponse,
+ SoftwareOperationStatus, SoftwareUpdateResponse,
+};
+use mqtt_client::{Client, MqttClient, MqttClientError, MqttMessageStream, Topic, TopicFilter};
+use reqwest::Url;
+use serde::{Deserialize, Serialize};
+use std::path::PathBuf;
+use std::{convert::TryInto, time::Duration};
+use tedge_config::{C8yUrlSetting, ConfigSettingAccessorStringExt, DeviceIdSetting, TEdgeConfig};
+use tokio::time::Instant;
+use tracing::{debug, error, info, instrument};
+
+const AGENT_LOG_DIR: &str = "/var/log/tedge/agent";
+
+const RETRY_TIMEOUT_SECS: u64 = 60;
+
+pub struct CumulocitySoftwareManagementMapper {}
+
+impl CumulocitySoftwareManagementMapper {
+ pub fn new() -> Self {
+ Self {}
+ }
+}
+
+#[async_trait]
+impl TEdgeComponent for CumulocitySoftwareManagementMapper {
+ #[instrument(skip(self, tedge_config), name = "sm-c8y-mapper")]
+ async fn start(&self, tedge_config: TEdgeConfig) -> Result<(), anyhow::Error> {
+ let mqtt_config = mqtt_config(&tedge_config)?;
+ let mqtt_client = Client::connect("SM-C8Y-Mapper", &mqtt_config).await?;
+
+ let mut sm_mapper = CumulocitySoftwareManagement::new(mqtt_client, tedge_config);
+
+ let messages = sm_mapper.subscribe().await?;
+ let () = sm_mapper.init().await?;
+ let () = sm_mapper.run(messages).await?;
+
+ Ok(())
+ }
+}
+
+#[derive(Debug)]
+pub struct CumulocitySoftwareManagement {
+ pub client: Client,
+ config: TEdgeConfig,
+ c8y_internal_id: String,
+}
+
+impl CumulocitySoftwareManagement {
+ pub fn new(client: Client, config: TEdgeConfig) -> Self {
+ Self {
+ client,
+ config,
+ c8y_internal_id: "".into(),
+ }
+ }
+
+ pub async fn subscribe(&self) -> Result<Box<dyn MqttMessageStream>, anyhow::Error> {
+ let mut topic_filter = TopicFilter::new(IncomingTopic::SoftwareListResponse.as_str())?;
+ topic_filter.add(IncomingTopic::SoftwareUpdateResponse.as_str())?;
+ topic_filter.add(IncomingTopic::SmartRestRequest.as_str())?;
+ let messages = self.client.subscribe(topic_filter).await?;
+
+ Ok(messages)
+ }
+
+ #[instrument(skip(self), name = "init")]
+ async fn init(&mut self) -> Result<(), anyhow::Error> {
+ info!("Initialisation");
+ while self.c8y_internal_id.is_empty() {
+ if let Err(error) = self.try_get_and_set_internal_id().await {
+ error!(
+ "An error ocurred while retrieving internal Id, operation will retry in {} seconds and mapper will reinitialise.\n Error: {:?}",
+ RETRY_TIMEOUT_SECS, error
+ );
+
+ tokio::time::sleep_until(Instant::now() + Duration::from_secs(RETRY_TIMEOUT_SECS))
+ .await;
+ continue;
+ };
+ }
+ info!("Initialisation done.");
+
+ Ok(())
+ }
+
+ pub async fn run(&self, mut messages: Box<dyn MqttMessageStream>) -> Result<(), anyhow::Error> {
+ info!("Running");
+ let () = self.publish_supported_operations().await?;
+ let () = self.publish_supported_log_types().await?;
+ let () = self.publish_get_pending_operations().await?;
+ let () = self.ask_software_list().await?;
+
+ while let Err(err) = self.subscribe_messages_runtime(&mut messages).await {
+ if let SMCumulocityMapperError::FromSmartRestDeserializer(
+ SmartRestDeserializerError::InvalidParameter { operation, .. },
+ ) = &err
+ {
+ let topic = OutgoingTopic::SmartRestResponse.to_topic()?;
+ // publish the operation status as `executing`
+ let () = self.publish(&topic, format!("501,{}", operation)).await?;
+ // publish the operation status as `failed`
+ let () = self
+ .publish(
+ &topic,
+ format!("502,{},\"{}\"", operation, &err.to_string()),
+ )
+ .await?;
+ }
+ error!("{}", err);
+ }
+
+ Ok(())
+ }
+
+ async fn process_smartrest(&self, payload: &str) -> Result<(), SMCumulocityMapperError> {
+ let message_id: &str = &payload[..3];
+ match message_id {
+ "528" => {
+ let () = self.forward_software_request(payload).await?;
+ }
+ "522" => {
+ let () = self.forward_log_request(payload).await?;
+ }
+ _ => {
+ return Err(SMCumulocityMapperError::InvalidMqttMessage);
+ }
+ }
+
+ Ok(())
+ }
+
+ #[instrument(skip(self, messages), name = "main-loop")]
+ async fn subscribe_messages_runtime(
+ &self,
+ messages: &mut Box<dyn MqttMessageStream>,
+ ) -> Result<(), SMCumulocityMapperError> {
+ while let Some(message) = messages.next().await {
+ debug!("Topic {:?}", message.topic.name);
+ debug!("Mapping {:?}", message.payload_str());
+
+ let incoming_topic = message.topic.clone().try_into()?;
+ match incoming_topic {
+ IncomingTopic::SoftwareListResponse => {
+ debug!("Software list");
+ let () = self
+ .validate_and_publish_software_list(message.payload_str()?)
+ .await?;
+ }
+ IncomingTopic::SoftwareUpdateResponse => {
+ debug!("Software update");
+ let () = self
+ .publish_operation_status(message.payload_str()?)
+ .await?;
+ }
+ IncomingTopic::SmartRestRequest => {
+ debug!("Cumulocity");
+ let () = self.process_smartrest(message.payload_str()?).await?;
+ }
+ }
+ }
+ Ok(())
+ }
+
+ #[instrument(skip(self), name = "software-list")]
+ async fn ask_software_list(&self) -> Result<(), SMCumulocityMapperError> {
+ let request = SoftwareListRequest::new();
+ let topic = OutgoingTopic::SoftwareListRequest.to_topic()?;
+ let json_list_request = request.to_json()?;
+ let () = self.publish(&topic, json_list_request).await?;
+
+ Ok(())
+ }
+
+ #[instrument(skip(self), name = "software-update")]
+ async fn validate_and_publish_software_list(
+ &self,
+ json_response: &str,
+ ) -> Result<(), SMCumulocityMapperError> {
+ let response = SoftwareListResponse::from_json(json_response)?;
+
+ match response.status() {
+ SoftwareOperationStatus::Successful => {
+ let () = self.send_software_list_http(&response).await?;
+ }
+
+ SoftwareOperationStatus::Failed => {
+ error!("Received a failed software response: {}", json_response);
+ }
+
+ SoftwareOperationStatus::Executing => {} // C8Y doesn't expect any message to be published
+ }
+
+ Ok(())
+ }
+
+ async fn publish_supported_log_types(&self) -> Result<(), SMCumulocityMapperError> {
+ let payload = SmartRestSetSupportedLogType::default().to_smartrest()?;
+ let topic = OutgoingTopic::SmartRestResponse.to_topic()?;
+ let () = self.publish(&topic, payload).await?;
+ Ok(())
+ }
+
+ async fn publish_supported_operations(&self) -> Result<(), SMCumulocityMapperError> {
+ let data = SmartRestSetSupportedOperations::default();
+ let topic = OutgoingTopic::SmartRestResponse.to_topic()?;
+ let payload = data.to_smartrest()?;
+ let () = self.publish(&topic, payload).await?;
+ Ok(())
+ }
+
+ async fn publish_get_pending_operations(&self) -> Result<(), SMCumulocityMapperError> {
+ let data = SmartRestGetPendingOperations::default();
+ let topic = OutgoingTopic::SmartRestResponse.to_topic()?;
+ let payload = data.to_smartrest()?;
+ let () = self.publish(&topic, payload).await?;
+ Ok(())
+ }
+
+ async fn publish_operation_status(
+ &self,
+ json_response: &str,
+ ) -> Result<(), SMCumulocityMapperError> {
+ let response = SoftwareUpdateResponse::from_json(json_response)?;
+ let topic = OutgoingTopic::SmartRestResponse.to_topic()?;
+ match response.status() {
+ SoftwareOperationStatus::Executing => {
+ let smartrest_set_operation_status =
+ SmartRestSetOperationToExecuting::from_thin_edge_json(response)?
+ .to_smartrest()?;
+ let () = self.publish(&topic, smartrest_set_operation_status).await?;
+ }
+ SoftwareOperationStatus::Successful => {
+ let smartrest_set_operation =
+ SmartRestSetOperationToSuccessful::from_thin_edge_json(response)?
+ .to_smartrest()?;
+ let () = self.publish(&topic, smartrest_set_operation).await?;
+ let () = self
+ .validate_and_publish_software_list(json_response)
+ .await?;
+ }
+ SoftwareOperationStatus::Failed => {
+ let smartrest_set_operation =
+ SmartRestSetOperationToFailed::from_thin_edge_json(response)?.to_smartrest()?;
+ let () = self.publish(&topic, smartrest_set_operation).await?;
+ let () = self
+ .validate_and_publish_software_list(json_response)
+ .await?;
+ }
+ };
+ Ok(())
+ }
+
+ async fn set_log_file_request_executing(&self) -> Result<(), SMCumulocityMapperError> {
+ let topic = OutgoingTopic::SmartRestResponse.to_topic()?;
+ let smartrest_set_operation_status =
+ SmartRestSetOperationToExecuting::new(CumulocitySupportedOperations::C8yLogFileRequest)
+ .to_smartrest()?;
+
+ let () = self.publish(&topic, smartrest_set_operation_status).await?;
+ Ok(())
+ }
+
+ async fn set_log_file_request_done(
+ &self,
+ binary_upload_event_url: &str,
+ ) -> Result<(), SMCumulocityMapperError> {
+ let topic = OutgoingTopic::SmartRestResponse.to_topic()?;
+ let smartrest_set_operation_status = SmartRestSetOperationToSuccessful::new(
+ CumulocitySupportedOperations::C8yLogFileRequest,
+ )
+ .with_response_parameter(binary_upload_event_url)
+ .to_smartrest()?;
+
+ let () = self.publish(&topic, smartrest_set_operation_status).await?;
+ Ok(())
+ }
+
+ async fn forward_log_request(&self, payload: &str) -> Result<(), SMCumulocityMapperError> {
+ // retrieve smartrest object from payload
+ let smartrest_obj = SmartRestLogRequest::from_smartrest(&payload)?;
+
+ // 1. set log file request to executing
+ let () = self.set_log_file_request_executing().await?;
+
+ // 2. read logs
+ let log_output = read_tedge_logs(&smartrest_obj, AGENT_LOG_DIR)?;
+
+ // 3. create log event
+ let token = get_jwt_token(&self.client).await?;
+ let url_host = self.config.query_string(C8yUrlSetting)?;
+
+ let c8y_managed_object = C8yManagedObject {
+ id: self.c8y_internal_id.clone(),
+ };
+ let event_response_id = create_log_event(&url_host, &c8y_managed_object, &token).await?;
+
+ // 4. upload log file
+ let binary_upload_event_url =
+ get_url_for_event_binary_upload(&url_host, &event_response_id);
+
+ let () = upload_log_binary(&token, &binary_upload_event_url, &log_output.as_str()).await?;
+
+ // 5. set log file request to done
+ let () = self
+ .set_log_file_request_done(&binary_upload_event_url)
+ .await?;
+
+ info!("Log file request uploaded");
+
+ Ok(())
+ }
+
+ async fn forward_software_request(
+ &self,
+ smartrest: &str,
+ ) -> Result<(), SMCumulocityMapperError> {
+ let topic = OutgoingTopic::SoftwareUpdateRequest.to_topic()?;
+ let update_software = SmartRestUpdateSoftware::new();
+ let mut software_update_request = update_software
+ .from_smartrest(smartrest)?
+ .to_thin_edge_json()?;
+
+ let token = get_jwt_token(&self.client).await?;
+ let tenant_uri = self.config.query_string(C8yUrlSetting)?;
+
+ software_update_request
+ .update_list
+ .iter_mut()
+ .for_each(|modules| {
+ modules.modules.iter_mut().for_each(|module| {
+ if let Some(url) = &module.url {
+ if url_is_in_my_tenant_domain(url.url(), &tenant_uri) {
+ module.url = module.url.as_ref().map(|s| {
+ DownloadInfo::new(&s.url)
+ .with_auth(Auth::new_bearer(&token.token()))
+ });
+ } else {
+ module.url = module.url.as_ref().map(|s| DownloadInfo::new(&s.url));
+ }
+ }
+ });
+ });
+
+ let () = self
+ .publish(&topic, software_update_request.to_json()?)
+ .await?;
+
+ Ok(())
+ }
+
+ async fn publish(&self, topic: &Topic, payload: String) -> Result<(), MqttClientError> {
+ let () = self
+ .client
+ .publish(mqtt_client::Message::new(topic, payload))
+ .await?;
+ Ok(())
+ }
+
+ async fn send_software_list_http(
+ &self,
+ json_response: &SoftwareListResponse,
+ ) -> Result<(), SMCumulocityMapperError> {
+ let token = get_jwt_token(&self.client).await?;
+
+ let reqwest_client = reqwest::ClientBuilder::new().build()?;
+
+ let url_host = self.config.query_string(C8yUrlSetting)?;
+ let url = get_url_for_sw_list(&url_host, &self.c8y_internal_id);
+
+ let c8y_software_list: C8yUpdateSoftwareListResponse = json_response.into();
+
+ let _published =
+ publish_software_list_http(&reqwest_client, &url, &token.token(), &c8y_software_list)
+ .await?;
+
+ Ok(())
+ }
+
+ async fn try_get_and_set_internal_id(&mut self) -> Result<(), SMCumulocityMapperError> {
+ let token = get_jwt_token(&self.client).await?;
+ let reqwest_client = reqwest::ClientBuilder::new().build()?;
+
+ let url_host = self.config.query_string(C8yUrlSetting)?;
+ let device_id = self.config.query_string(DeviceIdSetting)?;
+ let url_get_id = get_url_for_get_id(&url_host, &device_id);
+
+ self.c8y_internal_id =
+ try_get_internal_id(&reqwest_client, &url_get_id, &token.token()).await?;
+
+ Ok(())
+ }
+}
+
+async fn upload_log_binary(
+ token: &SmartRestJwtResponse,
+ binary_upload_event_url: &str,
+ log_content: &str,
+) -> Result<(), SMCumulocityMapperError> {
+ let client = reqwest::ClientBuilder::new().build()?;
+
+ let request = client
+ .post(binary_upload_event_url)
+ .header("Accept", "application/json")
+ .header("Content-Type", "text/plain")
+ .body(log_content.to_string())
+ .bearer_auth(token.token())
+ .timeout(Duration::from_millis(10000))
+ .build()?;
+
+ let _response = client.execute(request).await?;
+ Ok(())
+}
+
+#[derive(Debug, Deserialize, Serialize, PartialEq)]
+#[serde(rename_all = "camelCase")]
+/// used to retrieve the id of a log event
+pub struct SmartRestLogEvent {
+ pub id: String,
+}
+
+/// Make a POST request to /event/events and return the event id from response body.
+/// The event id is used to upload the binary.
+async fn create_log_event(
+ url_host: &str,
+ c8y_managed_object: &C8yManagedObject,
+ token: &SmartRestJwtResponse,
+) -> Result<String, SMCumulocityMapperError> {
+ let client = reqwest::ClientBuilder::new().build()?;
+
+ let create_event_url = get_url_for_create_event(&url_host);
+
+ let local: DateTime<Local> = Local::now();
+
+ let c8y_log_event = C8yCreateEvent::new(
+ c8y_managed_object.to_owned(),
+ "c8y_Logfile",
+ &local.format("%Y-%m-%dT%H:%M:%SZ").to_string(),
+ "software-management",
+ );
+
+ let request = client
+ .post(create_event_url)
+ .json(&c8y_log_event)
+ .bearer_auth(token.token())
+ .header("Accept", "application/json")
+ .timeout(Duration::from_millis(10000))
+ .build()?;
+
+ let response = client.execute(request).await?;
+ let event_response_body = response.json::<SmartRestLogEvent>().await?;
+
+ Ok(event_response_body.id)
+}
+
+/// Returns a date time object from a file path or file-path-like string
+/// a typical file stem looks like this: "software-list-2021-10-27T10:29:58Z"
+///
+/// # Examples:
+/// ```
+/// let path_buf = PathBuf::fromStr("/path/to/file/with/date/in/path").unwrap();
+/// let path_bufdate_time = get_datetime_from_file_path(&path_buf).unwrap();
+/// ```
+fn get_datetime_from_file_path(
+ log_path: &PathBuf,
+) -> Result<DateTime<FixedOffset>, SMCumulocityMapperError> {
+ if let Some(stem_string) = log_path.file_stem().and_then(|s| s.to_str()) {
+ // a typical file stem looks like this: software-list-2021-10-27T10:29:58Z.
+ // to extract the date, rsplit string on "-" and take (last) 3
+ let mut stem_string_vec = stem_string.rsplit('-').take(3).collect::<Vec<_>>();
+ // reverse back the order (because of rsplit)
+ stem_string_vec.reverse();
+ // join on '-' to get the date string
+ let date_string = stem_string_vec.join("-");
+ let dt = DateTime::parse_from_rfc3339(&date_string)?;
+
+ return Ok(dt);
+ }
+ match log_path.to_str() {
+ Some(path) => Err(SMCumulocityMapperError::InvalidDateInFileName(
+ path.to_string(),
+ )),
+ None => Err(SMCumulocityMapperError::InvalidUtf8Path),
+ }
+}
+
+/// Reads tedge logs according to `SmartRestLogRequest`.
+///
+/// If needed, logs are concatenated.
+///
+/// Logs are sorted alphanumerically from oldest to newest.
+///
+/// # Examples
+///
+/// ```
+/// let smartrest_obj = SmartRestLogRequest::from_smartrest(
+/// "522,DeviceSerial,syslog,2021-01-01T00:00:00+0200,2021-01-10T00:00:00+0200,,1000",
+/// )
+/// .unwrap();
+///
+/// let log = read_tedge_system_logs(&smartrest_obj, "/var/log/tedge").unwrap();
+/// ```
+fn read_tedge_logs(
+ smartrest_obj: &SmartRestLogRequest,
+ logs_dir: &str,
+) -> Result<String, SMCumulocityMapperError> {
+ let mut output = String::new();
+
+ // NOTE: As per documentation of std::fs::read_dir:
+ // "The order in which this iterator returns entries is platform and filesystem dependent."
+ // Therefore, files are sorted by date.
+ let mut read_vector: Vec<_> = std::fs::read_dir(logs_dir)?
+ .filter_map(|r| r.ok())
+ .filter_map(|dir_entry| {
+ let file_path = &dir_entry.path();
+ let datetime_object = get_datetime_from_file_path(&file_path);
+ match datetime_object {
+ Ok(dt) => {
+ if dt <