summaryrefslogtreecommitdiffstats
path: root/crates/core/tedge_mapper/src/c8y/tests.rs
diff options
context:
space:
mode:
Diffstat (limited to 'crates/core/tedge_mapper/src/c8y/tests.rs')
-rw-r--r--crates/core/tedge_mapper/src/c8y/tests.rs182
1 files changed, 110 insertions, 72 deletions
diff --git a/crates/core/tedge_mapper/src/c8y/tests.rs b/crates/core/tedge_mapper/src/c8y/tests.rs
index b300bf0f..7cbed97a 100644
--- a/crates/core/tedge_mapper/src/c8y/tests.rs
+++ b/crates/core/tedge_mapper/src/c8y/tests.rs
@@ -1,11 +1,21 @@
-use crate::core::{converter::Converter, mapper::create_mapper, size_threshold::SizeThreshold};
-use c8y_api::{http_proxy::C8YHttpProxy, json_c8y::C8yUpdateSoftwareListResponse};
+use crate::core::{
+ converter::Converter, error::ConversionError, mapper::create_mapper,
+ size_threshold::SizeThreshold,
+};
+use anyhow::Result;
+use assert_matches::assert_matches;
+use c8y_api::{
+ http_proxy::{C8YHttpProxy, MockC8YHttpProxy},
+ json_c8y::C8yUpdateSoftwareListResponse,
+};
use c8y_smartrest::{
error::SMCumulocityMapperError, operations::Operations,
smartrest_deserializer::SmartRestJwtResponse,
};
+use mockall::predicate;
use mqtt_channel::{Message, Topic};
use mqtt_tests::test_mqtt_server::MqttProcessHandler;
+use serde_json::json;
use serial_test::serial;
use std::time::Duration;
use test_case::test_case;
@@ -463,19 +473,7 @@ async fn c8y_mapper_syncs_pending_alarms_on_startup() {
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[serial]
async fn test_sync_alarms() {
- let size_threshold = SizeThreshold(16 * 1024);
- let device_name = String::from("test");
- let device_type = String::from("test_type");
- let operations = Operations::default();
- let http_proxy = FakeC8YHttpProxy {};
-
- let mut converter = CumulocityConverter::new(
- size_threshold,
- device_name,
- device_type,
- operations,
- http_proxy,
- );
+ let mut converter = create_c8y_converter();
let alarm_topic = "tedge/alarms/critical/temperature_alarm";
let alarm_payload = r#"{ "message": "Temperature very high" }"#;
@@ -529,18 +527,7 @@ async fn test_sync_alarms() {
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[serial]
async fn convert_thin_edge_json_with_child_id() {
- let device_name = String::from("test");
- let device_type = String::from("test");
- let operations = Operations::default();
- let http_proxy = FakeC8YHttpProxy {};
-
- let mut converter = Box::new(CumulocityConverter::new(
- SizeThreshold(16 * 1024),
- device_name,
- device_type,
- operations,
- http_proxy,
- ));
+ let mut converter = create_c8y_converter();
let in_topic = "tedge/measurements/child1";
let in_payload = r#"{"temp": 1, "time": "2021-11-16T17:45:40.571760714+01:00"}"#;
@@ -573,18 +560,7 @@ async fn convert_thin_edge_json_with_child_id() {
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[serial]
async fn convert_first_thin_edge_json_invalid_then_valid_with_child_id() {
- let device_name = String::from("test");
- let device_type = String::from("test");
- let operations = Operations::default();
- let http_proxy = FakeC8YHttpProxy {};
-
- let mut converter = Box::new(CumulocityConverter::new(
- SizeThreshold(16 * 1024),
- device_name,
- device_type,
- operations,
- http_proxy,
- ));
+ let mut converter = create_c8y_converter();
let in_topic = "tedge/measurements/child1";
let in_invalid_payload = r#"{"temp": invalid}"#;
@@ -619,18 +595,7 @@ async fn convert_first_thin_edge_json_invalid_then_valid_with_child_id() {
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[serial]
async fn convert_two_thin_edge_json_messages_given_different_child_id() {
- let device_name = String::from("test");
- let device_type = String::from("test");
- let operations = Operations::default();
- let http_proxy = FakeC8YHttpProxy {};
-
- let mut converter = Box::new(CumulocityConverter::new(
- SizeThreshold(16 * 1024),
- device_name,
- device_type,
- operations,
- http_proxy,
- ));
+ let mut converter = create_c8y_converter();
let in_payload = r#"{"temp": 1, "time": "2021-11-16T17:45:40.571760714+01:00"}"#;
// First message from "child1"
@@ -694,30 +659,91 @@ fn extract_child_id(in_topic: &str, expected_child_id: Option<String>) {
}
}
-#[test]
-fn check_c8y_threshold_packet_size() -> Result<(), anyhow::Error> {
+#[tokio::test]
+async fn check_c8y_threshold_packet_size() -> Result<(), anyhow::Error> {
+ let mut converter = create_c8y_converter();
+
+ let alarm_topic = "tedge/alarms/critical/temperature_alarm";
+ let big_message = create_packet(1024 * 20);
+ let alarm_payload = json!({ "message": big_message }).to_string();
+ let alarm_message = Message::new(&Topic::new_unchecked(alarm_topic), alarm_payload);
+
+ assert_matches!(
+ converter.try_convert(&alarm_message).await,
+ Err(ConversionError::SizeThresholdExceeded {
+ topic: _,
+ actual_size: 20494,
+ threshold: 16384
+ })
+ );
+ Ok(())
+}
+
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+async fn convert_event() -> Result<()> {
let size_threshold = SizeThreshold(16 * 1024);
let device_name = String::from("test");
- let device_type = String::from("test");
+ let device_type = String::from("test_type");
let operations = Operations::default();
- let http_proxy = FakeC8YHttpProxy {};
+ let http_proxy = MockC8YHttpProxy::new();
- let converter = CumulocityConverter::new(
+ let mut converter = CumulocityConverter::new(
size_threshold,
device_name,
device_type,
operations,
http_proxy,
);
- let buffer = create_packet(1024 * 20);
- let err = converter.size_threshold.validate(&buffer).unwrap_err();
- assert_eq!(
- err.to_string(),
- "The input size 20480 is too big. The threshold is 16384."
- );
+
+ let event_topic = "tedge/events/click_event";
+ let event_payload = r#"{ "message": "Someone clicked" }"#;
+ let event_message = Message::new(&Topic::new_unchecked(event_topic), event_payload);
+
+ let converted_events = converter.convert(&event_message).await;
+ assert_eq!(converted_events.len(), 1);
+ let converted_event = converted_events.get(0).unwrap();
+ assert_eq!(converted_event.topic.name, "c8y/s/us");
+ assert!(converted_event
+ .payload_str()?
+ .starts_with("400,click_event"));
+
Ok(())
}
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+async fn test_convert_big_event() {
+ let mqtt_packet_limit = 16;
+ let size_threshold = SizeThreshold(mqtt_packet_limit * 1024);
+ let device_name = String::from("test");
+ let device_type = String::from("test_type");
+ let operations = Operations::default();
+
+ let mut http_proxy = MockC8YHttpProxy::new();
+ http_proxy
+ .expect_send_event()
+ .with(
+ predicate::eq("click_event"),
+ predicate::always(),
+ predicate::always(),
+ )
+ .returning(|_, _, _| Ok("123".into()));
+
+ let mut converter = CumulocityConverter::new(
+ size_threshold,
+ device_name,
+ device_type,
+ operations,
+ http_proxy,
+ );
+
+ let event_topic = "tedge/events/click_event";
+ let big_event_message = create_packet((mqtt_packet_limit + 1) * 1024); // Event payload > size_threshold
+ let big_event_payload = json!({ "message": big_event_message }).to_string();
+ let big_event_message = Message::new(&Topic::new_unchecked(event_topic), big_event_payload);
+
+ assert!(converter.convert(&big_event_message).await.is_empty());
+}
+
fn create_packet(size: usize) -> String {
let data: String = "Some data!".into();
let loops = size / data.len();
@@ -757,29 +783,41 @@ impl C8YHttpProxy for FakeC8YHttpProxy {
) -> Result<String, SMCumulocityMapperError> {
Ok("fake/upload/url".into())
}
+
+ async fn send_event(
+ &mut self,
+ _event_type: &str,
+ _text: &str,
+ _time: Option<String>,
+ ) -> Result<String, SMCumulocityMapperError> {
+ Ok("123".into())
+ }
}
async fn start_c8y_mapper(mqtt_port: u16) -> Result<JoinHandle<()>, anyhow::Error> {
+ let converter = create_c8y_converter();
+ let mut mapper = create_mapper("c8y-mapper-test", mqtt_port, Box::new(converter)).await?;
+
+ let mapper_task = tokio::spawn(async move {
+ let _ = mapper.run().await;
+ });
+ Ok(mapper_task)
+}
+
+fn create_c8y_converter() -> CumulocityConverter<FakeC8YHttpProxy> {
+ let size_threshold = SizeThreshold(16 * 1024);
let device_name = "test-device".into();
let device_type = "test-device-type".into();
- let size_threshold = SizeThreshold(16 * 1024);
let operations = Operations::default();
let http_proxy = FakeC8YHttpProxy {};
- let converter = Box::new(CumulocityConverter::new(
+ CumulocityConverter::new(
size_threshold,
device_name,
device_type,
operations,
http_proxy,
- ));
-
- let mut mapper = create_mapper("c8y-mapper-test", mqtt_port, converter).await?;
-
- let mapper_task = tokio::spawn(async move {
- let _ = mapper.run().await;
- });
- Ok(mapper_task)
+ )
}
fn remove_whitespace(s: &str) -> String {