summaryrefslogtreecommitdiffstats
path: root/crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs
diff options
context:
space:
mode:
authorLukasz Woznicki <75632179+makr11st@users.noreply.github.com>2021-11-24 20:54:56 +0000
committerGitHub <noreply@github.com>2021-11-24 20:54:56 +0000
commita4ffeccf60090e4456755bc53a6e3b8c8038e855 (patch)
tree9583f187114913a92866571920dd3bb205bd50a3 /crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs
parent8217e80670e76dbf9168780f5e0545355a39f8f3 (diff)
Restructure directories of the workspace (#559)
* Restructure directories of the workspace * Rename c8y_translator_lib to c8y_translator * Update comment on how to get dummy plugin path Signed-off-by: Lukasz Woznicki <lukasz.woznicki@softwareag.com>
Diffstat (limited to 'crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs')
-rw-r--r--crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs867
1 files changed, 867 insertions, 0 deletions
diff --git a/crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs b/crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs
new file mode 100644
index 00000000..beadef5d
--- /dev/null
+++ b/crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs
@@ -0,0 +1,867 @@
+use crate::mapper::mqtt_config;
+use crate::sm_c8y_mapper::json_c8y::{C8yCreateEvent, C8yManagedObject};
+use crate::sm_c8y_mapper::{error::*, json_c8y::C8yUpdateSoftwareListResponse, topic::*};
+use crate::{component::TEdgeComponent, sm_c8y_mapper::json_c8y::InternalIdResponse};
+use async_trait::async_trait;
+use c8y_smartrest::smartrest_deserializer::SmartRestLogRequest;
+use c8y_smartrest::smartrest_serializer::CumulocitySupportedOperations;
+use c8y_smartrest::{
+ error::SmartRestDeserializerError,
+ smartrest_deserializer::{SmartRestJwtResponse, SmartRestUpdateSoftware},
+ smartrest_serializer::{
+ SmartRestGetPendingOperations, SmartRestSerializer, SmartRestSetOperationToExecuting,
+ SmartRestSetOperationToFailed, SmartRestSetOperationToSuccessful,
+ SmartRestSetSupportedLogType, SmartRestSetSupportedOperations,
+ },
+};
+use chrono::{DateTime, FixedOffset, Local};
+use json_sm::{
+ Auth, DownloadInfo, Jsonify, SoftwareListRequest, SoftwareListResponse,
+ SoftwareOperationStatus, SoftwareUpdateResponse,
+};
+use mqtt_client::{Client, MqttClient, MqttClientError, MqttMessageStream, Topic, TopicFilter};
+use reqwest::Url;
+use serde::{Deserialize, Serialize};
+use std::path::PathBuf;
+use std::{convert::TryInto, time::Duration};
+use tedge_config::{C8yUrlSetting, ConfigSettingAccessorStringExt, DeviceIdSetting, TEdgeConfig};
+use tokio::time::Instant;
+use tracing::{debug, error, info, instrument};
+
+const AGENT_LOG_DIR: &str = "/var/log/tedge/agent";
+
+const RETRY_TIMEOUT_SECS: u64 = 60;
+
+pub struct CumulocitySoftwareManagementMapper {}
+
+impl CumulocitySoftwareManagementMapper {
+ pub fn new() -> Self {
+ Self {}
+ }
+}
+
+#[async_trait]
+impl TEdgeComponent for CumulocitySoftwareManagementMapper {
+ #[instrument(skip(self, tedge_config), name = "sm-c8y-mapper")]
+ async fn start(&self, tedge_config: TEdgeConfig) -> Result<(), anyhow::Error> {
+ let mqtt_config = mqtt_config(&tedge_config)?;
+ let mqtt_client = Client::connect("SM-C8Y-Mapper", &mqtt_config).await?;
+
+ let mut sm_mapper = CumulocitySoftwareManagement::new(mqtt_client, tedge_config);
+
+ let messages = sm_mapper.subscribe().await?;
+ let () = sm_mapper.init().await?;
+ let () = sm_mapper.run(messages).await?;
+
+ Ok(())
+ }
+}
+
+#[derive(Debug)]
+pub struct CumulocitySoftwareManagement {
+ pub client: Client,
+ config: TEdgeConfig,
+ c8y_internal_id: String,
+}
+
+impl CumulocitySoftwareManagement {
+ pub fn new(client: Client, config: TEdgeConfig) -> Self {
+ Self {
+ client,
+ config,
+ c8y_internal_id: "".into(),
+ }
+ }
+
+ pub async fn subscribe(&self) -> Result<Box<dyn MqttMessageStream>, anyhow::Error> {
+ let mut topic_filter = TopicFilter::new(IncomingTopic::SoftwareListResponse.as_str())?;
+ topic_filter.add(IncomingTopic::SoftwareUpdateResponse.as_str())?;
+ topic_filter.add(IncomingTopic::SmartRestRequest.as_str())?;
+ let messages = self.client.subscribe(topic_filter).await?;
+
+ Ok(messages)
+ }
+
+ #[instrument(skip(self), name = "init")]
+ async fn init(&mut self) -> Result<(), anyhow::Error> {
+ info!("Initialisation");
+ while self.c8y_internal_id.is_empty() {
+ if let Err(error) = self.try_get_and_set_internal_id().await {
+ error!(
+ "An error ocurred while retrieving internal Id, operation will retry in {} seconds and mapper will reinitialise.\n Error: {:?}",
+ RETRY_TIMEOUT_SECS, error
+ );
+
+ tokio::time::sleep_until(Instant::now() + Duration::from_secs(RETRY_TIMEOUT_SECS))
+ .await;
+ continue;
+ };
+ }
+ info!("Initialisation done.");
+
+ Ok(())
+ }
+
+ pub async fn run(&self, mut messages: Box<dyn MqttMessageStream>) -> Result<(), anyhow::Error> {
+ info!("Running");
+ let () = self.publish_supported_operations().await?;
+ let () = self.publish_supported_log_types().await?;
+ let () = self.publish_get_pending_operations().await?;
+ let () = self.ask_software_list().await?;
+
+ while let Err(err) = self.subscribe_messages_runtime(&mut messages).await {
+ if let SMCumulocityMapperError::FromSmartRestDeserializer(
+ SmartRestDeserializerError::InvalidParameter { operation, .. },
+ ) = &err
+ {
+ let topic = OutgoingTopic::SmartRestResponse.to_topic()?;
+ // publish the operation status as `executing`
+ let () = self.publish(&topic, format!("501,{}", operation)).await?;
+ // publish the operation status as `failed`
+ let () = self
+ .publish(
+ &topic,
+ format!("502,{},\"{}\"", operation, &err.to_string()),
+ )
+ .await?;
+ }
+ error!("{}", err);
+ }
+
+ Ok(())
+ }
+
+ async fn process_smartrest(&self, payload: &str) -> Result<(), SMCumulocityMapperError> {
+ let message_id: &str = &payload[..3];
+ match message_id {
+ "528" => {
+ let () = self.forward_software_request(payload).await?;
+ }
+ "522" => {
+ let () = self.forward_log_request(payload).await?;
+ }
+ _ => {
+ return Err(SMCumulocityMapperError::InvalidMqttMessage);
+ }
+ }
+
+ Ok(())
+ }
+
+ #[instrument(skip(self, messages), name = "main-loop")]
+ async fn subscribe_messages_runtime(
+ &self,
+ messages: &mut Box<dyn MqttMessageStream>,
+ ) -> Result<(), SMCumulocityMapperError> {
+ while let Some(message) = messages.next().await {
+ debug!("Topic {:?}", message.topic.name);
+ debug!("Mapping {:?}", message.payload_str());
+
+ let incoming_topic = message.topic.clone().try_into()?;
+ match incoming_topic {
+ IncomingTopic::SoftwareListResponse => {
+ debug!("Software list");
+ let () = self
+ .validate_and_publish_software_list(message.payload_str()?)
+ .await?;
+ }
+ IncomingTopic::SoftwareUpdateResponse => {
+ debug!("Software update");
+ let () = self
+ .publish_operation_status(message.payload_str()?)
+ .await?;
+ }
+ IncomingTopic::SmartRestRequest => {
+ debug!("Cumulocity");
+ let () = self.process_smartrest(message.payload_str()?).await?;
+ }
+ }
+ }
+ Ok(())
+ }
+
+ #[instrument(skip(self), name = "software-list")]
+ async fn ask_software_list(&self) -> Result<(), SMCumulocityMapperError> {
+ let request = SoftwareListRequest::new();
+ let topic = OutgoingTopic::SoftwareListRequest.to_topic()?;
+ let json_list_request = request.to_json()?;
+ let () = self.publish(&topic, json_list_request).await?;
+
+ Ok(())
+ }
+
+ #[instrument(skip(self), name = "software-update")]
+ async fn validate_and_publish_software_list(
+ &self,
+ json_response: &str,
+ ) -> Result<(), SMCumulocityMapperError> {
+ let response = SoftwareListResponse::from_json(json_response)?;
+
+ match response.status() {
+ SoftwareOperationStatus::Successful => {
+ let () = self.send_software_list_http(&response).await?;
+ }
+
+ SoftwareOperationStatus::Failed => {
+ error!("Received a failed software response: {}", json_response);
+ }
+
+ SoftwareOperationStatus::Executing => {} // C8Y doesn't expect any message to be published
+ }
+
+ Ok(())
+ }
+
+ async fn publish_supported_log_types(&self) -> Result<(), SMCumulocityMapperError> {
+ let payload = SmartRestSetSupportedLogType::default().to_smartrest()?;
+ let topic = OutgoingTopic::SmartRestResponse.to_topic()?;
+ let () = self.publish(&topic, payload).await?;
+ Ok(())
+ }
+
+ async fn publish_supported_operations(&self) -> Result<(), SMCumulocityMapperError> {
+ let data = SmartRestSetSupportedOperations::default();
+ let topic = OutgoingTopic::SmartRestResponse.to_topic()?;
+ let payload = data.to_smartrest()?;
+ let () = self.publish(&topic, payload).await?;
+ Ok(())
+ }
+
+ async fn publish_get_pending_operations(&self) -> Result<(), SMCumulocityMapperError> {
+ let data = SmartRestGetPendingOperations::default();
+ let topic = OutgoingTopic::SmartRestResponse.to_topic()?;
+ let payload = data.to_smartrest()?;
+ let () = self.publish(&topic, payload).await?;
+ Ok(())
+ }
+
+ async fn publish_operation_status(
+ &self,
+ json_response: &str,
+ ) -> Result<(), SMCumulocityMapperError> {
+ let response = SoftwareUpdateResponse::from_json(json_response)?;
+ let topic = OutgoingTopic::SmartRestResponse.to_topic()?;
+ match response.status() {
+ SoftwareOperationStatus::Executing => {
+ let smartrest_set_operation_status =
+ SmartRestSetOperationToExecuting::from_thin_edge_json(response)?
+ .to_smartrest()?;
+ let () = self.publish(&topic, smartrest_set_operation_status).await?;
+ }
+ SoftwareOperationStatus::Successful => {
+ let smartrest_set_operation =
+ SmartRestSetOperationToSuccessful::from_thin_edge_json(response)?
+ .to_smartrest()?;
+ let () = self.publish(&topic, smartrest_set_operation).await?;
+ let () = self
+ .validate_and_publish_software_list(json_response)
+ .await?;
+ }
+ SoftwareOperationStatus::Failed => {
+ let smartrest_set_operation =
+ SmartRestSetOperationToFailed::from_thin_edge_json(response)?.to_smartrest()?;
+ let () = self.publish(&topic, smartrest_set_operation).await?;
+ let () = self
+ .validate_and_publish_software_list(json_response)
+ .await?;
+ }
+ };
+ Ok(())
+ }
+
+ async fn set_log_file_request_executing(&self) -> Result<(), SMCumulocityMapperError> {
+ let topic = OutgoingTopic::SmartRestResponse.to_topic()?;
+ let smartrest_set_operation_status =
+ SmartRestSetOperationToExecuting::new(CumulocitySupportedOperations::C8yLogFileRequest)
+ .to_smartrest()?;
+
+ let () = self.publish(&topic, smartrest_set_operation_status).await?;
+ Ok(())
+ }
+
+ async fn set_log_file_request_done(
+ &self,
+ binary_upload_event_url: &str,
+ ) -> Result<(), SMCumulocityMapperError> {
+ let topic = OutgoingTopic::SmartRestResponse.to_topic()?;
+ let smartrest_set_operation_status = SmartRestSetOperationToSuccessful::new(
+ CumulocitySupportedOperations::C8yLogFileRequest,
+ )
+ .with_response_parameter(binary_upload_event_url)
+ .to_smartrest()?;
+
+ let () = self.publish(&topic, smartrest_set_operation_status).await?;
+ Ok(())
+ }
+
+ async fn forward_log_request(&self, payload: &str) -> Result<(), SMCumulocityMapperError> {
+ // retrieve smartrest object from payload
+ let smartrest_obj = SmartRestLogRequest::from_smartrest(&payload)?;
+
+ // 1. set log file request to executing
+ let () = self.set_log_file_request_executing().await?;
+
+ // 2. read logs
+ let log_output = read_tedge_logs(&smartrest_obj, AGENT_LOG_DIR)?;
+
+ // 3. create log event
+ let token = get_jwt_token(&self.client).await?;
+ let url_host = self.config.query_string(C8yUrlSetting)?;
+
+ let c8y_managed_object = C8yManagedObject {
+ id: self.c8y_internal_id.clone(),
+ };
+ let event_response_id = create_log_event(&url_host, &c8y_managed_object, &token).await?;
+
+ // 4. upload log file
+ let binary_upload_event_url =
+ get_url_for_event_binary_upload(&url_host, &event_response_id);
+
+ let () = upload_log_binary(&token, &binary_upload_event_url, &log_output.as_str()).await?;
+
+ // 5. set log file request to done
+ let () = self
+ .set_log_file_request_done(&binary_upload_event_url)
+ .await?;
+
+ info!("Log file request uploaded");
+
+ Ok(())
+ }
+
+ async fn forward_software_request(
+ &self,
+ smartrest: &str,
+ ) -> Result<(), SMCumulocityMapperError> {
+ let topic = OutgoingTopic::SoftwareUpdateRequest.to_topic()?;
+ let update_software = SmartRestUpdateSoftware::new();
+ let mut software_update_request = update_software
+ .from_smartrest(smartrest)?
+ .to_thin_edge_json()?;
+
+ let token = get_jwt_token(&self.client).await?;
+ let tenant_uri = self.config.query_string(C8yUrlSetting)?;
+
+ software_update_request
+ .update_list
+ .iter_mut()
+ .for_each(|modules| {
+ modules.modules.iter_mut().for_each(|module| {
+ if let Some(url) = &module.url {
+ if url_is_in_my_tenant_domain(url.url(), &tenant_uri) {
+ module.url = module.url.as_ref().map(|s| {
+ DownloadInfo::new(&s.url)
+ .with_auth(Auth::new_bearer(&token.token()))
+ });
+ } else {
+ module.url = module.url.as_ref().map(|s| DownloadInfo::new(&s.url));
+ }
+ }
+ });
+ });
+
+ let () = self
+ .publish(&topic, software_update_request.to_json()?)
+ .await?;
+
+ Ok(())
+ }
+
+ async fn publish(&self, topic: &Topic, payload: String) -> Result<(), MqttClientError> {
+ let () = self
+ .client
+ .publish(mqtt_client::Message::new(topic, payload))
+ .await?;
+ Ok(())
+ }
+
+ async fn send_software_list_http(
+ &self,
+ json_response: &SoftwareListResponse,
+ ) -> Result<(), SMCumulocityMapperError> {
+ let token = get_jwt_token(&self.client).await?;
+
+ let reqwest_client = reqwest::ClientBuilder::new().build()?;
+
+ let url_host = self.config.query_string(C8yUrlSetting)?;
+ let url = get_url_for_sw_list(&url_host, &self.c8y_internal_id);
+
+ let c8y_software_list: C8yUpdateSoftwareListResponse = json_response.into();
+
+ let _published =
+ publish_software_list_http(&reqwest_client, &url, &token.token(), &c8y_software_list)
+ .await?;
+
+ Ok(())
+ }
+
+ async fn try_get_and_set_internal_id(&mut self) -> Result<(), SMCumulocityMapperError> {
+ let token = get_jwt_token(&self.client).await?;
+ let reqwest_client = reqwest::ClientBuilder::new().build()?;
+
+ let url_host = self.config.query_string(C8yUrlSetting)?;
+ let device_id = self.config.query_string(DeviceIdSetting)?;
+ let url_get_id = get_url_for_get_id(&url_host, &device_id);
+
+ self.c8y_internal_id =
+ try_get_internal_id(&reqwest_client, &url_get_id, &token.token()).await?;
+
+ Ok(())
+ }
+}
+
+async fn upload_log_binary(
+ token: &SmartRestJwtResponse,
+ binary_upload_event_url: &str,
+ log_content: &str,
+) -> Result<(), SMCumulocityMapperError> {
+ let client = reqwest::ClientBuilder::new().build()?;
+
+ let request = client
+ .post(binary_upload_event_url)
+ .header("Accept", "application/json")
+ .header("Content-Type", "text/plain")
+ .body(log_content.to_string())
+ .bearer_auth(token.token())
+ .timeout(Duration::from_millis(10000))
+ .build()?;
+
+ let _response = client.execute(request).await?;
+ Ok(())
+}
+
+#[derive(Debug, Deserialize, Serialize, PartialEq)]
+#[serde(rename_all = "camelCase")]
+/// used to retrieve the id of a log event
+pub struct SmartRestLogEvent {
+ pub id: String,
+}
+
+/// Make a POST request to /event/events and return the event id from response body.
+/// The event id is used to upload the binary.
+async fn create_log_event(
+ url_host: &str,
+ c8y_managed_object: &C8yManagedObject,
+ token: &SmartRestJwtResponse,
+) -> Result<String, SMCumulocityMapperError> {
+ let client = reqwest::ClientBuilder::new().build()?;
+
+ let create_event_url = get_url_for_create_event(&url_host);
+
+ let local: DateTime<Local> = Local::now();
+
+ let c8y_log_event = C8yCreateEvent::new(
+ c8y_managed_object.to_owned(),
+ "c8y_Logfile",
+ &local.format("%Y-%m-%dT%H:%M:%SZ").to_string(),
+ "software-management",
+ );
+
+ let request = client
+ .post(create_event_url)
+ .json(&c8y_log_event)
+ .bearer_auth(token.token())
+ .header("Accept", "application/json")
+ .timeout(Duration::from_millis(10000))
+ .build()?;
+
+ let response = client.execute(request).await?;
+ let event_response_body = response.json::<SmartRestLogEvent>().await?;
+
+ Ok(event_response_body.id)
+}
+
+/// Returns a date time object from a file path or file-path-like string
+/// a typical file stem looks like this: "software-list-2021-10-27T10:29:58Z"
+///
+/// # Examples:
+/// ```
+/// let path_buf = PathBuf::fromStr("/path/to/file/with/date/in/path").unwrap();
+/// let path_bufdate_time = get_datetime_from_file_path(&path_buf).unwrap();
+/// ```
+fn get_datetime_from_file_path(
+ log_path: &PathBuf,
+) -> Result<DateTime<FixedOffset>, SMCumulocityMapperError> {
+ if let Some(stem_string) = log_path.file_stem().and_then(|s| s.to_str()) {
+ // a typical file stem looks like this: software-list-2021-10-27T10:29:58Z.
+ // to extract the date, rsplit string on "-" and take (last) 3
+ let mut stem_string_vec = stem_string.rsplit('-').take(3).collect::<Vec<_>>();
+ // reverse back the order (because of rsplit)
+ stem_string_vec.reverse();
+ // join on '-' to get the date string
+ let date_string = stem_string_vec.join("-");
+ let dt = DateTime::parse_from_rfc3339(&date_string)?;
+
+ return Ok(dt);
+ }
+ match log_path.to_str() {
+ Some(path) => Err(SMCumulocityMapperError::InvalidDateInFileName(
+ path.to_string(),
+ )),
+ None => Err(SMCumulocityMapperError::InvalidUtf8Path),
+ }
+}
+
+/// Reads tedge logs according to `SmartRestLogRequest`.
+///
+/// If needed, logs are concatenated.
+///
+/// Logs are sorted alphanumerically from oldest to newest.
+///
+/// # Examples
+///
+/// ```
+/// let smartrest_obj = SmartRestLogRequest::from_smartrest(
+/// "522,DeviceSerial,syslog,2021-01-01T00:00:00+0200,2021-01-10T00:00:00+0200,,1000",
+/// )
+/// .unwrap();
+///
+/// let log = read_tedge_system_logs(&smartrest_obj, "/var/log/tedge").unwrap();
+/// ```
+fn read_tedge_logs(
+ smartrest_obj: &SmartRestLogRequest,
+ logs_dir: &str,
+) -> Result<String, SMCumulocityMapperError> {
+ let mut output = String::new();
+
+ // NOTE: As per documentation of std::fs::read_dir:
+ // "The order in which this iterator returns entries is platform and filesystem dependent."
+ // Therefore, files are sorted by date.
+ let mut read_vector: Vec<_> = std::fs::read_dir(logs_dir)?
+ .filter_map(|r| r.ok())
+ .filter_map(|dir_entry| {
+ let file_path = &dir_entry.path();
+ let datetime_object = get_datetime_from_file_path(&file_path);
+ match datetime_object {
+ Ok(dt) => {
+ if dt < smartrest_obj.date_from || dt > smartrest_obj.date_to {
+ return None;
+ }
+ Some(dir_entry)
+ }
+ Err(_) => None,
+ }
+ })
+ .collect();
+
+ read_vector.sort_by_key(|dir| dir.path());
+
+ // loop sorted vector and push store log file to `output`
+ let mut line_counter: usize = 0;
+ for entry in read_vector {
+ let file_path = entry.path();
+ let file_content = std::fs::read_to_string(&file_path)?;
+ if file_content.is_empty() {
+ continue;
+ }
+
+ // adding file header only if line_counter permits more lines to be added
+ match &file_path.file_stem().and_then(|f| f.to_str()) {
+ Some(file_name) if line_counter < smartrest_obj.lines => {
+ output.push_str(&format!("filename: {}\n", file_name));
+ }
+ _ => {}
+ }
+
+ // split at new line delimiter ("\n")
+ let mut lines = file_content.lines();
+ while line_counter < smartrest_obj.lines {
+ if let Some(haystack) = lines.next() {
+ if let Some(needle) = &smartrest_obj.needle {
+ if haystack.contains(needle) {
+ output.push_str(&format!("{}\n", haystack));
+ line_counter += 1;
+ }
+ } else {
+ output.push_str(&format!("{}\n", haystack));
+ line_counter += 1;
+ }
+ } else {
+ // there are no lines.next()
+ break;
+ }
+ }
+ }
+ Ok(output)
+}
+
+fn url_is_in_my_tenant_domain(url: &str, tenant_uri: &str) -> bool {
+ // c8y URL may contain either `Tenant Name` or Tenant Id` so they can be one of following options:
+ // * <tenant_name>.<domain> eg: sample.c8y.io
+ // * <tenant_id>.<domain> eg: t12345.c8y.io
+ // These URLs may be both equivalent and point to the same tenant.
+ // We are going to remove that and only check if the domain is the same.
+ let url_host = match Url::parse(url) {
+ Ok(url) => match url.host() {
+ Some(host) => host.to_string(),
+ None => return false,
+ },
+ Err(_err) => {
+ return false;
+ }
+ };
+
+ let url_domain = url_host.splitn(2, '.').collect::<Vec<&str>>();
+ let tenant_domain = tenant_uri.splitn(2, '.').collect::<Vec<&str>>();
+ if url_domain.get(1) == tenant_domain.get(1) {
+ return true;
+ }
+ false
+}
+
+async fn publish_software_list_http(
+ client: &reqwest::Client,
+ url: &str,
+ token: &str,
+ list: &C8yUpdateSoftwareListResponse,
+) -> Result<(), SMCumulocityMapperError> {
+ let request = client
+ .put(url)
+ .json(list)
+ .bearer_auth(token)
+ .timeout(Duration::from_millis(10000))
+ .build()?;
+
+ let _response = client.execute(request).await?;
+
+ Ok(())
+}
+
+async fn try_get_internal_id(
+ client: &reqwest::Client,
+ url_get_id: &str,
+ token: &str,
+) -> Result<String, SMCumulocityMapperError> {
+ let internal_id = client.get(url_get_id).bearer_auth(token).send().await?;
+ let internal_id_response = internal_id.json::<InternalIdResponse>().await?;
+
+ let internal_id = internal_id_response.id();
+ Ok(internal_id)
+}
+
+fn get_url_for_sw_list(url_host: &str, internal_id: &str) -> String {
+ let mut url_update_swlist = String::new();
+ url_update_swlist.push_str("https://");
+ url_update_swlist.push_str(url_host);
+ url_update_swlist.push_str("/inventory/managedObjects/");
+ url_update_swlist.push_str(internal_id);
+
+ url_update_swlist
+}
+
+fn get_url_for_get_id(url_host: &str, device_id: &str) -> String {
+ let mut url_get_id = String::new();
+ url_get_id.push_str("https://");
+ url_get_id.push_str(url_host);
+ url_get_id.push_str("/identity/externalIds/c8y_Serial/");
+ url_get_id.push_str(device_id);
+
+ url_get_id
+}
+
+fn get_url_for_create_event(url_host: &str) -> String {
+ let mut url_create_event = String::new();
+ url_create_event.push_str("https://");
+ url_create_event.push_str(url_host);
+ url_create_event.push_str("/event/events/");
+
+ url_create_event
+}
+
+fn get_url_for_event_binary_upload(url_host: &str, event_id: &str) -> String {
+ let mut url_event_binary = get_url_for_create_event(url_host);
+ url_event_binary.push_str(event_id);
+ url_event_binary.push_str("/binaries");
+
+ url_event_binary
+}
+
+async fn get_jwt_token(client: &Client) -> Result<SmartRestJwtResponse, SMCumulocityMapperError> {
+ let mut subscriber = client.subscribe(Topic::new("c8y/s/dat")?.filter()).await?;
+
+ let () = client
+ .publish(mqtt_client::Message::new(
+ &Topic::new("c8y/s/uat")?,
+ "".to_string(),
+ ))
+ .await?;
+
+ let token_smartrest =
+ match tokio::time::timeout(Duration::from_secs(10), subscriber.next()).await {
+ Ok(Some(msg)) => msg.payload_str()?.to_string(),
+ Ok(None) => return Err(SMCumulocityMapperError::InvalidMqttMessage),
+ Err(_elapsed) => return Err(SMCumulocityMapperError::RequestTimeout),
+ };
+
+ Ok(SmartRestJwtResponse::try_new(&token_smartrest)?)
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use mqtt_tests::with_timeout::{Maybe, WithTimeout};
+ use std::fs::File;
+ use std::io::Write;
+ use std::str::FromStr;
+ use test_case::test_case;
+
+ const TEST_TIMEOUT_MS: Duration = Duration::from_millis(1000);
+
+ #[tokio::test]
+ #[serial_test::serial]
+ async fn get_jwt_token_full_run() {
+ let broker = mqtt_tests::test_mqtt_broker();
+ let mut messages = broker.messages_published_on("c8y/s/uat").await;
+
+ let publisher = Client::connect(
+ "get_jwt_token_full_run",
+ &mqtt_client::Config::default().with_port(broker.port),
+ )
+ .await
+ .unwrap();
+
+ // Setup listener stream to publish on first message received on topic `c8y/s/us`.
+ let responder_task = tokio::spawn(async move {
+ let msg = messages
+ .recv()
+ .with_timeout(TEST_TIMEOUT_MS)
+ .await
+ .expect_or("No JWT request received.");
+ assert_eq!(&msg, "");
+
+ // After receiving successful message publish response with a custom 'token' on topic `c8y/s/dat`.
+ let _ = broker.publish("c8y/s/dat", "71,1111").await;
+ });
+
+ // Wait till token received.
+ let (jwt_token, _responder) = tokio::join!(get_jwt_token(&publisher), responder_task);
+
+ // `get_jwt_token` should return `Ok` and the value of token should be as set above `1111`.
+ assert!(jwt_token.is_ok());
+ assert_eq!(jwt_token.unwrap().token(), "1111");
+ }
+
+ #[test]
+ fn get_url_for_get_id_returns_correct_address() {
+ let res = get_url_for_get_id("test_host", "test_device");
+
+ assert_eq!(
+ res,
+ "https://test_host/identity/externalIds/c8y_Serial/test_device"
+ );
+ }
+
+ #[test]
+ fn get_url_for_sw_list_returns_correct_address() {
+ let res = get_url_for_sw_list("test_host", "12345");
+
+ assert_eq!(res, "https://test_host/inventory/managedObjects/12345");
+ }
+
+ #[test_case("http://aaa.test.com")]
+ #[test_case("https://aaa.test.com")]
+ #[test_case("ftp://aaa.test.com")]
+ #[test_case("mqtt://aaa.test.com")]
+ #[test_case("https://t1124124.test.com")]
+ #[test_case("https://t1124124.test.com:12345")]
+ #[test_case("https://t1124124.test.com/path")]
+ #[test_case("https://t1124124.test.com/path/to/file.test")]
+ #[test_case("https://t1124124.test.com/path/to/file")]
+ fn url_is_my_tenant_correct_urls(url: &str) {
+ assert!(url_is_in_my_tenant_domain(url, "test.test.com"));
+ }
+
+ #[test_case("test.com")]
+ #[test_case("http://test.co")]
+ #[test_case("http://test.co.te")]
+ #[test_case("http://test.com:123456")]
+ #[test_case("http://test.com::12345")]
+ fn url_is_my_tenant_incorrect_urls(url: &str) {
+ assert!(!url_is_in_my_tenant_domain(url, "test.test.com"));
+ }
+
+ #[test_case("/path/to/software-list-2021-10-27T10:44:44Z.log")]
+ #[test_case("/path/to/tedge/agent/software-update-2021-10-25T07:45:41Z.log")]
+ #[test_case("/path/to/another-variant-2021-10-25T07:45:41Z.log")]
+ #[test_case("/yet-another-variant-2021-10-25T07:45:41Z.log")]
+ fn test_datetime_parsing_from_path(file_path: &str) {
+ // checking that `get_date_from_file_path` unwraps a `chrono::NaiveDateTime` object.
+ // this should return an Ok Result.
+ let path_buf = PathBuf::from_str(file_path).unwrap();
+ let path_buf_datetime = get_datetime_from_file_path(&path_buf);
+ assert!(path_buf_datetime.is_ok());
+ }
+
+ #[test_case("/path/to/software-list-2021-10-27-10:44:44Z.log")]
+ #[test_case("/path/to/tedge/agent/software-update-10-25-2021T07:45:41Z.log")]
+ #[test_case("/path/to/another-variant-07:45:41Z-2021-10-25T.log")]
+ #[test_case("/yet-another-variant-2021-10-25T07:45Z.log")]
+ fn test_datetime_parsing_from_path_fail(file_path: &str) {
+ // checking that `get_date_from_file_path` unwraps a `chrono::NaiveDateTime` object.
+ // this should return an err.
+ let path_buf = PathBuf::from_str(file_path).unwrap();
+ let path_buf_datetime = get_datetime_from_file_path(&path_buf);
+ assert!(path_buf_datetime.is_err());
+ }
+
+ fn parse_file_names_from_log_content(log_content: &str) -> [&str; 5] {
+ let mut files: Vec<&str> = vec![];
+ for line in log_content.lines() {
+ if line.contains("filename: ") {
+ let filename: &str = line.split("filename: ").last().unwrap();
+ files.push(filename);
+ }
+ }
+ match files.try_into() {
+ Ok(arr) => arr,
+ Err(_) => panic!("Could not convert to Array &str, size 5"),
+ }
+ }
+
+ #[test]
+ /// testing read_tedge_logs
+ ///
+ /// this test creates 5 fake log files in a temporary directory.
+ /// files are dated 2021-01-0XT01:00Z, where X = a different day.
+ ///
+ /// this tests will assert that files are read alphanumerically from oldest to newest
+ fn test_read_logs() {
+ // order in which files are created
+ const LOG_FILE_NAMES: [&str; 5] = [
+ "software-list-2021-01-03T01:00:00Z.log",
+ "software-list-2021-01-02T01:00:00Z.log",
+ "software-list-2021-01-01T01:00:00Z.log",
+ "software-update-2021-01-03T01:00:00Z.log",
+ "software-update-2021-01-02T01:00:00Z.log",
+ ];
+
+ // expected (sorted) output
+ const EXPECTED_OUTPUT: [&str; 5] = [
+ "software-list-2021-01-01T01:00:00Z",
+ "software-list-2021-01-02T01:00:00Z",
+ "software-list-2021-01-03T01:00:00Z",
+ "software-update-2021-01-02T01:00:00Z",
+ "software-update-2021-01-03T01:00:00Z",
+ ];
+
+ let smartrest_obj = SmartRestLogRequest::from_smartrest(
+ "522,DeviceSerial,syslog,2021-01-01T00:00:00+0200,2021-01-10T00:00:00+0200,,1000",
+ )
+ .unwrap();
+
+ let temp_dir = tempfile::tempdir().unwrap();
+ // creating the files
+ for (idx, file) in LOG_FILE_NAMES.iter().enumerate() {
+ let file_path = &temp_dir.path().join(file);
+ let mut file = File::create(file_path).unwrap();
+ writeln!(file, "file num {}", idx).unwrap();
+ }
+
+ // reading the logs and extracting the