diff options
Diffstat (limited to 'crates/core/tedge_mapper/src/c8y/tests.rs')
-rw-r--r-- | crates/core/tedge_mapper/src/c8y/tests.rs | 182 |
1 files changed, 110 insertions, 72 deletions
diff --git a/crates/core/tedge_mapper/src/c8y/tests.rs b/crates/core/tedge_mapper/src/c8y/tests.rs index b300bf0f..7cbed97a 100644 --- a/crates/core/tedge_mapper/src/c8y/tests.rs +++ b/crates/core/tedge_mapper/src/c8y/tests.rs @@ -1,11 +1,21 @@ -use crate::core::{converter::Converter, mapper::create_mapper, size_threshold::SizeThreshold}; -use c8y_api::{http_proxy::C8YHttpProxy, json_c8y::C8yUpdateSoftwareListResponse}; +use crate::core::{ + converter::Converter, error::ConversionError, mapper::create_mapper, + size_threshold::SizeThreshold, +}; +use anyhow::Result; +use assert_matches::assert_matches; +use c8y_api::{ + http_proxy::{C8YHttpProxy, MockC8YHttpProxy}, + json_c8y::C8yUpdateSoftwareListResponse, +}; use c8y_smartrest::{ error::SMCumulocityMapperError, operations::Operations, smartrest_deserializer::SmartRestJwtResponse, }; +use mockall::predicate; use mqtt_channel::{Message, Topic}; use mqtt_tests::test_mqtt_server::MqttProcessHandler; +use serde_json::json; use serial_test::serial; use std::time::Duration; use test_case::test_case; @@ -463,19 +473,7 @@ async fn c8y_mapper_syncs_pending_alarms_on_startup() { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] #[serial] async fn test_sync_alarms() { - let size_threshold = SizeThreshold(16 * 1024); - let device_name = String::from("test"); - let device_type = String::from("test_type"); - let operations = Operations::default(); - let http_proxy = FakeC8YHttpProxy {}; - - let mut converter = CumulocityConverter::new( - size_threshold, - device_name, - device_type, - operations, - http_proxy, - ); + let mut converter = create_c8y_converter(); let alarm_topic = "tedge/alarms/critical/temperature_alarm"; let alarm_payload = r#"{ "message": "Temperature very high" }"#; @@ -529,18 +527,7 @@ async fn test_sync_alarms() { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] #[serial] async fn convert_thin_edge_json_with_child_id() { - let device_name = String::from("test"); - let device_type = String::from("test"); - let operations = Operations::default(); - let http_proxy = FakeC8YHttpProxy {}; - - let mut converter = Box::new(CumulocityConverter::new( - SizeThreshold(16 * 1024), - device_name, - device_type, - operations, - http_proxy, - )); + let mut converter = create_c8y_converter(); let in_topic = "tedge/measurements/child1"; let in_payload = r#"{"temp": 1, "time": "2021-11-16T17:45:40.571760714+01:00"}"#; @@ -573,18 +560,7 @@ async fn convert_thin_edge_json_with_child_id() { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] #[serial] async fn convert_first_thin_edge_json_invalid_then_valid_with_child_id() { - let device_name = String::from("test"); - let device_type = String::from("test"); - let operations = Operations::default(); - let http_proxy = FakeC8YHttpProxy {}; - - let mut converter = Box::new(CumulocityConverter::new( - SizeThreshold(16 * 1024), - device_name, - device_type, - operations, - http_proxy, - )); + let mut converter = create_c8y_converter(); let in_topic = "tedge/measurements/child1"; let in_invalid_payload = r#"{"temp": invalid}"#; @@ -619,18 +595,7 @@ async fn convert_first_thin_edge_json_invalid_then_valid_with_child_id() { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] #[serial] async fn convert_two_thin_edge_json_messages_given_different_child_id() { - let device_name = String::from("test"); - let device_type = String::from("test"); - let operations = Operations::default(); - let http_proxy = FakeC8YHttpProxy {}; - - let mut converter = Box::new(CumulocityConverter::new( - SizeThreshold(16 * 1024), - device_name, - device_type, - operations, - http_proxy, - )); + let mut converter = create_c8y_converter(); let in_payload = r#"{"temp": 1, "time": "2021-11-16T17:45:40.571760714+01:00"}"#; // First message from "child1" @@ -694,30 +659,91 @@ fn extract_child_id(in_topic: &str, expected_child_id: Option<String>) { } } -#[test] -fn check_c8y_threshold_packet_size() -> Result<(), anyhow::Error> { +#[tokio::test] +async fn check_c8y_threshold_packet_size() -> Result<(), anyhow::Error> { + let mut converter = create_c8y_converter(); + + let alarm_topic = "tedge/alarms/critical/temperature_alarm"; + let big_message = create_packet(1024 * 20); + let alarm_payload = json!({ "message": big_message }).to_string(); + let alarm_message = Message::new(&Topic::new_unchecked(alarm_topic), alarm_payload); + + assert_matches!( + converter.try_convert(&alarm_message).await, + Err(ConversionError::SizeThresholdExceeded { + topic: _, + actual_size: 20494, + threshold: 16384 + }) + ); + Ok(()) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn convert_event() -> Result<()> { let size_threshold = SizeThreshold(16 * 1024); let device_name = String::from("test"); - let device_type = String::from("test"); + let device_type = String::from("test_type"); let operations = Operations::default(); - let http_proxy = FakeC8YHttpProxy {}; + let http_proxy = MockC8YHttpProxy::new(); - let converter = CumulocityConverter::new( + let mut converter = CumulocityConverter::new( size_threshold, device_name, device_type, operations, http_proxy, ); - let buffer = create_packet(1024 * 20); - let err = converter.size_threshold.validate(&buffer).unwrap_err(); - assert_eq!( - err.to_string(), - "The input size 20480 is too big. The threshold is 16384." - ); + + let event_topic = "tedge/events/click_event"; + let event_payload = r#"{ "message": "Someone clicked" }"#; + let event_message = Message::new(&Topic::new_unchecked(event_topic), event_payload); + + let converted_events = converter.convert(&event_message).await; + assert_eq!(converted_events.len(), 1); + let converted_event = converted_events.get(0).unwrap(); + assert_eq!(converted_event.topic.name, "c8y/s/us"); + assert!(converted_event + .payload_str()? + .starts_with("400,click_event")); + Ok(()) } +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn test_convert_big_event() { + let mqtt_packet_limit = 16; + let size_threshold = SizeThreshold(mqtt_packet_limit * 1024); + let device_name = String::from("test"); + let device_type = String::from("test_type"); + let operations = Operations::default(); + + let mut http_proxy = MockC8YHttpProxy::new(); + http_proxy + .expect_send_event() + .with( + predicate::eq("click_event"), + predicate::always(), + predicate::always(), + ) + .returning(|_, _, _| Ok("123".into())); + + let mut converter = CumulocityConverter::new( + size_threshold, + device_name, + device_type, + operations, + http_proxy, + ); + + let event_topic = "tedge/events/click_event"; + let big_event_message = create_packet((mqtt_packet_limit + 1) * 1024); // Event payload > size_threshold + let big_event_payload = json!({ "message": big_event_message }).to_string(); + let big_event_message = Message::new(&Topic::new_unchecked(event_topic), big_event_payload); + + assert!(converter.convert(&big_event_message).await.is_empty()); +} + fn create_packet(size: usize) -> String { let data: String = "Some data!".into(); let loops = size / data.len(); @@ -757,29 +783,41 @@ impl C8YHttpProxy for FakeC8YHttpProxy { ) -> Result<String, SMCumulocityMapperError> { Ok("fake/upload/url".into()) } + + async fn send_event( + &mut self, + _event_type: &str, + _text: &str, + _time: Option<String>, + ) -> Result<String, SMCumulocityMapperError> { + Ok("123".into()) + } } async fn start_c8y_mapper(mqtt_port: u16) -> Result<JoinHandle<()>, anyhow::Error> { + let converter = create_c8y_converter(); + let mut mapper = create_mapper("c8y-mapper-test", mqtt_port, Box::new(converter)).await?; + + let mapper_task = tokio::spawn(async move { + let _ = mapper.run().await; + }); + Ok(mapper_task) +} + +fn create_c8y_converter() -> CumulocityConverter<FakeC8YHttpProxy> { + let size_threshold = SizeThreshold(16 * 1024); let device_name = "test-device".into(); let device_type = "test-device-type".into(); - let size_threshold = SizeThreshold(16 * 1024); let operations = Operations::default(); let http_proxy = FakeC8YHttpProxy {}; - let converter = Box::new(CumulocityConverter::new( + CumulocityConverter::new( size_threshold, device_name, device_type, operations, http_proxy, - )); - - let mut mapper = create_mapper("c8y-mapper-test", mqtt_port, converter).await?; - - let mapper_task = tokio::spawn(async move { - let _ = mapper.run().await; - }); - Ok(mapper_task) + ) } fn remove_whitespace(s: &str) -> String { |