From ad065ec206ad3b48f9e1cc48080f4d2ecfe85a2e Mon Sep 17 00:00:00 2001 From: Lukasz Woznicki Date: Wed, 9 Feb 2022 13:55:02 +0000 Subject: Move module to subdirectories and adjust use Signed-off-by: Lukasz Woznicki --- .../core/tedge_mapper/src/sm_c8y_mapper/error.rs | 55 --- .../core/tedge_mapper/src/sm_c8y_mapper/mapper.rs | 425 ------------------- crates/core/tedge_mapper/src/sm_c8y_mapper/mod.rs | 4 - .../core/tedge_mapper/src/sm_c8y_mapper/tests.rs | 457 --------------------- 4 files changed, 941 deletions(-) delete mode 100644 crates/core/tedge_mapper/src/sm_c8y_mapper/error.rs delete mode 100644 crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs delete mode 100644 crates/core/tedge_mapper/src/sm_c8y_mapper/mod.rs delete mode 100644 crates/core/tedge_mapper/src/sm_c8y_mapper/tests.rs (limited to 'crates/core/tedge_mapper/src/sm_c8y_mapper') diff --git a/crates/core/tedge_mapper/src/sm_c8y_mapper/error.rs b/crates/core/tedge_mapper/src/sm_c8y_mapper/error.rs deleted file mode 100644 index e3d96cab..00000000 --- a/crates/core/tedge_mapper/src/sm_c8y_mapper/error.rs +++ /dev/null @@ -1,55 +0,0 @@ -use c8y_smartrest::error::{SmartRestDeserializerError, SmartRestSerializerError}; - -#[derive(thiserror::Error, Debug)] -pub enum SMCumulocityMapperError { - #[error("Invalid MQTT Message.")] - InvalidMqttMessage, - - #[error(transparent)] - InvalidTopicError(#[from] agent_interface::TopicError), - - #[error(transparent)] - InvalidThinEdgeJson(#[from] agent_interface::SoftwareError), - - #[error(transparent)] - FromElapsed(#[from] tokio::time::error::Elapsed), - - #[error(transparent)] - FromMqttClient(#[from] mqtt_channel::MqttError), - - #[error(transparent)] - FromReqwest(#[from] reqwest::Error), - - #[error(transparent)] - FromSmartRestSerializer(#[from] SmartRestSerializerError), - - #[error(transparent)] - FromSmartRestDeserializer(#[from] SmartRestDeserializerError), - - #[error(transparent)] - FromTedgeConfig(#[from] tedge_config::ConfigSettingError), - - #[error(transparent)] - FromTimeFormat(#[from] time::error::Format), - - #[error(transparent)] - FromTimeParse(#[from] time::error::Parse), - - #[error("Invalid date in file name: {0}")] - InvalidDateInFileName(String), - - #[error("Invalid path. Not UTF-8.")] - InvalidUtf8Path, - - #[error(transparent)] - FromIo(#[from] std::io::Error), - - #[error("Request timed out")] - RequestTimeout, - - #[error("Operation execution failed: {0}")] - ExecuteFailed(String), - - #[error("An unknown operation template: {0}")] - UnknownOperation(String), -} diff --git a/crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs b/crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs deleted file mode 100644 index 871afcfd..00000000 --- a/crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs +++ /dev/null @@ -1,425 +0,0 @@ -use crate::component::TEdgeComponent; - -use agent_interface::{ - topic::*, Jsonify, OperationStatus, RestartOperationRequest, RestartOperationResponse, - SoftwareListRequest, SoftwareListResponse, SoftwareUpdateResponse, -}; -use async_trait::async_trait; -use c8y_api::{ - http_proxy::{C8YHttpProxy, JwtAuthHttpProxy}, - json_c8y::C8yUpdateSoftwareListResponse, -}; -use c8y_smartrest::smartrest_deserializer::SmartRestRestartRequest; -use c8y_smartrest::smartrest_serializer::CumulocitySupportedOperations; -use c8y_smartrest::{ - error::{SMCumulocityMapperError, SmartRestDeserializerError}, - operations::Operations, - smartrest_deserializer::SmartRestUpdateSoftware, - smartrest_serializer::{ - SmartRestGetPendingOperations, SmartRestSerializer, SmartRestSetOperationToExecuting, - SmartRestSetOperationToFailed, SmartRestSetOperationToSuccessful, - SmartRestSetSupportedLogType, - }, - topic::*, -}; -use download::{Auth, DownloadInfo}; -use mqtt_channel::{Config, Connection, MqttError, SinkExt, StreamExt, Topic, TopicFilter}; -use std::{convert::TryInto, process::Stdio}; -use tedge_config::{ConfigSettingAccessor, MqttPortSetting, TEdgeConfig}; -use tracing::{debug, error, info, instrument}; - -const SM_MAPPER: &str = "SM-C8Y-Mapper"; -const SM_MAPPER_JWT_TOKEN_SESSION_NAME: &str = "SM-C8Y-Mapper-JWT-Token"; - -pub struct CumulocitySoftwareManagementMapper {} - -impl CumulocitySoftwareManagementMapper { - pub fn new() -> Self { - Self {} - } - - pub fn subscriptions(operations: &Operations) -> Result { - let mut topic_filter = TopicFilter::new(ResponseTopic::SoftwareListResponse.as_str())?; - topic_filter.add(ResponseTopic::SoftwareUpdateResponse.as_str())?; - topic_filter.add(C8yTopic::SmartRestRequest.as_str())?; - topic_filter.add(ResponseTopic::RestartResponse.as_str())?; - - for topic in operations.topics_for_operations() { - topic_filter.add(&topic)? - } - - Ok(topic_filter) - } - - pub async fn init_session(&mut self) -> Result<(), anyhow::Error> { - info!("Initialize tedge sm mapper session"); - let operations = Operations::try_new("/etc/tedge/operations", "c8y")?; - let mqtt_topic = CumulocitySoftwareManagementMapper::subscriptions(&operations)?; - let config = Config::default() - .with_session_name(SM_MAPPER) - .with_clean_session(false) - .with_subscriptions(mqtt_topic); - mqtt_channel::init_session(&config).await?; - Ok(()) - } - - pub async fn clear_session(&mut self) -> Result<(), anyhow::Error> { - info!("Clear tedge sm mapper session"); - let operations = Operations::try_new("/etc/tedge/operations", "c8y")?; - let mqtt_topic = CumulocitySoftwareManagementMapper::subscriptions(&operations)?; - let config = Config::default() - .with_session_name(SM_MAPPER) - .with_clean_session(true) - .with_subscriptions(mqtt_topic); - mqtt_channel::clear_session(&config).await?; - Ok(()) - } -} - -#[async_trait] -impl TEdgeComponent for CumulocitySoftwareManagementMapper { - #[instrument(skip(self, tedge_config), name = "sm-c8y-mapper")] - async fn start(&self, tedge_config: TEdgeConfig) -> Result<(), anyhow::Error> { - let operations = Operations::try_new("/etc/tedge/operations", "c8y")?; - let http_proxy = - JwtAuthHttpProxy::try_new(&tedge_config, SM_MAPPER_JWT_TOKEN_SESSION_NAME).await?; - let mut sm_mapper = - CumulocitySoftwareManagement::try_new(&tedge_config, http_proxy, operations).await?; - - let () = sm_mapper.run().await?; - - Ok(()) - } -} - -pub struct CumulocitySoftwareManagement -where - Proxy: C8YHttpProxy, -{ - client: Connection, - http_proxy: Proxy, - operations: Operations, -} - -impl CumulocitySoftwareManagement -where - Proxy: C8YHttpProxy, -{ - pub fn new(client: Connection, http_proxy: Proxy, operations: Operations) -> Self { - Self { - client, - http_proxy, - operations, - } - } - - pub async fn try_new( - tedge_config: &TEdgeConfig, - http_proxy: Proxy, - operations: Operations, - ) -> Result { - let mqtt_topic = CumulocitySoftwareManagementMapper::subscriptions(&operations)?; - let mqtt_port = tedge_config.query(MqttPortSetting)?.into(); - - let mqtt_config = crate::mapper::mqtt_config(SM_MAPPER, mqtt_port, mqtt_topic)?; - let client = Connection::new(&mqtt_config).await?; - - Ok(Self { - client, - http_proxy, - operations, - }) - } - - pub async fn run(&mut self) -> Result<(), anyhow::Error> { - info!("Initialisation"); - let () = self.http_proxy.init().await?; - - info!("Running"); - let () = self.publish_supported_log_types().await?; - let () = self.publish_get_pending_operations().await?; - let () = self.ask_software_list().await?; - - while let Err(err) = self.subscribe_messages_runtime().await { - if let SMCumulocityMapperError::FromSmartRestDeserializer( - SmartRestDeserializerError::InvalidParameter { operation, .. }, - ) = &err - { - let topic = C8yTopic::SmartRestResponse.to_topic()?; - // publish the operation status as `executing` - let () = self.publish(&topic, format!("501,{}", operation)).await?; - // publish the operation status as `failed` - let () = self - .publish( - &topic, - format!("502,{},\"{}\"", operation, &err.to_string()), - ) - .await?; - } - error!("{}", err); - } - - Ok(()) - } - - async fn process_smartrest(&mut self, payload: &str) -> Result<(), SMCumulocityMapperError> { - let message_id: &str = &payload[..3]; - match message_id { - "528" => { - let () = self.forward_software_request(payload).await?; - } - "510" => { - let () = self.forward_restart_request(payload).await?; - } - template => match self.operations.matching_smartrest_template(template) { - Some(operation) => { - if let Some(command) = operation.command() { - execute_operation(payload, command.as_str()).await?; - } - } - None => { - return Err(SMCumulocityMapperError::UnknownOperation( - template.to_string(), - )); - } - }, - } - - Ok(()) - } - - #[instrument(skip(self), name = "main-loop")] - async fn subscribe_messages_runtime(&mut self) -> Result<(), SMCumulocityMapperError> { - while let Some(message) = self.client.received.next().await { - let request_topic = message.topic.clone().try_into()?; - match request_topic { - MapperSubscribeTopic::ResponseTopic(ResponseTopic::SoftwareListResponse) => { - debug!("Software list"); - let () = self - .validate_and_publish_software_list(message.payload_str()?) - .await?; - } - MapperSubscribeTopic::ResponseTopic(ResponseTopic::SoftwareUpdateResponse) => { - debug!("Software update"); - let () = self - .publish_operation_status(message.payload_str()?) - .await?; - } - MapperSubscribeTopic::ResponseTopic(ResponseTopic::RestartResponse) => { - let () = self - .publish_restart_operation_status(message.payload_str()?) - .await?; - } - MapperSubscribeTopic::C8yTopic(_) => { - debug!("Cumulocity"); - let () = self.process_smartrest(message.payload_str()?).await?; - } - } - } - Ok(()) - } - - #[instrument(skip(self), name = "software-list")] - async fn ask_software_list(&mut self) -> Result<(), SMCumulocityMapperError> { - let request = SoftwareListRequest::default(); - let topic = Topic::new(RequestTopic::SoftwareListRequest.as_str())?; - let json_list_request = request.to_json()?; - let () = self.publish(&topic, json_list_request).await?; - - Ok(()) - } - - #[instrument(skip(self), name = "software-update")] - async fn validate_and_publish_software_list( - &mut self, - json_response: &str, - ) -> Result<(), SMCumulocityMapperError> { - let response = SoftwareListResponse::from_json(json_response)?; - - match response.status() { - OperationStatus::Successful => { - let () = self.send_software_list_http(&response).await?; - } - - OperationStatus::Failed => { - error!("Received a failed software response: {}", json_response); - } - - OperationStatus::Executing => {} // C8Y doesn't expect any message to be published - } - - Ok(()) - } - - async fn publish_supported_log_types(&mut self) -> Result<(), SMCumulocityMapperError> { - let payload = SmartRestSetSupportedLogType::default().to_smartrest()?; - let topic = C8yTopic::SmartRestResponse.to_topic()?; - let () = self.publish(&topic, payload).await?; - Ok(()) - } - - async fn publish_get_pending_operations(&mut self) -> Result<(), SMCumulocityMapperError> { - let data = SmartRestGetPendingOperations::default(); - let topic = C8yTopic::SmartRestResponse.to_topic()?; - let payload = data.to_smartrest()?; - let () = self.publish(&topic, payload).await?; - Ok(()) - } - - async fn publish_operation_status( - &mut self, - json_response: &str, - ) -> Result<(), SMCumulocityMapperError> { - let response = SoftwareUpdateResponse::from_json(json_response)?; - let topic = C8yTopic::SmartRestResponse.to_topic()?; - match response.status() { - OperationStatus::Executing => { - let smartrest_set_operation_status = - SmartRestSetOperationToExecuting::from_thin_edge_json(response)? - .to_smartrest()?; - let () = self.publish(&topic, smartrest_set_operation_status).await?; - } - OperationStatus::Successful => { - let smartrest_set_operation = - SmartRestSetOperationToSuccessful::from_thin_edge_json(response)? - .to_smartrest()?; - let () = self.publish(&topic, smartrest_set_operation).await?; - let () = self - .validate_and_publish_software_list(json_response) - .await?; - } - OperationStatus::Failed => { - let smartrest_set_operation = - SmartRestSetOperationToFailed::from_thin_edge_json(response)?.to_smartrest()?; - let () = self.publish(&topic, smartrest_set_operation).await?; - let () = self - .validate_and_publish_software_list(json_response) - .await?; - } - }; - Ok(()) - } - - async fn publish_restart_operation_status( - &mut self, - json_response: &str, - ) -> Result<(), SMCumulocityMapperError> { - let response = RestartOperationResponse::from_json(json_response)?; - let topic = C8yTopic::SmartRestResponse.to_topic()?; - - match response.status() { - OperationStatus::Executing => { - let smartrest_set_operation = SmartRestSetOperationToExecuting::new( - CumulocitySupportedOperations::C8yRestartRequest, - ) - .to_smartrest()?; - - let () = self.publish(&topic, smartrest_set_operation).await?; - } - OperationStatus::Successful => { - let smartrest_set_operation = SmartRestSetOperationToSuccessful::new( - CumulocitySupportedOperations::C8yRestartRequest, - ) - .to_smartrest()?; - let () = self.publish(&topic, smartrest_set_operation).await?; - } - OperationStatus::Failed => { - let smartrest_set_operation = SmartRestSetOperationToFailed::new( - CumulocitySupportedOperations::C8yRestartRequest, - "Restart Failed".into(), - ) - .to_smartrest()?; - let () = self.publish(&topic, smartrest_set_operation).await?; - } - } - Ok(()) - } - async fn forward_software_request( - &mut self, - smartrest: &str, - ) -> Result<(), SMCumulocityMapperError> { - let topic = Topic::new(RequestTopic::SoftwareUpdateRequest.as_str())?; - let update_software = SmartRestUpdateSoftware::default(); - let mut software_update_request = update_software - .from_smartrest(smartrest)? - .to_thin_edge_json()?; - - let token = self.http_proxy.get_jwt_token().await?; - - software_update_request - .update_list - .iter_mut() - .for_each(|modules| { - modules.modules.iter_mut().for_each(|module| { - if let Some(url) = &module.url { - if self.http_proxy.url_is_in_my_tenant_domain(url.url()) { - module.url = module.url.as_ref().map(|s| { - DownloadInfo::new(&s.url) - .with_auth(Auth::new_bearer(&token.token())) - }); - } else { - module.url = module.url.as_ref().map(|s| DownloadInfo::new(&s.url)); - } - } - }); - }); - - let () = self - .publish(&topic, software_update_request.to_json()?) - .await?; - - Ok(()) - } - - async fn forward_restart_request( - &mut self, - smartrest: &str, - ) -> Result<(), SMCumulocityMapperError> { - let topic = Topic::new(RequestTopic::RestartRequest.as_str())?; - let _ = SmartRestRestartRequest::from_smartrest(smartrest)?; - - let request = RestartOperationRequest::default(); - let () = self.publish(&topic, request.to_json()?).await?; - - Ok(()) - } - - async fn publish(&mut self, topic: &Topic, payload: String) -> Result<(), MqttError> { - let () = self - .client - .published - .send(mqtt_channel::Message::new(topic, payload)) - .await?; - Ok(()) - } - - async fn send_software_list_http( - &mut self, - json_response: &SoftwareListResponse, - ) -> Result<(), SMCumulocityMapperError> { - let c8y_software_list: C8yUpdateSoftwareListResponse = json_response.into(); - self.http_proxy - .send_software_list_http(&c8y_software_list) - .await - } -} - -async fn execute_operation(payload: &str, command: &str) -> Result<(), SMCumulocityMapperError> { - let command = command.to_owned(); - let payload = payload.to_string(); - - let _handle = tokio::spawn(async move { - let mut child = tokio::process::Command::new(command) - .args(&[payload]) - .stdin(Stdio::null()) - .stdout(Stdio::null()) - .stderr(Stdio::null()) - .spawn() - .map_err(|e| SMCumulocityMapperError::ExecuteFailed(e.to_string())) - .unwrap(); - - child.wait().await - }); - - Ok(()) -} diff --git a/crates/core/tedge_mapper/src/sm_c8y_mapper/mod.rs b/crates/core/tedge_mapper/src/sm_c8y_mapper/mod.rs deleted file mode 100644 index 144d7526..00000000 --- a/crates/core/tedge_mapper/src/sm_c8y_mapper/mod.rs +++ /dev/null @@ -1,4 +0,0 @@ -pub mod mapper; - -#[cfg(test)] -mod tests; diff --git a/crates/core/tedge_mapper/src/sm_c8y_mapper/tests.rs b/crates/core/tedge_mapper/src/sm_c8y_mapper/tests.rs deleted file mode 100644 index 7172cd24..00000000 --- a/crates/core/tedge_mapper/src/sm_c8y_mapper/tests.rs +++ /dev/null @@ -1,457 +0,0 @@ -use crate::sm_c8y_mapper::mapper::{ - CumulocitySoftwareManagement, CumulocitySoftwareManagementMapper, -}; -use c8y_api::http_proxy::{C8YHttpProxy, JwtAuthHttpProxy}; -use c8y_api::json_c8y::C8yUpdateSoftwareListResponse; -use c8y_smartrest::{ - error::SMCumulocityMapperError, operations::Operations, - smartrest_deserializer::SmartRestJwtResponse, -}; -use mqtt_channel::{Connection, TopicFilter}; -use mqtt_tests::test_mqtt_server::MqttProcessHandler; -use mqtt_tests::with_timeout::{Maybe, WithTimeout}; -use mqtt_tests::StreamExt; -use serial_test::serial; -use std::time::Duration; -use tokio::task::JoinHandle; - -const TEST_TIMEOUT_MS: Duration = Duration::from_millis(1000); - -#[tokio::test] -#[serial] -async fn mapper_publishes_a_software_list_request() { - // The test assures the mapper publishes request for software list on `tedge/commands/req/software/list`. - let broker = mqtt_tests::test_mqtt_broker(); - - let mut messages = broker - .messages_published_on("tedge/commands/req/software/list") - .await; - - // Start the SM Mapper - let sm_mapper = start_sm_mapper(broker.port).await; - - // Expect on `tedge/commands/req/software/list` a software list request. - let msg = messages - .next() - .with_timeout(TEST_TIMEOUT_MS) - .await - .expect_or("No message received after a second."); - dbg!(&msg); - assert!(&msg.contains(r#"{"id":"#)); - - sm_mapper.unwrap().abort(); -} - -#[tokio::test] -#[serial] -async fn mapper_publishes_a_supported_operation_and_a_pending_operations_onto_c8y_topic() { - // The test assures the mapper publishes smartrest messages 114 and 500 on `c8y/s/us` which shall be send over to the cloud if bridge connection exists. - let broker = mqtt_tests::test_mqtt_broker(); - let mut messages = broker.messages_published_on("c8y/s/us").await; - - // Start SM Mapper - let sm_mapper = start_sm_mapper(broker.port).await; - - // Expect both 114 and 500 messages has been received on `c8y/s/us`, if no msg received for the timeout the test fails. - mqtt_tests::assert_received( - &mut messages, - TEST_TIMEOUT_MS, - vec!["118,software-management\n", "500\n"], - ) - .await; - sm_mapper.unwrap().abort(); -} - -#[tokio::test] -#[serial] -async fn mapper_publishes_software_update_request() { - // The test assures SM Mapper correctly receives software update request smartrest message on `c8y/s/ds` - // and converts it to thin-edge json message published on `tedge/commands/req/software/update`. - let broker = mqtt_tests::test_mqtt_broker(); - let mut messages = broker - .messages_published_on("tedge/commands/req/software/update") - .await; - - let sm_mapper = start_sm_mapper(broker.port).await; - - // Prepare and publish a software update smartrest request on `c8y/s/ds`. - let smartrest = r#"528,external_id,nodered,1.0.0::debian,,install"#; - let _ = broker.publish("c8y/s/ds", smartrest).await.unwrap(); - let _ = publish_a_fake_jwt_token(&broker).await; - - let expected_update_list = r#" - "updateList": [ - { - "type": "debian", - "modules": [ - { - "name": "nodered", - "version": "1.0.0", - "action": "install" - } - ] - }"#; - - // Expect thin-edge json message on `tedge/commands/req/software/update` with expected payload. - let msg = messages - .next() - .with_timeout(TEST_TIMEOUT_MS) - .await - .expect_or("No message received after a second."); - dbg!(&msg); - assert!(&msg.contains("{\"id\":\"")); - assert!(&msg.contains(&remove_whitespace(expected_update_list))); - sm_mapper.unwrap().abort(); -} - -#[tokio::test] -#[serial] -async fn mapper_publishes_software_update_status_onto_c8y_topic() { - // The test assures SM Mapper correctly receives software update response message on `tedge/commands/res/software/update` - // and publishes status of the operation `501` on `c8y/s/us` - let broker = mqtt_tests::test_mqtt_broker(); - - let mut messages = broker.messages_published_on("c8y/s/us").await; - - // Start SM Mapper - let sm_mapper = start_sm_mapper(broker.port).await; - let _ = publish_a_fake_jwt_token(&broker).await; - - mqtt_tests::assert_received( - &mut messages, - TEST_TIMEOUT_MS, - vec!["118,software-management\n", "500\n"], - ) - .await; - - // Prepare and publish a software update status response message `executing` on `tedge/commands/res/software/update`. - let json_response = r#"{ - "id": "123", - "status": "executing" - }"#; - - let _ = broker - .publish("tedge/commands/res/software/update", json_response) - .await - .unwrap(); - - // Expect `501` smartrest message on `c8y/s/us`. - let msg = messages - .next() - .with_timeout(TEST_TIMEOUT_MS) - .await - .expect_or("No message received after a second."); - assert_eq!(&msg, "501,c8y_SoftwareUpdate\n"); - - // Prepare and publish a software update response `successful`. - let json_response = r#"{ - "id":"123", - "status":"successful", - "currentSoftwareList":[ - {"type":"apt","modules":[ - {"name":"m","url":"https://foobar.io/m.epl"} - ]} - ]}"#; - - let _ = broker - .publish("tedge/commands/res/software/update", json_response) - .await - .unwrap(); - - // Expect `503` messages with correct payload have been received on `c8y/s/us`, if no msg received for the timeout the test fails. - let msg = messages - .next() - .with_timeout(TEST_TIMEOUT_MS) - .await - .expect_or("No message received after a second."); - assert_eq!(&msg, "503,c8y_SoftwareUpdate,\n"); - - sm_mapper.unwrap().abort(); -} - -#[tokio::test] -#[serial] -async fn mapper_publishes_software_update_failed_status_onto_c8y_topic() { - let broker = mqtt_tests::test_mqtt_broker(); - let mut messages = broker.messages_published_on("c8y/s/us").await; - - // Start SM Mapper - let sm_mapper = start_sm_mapper(broker.port).await; - let _ = publish_a_fake_jwt_token(&broker).await; - mqtt_tests::assert_received( - &mut messages, - TEST_TIMEOUT_MS, - vec!["118,software-management\n", "500\n"], - ) - .await; - - // The agent publish an error - let json_response = r#" - { - "id": "123", - "status":"failed", - "reason":"Partial failure: Couldn't install collectd and nginx", - "currentSoftwareList": [ - { - "type": "docker", - "modules": [ - { - "name": "nginx", - "version": "1.21.0" - } - ] - } - ], - "failures":[] - }"#; - - let _ = broker - .publish("tedge/commands/res/software/update", json_response) - .await - .unwrap(); - - // `502` messages with correct payload have been received on `c8y/s/us`, if no msg received for the timeout the test fails. - let msg = messages - .next() - .with_timeout(TEST_TIMEOUT_MS) - .await - .expect_or("No message received after a second."); - assert_eq!( - &msg, - "502,c8y_SoftwareUpdate,\"Partial failure: Couldn\'t install collectd and nginx\"\n" - ); - - sm_mapper.unwrap().abort(); -} - -#[tokio::test] -#[serial] -async fn mapper_fails_during_sw_update_recovers_and_process_response() -> Result<(), anyhow::Error> -{ - // The test assures recovery and processing of messages by the SM-Mapper when it fails in the middle of the operation. - let broker = mqtt_tests::test_mqtt_broker(); - - // When a software update request message is received on `c8y/s/ds` by the sm mapper, - // converts it to thin-edge json message, publishes a request message on `tedge/commands/req/software/update`. - // SM Mapper fails before receiving the response message for the request. - // Meanwhile the operation response message was published on `tedge/commands/res/software/update`. - // Now the SM Mapper recovers and receives the response message and publishes the status on `c8y/s/us`. - // The subscriber that was waiting for the response on `c8/s/us` receives the response and validates it. - - // Create a subscriber to receive messages on `tedge/commands/req/software/update` topic. - let mut requests = broker - .messages_published_on("tedge/commands/req/software/update") - .await; - - // Create a subscriber to receive messages on `"c8y/s/us` topic. - let mut responses = broker.messages_published_on("c8y/s/us").await; - - // Start SM Mapper - let sm_mapper = start_sm_mapper(broker.port).await?; - mqtt_tests::assert_received( - &mut responses, - TEST_TIMEOUT_MS, - vec!["118,software-management\n", "500\n"], - ) - .await; - - // Prepare and publish a software update smartrest request on `c8y/s/ds`. - let smartrest = r#"528,external_id,nodered,1.0.0::debian,,install"#; - let _ = broker.publish("c8y/s/ds", smartrest).await.unwrap(); - let _ = publish_a_fake_jwt_token(&broker).await; - - let expected_update_list = r#" - "updateList": [ - { - "type": "debian", - "modules": [ - { - "name": "nodered", - "version": "1.0.0", - "action": "install" - } - ] - }"#; - - // Wait for the request being published by the mapper on `tedge/commands/req/software/update`. - let msg = requests - .next() - .with_timeout(TEST_TIMEOUT_MS) - .await - .expect_or("No message received after a second."); - assert!(msg.contains(&remove_whitespace(expected_update_list))); - - // Stop the SM Mapper (simulating a failure) - sm_mapper.abort(); - assert!(sm_mapper.await.unwrap_err().is_cancelled()); - - // Let the agent publish the response `successful`. - let json_response = r#"{ - "id":"123", - "status":"successful", - "currentSoftwareList":[ - { - "type":"apt", - "modules": [ - { - "name":"m", - "url":"https://foobar.io/m.epl" - } - ] - } - ]}"#; - let _ = broker - .publish( - "tedge/commands/res/software/update", - &remove_whitespace(json_response), - ) - .await - .unwrap(); - - // Restart SM Mapper - let sm_mapper = start_sm_mapper(broker.port).await?; - - // Validate that the mapper process the response and forward it on 'c8y/s/us' - // Expect init messages followed by a 503 (success) - mqtt_tests::assert_received( - &mut responses, - TEST_TIMEOUT_MS * 5, - vec![ - "118,software-management\n", - "500\n", - "503,c8y_SoftwareUpdate,\n", - ], - ) - .await; - - sm_mapper.abort(); - Ok(()) -} - -#[tokio::test] -#[serial] -async fn mapper_publishes_software_update_request_with_wrong_action() { - // The test assures SM Mapper correctly receives software update request smartrest message on `c8y/s/ds` - // Then the SM Mapper finds out that wrong action as part of the update request. - // Then SM Mapper publishes an operation status message as executing `501,c8y_SoftwareUpdate' - // Then SM Mapper publishes an operation status message as failed `502,c8y_SoftwareUpdate,Action remove is not recognized. It must be install or delete.` on `c8/s/us`. - // Then the subscriber that subscribed for messages on `c8/s/us` receives these messages and verifies them. - - let broker = mqtt_tests::test_mqtt_broker(); - - // Create a subscriber to receive messages on `c8y/s/us` topic. - let mut messages = broker.messages_published_on("c8y/s/us").await; - - let _sm_mapper = start_sm_mapper(broker.port).await; - mqtt_tests::assert_received( - &mut messages, - TEST_TIMEOUT_MS, - vec!["118,software-management\n", "500\n"], - ) - .await; - - // Prepare and publish a c8_SoftwareUpdate smartrest request on `c8y/s/ds` that contains a wrong action `remove`, that is not known by c8y. - let smartrest = r#"528,external_id,nodered,1.0.0::debian,,remove"#; - let _ = broker.publish("c8y/s/ds", smartrest).await.unwrap(); - - // Expect a 501 (executing) followed by a 502 (failed) - mqtt_tests::assert_received( - &mut messages, - TEST_TIMEOUT_MS, - vec![ - "501,c8y_SoftwareUpdate", - "502,c8y_SoftwareUpdate,\"Parameter remove is not recognized. It must be install or delete.\"", - ], - ) - .await; -} - -#[tokio::test(flavor = "multi_thread", worker_threads = 2)] -#[serial_test::serial] -async fn get_jwt_token_full_run() { - // Given a background process that publish JWT tokens on demand. - let broker = mqtt_tests::test_mqtt_broker(); - broker.map_messages_background(|(topic, _)| { - let mut response = vec![]; - if &topic == "c8y/s/uat" { - response.push(("c8y/s/dat".into(), "71,1111".into())); - } - response - }); - - // An JwtAuthHttpProxy ... - let mqtt_config = mqtt_channel::Config::default() - .with_port(broker.port) - .with_session_name("JWT-Requester-Test") - .with_subscriptions(TopicFilter::new_unchecked("c8y/s/dat")); - let mqtt_client = Connection::new(&mqtt_config).await.unwrap(); - let http_client = reqwest::ClientBuilder::new().build().unwrap(); - let mut http_proxy = - JwtAuthHttpProxy::new(mqtt_client, http_client, "test.tenant.com", "test-device"); - - // ... fetches and returns these JWT tokens. - let jwt_token = http_proxy.get_jwt_token().await; - - // `get_jwt_token` should return `Ok` and the value of token should be as set above `1111`. - assert!(jwt_token.is_ok()); - assert_eq!(jwt_token.unwrap().token(), "1111"); -} - -fn remove_whitespace(s: &str) -> String { - let mut s = String::from(s); - s.retain(|c| !c.is_whitespace()); - s -} - -async fn start_sm_mapper(mqtt_port: u16) -> Result, anyhow::Error> { - let operations = Operations::new(); - let mqtt_topic = CumulocitySoftwareManagementMapper::subscriptions(&operations)?; - let mqtt_config = mqtt_channel::Config::default() - .with_port(mqtt_port) - .with_session_name("SM-C8Y-Mapper-Test") - .with_subscriptions(mqtt_topic); - - let mqtt_client = Connection::new(&mqtt_config).await?; - let http_proxy = FakeC8YHttpProxy {}; - let mut sm_mapper = CumulocitySoftwareManagement::new(mqtt_client, http_proxy, operations); - - let mapper_task = tokio::spawn(async move { - let _ = sm_mapper.run().await; - }); - Ok(mapper_task) -} - -async fn publish_a_fake_jwt_token(broker: &MqttProcessHandler) { - let _ = broker.publish("c8y/s/dat", "71,1111").await.unwrap(); -} - -struct FakeC8YHttpProxy {} - -#[async_trait::async_trait] -impl C8YHttpProxy for FakeC8YHttpProxy { - async fn init(&mut self) -> Result<(), SMCumulocityMapperError> { - Ok(()) - } - - fn url_is_in_my_tenant_domain(&self, _url: &str) -> bool { - true - } - - async fn get_jwt_token(&mut self) -> Result { - Ok(SmartRestJwtResponse::try_new("71,fake-token")?) - } - - async fn send_software_list_http( - &mut self, - _c8y_software_list: &C8yUpdateSoftwareListResponse, - ) -> Result<(), SMCumulocityMapperError> { - Ok(()) - } - - async fn upload_log_binary( - &mut self, - _log_content: &str, - ) -> Result { - Ok("fake/upload/url".into()) - } -} -- cgit v1.2.3