diff options
Diffstat (limited to 'crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs')
-rw-r--r-- | crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs | 364 |
1 files changed, 36 insertions, 328 deletions
diff --git a/crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs b/crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs index e838c508..3c064e90 100644 --- a/crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs +++ b/crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs @@ -1,37 +1,33 @@ +use crate::component::TEdgeComponent; use crate::mapper::mqtt_config; -use crate::sm_c8y_mapper::json_c8y::{C8yCreateEvent, C8yManagedObject}; +use crate::sm_c8y_mapper::http_proxy::{C8YHttpProxy, JwtAuthHttpProxy}; use crate::sm_c8y_mapper::{error::*, json_c8y::C8yUpdateSoftwareListResponse, topic::*}; -use crate::{component::TEdgeComponent, sm_c8y_mapper::json_c8y::InternalIdResponse}; use async_trait::async_trait; use c8y_smartrest::smartrest_deserializer::{SmartRestLogRequest, SmartRestRestartRequest}; use c8y_smartrest::smartrest_serializer::CumulocitySupportedOperations; use c8y_smartrest::{ error::SmartRestDeserializerError, - smartrest_deserializer::{SmartRestJwtResponse, SmartRestUpdateSoftware}, + smartrest_deserializer::SmartRestUpdateSoftware, smartrest_serializer::{ SmartRestGetPendingOperations, SmartRestSerializer, SmartRestSetOperationToExecuting, SmartRestSetOperationToFailed, SmartRestSetOperationToSuccessful, SmartRestSetSupportedLogType, SmartRestSetSupportedOperations, }, }; -use chrono::{DateTime, FixedOffset, Local}; +use chrono::{DateTime, FixedOffset}; use json_sm::{ Auth, DownloadInfo, Jsonify, OperationStatus, RestartOperationRequest, RestartOperationResponse, SoftwareListRequest, SoftwareListResponse, SoftwareUpdateResponse, }; use mqtt_client::{Client, MqttClient, MqttClientError, MqttMessageStream, Topic, TopicFilter}; -use reqwest::Url; use serde::{Deserialize, Serialize}; +use std::convert::TryInto; use std::path::PathBuf; -use std::{convert::TryInto, time::Duration}; -use tedge_config::{C8yUrlSetting, ConfigSettingAccessorStringExt, DeviceIdSetting, TEdgeConfig}; -use tokio::time::Instant; +use tedge_config::TEdgeConfig; use tracing::{debug, error, info, instrument}; const AGENT_LOG_DIR: &str = "/var/log/tedge/agent"; -const RETRY_TIMEOUT_SECS: u64 = 60; - pub struct CumulocitySoftwareManagementMapper {} impl CumulocitySoftwareManagementMapper { @@ -46,31 +42,32 @@ impl TEdgeComponent for CumulocitySoftwareManagementMapper { async fn start(&self, tedge_config: TEdgeConfig) -> Result<(), anyhow::Error> { let mqtt_config = mqtt_config(&tedge_config)?; let mqtt_client = Client::connect("SM-C8Y-Mapper", &mqtt_config).await?; + let mqtt_jwt_client = Client::connect("JWT-Requester", &mqtt_config).await?; - let mut sm_mapper = CumulocitySoftwareManagement::new(mqtt_client, tedge_config); + let http_proxy = JwtAuthHttpProxy::try_new(mqtt_jwt_client, &tedge_config)?; + let mut sm_mapper = CumulocitySoftwareManagement::new(mqtt_client, http_proxy); let messages = sm_mapper.subscribe().await?; - let () = sm_mapper.init().await?; let () = sm_mapper.run(messages).await?; Ok(()) } } -#[derive(Debug)] -pub struct CumulocitySoftwareManagement { +pub struct CumulocitySoftwareManagement<Proxy> +where + Proxy: C8YHttpProxy, +{ pub client: Client, - config: TEdgeConfig, - c8y_internal_id: String, + http_proxy: Proxy, } -impl CumulocitySoftwareManagement { - pub fn new(client: Client, config: TEdgeConfig) -> Self { - Self { - client, - config, - c8y_internal_id: "".into(), - } +impl<Proxy> CumulocitySoftwareManagement<Proxy> +where + Proxy: C8YHttpProxy, +{ + pub fn new(client: Client, http_proxy: Proxy) -> Self { + Self { client, http_proxy } } pub async fn subscribe(&self) -> Result<Box<dyn MqttMessageStream>, anyhow::Error> { @@ -83,27 +80,13 @@ impl CumulocitySoftwareManagement { Ok(messages) } - #[instrument(skip(self), name = "init")] - async fn init(&mut self) -> Result<(), anyhow::Error> { + pub async fn run( + &mut self, + mut messages: Box<dyn MqttMessageStream>, + ) -> Result<(), anyhow::Error> { info!("Initialisation"); - while self.c8y_internal_id.is_empty() { - if let Err(error) = self.try_get_and_set_internal_id().await { - error!( - "An error ocurred while retrieving internal Id, operation will retry in {} seconds and mapper will reinitialise.\n Error: {:?}", - RETRY_TIMEOUT_SECS, error - ); - - tokio::time::sleep_until(Instant::now() + Duration::from_secs(RETRY_TIMEOUT_SECS)) - .await; - continue; - }; - } - info!("Initialisation done."); - - Ok(()) - } + let () = self.http_proxy.init().await?; - pub async fn run(&self, mut messages: Box<dyn MqttMessageStream>) -> Result<(), anyhow::Error> { info!("Running"); let () = self.publish_supported_operations().await?; let () = self.publish_supported_log_types().await?; @@ -348,22 +331,13 @@ impl CumulocitySoftwareManagement { // 2. read logs let log_output = read_tedge_logs(&smartrest_obj, AGENT_LOG_DIR)?; - // 3. create log event - let token = get_jwt_token(&self.client).await?; - let url_host = self.config.query_string(C8yUrlSetting)?; - - let c8y_managed_object = C8yManagedObject { - id: self.c8y_internal_id.clone(), - }; - let event_response_id = create_log_event(&url_host, &c8y_managed_object, &token).await?; - - // 4. upload log file - let binary_upload_event_url = - get_url_for_event_binary_upload(&url_host, &event_response_id); - - let () = upload_log_binary(&token, &binary_upload_event_url, &log_output.as_str()).await?; + // 3. upload log file + let binary_upload_event_url = self + .http_proxy + .upload_log_binary(&log_output.as_str()) + .await?; - // 5. set log file request to done + // 4. set log file request to done let () = self .set_log_file_request_done(&binary_upload_event_url) .await?; @@ -383,8 +357,7 @@ impl CumulocitySoftwareManagement { .from_smartrest(smartrest)? .to_thin_edge_json()?; - let token = get_jwt_token(&self.client).await?; - let tenant_uri = self.config.query_string(C8yUrlSetting)?; + let token = self.http_proxy.get_jwt_token().await?; software_update_request .update_list @@ -392,7 +365,7 @@ impl CumulocitySoftwareManagement { .for_each(|modules| { modules.modules.iter_mut().for_each(|module| { if let Some(url) = &module.url { - if url_is_in_my_tenant_domain(url.url(), &tenant_uri) { + if self.http_proxy.url_is_in_my_tenant_domain(url.url()) { module.url = module.url.as_ref().map(|s| { DownloadInfo::new(&s.url) .with_auth(Auth::new_bearer(&token.token())) @@ -436,57 +409,13 @@ impl CumulocitySoftwareManagement { &self, json_response: &SoftwareListResponse, ) -> Result<(), SMCumulocityMapperError> { - let token = get_jwt_token(&self.client).await?; - - let reqwest_client = reqwest::ClientBuilder::new().build()?; - - let url_host = self.config.query_string(C8yUrlSetting)?; - let url = get_url_for_sw_list(&url_host, &self.c8y_internal_id); - let c8y_software_list: C8yUpdateSoftwareListResponse = json_response.into(); - - let _published = - publish_software_list_http(&reqwest_client, &url, &token.token(), &c8y_software_list) - .await?; - - Ok(()) - } - - async fn try_get_and_set_internal_id(&mut self) -> Result<(), SMCumulocityMapperError> { - let token = get_jwt_token(&self.client).await?; - let reqwest_client = reqwest::ClientBuilder::new().build()?; - - let url_host = self.config.query_string(C8yUrlSetting)?; - let device_id = self.config.query_string(DeviceIdSetting)?; - let url_get_id = get_url_for_get_id(&url_host, &device_id); - - self.c8y_internal_id = - try_get_internal_id(&reqwest_client, &url_get_id, &token.token()).await?; - - Ok(()) + self.http_proxy + .send_software_list_http(&c8y_software_list) + .await } } -async fn upload_log_binary( - token: &SmartRestJwtResponse, - binary_upload_event_url: &str, - log_content: &str, -) -> Result<(), SMCumulocityMapperError> { - let client = reqwest::ClientBuilder::new().build()?; - - let request = client - .post(binary_upload_event_url) - .header("Accept", "application/json") - .header("Content-Type", "text/plain") - .body(log_content.to_string()) - .bearer_auth(token.token()) - .timeout(Duration::from_millis(10000)) - .build()?; - - let _response = client.execute(request).await?; - Ok(()) -} - #[derive(Debug, Deserialize, Serialize, PartialEq)] #[serde(rename_all = "camelCase")] /// used to retrieve the id of a log event @@ -494,40 +423,6 @@ pub struct SmartRestLogEvent { pub id: String, } -/// Make a POST request to /event/events and return the event id from response body. -/// The event id is used to upload the binary. -async fn create_log_event( - url_host: &str, - c8y_managed_object: &C8yManagedObject, - token: &SmartRestJwtResponse, -) -> Result<String, SMCumulocityMapperError> { - let client = reqwest::ClientBuilder::new().build()?; - - let create_event_url = get_url_for_create_event(&url_host); - - let local: DateTime<Local> = Local::now(); - - let c8y_log_event = C8yCreateEvent::new( - c8y_managed_object.to_owned(), - "c8y_Logfile", - &local.format("%Y-%m-%dT%H:%M:%SZ").to_string(), - "software-management", - ); - - let request = client - .post(create_event_url) - .json(&c8y_log_event) - .bearer_auth(token.token()) - .header("Accept", "application/json") - .timeout(Duration::from_millis(10000)) - .build()?; - - let response = client.execute(request).await?; - let event_response_body = response.json::<SmartRestLogEvent>().await?; - - Ok(event_response_body.id) -} - /// Returns a date time object from a file path or file-path-like string /// a typical file stem looks like this: "software-list-2021-10-27T10:29:58Z" /// @@ -642,201 +537,14 @@ fn read_tedge_logs( Ok(output) } -fn url_is_in_my_tenant_domain(url: &str, tenant_uri: &str) -> bool { - // c8y URL may contain either `Tenant Name` or Tenant Id` so they can be one of following options: - // * <tenant_name>.<domain> eg: sample.c8y.io - // * <tenant_id>.<domain> eg: t12345.c8y.io - // These URLs may be both equivalent and point to the same tenant. - // We are going to remove that and only check if the domain is the same. - let url_host = match Url::parse(url) { - Ok(url) => match url.host() { - Some(host) => host.to_string(), - None => return false, - }, - Err(_err) => { - return false; - } - }; - - let url_domain = url_host.splitn(2, '.').collect::<Vec<&str>>(); - let tenant_domain = tenant_uri.splitn(2, '.').collect::<Vec<&str>>(); - if url_domain.get(1) == tenant_domain.get(1) { - return true; - } - false -} - -async fn publish_software_list_http( - client: &reqwest::Client, - url: &str, - token: &str, - list: &C8yUpdateSoftwareListResponse, -) -> Result<(), SMCumulocityMapperError> { - let request = client - .put(url) - .json(list) - .bearer_auth(token) - .timeout(Duration::from_millis(10000)) - .build()?; - - let _response = client.execute(request).await?; - - Ok(()) -} - -async fn try_get_internal_id( - client: &reqwest::Client, - url_get_id: &str, - token: &str, -) -> Result<String, SMCumulocityMapperError> { - let internal_id = client.get(url_get_id).bearer_auth(token).send().await?; - let internal_id_response = internal_id.json::<InternalIdResponse>().await?; - - let internal_id = internal_id_response.id(); - Ok(internal_id) -} - -fn get_url_for_sw_list(url_host: &str, internal_id: &str) -> String { - let mut url_update_swlist = String::new(); - url_update_swlist.push_str("https://"); - url_update_swlist.push_str(url_host); - url_update_swlist.push_str("/inventory/managedObjects/"); - url_update_swlist.push_str(internal_id); - - url_update_swlist -} - -fn get_url_for_get_id(url_host: &str, device_id: &str) -> String { - let mut url_get_id = String::new(); - url_get_id.push_str("https://"); - url_get_id.push_str(url_host); - url_get_id.push_str("/identity/externalIds/c8y_Serial/"); - url_get_id.push_str(device_id); - - url_get_id -} - -fn get_url_for_create_event(url_host: &str) -> String { - let mut url_create_event = String::new(); - url_create_event.push_str("https://"); - url_create_event.push_str(url_host); - url_create_event.push_str("/event/events/"); - - url_create_event -} - -fn get_url_for_event_binary_upload(url_host: &str, event_id: &str) -> String { - let mut url_event_binary = get_url_for_create_event(url_host); - url_event_binary.push_str(event_id); - url_event_binary.push_str("/binaries"); - - url_event_binary -} - -async fn get_jwt_token(client: &Client) -> Result<SmartRestJwtResponse, SMCumulocityMapperError> { - let mut subscriber = client.subscribe(Topic::new("c8y/s/dat")?.filter()).await?; - - let () = client - .publish(mqtt_client::Message::new( - &Topic::new("c8y/s/uat")?, - "".to_string(), - )) - .await?; - - let token_smartrest = - match tokio::time::timeout(Duration::from_secs(10), subscriber.next()).await { - Ok(Some(msg)) => msg.payload_str()?.to_string(), - Ok(None) => return Err(SMCumulocityMapperError::InvalidMqttMessage), - Err(_elapsed) => return Err(SMCumulocityMapperError::RequestTimeout), - }; - - Ok(SmartRestJwtResponse::try_new(&token_smartrest)?) -} - #[cfg(test)] mod tests { use super::*; - use mqtt_tests::with_timeout::{Maybe, WithTimeout}; use std::fs::File; use std::io::Write; use std::str::FromStr; use test_case::test_case; - const TEST_TIMEOUT_MS: Duration = Duration::from_millis(1000); - - #[tokio::test] - #[serial_test::serial] - async fn get_jwt_token_full_run() { - let broker = mqtt_tests::test_mqtt_broker(); - let mut messages = broker.messages_published_on("c8y/s/uat").await; - - let publisher = Client::connect( - "get_jwt_token_full_run", - &mqtt_client::Config::default().with_port(broker.port), - ) - .await - .unwrap(); - - // Setup listener stream to publish on first message received on topic `c8y/s/us`. - let responder_task = tokio::spawn(async move { - let msg = messages - .recv() - .with_timeout(TEST_TIMEOUT_MS) - .await - .expect_or("No JWT request received."); - assert_eq!(&msg, ""); - - // After receiving successful message publish response with a custom 'token' on topic `c8y/s/dat`. - let _ = broker.publish("c8y/s/dat", "71,1111").await; - }); - - // Wait till token received. - let (jwt_token, _responder) = tokio::join!(get_jwt_token(&publisher), responder_task); - - // `get_jwt_token` should return `Ok` and the value of token should be as set above `1111`. - assert!(jwt_token.is_ok()); - assert_eq!(jwt_token.unwrap().token(), "1111"); - } - - #[test] - fn get_url_for_get_id_returns_correct_address() { - let res = get_url_for_get_id("test_host", "test_device"); - - assert_eq!( - res, - "https://test_host/identity/externalIds/c8y_Serial/test_device" - ); - } - - #[test] - fn get_url_for_sw_list_returns_correct_address() { - let res = get_url_for_sw_list("test_host", "12345"); - - assert_eq!(res, "https://test_host/inventory/managedObjects/12345"); - } - - #[test_case("http://aaa.test.com")] - #[test_case("https://aaa.test.com")] - #[test_case("ftp://aaa.test.com")] - #[test_case("mqtt://aaa.test.com")] - #[test_case("https://t1124124.test.com")] - #[test_case("https://t1124124.test.com:12345")] - #[test_case("https://t1124124.test.com/path")] - #[test_case("https://t1124124.test.com/path/to/file.test")] - #[test_case("https://t1124124.test.com/path/to/file")] - fn url_is_my_tenant_correct_urls(url: &str) { - assert!(url_is_in_my_tenant_domain(url, "test.test.com")); - } - - #[test_case("test.com")] - #[test_case("http://test.co")] - #[test_case("http://test.co.te")] - #[test_case("http://test.com:123456")] - #[test_case("http://test.com::12345")] - fn url_is_my_tenant_incorrect_urls(url: &str) { - assert!(!url_is_in_my_tenant_domain(url, "test.test.com")); - } - #[test_case("/path/to/software-list-2021-10-27T10:44:44Z.log")] #[test_case("/path/to/tedge/agent/software-update-2021-10-25T07:45:41Z.log")] #[test_case("/path/to/another-variant-2021-10-25T07:45:41Z.log")] |