summaryrefslogtreecommitdiffstats
path: root/crates/core/tedge_mapper/src/sm_c8y_mapper
diff options
context:
space:
mode:
authorLukasz Woznicki <lukasz.woznicki@softwareag.com>2022-02-09 13:55:02 +0000
committerLukasz Woznicki <lukasz.woznicki@softwareag.com>2022-02-18 09:41:55 +0000
commitad065ec206ad3b48f9e1cc48080f4d2ecfe85a2e (patch)
tree2042dc1a03440f61b112006d6f12b532f0610566 /crates/core/tedge_mapper/src/sm_c8y_mapper
parentb1907dc4780709bad572a7108314c563eb02b70a (diff)
Move module to subdirectories and adjust use
Signed-off-by: Lukasz Woznicki <lukasz.woznicki@softwareag.com>
Diffstat (limited to 'crates/core/tedge_mapper/src/sm_c8y_mapper')
-rw-r--r--crates/core/tedge_mapper/src/sm_c8y_mapper/error.rs55
-rw-r--r--crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs425
-rw-r--r--crates/core/tedge_mapper/src/sm_c8y_mapper/mod.rs4
-rw-r--r--crates/core/tedge_mapper/src/sm_c8y_mapper/tests.rs457
4 files changed, 0 insertions, 941 deletions
diff --git a/crates/core/tedge_mapper/src/sm_c8y_mapper/error.rs b/crates/core/tedge_mapper/src/sm_c8y_mapper/error.rs
deleted file mode 100644
index e3d96cab..00000000
--- a/crates/core/tedge_mapper/src/sm_c8y_mapper/error.rs
+++ /dev/null
@@ -1,55 +0,0 @@
-use c8y_smartrest::error::{SmartRestDeserializerError, SmartRestSerializerError};
-
-#[derive(thiserror::Error, Debug)]
-pub enum SMCumulocityMapperError {
- #[error("Invalid MQTT Message.")]
- InvalidMqttMessage,
-
- #[error(transparent)]
- InvalidTopicError(#[from] agent_interface::TopicError),
-
- #[error(transparent)]
- InvalidThinEdgeJson(#[from] agent_interface::SoftwareError),
-
- #[error(transparent)]
- FromElapsed(#[from] tokio::time::error::Elapsed),
-
- #[error(transparent)]
- FromMqttClient(#[from] mqtt_channel::MqttError),
-
- #[error(transparent)]
- FromReqwest(#[from] reqwest::Error),
-
- #[error(transparent)]
- FromSmartRestSerializer(#[from] SmartRestSerializerError),
-
- #[error(transparent)]
- FromSmartRestDeserializer(#[from] SmartRestDeserializerError),
-
- #[error(transparent)]
- FromTedgeConfig(#[from] tedge_config::ConfigSettingError),
-
- #[error(transparent)]
- FromTimeFormat(#[from] time::error::Format),
-
- #[error(transparent)]
- FromTimeParse(#[from] time::error::Parse),
-
- #[error("Invalid date in file name: {0}")]
- InvalidDateInFileName(String),
-
- #[error("Invalid path. Not UTF-8.")]
- InvalidUtf8Path,
-
- #[error(transparent)]
- FromIo(#[from] std::io::Error),
-
- #[error("Request timed out")]
- RequestTimeout,
-
- #[error("Operation execution failed: {0}")]
- ExecuteFailed(String),
-
- #[error("An unknown operation template: {0}")]
- UnknownOperation(String),
-}
diff --git a/crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs b/crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs
deleted file mode 100644
index 871afcfd..00000000
--- a/crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs
+++ /dev/null
@@ -1,425 +0,0 @@
-use crate::component::TEdgeComponent;
-
-use agent_interface::{
- topic::*, Jsonify, OperationStatus, RestartOperationRequest, RestartOperationResponse,
- SoftwareListRequest, SoftwareListResponse, SoftwareUpdateResponse,
-};
-use async_trait::async_trait;
-use c8y_api::{
- http_proxy::{C8YHttpProxy, JwtAuthHttpProxy},
- json_c8y::C8yUpdateSoftwareListResponse,
-};
-use c8y_smartrest::smartrest_deserializer::SmartRestRestartRequest;
-use c8y_smartrest::smartrest_serializer::CumulocitySupportedOperations;
-use c8y_smartrest::{
- error::{SMCumulocityMapperError, SmartRestDeserializerError},
- operations::Operations,
- smartrest_deserializer::SmartRestUpdateSoftware,
- smartrest_serializer::{
- SmartRestGetPendingOperations, SmartRestSerializer, SmartRestSetOperationToExecuting,
- SmartRestSetOperationToFailed, SmartRestSetOperationToSuccessful,
- SmartRestSetSupportedLogType,
- },
- topic::*,
-};
-use download::{Auth, DownloadInfo};
-use mqtt_channel::{Config, Connection, MqttError, SinkExt, StreamExt, Topic, TopicFilter};
-use std::{convert::TryInto, process::Stdio};
-use tedge_config::{ConfigSettingAccessor, MqttPortSetting, TEdgeConfig};
-use tracing::{debug, error, info, instrument};
-
-const SM_MAPPER: &str = "SM-C8Y-Mapper";
-const SM_MAPPER_JWT_TOKEN_SESSION_NAME: &str = "SM-C8Y-Mapper-JWT-Token";
-
-pub struct CumulocitySoftwareManagementMapper {}
-
-impl CumulocitySoftwareManagementMapper {
- pub fn new() -> Self {
- Self {}
- }
-
- pub fn subscriptions(operations: &Operations) -> Result<TopicFilter, anyhow::Error> {
- let mut topic_filter = TopicFilter::new(ResponseTopic::SoftwareListResponse.as_str())?;
- topic_filter.add(ResponseTopic::SoftwareUpdateResponse.as_str())?;
- topic_filter.add(C8yTopic::SmartRestRequest.as_str())?;
- topic_filter.add(ResponseTopic::RestartResponse.as_str())?;
-
- for topic in operations.topics_for_operations() {
- topic_filter.add(&topic)?
- }
-
- Ok(topic_filter)
- }
-
- pub async fn init_session(&mut self) -> Result<(), anyhow::Error> {
- info!("Initialize tedge sm mapper session");
- let operations = Operations::try_new("/etc/tedge/operations", "c8y")?;
- let mqtt_topic = CumulocitySoftwareManagementMapper::subscriptions(&operations)?;
- let config = Config::default()
- .with_session_name(SM_MAPPER)
- .with_clean_session(false)
- .with_subscriptions(mqtt_topic);
- mqtt_channel::init_session(&config).await?;
- Ok(())
- }
-
- pub async fn clear_session(&mut self) -> Result<(), anyhow::Error> {
- info!("Clear tedge sm mapper session");
- let operations = Operations::try_new("/etc/tedge/operations", "c8y")?;
- let mqtt_topic = CumulocitySoftwareManagementMapper::subscriptions(&operations)?;
- let config = Config::default()
- .with_session_name(SM_MAPPER)
- .with_clean_session(true)
- .with_subscriptions(mqtt_topic);
- mqtt_channel::clear_session(&config).await?;
- Ok(())
- }
-}
-
-#[async_trait]
-impl TEdgeComponent for CumulocitySoftwareManagementMapper {
- #[instrument(skip(self, tedge_config), name = "sm-c8y-mapper")]
- async fn start(&self, tedge_config: TEdgeConfig) -> Result<(), anyhow::Error> {
- let operations = Operations::try_new("/etc/tedge/operations", "c8y")?;
- let http_proxy =
- JwtAuthHttpProxy::try_new(&tedge_config, SM_MAPPER_JWT_TOKEN_SESSION_NAME).await?;
- let mut sm_mapper =
- CumulocitySoftwareManagement::try_new(&tedge_config, http_proxy, operations).await?;
-
- let () = sm_mapper.run().await?;
-
- Ok(())
- }
-}
-
-pub struct CumulocitySoftwareManagement<Proxy>
-where
- Proxy: C8YHttpProxy,
-{
- client: Connection,
- http_proxy: Proxy,
- operations: Operations,
-}
-
-impl<Proxy> CumulocitySoftwareManagement<Proxy>
-where
- Proxy: C8YHttpProxy,
-{
- pub fn new(client: Connection, http_proxy: Proxy, operations: Operations) -> Self {
- Self {
- client,
- http_proxy,
- operations,
- }
- }
-
- pub async fn try_new(
- tedge_config: &TEdgeConfig,
- http_proxy: Proxy,
- operations: Operations,
- ) -> Result<Self, anyhow::Error> {
- let mqtt_topic = CumulocitySoftwareManagementMapper::subscriptions(&operations)?;
- let mqtt_port = tedge_config.query(MqttPortSetting)?.into();
-
- let mqtt_config = crate::mapper::mqtt_config(SM_MAPPER, mqtt_port, mqtt_topic)?;
- let client = Connection::new(&mqtt_config).await?;
-
- Ok(Self {
- client,
- http_proxy,
- operations,
- })
- }
-
- pub async fn run(&mut self) -> Result<(), anyhow::Error> {
- info!("Initialisation");
- let () = self.http_proxy.init().await?;
-
- info!("Running");
- let () = self.publish_supported_log_types().await?;
- let () = self.publish_get_pending_operations().await?;
- let () = self.ask_software_list().await?;
-
- while let Err(err) = self.subscribe_messages_runtime().await {
- if let SMCumulocityMapperError::FromSmartRestDeserializer(
- SmartRestDeserializerError::InvalidParameter { operation, .. },
- ) = &err
- {
- let topic = C8yTopic::SmartRestResponse.to_topic()?;
- // publish the operation status as `executing`
- let () = self.publish(&topic, format!("501,{}", operation)).await?;
- // publish the operation status as `failed`
- let () = self
- .publish(
- &topic,
- format!("502,{},\"{}\"", operation, &err.to_string()),
- )
- .await?;
- }
- error!("{}", err);
- }
-
- Ok(())
- }
-
- async fn process_smartrest(&mut self, payload: &str) -> Result<(), SMCumulocityMapperError> {
- let message_id: &str = &payload[..3];
- match message_id {
- "528" => {
- let () = self.forward_software_request(payload).await?;
- }
- "510" => {
- let () = self.forward_restart_request(payload).await?;
- }
- template => match self.operations.matching_smartrest_template(template) {
- Some(operation) => {
- if let Some(command) = operation.command() {
- execute_operation(payload, command.as_str()).await?;
- }
- }
- None => {
- return Err(SMCumulocityMapperError::UnknownOperation(
- template.to_string(),
- ));
- }
- },
- }
-
- Ok(())
- }
-
- #[instrument(skip(self), name = "main-loop")]
- async fn subscribe_messages_runtime(&mut self) -> Result<(), SMCumulocityMapperError> {
- while let Some(message) = self.client.received.next().await {
- let request_topic = message.topic.clone().try_into()?;
- match request_topic {
- MapperSubscribeTopic::ResponseTopic(ResponseTopic::SoftwareListResponse) => {
- debug!("Software list");
- let () = self
- .validate_and_publish_software_list(message.payload_str()?)
- .await?;
- }
- MapperSubscribeTopic::ResponseTopic(ResponseTopic::SoftwareUpdateResponse) => {
- debug!("Software update");
- let () = self
- .publish_operation_status(message.payload_str()?)
- .await?;
- }
- MapperSubscribeTopic::ResponseTopic(ResponseTopic::RestartResponse) => {
- let () = self
- .publish_restart_operation_status(message.payload_str()?)
- .await?;
- }
- MapperSubscribeTopic::C8yTopic(_) => {
- debug!("Cumulocity");
- let () = self.process_smartrest(message.payload_str()?).await?;
- }
- }
- }
- Ok(())
- }
-
- #[instrument(skip(self), name = "software-list")]
- async fn ask_software_list(&mut self) -> Result<(), SMCumulocityMapperError> {
- let request = SoftwareListRequest::default();
- let topic = Topic::new(RequestTopic::SoftwareListRequest.as_str())?;
- let json_list_request = request.to_json()?;
- let () = self.publish(&topic, json_list_request).await?;
-
- Ok(())
- }
-
- #[instrument(skip(self), name = "software-update")]
- async fn validate_and_publish_software_list(
- &mut self,
- json_response: &str,
- ) -> Result<(), SMCumulocityMapperError> {
- let response = SoftwareListResponse::from_json(json_response)?;
-
- match response.status() {
- OperationStatus::Successful => {
- let () = self.send_software_list_http(&response).await?;
- }
-
- OperationStatus::Failed => {
- error!("Received a failed software response: {}", json_response);
- }
-
- OperationStatus::Executing => {} // C8Y doesn't expect any message to be published
- }
-
- Ok(())
- }
-
- async fn publish_supported_log_types(&mut self) -> Result<(), SMCumulocityMapperError> {
- let payload = SmartRestSetSupportedLogType::default().to_smartrest()?;
- let topic = C8yTopic::SmartRestResponse.to_topic()?;
- let () = self.publish(&topic, payload).await?;
- Ok(())
- }
-
- async fn publish_get_pending_operations(&mut self) -> Result<(), SMCumulocityMapperError> {
- let data = SmartRestGetPendingOperations::default();
- let topic = C8yTopic::SmartRestResponse.to_topic()?;
- let payload = data.to_smartrest()?;
- let () = self.publish(&topic, payload).await?;
- Ok(())
- }
-
- async fn publish_operation_status(
- &mut self,
- json_response: &str,
- ) -> Result<(), SMCumulocityMapperError> {
- let response = SoftwareUpdateResponse::from_json(json_response)?;
- let topic = C8yTopic::SmartRestResponse.to_topic()?;
- match response.status() {
- OperationStatus::Executing => {
- let smartrest_set_operation_status =
- SmartRestSetOperationToExecuting::from_thin_edge_json(response)?
- .to_smartrest()?;
- let () = self.publish(&topic, smartrest_set_operation_status).await?;
- }
- OperationStatus::Successful => {
- let smartrest_set_operation =
- SmartRestSetOperationToSuccessful::from_thin_edge_json(response)?
- .to_smartrest()?;
- let () = self.publish(&topic, smartrest_set_operation).await?;
- let () = self
- .validate_and_publish_software_list(json_response)
- .await?;
- }
- OperationStatus::Failed => {
- let smartrest_set_operation =
- SmartRestSetOperationToFailed::from_thin_edge_json(response)?.to_smartrest()?;
- let () = self.publish(&topic, smartrest_set_operation).await?;
- let () = self
- .validate_and_publish_software_list(json_response)
- .await?;
- }
- };
- Ok(())
- }
-
- async fn publish_restart_operation_status(
- &mut self,
- json_response: &str,
- ) -> Result<(), SMCumulocityMapperError> {
- let response = RestartOperationResponse::from_json(json_response)?;
- let topic = C8yTopic::SmartRestResponse.to_topic()?;
-
- match response.status() {
- OperationStatus::Executing => {
- let smartrest_set_operation = SmartRestSetOperationToExecuting::new(
- CumulocitySupportedOperations::C8yRestartRequest,
- )
- .to_smartrest()?;
-
- let () = self.publish(&topic, smartrest_set_operation).await?;
- }
- OperationStatus::Successful => {
- let smartrest_set_operation = SmartRestSetOperationToSuccessful::new(
- CumulocitySupportedOperations::C8yRestartRequest,
- )
- .to_smartrest()?;
- let () = self.publish(&topic, smartrest_set_operation).await?;
- }
- OperationStatus::Failed => {
- let smartrest_set_operation = SmartRestSetOperationToFailed::new(
- CumulocitySupportedOperations::C8yRestartRequest,
- "Restart Failed".into(),
- )
- .to_smartrest()?;
- let () = self.publish(&topic, smartrest_set_operation).await?;
- }
- }
- Ok(())
- }
- async fn forward_software_request(
- &mut self,
- smartrest: &str,
- ) -> Result<(), SMCumulocityMapperError> {
- let topic = Topic::new(RequestTopic::SoftwareUpdateRequest.as_str())?;
- let update_software = SmartRestUpdateSoftware::default();
- let mut software_update_request = update_software
- .from_smartrest(smartrest)?
- .to_thin_edge_json()?;
-
- let token = self.http_proxy.get_jwt_token().await?;
-
- software_update_request
- .update_list
- .iter_mut()
- .for_each(|modules| {
- modules.modules.iter_mut().for_each(|module| {
- if let Some(url) = &module.url {
- if self.http_proxy.url_is_in_my_tenant_domain(url.url()) {
- module.url = module.url.as_ref().map(|s| {
- DownloadInfo::new(&s.url)
- .with_auth(Auth::new_bearer(&token.token()))
- });
- } else {
- module.url = module.url.as_ref().map(|s| DownloadInfo::new(&s.url));
- }
- }
- });
- });
-
- let () = self
- .publish(&topic, software_update_request.to_json()?)
- .await?;
-
- Ok(())
- }
-
- async fn forward_restart_request(
- &mut self,
- smartrest: &str,
- ) -> Result<(), SMCumulocityMapperError> {
- let topic = Topic::new(RequestTopic::RestartRequest.as_str())?;
- let _ = SmartRestRestartRequest::from_smartrest(smartrest)?;
-
- let request = RestartOperationRequest::default();
- let () = self.publish(&topic, request.to_json()?).await?;
-
- Ok(())
- }
-
- async fn publish(&mut self, topic: &Topic, payload: String) -> Result<(), MqttError> {
- let () = self
- .client
- .published
- .send(mqtt_channel::Message::new(topic, payload))
- .await?;
- Ok(())
- }
-
- async fn send_software_list_http(
- &mut self,
- json_response: &SoftwareListResponse,
- ) -> Result<(), SMCumulocityMapperError> {
- let c8y_software_list: C8yUpdateSoftwareListResponse = json_response.into();
- self.http_proxy
- .send_software_list_http(&c8y_software_list)
- .await
- }
-}
-
-async fn execute_operation(payload: &str, command: &str) -> Result<(), SMCumulocityMapperError> {
- let command = command.to_owned();
- let payload = payload.to_string();
-
- let _handle = tokio::spawn(async move {
- let mut child = tokio::process::Command::new(command)
- .args(&[payload])
- .stdin(Stdio::null())
- .stdout(Stdio::null())
- .stderr(Stdio::null())
- .spawn()
- .map_err(|e| SMCumulocityMapperError::ExecuteFailed(e.to_string()))
- .unwrap();
-
- child.wait().await
- });
-
- Ok(())
-}
diff --git a/crates/core/tedge_mapper/src/sm_c8y_mapper/mod.rs b/crates/core/tedge_mapper/src/sm_c8y_mapper/mod.rs
deleted file mode 100644
index 144d7526..00000000
--- a/crates/core/tedge_mapper/src/sm_c8y_mapper/mod.rs
+++ /dev/null
@@ -1,4 +0,0 @@
-pub mod mapper;
-
-#[cfg(test)]
-mod tests;
diff --git a/crates/core/tedge_mapper/src/sm_c8y_mapper/tests.rs b/crates/core/tedge_mapper/src/sm_c8y_mapper/tests.rs
deleted file mode 100644
index 7172cd24..00000000
--- a/crates/core/tedge_mapper/src/sm_c8y_mapper/tests.rs
+++ /dev/null
@@ -1,457 +0,0 @@
-use crate::sm_c8y_mapper::mapper::{
- CumulocitySoftwareManagement, CumulocitySoftwareManagementMapper,
-};
-use c8y_api::http_proxy::{C8YHttpProxy, JwtAuthHttpProxy};
-use c8y_api::json_c8y::C8yUpdateSoftwareListResponse;
-use c8y_smartrest::{
- error::SMCumulocityMapperError, operations::Operations,
- smartrest_deserializer::SmartRestJwtResponse,
-};
-use mqtt_channel::{Connection, TopicFilter};
-use mqtt_tests::test_mqtt_server::MqttProcessHandler;
-use mqtt_tests::with_timeout::{Maybe, WithTimeout};
-use mqtt_tests::StreamExt;
-use serial_test::serial;
-use std::time::Duration;
-use tokio::task::JoinHandle;
-
-const TEST_TIMEOUT_MS: Duration = Duration::from_millis(1000);
-
-#[tokio::test]
-#[serial]
-async fn mapper_publishes_a_software_list_request() {
- // The test assures the mapper publishes request for software list on `tedge/commands/req/software/list`.
- let broker = mqtt_tests::test_mqtt_broker();
-
- let mut messages = broker
- .messages_published_on("tedge/commands/req/software/list")
- .await;
-
- // Start the SM Mapper
- let sm_mapper = start_sm_mapper(broker.port).await;
-
- // Expect on `tedge/commands/req/software/list` a software list request.
- let msg = messages
- .next()
- .with_timeout(TEST_TIMEOUT_MS)
- .await
- .expect_or("No message received after a second.");
- dbg!(&msg);
- assert!(&msg.contains(r#"{"id":"#));
-
- sm_mapper.unwrap().abort();
-}
-
-#[tokio::test]
-#[serial]
-async fn mapper_publishes_a_supported_operation_and_a_pending_operations_onto_c8y_topic() {
- // The test assures the mapper publishes smartrest messages 114 and 500 on `c8y/s/us` which shall be send over to the cloud if bridge connection exists.
- let broker = mqtt_tests::test_mqtt_broker();
- let mut messages = broker.messages_published_on("c8y/s/us").await;
-
- // Start SM Mapper
- let sm_mapper = start_sm_mapper(broker.port).await;
-
- // Expect both 114 and 500 messages has been received on `c8y/s/us`, if no msg received for the timeout the test fails.
- mqtt_tests::assert_received(
- &mut messages,
- TEST_TIMEOUT_MS,
- vec!["118,software-management\n", "500\n"],
- )
- .await;
- sm_mapper.unwrap().abort();
-}
-
-#[tokio::test]
-#[serial]
-async fn mapper_publishes_software_update_request() {
- // The test assures SM Mapper correctly receives software update request smartrest message on `c8y/s/ds`
- // and converts it to thin-edge json message published on `tedge/commands/req/software/update`.
- let broker = mqtt_tests::test_mqtt_broker();
- let mut messages = broker
- .messages_published_on("tedge/commands/req/software/update")
- .await;
-
- let sm_mapper = start_sm_mapper(broker.port).await;
-
- // Prepare and publish a software update smartrest request on `c8y/s/ds`.
- let smartrest = r#"528,external_id,nodered,1.0.0::debian,,install"#;
- let _ = broker.publish("c8y/s/ds", smartrest).await.unwrap();
- let _ = publish_a_fake_jwt_token(&broker).await;
-
- let expected_update_list = r#"
- "updateList": [
- {
- "type": "debian",
- "modules": [
- {
- "name": "nodered",
- "version": "1.0.0",
- "action": "install"
- }
- ]
- }"#;
-
- // Expect thin-edge json message on `tedge/commands/req/software/update` with expected payload.
- let msg = messages
- .next()
- .with_timeout(TEST_TIMEOUT_MS)
- .await
- .expect_or("No message received after a second.");
- dbg!(&msg);
- assert!(&msg.contains("{\"id\":\""));
- assert!(&msg.contains(&remove_whitespace(expected_update_list)));
- sm_mapper.unwrap().abort();
-}
-
-#[tokio::test]
-#[serial]
-async fn mapper_publishes_software_update_status_onto_c8y_topic() {
- // The test assures SM Mapper correctly receives software update response message on `tedge/commands/res/software/update`
- // and publishes status of the operation `501` on `c8y/s/us`
- let broker = mqtt_tests::test_mqtt_broker();
-
- let mut messages = broker.messages_published_on("c8y/s/us").await;
-
- // Start SM Mapper
- let sm_mapper = start_sm_mapper(broker.port).await;
- let _ = publish_a_fake_jwt_token(&broker).await;
-
- mqtt_tests::assert_received(
- &mut messages,
- TEST_TIMEOUT_MS,
- vec!["118,software-management\n", "500\n"],
- )
- .await;
-
- // Prepare and publish a software update status response message `executing` on `tedge/commands/res/software/update`.
- let json_response = r#"{
- "id": "123",
- "status": "executing"
- }"#;
-
- let _ = broker
- .publish("tedge/commands/res/software/update", json_response)
- .await
- .unwrap();
-
- // Expect `501` smartrest message on `c8y/s/us`.
- let msg = messages
- .next()
- .with_timeout(TEST_TIMEOUT_MS)
- .await
- .expect_or("No message received after a second.");
- assert_eq!(&msg, "501,c8y_SoftwareUpdate\n");
-
- // Prepare and publish a software update response `successful`.
- let json_response = r#"{
- "id":"123",
- "status":"successful",
- "currentSoftwareList":[
- {"type":"apt","modules":[
- {"name":"m","url":"https://foobar.io/m.epl"}
- ]}
- ]}"#;
-
- let _ = broker
- .publish("tedge/commands/res/software/update", json_response)
- .await
- .unwrap();
-
- // Expect `503` messages with correct payload have been received on `c8y/s/us`, if no msg received for the timeout the test fails.
- let msg = messages
- .next()
- .with_timeout(TEST_TIMEOUT_MS)
- .await
- .expect_or("No message received after a second.");
- assert_eq!(&msg, "503,c8y_SoftwareUpdate,\n");
-
- sm_mapper.unwrap().abort();
-}
-
-#[tokio::test]
-#[serial]
-async fn mapper_publishes_software_update_failed_status_onto_c8y_topic() {
- let broker = mqtt_tests::test_mqtt_broker();
- let mut messages = broker.messages_published_on("c8y/s/us").await;
-
- // Start SM Mapper
- let sm_mapper = start_sm_mapper(broker.port).await;
- let _ = publish_a_fake_jwt_token(&broker).await;
- mqtt_tests::assert_received(
- &mut messages,
- TEST_TIMEOUT_MS,
- vec!["118,software-management\n", "500\n"],
- )
- .await;
-
- // The agent publish an error
- let json_response = r#"
- {
- "id": "123",
- "status":"failed",
- "reason":"Partial failure: Couldn't install collectd and nginx",
- "currentSoftwareList": [
- {
- "type": "docker",
- "modules": [
- {
- "name": "nginx",
- "version": "1.21.0"
- }
- ]
- }
- ],
- "failures":[]
- }"#;
-
- let _ = broker
- .publish("tedge/commands/res/software/update", json_response)
- .await
- .unwrap();
-
- // `502` messages with correct payload have been received on `c8y/s/us`, if no msg received for the timeout the test fails.
- let msg = messages
- .next()
- .with_timeout(TEST_TIMEOUT_MS)
- .await
- .expect_or("No message received after a second.");
- assert_eq!(
- &msg,
- "502,c8y_SoftwareUpdate,\"Partial failure: Couldn\'t install collectd and nginx\"\n"
- );
-
- sm_mapper.unwrap().abort();
-}
-
-#[tokio::test]
-#[serial]
-async fn mapper_fails_during_sw_update_recovers_and_process_response() -> Result<(), anyhow::Error>
-{
- // The test assures recovery and processing of messages by the SM-Mapper when it fails in the middle of the operation.
- let broker = mqtt_tests::test_mqtt_broker();
-
- // When a software update request message is received on `c8y/s/ds` by the sm mapper,
- // converts it to thin-edge json message, publishes a request message on `tedge/commands/req/software/update`.
- // SM Mapper fails before receiving the response message for the request.
- // Meanwhile the operation response message was published on `tedge/commands/res/software/update`.
- // Now the SM Mapper recovers and receives the response message and publishes the status on `c8y/s/us`.
- // The subscriber that was waiting for the response on `c8/s/us` receives the response and validates it.
-
- // Create a subscriber to receive messages on `tedge/commands/req/software/update` topic.
- let mut requests = broker
- .messages_published_on("tedge/commands/req/software/update")
- .await;
-
- // Create a subscriber to receive messages on `"c8y/s/us` topic.
- let mut responses = broker.messages_published_on("c8y/s/us").await;
-
- // Start SM Mapper
- let sm_mapper = start_sm_mapper(broker.port).await?;
- mqtt_tests::assert_received(
- &mut responses,
- TEST_TIMEOUT_MS,
- vec!["118,software-management\n", "500\n"],
- )
- .await;
-
- // Prepare and publish a software update smartrest request on `c8y/s/ds`.
- let smartrest = r#"528,external_id,nodered,1.0.0::debian,,install"#;
- let _ = broker.publish("c8y/s/ds", smartrest).await.unwrap();
- let _ = publish_a_fake_jwt_token(&broker).await;
-
- let expected_update_list = r#"
- "updateList": [
- {
- "type": "debian",
- "modules": [
- {
- "name": "nodered",
- "version": "1.0.0",
- "action": "install"
- }
- ]
- }"#;
-
- // Wait for the request being published by the mapper on `tedge/commands/req/software/update`.
- let msg = requests
- .next()
- .with_timeout(TEST_TIMEOUT_MS)
- .await
- .expect_or("No message received after a second.");
- assert!(msg.contains(&remove_whitespace(expected_update_list)));
-
- // Stop the SM Mapper (simulating a failure)
- sm_mapper.abort();
- assert!(sm_mapper.await.unwrap_err().is_cancelled());
-
- // Let the agent publish the response `successful`.
- let json_response = r#"{
- "id":"123",
- "status":"successful",
- "currentSoftwareList":[
- {
- "type":"apt",
- "modules": [
- {
- "name":"m",
- "url":"https://foobar.io/m.epl"
- }
- ]
- }
- ]}"#;
- let _ = broker
- .publish(
- "tedge/commands/res/software/update",
- &remove_whitespace(json_response),
- )
- .await
- .unwrap();
-
- // Restart SM Mapper
- let sm_mapper = start_sm_mapper(broker.port).await?;
-
- // Validate that the mapper process the response and forward it on 'c8y/s/us'
- // Expect init messages followed by a 503 (success)
- mqtt_tests::assert_received(
- &mut responses,
- TEST_TIMEOUT_MS * 5,
- vec![
- "118,software-management\n",
- "500\n",
- "503,c8y_SoftwareUpdate,\n",
- ],
- )
- .await;
-
- sm_mapper.abort();
- Ok(())
-}
-
-#[tokio::test]
-#[serial]
-async fn mapper_publishes_software_update_request_with_wrong_action() {
- // The test assures SM Mapper correctly receives software update request smartrest message on `c8y/s/ds`
- // Then the SM Mapper finds out that wrong action as part of the update request.
- // Then SM Mapper publishes an operation status message as executing `501,c8y_SoftwareUpdate'
- // Then SM Mapper publishes an operation status message as failed `502,c8y_SoftwareUpdate,Action remove is not recognized. It must be install or delete.` on `c8/s/us`.
- // Then the subscriber that subscribed for messages on `c8/s/us` receives these messages and verifies them.
-
- let broker = mqtt_tests::test_mqtt_broker();
-
- // Create a subscriber to receive messages on `c8y/s/us` topic.
- let mut messages = broker.messages_published_on("c8y/s/us").await;
-
- let _sm_mapper = start_sm_mapper(broker.port).await;
- mqtt_tests::assert_received(
- &mut messages,
- TEST_TIMEOUT_MS,
- vec!["118,software-management\n", "500\n"],
- )
- .await;
-
- // Prepare and publish a c8_SoftwareUpdate smartrest request on `c8y/s/ds` that contains a wrong action `remove`, that is not known by c8y.
- let smartrest = r#"528,external_id,nodered,1.0.0::debian,,remove"#;
- let _ = broker.publish("c8y/s/ds", smartrest).await.unwrap()