From 59fd1aa5ec16cc6a8cd89fc8f3541cf33f763ff3 Mon Sep 17 00:00:00 2001 From: Albin Suresh Date: Mon, 7 Mar 2022 17:42:04 +0530 Subject: Closes #893 Support custom fields and fragments in Thin Edge JSON events --- crates/core/c8y_api/Cargo.toml | 2 + crates/core/c8y_api/src/http_proxy.rs | 47 +++--- crates/core/c8y_api/src/json_c8y.rs | 183 +++++++++++++++++++-- crates/core/c8y_smartrest/src/event.rs | 141 ---------------- crates/core/c8y_smartrest/src/lib.rs | 1 - .../tedge/src/cli/connect/bridge_config_c8y.rs | 2 + crates/core/tedge_mapper/src/c8y/converter.rs | 63 ++++--- crates/core/tedge_mapper/src/c8y/mapper.rs | 2 +- crates/core/tedge_mapper/src/c8y/tests.rs | 58 +++++-- crates/core/thin_edge_json/src/event.rs | 62 ++++++- 10 files changed, 338 insertions(+), 223 deletions(-) delete mode 100644 crates/core/c8y_smartrest/src/event.rs (limited to 'crates/core') diff --git a/crates/core/c8y_api/Cargo.toml b/crates/core/c8y_api/Cargo.toml index 130d7b42..3b812fe9 100644 --- a/crates/core/c8y_api/Cargo.toml +++ b/crates/core/c8y_api/Cargo.toml @@ -31,6 +31,8 @@ tracing = { version = "0.1", features = ["attributes", "log"] } [dev-dependencies] anyhow = "1.0" +assert_matches = "1.5" mockito = "0.30" tempfile = "3.3" test-case = "2.0" +time = { version = "0.3", features = ["macros"] } diff --git a/crates/core/c8y_api/src/http_proxy.rs b/crates/core/c8y_api/src/http_proxy.rs index d8047e53..9e75faf3 100644 --- a/crates/core/c8y_api/src/http_proxy.rs +++ b/crates/core/c8y_api/src/http_proxy.rs @@ -8,12 +8,12 @@ use c8y_smartrest::{error::SMCumulocityMapperError, smartrest_deserializer::Smar use mockall::automock; use mqtt_channel::{Connection, PubChannel, StreamExt, Topic, TopicFilter}; use reqwest::Url; -use std::time::Duration; +use std::{collections::HashMap, time::Duration}; use tedge_config::{ C8yUrlSetting, ConfigSettingAccessor, ConfigSettingAccessorStringExt, DeviceIdSetting, MqttBindAddressSetting, MqttPortSetting, TEdgeConfig, }; -use time::{format_description, OffsetDateTime}; +use time::OffsetDateTime; use tracing::{error, info, instrument}; @@ -31,9 +31,7 @@ pub trait C8YHttpProxy: Send + Sync { async fn send_event( &mut self, - event_type: &str, - text: &str, - time: Option, + c8y_event: C8yCreateEvent, ) -> Result; async fn send_software_list_http( @@ -255,20 +253,17 @@ impl JwtAuthHttpProxy { } fn create_log_event(&self) -> C8yCreateEvent { - self.create_event("c8y_Logfile", "software-management", None) - } - - fn create_event(&self, event_type: &str, text: &str, time: Option) -> C8yCreateEvent { let c8y_managed_object = C8yManagedObject { id: self.end_point.c8y_internal_id.clone(), }; - let time = time.unwrap_or_else(|| { - OffsetDateTime::now_utc() - .format(&format_description::well_known::Rfc3339) - .unwrap() - }); - C8yCreateEvent::new(c8y_managed_object, event_type, time.as_str(), text) + C8yCreateEvent::new( + Some(c8y_managed_object), + "c8y_Logfile".to_string(), + OffsetDateTime::now_utc(), + "software-management".to_string(), + HashMap::new(), + ) } async fn send_event_internal( @@ -325,11 +320,13 @@ impl C8YHttpProxy for JwtAuthHttpProxy { async fn send_event( &mut self, - event_type: &str, - text: &str, - time: Option, + mut c8y_event: C8yCreateEvent, ) -> Result { - let c8y_event: C8yCreateEvent = self.create_event(event_type, text, time); + if c8y_event.source.is_none() { + c8y_event.source = Some(C8yManagedObject { + id: self.end_point.c8y_internal_id.clone(), + }); + } self.send_event_internal(c8y_event).await } @@ -498,11 +495,15 @@ mod tests { device_id, ); - // ... creates the event and assert its id - assert_eq!( - http_proxy.send_event("clock_event", "tick", None).await?, - event_id + let c8y_event = C8yCreateEvent::new( + None, + "clock_event".to_string(), + OffsetDateTime::now_utc(), + "tick".to_string(), + HashMap::new(), ); + // ... creates the event and assert its id + assert_eq!(http_proxy.send_event(c8y_event).await?, event_id); Ok(()) } diff --git a/crates/core/c8y_api/src/json_c8y.rs b/crates/core/c8y_api/src/json_c8y.rs index f1f0d2b7..be857adb 100644 --- a/crates/core/c8y_api/src/json_c8y.rs +++ b/crates/core/c8y_api/src/json_c8y.rs @@ -1,20 +1,34 @@ +use std::collections::HashMap; + use agent_interface::{ Jsonify, SoftwareListResponse, SoftwareModule, SoftwareType, SoftwareVersion, }; +use c8y_smartrest::error::SMCumulocityMapperError; use download::DownloadInfo; use serde::{Deserialize, Serialize}; +use serde_json::Value; +use thin_edge_json::event::ThinEdgeEvent; +use time::OffsetDateTime; const EMPTY_STRING: &str = ""; -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)] #[serde(rename_all = "camelCase")] pub struct C8yCreateEvent { - source: C8yManagedObject, + #[serde(skip_serializing_if = "Option::is_none")] + pub source: Option, + #[serde(rename = "type")] - event_type: String, - time: String, - text: String, + pub event_type: String, + + #[serde(with = "time::serde::rfc3339")] + pub time: OffsetDateTime, + + pub text: String, + + #[serde(flatten)] + pub extras: HashMap, } #[derive(Debug, Deserialize, Serialize, PartialEq)] @@ -24,7 +38,7 @@ pub struct C8yEventResponse { pub id: String, } -#[derive(Debug, Serialize, Deserialize, Clone)] +#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)] #[serde(rename_all = "camelCase")] pub struct C8yManagedObject { pub id: String, @@ -97,13 +111,51 @@ impl From<&SoftwareListResponse> for C8yUpdateSoftwareListResponse { } impl C8yCreateEvent { - pub fn new(source: C8yManagedObject, event_type: &str, time: &str, text: &str) -> Self { + pub fn new( + source: Option, + event_type: String, + time: OffsetDateTime, + text: String, + extras: HashMap, + ) -> Self { Self { source, - event_type: event_type.into(), - time: time.into(), - text: text.into(), + event_type, + time, + text, + extras, + } + } +} + +impl TryFrom for C8yCreateEvent { + type Error = SMCumulocityMapperError; + + fn try_from(event: ThinEdgeEvent) -> Result { + let event_type = event.name; + let text; + let time; + let extras; + match event.data { + None => { + text = event_type.clone(); + time = OffsetDateTime::now_utc(); + extras = HashMap::new(); + } + Some(event_data) => { + text = event_data.text.unwrap_or_else(|| event_type.clone()); + time = event_data.time.unwrap_or_else(OffsetDateTime::now_utc); + extras = event_data.extras; + } } + + Ok(Self { + source: None, + event_type, + time, + text, + extras, + }) } } @@ -142,6 +194,12 @@ fn combine_version_and_type( #[cfg(test)] mod tests { + use anyhow::Result; + use assert_matches::assert_matches; + use test_case::test_case; + use thin_edge_json::event::ThinEdgeEventData; + use time::macros::datetime; + use super::*; #[test] @@ -286,4 +344,109 @@ mod tests { EMPTY_STRING ); } + + #[test_case( + ThinEdgeEvent { + name: "click_event".into(), + data: Some(ThinEdgeEventData { + text: Some("Someone clicked".into()), + time: Some(datetime!(2021-04-23 19:00:00 +05:00)), + extras: HashMap::new(), + }), + }, + C8yCreateEvent { + source: None, + event_type: "click_event".into(), + time: datetime!(2021-04-23 19:00:00 +05:00), + text: "Someone clicked".into(), + extras: HashMap::new(), + } + ;"event translation" + )] + #[test_case( + ThinEdgeEvent { + name: "click_event".into(), + data: Some(ThinEdgeEventData { + text: None, + time: Some(datetime!(2021-04-23 19:00:00 +05:00)), + extras: HashMap::new(), + }), + }, + C8yCreateEvent { + source: None, + event_type: "click_event".into(), + time: datetime!(2021-04-23 19:00:00 +05:00), + text: "click_event".into(), + extras: HashMap::new(), + } + ;"event translation without text" + )] + #[test_case( + ThinEdgeEvent { + name: "click_event".into(), + data: Some(ThinEdgeEventData { + text: Some("Someone, clicked, it".into()), + time: Some(datetime!(2021-04-23 19:00:00 +05:00)), + extras: HashMap::new(), + }), + }, + C8yCreateEvent { + source: None, + event_type: "click_event".into(), + time: datetime!(2021-04-23 19:00:00 +05:00), + text: "Someone, clicked, it".into(), + extras: HashMap::new(), + } + ;"event translation with commas in text" + )] + fn check_event_translation( + tedge_event: ThinEdgeEvent, + expected_c8y_event: C8yCreateEvent, + ) -> Result<()> { + let actual_c8y_event = C8yCreateEvent::try_from(tedge_event)?; + + assert_eq!(expected_c8y_event, actual_c8y_event); + + Ok(()) + } + + #[test] + fn event_translation_empty_json_payload_generates_timestamp() -> Result<()> { + let tedge_event = ThinEdgeEvent { + name: "empty_event".into(), + data: Some(ThinEdgeEventData { + text: None, + time: None, + extras: HashMap::new(), + }), + }; + + let actual_c8y_event = C8yCreateEvent::try_from(tedge_event)?; + + assert_eq!(actual_c8y_event.event_type, "empty_event".to_string()); + assert_eq!(actual_c8y_event.text, "empty_event".to_string()); + assert_matches!(actual_c8y_event.time, _); + assert_matches!(actual_c8y_event.source, None); + assert!(actual_c8y_event.extras.is_empty()); + + Ok(()) + } + + #[test] + fn event_translation_empty_payload() -> Result<()> { + let tedge_event = ThinEdgeEvent { + name: "empty_event".into(), + data: None, + }; + + let actual_c8y_event = C8yCreateEvent::try_from(tedge_event)?; + + assert_eq!(actual_c8y_event.event_type, "empty_event".to_string()); + assert_eq!(actual_c8y_event.text, "empty_event".to_string()); + assert!(actual_c8y_event.time < OffsetDateTime::now_utc()); + assert_matches!(actual_c8y_event.source, None); + assert!(actual_c8y_event.extras.is_empty()); + + Ok(()) + } } diff --git a/crates/core/c8y_smartrest/src/event.rs b/crates/core/c8y_smartrest/src/event.rs deleted file mode 100644 index ef2e9662..00000000 --- a/crates/core/c8y_smartrest/src/event.rs +++ /dev/null @@ -1,141 +0,0 @@ -use thin_edge_json::event::ThinEdgeEvent; -use time::{format_description::well_known::Rfc3339, OffsetDateTime}; - -use crate::error::SmartRestSerializerError; - -const CREATE_EVENT_SMARTREST_CODE: u16 = 400; - -/// Converts from thin-edge event to C8Y event SmartREST message -pub fn serialize_event(event: ThinEdgeEvent) -> Result { - let current_timestamp = OffsetDateTime::now_utc(); - match event.data { - None => Ok(format!( - "{CREATE_EVENT_SMARTREST_CODE},{},{},{}", - event.name, - event.name, - current_timestamp.format(&Rfc3339)? - )), - Some(event_data) => { - let smartrest_message = format!( - "{CREATE_EVENT_SMARTREST_CODE},{},\"{}\",{}", - event.name.clone(), - event_data.text.unwrap_or(event.name), - event_data.time.map_or_else( - || current_timestamp.format(&Rfc3339), - |timestamp| timestamp.format(&Rfc3339) - )? - ); - - Ok(smartrest_message) - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - use anyhow::Result; - use assert_matches::assert_matches; - use serde::Deserialize; - use test_case::test_case; - use thin_edge_json::event::ThinEdgeEventData; - use time::macros::datetime; - - #[test_case( - ThinEdgeEvent { - name: "click_event".into(), - data: Some(ThinEdgeEventData { - text: Some("Someone clicked".into()), - time: Some(datetime!(2021-04-23 19:00:00 +05:00)), - }), - }, - "400,click_event,\"Someone clicked\",2021-04-23T19:00:00+05:00" - ;"event translation" - )] - #[test_case( - ThinEdgeEvent { - name: "click_event".into(), - data: Some(ThinEdgeEventData { - text: None, - time: Some(datetime!(2021-04-23 19:00:00 +05:00)), - }), - }, - "400,click_event,\"click_event\",2021-04-23T19:00:00+05:00" - ;"event translation without message" - )] - #[test_case( - ThinEdgeEvent { - name: "click_event".into(), - data: Some(ThinEdgeEventData { - text: Some("Someone, clicked, it".into()), - time: Some(datetime!(2021-04-23 19:00:00 +05:00)), - }), - }, - "400,click_event,\"Someone, clicked, it\",2021-04-23T19:00:00+05:00" - ;"event translation with commas in message" - )] - fn check_event_translation(event: ThinEdgeEvent, expected_smartrest_msg: &str) { - let result = serialize_event(event); - - assert_eq!(result.unwrap(), expected_smartrest_msg); - } - - #[derive(Debug, Deserialize)] - struct SmartRestEvent { - pub code: i32, - pub name: String, - pub message: Option, - pub time: Option, - } - - #[test] - fn event_translation_empty_json_payload_generates_timestamp() -> Result<()> { - let event = ThinEdgeEvent { - name: "empty_event".into(), - data: Some(ThinEdgeEventData { - text: None, - time: None, - }), - }; - - let smartrest_message = serialize_event(event)?; - let mut reader = csv::ReaderBuilder::new() - .has_headers(false) - .from_reader(smartrest_message.as_bytes()); - let mut iter = reader.deserialize(); - let result = iter.next(); - - assert!(result.is_some()); - let smartrest_event: SmartRestEvent = result.expect("One entry expected")?; - assert_eq!(smartrest_event.code, 400); - assert_eq!(smartrest_event.name, "empty_event".to_string()); - assert_eq!(smartrest_event.message, Some("empty_event".to_string())); - assert_matches!(smartrest_event.time, Some(_)); - - Ok(()) - } - - #[test] - fn event_translation_empty_payload() -> Result<()> { - let event = ThinEdgeEvent { - name: "empty_event".into(), - data: None, - }; - - let smartrest_message = serialize_event(event)?; - let mut reader = csv::ReaderBuilder::new() - .has_headers(false) - .from_reader(smartrest_message.as_bytes()); - let mut iter = reader.deserialize(); - let result = iter.next(); - - assert!(result.is_some()); - let smartrest_event: SmartRestEvent = result.expect("One entry expected")?; - assert_eq!(smartrest_event.code, 400); - assert_eq!(smartrest_event.name, "empty_event".to_string()); - assert_eq!(smartrest_event.message, Some("empty_event".to_string())); - assert_matches!(smartrest_event.time, Some(_)); - - Ok(()) - } -} diff --git a/crates/core/c8y_smartrest/src/lib.rs b/crates/core/c8y_smartrest/src/lib.rs index b6338b06..ddb3bd2c 100644 --- a/crates/core/c8y_smartrest/src/lib.rs +++ b/crates/core/c8y_smartrest/src/lib.rs @@ -1,6 +1,5 @@ pub mod alarm; pub mod error; -pub mod event; pub mod operations; pub mod smartrest_deserializer; pub mod smartrest_serializer; diff --git a/crates/core/tedge/src/cli/connect/bridge_config_c8y.rs b/crates/core/tedge/src/cli/connect/bridge_config_c8y.rs index 2ceb7007..ff880e65 100644 --- a/crates/core/tedge/src/cli/connect/bridge_config_c8y.rs +++ b/crates/core/tedge/src/cli/connect/bridge_config_c8y.rs @@ -67,6 +67,7 @@ impl From for BridgeConfig { // c8y JSON r#"inventory/managedObjects/update/# out 2 c8y/ """#.into(), r#"measurement/measurements/create out 2 c8y/ """#.into(), + r#"event/events/create out 2 c8y/ """#.into(), r#"error in 2 c8y/ """#.into(), // c8y JWT token retrieval r#"s/uat/# out 2 c8y/ """#.into(), @@ -128,6 +129,7 @@ fn test_bridge_config_from_c8y_params() -> anyhow::Result<()> { // c8y JSON r#"inventory/managedObjects/update/# out 2 c8y/ """#.into(), r#"measurement/measurements/create out 2 c8y/ """#.into(), + r#"event/events/create out 2 c8y/ """#.into(), r#"error in 2 c8y/ """#.into(), // c8y JWT token retrieval r#"s/uat/# out 2 c8y/ """#.into(), diff --git a/crates/core/tedge_mapper/src/c8y/converter.rs b/crates/core/tedge_mapper/src/c8y/converter.rs index 6a658a1c..c5ab7b96 100644 --- a/crates/core/tedge_mapper/src/c8y/converter.rs +++ b/crates/core/tedge_mapper/src/c8y/converter.rs @@ -5,11 +5,13 @@ use agent_interface::{ RestartOperationResponse, SoftwareListRequest, SoftwareListResponse, SoftwareUpdateResponse, }; use async_trait::async_trait; -use c8y_api::{http_proxy::C8YHttpProxy, json_c8y::C8yUpdateSoftwareListResponse}; +use c8y_api::{ + http_proxy::C8YHttpProxy, + json_c8y::{C8yCreateEvent, C8yUpdateSoftwareListResponse}, +}; use c8y_smartrest::{ alarm, error::SmartRestDeserializerError, - event::{self}, operations::Operations, smartrest_deserializer::{SmartRestRestartRequest, SmartRestUpdateSoftware}, smartrest_serializer::{ @@ -29,7 +31,6 @@ use std::{ process::Stdio, }; use thin_edge_json::{alarm::ThinEdgeAlarm, event::ThinEdgeEvent}; -use time::{format_description::well_known::Rfc3339, OffsetDateTime}; use tracing::{debug, info, log::error}; use super::{ @@ -47,6 +48,9 @@ const SMARTREST_PUBLISH_TOPIC: &str = "c8y/s/us"; const TEDGE_ALARMS_TOPIC: &str = "tedge/alarms/"; const INTERNAL_ALARMS_TOPIC: &str = "c8y-internal/alarms/"; const TEDGE_EVENTS_TOPIC: &str = "tedge/events/"; +const C8Y_JSON_MQTT_EVENTS_TOPIC: &str = "c8y/event/events/create"; + +const CREATE_EVENT_SMARTREST_CODE: u16 = 400; #[derive(Debug)] pub struct CumulocityConverter @@ -150,43 +154,38 @@ where input: &Message, ) -> Result, ConversionError> { let tedge_event = ThinEdgeEvent::try_from(input.topic.name.as_str(), input.payload_str()?)?; + let c8y_event = C8yCreateEvent::try_from(tedge_event)?; - // If the message size is well within the Cumulocity MQTT size limit, use MQTT to send the mapped event as well - if input.payload_bytes().len() < self.size_threshold.0 { - let smartrest_alarm = event::serialize_event(tedge_event)?; + // If the message doesn't contain any fields other than `text` and `time`, convert to SmartREST + let message = if c8y_event.extras.is_empty() { + let smartrest_event = Self::serialize_to_smartrest(&c8y_event); let smartrest_topic = Topic::new_unchecked(SMARTREST_PUBLISH_TOPIC); - Ok(vec![Message::new(&smartrest_topic, smartrest_alarm)]) + Message::new(&smartrest_topic, smartrest_event) } else { - // If the message size is larger than the MQTT size limit, use HTTP to send the mapped event - let (event_text, event_time) = match tedge_event.data { - None => { - let message = tedge_event.name.clone(); - let time = OffsetDateTime::now_utc().format(&Rfc3339)?; + // If the message contains extra fields other than `text` and `time`, convert to Cumulocity JSON + let cumulocity_event_json = serde_json::to_string(&c8y_event)?; + let json_mqtt_topic = Topic::new_unchecked(C8Y_JSON_MQTT_EVENTS_TOPIC); - (message, time) - } - Some(event_data) => { - let message = event_data.text.unwrap_or_else(|| tedge_event.name.clone()); - let time = event_data.time.map_or_else( - || OffsetDateTime::now_utc().format(&Rfc3339), - |timestamp| timestamp.format(&Rfc3339), - )?; - (message, time) - } - }; - - let _ = self - .http_proxy - .send_event( - tedge_event.name.as_str(), - event_text.as_str(), - Some(event_time), - ) - .await?; + Message::new(&json_mqtt_topic, cumulocity_event_json) + }; + + // If the MQTT message size is well within the Cumulocity MQTT size limit, use MQTT to send the mapped event as well + if input.payload_bytes().len() < self.size_threshold.0 { + Ok(vec![message]) + } else { + // If the message size is larger than the MQTT size limit, use HTTP to send the mapped event + let _ = self.http_proxy.send_event(c8y_event).await?; Ok(vec![]) } } + + fn serialize_to_smartrest(c8y_event: &C8yCreateEvent) -> String { + format!( + "{},{},\"{}\",{}", + CREATE_EVENT_SMARTREST_CODE, c8y_event.event_type, c8y_event.text, c8y_event.time + ) + } } #[async_trait] diff --git a/crates/core/tedge_mapper/src/c8y/mapper.rs b/crates/core/tedge_mapper/src/c8y/mapper.rs index def4808c..07abc637 100644 --- a/crates/core/tedge_mapper/src/c8y/mapper.rs +++ b/crates/core/tedge_mapper/src/c8y/mapper.rs @@ -17,7 +17,7 @@ use tracing::{info, info_span, Instrument}; use super::topic::C8yTopic; const CUMULOCITY_MAPPER_NAME: &str = "tedge-mapper-c8y"; -const MQTT_MESSAGE_SIZE_THRESHOLD: usize = 16 * 1024; +const MQTT_MESSAGE_SIZE_THRESHOLD: usize = 16184; pub struct CumulocityMapper {} diff --git a/crates/core/tedge_mapper/src/c8y/tests.rs b/crates/core/tedge_mapper/src/c8y/tests.rs index 58e12eaa..96ace4a8 100644 --- a/crates/core/tedge_mapper/src/c8y/tests.rs +++ b/crates/core/tedge_mapper/src/c8y/tests.rs @@ -3,10 +3,11 @@ use crate::core::{ size_threshold::SizeThreshold, }; use anyhow::Result; +use assert_json_diff::assert_json_include; use assert_matches::assert_matches; use c8y_api::{ http_proxy::{C8YHttpProxy, MockC8YHttpProxy}, - json_c8y::C8yUpdateSoftwareListResponse, + json_c8y::{C8yCreateEvent, C8yUpdateSoftwareListResponse}, }; use c8y_smartrest::{ error::SMCumulocityMapperError, operations::Operations, @@ -681,7 +682,7 @@ async fn check_c8y_threshold_packet_size() -> Result<(), anyhow::Error> { } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn convert_event() -> Result<()> { +async fn convert_event_with_known_fields_to_c8y_smartrest() -> Result<()> { let size_threshold = SizeThreshold(16 * 1024); let device_name = String::from("test"); let device_type = String::from("test_type"); @@ -704,9 +705,48 @@ async fn convert_event() -> Result<()> { assert_eq!(converted_events.len(), 1); let converted_event = converted_events.get(0).unwrap(); assert_eq!(converted_event.topic.name, "c8y/s/us"); + dbg!(converted_event.payload_str()?); assert!(converted_event .payload_str()? - .starts_with("400,click_event")); + .starts_with(r#"400,click_event,"Someone clicked","#)); + + Ok(()) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn convert_event_with_extra_fields_to_c8y_json() -> Result<()> { + let size_threshold = SizeThreshold(16 * 1024); + let device_name = String::from("test"); + let device_type = String::from("test_type"); + let operations = Operations::default(); + let http_proxy = MockC8YHttpProxy::new(); + + let mut converter = CumulocityConverter::new( + size_threshold, + device_name, + device_type, + operations, + http_proxy, + ); + + let event_topic = "tedge/events/click_event"; + let event_payload = r#"{ "text": "tick", "foo": "bar" }"#; + let event_message = Message::new(&Topic::new_unchecked(event_topic), event_payload); + + let converted_events = converter.convert(&event_message).await; + assert_eq!(converted_events.len(), 1); + let converted_event = converted_events.get(0).unwrap(); + assert_eq!(converted_event.topic.name, "c8y/event/events/create"); + let converted_c8y_json = json!({ + "type": "click_event", + "text": "tick", + "foo": "bar", + }); + assert_eq!(converted_event.topic.name, "c8y/event/events/create"); + assert_json_include!( + actual: serde_json::from_str::(converted_event.payload_str()?)?, + expected: converted_c8y_json + ); Ok(()) } @@ -722,12 +762,8 @@ async fn test_convert_big_event() { let mut http_proxy = MockC8YHttpProxy::new(); http_proxy .expect_send_event() - .with( - predicate::eq("click_event"), - predicate::always(), - predicate::always(), - ) - .returning(|_, _, _| Ok("123".into())); + .with(predicate::always()) + .returning(|_| Ok("123".into())); let mut converter = CumulocityConverter::new( size_threshold, @@ -787,9 +823,7 @@ impl C8YHttpProxy for FakeC8YHttpProxy { async fn send_event( &mut self, - _event_type: &str, - _text: &str, - _time: Option, + _c8y_event: C8yCreateEvent, ) -> Result { Ok("123".into()) } diff --git a/crates/core/thin_edge_json/src/event.rs b/crates/core/thin_edge_json/src/event.rs index 474128ed..42b1ed35 100644 --- a/crates/core/thin_edge_json/src/event.rs +++ b/crates/core/thin_edge_json/src/event.rs @@ -1,23 +1,31 @@ +use std::collections::HashMap; + use clock::Timestamp; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; +use serde_json::Value; use self::error::ThinEdgeJsonDeserializerError; /// In-memory representation of ThinEdge JSON event. -#[derive(Debug, Deserialize, PartialEq)] +#[derive(Debug, Serialize, Deserialize, PartialEq)] pub struct ThinEdgeEvent { + #[serde(rename = "type")] pub name: String, + #[serde(flatten)] pub data: Option, } /// In-memory representation of ThinEdge JSON event payload -#[derive(Debug, Deserialize, PartialEq)] +#[derive(Debug, Serialize, Deserialize, PartialEq)] pub struct ThinEdgeEventData { pub text: Option, #[serde(default)] #[serde(with = "clock::serde::rfc3339::option")] pub time: Option, + + #[serde(flatten)] + pub extras: HashMap, } pub mod error { @@ -84,6 +92,7 @@ mod tests { data: Some(ThinEdgeEventData { text: Some("Someone clicked".into()), time: Some(datetime!(2021-04-23 19:00:00 +05:00)), + extras: HashMap::new(), }), }; "event parsing" @@ -98,6 +107,7 @@ mod tests { data: Some(ThinEdgeEventData { text: Some("Someone clicked".into()), time: None, + extras: HashMap::new(), }), }; "event parsing without timestamp" @@ -112,6 +122,7 @@ mod tests { data: Some(ThinEdgeEventData { text: None, time: Some(datetime!(2021-04-23 19:00:00 +05:00)), + extras: HashMap::new(), }), }; "event parsing without text" @@ -124,6 +135,7 @@ mod tests { data: Some(ThinEdgeEventData { text: None, time: None, + extras: HashMap::new(), }), }; "event parsing without text or timestamp" @@ -158,9 +170,53 @@ mod tests { #[test] fn event_translation_empty_payload() -> Result<()> { + let event_data = ThinEdgeEventData { + text: Some("foo".to_string()), + time: Some(datetime!(2021-04-23 19:00:00 +05:00)), + extras: HashMap::new(), + }; + + let serialized = serde_json::to_string(&event_data).unwrap(); + println!("serialized = {}", serialized); + + Ok(()) + } + + #[test] + fn test_serialize() -> Result<()> { let result = ThinEdgeEvent::try_from("tedge/events/click_event", "")?; assert_eq!(result.name, "click_event".to_string()); assert_matches!(result.data, None); + Ok(()) + } + + #[test] + fn event_translation_additional_fields() -> Result<()> { + let event_json = json!({ + "text": "foo", + "time": "2021-04-23T19:00:00+05:00", + "extra": "field", + "numeric": 32u64, + "complex": { + "hello": "world", + "num": 5u32 + } + }); + + let result = + ThinEdgeEvent::try_from("tedge/events/click_event", event_json.to_string().as_str())?; + + assert_eq!(result.name, "click_event".to_string()); + let event_data = result.data.unwrap(); + assert_eq!( + event_data.extras.get("extra").unwrap().as_str().unwrap(), + "field" + ); + assert_eq!( + event_data.extras.get("numeric").unwrap().as_u64().unwrap(), + 32u64 + ); + assert_matches!(event_data.extras.get("complex"), Some(Value::Object(_))); Ok(()) } -- cgit v1.2.3