summaryrefslogtreecommitdiffstats
path: root/crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs
diff options
context:
space:
mode:
Diffstat (limited to 'crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs')
-rw-r--r--crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs364
1 files changed, 36 insertions, 328 deletions
diff --git a/crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs b/crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs
index e838c508..3c064e90 100644
--- a/crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs
+++ b/crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs
@@ -1,37 +1,33 @@
+use crate::component::TEdgeComponent;
use crate::mapper::mqtt_config;
-use crate::sm_c8y_mapper::json_c8y::{C8yCreateEvent, C8yManagedObject};
+use crate::sm_c8y_mapper::http_proxy::{C8YHttpProxy, JwtAuthHttpProxy};
use crate::sm_c8y_mapper::{error::*, json_c8y::C8yUpdateSoftwareListResponse, topic::*};
-use crate::{component::TEdgeComponent, sm_c8y_mapper::json_c8y::InternalIdResponse};
use async_trait::async_trait;
use c8y_smartrest::smartrest_deserializer::{SmartRestLogRequest, SmartRestRestartRequest};
use c8y_smartrest::smartrest_serializer::CumulocitySupportedOperations;
use c8y_smartrest::{
error::SmartRestDeserializerError,
- smartrest_deserializer::{SmartRestJwtResponse, SmartRestUpdateSoftware},
+ smartrest_deserializer::SmartRestUpdateSoftware,
smartrest_serializer::{
SmartRestGetPendingOperations, SmartRestSerializer, SmartRestSetOperationToExecuting,
SmartRestSetOperationToFailed, SmartRestSetOperationToSuccessful,
SmartRestSetSupportedLogType, SmartRestSetSupportedOperations,
},
};
-use chrono::{DateTime, FixedOffset, Local};
+use chrono::{DateTime, FixedOffset};
use json_sm::{
Auth, DownloadInfo, Jsonify, OperationStatus, RestartOperationRequest,
RestartOperationResponse, SoftwareListRequest, SoftwareListResponse, SoftwareUpdateResponse,
};
use mqtt_client::{Client, MqttClient, MqttClientError, MqttMessageStream, Topic, TopicFilter};
-use reqwest::Url;
use serde::{Deserialize, Serialize};
+use std::convert::TryInto;
use std::path::PathBuf;
-use std::{convert::TryInto, time::Duration};
-use tedge_config::{C8yUrlSetting, ConfigSettingAccessorStringExt, DeviceIdSetting, TEdgeConfig};
-use tokio::time::Instant;
+use tedge_config::TEdgeConfig;
use tracing::{debug, error, info, instrument};
const AGENT_LOG_DIR: &str = "/var/log/tedge/agent";
-const RETRY_TIMEOUT_SECS: u64 = 60;
-
pub struct CumulocitySoftwareManagementMapper {}
impl CumulocitySoftwareManagementMapper {
@@ -46,31 +42,32 @@ impl TEdgeComponent for CumulocitySoftwareManagementMapper {
async fn start(&self, tedge_config: TEdgeConfig) -> Result<(), anyhow::Error> {
let mqtt_config = mqtt_config(&tedge_config)?;
let mqtt_client = Client::connect("SM-C8Y-Mapper", &mqtt_config).await?;
+ let mqtt_jwt_client = Client::connect("JWT-Requester", &mqtt_config).await?;
- let mut sm_mapper = CumulocitySoftwareManagement::new(mqtt_client, tedge_config);
+ let http_proxy = JwtAuthHttpProxy::try_new(mqtt_jwt_client, &tedge_config)?;
+ let mut sm_mapper = CumulocitySoftwareManagement::new(mqtt_client, http_proxy);
let messages = sm_mapper.subscribe().await?;
- let () = sm_mapper.init().await?;
let () = sm_mapper.run(messages).await?;
Ok(())
}
}
-#[derive(Debug)]
-pub struct CumulocitySoftwareManagement {
+pub struct CumulocitySoftwareManagement<Proxy>
+where
+ Proxy: C8YHttpProxy,
+{
pub client: Client,
- config: TEdgeConfig,
- c8y_internal_id: String,
+ http_proxy: Proxy,
}
-impl CumulocitySoftwareManagement {
- pub fn new(client: Client, config: TEdgeConfig) -> Self {
- Self {
- client,
- config,
- c8y_internal_id: "".into(),
- }
+impl<Proxy> CumulocitySoftwareManagement<Proxy>
+where
+ Proxy: C8YHttpProxy,
+{
+ pub fn new(client: Client, http_proxy: Proxy) -> Self {
+ Self { client, http_proxy }
}
pub async fn subscribe(&self) -> Result<Box<dyn MqttMessageStream>, anyhow::Error> {
@@ -83,27 +80,13 @@ impl CumulocitySoftwareManagement {
Ok(messages)
}
- #[instrument(skip(self), name = "init")]
- async fn init(&mut self) -> Result<(), anyhow::Error> {
+ pub async fn run(
+ &mut self,
+ mut messages: Box<dyn MqttMessageStream>,
+ ) -> Result<(), anyhow::Error> {
info!("Initialisation");
- while self.c8y_internal_id.is_empty() {
- if let Err(error) = self.try_get_and_set_internal_id().await {
- error!(
- "An error ocurred while retrieving internal Id, operation will retry in {} seconds and mapper will reinitialise.\n Error: {:?}",
- RETRY_TIMEOUT_SECS, error
- );
-
- tokio::time::sleep_until(Instant::now() + Duration::from_secs(RETRY_TIMEOUT_SECS))
- .await;
- continue;
- };
- }
- info!("Initialisation done.");
-
- Ok(())
- }
+ let () = self.http_proxy.init().await?;
- pub async fn run(&self, mut messages: Box<dyn MqttMessageStream>) -> Result<(), anyhow::Error> {
info!("Running");
let () = self.publish_supported_operations().await?;
let () = self.publish_supported_log_types().await?;
@@ -348,22 +331,13 @@ impl CumulocitySoftwareManagement {
// 2. read logs
let log_output = read_tedge_logs(&smartrest_obj, AGENT_LOG_DIR)?;
- // 3. create log event
- let token = get_jwt_token(&self.client).await?;
- let url_host = self.config.query_string(C8yUrlSetting)?;
-
- let c8y_managed_object = C8yManagedObject {
- id: self.c8y_internal_id.clone(),
- };
- let event_response_id = create_log_event(&url_host, &c8y_managed_object, &token).await?;
-
- // 4. upload log file
- let binary_upload_event_url =
- get_url_for_event_binary_upload(&url_host, &event_response_id);
-
- let () = upload_log_binary(&token, &binary_upload_event_url, &log_output.as_str()).await?;
+ // 3. upload log file
+ let binary_upload_event_url = self
+ .http_proxy
+ .upload_log_binary(&log_output.as_str())
+ .await?;
- // 5. set log file request to done
+ // 4. set log file request to done
let () = self
.set_log_file_request_done(&binary_upload_event_url)
.await?;
@@ -383,8 +357,7 @@ impl CumulocitySoftwareManagement {
.from_smartrest(smartrest)?
.to_thin_edge_json()?;
- let token = get_jwt_token(&self.client).await?;
- let tenant_uri = self.config.query_string(C8yUrlSetting)?;
+ let token = self.http_proxy.get_jwt_token().await?;
software_update_request
.update_list
@@ -392,7 +365,7 @@ impl CumulocitySoftwareManagement {
.for_each(|modules| {
modules.modules.iter_mut().for_each(|module| {
if let Some(url) = &module.url {
- if url_is_in_my_tenant_domain(url.url(), &tenant_uri) {
+ if self.http_proxy.url_is_in_my_tenant_domain(url.url()) {
module.url = module.url.as_ref().map(|s| {
DownloadInfo::new(&s.url)
.with_auth(Auth::new_bearer(&token.token()))
@@ -436,57 +409,13 @@ impl CumulocitySoftwareManagement {
&self,
json_response: &SoftwareListResponse,
) -> Result<(), SMCumulocityMapperError> {
- let token = get_jwt_token(&self.client).await?;
-
- let reqwest_client = reqwest::ClientBuilder::new().build()?;
-
- let url_host = self.config.query_string(C8yUrlSetting)?;
- let url = get_url_for_sw_list(&url_host, &self.c8y_internal_id);
-
let c8y_software_list: C8yUpdateSoftwareListResponse = json_response.into();
-
- let _published =
- publish_software_list_http(&reqwest_client, &url, &token.token(), &c8y_software_list)
- .await?;
-
- Ok(())
- }
-
- async fn try_get_and_set_internal_id(&mut self) -> Result<(), SMCumulocityMapperError> {
- let token = get_jwt_token(&self.client).await?;
- let reqwest_client = reqwest::ClientBuilder::new().build()?;
-
- let url_host = self.config.query_string(C8yUrlSetting)?;
- let device_id = self.config.query_string(DeviceIdSetting)?;
- let url_get_id = get_url_for_get_id(&url_host, &device_id);
-
- self.c8y_internal_id =
- try_get_internal_id(&reqwest_client, &url_get_id, &token.token()).await?;
-
- Ok(())
+ self.http_proxy
+ .send_software_list_http(&c8y_software_list)
+ .await
}
}
-async fn upload_log_binary(
- token: &SmartRestJwtResponse,
- binary_upload_event_url: &str,
- log_content: &str,
-) -> Result<(), SMCumulocityMapperError> {
- let client = reqwest::ClientBuilder::new().build()?;
-
- let request = client
- .post(binary_upload_event_url)
- .header("Accept", "application/json")
- .header("Content-Type", "text/plain")
- .body(log_content.to_string())
- .bearer_auth(token.token())
- .timeout(Duration::from_millis(10000))
- .build()?;
-
- let _response = client.execute(request).await?;
- Ok(())
-}
-
#[derive(Debug, Deserialize, Serialize, PartialEq)]
#[serde(rename_all = "camelCase")]
/// used to retrieve the id of a log event
@@ -494,40 +423,6 @@ pub struct SmartRestLogEvent {
pub id: String,
}
-/// Make a POST request to /event/events and return the event id from response body.
-/// The event id is used to upload the binary.
-async fn create_log_event(
- url_host: &str,
- c8y_managed_object: &C8yManagedObject,
- token: &SmartRestJwtResponse,
-) -> Result<String, SMCumulocityMapperError> {
- let client = reqwest::ClientBuilder::new().build()?;
-
- let create_event_url = get_url_for_create_event(&url_host);
-
- let local: DateTime<Local> = Local::now();
-
- let c8y_log_event = C8yCreateEvent::new(
- c8y_managed_object.to_owned(),
- "c8y_Logfile",
- &local.format("%Y-%m-%dT%H:%M:%SZ").to_string(),
- "software-management",
- );
-
- let request = client
- .post(create_event_url)
- .json(&c8y_log_event)
- .bearer_auth(token.token())
- .header("Accept", "application/json")
- .timeout(Duration::from_millis(10000))
- .build()?;
-
- let response = client.execute(request).await?;
- let event_response_body = response.json::<SmartRestLogEvent>().await?;
-
- Ok(event_response_body.id)
-}
-
/// Returns a date time object from a file path or file-path-like string
/// a typical file stem looks like this: "software-list-2021-10-27T10:29:58Z"
///
@@ -642,201 +537,14 @@ fn read_tedge_logs(
Ok(output)
}
-fn url_is_in_my_tenant_domain(url: &str, tenant_uri: &str) -> bool {
- // c8y URL may contain either `Tenant Name` or Tenant Id` so they can be one of following options:
- // * <tenant_name>.<domain> eg: sample.c8y.io
- // * <tenant_id>.<domain> eg: t12345.c8y.io
- // These URLs may be both equivalent and point to the same tenant.
- // We are going to remove that and only check if the domain is the same.
- let url_host = match Url::parse(url) {
- Ok(url) => match url.host() {
- Some(host) => host.to_string(),
- None => return false,
- },
- Err(_err) => {
- return false;
- }
- };
-
- let url_domain = url_host.splitn(2, '.').collect::<Vec<&str>>();
- let tenant_domain = tenant_uri.splitn(2, '.').collect::<Vec<&str>>();
- if url_domain.get(1) == tenant_domain.get(1) {
- return true;
- }
- false
-}
-
-async fn publish_software_list_http(
- client: &reqwest::Client,
- url: &str,
- token: &str,
- list: &C8yUpdateSoftwareListResponse,
-) -> Result<(), SMCumulocityMapperError> {
- let request = client
- .put(url)
- .json(list)
- .bearer_auth(token)
- .timeout(Duration::from_millis(10000))
- .build()?;
-
- let _response = client.execute(request).await?;
-
- Ok(())
-}
-
-async fn try_get_internal_id(
- client: &reqwest::Client,
- url_get_id: &str,
- token: &str,
-) -> Result<String, SMCumulocityMapperError> {
- let internal_id = client.get(url_get_id).bearer_auth(token).send().await?;
- let internal_id_response = internal_id.json::<InternalIdResponse>().await?;
-
- let internal_id = internal_id_response.id();
- Ok(internal_id)
-}
-
-fn get_url_for_sw_list(url_host: &str, internal_id: &str) -> String {
- let mut url_update_swlist = String::new();
- url_update_swlist.push_str("https://");
- url_update_swlist.push_str(url_host);
- url_update_swlist.push_str("/inventory/managedObjects/");
- url_update_swlist.push_str(internal_id);
-
- url_update_swlist
-}
-
-fn get_url_for_get_id(url_host: &str, device_id: &str) -> String {
- let mut url_get_id = String::new();
- url_get_id.push_str("https://");
- url_get_id.push_str(url_host);
- url_get_id.push_str("/identity/externalIds/c8y_Serial/");
- url_get_id.push_str(device_id);
-
- url_get_id
-}
-
-fn get_url_for_create_event(url_host: &str) -> String {
- let mut url_create_event = String::new();
- url_create_event.push_str("https://");
- url_create_event.push_str(url_host);
- url_create_event.push_str("/event/events/");
-
- url_create_event
-}
-
-fn get_url_for_event_binary_upload(url_host: &str, event_id: &str) -> String {
- let mut url_event_binary = get_url_for_create_event(url_host);
- url_event_binary.push_str(event_id);
- url_event_binary.push_str("/binaries");
-
- url_event_binary
-}
-
-async fn get_jwt_token(client: &Client) -> Result<SmartRestJwtResponse, SMCumulocityMapperError> {
- let mut subscriber = client.subscribe(Topic::new("c8y/s/dat")?.filter()).await?;
-
- let () = client
- .publish(mqtt_client::Message::new(
- &Topic::new("c8y/s/uat")?,
- "".to_string(),
- ))
- .await?;
-
- let token_smartrest =
- match tokio::time::timeout(Duration::from_secs(10), subscriber.next()).await {
- Ok(Some(msg)) => msg.payload_str()?.to_string(),
- Ok(None) => return Err(SMCumulocityMapperError::InvalidMqttMessage),
- Err(_elapsed) => return Err(SMCumulocityMapperError::RequestTimeout),
- };
-
- Ok(SmartRestJwtResponse::try_new(&token_smartrest)?)
-}
-
#[cfg(test)]
mod tests {
use super::*;
- use mqtt_tests::with_timeout::{Maybe, WithTimeout};
use std::fs::File;
use std::io::Write;
use std::str::FromStr;
use test_case::test_case;
- const TEST_TIMEOUT_MS: Duration = Duration::from_millis(1000);
-
- #[tokio::test]
- #[serial_test::serial]
- async fn get_jwt_token_full_run() {
- let broker = mqtt_tests::test_mqtt_broker();
- let mut messages = broker.messages_published_on("c8y/s/uat").await;
-
- let publisher = Client::connect(
- "get_jwt_token_full_run",
- &mqtt_client::Config::default().with_port(broker.port),
- )
- .await
- .unwrap();
-
- // Setup listener stream to publish on first message received on topic `c8y/s/us`.
- let responder_task = tokio::spawn(async move {
- let msg = messages
- .recv()
- .with_timeout(TEST_TIMEOUT_MS)
- .await
- .expect_or("No JWT request received.");
- assert_eq!(&msg, "");
-
- // After receiving successful message publish response with a custom 'token' on topic `c8y/s/dat`.
- let _ = broker.publish("c8y/s/dat", "71,1111").await;
- });
-
- // Wait till token received.
- let (jwt_token, _responder) = tokio::join!(get_jwt_token(&publisher), responder_task);
-
- // `get_jwt_token` should return `Ok` and the value of token should be as set above `1111`.
- assert!(jwt_token.is_ok());
- assert_eq!(jwt_token.unwrap().token(), "1111");
- }
-
- #[test]
- fn get_url_for_get_id_returns_correct_address() {
- let res = get_url_for_get_id("test_host", "test_device");
-
- assert_eq!(
- res,
- "https://test_host/identity/externalIds/c8y_Serial/test_device"
- );
- }
-
- #[test]
- fn get_url_for_sw_list_returns_correct_address() {
- let res = get_url_for_sw_list("test_host", "12345");
-
- assert_eq!(res, "https://test_host/inventory/managedObjects/12345");
- }
-
- #[test_case("http://aaa.test.com")]
- #[test_case("https://aaa.test.com")]
- #[test_case("ftp://aaa.test.com")]
- #[test_case("mqtt://aaa.test.com")]
- #[test_case("https://t1124124.test.com")]
- #[test_case("https://t1124124.test.com:12345")]
- #[test_case("https://t1124124.test.com/path")]
- #[test_case("https://t1124124.test.com/path/to/file.test")]
- #[test_case("https://t1124124.test.com/path/to/file")]
- fn url_is_my_tenant_correct_urls(url: &str) {
- assert!(url_is_in_my_tenant_domain(url, "test.test.com"));
- }
-
- #[test_case("test.com")]
- #[test_case("http://test.co")]
- #[test_case("http://test.co.te")]
- #[test_case("http://test.com:123456")]
- #[test_case("http://test.com::12345")]
- fn url_is_my_tenant_incorrect_urls(url: &str) {
- assert!(!url_is_in_my_tenant_domain(url, "test.test.com"));
- }
-
#[test_case("/path/to/software-list-2021-10-27T10:44:44Z.log")]
#[test_case("/path/to/tedge/agent/software-update-2021-10-25T07:45:41Z.log")]
#[test_case("/path/to/another-variant-2021-10-25T07:45:41Z.log")]