summaryrefslogtreecommitdiffstats
path: root/crates/core/tedge_mapper/src
diff options
context:
space:
mode:
authorAlbin Suresh <albin.suresh@softwareag.com>2022-02-23 20:19:49 +0530
committerAlbin Suresh <albin.suresh@softwareag.com>2022-02-23 22:47:50 +0530
commit5c5e38639769def866981f4257190ff76fae481e (patch)
tree7cd374cceb904e74f1904a616672f035d48501dc /crates/core/tedge_mapper/src
parent7616196c806ab6fecf63812442a02f6460785e2f (diff)
Improve error message of mqtt message size validation
Diffstat (limited to 'crates/core/tedge_mapper/src')
-rw-r--r--crates/core/tedge_mapper/src/az/converter.rs21
-rw-r--r--crates/core/tedge_mapper/src/c8y/converter.rs58
-rw-r--r--crates/core/tedge_mapper/src/c8y/mapper.rs3
-rw-r--r--crates/core/tedge_mapper/src/c8y/tests.rs128
-rw-r--r--crates/core/tedge_mapper/src/core/error.rs13
-rw-r--r--crates/core/tedge_mapper/src/core/size_threshold.rs18
6 files changed, 109 insertions, 132 deletions
diff --git a/crates/core/tedge_mapper/src/az/converter.rs b/crates/core/tedge_mapper/src/az/converter.rs
index b6c3fdd4..7989cfbd 100644
--- a/crates/core/tedge_mapper/src/az/converter.rs
+++ b/crates/core/tedge_mapper/src/az/converter.rs
@@ -37,11 +37,10 @@ impl Converter for AzureConverter {
}
async fn try_convert(&mut self, input: &Message) -> Result<Vec<Message>, Self::Error> {
- let input = input.payload_str()?;
let () = self.size_threshold.validate(input)?;
let default_timestamp = self.add_timestamp.then(|| self.clock.now());
let mut serializer = ThinEdgeJsonSerializer::new_with_timestamp(default_timestamp);
- let () = thin_edge_json::parser::parse_str(input, &mut serializer)?;
+ let () = thin_edge_json::parser::parse_str(input.payload_str()?, &mut serializer)?;
let payload = serializer.into_string()?;
Ok(vec![(Message::new(&self.mapper_config.out_topic, payload))])
@@ -52,11 +51,7 @@ impl Converter for AzureConverter {
mod tests {
use crate::{
az::converter::AzureConverter,
- core::{
- converter::*,
- error::ConversionError,
- size_threshold::{SizeThreshold, SizeThresholdExceeded},
- },
+ core::{converter::*, error::ConversionError, size_threshold::SizeThreshold},
};
use assert_json_diff::*;
@@ -194,17 +189,17 @@ mod tests {
async fn exceeding_threshold_returns_error() {
let mut converter = AzureConverter::new(false, Box::new(TestClock), SizeThreshold(1));
+ let _topic = "tedge/measurements".to_string();
let input = "ABC";
let result = converter.try_convert(&new_tedge_message(input)).await;
assert_matches!(
result,
- Err(ConversionError::FromSizeThresholdExceeded(
- SizeThresholdExceeded {
- actual_size: 3,
- threshold: 1
- }
- ))
+ Err(ConversionError::SizeThresholdExceeded {
+ topic: _topic,
+ actual_size: 3,
+ threshold: 1
+ })
);
}
}
diff --git a/crates/core/tedge_mapper/src/c8y/converter.rs b/crates/core/tedge_mapper/src/c8y/converter.rs
index 15b250a1..1616acc8 100644
--- a/crates/core/tedge_mapper/src/c8y/converter.rs
+++ b/crates/core/tedge_mapper/src/c8y/converter.rs
@@ -29,6 +29,7 @@ use std::{
process::Stdio,
};
use thin_edge_json::{alarm::ThinEdgeAlarm, event::ThinEdgeEvent};
+use time::{format_description::well_known::Rfc3339, OffsetDateTime};
use tracing::{debug, info, log::error};
use super::{
@@ -149,26 +150,43 @@ where
input: &Message,
) -> Result<Vec<Message>, ConversionError> {
let tedge_event = ThinEdgeEvent::try_from(input.topic.name.as_str(), input.payload_str()?)?;
- match self.size_threshold.validate(input.payload_str()?) {
- // If the message size is well within the Cumulocity MQTT size limit, use MQTT to send the mapped event as well
- Ok(()) => {
- let smartrest_alarm = event::serialize_event(tedge_event)?;
- let smartrest_topic = Topic::new_unchecked(SMARTREST_PUBLISH_TOPIC);
- Ok(vec![Message::new(&smartrest_topic, smartrest_alarm)])
- }
+ // If the message size is well within the Cumulocity MQTT size limit, use MQTT to send the mapped event as well
+ if input.payload_bytes().len() < self.size_threshold.0 {
+ let smartrest_alarm = event::serialize_event(tedge_event)?;
+ let smartrest_topic = Topic::new_unchecked(SMARTREST_PUBLISH_TOPIC);
+
+ Ok(vec![Message::new(&smartrest_topic, smartrest_alarm)])
+ } else {
// If the message size is larger than the MQTT size limit, use HTTP to send the mapped event
- Err(_) => {
- let event_text = tedge_event
- .data
- .and_then(|data| data.message)
- .unwrap_or_else(|| "generic event".into());
- let _ = self
- .http_proxy
- .send_event(tedge_event.name.as_str(), event_text.as_str(), None)
- .await?;
- Ok(vec![])
- }
+ let (event_text, event_time) = match tedge_event.data {
+ None => {
+ let message = tedge_event.name.clone();
+ let time = OffsetDateTime::now_utc().format(&Rfc3339)?;
+
+ (message, time)
+ }
+ Some(event_data) => {
+ let message = event_data
+ .message
+ .unwrap_or_else(|| tedge_event.name.clone());
+ let time = event_data.time.map_or_else(
+ || OffsetDateTime::now_utc().format(&Rfc3339),
+ |timestamp| timestamp.format(&Rfc3339),
+ )?;
+ (message, time)
+ }
+ };
+
+ let _ = self
+ .http_proxy
+ .send_event(
+ tedge_event.name.as_str(),
+ event_text.as_str(),
+ Some(event_time),
+ )
+ .await?;
+ Ok(vec![])
}
}
}
@@ -186,11 +204,11 @@ where
async fn try_convert(&mut self, message: &Message) -> Result<Vec<Message>, ConversionError> {
match &message.topic {
topic if topic.name.starts_with("tedge/measurements") => {
- let () = self.size_threshold.validate(message.payload_str()?)?;
+ let () = self.size_threshold.validate(message)?;
self.try_convert_measurement(message)
}
topic if topic.name.starts_with("tedge/alarms") => {
- let () = self.size_threshold.validate(message.payload_str()?)?;
+ let () = self.size_threshold.validate(message)?;
self.alarm_converter.try_convert_alarm(message)
}
topic if topic.name.starts_with(INTERNAL_ALARMS_TOPIC) => {
diff --git a/crates/core/tedge_mapper/src/c8y/mapper.rs b/crates/core/tedge_mapper/src/c8y/mapper.rs
index d5ab3092..a5df4d0b 100644
--- a/crates/core/tedge_mapper/src/c8y/mapper.rs
+++ b/crates/core/tedge_mapper/src/c8y/mapper.rs
@@ -16,6 +16,7 @@ use tracing::{info, info_span, Instrument};
use super::topic::C8yTopic;
const CUMULOCITY_MAPPER_NAME: &str = "tedge-mapper-c8y";
+const MQTT_MESSAGE_SIZE_THRESHOLD: usize = 16 * 1024;
pub struct CumulocityMapper {}
@@ -65,7 +66,7 @@ impl CumulocityMapper {
#[async_trait]
impl TEdgeComponent for CumulocityMapper {
async fn start(&self, tedge_config: TEdgeConfig) -> Result<(), anyhow::Error> {
- let size_threshold = SizeThreshold(16 * 1024);
+ let size_threshold = SizeThreshold(MQTT_MESSAGE_SIZE_THRESHOLD);
let operations = Operations::try_new("/etc/tedge/operations", "c8y")?;
let mut http_proxy = JwtAuthHttpProxy::try_new(&tedge_config).await?;
diff --git a/crates/core/tedge_mapper/src/c8y/tests.rs b/crates/core/tedge_mapper/src/c8y/tests.rs
index 9f989eae..e4a68e29 100644
--- a/crates/core/tedge_mapper/src/c8y/tests.rs
+++ b/crates/core/tedge_mapper/src/c8y/tests.rs
@@ -1,7 +1,11 @@
-use crate::core::{converter::Converter, mapper::create_mapper, size_threshold::SizeThreshold};
+use crate::core::{
+ converter::Converter, error::ConversionError, mapper::create_mapper,
+ size_threshold::SizeThreshold,
+};
use anyhow::Result;
+use assert_matches::assert_matches;
use c8y_api::{
- http_proxy::{C8YHttpProxy, C8yMqttJwtTokenRetriever, JwtAuthHttpProxy, MockC8YHttpProxy},
+ http_proxy::{C8YHttpProxy, MockC8YHttpProxy},
json_c8y::C8yUpdateSoftwareListResponse,
};
use c8y_smartrest::{
@@ -9,7 +13,7 @@ use c8y_smartrest::{
smartrest_deserializer::SmartRestJwtResponse,
};
use mockall::predicate;
-use mqtt_channel::{Connection, Message, Topic, TopicFilter};
+use mqtt_channel::{Message, Topic};
use mqtt_tests::test_mqtt_server::MqttProcessHandler;
use serde_json::json;
use serial_test::serial;
@@ -457,19 +461,7 @@ async fn c8y_mapper_syncs_pending_alarms_on_startup() {
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[serial]
async fn test_sync_alarms() {
- let size_threshold = SizeThreshold(16 * 1024);
- let device_name = String::from("test");
- let device_type = String::from("test_type");
- let operations = Operations::default();
- let http_proxy = FakeC8YHttpProxy {};
-
- let mut converter = CumulocityConverter::new(
- size_threshold,
- device_name,
- device_type,
- operations,
- http_proxy,
- );
+ let mut converter = create_c8y_converter();
let alarm_topic = "tedge/alarms/critical/temperature_alarm";
let alarm_payload = r#"{ "message": "Temperature very high" }"#;
@@ -523,18 +515,7 @@ async fn test_sync_alarms() {
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[serial]
async fn convert_thin_edge_json_with_child_id() {
- let device_name = String::from("test");
- let device_type = String::from("test");
- let operations = Operations::default();
- let http_proxy = FakeC8YHttpProxy {};
-
- let mut converter = Box::new(CumulocityConverter::new(
- SizeThreshold(16 * 1024),
- device_name,
- device_type,
- operations,
- http_proxy,
- ));
+ let mut converter = create_c8y_converter();
let in_topic = "tedge/measurements/child1";
let in_payload = r#"{"temp": 1, "time": "2021-11-16T17:45:40.571760714+01:00"}"#;
@@ -567,18 +548,7 @@ async fn convert_thin_edge_json_with_child_id() {
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[serial]
async fn convert_first_thin_edge_json_invalid_then_valid_with_child_id() {
- let device_name = String::from("test");
- let device_type = String::from("test");
- let operations = Operations::default();
- let http_proxy = FakeC8YHttpProxy {};
-
- let mut converter = Box::new(CumulocityConverter::new(
- SizeThreshold(16 * 1024),
- device_name,
- device_type,
- operations,
- http_proxy,
- ));
+ let mut converter = create_c8y_converter();
let in_topic = "tedge/measurements/child1";
let in_invalid_payload = r#"{"temp": invalid}"#;
@@ -613,18 +583,7 @@ async fn convert_first_thin_edge_json_invalid_then_valid_with_child_id() {
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[serial]
async fn convert_two_thin_edge_json_messages_given_different_child_id() {
- let device_name = String::from("test");
- let device_type = String::from("test");
- let operations = Operations::default();
- let http_proxy = FakeC8YHttpProxy {};
-
- let mut converter = Box::new(CumulocityConverter::new(
- SizeThreshold(16 * 1024),
- device_name,
- device_type,
- operations,
- http_proxy,
- ));
+ let mut converter = create_c8y_converter();
let in_payload = r#"{"temp": 1, "time": "2021-11-16T17:45:40.571760714+01:00"}"#;
// First message from "child1"
@@ -688,36 +647,32 @@ fn extract_child_id(in_topic: &str, expected_child_id: Option<String>) {
}
}
-#[test]
-fn check_c8y_threshold_packet_size() -> Result<(), anyhow::Error> {
- let size_threshold = SizeThreshold(16 * 1024);
- let device_name = String::from("test");
- let device_type = String::from("test");
- let operations = Operations::default();
- let http_proxy = FakeC8YHttpProxy {};
+#[tokio::test]
+async fn check_c8y_threshold_packet_size() -> Result<(), anyhow::Error> {
+ let mut converter = create_c8y_converter();
- let converter = CumulocityConverter::new(
- size_threshold,
- device_name,
- device_type,
- operations,
- http_proxy,
- );
- let buffer = create_packet(1024 * 20);
- let err = converter.size_threshold.validate(&buffer).unwrap_err();
- assert_eq!(
- err.to_string(),
- "The input size 20480 is too big. The threshold is 16384."
+ let alarm_topic = "tedge/alarms/critical/temperature_alarm";
+ let big_message = create_packet(1024 * 20);
+ let alarm_payload = json!({ "message": big_message }).to_string();
+ let alarm_message = Message::new(&Topic::new_unchecked(alarm_topic), alarm_payload);
+
+ assert_matches!(
+ converter.try_convert(&alarm_message).await,
+ Err(ConversionError::SizeThresholdExceeded {
+ topic: _,
+ actual_size: 20494,
+ threshold: 16384
+ })
);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
-async fn test_convert_event() -> Result<()> {
- let size_threshold = SizeThreshold(32 * 1024);
+async fn convert_event() -> Result<()> {
+ let size_threshold = SizeThreshold(16 * 1024);
let device_name = String::from("test");
let device_type = String::from("test_type");
- let operations = Operations::new();
+ let operations = Operations::default();
let http_proxy = MockC8YHttpProxy::new();
let mut converter = CumulocityConverter::new(
@@ -749,7 +704,7 @@ async fn test_convert_big_event() {
let size_threshold = SizeThreshold(mqtt_packet_limit * 1024);
let device_name = String::from("test");
let device_type = String::from("test_type");
- let operations = Operations::new();
+ let operations = Operations::default();
let mut http_proxy = MockC8YHttpProxy::new();
http_proxy
@@ -828,26 +783,29 @@ impl C8YHttpProxy for FakeC8YHttpProxy {
}
async fn start_c8y_mapper(mqtt_port: u16) -> Result<JoinHandle<()>, anyhow::Error> {
+ let converter = create_c8y_converter();
+ let mut mapper = create_mapper("c8y-mapper-test", mqtt_port, Box::new(converter)).await?;
+
+ let mapper_task = tokio::spawn(async move {
+ let _ = mapper.run().await;
+ });
+ Ok(mapper_task)
+}
+
+fn create_c8y_converter() -> CumulocityConverter<FakeC8YHttpProxy> {
+ let size_threshold = SizeThreshold(16 * 1024);
let device_name = "test-device".into();
let device_type = "test-device-type".into();
- let size_threshold = SizeThreshold(16 * 1024);
let operations = Operations::default();
let http_proxy = FakeC8YHttpProxy {};
- let converter = Box::new(CumulocityConverter::new(
+ CumulocityConverter::new(
size_threshold,
device_name,
device_type,
operations,
http_proxy,
- ));
-
- let mut mapper = create_mapper("c8y-mapper-test", mqtt_port, converter).await?;
-
- let mapper_task = tokio::spawn(async move {
- let _ = mapper.run().await;
- });
- Ok(mapper_task)
+ )
}
fn remove_whitespace(s: &str) -> String {
diff --git a/crates/core/tedge_mapper/src/core/error.rs b/crates/core/tedge_mapper/src/core/error.rs
index c21792b4..e8fac94b 100644
--- a/crates/core/tedge_mapper/src/core/error.rs
+++ b/crates/core/tedge_mapper/src/core/error.rs
@@ -1,4 +1,4 @@
-use crate::{c8y::error::CumulocityMapperError, core::size_threshold::SizeThresholdExceeded};
+use crate::c8y::error::CumulocityMapperError;
use c8y_smartrest::error::OperationsError;
use mqtt_channel::MqttError;
@@ -54,8 +54,12 @@ pub enum ConversionError {
#[error(transparent)]
FromThinEdgeJsonParser(#[from] thin_edge_json::parser::ThinEdgeJsonParserError),
- #[error(transparent)]
- FromSizeThresholdExceeded(#[from] SizeThresholdExceeded),
+ #[error("The size of the message received on {topic} is {actual_size} which is greater than the threshold size of {threshold}.")]
+ SizeThresholdExceeded {
+ topic: String,
+ actual_size: usize,
+ threshold: usize,
+ },
#[error("The given Child ID '{id}' is invalid.")]
InvalidChildId { id: String },
@@ -83,4 +87,7 @@ pub enum ConversionError {
#[error(transparent)]
FromUtf8Error(#[from] std::string::FromUtf8Error),
+
+ #[error(transparent)]
+ FromTimeFormatError(#[from] time::error::Format),
}
diff --git a/crates/core/tedge_mapper/src/core/size_threshold.rs b/crates/core/tedge_mapper/src/core/size_threshold.rs
index fb6e3d48..da1ee108 100644
--- a/crates/core/tedge_mapper/src/core/size_threshold.rs
+++ b/crates/core/tedge_mapper/src/core/size_threshold.rs
@@ -1,12 +1,17 @@
+use mqtt_channel::Message;
+
+use super::error::ConversionError;
+
#[derive(Debug)]
pub struct SizeThreshold(pub usize);
impl SizeThreshold {
- pub fn validate(&self, input: &str) -> Result<(), SizeThresholdExceeded> {
- let actual_size = input.len();
+ pub fn validate(&self, input: &Message) -> Result<(), ConversionError> {
+ let actual_size = input.payload_bytes().len();
let threshold = self.0;
if actual_size > threshold {
- Err(SizeThresholdExceeded {
+ Err(ConversionError::SizeThresholdExceeded {
+ topic: input.topic.name.clone(),
actual_size,
threshold,
})
@@ -15,10 +20,3 @@ impl SizeThreshold {
}
}
}
-
-#[derive(thiserror::Error, Debug)]
-#[error("The input size {actual_size} is too big. The threshold is {threshold}.")]
-pub struct SizeThresholdExceeded {
- pub actual_size: usize,
- pub threshold: usize,
-}