diff options
author | Albin Suresh <albin.suresh@softwareag.com> | 2022-03-07 17:42:04 +0530 |
---|---|---|
committer | Albin Suresh <albin.suresh@softwareag.com> | 2022-03-11 12:40:45 +0530 |
commit | 59fd1aa5ec16cc6a8cd89fc8f3541cf33f763ff3 (patch) | |
tree | 128406c2a056d723574d8496cc1f033efb9484c8 /crates/core/tedge_mapper | |
parent | c02fd97270b026536ed4d3e12f793299e9e76b70 (diff) |
Closes #893 Support custom fields and fragments in Thin Edge JSON events
Diffstat (limited to 'crates/core/tedge_mapper')
-rw-r--r-- | crates/core/tedge_mapper/src/c8y/converter.rs | 63 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/c8y/mapper.rs | 2 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/c8y/tests.rs | 58 |
3 files changed, 78 insertions, 45 deletions
diff --git a/crates/core/tedge_mapper/src/c8y/converter.rs b/crates/core/tedge_mapper/src/c8y/converter.rs index 6a658a1c..c5ab7b96 100644 --- a/crates/core/tedge_mapper/src/c8y/converter.rs +++ b/crates/core/tedge_mapper/src/c8y/converter.rs @@ -5,11 +5,13 @@ use agent_interface::{ RestartOperationResponse, SoftwareListRequest, SoftwareListResponse, SoftwareUpdateResponse, }; use async_trait::async_trait; -use c8y_api::{http_proxy::C8YHttpProxy, json_c8y::C8yUpdateSoftwareListResponse}; +use c8y_api::{ + http_proxy::C8YHttpProxy, + json_c8y::{C8yCreateEvent, C8yUpdateSoftwareListResponse}, +}; use c8y_smartrest::{ alarm, error::SmartRestDeserializerError, - event::{self}, operations::Operations, smartrest_deserializer::{SmartRestRestartRequest, SmartRestUpdateSoftware}, smartrest_serializer::{ @@ -29,7 +31,6 @@ use std::{ process::Stdio, }; use thin_edge_json::{alarm::ThinEdgeAlarm, event::ThinEdgeEvent}; -use time::{format_description::well_known::Rfc3339, OffsetDateTime}; use tracing::{debug, info, log::error}; use super::{ @@ -47,6 +48,9 @@ const SMARTREST_PUBLISH_TOPIC: &str = "c8y/s/us"; const TEDGE_ALARMS_TOPIC: &str = "tedge/alarms/"; const INTERNAL_ALARMS_TOPIC: &str = "c8y-internal/alarms/"; const TEDGE_EVENTS_TOPIC: &str = "tedge/events/"; +const C8Y_JSON_MQTT_EVENTS_TOPIC: &str = "c8y/event/events/create"; + +const CREATE_EVENT_SMARTREST_CODE: u16 = 400; #[derive(Debug)] pub struct CumulocityConverter<Proxy> @@ -150,43 +154,38 @@ where input: &Message, ) -> Result<Vec<Message>, ConversionError> { let tedge_event = ThinEdgeEvent::try_from(input.topic.name.as_str(), input.payload_str()?)?; + let c8y_event = C8yCreateEvent::try_from(tedge_event)?; - // If the message size is well within the Cumulocity MQTT size limit, use MQTT to send the mapped event as well - if input.payload_bytes().len() < self.size_threshold.0 { - let smartrest_alarm = event::serialize_event(tedge_event)?; + // If the message doesn't contain any fields other than `text` and `time`, convert to SmartREST + let message = if c8y_event.extras.is_empty() { + let smartrest_event = Self::serialize_to_smartrest(&c8y_event); let smartrest_topic = Topic::new_unchecked(SMARTREST_PUBLISH_TOPIC); - Ok(vec![Message::new(&smartrest_topic, smartrest_alarm)]) + Message::new(&smartrest_topic, smartrest_event) } else { - // If the message size is larger than the MQTT size limit, use HTTP to send the mapped event - let (event_text, event_time) = match tedge_event.data { - None => { - let message = tedge_event.name.clone(); - let time = OffsetDateTime::now_utc().format(&Rfc3339)?; + // If the message contains extra fields other than `text` and `time`, convert to Cumulocity JSON + let cumulocity_event_json = serde_json::to_string(&c8y_event)?; + let json_mqtt_topic = Topic::new_unchecked(C8Y_JSON_MQTT_EVENTS_TOPIC); - (message, time) - } - Some(event_data) => { - let message = event_data.text.unwrap_or_else(|| tedge_event.name.clone()); - let time = event_data.time.map_or_else( - || OffsetDateTime::now_utc().format(&Rfc3339), - |timestamp| timestamp.format(&Rfc3339), - )?; - (message, time) - } - }; - - let _ = self - .http_proxy - .send_event( - tedge_event.name.as_str(), - event_text.as_str(), - Some(event_time), - ) - .await?; + Message::new(&json_mqtt_topic, cumulocity_event_json) + }; + + // If the MQTT message size is well within the Cumulocity MQTT size limit, use MQTT to send the mapped event as well + if input.payload_bytes().len() < self.size_threshold.0 { + Ok(vec![message]) + } else { + // If the message size is larger than the MQTT size limit, use HTTP to send the mapped event + let _ = self.http_proxy.send_event(c8y_event).await?; Ok(vec![]) } } + + fn serialize_to_smartrest(c8y_event: &C8yCreateEvent) -> String { + format!( + "{},{},\"{}\",{}", + CREATE_EVENT_SMARTREST_CODE, c8y_event.event_type, c8y_event.text, c8y_event.time + ) + } } #[async_trait] diff --git a/crates/core/tedge_mapper/src/c8y/mapper.rs b/crates/core/tedge_mapper/src/c8y/mapper.rs index def4808c..07abc637 100644 --- a/crates/core/tedge_mapper/src/c8y/mapper.rs +++ b/crates/core/tedge_mapper/src/c8y/mapper.rs @@ -17,7 +17,7 @@ use tracing::{info, info_span, Instrument}; use super::topic::C8yTopic; const CUMULOCITY_MAPPER_NAME: &str = "tedge-mapper-c8y"; -const MQTT_MESSAGE_SIZE_THRESHOLD: usize = 16 * 1024; +const MQTT_MESSAGE_SIZE_THRESHOLD: usize = 16184; pub struct CumulocityMapper {} diff --git a/crates/core/tedge_mapper/src/c8y/tests.rs b/crates/core/tedge_mapper/src/c8y/tests.rs index 58e12eaa..96ace4a8 100644 --- a/crates/core/tedge_mapper/src/c8y/tests.rs +++ b/crates/core/tedge_mapper/src/c8y/tests.rs @@ -3,10 +3,11 @@ use crate::core::{ size_threshold::SizeThreshold, }; use anyhow::Result; +use assert_json_diff::assert_json_include; use assert_matches::assert_matches; use c8y_api::{ http_proxy::{C8YHttpProxy, MockC8YHttpProxy}, - json_c8y::C8yUpdateSoftwareListResponse, + json_c8y::{C8yCreateEvent, C8yUpdateSoftwareListResponse}, }; use c8y_smartrest::{ error::SMCumulocityMapperError, operations::Operations, @@ -681,7 +682,7 @@ async fn check_c8y_threshold_packet_size() -> Result<(), anyhow::Error> { } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn convert_event() -> Result<()> { +async fn convert_event_with_known_fields_to_c8y_smartrest() -> Result<()> { let size_threshold = SizeThreshold(16 * 1024); let device_name = String::from("test"); let device_type = String::from("test_type"); @@ -704,9 +705,48 @@ async fn convert_event() -> Result<()> { assert_eq!(converted_events.len(), 1); let converted_event = converted_events.get(0).unwrap(); assert_eq!(converted_event.topic.name, "c8y/s/us"); + dbg!(converted_event.payload_str()?); assert!(converted_event .payload_str()? - .starts_with("400,click_event")); + .starts_with(r#"400,click_event,"Someone clicked","#)); + + Ok(()) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn convert_event_with_extra_fields_to_c8y_json() -> Result<()> { + let size_threshold = SizeThreshold(16 * 1024); + let device_name = String::from("test"); + let device_type = String::from("test_type"); + let operations = Operations::default(); + let http_proxy = MockC8YHttpProxy::new(); + + let mut converter = CumulocityConverter::new( + size_threshold, + device_name, + device_type, + operations, + http_proxy, + ); + + let event_topic = "tedge/events/click_event"; + let event_payload = r#"{ "text": "tick", "foo": "bar" }"#; + let event_message = Message::new(&Topic::new_unchecked(event_topic), event_payload); + + let converted_events = converter.convert(&event_message).await; + assert_eq!(converted_events.len(), 1); + let converted_event = converted_events.get(0).unwrap(); + assert_eq!(converted_event.topic.name, "c8y/event/events/create"); + let converted_c8y_json = json!({ + "type": "click_event", + "text": "tick", + "foo": "bar", + }); + assert_eq!(converted_event.topic.name, "c8y/event/events/create"); + assert_json_include!( + actual: serde_json::from_str::<serde_json::Value>(converted_event.payload_str()?)?, + expected: converted_c8y_json + ); Ok(()) } @@ -722,12 +762,8 @@ async fn test_convert_big_event() { let mut http_proxy = MockC8YHttpProxy::new(); http_proxy .expect_send_event() - .with( - predicate::eq("click_event"), - predicate::always(), - predicate::always(), - ) - .returning(|_, _, _| Ok("123".into())); + .with(predicate::always()) + .returning(|_| Ok("123".into())); let mut converter = CumulocityConverter::new( size_threshold, @@ -787,9 +823,7 @@ impl C8YHttpProxy for FakeC8YHttpProxy { async fn send_event( &mut self, - _event_type: &str, - _text: &str, - _time: Option<String>, + _c8y_event: C8yCreateEvent, ) -> Result<String, SMCumulocityMapperError> { Ok("123".into()) } |