summaryrefslogtreecommitdiffstats
path: root/crates/core/tedge_mapper
diff options
context:
space:
mode:
authorPradeepKiruvale <pradeepkumar.kj@softwareag.com>2022-04-28 16:00:57 +0530
committerGitHub <noreply@github.com>2022-04-28 16:00:57 +0530
commit00e9ee334217e62bd63677e7d0df011f84d6752d (patch)
tree99d0801568192bc5fd8e6f3d2d7937ffc518b368 /crates/core/tedge_mapper
parent86b8b9578dbac9da5a3094d2ec9904aba8f68382 (diff)
947058fdf0da2286c63be62bbcdebf6af071c4ee# This is a combination of 2 commits. (#1095)
Closes #1056 c8y measurement threshold check for mqtt Signed-off-by: Pradeep Kumar K J <pradeepkumar.kj@softwareag.com>
Diffstat (limited to 'crates/core/tedge_mapper')
-rw-r--r--crates/core/tedge_mapper/src/c8y/converter.rs30
-rw-r--r--crates/core/tedge_mapper/src/c8y/tests.rs170
-rw-r--r--crates/core/tedge_mapper/src/core/error.rs8
3 files changed, 145 insertions, 63 deletions
diff --git a/crates/core/tedge_mapper/src/c8y/converter.rs b/crates/core/tedge_mapper/src/c8y/converter.rs
index 9843e2f0..d71e12a0 100644
--- a/crates/core/tedge_mapper/src/c8y/converter.rs
+++ b/crates/core/tedge_mapper/src/c8y/converter.rs
@@ -121,7 +121,7 @@ where
let mut vec: Vec<Message> = Vec::new();
let maybe_child_id = get_child_id_from_topic(&input.topic.name)?;
- match maybe_child_id {
+ let c8y_json_payload = match maybe_child_id {
Some(child_id) => {
// Need to check if the input Thin Edge JSON is valid before adding a child ID to list
let c8y_json_child_payload =
@@ -134,19 +134,23 @@ where
format!("101,{child_id},{child_id},thin-edge.io-child"),
));
}
-
- vec.push(Message::new(
- &self.mapper_config.out_topic,
- c8y_json_child_payload,
- ));
- }
- None => {
- let c8y_json_payload = json::from_thin_edge_json(input.payload_str()?)?;
- vec.push(Message::new(
- &self.mapper_config.out_topic,
- c8y_json_payload,
- ));
+ c8y_json_child_payload
}
+ None => json::from_thin_edge_json(input.payload_str()?)?,
+ };
+
+ if c8y_json_payload.len() < self.size_threshold.0 {
+ vec.push(Message::new(
+ &self.mapper_config.out_topic,
+ c8y_json_payload,
+ ));
+ } else {
+ return Err(ConversionError::TranslatedSizeExceededThreshold {
+ payload: input.payload_str()?[0..50].into(),
+ topic: input.topic.name.clone(),
+ actual_size: c8y_json_payload.len(),
+ threshold: self.size_threshold.0,
+ });
}
Ok(vec)
}
diff --git a/crates/core/tedge_mapper/src/c8y/tests.rs b/crates/core/tedge_mapper/src/c8y/tests.rs
index 849c7563..b4637295 100644
--- a/crates/core/tedge_mapper/src/c8y/tests.rs
+++ b/crates/core/tedge_mapper/src/c8y/tests.rs
@@ -6,14 +6,14 @@ use anyhow::Result;
use assert_json_diff::assert_json_include;
use assert_matches::assert_matches;
use c8y_api::{
- http_proxy::{C8YHttpProxy, MockC8YHttpProxy},
+ http_proxy::C8YHttpProxy,
json_c8y::{C8yCreateEvent, C8yUpdateSoftwareListResponse},
};
use c8y_smartrest::{
error::SMCumulocityMapperError, operations::Operations,
smartrest_deserializer::SmartRestJwtResponse,
};
-use mockall::predicate;
+
use mqtt_channel::{Message, Topic};
use mqtt_tests::test_mqtt_server::MqttProcessHandler;
use serde_json::json;
@@ -683,20 +683,7 @@ async fn check_c8y_threshold_packet_size() -> Result<(), anyhow::Error> {
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn convert_event_with_known_fields_to_c8y_smartrest() -> Result<()> {
- let size_threshold = SizeThreshold(16 * 1024);
- let device_name = String::from("test");
- let device_type = String::from("test_type");
- let operations = Operations::default();
- let http_proxy = MockC8YHttpProxy::new();
-
- let mut converter = CumulocityConverter::new(
- size_threshold,
- device_name,
- device_type,
- operations,
- http_proxy,
- );
-
+ let mut converter = create_c8y_converter();
let event_topic = "tedge/events/click_event";
let event_payload = r#"{ "text": "Someone clicked", "time": "2020-02-02T01:02:03+05:30" }"#;
let event_message = Message::new(&Topic::new_unchecked(event_topic), event_payload);
@@ -716,20 +703,7 @@ async fn convert_event_with_known_fields_to_c8y_smartrest() -> Result<()> {
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn convert_event_with_extra_fields_to_c8y_json() -> Result<()> {
- let size_threshold = SizeThreshold(16 * 1024);
- let device_name = String::from("test");
- let device_type = String::from("test_type");
- let operations = Operations::default();
- let http_proxy = MockC8YHttpProxy::new();
-
- let mut converter = CumulocityConverter::new(
- size_threshold,
- device_name,
- device_type,
- operations,
- http_proxy,
- );
-
+ let mut converter = create_c8y_converter();
let event_topic = "tedge/events/click_event";
let event_payload = r#"{ "text": "tick", "foo": "bar" }"#;
let event_message = Message::new(&Topic::new_unchecked(event_topic), event_payload);
@@ -754,34 +728,119 @@ async fn convert_event_with_extra_fields_to_c8y_json() -> Result<()> {
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_convert_big_event() {
- let mqtt_packet_limit = 16;
- let size_threshold = SizeThreshold(mqtt_packet_limit * 1024);
- let device_name = String::from("test");
- let device_type = String::from("test_type");
- let operations = Operations::default();
-
- let mut http_proxy = MockC8YHttpProxy::new();
- http_proxy
- .expect_send_event()
- .with(predicate::always())
- .returning(|_| Ok("123".into()));
-
- let mut converter = CumulocityConverter::new(
- size_threshold,
- device_name,
- device_type,
- operations,
- http_proxy,
- );
+ let mut converter = create_c8y_converter();
let event_topic = "tedge/events/click_event";
- let big_event_text = create_packet((mqtt_packet_limit + 1) * 1024); // Event payload > size_threshold
+ let big_event_text = create_packet((16 + 1) * 1024); // Event payload > size_threshold
let big_event_payload = json!({ "text": big_event_text }).to_string();
let big_event_message = Message::new(&Topic::new_unchecked(event_topic), big_event_payload);
assert!(converter.convert(&big_event_message).await.is_empty());
}
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+async fn test_convert_big_measurement() {
+ let mut converter = create_c8y_converter();
+
+ let measurement_topic = "tedge/measurements";
+ let big_measurement_payload = create_thin_edge_measurement(10 * 1024); // Measurement payload > size_threshold after converting to c8y json
+
+ let big_measurement_message = Message::new(
+ &Topic::new_unchecked(measurement_topic),
+ big_measurement_payload,
+ );
+
+ let result = converter.convert(&big_measurement_message).await;
+
+ assert!(result
+ .into_iter()
+ .nth(0)
+ .unwrap()
+ .payload_str()
+ .unwrap()
+ .contains( "The payload {\"temperature0\":0,\"temperature1\":1,\"temperature10\" received on tedge/measurements after translation is 33020 greater than the threshold size of 16384."));
+}
+
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+async fn test_convert_small_measurement() {
+ let mut converter = create_c8y_converter();
+
+ let measurement_topic = "tedge/measurements";
+ let big_measurement_payload = create_thin_edge_measurement(20); // Measurement payload size is 20 bytes
+
+ let big_measurement_message = Message::new(
+ &Topic::new_unchecked(measurement_topic),
+ big_measurement_payload,
+ );
+
+ let result = converter.convert(&big_measurement_message).await;
+
+ assert!(result
+ .into_iter()
+ .nth(0)
+ .unwrap()
+ .payload_str()
+ .unwrap()
+ .contains(
+ "{\"type\":\"ThinEdgeMeasurement\",\"temperature0\":{\"temperature0\":{\"value\":0.0}}"
+ ));
+}
+
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+async fn test_convert_big_measurement_for_child_device() {
+ let mut converter = create_c8y_converter();
+
+ let measurement_topic = "tedge/measurements/child1";
+ let big_measurement_payload = create_thin_edge_measurement(10 * 1024); // Measurement payload > size_threshold after converting to c8y json
+
+ let big_measurement_message = Message::new(
+ &Topic::new_unchecked(measurement_topic),
+ big_measurement_payload,
+ );
+
+ let result = converter.convert(&big_measurement_message).await;
+
+ assert!(result
+ .into_iter()
+ .nth(0)
+ .unwrap()
+ .payload_str()
+ .unwrap()
+ .contains("The payload {\"temperature0\":0,\"temperature1\":1,\"temperature10\" received on tedge/measurements/child1 after translation is 33081 greater than the threshold size of 16384."));
+}
+
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+async fn test_convert_small_measurement_for_child_device() {
+ let measurement_topic = "tedge/measurements/child1";
+ let big_measurement_payload = create_thin_edge_measurement(20); // Measurement payload size is 20 bytes
+
+ let big_measurement_message = Message::new(
+ &Topic::new_unchecked(measurement_topic),
+ big_measurement_payload,
+ );
+ let mut converter = create_c8y_converter();
+ let result = converter.convert(&big_measurement_message).await;
+
+ assert!(result
+ .clone()
+ .into_iter()
+ .nth(0)
+ .unwrap()
+ .payload_str()
+ .unwrap()
+ .contains("101,child1,child1,thin-edge.io-child"));
+
+ assert!(result.clone()
+ .into_iter()
+ .nth(1)
+ .unwrap()
+ .payload_str()
+ .unwrap()
+ .contains(
+ "{\"type\":\"ThinEdgeMeasurement\",\"externalSource\":{\"externalId\":\"child1\",\"type\":\"c8y_Serial\"},\"temperature0\":{\"temperature0\":{\"value\":0.0}},"
+ ));
+}
+
fn create_packet(size: usize) -> String {
let data: String = "Some data!".into();
let loops = size / data.len();
@@ -792,6 +851,17 @@ fn create_packet(size: usize) -> String {
buffer
}
+fn create_thin_edge_measurement(size: usize) -> String {
+ let mut map = serde_json::Map::new();
+ let data = r#""temperature":25"#;
+ let loops = size / data.len();
+ for i in 0..loops {
+ map.insert(format!("temperature{i}"), json!(i));
+ }
+ let obj = serde_json::Value::Object(map);
+ serde_json::to_string(&obj).unwrap()
+}
+
pub struct FakeC8YHttpProxy {}
#[async_trait::async_trait]
diff --git a/crates/core/tedge_mapper/src/core/error.rs b/crates/core/tedge_mapper/src/core/error.rs
index e8fac94b..743fd477 100644
--- a/crates/core/tedge_mapper/src/core/error.rs
+++ b/crates/core/tedge_mapper/src/core/error.rs
@@ -90,4 +90,12 @@ pub enum ConversionError {
#[error(transparent)]
FromTimeFormatError(#[from] time::error::Format),
+
+ #[error("The payload {payload} received on {topic} after translation is {actual_size} greater than the threshold size of {threshold}.")]
+ TranslatedSizeExceededThreshold {
+ payload: String,
+ topic: String,
+ actual_size: usize,
+ threshold: usize,
+ },
}