summaryrefslogtreecommitdiffstats
path: root/crates/core/tedge_mapper
diff options
context:
space:
mode:
Diffstat (limited to 'crates/core/tedge_mapper')
-rw-r--r--crates/core/tedge_mapper/Cargo.toml2
-rw-r--r--crates/core/tedge_mapper/src/az/converter.rs21
-rw-r--r--crates/core/tedge_mapper/src/c8y/converter.rs56
-rw-r--r--crates/core/tedge_mapper/src/c8y/mapper.rs8
-rw-r--r--crates/core/tedge_mapper/src/c8y/tests.rs182
-rw-r--r--crates/core/tedge_mapper/src/core/error.rs18
-rw-r--r--crates/core/tedge_mapper/src/core/size_threshold.rs18
7 files changed, 194 insertions, 111 deletions
diff --git a/crates/core/tedge_mapper/Cargo.toml b/crates/core/tedge_mapper/Cargo.toml
index ffc8d6d3..ba44908d 100644
--- a/crates/core/tedge_mapper/Cargo.toml
+++ b/crates/core/tedge_mapper/Cargo.toml
@@ -37,7 +37,7 @@ csv = "1.1"
download = { path = "../../common/download" }
flockfile = { path = "../../common/flockfile" }
futures = "0.3"
-mockall = "0.10"
+mockall = "0.11"
mqtt_channel = { path = "../../common/mqtt_channel" }
reqwest = { version = "0.11", default-features = false, features = ["json", "rustls-tls"] }
serde = { version = "1.0", features = ["derive"] }
diff --git a/crates/core/tedge_mapper/src/az/converter.rs b/crates/core/tedge_mapper/src/az/converter.rs
index b6c3fdd4..7989cfbd 100644
--- a/crates/core/tedge_mapper/src/az/converter.rs
+++ b/crates/core/tedge_mapper/src/az/converter.rs
@@ -37,11 +37,10 @@ impl Converter for AzureConverter {
}
async fn try_convert(&mut self, input: &Message) -> Result<Vec<Message>, Self::Error> {
- let input = input.payload_str()?;
let () = self.size_threshold.validate(input)?;
let default_timestamp = self.add_timestamp.then(|| self.clock.now());
let mut serializer = ThinEdgeJsonSerializer::new_with_timestamp(default_timestamp);
- let () = thin_edge_json::parser::parse_str(input, &mut serializer)?;
+ let () = thin_edge_json::parser::parse_str(input.payload_str()?, &mut serializer)?;
let payload = serializer.into_string()?;
Ok(vec![(Message::new(&self.mapper_config.out_topic, payload))])
@@ -52,11 +51,7 @@ impl Converter for AzureConverter {
mod tests {
use crate::{
az::converter::AzureConverter,
- core::{
- converter::*,
- error::ConversionError,
- size_threshold::{SizeThreshold, SizeThresholdExceeded},
- },
+ core::{converter::*, error::ConversionError, size_threshold::SizeThreshold},
};
use assert_json_diff::*;
@@ -194,17 +189,17 @@ mod tests {
async fn exceeding_threshold_returns_error() {
let mut converter = AzureConverter::new(false, Box::new(TestClock), SizeThreshold(1));
+ let _topic = "tedge/measurements".to_string();
let input = "ABC";
let result = converter.try_convert(&new_tedge_message(input)).await;
assert_matches!(
result,
- Err(ConversionError::FromSizeThresholdExceeded(
- SizeThresholdExceeded {
- actual_size: 3,
- threshold: 1
- }
- ))
+ Err(ConversionError::SizeThresholdExceeded {
+ topic: _topic,
+ actual_size: 3,
+ threshold: 1
+ })
);
}
}
diff --git a/crates/core/tedge_mapper/src/c8y/converter.rs b/crates/core/tedge_mapper/src/c8y/converter.rs
index 899a763d..1616acc8 100644
--- a/crates/core/tedge_mapper/src/c8y/converter.rs
+++ b/crates/core/tedge_mapper/src/c8y/converter.rs
@@ -9,7 +9,7 @@ use c8y_api::{http_proxy::C8YHttpProxy, json_c8y::C8yUpdateSoftwareListResponse}
use c8y_smartrest::{
alarm,
error::SmartRestDeserializerError,
- event::serialize_event,
+ event::{self},
operations::Operations,
smartrest_deserializer::{SmartRestRestartRequest, SmartRestUpdateSoftware},
smartrest_serializer::{
@@ -29,6 +29,7 @@ use std::{
process::Stdio,
};
use thin_edge_json::{alarm::ThinEdgeAlarm, event::ThinEdgeEvent};
+use time::{format_description::well_known::Rfc3339, OffsetDateTime};
use tracing::{debug, info, log::error};
use super::{
@@ -144,12 +145,49 @@ where
Ok(vec)
}
- fn try_convert_event(&mut self, input: &Message) -> Result<Vec<Message>, ConversionError> {
+ async fn try_convert_event(
+ &mut self,
+ input: &Message,
+ ) -> Result<Vec<Message>, ConversionError> {
let tedge_event = ThinEdgeEvent::try_from(input.topic.name.as_str(), input.payload_str()?)?;
- let smartrest_alarm = serialize_event(tedge_event)?;
- let smartrest_topic = Topic::new_unchecked(SMARTREST_PUBLISH_TOPIC);
- Ok(vec![Message::new(&smartrest_topic, smartrest_alarm)])
+ // If the message size is well within the Cumulocity MQTT size limit, use MQTT to send the mapped event as well
+ if input.payload_bytes().len() < self.size_threshold.0 {
+ let smartrest_alarm = event::serialize_event(tedge_event)?;
+ let smartrest_topic = Topic::new_unchecked(SMARTREST_PUBLISH_TOPIC);
+
+ Ok(vec![Message::new(&smartrest_topic, smartrest_alarm)])
+ } else {
+ // If the message size is larger than the MQTT size limit, use HTTP to send the mapped event
+ let (event_text, event_time) = match tedge_event.data {
+ None => {
+ let message = tedge_event.name.clone();
+ let time = OffsetDateTime::now_utc().format(&Rfc3339)?;
+
+ (message, time)
+ }
+ Some(event_data) => {
+ let message = event_data
+ .message
+ .unwrap_or_else(|| tedge_event.name.clone());
+ let time = event_data.time.map_or_else(
+ || OffsetDateTime::now_utc().format(&Rfc3339),
+ |timestamp| timestamp.format(&Rfc3339),
+ )?;
+ (message, time)
+ }
+ };
+
+ let _ = self
+ .http_proxy
+ .send_event(
+ tedge_event.name.as_str(),
+ event_text.as_str(),
+ Some(event_time),
+ )
+ .await?;
+ Ok(vec![])
+ }
}
}
@@ -164,20 +202,22 @@ where
&self.mapper_config
}
async fn try_convert(&mut self, message: &Message) -> Result<Vec<Message>, ConversionError> {
- let () = self.size_threshold.validate(message.payload_str()?)?;
-
match &message.topic {
topic if topic.name.starts_with("tedge/measurements") => {
+ let () = self.size_threshold.validate(message)?;
self.try_convert_measurement(message)
}
topic if topic.name.starts_with("tedge/alarms") => {
+ let () = self.size_threshold.validate(message)?;
self.alarm_converter.try_convert_alarm(message)
}
topic if topic.name.starts_with(INTERNAL_ALARMS_TOPIC) => {
self.alarm_converter.process_internal_alarm(message);
Ok(vec![])
}
- topic if topic.name.starts_with(TEDGE_EVENTS_TOPIC) => self.try_convert_event(message),
+ topic if topic.name.starts_with(TEDGE_EVENTS_TOPIC) => {
+ self.try_convert_event(message).await
+ }
topic => match topic.clone().try_into() {
Ok(MapperSubscribeTopic::ResponseTopic(ResponseTopic::SoftwareListResponse)) => {
debug!("Software list");
diff --git a/crates/core/tedge_mapper/src/c8y/mapper.rs b/crates/core/tedge_mapper/src/c8y/mapper.rs
index 315449fe..a5df4d0b 100644
--- a/crates/core/tedge_mapper/src/c8y/mapper.rs
+++ b/crates/core/tedge_mapper/src/c8y/mapper.rs
@@ -5,7 +5,7 @@ use crate::{
use agent_interface::topic::ResponseTopic;
use async_trait::async_trait;
-use c8y_api::http_proxy::JwtAuthHttpProxy;
+use c8y_api::http_proxy::{C8YHttpProxy, JwtAuthHttpProxy};
use c8y_smartrest::operations::Operations;
use mqtt_channel::{Config, TopicFilter};
use tedge_config::{
@@ -16,6 +16,7 @@ use tracing::{info, info_span, Instrument};
use super::topic::C8yTopic;
const CUMULOCITY_MAPPER_NAME: &str = "tedge-mapper-c8y";
+const MQTT_MESSAGE_SIZE_THRESHOLD: usize = 16 * 1024;
pub struct CumulocityMapper {}
@@ -65,10 +66,11 @@ impl CumulocityMapper {
#[async_trait]
impl TEdgeComponent for CumulocityMapper {
async fn start(&self, tedge_config: TEdgeConfig) -> Result<(), anyhow::Error> {
- let size_threshold = SizeThreshold(16 * 1024);
+ let size_threshold = SizeThreshold(MQTT_MESSAGE_SIZE_THRESHOLD);
let operations = Operations::try_new("/etc/tedge/operations", "c8y")?;
- let http_proxy = JwtAuthHttpProxy::try_new(&tedge_config).await?;
+ let mut http_proxy = JwtAuthHttpProxy::try_new(&tedge_config).await?;
+ http_proxy.init().await?;
let device_name = tedge_config.query(DeviceIdSetting)?;
let device_type = tedge_config.query(DeviceTypeSetting)?;
let mqtt_port = tedge_config.query(MqttPortSetting)?.into();
diff --git a/crates/core/tedge_mapper/src/c8y/tests.rs b/crates/core/tedge_mapper/src/c8y/tests.rs
index b300bf0f..7cbed97a 100644
--- a/crates/core/tedge_mapper/src/c8y/tests.rs
+++ b/crates/core/tedge_mapper/src/c8y/tests.rs
@@ -1,11 +1,21 @@
-use crate::core::{converter::Converter, mapper::create_mapper, size_threshold::SizeThreshold};
-use c8y_api::{http_proxy::C8YHttpProxy, json_c8y::C8yUpdateSoftwareListResponse};
+use crate::core::{
+ converter::Converter, error::ConversionError, mapper::create_mapper,
+ size_threshold::SizeThreshold,
+};
+use anyhow::Result;
+use assert_matches::assert_matches;
+use c8y_api::{
+ http_proxy::{C8YHttpProxy, MockC8YHttpProxy},
+ json_c8y::C8yUpdateSoftwareListResponse,
+};
use c8y_smartrest::{
error::SMCumulocityMapperError, operations::Operations,
smartrest_deserializer::SmartRestJwtResponse,
};
+use mockall::predicate;
use mqtt_channel::{Message, Topic};
use mqtt_tests::test_mqtt_server::MqttProcessHandler;
+use serde_json::json;
use serial_test::serial;
use std::time::Duration;
use test_case::test_case;
@@ -463,19 +473,7 @@ async fn c8y_mapper_syncs_pending_alarms_on_startup() {
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[serial]
async fn test_sync_alarms() {
- let size_threshold = SizeThreshold(16 * 1024);
- let device_name = String::from("test");
- let device_type = String::from("test_type");
- let operations = Operations::default();
- let http_proxy = FakeC8YHttpProxy {};
-
- let mut converter = CumulocityConverter::new(
- size_threshold,
- device_name,
- device_type,
- operations,
- http_proxy,
- );
+ let mut converter = create_c8y_converter();
let alarm_topic = "tedge/alarms/critical/temperature_alarm";
let alarm_payload = r#"{ "message": "Temperature very high" }"#;
@@ -529,18 +527,7 @@ async fn test_sync_alarms() {
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[serial]
async fn convert_thin_edge_json_with_child_id() {
- let device_name = String::from("test");
- let device_type = String::from("test");
- let operations = Operations::default();
- let http_proxy = FakeC8YHttpProxy {};
-
- let mut converter = Box::new(CumulocityConverter::new(
- SizeThreshold(16 * 1024),
- device_name,
- device_type,
- operations,
- http_proxy,
- ));
+ let mut converter = create_c8y_converter();
let in_topic = "tedge/measurements/child1";
let in_payload = r#"{"temp": 1, "time": "2021-11-16T17:45:40.571760714+01:00"}"#;
@@ -573,18 +560,7 @@ async fn convert_thin_edge_json_with_child_id() {
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[serial]
async fn convert_first_thin_edge_json_invalid_then_valid_with_child_id() {
- let device_name = String::from("test");
- let device_type = String::from("test");
- let operations = Operations::default();
- let http_proxy = FakeC8YHttpProxy {};
-
- let mut converter = Box::new(CumulocityConverter::new(
- SizeThreshold(16 * 1024),
- device_name,
- device_type,
- operations,
- http_proxy,
- ));
+ let mut converter = create_c8y_converter();
let in_topic = "tedge/measurements/child1";
let in_invalid_payload = r#"{"temp": invalid}"#;
@@ -619,18 +595,7 @@ async fn convert_first_thin_edge_json_invalid_then_valid_with_child_id() {
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[serial]
async fn convert_two_thin_edge_json_messages_given_different_child_id() {
- let device_name = String::from("test");
- let device_type = String::from("test");
- let operations = Operations::default();
- let http_proxy = FakeC8YHttpProxy {};
-
- let mut converter = Box::new(CumulocityConverter::new(
- SizeThreshold(16 * 1024),
- device_name,
- device_type,
- operations,
- http_proxy,
- ));
+ let mut converter = create_c8y_converter();
let in_payload = r#"{"temp": 1, "time": "2021-11-16T17:45:40.571760714+01:00"}"#;
// First message from "child1"
@@ -694,30 +659,91 @@ fn extract_child_id(in_topic: &str, expected_child_id: Option<String>) {
}
}
-#[test]
-fn check_c8y_threshold_packet_size() -> Result<(), anyhow::Error> {
+#[tokio::test]
+async fn check_c8y_threshold_packet_size() -> Result<(), anyhow::Error> {
+ let mut converter = create_c8y_converter();
+
+ let alarm_topic = "tedge/alarms/critical/temperature_alarm";
+ let big_message = create_packet(1024 * 20);
+ let alarm_payload = json!({ "message": big_message }).to_string();
+ let alarm_message = Message::new(&Topic::new_unchecked(alarm_topic), alarm_payload);
+
+ assert_matches!(
+ converter.try_convert(&alarm_message).await,
+ Err(ConversionError::SizeThresholdExceeded {
+ topic: _,
+ actual_size: 20494,
+ threshold: 16384
+ })
+ );
+ Ok(())
+}
+
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+async fn convert_event() -> Result<()> {
let size_threshold = SizeThreshold(16 * 1024);
let device_name = String::from("test");
- let device_type = String::from("test");
+ let device_type = String::from("test_type");
let operations = Operations::default();
- let http_proxy = FakeC8YHttpProxy {};
+ let http_proxy = MockC8YHttpProxy::new();
- let converter = CumulocityConverter::new(
+ let mut converter = CumulocityConverter::new(
size_threshold,
device_name,
device_type,
operations,
http_proxy,
);
- let buffer = create_packet(1024 * 20);
- let err = converter.size_threshold.validate(&buffer).unwrap_err();
- assert_eq!(
- err.to_string(),
- "The input size 20480 is too big. The threshold is 16384."
- );
+
+ let event_topic = "tedge/events/click_event";
+ let event_payload = r#"{ "message": "Someone clicked" }"#;
+ let event_message = Message::new(&Topic::new_unchecked(event_topic), event_payload);
+
+ let converted_events = converter.convert(&event_message).await;
+ assert_eq!(converted_events.len(), 1);
+ let converted_event = converted_events.get(0).unwrap();
+ assert_eq!(converted_event.topic.name, "c8y/s/us");
+ assert!(converted_event
+ .payload_str()?
+ .starts_with("400,click_event"));
+
Ok(())
}
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+async fn test_convert_big_event() {
+ let mqtt_packet_limit = 16;
+ let size_threshold = SizeThreshold(mqtt_packet_limit * 1024);
+ let device_name = String::from("test");
+ let device_type = String::from("test_type");
+ let operations = Operations::default();
+
+ let mut http_proxy = MockC8YHttpProxy::new();
+ http_proxy
+ .expect_send_event()
+ .with(
+ predicate::eq("click_event"),
+ predicate::always(),
+ predicate::always(),
+ )
+ .returning(|_, _, _| Ok("123".into()));
+
+ let mut converter = CumulocityConverter::new(
+ size_threshold,
+ device_name,
+ device_type,
+ operations,
+ http_proxy,
+ );
+
+ let event_topic = "tedge/events/click_event";
+ let big_event_message = create_packet((mqtt_packet_limit + 1) * 1024); // Event payload > size_threshold
+ let big_event_payload = json!({ "message": big_event_message }).to_string();
+ let big_event_message = Message::new(&Topic::new_unchecked(event_topic), big_event_payload);
+
+ assert!(converter.convert(&big_event_message).await.is_empty());
+}
+
fn create_packet(size: usize) -> String {
let data: String = "Some data!".into();
let loops = size / data.len();
@@ -757,29 +783,41 @@ impl C8YHttpProxy for FakeC8YHttpProxy {
) -> Result<String, SMCumulocityMapperError> {
Ok("fake/upload/url".into())
}
+
+ async fn send_event(
+ &mut self,
+ _event_type: &str,
+ _text: &str,
+ _time: Option<String>,
+ ) -> Result<String, SMCumulocityMapperError> {
+ Ok("123".into())
+ }
}
async fn start_c8y_mapper(mqtt_port: u16) -> Result<JoinHandle<()>, anyhow::Error> {
+ let converter = create_c8y_converter();
+ let mut mapper = create_mapper("c8y-mapper-test", mqtt_port, Box::new(converter)).await?;
+
+ let mapper_task = tokio::spawn(async move {
+ let _ = mapper.run().await;
+ });
+ Ok(mapper_task)
+}
+
+fn create_c8y_converter() -> CumulocityConverter<FakeC8YHttpProxy> {
+ let size_threshold = SizeThreshold(16 * 1024);
let device_name = "test-device".into();
let device_type = "test-device-type".into();
- let size_threshold = SizeThreshold(16 * 1024);
let operations = Operations::default();
let http_proxy = FakeC8YHttpProxy {};
- let converter = Box::new(CumulocityConverter::new(
+ CumulocityConverter::new(
size_threshold,
device_name,
device_type,
operations,
http_proxy,
- ));
-
- let mut mapper = create_mapper("c8y-mapper-test", mqtt_port, converter).await?;
-
- let mapper_task = tokio::spawn(async move {
- let _ = mapper.run().await;
- });
- Ok(mapper_task)
+ )
}
fn remove_whitespace(s: &str) -> String {
diff --git a/crates/core/tedge_mapper/src/core/error.rs b/crates/core/tedge_mapper/src/core/error.rs
index 9b13d134..e8fac94b 100644
--- a/crates/core/tedge_mapper/src/core/error.rs
+++ b/crates/core/tedge_mapper/src/core/error.rs
@@ -1,4 +1,4 @@
-use crate::{c8y::error::CumulocityMapperError, core::size_threshold::SizeThresholdExceeded};
+use crate::c8y::error::CumulocityMapperError;
use c8y_smartrest::error::OperationsError;
use mqtt_channel::MqttError;
@@ -33,7 +33,10 @@ pub enum ConversionError {
FromCumulocityJsonError(#[from] c8y_translator::json::CumulocityJsonError),
#[error(transparent)]
- FromCumulocityCumulocityMapperError(#[from] CumulocityMapperError),
+ FromCumulocityMapperError(#[from] CumulocityMapperError),
+
+ #[error(transparent)]
+ FromCumulocitySmartRestMapperError(#[from] c8y_smartrest::error::SMCumulocityMapperError),
#[error(transparent)]
FromThinEdgeJsonSerialization(#[from] ThinEdgeJsonSerializationError),
@@ -51,8 +54,12 @@ pub enum ConversionError {
#[error(transparent)]
FromThinEdgeJsonParser(#[from] thin_edge_json::parser::ThinEdgeJsonParserError),
- #[error(transparent)]
- FromSizeThresholdExceeded(#[from] SizeThresholdExceeded),
+ #[error("The size of the message received on {topic} is {actual_size} which is greater than the threshold size of {threshold}.")]
+ SizeThresholdExceeded {
+ topic: String,
+ actual_size: usize,
+ threshold: usize,
+ },
#[error("The given Child ID '{id}' is invalid.")]
InvalidChildId { id: String },
@@ -80,4 +87,7 @@ pub enum ConversionError {
#[error(transparent)]
FromUtf8Error(#[from] std::string::FromUtf8Error),
+
+ #[error(transparent)]
+ FromTimeFormatError(#[from] time::error::Format),
}
diff --git a/crates/core/tedge_mapper/src/core/size_threshold.rs b/crates/core/tedge_mapper/src/core/size_threshold.rs
index fb6e3d48..da1ee108 100644
--- a/crates/core/tedge_mapper/src/core/size_threshold.rs
+++ b/crates/core/tedge_mapper/src/core/size_threshold.rs
@@ -1,12 +1,17 @@
+use mqtt_channel::Message;
+
+use super::error::ConversionError;
+
#[derive(Debug)]
pub struct SizeThreshold(pub usize);
impl SizeThreshold {
- pub fn validate(&self, input: &str) -> Result<(), SizeThresholdExceeded> {
- let actual_size = input.len();
+ pub fn validate(&self, input: &Message) -> Result<(), ConversionError> {
+ let actual_size = input.payload_bytes().len();
let threshold = self.0;
if actual_size > threshold {
- Err(SizeThresholdExceeded {
+ Err(ConversionError::SizeThresholdExceeded {
+ topic: input.topic.name.clone(),
actual_size,
threshold,
})
@@ -15,10 +20,3 @@ impl SizeThreshold {
}
}
}
-
-#[derive(thiserror::Error, Debug)]
-#[error("The input size {actual_size} is too big. The threshold is {threshold}.")]
-pub struct SizeThresholdExceeded {
- pub actual_size: usize,
- pub threshold: usize,
-}