use crate::core::{
converter::Converter, error::ConversionError, mapper::create_mapper,
size_threshold::SizeThreshold,
};
use anyhow::Result;
use assert_json_diff::assert_json_include;
use assert_matches::assert_matches;
use c8y_api::{
http_proxy::C8YHttpProxy,
json_c8y::{C8yCreateEvent, C8yUpdateSoftwareListResponse},
};
use c8y_smartrest::{
error::SMCumulocityMapperError, operations::Operations,
smartrest_deserializer::SmartRestJwtResponse,
};
use mqtt_channel::{Message, Topic};
use mqtt_tests::test_mqtt_server::MqttProcessHandler;
use serde_json::json;
use serial_test::serial;
use std::{path::Path, time::Duration};
use tedge_test_utils::fs::TempTedgeDir;
use test_case::test_case;
use tokio::task::JoinHandle;
use super::converter::{get_child_id_from_measurement_topic, CumulocityConverter};
/// Timeout handed to `mqtt_tests::assert_received_all_expected` by the tests in this file.
/// Despite the `_MS` suffix this is a `Duration` (five seconds), not a raw millisecond count.
const TEST_TIMEOUT_MS: Duration = Duration::from_secs(5);
/// IPv4 loopback address; presumably the host of the local test MQTT broker (usage is not
/// visible in this part of the file — confirm against the helpers that reference it).
const MQTT_HOST: &str = "127.0.0.1";
/// Verifies that, once started, the mapper publishes a software list request on
/// `tedge/commands/req/software/list`.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[serial]
async fn mapper_publishes_a_software_list_request() {
    let software_list_topic = "tedge/commands/req/software/list";

    // Start listening on the request topic before the mapper is launched.
    let broker = mqtt_tests::test_mqtt_broker();
    let mut received = broker.messages_published_on(software_list_topic).await;

    // Launch the SM mapper against the test broker.
    let (_tmp_dir, mapper_task) = start_c8y_mapper(broker.port).await.unwrap();

    // The request is recognised by the opening of its JSON payload: an object with an `id` field.
    let expected_fragments = [r#"{"id":"#];
    mqtt_tests::assert_received_all_expected(&mut received, TEST_TIMEOUT_MS, &expected_fragments)
        .await;

    // Stop the background mapper task now that the expectation has been checked.
    mapper_task.abort();
}
/// Verifies that, once started, the mapper publishes the SmartREST `500` message on `c8y/s/us`,
/// the topic whose messages are sent to the cloud when the bridge connection exists.
///
/// The mapper is also expected to publish SmartREST message `114` on that topic, but only the
/// `500` message is asserted here.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[serial]
async fn mapper_publishes_a_supported_operation_and_a_pending_operations_onto_c8y_topic() {
    let c8y_upstream_topic = "c8y/s/us";

    // Start listening on the upstream topic before the mapper is launched.
    let broker = mqtt_tests::test_mqtt_broker();
    let mut received = broker.messages_published_on(c8y_upstream_topic).await;

    // Launch the SM mapper against the test broker.
    let (_tmp_dir, mapper_task) = start_c8y_mapper(broker.port).await.unwrap();

    // The test fails if the `500` message is not seen on `c8y/s/us` within the timeout.
    let expected_messages = ["500\n"];
    mqtt_tests::assert_received_all_expected(&mut received, TEST_TIMEOUT_MS, &expected_messages)
        .await;

    // Stop the background mapper task now that the expectation has been checked.
    mapper_task.abort();
}
/// Verifies that a SmartREST software update request received on `c8y/s/ds` is converted by the
/// SM mapper into a thin-edge JSON message published on `tedge/commands/req/software/update`.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[serial]
async fn mapper_publishes_software_update_request() {
    // Fragment the converted thin-edge JSON request must contain; it is passed through
    // `remove_whitespace` before being compared with the received payload.
    let expected_update_list = r#"
"updateList": [
{
"type": "debian",
"modules": [
{
"name": "nodered",
"version": "1.0.0",
"action": "install"
}
]
}"#;
    let expected_payload = remove_whitespace(expected_update_list);

    // Start listening on the thin-edge request topic, then launch the mapper.
    let broker = mqtt_tests::test_mqtt_broker();
    let mut update_requests = broker
        .messages_published_on("tedge/commands/req/software/update")
        .await;
    let (_tmp_dir, mapper_task) = start_c8y_mapper(broker.port).await.unwrap();

    // Send the SmartREST software update request (template 528) on `c8y/s/ds`,
    // followed by a fake JWT token.
    let smartrest_request = "528,external_id,nodered,1.0.0::debian,,install";
    let _ = broker
        .publish("c8y/s/ds", smartrest_request)
        .await
        .unwrap();
    let _ = publish_a_fake_jwt_token(broker).await;

    // The published request must start a JSON object with an `id` field and carry the update list.
    mqtt_tests::assert_received_all_expected(
        &mut update_requests,
        TEST_TIMEOUT_MS,
        &["{\"id\":\"", &expected_payload],
    )
    .await;

    // Stop the background mapper task now that the expectation has been checked.
    mapper_task.abort();
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[serial]
async fn mapper_publishes_software_update_status_onto_c8y_topic() {
// The test assures SM Mapper correctly receives software update response message on `tedge/commands/res/software/update`
// and publishes status of the operation `501` on `c8y/s/us`
let broker = mqtt_tests::test_mqtt_broker();
let mut messages = broker.messages_published_on("c8y/s/us").await;
// Start SM Mapper
let (_tmp_dir, sm_mapper) = start_c8y_mapper(broker.port).await.unwrap();
let _ = publish_a_fake_jwt_token(broker).await;
// Prepare and publish a software update status response message `executing` on `tedge/commands/res/software/update`.
let json_response