From bde1f0f2a2d07184a26c68a97022b26c3961a47f Mon Sep 17 00:00:00 2001 From: Didier Wenzek Date: Mon, 6 Dec 2021 18:00:57 +0000 Subject: Refactor the sm_c8y mapper extracting http code (#659) * Move out any HTTP related code from the MQTT mapper to an HTTP proxy * Update the tests * Simplify the jwt tests with a background generator * Reuse the reqwest client for all requests * Rename `JwtAuthHttpProxy` the main `C8yHttpProxy` implementation * Fix typo Co-authored-by: Wenzek --- .../core/tedge_mapper/src/sm_c8y_mapper/error.rs | 4 +- .../tedge_mapper/src/sm_c8y_mapper/http_proxy.rs | 364 +++++++++++++++++++++ .../core/tedge_mapper/src/sm_c8y_mapper/mapper.rs | 364 ++------------------- crates/core/tedge_mapper/src/sm_c8y_mapper/mod.rs | 4 +- .../core/tedge_mapper/src/sm_c8y_mapper/tests.rs | 70 +++- 5 files changed, 473 insertions(+), 333 deletions(-) create mode 100644 crates/core/tedge_mapper/src/sm_c8y_mapper/http_proxy.rs (limited to 'crates/core/tedge_mapper/src/sm_c8y_mapper') diff --git a/crates/core/tedge_mapper/src/sm_c8y_mapper/error.rs b/crates/core/tedge_mapper/src/sm_c8y_mapper/error.rs index 5cca31b4..296f17e4 100644 --- a/crates/core/tedge_mapper/src/sm_c8y_mapper/error.rs +++ b/crates/core/tedge_mapper/src/sm_c8y_mapper/error.rs @@ -1,13 +1,13 @@ use c8y_smartrest::error::{SmartRestDeserializerError, SmartRestSerializerError}; #[derive(thiserror::Error, Debug)] -pub(crate) enum MapperTopicError { +pub enum MapperTopicError { #[error("Topic {topic} is unknown.")] UnknownTopic { topic: String }, } #[derive(thiserror::Error, Debug)] -pub(crate) enum SMCumulocityMapperError { +pub enum SMCumulocityMapperError { #[error("Invalid MQTT Message.")] InvalidMqttMessage, diff --git a/crates/core/tedge_mapper/src/sm_c8y_mapper/http_proxy.rs b/crates/core/tedge_mapper/src/sm_c8y_mapper/http_proxy.rs new file mode 100644 index 00000000..277b0a11 --- /dev/null +++ b/crates/core/tedge_mapper/src/sm_c8y_mapper/http_proxy.rs @@ -0,0 +1,364 @@ +use crate::sm_c8y_mapper::error::SMCumulocityMapperError; +use crate::sm_c8y_mapper::json_c8y::{ + C8yCreateEvent, C8yManagedObject, C8yUpdateSoftwareListResponse, InternalIdResponse, +}; +use crate::sm_c8y_mapper::mapper::SmartRestLogEvent; +use async_trait::async_trait; +use c8y_smartrest::smartrest_deserializer::SmartRestJwtResponse; +use chrono::{DateTime, Local}; +use mqtt_client::{Client, MqttClient, Topic}; +use reqwest::Url; +use std::time::Duration; +use tedge_config::{C8yUrlSetting, ConfigSettingAccessorStringExt, DeviceIdSetting, TEdgeConfig}; +use tracing::{error, info, instrument}; + +const RETRY_TIMEOUT_SECS: u64 = 60; + +/// An HttpProxy handles http requests to C8y on behalf of the device. +#[async_trait] +pub trait C8YHttpProxy { + async fn init(&mut self) -> Result<(), SMCumulocityMapperError>; + + fn url_is_in_my_tenant_domain(&self, url: &str) -> bool; + + async fn get_jwt_token(&self) -> Result; + + async fn send_software_list_http( + &self, + c8y_software_list: &C8yUpdateSoftwareListResponse, + ) -> Result<(), SMCumulocityMapperError>; + + async fn upload_log_binary(&self, log_content: &str) + -> Result; +} + +/// Define a C8y endpoint +pub struct C8yEndPoint { + c8y_host: String, + device_id: String, + c8y_internal_id: String, +} + +impl C8yEndPoint { + fn new(c8y_host: &str, device_id: &str, c8y_internal_id: &str) -> C8yEndPoint { + C8yEndPoint { + c8y_host: c8y_host.into(), + device_id: device_id.into(), + c8y_internal_id: c8y_internal_id.into(), + } + } + + fn get_url_for_sw_list(&self) -> String { + let mut url_update_swlist = String::new(); + url_update_swlist.push_str("https://"); + url_update_swlist.push_str(&self.c8y_host); + url_update_swlist.push_str("/inventory/managedObjects/"); + url_update_swlist.push_str(&self.c8y_internal_id); + + url_update_swlist + } + + fn get_url_for_get_id(&self) -> String { + let mut url_get_id = String::new(); + url_get_id.push_str("https://"); + url_get_id.push_str(&self.c8y_host); + url_get_id.push_str("/identity/externalIds/c8y_Serial/"); + url_get_id.push_str(&self.device_id); + + url_get_id + } + + fn get_url_for_create_event(&self) -> String { + let mut url_create_event = String::new(); + url_create_event.push_str("https://"); + url_create_event.push_str(&self.c8y_host); + url_create_event.push_str("/event/events/"); + + url_create_event + } + + fn get_url_for_event_binary_upload(&self, event_id: &str) -> String { + let mut url_event_binary = self.get_url_for_create_event(); + url_event_binary.push_str(event_id); + url_event_binary.push_str("/binaries"); + + url_event_binary + } + + fn url_is_in_my_tenant_domain(&self, url: &str) -> bool { + // c8y URL may contain either `Tenant Name` or Tenant Id` so they can be one of following options: + // * . eg: sample.c8y.io + // * . eg: t12345.c8y.io + // These URLs may be both equivalent and point to the same tenant. + // We are going to remove that and only check if the domain is the same. + let tenant_uri = &self.c8y_host; + let url_host = match Url::parse(url) { + Ok(url) => match url.host() { + Some(host) => host.to_string(), + None => return false, + }, + Err(_err) => { + return false; + } + }; + + let url_domain = url_host.splitn(2, '.').collect::>(); + let tenant_domain = tenant_uri.splitn(2, '.').collect::>(); + if url_domain.get(1) == tenant_domain.get(1) { + return true; + } + false + } +} + +/// An HttpProxy that uses MQTT to retrieve JWT tokens and authenticate the device +/// +/// - Keep the connection info to c8y and the internal Id of the device +/// - Handle JWT requests +pub struct JwtAuthHttpProxy { + mqtt_con: Client, + http_con: reqwest::Client, + end_point: C8yEndPoint, +} + +impl JwtAuthHttpProxy { + pub fn new( + mqtt_con: Client, + http_con: reqwest::Client, + c8y_host: &str, + device_id: &str, + ) -> JwtAuthHttpProxy { + JwtAuthHttpProxy { + mqtt_con, + http_con, + end_point: C8yEndPoint { + c8y_host: c8y_host.into(), + device_id: device_id.into(), + c8y_internal_id: "".into(), + }, + } + } + + pub fn try_new( + mqtt_con: Client, + tedge_config: &TEdgeConfig, + ) -> Result { + let c8y_host = tedge_config.query_string(C8yUrlSetting)?; + let device_id = tedge_config.query_string(DeviceIdSetting)?; + let http_con = reqwest::ClientBuilder::new().build()?; + Ok(JwtAuthHttpProxy::new( + mqtt_con, http_con, &c8y_host, &device_id, + )) + } + + async fn try_get_and_set_internal_id(&mut self) -> Result<(), SMCumulocityMapperError> { + let token = self.get_jwt_token().await?; + let url_get_id = self.end_point.get_url_for_get_id(); + + self.end_point.c8y_internal_id = self + .try_get_internal_id(&url_get_id, &token.token()) + .await?; + + Ok(()) + } + + async fn try_get_internal_id( + &self, + url_get_id: &str, + token: &str, + ) -> Result { + let internal_id = self + .http_con + .get(url_get_id) + .bearer_auth(token) + .send() + .await?; + let internal_id_response = internal_id.json::().await?; + + let internal_id = internal_id_response.id(); + Ok(internal_id) + } + + /// Make a POST request to /event/events and return the event id from response body. + /// The event id is used to upload the binary. + fn create_log_event(&self) -> C8yCreateEvent { + let local: DateTime = Local::now(); + + let c8y_managed_object = C8yManagedObject { + id: self.end_point.c8y_internal_id.clone(), + }; + + C8yCreateEvent::new( + c8y_managed_object.to_owned(), + "c8y_Logfile", + &local.format("%Y-%m-%dT%H:%M:%SZ").to_string(), + "software-management", + ) + } + + async fn get_event_id( + &self, + c8y_event: C8yCreateEvent, + ) -> Result { + let token = self.get_jwt_token().await?; + let create_event_url = self.end_point.get_url_for_create_event(); + + let request = self + .http_con + .post(create_event_url) + .json(&c8y_event) + .bearer_auth(token.token()) + .header("Accept", "application/json") + .timeout(Duration::from_millis(10000)) + .build()?; + + let response = self.http_con.execute(request).await?; + let event_response_body = response.json::().await?; + + Ok(event_response_body.id) + } +} + +#[async_trait] +impl C8YHttpProxy for JwtAuthHttpProxy { + fn url_is_in_my_tenant_domain(&self, url: &str) -> bool { + self.end_point.url_is_in_my_tenant_domain(url) + } + + #[instrument(skip(self), name = "init")] + async fn init(&mut self) -> Result<(), SMCumulocityMapperError> { + info!("Initialisation"); + while self.end_point.c8y_internal_id.is_empty() { + if let Err(error) = self.try_get_and_set_internal_id().await { + error!( + "An error ocurred while retrieving internal Id, operation will retry in {} seconds and mapper will reinitialise.\n Error: {:?}", + RETRY_TIMEOUT_SECS, error + ); + + tokio::time::sleep(Duration::from_secs(RETRY_TIMEOUT_SECS)).await; + continue; + }; + } + info!("Initialisation done."); + Ok(()) + } + + async fn get_jwt_token(&self) -> Result { + let mut subscriber = self + .mqtt_con + .subscribe(Topic::new("c8y/s/dat")?.filter()) + .await?; + + let () = self + .mqtt_con + .publish(mqtt_client::Message::new( + &Topic::new("c8y/s/uat")?, + "".to_string(), + )) + .await?; + + let token_smartrest = + match tokio::time::timeout(Duration::from_secs(10), subscriber.next()).await { + Ok(Some(msg)) => msg.payload_str()?.to_string(), + Ok(None) => return Err(SMCumulocityMapperError::InvalidMqttMessage), + Err(_elapsed) => return Err(SMCumulocityMapperError::RequestTimeout), + }; + + Ok(SmartRestJwtResponse::try_new(&token_smartrest)?) + } + + async fn send_software_list_http( + &self, + c8y_software_list: &C8yUpdateSoftwareListResponse, + ) -> Result<(), SMCumulocityMapperError> { + let url = self.end_point.get_url_for_sw_list(); + let token = self.get_jwt_token().await?; + + let request = self + .http_con + .put(url) + .json(c8y_software_list) + .bearer_auth(&token.token()) + .timeout(Duration::from_millis(10000)) + .build()?; + + let _response = self.http_con.execute(request).await?; + + Ok(()) + } + + async fn upload_log_binary( + &self, + log_content: &str, + ) -> Result { + let token = self.get_jwt_token().await?; + + let log_event = self.create_log_event(); + let event_response_id = self.get_event_id(log_event).await?; + let binary_upload_event_url = self + .end_point + .get_url_for_event_binary_upload(&event_response_id); + + let request = self + .http_con + .post(&binary_upload_event_url) + .header("Accept", "application/json") + .header("Content-Type", "text/plain") + .body(log_content.to_string()) + .bearer_auth(token.token()) + .timeout(Duration::from_millis(10000)) + .build()?; + + let _response = self.http_con.execute(request).await?; + Ok(binary_upload_event_url) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use test_case::test_case; + + #[test] + fn get_url_for_get_id_returns_correct_address() { + let c8y = C8yEndPoint::new("test_host", "test_device", "internal-id"); + let res = c8y.get_url_for_get_id(); + + assert_eq!( + res, + "https://test_host/identity/externalIds/c8y_Serial/test_device" + ); + } + + #[test] + fn get_url_for_sw_list_returns_correct_address() { + let c8y = C8yEndPoint::new("test_host", "test_device", "12345"); + let res = c8y.get_url_for_sw_list(); + + assert_eq!(res, "https://test_host/inventory/managedObjects/12345"); + } + + #[test_case("http://aaa.test.com")] + #[test_case("https://aaa.test.com")] + #[test_case("ftp://aaa.test.com")] + #[test_case("mqtt://aaa.test.com")] + #[test_case("https://t1124124.test.com")] + #[test_case("https://t1124124.test.com:12345")] + #[test_case("https://t1124124.test.com/path")] + #[test_case("https://t1124124.test.com/path/to/file.test")] + #[test_case("https://t1124124.test.com/path/to/file")] + fn url_is_my_tenant_correct_urls(url: &str) { + let c8y = C8yEndPoint::new("test.test.com", "test_device", "internal-id"); + assert!(c8y.url_is_in_my_tenant_domain(url)); + } + + #[test_case("test.com")] + #[test_case("http://test.co")] + #[test_case("http://test.co.te")] + #[test_case("http://test.com:123456")] + #[test_case("http://test.com::12345")] + fn url_is_my_tenant_incorrect_urls(url: &str) { + let c8y = C8yEndPoint::new("test.test.com", "test_device", "internal-id"); + assert!(!c8y.url_is_in_my_tenant_domain(url)); + } +} diff --git a/crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs b/crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs index e838c508..3c064e90 100644 --- a/crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs +++ b/crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs @@ -1,37 +1,33 @@ +use crate::component::TEdgeComponent; use crate::mapper::mqtt_config; -use crate::sm_c8y_mapper::json_c8y::{C8yCreateEvent, C8yManagedObject}; +use crate::sm_c8y_mapper::http_proxy::{C8YHttpProxy, JwtAuthHttpProxy}; use crate::sm_c8y_mapper::{error::*, json_c8y::C8yUpdateSoftwareListResponse, topic::*}; -use crate::{component::TEdgeComponent, sm_c8y_mapper::json_c8y::InternalIdResponse}; use async_trait::async_trait; use c8y_smartrest::smartrest_deserializer::{SmartRestLogRequest, SmartRestRestartRequest}; use c8y_smartrest::smartrest_serializer::CumulocitySupportedOperations; use c8y_smartrest::{ error::SmartRestDeserializerError, - smartrest_deserializer::{SmartRestJwtResponse, SmartRestUpdateSoftware}, + smartrest_deserializer::SmartRestUpdateSoftware, smartrest_serializer::{ SmartRestGetPendingOperations, SmartRestSerializer, SmartRestSetOperationToExecuting, SmartRestSetOperationToFailed, SmartRestSetOperationToSuccessful, SmartRestSetSupportedLogType, SmartRestSetSupportedOperations, }, }; -use chrono::{DateTime, FixedOffset, Local}; +use chrono::{DateTime, FixedOffset}; use json_sm::{ Auth, DownloadInfo, Jsonify, OperationStatus, RestartOperationRequest, RestartOperationResponse, SoftwareListRequest, SoftwareListResponse, SoftwareUpdateResponse, }; use mqtt_client::{Client, MqttClient, MqttClientError, MqttMessageStream, Topic, TopicFilter}; -use reqwest::Url; use serde::{Deserialize, Serialize}; +use std::convert::TryInto; use std::path::PathBuf; -use std::{convert::TryInto, time::Duration}; -use tedge_config::{C8yUrlSetting, ConfigSettingAccessorStringExt, DeviceIdSetting, TEdgeConfig}; -use tokio::time::Instant; +use tedge_config::TEdgeConfig; use tracing::{debug, error, info, instrument}; const AGENT_LOG_DIR: &str = "/var/log/tedge/agent"; -const RETRY_TIMEOUT_SECS: u64 = 60; - pub struct CumulocitySoftwareManagementMapper {} impl CumulocitySoftwareManagementMapper { @@ -46,31 +42,32 @@ impl TEdgeComponent for CumulocitySoftwareManagementMapper { async fn start(&self, tedge_config: TEdgeConfig) -> Result<(), anyhow::Error> { let mqtt_config = mqtt_config(&tedge_config)?; let mqtt_client = Client::connect("SM-C8Y-Mapper", &mqtt_config).await?; + let mqtt_jwt_client = Client::connect("JWT-Requester", &mqtt_config).await?; - let mut sm_mapper = CumulocitySoftwareManagement::new(mqtt_client, tedge_config); + let http_proxy = JwtAuthHttpProxy::try_new(mqtt_jwt_client, &tedge_config)?; + let mut sm_mapper = CumulocitySoftwareManagement::new(mqtt_client, http_proxy); let messages = sm_mapper.subscribe().await?; - let () = sm_mapper.init().await?; let () = sm_mapper.run(messages).await?; Ok(()) } } -#[derive(Debug)] -pub struct CumulocitySoftwareManagement { +pub struct CumulocitySoftwareManagement +where + Proxy: C8YHttpProxy, +{ pub client: Client, - config: TEdgeConfig, - c8y_internal_id: String, + http_proxy: Proxy, } -impl CumulocitySoftwareManagement { - pub fn new(client: Client, config: TEdgeConfig) -> Self { - Self { - client, - config, - c8y_internal_id: "".into(), - } +impl CumulocitySoftwareManagement +where + Proxy: C8YHttpProxy, +{ + pub fn new(client: Client, http_proxy: Proxy) -> Self { + Self { client, http_proxy } } pub async fn subscribe(&self) -> Result, anyhow::Error> { @@ -83,27 +80,13 @@ impl CumulocitySoftwareManagement { Ok(messages) } - #[instrument(skip(self), name = "init")] - async fn init(&mut self) -> Result<(), anyhow::Error> { + pub async fn run( + &mut self, + mut messages: Box, + ) -> Result<(), anyhow::Error> { info!("Initialisation"); - while self.c8y_internal_id.is_empty() { - if let Err(error) = self.try_get_and_set_internal_id().await { - error!( - "An error ocurred while retrieving internal Id, operation will retry in {} seconds and mapper will reinitialise.\n Error: {:?}", - RETRY_TIMEOUT_SECS, error - ); - - tokio::time::sleep_until(Instant::now() + Duration::from_secs(RETRY_TIMEOUT_SECS)) - .await; - continue; - }; - } - info!("Initialisation done."); - - Ok(()) - } + let () = self.http_proxy.init().await?; - pub async fn run(&self, mut messages: Box) -> Result<(), anyhow::Error> { info!("Running"); let () = self.publish_supported_operations().await?; let () = self.publish_supported_log_types().await?; @@ -348,22 +331,13 @@ impl CumulocitySoftwareManagement { // 2. read logs let log_output = read_tedge_logs(&smartrest_obj, AGENT_LOG_DIR)?; - // 3. create log event - let token = get_jwt_token(&self.client).await?; - let url_host = self.config.query_string(C8yUrlSetting)?; - - let c8y_managed_object = C8yManagedObject { - id: self.c8y_internal_id.clone(), - }; - let event_response_id = create_log_event(&url_host, &c8y_managed_object, &token).await?; - - // 4. upload log file - let binary_upload_event_url = - get_url_for_event_binary_upload(&url_host, &event_response_id); - - let () = upload_log_binary(&token, &binary_upload_event_url, &log_output.as_str()).await?; + // 3. upload log file + let binary_upload_event_url = self + .http_proxy + .upload_log_binary(&log_output.as_str()) + .await?; - // 5. set log file request to done + // 4. set log file request to done let () = self .set_log_file_request_done(&binary_upload_event_url) .await?; @@ -383,8 +357,7 @@ impl CumulocitySoftwareManagement { .from_smartrest(smartrest)? .to_thin_edge_json()?; - let token = get_jwt_token(&self.client).await?; - let tenant_uri = self.config.query_string(C8yUrlSetting)?; + let token = self.http_proxy.get_jwt_token().await?; software_update_request .update_list @@ -392,7 +365,7 @@ impl CumulocitySoftwareManagement { .for_each(|modules| { modules.modules.iter_mut().for_each(|module| { if let Some(url) = &module.url { - if url_is_in_my_tenant_domain(url.url(), &tenant_uri) { + if self.http_proxy.url_is_in_my_tenant_domain(url.url()) { module.url = module.url.as_ref().map(|s| { DownloadInfo::new(&s.url) .with_auth(Auth::new_bearer(&token.token())) @@ -436,57 +409,13 @@ impl CumulocitySoftwareManagement { &self, json_response: &SoftwareListResponse, ) -> Result<(), SMCumulocityMapperError> { - let token = get_jwt_token(&self.client).await?; - - let reqwest_client = reqwest::ClientBuilder::new().build()?; - - let url_host = self.config.query_string(C8yUrlSetting)?; - let url = get_url_for_sw_list(&url_host, &self.c8y_internal_id); - let c8y_software_list: C8yUpdateSoftwareListResponse = json_response.into(); - - let _published = - publish_software_list_http(&reqwest_client, &url, &token.token(), &c8y_software_list) - .await?; - - Ok(()) - } - - async fn try_get_and_set_internal_id(&mut self) -> Result<(), SMCumulocityMapperError> { - let token = get_jwt_token(&self.client).await?; - let reqwest_client = reqwest::ClientBuilder::new().build()?; - - let url_host = self.config.query_string(C8yUrlSetting)?; - let device_id = self.config.query_string(DeviceIdSetting)?; - let url_get_id = get_url_for_get_id(&url_host, &device_id); - - self.c8y_internal_id = - try_get_internal_id(&reqwest_client, &url_get_id, &token.token()).await?; - - Ok(()) + self.http_proxy + .send_software_list_http(&c8y_software_list) + .await } } -async fn upload_log_binary( - token: &SmartRestJwtResponse, - binary_upload_event_url: &str, - log_content: &str, -) -> Result<(), SMCumulocityMapperError> { - let client = reqwest::ClientBuilder::new().build()?; - - let request = client - .post(binary_upload_event_url) - .header("Accept", "application/json") - .header("Content-Type", "text/plain") - .body(log_content.to_string()) - .bearer_auth(token.token()) - .timeout(Duration::from_millis(10000)) - .build()?; - - let _response = client.execute(request).await?; - Ok(()) -} - #[derive(Debug, Deserialize, Serialize, PartialEq)] #[serde(rename_all = "camelCase")] /// used to retrieve the id of a log event @@ -494,40 +423,6 @@ pub struct SmartRestLogEvent { pub id: String, } -/// Make a POST request to /event/events and return the event id from response body. -/// The event id is used to upload the binary. -async fn create_log_event( - url_host: &str, - c8y_managed_object: &C8yManagedObject, - token: &SmartRestJwtResponse, -) -> Result { - let client = reqwest::ClientBuilder::new().build()?; - - let create_event_url = get_url_for_create_event(&url_host); - - let local: DateTime = Local::now(); - - let c8y_log_event = C8yCreateEvent::new( - c8y_managed_object.to_owned(), - "c8y_Logfile", - &local.format("%Y-%m-%dT%H:%M:%SZ").to_string(), - "software-management", - ); - - let request = client - .post(create_event_url) - .json(&c8y_log_event) - .bearer_auth(token.token()) - .header("Accept", "application/json") - .timeout(Duration::from_millis(10000)) - .build()?; - - let response = client.execute(request).await?; - let event_response_body = response.json::().await?; - - Ok(event_response_body.id) -} - /// Returns a date time object from a file path or file-path-like string /// a typical file stem looks like this: "software-list-2021-10-27T10:29:58Z" /// @@ -642,201 +537,14 @@ fn read_tedge_logs( Ok(output) } -fn url_is_in_my_tenant_domain(url: &str, tenant_uri: &str) -> bool { - // c8y URL may contain either `Tenant Name` or Tenant Id` so they can be one of following options: - // * . eg: sample.c8y.io - // * . eg: t12345.c8y.io - // These URLs may be both equivalent and point to the same tenant. - // We are going to remove that and only check if the domain is the same. - let url_host = match Url::parse(url) { - Ok(url) => match url.host() { - Some(host) => host.to_string(), - None => return false, - }, - Err(_err) => { - return false; - } - }; - - let url_domain = url_host.splitn(2, '.').collect::>(); - let tenant_domain = tenant_uri.splitn(2, '.').collect::>(); - if url_domain.get(1) == tenant_domain.get(1) { - return true; - } - false -} - -async fn publish_software_list_http( - client: &reqwest::Client, - url: &str, - token: &str, - list: &C8yUpdateSoftwareListResponse, -) -> Result<(), SMCumulocityMapperError> { - let request = client - .put(url) - .json(list) - .bearer_auth(token) - .timeout(Duration::from_millis(10000)) - .build()?; - - let _response = client.execute(request).await?; - - Ok(()) -} - -async fn try_get_internal_id( - client: &reqwest::Client, - url_get_id: &str, - token: &str, -) -> Result { - let internal_id = client.get(url_get_id).bearer_auth(token).send().await?; - let internal_id_response = internal_id.json::().await?; - - let internal_id = internal_id_response.id(); - Ok(internal_id) -} - -fn get_url_for_sw_list(url_host: &str, internal_id: &str) -> String { - let mut url_update_swlist = String::new(); - url_update_swlist.push_str("https://"); - url_update_swlist.push_str(url_host); - url_update_swlist.push_str("/inventory/managedObjects/"); - url_update_swlist.push_str(internal_id); - - url_update_swlist -} - -fn get_url_for_get_id(url_host: &str, device_id: &str) -> String { - let mut url_get_id = String::new(); - url_get_id.push_str("https://"); - url_get_id.push_str(url_host); - url_get_id.push_str("/identity/externalIds/c8y_Serial/"); - url_get_id.push_str(device_id); - - url_get_id -} - -fn get_url_for_create_event(url_host: &str) -> String { - let mut url_create_event = String::new(); - url_create_event.push_str("https://"); - url_create_event.push_str(url_host); - url_create_event.push_str("/event/events/"); - - url_create_event -} - -fn get_url_for_event_binary_upload(url_host: &str, event_id: &str) -> String { - let mut url_event_binary = get_url_for_create_event(url_host); - url_event_binary.push_str(event_id); - url_event_binary.push_str("/binaries"); - - url_event_binary -} - -async fn get_jwt_token(client: &Client) -> Result { - let mut subscriber = client.subscribe(Topic::new("c8y/s/dat")?.filter()).await?; - - let () = client - .publish(mqtt_client::Message::new( - &Topic::new("c8y/s/uat")?, - "".to_string(), - )) - .await?; - - let token_smartrest = - match tokio::time::timeout(Duration::from_secs(10), subscriber.next()).await { - Ok(Some(msg)) => msg.payload_str()?.to_string(), - Ok(None) => return Err(SMCumulocityMapperError::InvalidMqttMessage), - Err(_elapsed) => return Err(SMCumulocityMapperError::RequestTimeout), - }; - - Ok(SmartRestJwtResponse::try_new(&token_smartrest)?) -} - #[cfg(test)] mod tests { use super::*; - use mqtt_tests::with_timeout::{Maybe, WithTimeout}; use std::fs::File; use std::io::Write; use std::str::FromStr; use test_case::test_case; - const TEST_TIMEOUT_MS: Duration = Duration::from_millis(1000); - - #[tokio::test] - #[serial_test::serial] - async fn get_jwt_token_full_run() { - let broker = mqtt_tests::test_mqtt_broker(); - let mut messages = broker.messages_published_on("c8y/s/uat").await; - - let publisher = Client::connect( - "get_jwt_token_full_run", - &mqtt_client::Config::default().with_port(broker.port), - ) - .await - .unwrap(); - - // Setup listener stream to publish on first message received on topic `c8y/s/us`. - let responder_task = tokio::spawn(async move { - let msg = messages - .recv() - .with_timeout(TEST_TIMEOUT_MS) - .await - .expect_or("No JWT request received."); - assert_eq!(&msg, ""); - - // After receiving successful message publish response with a custom 'token' on topic `c8y/s/dat`. - let _ = broker.publish("c8y/s/dat", "71,1111").await; - }); - - // Wait till token received. - let (jwt_token, _responder) = tokio::join!(get_jwt_token(&publisher), responder_task); - - // `get_jwt_token` should return `Ok` and the value of token should be as set above `1111`. - assert!(jwt_token.is_ok()); - assert_eq!(jwt_token.unwrap().token(), "1111"); - } - - #[test] - fn get_url_for_get_id_returns_correct_address() { - let res = get_url_for_get_id("test_host", "test_device"); - - assert_eq!( - res, - "https://test_host/identity/externalIds/c8y_Serial/test_device" - ); - } - - #[test] - fn get_url_for_sw_list_returns_correct_address() { - let res = get_url_for_sw_list("test_host", "12345"); - - assert_eq!(res, "https://test_host/inventory/managedObjects/12345"); - } - - #[test_case("http://aaa.test.com")] - #[test_case("https://aaa.test.com")] - #[test_case("ftp://aaa.test.com")] - #[test_case("mqtt://aaa.test.com")] - #[test_case("https://t1124124.test.com")] - #[test_case("https://t1124124.test.com:12345")] - #[test_case("https://t1124124.test.com/path")] - #[test_case("https://t1124124.test.com/path/to/file.test")] - #[test_case("https://t1124124.test.com/path/to/file")] - fn url_is_my_tenant_correct_urls(url: &str) { - assert!(url_is_in_my_tenant_domain(url, "test.test.com")); - } - - #[test_case("test.com")] - #[test_case("http://test.co")] - #[test_case("http://test.co.te")] - #[test_case("http://test.com:123456")] - #[test_case("http://test.com::12345")] - fn url_is_my_tenant_incorrect_urls(url: &str) { - assert!(!url_is_in_my_tenant_domain(url, "test.test.com")); - } - #[test_case("/path/to/software-list-2021-10-27T10:44:44Z.log")] #[test_case("/path/to/tedge/agent/software-update-2021-10-25T07:45:41Z.log")] #[test_case("/path/to/another-variant-2021-10-25T07:45:41Z.log")] diff --git a/crates/core/tedge_mapper/src/sm_c8y_mapper/mod.rs b/crates/core/tedge_mapper/src/sm_c8y_mapper/mod.rs index fda2aa9d..10159677 100644 --- a/crates/core/tedge_mapper/src/sm_c8y_mapper/mod.rs +++ b/crates/core/tedge_mapper/src/sm_c8y_mapper/mod.rs @@ -1,6 +1,8 @@ mod error; +mod http_proxy; mod json_c8y; pub mod mapper; +mod topic; + #[cfg(test)] mod tests; -mod topic; diff --git a/crates/core/tedge_mapper/src/sm_c8y_mapper/tests.rs b/crates/core/tedge_mapper/src/sm_c8y_mapper/tests.rs index bd3515e8..7f471ef2 100644 --- a/crates/core/tedge_mapper/src/sm_c8y_mapper/tests.rs +++ b/crates/core/tedge_mapper/src/sm_c8y_mapper/tests.rs @@ -1,4 +1,8 @@ +use crate::sm_c8y_mapper::error::SMCumulocityMapperError; +use crate::sm_c8y_mapper::http_proxy::{C8YHttpProxy, JwtAuthHttpProxy}; +use crate::sm_c8y_mapper::json_c8y::C8yUpdateSoftwareListResponse; use crate::sm_c8y_mapper::mapper::CumulocitySoftwareManagement; +use c8y_smartrest::smartrest_deserializer::SmartRestJwtResponse; use mqtt_client::Client; use mqtt_tests::test_mqtt_server::MqttProcessHandler; use mqtt_tests::with_timeout::{Maybe, WithTimeout}; @@ -388,6 +392,36 @@ async fn mapper_publishes_software_update_request_with_wrong_action() { .await; } +#[tokio::test] +#[serial_test::serial] +async fn get_jwt_token_full_run() { + // Given a background process that publish JWT tokens on demand. + let broker = mqtt_tests::test_mqtt_broker(); + broker.map_messages_background(|(topic, _)| { + let mut response = vec![]; + if &topic == "c8y/s/uat" { + response.push(("c8y/s/dat".into(), "71,1111".into())); + } + response + }); + + // An JwtAuthHttpProxy ... + let mqtt_config = mqtt_client::Config::default().with_port(broker.port); + let mqtt_client = Client::connect("JWT-Requester-Test", &mqtt_config) + .await + .unwrap(); + let http_client = reqwest::ClientBuilder::new().build().unwrap(); + let http_proxy = + JwtAuthHttpProxy::new(mqtt_client, http_client, "test.tenant.com", "test-device"); + + // ... fetches and returns these JWT tokens. + let jwt_token = http_proxy.get_jwt_token().await; + + // `get_jwt_token` should return `Ok` and the value of token should be as set above `1111`. + assert!(jwt_token.is_ok()); + assert_eq!(jwt_token.unwrap().token(), "1111"); +} + fn create_tedge_config(mqtt_port: u16) -> TEdgeConfig { // Create a config file in a temporary directory. let temp_dir = tempfile::tempdir().unwrap(); @@ -425,10 +459,11 @@ fn remove_whitespace(s: &str) -> String { } async fn start_sm_mapper(mqtt_port: u16) -> Result, anyhow::Error> { - let tedge_config = create_tedge_config(mqtt_port); let mqtt_config = mqtt_client::Config::default().with_port(mqtt_port); + let mqtt_client = Client::connect("SM-C8Y-Mapper-Test", &mqtt_config).await?; - let sm_mapper = CumulocitySoftwareManagement::new(mqtt_client, tedge_config); + let http_proxy = FakeC8YHttpProxy {}; + let mut sm_mapper = CumulocitySoftwareManagement::new(mqtt_client, http_proxy); let messages = sm_mapper.subscribe().await?; let mapper_task = tokio::spawn(async move { @@ -440,3 +475,34 @@ async fn start_sm_mapper(mqtt_port: u16) -> Result, anyhow::Error async fn publish_a_fake_jwt_token(broker: &MqttProcessHandler) { let _ = broker.publish("c8y/s/dat", "71,1111").await.unwrap(); } + +struct FakeC8YHttpProxy {} + +#[async_trait::async_trait] +impl C8YHttpProxy for FakeC8YHttpProxy { + async fn init(&mut self) -> Result<(), SMCumulocityMapperError> { + Ok(()) + } + + fn url_is_in_my_tenant_domain(&self, _url: &str) -> bool { + true + } + + async fn get_jwt_token(&self) -> Result { + Ok(SmartRestJwtResponse::try_new("71,fake-token")?) + } + + async fn send_software_list_http( + &self, + _c8y_software_list: &C8yUpdateSoftwareListResponse, + ) -> Result<(), SMCumulocityMapperError> { + Ok(()) + } + + async fn upload_log_binary( + &self, + _log_content: &str, + ) -> Result { + Ok("fake/upload/url".into()) + } +} -- cgit v1.2.3