summaryrefslogtreecommitdiffstats
path: root/crates/core/tedge_mapper/src/sm_c8y_mapper
diff options
context:
space:
mode:
authorDidier Wenzek <didier.wenzek@acidalie.com>2021-12-06 18:00:57 +0000
committerGitHub <noreply@github.com>2021-12-06 18:00:57 +0000
commitbde1f0f2a2d07184a26c68a97022b26c3961a47f (patch)
treee07bc6769feeac25b28c8ce7b4a3b4bf329eef44 /crates/core/tedge_mapper/src/sm_c8y_mapper
parent07f7d72b3c207c7c9e651fee5438b6f89fc97371 (diff)
Refactor the sm_c8y mapper extracting http code (#659)
* Move out any HTTP related code from the MQTT mapper to an HTTP proxy * Update the tests * Simplify the jwt tests with a background generator * Reuse the reqwest client for all requests * Rename `JwtAuthHttpProxy` the main `C8yHttpProxy` implementation * Fix typo Co-authored-by: Wenzek <diw@softwareag.com>
Diffstat (limited to 'crates/core/tedge_mapper/src/sm_c8y_mapper')
-rw-r--r--crates/core/tedge_mapper/src/sm_c8y_mapper/error.rs4
-rw-r--r--crates/core/tedge_mapper/src/sm_c8y_mapper/http_proxy.rs364
-rw-r--r--crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs364
-rw-r--r--crates/core/tedge_mapper/src/sm_c8y_mapper/mod.rs4
-rw-r--r--crates/core/tedge_mapper/src/sm_c8y_mapper/tests.rs70
5 files changed, 473 insertions, 333 deletions
diff --git a/crates/core/tedge_mapper/src/sm_c8y_mapper/error.rs b/crates/core/tedge_mapper/src/sm_c8y_mapper/error.rs
index 5cca31b4..296f17e4 100644
--- a/crates/core/tedge_mapper/src/sm_c8y_mapper/error.rs
+++ b/crates/core/tedge_mapper/src/sm_c8y_mapper/error.rs
@@ -1,13 +1,13 @@
use c8y_smartrest::error::{SmartRestDeserializerError, SmartRestSerializerError};
#[derive(thiserror::Error, Debug)]
-pub(crate) enum MapperTopicError {
+pub enum MapperTopicError {
#[error("Topic {topic} is unknown.")]
UnknownTopic { topic: String },
}
#[derive(thiserror::Error, Debug)]
-pub(crate) enum SMCumulocityMapperError {
+pub enum SMCumulocityMapperError {
#[error("Invalid MQTT Message.")]
InvalidMqttMessage,
diff --git a/crates/core/tedge_mapper/src/sm_c8y_mapper/http_proxy.rs b/crates/core/tedge_mapper/src/sm_c8y_mapper/http_proxy.rs
new file mode 100644
index 00000000..277b0a11
--- /dev/null
+++ b/crates/core/tedge_mapper/src/sm_c8y_mapper/http_proxy.rs
@@ -0,0 +1,364 @@
+use crate::sm_c8y_mapper::error::SMCumulocityMapperError;
+use crate::sm_c8y_mapper::json_c8y::{
+ C8yCreateEvent, C8yManagedObject, C8yUpdateSoftwareListResponse, InternalIdResponse,
+};
+use crate::sm_c8y_mapper::mapper::SmartRestLogEvent;
+use async_trait::async_trait;
+use c8y_smartrest::smartrest_deserializer::SmartRestJwtResponse;
+use chrono::{DateTime, Local};
+use mqtt_client::{Client, MqttClient, Topic};
+use reqwest::Url;
+use std::time::Duration;
+use tedge_config::{C8yUrlSetting, ConfigSettingAccessorStringExt, DeviceIdSetting, TEdgeConfig};
+use tracing::{error, info, instrument};
+
+const RETRY_TIMEOUT_SECS: u64 = 60;
+
+/// An HttpProxy handles http requests to C8y on behalf of the device.
+#[async_trait]
+pub trait C8YHttpProxy {
+ async fn init(&mut self) -> Result<(), SMCumulocityMapperError>;
+
+ fn url_is_in_my_tenant_domain(&self, url: &str) -> bool;
+
+ async fn get_jwt_token(&self) -> Result<SmartRestJwtResponse, SMCumulocityMapperError>;
+
+ async fn send_software_list_http(
+ &self,
+ c8y_software_list: &C8yUpdateSoftwareListResponse,
+ ) -> Result<(), SMCumulocityMapperError>;
+
+ async fn upload_log_binary(&self, log_content: &str)
+ -> Result<String, SMCumulocityMapperError>;
+}
+
+/// Define a C8y endpoint
+pub struct C8yEndPoint {
+ c8y_host: String,
+ device_id: String,
+ c8y_internal_id: String,
+}
+
+impl C8yEndPoint {
+ fn new(c8y_host: &str, device_id: &str, c8y_internal_id: &str) -> C8yEndPoint {
+ C8yEndPoint {
+ c8y_host: c8y_host.into(),
+ device_id: device_id.into(),
+ c8y_internal_id: c8y_internal_id.into(),
+ }
+ }
+
+ fn get_url_for_sw_list(&self) -> String {
+ let mut url_update_swlist = String::new();
+ url_update_swlist.push_str("https://");
+ url_update_swlist.push_str(&self.c8y_host);
+ url_update_swlist.push_str("/inventory/managedObjects/");
+ url_update_swlist.push_str(&self.c8y_internal_id);
+
+ url_update_swlist
+ }
+
+ fn get_url_for_get_id(&self) -> String {
+ let mut url_get_id = String::new();
+ url_get_id.push_str("https://");
+ url_get_id.push_str(&self.c8y_host);
+ url_get_id.push_str("/identity/externalIds/c8y_Serial/");
+ url_get_id.push_str(&self.device_id);
+
+ url_get_id
+ }
+
+ fn get_url_for_create_event(&self) -> String {
+ let mut url_create_event = String::new();
+ url_create_event.push_str("https://");
+ url_create_event.push_str(&self.c8y_host);
+ url_create_event.push_str("/event/events/");
+
+ url_create_event
+ }
+
+ fn get_url_for_event_binary_upload(&self, event_id: &str) -> String {
+ let mut url_event_binary = self.get_url_for_create_event();
+ url_event_binary.push_str(event_id);
+ url_event_binary.push_str("/binaries");
+
+ url_event_binary
+ }
+
+ fn url_is_in_my_tenant_domain(&self, url: &str) -> bool {
+ // c8y URL may contain either `Tenant Name` or Tenant Id` so they can be one of following options:
+ // * <tenant_name>.<domain> eg: sample.c8y.io
+ // * <tenant_id>.<domain> eg: t12345.c8y.io
+ // These URLs may be both equivalent and point to the same tenant.
+ // We are going to remove that and only check if the domain is the same.
+ let tenant_uri = &self.c8y_host;
+ let url_host = match Url::parse(url) {
+ Ok(url) => match url.host() {
+ Some(host) => host.to_string(),
+ None => return false,
+ },
+ Err(_err) => {
+ return false;
+ }
+ };
+
+ let url_domain = url_host.splitn(2, '.').collect::<Vec<&str>>();
+ let tenant_domain = tenant_uri.splitn(2, '.').collect::<Vec<&str>>();
+ if url_domain.get(1) == tenant_domain.get(1) {
+ return true;
+ }
+ false
+ }
+}
+
+/// An HttpProxy that uses MQTT to retrieve JWT tokens and authenticate the device
+///
+/// - Keep the connection info to c8y and the internal Id of the device
+/// - Handle JWT requests
+pub struct JwtAuthHttpProxy {
+ mqtt_con: Client,
+ http_con: reqwest::Client,
+ end_point: C8yEndPoint,
+}
+
+impl JwtAuthHttpProxy {
+ pub fn new(
+ mqtt_con: Client,
+ http_con: reqwest::Client,
+ c8y_host: &str,
+ device_id: &str,
+ ) -> JwtAuthHttpProxy {
+ JwtAuthHttpProxy {
+ mqtt_con,
+ http_con,
+ end_point: C8yEndPoint {
+ c8y_host: c8y_host.into(),
+ device_id: device_id.into(),
+ c8y_internal_id: "".into(),
+ },
+ }
+ }
+
+ pub fn try_new(
+ mqtt_con: Client,
+ tedge_config: &TEdgeConfig,
+ ) -> Result<JwtAuthHttpProxy, SMCumulocityMapperError> {
+ let c8y_host = tedge_config.query_string(C8yUrlSetting)?;
+ let device_id = tedge_config.query_string(DeviceIdSetting)?;
+ let http_con = reqwest::ClientBuilder::new().build()?;
+ Ok(JwtAuthHttpProxy::new(
+ mqtt_con, http_con, &c8y_host, &device_id,
+ ))
+ }
+
+ async fn try_get_and_set_internal_id(&mut self) -> Result<(), SMCumulocityMapperError> {
+ let token = self.get_jwt_token().await?;
+ let url_get_id = self.end_point.get_url_for_get_id();
+
+ self.end_point.c8y_internal_id = self
+ .try_get_internal_id(&url_get_id, &token.token())
+ .await?;
+
+ Ok(())
+ }
+
+ async fn try_get_internal_id(
+ &self,
+ url_get_id: &str,
+ token: &str,
+ ) -> Result<String, SMCumulocityMapperError> {
+ let internal_id = self
+ .http_con
+ .get(url_get_id)
+ .bearer_auth(token)
+ .send()
+ .await?;
+ let internal_id_response = internal_id.json::<InternalIdResponse>().await?;
+
+ let internal_id = internal_id_response.id();
+ Ok(internal_id)
+ }
+
+ /// Make a POST request to /event/events and return the event id from response body.
+ /// The event id is used to upload the binary.
+ fn create_log_event(&self) -> C8yCreateEvent {
+ let local: DateTime<Local> = Local::now();
+
+ let c8y_managed_object = C8yManagedObject {
+ id: self.end_point.c8y_internal_id.clone(),
+ };
+
+ C8yCreateEvent::new(
+ c8y_managed_object.to_owned(),
+ "c8y_Logfile",
+ &local.format("%Y-%m-%dT%H:%M:%SZ").to_string(),
+ "software-management",
+ )
+ }
+
+ async fn get_event_id(
+ &self,
+ c8y_event: C8yCreateEvent,
+ ) -> Result<String, SMCumulocityMapperError> {
+ let token = self.get_jwt_token().await?;
+ let create_event_url = self.end_point.get_url_for_create_event();
+
+ let request = self
+ .http_con
+ .post(create_event_url)
+ .json(&c8y_event)
+ .bearer_auth(token.token())
+ .header("Accept", "application/json")
+ .timeout(Duration::from_millis(10000))
+ .build()?;
+
+ let response = self.http_con.execute(request).await?;
+ let event_response_body = response.json::<SmartRestLogEvent>().await?;
+
+ Ok(event_response_body.id)
+ }
+}
+
+#[async_trait]
+impl C8YHttpProxy for JwtAuthHttpProxy {
+ fn url_is_in_my_tenant_domain(&self, url: &str) -> bool {
+ self.end_point.url_is_in_my_tenant_domain(url)
+ }
+
+ #[instrument(skip(self), name = "init")]
+ async fn init(&mut self) -> Result<(), SMCumulocityMapperError> {
+ info!("Initialisation");
+ while self.end_point.c8y_internal_id.is_empty() {
+ if let Err(error) = self.try_get_and_set_internal_id().await {
+ error!(
+ "An error ocurred while retrieving internal Id, operation will retry in {} seconds and mapper will reinitialise.\n Error: {:?}",
+ RETRY_TIMEOUT_SECS, error
+ );
+
+ tokio::time::sleep(Duration::from_secs(RETRY_TIMEOUT_SECS)).await;
+ continue;
+ };
+ }
+ info!("Initialisation done.");
+ Ok(())
+ }
+
+ async fn get_jwt_token(&self) -> Result<SmartRestJwtResponse, SMCumulocityMapperError> {
+ let mut subscriber = self
+ .mqtt_con
+ .subscribe(Topic::new("c8y/s/dat")?.filter())
+ .await?;
+
+ let () = self
+ .mqtt_con
+ .publish(mqtt_client::Message::new(
+ &Topic::new("c8y/s/uat")?,
+ "".to_string(),
+ ))
+ .await?;
+
+ let token_smartrest =
+ match tokio::time::timeout(Duration::from_secs(10), subscriber.next()).await {
+ Ok(Some(msg)) => msg.payload_str()?.to_string(),
+ Ok(None) => return Err(SMCumulocityMapperError::InvalidMqttMessage),
+ Err(_elapsed) => return Err(SMCumulocityMapperError::RequestTimeout),
+ };
+
+ Ok(SmartRestJwtResponse::try_new(&token_smartrest)?)
+ }
+
+ async fn send_software_list_http(
+ &self,
+ c8y_software_list: &C8yUpdateSoftwareListResponse,
+ ) -> Result<(), SMCumulocityMapperError> {
+ let url = self.end_point.get_url_for_sw_list();
+ let token = self.get_jwt_token().await?;
+
+ let request = self
+ .http_con
+ .put(url)
+ .json(c8y_software_list)
+ .bearer_auth(&token.token())
+ .timeout(Duration::from_millis(10000))
+ .build()?;
+
+ let _response = self.http_con.execute(request).await?;
+
+ Ok(())
+ }
+
+ async fn upload_log_binary(
+ &self,
+ log_content: &str,
+ ) -> Result<String, SMCumulocityMapperError> {
+ let token = self.get_jwt_token().await?;
+
+ let log_event = self.create_log_event();
+ let event_response_id = self.get_event_id(log_event).await?;
+ let binary_upload_event_url = self
+ .end_point
+ .get_url_for_event_binary_upload(&event_response_id);
+
+ let request = self
+ .http_con
+ .post(&binary_upload_event_url)
+ .header("Accept", "application/json")
+ .header("Content-Type", "text/plain")
+ .body(log_content.to_string())
+ .bearer_auth(token.token())
+ .timeout(Duration::from_millis(10000))
+ .build()?;
+
+ let _response = self.http_con.execute(request).await?;
+ Ok(binary_upload_event_url)
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use test_case::test_case;
+
+ #[test]
+ fn get_url_for_get_id_returns_correct_address() {
+ let c8y = C8yEndPoint::new("test_host", "test_device", "internal-id");
+ let res = c8y.get_url_for_get_id();
+
+ assert_eq!(
+ res,
+ "https://test_host/identity/externalIds/c8y_Serial/test_device"
+ );
+ }
+
+ #[test]
+ fn get_url_for_sw_list_returns_correct_address() {
+ let c8y = C8yEndPoint::new("test_host", "test_device", "12345");
+ let res = c8y.get_url_for_sw_list();
+
+ assert_eq!(res, "https://test_host/inventory/managedObjects/12345");
+ }
+
+ #[test_case("http://aaa.test.com")]
+ #[test_case("https://aaa.test.com")]
+ #[test_case("ftp://aaa.test.com")]
+ #[test_case("mqtt://aaa.test.com")]
+ #[test_case("https://t1124124.test.com")]
+ #[test_case("https://t1124124.test.com:12345")]
+ #[test_case("https://t1124124.test.com/path")]
+ #[test_case("https://t1124124.test.com/path/to/file.test")]
+ #[test_case("https://t1124124.test.com/path/to/file")]
+ fn url_is_my_tenant_correct_urls(url: &str) {
+ let c8y = C8yEndPoint::new("test.test.com", "test_device", "internal-id");
+ assert!(c8y.url_is_in_my_tenant_domain(url));
+ }
+
+ #[test_case("test.com")]
+ #[test_case("http://test.co")]
+ #[test_case("http://test.co.te")]
+ #[test_case("http://test.com:123456")]
+ #[test_case("http://test.com::12345")]
+ fn url_is_my_tenant_incorrect_urls(url: &str) {
+ let c8y = C8yEndPoint::new("test.test.com", "test_device", "internal-id");
+ assert!(!c8y.url_is_in_my_tenant_domain(url));
+ }
+}
diff --git a/crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs b/crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs
index e838c508..3c064e90 100644
--- a/crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs
+++ b/crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs
@@ -1,37 +1,33 @@
+use crate::component::TEdgeComponent;
use crate::mapper::mqtt_config;
-use crate::sm_c8y_mapper::json_c8y::{C8yCreateEvent, C8yManagedObject};
+use crate::sm_c8y_mapper::http_proxy::{C8YHttpProxy, JwtAuthHttpProxy};
use crate::sm_c8y_mapper::{error::*, json_c8y::C8yUpdateSoftwareListResponse, topic::*};
-use crate::{component::TEdgeComponent, sm_c8y_mapper::json_c8y::InternalIdResponse};
use async_trait::async_trait;
use c8y_smartrest::smartrest_deserializer::{SmartRestLogRequest, SmartRestRestartRequest};
use c8y_smartrest::smartrest_serializer::CumulocitySupportedOperations;
use c8y_smartrest::{
error::SmartRestDeserializerError,
- smartrest_deserializer::{SmartRestJwtResponse, SmartRestUpdateSoftware},
+ smartrest_deserializer::SmartRestUpdateSoftware,
smartrest_serializer::{
SmartRestGetPendingOperations, SmartRestSerializer, SmartRestSetOperationToExecuting,
SmartRestSetOperationToFailed, SmartRestSetOperationToSuccessful,
SmartRestSetSupportedLogType, SmartRestSetSupportedOperations,
},
};
-use chrono::{DateTime, FixedOffset, Local};
+use chrono::{DateTime, FixedOffset};
use json_sm::{
Auth, DownloadInfo, Jsonify, OperationStatus, RestartOperationRequest,
RestartOperationResponse, SoftwareListRequest, SoftwareListResponse, SoftwareUpdateResponse,
};
use mqtt_client::{Client, MqttClient, MqttClientError, MqttMessageStream, Topic, TopicFilter};
-use reqwest::Url;
use serde::{Deserialize, Serialize};
+use std::convert::TryInto;
use std::path::PathBuf;
-use std::{convert::TryInto, time::Duration};
-use tedge_config::{C8yUrlSetting, ConfigSettingAccessorStringExt, DeviceIdSetting, TEdgeConfig};
-use tokio::time::Instant;
+use tedge_config::TEdgeConfig;
use tracing::{debug, error, info, instrument};
const AGENT_LOG_DIR: &str = "/var/log/tedge/agent";
-const RETRY_TIMEOUT_SECS: u64 = 60;
-
pub struct CumulocitySoftwareManagementMapper {}
impl CumulocitySoftwareManagementMapper {
@@ -46,31 +42,32 @@ impl TEdgeComponent for CumulocitySoftwareManagementMapper {
async fn start(&self, tedge_config: TEdgeConfig) -> Result<(), anyhow::Error> {
let mqtt_config = mqtt_config(&tedge_config)?;
let mqtt_client = Client::connect("SM-C8Y-Mapper", &mqtt_config).await?;
+ let mqtt_jwt_client = Client::connect("JWT-Requester", &mqtt_config).await?;
- let mut sm_mapper = CumulocitySoftwareManagement::new(mqtt_client, tedge_config);
+ let http_proxy = JwtAuthHttpProxy::try_new(mqtt_jwt_client, &tedge_config)?;
+ let mut sm_mapper = CumulocitySoftwareManagement::new(mqtt_client, http_proxy);
let messages = sm_mapper.subscribe().await?;
- let () = sm_mapper.init().await?;
let () = sm_mapper.run(messages).await?;
Ok(())
}
}
-#[derive(Debug)]
-pub struct CumulocitySoftwareManagement {
+pub struct CumulocitySoftwareManagement<Proxy>
+where
+ Proxy: C8YHttpProxy,
+{
pub client: Client,
- config: TEdgeConfig,
- c8y_internal_id: String,
+ http_proxy: Proxy,
}
-impl CumulocitySoftwareManagement {
- pub fn new(client: Client, config: TEdgeConfig) -> Self {
- Self {
- client,
- config,
- c8y_internal_id: "".into(),
- }
+impl<Proxy> CumulocitySoftwareManagement<Proxy>
+where
+ Proxy: C8YHttpProxy,
+{
+ pub fn new(client: Client, http_proxy: Proxy) -> Self {
+ Self { client, http_proxy }
}
pub async fn subscribe(&self) -> Result<Box<dyn MqttMessageStream>, anyhow::Error> {
@@ -83,27 +80,13 @@ impl CumulocitySoftwareManagement {
Ok(messages)
}
- #[instrument(skip(self), name = "init")]
- async fn init(&mut self) -> Result<(), anyhow::Error> {
+ pub async fn run(
+ &mut self,
+ mut messages: Box<dyn MqttMessageStream>,
+ ) -> Result<(), anyhow::Error> {
info!("Initialisation");
- while self.c8y_internal_id.is_empty() {
- if let Err(error) = self.try_get_and_set_internal_id().await {
- error!(
- "An error ocurred while retrieving internal Id, operation will retry in {} seconds and mapper will reinitialise.\n Error: {:?}",
- RETRY_TIMEOUT_SECS, error
- );
-
- tokio::time::sleep_until(Instant::now() + Duration::from_secs(RETRY_TIMEOUT_SECS))
- .await;
- continue;
- };
- }
- info!("Initialisation done.");
-
- Ok(())
- }
+ let () = self.http_proxy.init().await?;
- pub async fn run(&self, mut messages: Box<dyn MqttMessageStream>) -> Result<(), anyhow::Error> {
info!("Running");
let () = self.publish_supported_operations().await?;
let () = self.publish_supported_log_types().await?;
@@ -348,22 +331,13 @@ impl CumulocitySoftwareManagement {
// 2. read logs
let log_output = read_tedge_logs(&smartrest_obj, AGENT_LOG_DIR)?;
- // 3. create log event
- let token = get_jwt_token(&self.client).await?;
- let url_host = self.config.query_string(C8yUrlSetting)?;
-
- let c8y_managed_object = C8yManagedObject {
- id: self.c8y_internal_id.clone(),
- };
- let event_response_id = create_log_event(&url_host, &c8y_managed_object, &token).await?;
-
- // 4. upload log file
- let binary_upload_event_url =
- get_url_for_event_binary_upload(&url_host, &event_response_id);
-
- let () = upload_log_binary(&token, &binary_upload_event_url, &log_output.as_str()).await?;
+ // 3. upload log file
+ let binary_upload_event_url = self
+ .http_proxy
+ .upload_log_binary(&log_output.as_str())
+ .await?;
- // 5. set log file request to done
+ // 4. set log file request to done
let () = self
.set_log_file_request_done(&binary_upload_event_url)
.await?;
@@ -383,8 +357,7 @@ impl CumulocitySoftwareManagement {
.from_smartrest(smartrest)?
.to_thin_edge_json()?;
- let token = get_jwt_token(&self.client).await?;
- let tenant_uri = self.config.query_string(C8yUrlSetting)?;
+ let token = self.http_proxy.get_jwt_token().await?;
software_update_request
.update_list
@@ -392,7 +365,7 @@ impl CumulocitySoftwareManagement {
.for_each(|modules| {
modules.modules.iter_mut().for_each(|module| {
if let Some(url) = &module.url {
- if url_is_in_my_tenant_domain(url.url(), &tenant_uri) {
+ if self.http_proxy.url_is_in_my_tenant_domain(url.url()) {
module.url = module.url.as_ref().map(|s| {
DownloadInfo::new(&s.url)
.with_auth(Auth::new_bearer(&token.token()))
@@ -436,57 +409,13 @@ impl CumulocitySoftwareManagement {
&self,
json_response: &SoftwareListResponse,
) -> Result<(), SMCumulocityMapperError> {
- let token = get_jwt_token(&self.client).await?;
-
- let reqwest_client = reqwest::ClientBuilder::new().build()?;
-
- let url_host = self.config.query_string(C8yUrlSetting)?;
- let url = get_url_for_sw_list(&url_host, &self.c8y_internal_id);
-
let c8y_software_list: C8yUpdateSoftwareListResponse = json_response.into();
-
- let _published =
- publish_software_list_http(&reqwest_client, &url, &token.token(), &c8y_software_list)
- .await?;
-
- Ok(())
- }
-
- async fn try_get_and_set_internal_id(&mut self) -> Result<(), SMCumulocityMapperError> {
- let token = get_jwt_token(&self.client).await?;
- let reqwest_client = reqwest::ClientBuilder::new().build()?;
-
- let url_host = self.config.query_string(C8yUrlSetting)?;
- let device_id = self.config.query_string(DeviceIdSetting)?;
- let url_get_id = get_url_for_get_id(&url_host, &device_id);
-
- self.c8y_internal_id =
- try_get_internal_id(&reqwest_client, &url_get_id, &token.token()).await?;
-
- Ok(())
+ self.http_proxy
+ .send_software_list_http(&c8y_software_list)
+ .await
}
}
-async fn upload_log_binary(
- token: &SmartRestJwtResponse,
- binary_upload_event_url: &str,
- log_content: &str,
-) -> Result<(), SMCumulocityMapperError> {
- let client = reqwest::ClientBuilder::new().build()?;
-
- let request = client
- .post(binary_upload_event_url)
- .header("Accept", "application/json")
- .header("Content-Type", "text/plain")
- .body(log_content.to_string())
- .bearer_auth(token.token())
- .timeout(Duration::from_millis(10000))
- .build()?;
-
- let _response = client.execute(request).await?;
- Ok(())
-}
-
#[derive(Debug, Deserialize, Serialize, PartialEq)]
#[serde(rename_all = "camelCase")]
/// used to retrieve the id of a log event
@@ -494,40 +423,6 @@ pub struct SmartRestLogEvent {
pub id: String,
}
-/// Make a POST request to /event/events and return the event id from response body.
-/// The event id is used to upload the binary.
-async fn create_log_event(
- url_host: &str,
- c8y_managed_object: &C8yManagedObject,
- token: &SmartRestJwtResponse,
-) -> Result<String, SMCumulocityMapperError> {
- let client = reqwest::ClientBuilder::new().build()?;
-
- let create_event_url = get_url_for_create_event(&url_host);
-
- let local: DateTime<Local> = Local::now();
-
- let c8y_log_event = C8yCreateEvent::new(
- c8y_managed_object.to_owned(),
- "c8y_Logfile",
- &local.format("%Y-%m-%dT%H:%M:%SZ").to_string(),
- "software-management",
- );
-
- let request = client
- .post(create_event_url)
- .json(&c8y_log_event)
- .bearer_auth(token.token())
- .header("Accept", "application/json")
- .timeout(Duration::from_millis(10000))
- .build()?;
-
- let response = client.execute(request).await?;
- let event_response_body = response.json::<SmartRestLogEvent>().await?;
-
- Ok(event_response_body.id)
-}
-
/// Returns a date time object from a file path or file-path-like string
/// a typical file stem looks like this: "software-list-2021-10-27T10:29:58Z"
///
@@ -642,201 +537,14 @@ fn read_tedge_logs(
Ok(output)
}
-fn url_is_in_my_tenant_domain(url: &str, tenant_uri: &str) -> bool {
- // c8y URL may contain either `Tenant Name` or Tenant Id` so they can be one of following options:
- // * <tenant_name>.<domain> eg: sample.c8y.io
- // * <tenant_id>.<domain> eg: t12345.c8y.io
- // These URLs may be both equivalent and point to the same tenant.
- // We are going to remove that and only check if the domain is the same.
- let url_host = match Url::parse(url) {
- Ok(url) => match url.host() {
- Some(host) => host.to_string(),
- None => return false,
- },
- Err(_err) => {
- return false;
- }
- };
-
- let url_domain = url_host.splitn(2, '.').collect::<Vec<&str>>();
- let tenant_domain = tenant_uri.splitn(2, '.').collect::<Vec<&str>>();
- if url_domain.get(1) == tenant_domain.get(1) {
- return true;
- }
- false
-}
-
-async fn publish_software_list_http(
- client: &reqwest::Client,
- url: &str,
- token: &str,
- list: &C8yUpdateSoftwareListResponse,
-) -> Result<(), SMCumulocityMapperError> {
- let request = client
- .put(url)
- .json(list)
- .bearer_auth(token)
- .timeout(Duration::from_millis(10000))
- .build()?;
-
- let _response = client.execute(request).await?;
-
- Ok(())
-}
-
-async fn try_get_internal_id(
- client: &reqwest::Client,
- url_get_id: &str,
- token: &str,
-) -> Result<String, SMCumulocityMapperError> {
- let internal_id = client.get(url_get_id).bearer_auth(token).send().await?;
- let internal_id_response = internal_id.json::<InternalIdResponse>().await?;
-
- let internal_id = internal_id_response.id();
- Ok(internal_id)
-}
-
-fn get_url_for_sw_list(url_host: &str, internal_id: &str) -> String {
- let mut url_update_swlist = String::new();
- url_update_swlist.push_str("https://");
- url_update_swlist.push_str(url_host);
- url_update_swlist.push_str("/inventory/managedObjects/");
- url_update_swlist.push_str(internal_id);
-
- url_update_swlist
-}
-
-fn get_url_for_get_id(url_host: &str, device_id: &str) -> String {
- let mut url_get_id = String::new();
- url_get_id.push_str("https://");
- url_get_id.push_str(url_host);
- url_get_id.push_str("/identity/externalIds/c8y_Serial/");
- url_get_id.push_str(device_id);
-
- url_get_id
-}
-
-fn get_url_for_create_event(url_host: &str) -> String {
- let mut url_create_event = String::new();
- url_create_event.push_str("https://");
- url_create_event.push_str(url_host);
- url_create_event.push_str("/event/events/");
-
- url_create_event
-}
-
-fn get_url_for_event_binary_upload(url_host: &str, event_id: &str) -> String {
- let mut url_event_binary = get_url_for_create_event(url_host);
- url_event_binary.push_str(event_id);
- url_event_binary.push_str("/binaries");
-
- url_event_binary
-}
-
-async fn get_jwt_token(client: &Client) -> Result<SmartRestJwtResponse, SMCumulocityMapperError> {
- let mut subscriber = client.subscribe(Topic::new("c8y/s/dat")?.filter()).await?;
-
- let () = client
- .publish(mqtt_client::Message::new(
- &Topic::new("c8y/s/uat")?,
- "".to_string(),
- ))
- .await?;
-
- let token_smartrest =
- match tokio::time::timeout(Duration::from_secs(10), subscriber.next()).await {
- Ok(Some(msg)) => msg.payload_str()?.to_string(),
- Ok(None) => return Err(SMCumulocityMapperError::InvalidMqttMessage),
- Err(_elapsed) => return Err(SMCumulocityMapperError::RequestTimeout),
- };
-
- Ok(SmartRestJwtResponse::try_new(&token_smartrest)?)
-}
-
#[cfg(test)]
mod tests {
use super::*;
- use mqtt_tests::with_timeout::{Maybe, WithTimeout};
use std::fs::File;
use std::io::Write;
use std::str::FromStr;
use test_case::test_case;
- const TEST_TIMEOUT_MS: Duration = Duration::from_millis(1000);
-
- #[tokio::test]
- #[serial_test::serial]
- async fn get_jwt_token_full_run() {
- let broker = mqtt_tests::test_mqtt_broker();
- let mut messages = broker.messages_published_on("c8y/s/uat").await;
-
- let publisher = Client::connect(
- "get_jwt_token_full_run",
- &mqtt_client::Config::default().with_port(broker.port),
- )
- .await
- .unwrap();
-
- // Setup listener stream to publish on first message received on topic `c8y/s/us`.
- let responder_task = tokio::spawn(async move {
- let msg = messages
- .recv()
- .with_timeout(TEST_TIMEOUT_MS)
- .await
- .expect_or("No JWT request received.");
- assert_eq!(&msg, "");
-
- // After receiving successful message publish response with a custom 'token' on topic `c8y/s/dat`.
- let _ = broker.publish("c8y/s/dat", "71,1111").await;
- });
-
- // Wait till token received.
- let (jwt_token, _responder) = tokio::join!(get_jwt_token(&publisher), responder_task);
-
- // `get_jwt_token` should return `Ok` and the value of token should be as set above `1111`.
- assert!(jwt_token.is_ok());
- assert_eq!(jwt_token.unwrap().token(), "1111");
- }
-
- #[test]
- fn get_url_for_get_id_returns_correct_address() {
- let res = get_url_for_get_id("test_host", "test_device");
-
- assert_eq!(
- res,
- "https://test_host/identity/externalIds/c8y_Serial/test_device"
- );
- }
-
- #[test]
- fn get_url_for_sw_list_returns_correct_address() {
- let res = get_url_for_sw_list("test_host", "12345");
-
- assert_eq!(res, "https://test_host/inventory/managedObjects/12345");
- }
-
- #[test_case("http://aaa.test.com")]
- #[test_case("https://aaa.test.com")]
- #[test_case("ftp://aaa.test.com")]
- #[test_case("mqtt://aaa.test.com")]
- #[test_case("https://t1124124.test.com")]
- #[test_case("https://t1124124.test.com:12345")]
- #[test_case("https://t1124124.test.com/path")]
- #[test_case("https://t1124124.test.com/path/to/file.test")]
- #[test_case("https://t1124124.test.com/path/to/file")]
- fn url_is_my_tenant_correct_urls(url: &str) {
- assert!(url_is_in_my_tenant