summaryrefslogtreecommitdiffstats
path: root/crates/core/tedge_mapper/src
diff options
context:
space:
mode:
authorAlbin Suresh <albin.suresh@softwareag.com>2022-03-07 17:42:04 +0530
committerAlbin Suresh <albin.suresh@softwareag.com>2022-03-11 12:40:45 +0530
commit59fd1aa5ec16cc6a8cd89fc8f3541cf33f763ff3 (patch)
tree128406c2a056d723574d8496cc1f033efb9484c8 /crates/core/tedge_mapper/src
parentc02fd97270b026536ed4d3e12f793299e9e76b70 (diff)
Closes #893 Support custom fields and fragments in Thin Edge JSON events
Diffstat (limited to 'crates/core/tedge_mapper/src')
-rw-r--r--crates/core/tedge_mapper/src/c8y/converter.rs63
-rw-r--r--crates/core/tedge_mapper/src/c8y/mapper.rs2
-rw-r--r--crates/core/tedge_mapper/src/c8y/tests.rs58
3 files changed, 78 insertions, 45 deletions
diff --git a/crates/core/tedge_mapper/src/c8y/converter.rs b/crates/core/tedge_mapper/src/c8y/converter.rs
index 6a658a1c..c5ab7b96 100644
--- a/crates/core/tedge_mapper/src/c8y/converter.rs
+++ b/crates/core/tedge_mapper/src/c8y/converter.rs
@@ -5,11 +5,13 @@ use agent_interface::{
RestartOperationResponse, SoftwareListRequest, SoftwareListResponse, SoftwareUpdateResponse,
};
use async_trait::async_trait;
-use c8y_api::{http_proxy::C8YHttpProxy, json_c8y::C8yUpdateSoftwareListResponse};
+use c8y_api::{
+ http_proxy::C8YHttpProxy,
+ json_c8y::{C8yCreateEvent, C8yUpdateSoftwareListResponse},
+};
use c8y_smartrest::{
alarm,
error::SmartRestDeserializerError,
- event::{self},
operations::Operations,
smartrest_deserializer::{SmartRestRestartRequest, SmartRestUpdateSoftware},
smartrest_serializer::{
@@ -29,7 +31,6 @@ use std::{
process::Stdio,
};
use thin_edge_json::{alarm::ThinEdgeAlarm, event::ThinEdgeEvent};
-use time::{format_description::well_known::Rfc3339, OffsetDateTime};
use tracing::{debug, info, log::error};
use super::{
@@ -47,6 +48,9 @@ const SMARTREST_PUBLISH_TOPIC: &str = "c8y/s/us";
const TEDGE_ALARMS_TOPIC: &str = "tedge/alarms/";
const INTERNAL_ALARMS_TOPIC: &str = "c8y-internal/alarms/";
const TEDGE_EVENTS_TOPIC: &str = "tedge/events/";
+const C8Y_JSON_MQTT_EVENTS_TOPIC: &str = "c8y/event/events/create";
+
+const CREATE_EVENT_SMARTREST_CODE: u16 = 400;
#[derive(Debug)]
pub struct CumulocityConverter<Proxy>
@@ -150,43 +154,38 @@ where
input: &Message,
) -> Result<Vec<Message>, ConversionError> {
let tedge_event = ThinEdgeEvent::try_from(input.topic.name.as_str(), input.payload_str()?)?;
+ let c8y_event = C8yCreateEvent::try_from(tedge_event)?;
- // If the message size is well within the Cumulocity MQTT size limit, use MQTT to send the mapped event as well
- if input.payload_bytes().len() < self.size_threshold.0 {
- let smartrest_alarm = event::serialize_event(tedge_event)?;
+ // If the message doesn't contain any fields other than `text` and `time`, convert to SmartREST
+ let message = if c8y_event.extras.is_empty() {
+ let smartrest_event = Self::serialize_to_smartrest(&c8y_event);
let smartrest_topic = Topic::new_unchecked(SMARTREST_PUBLISH_TOPIC);
- Ok(vec![Message::new(&smartrest_topic, smartrest_alarm)])
+ Message::new(&smartrest_topic, smartrest_event)
} else {
- // If the message size is larger than the MQTT size limit, use HTTP to send the mapped event
- let (event_text, event_time) = match tedge_event.data {
- None => {
- let message = tedge_event.name.clone();
- let time = OffsetDateTime::now_utc().format(&Rfc3339)?;
+ // If the message contains extra fields other than `text` and `time`, convert to Cumulocity JSON
+ let cumulocity_event_json = serde_json::to_string(&c8y_event)?;
+ let json_mqtt_topic = Topic::new_unchecked(C8Y_JSON_MQTT_EVENTS_TOPIC);
- (message, time)
- }
- Some(event_data) => {
- let message = event_data.text.unwrap_or_else(|| tedge_event.name.clone());
- let time = event_data.time.map_or_else(
- || OffsetDateTime::now_utc().format(&Rfc3339),
- |timestamp| timestamp.format(&Rfc3339),
- )?;
- (message, time)
- }
- };
-
- let _ = self
- .http_proxy
- .send_event(
- tedge_event.name.as_str(),
- event_text.as_str(),
- Some(event_time),
- )
- .await?;
+ Message::new(&json_mqtt_topic, cumulocity_event_json)
+ };
+
+ // If the MQTT message size is well within the Cumulocity MQTT size limit, use MQTT to send the mapped event as well
+ if input.payload_bytes().len() < self.size_threshold.0 {
+ Ok(vec![message])
+ } else {
+ // If the message size is larger than the MQTT size limit, use HTTP to send the mapped event
+ let _ = self.http_proxy.send_event(c8y_event).await?;
Ok(vec![])
}
}
+
+ fn serialize_to_smartrest(c8y_event: &C8yCreateEvent) -> String {
+ format!(
+ "{},{},\"{}\",{}",
+ CREATE_EVENT_SMARTREST_CODE, c8y_event.event_type, c8y_event.text, c8y_event.time
+ )
+ }
}
#[async_trait]
diff --git a/crates/core/tedge_mapper/src/c8y/mapper.rs b/crates/core/tedge_mapper/src/c8y/mapper.rs
index def4808c..07abc637 100644
--- a/crates/core/tedge_mapper/src/c8y/mapper.rs
+++ b/crates/core/tedge_mapper/src/c8y/mapper.rs
@@ -17,7 +17,7 @@ use tracing::{info, info_span, Instrument};
use super::topic::C8yTopic;
const CUMULOCITY_MAPPER_NAME: &str = "tedge-mapper-c8y";
-const MQTT_MESSAGE_SIZE_THRESHOLD: usize = 16 * 1024;
+const MQTT_MESSAGE_SIZE_THRESHOLD: usize = 16184;
pub struct CumulocityMapper {}
diff --git a/crates/core/tedge_mapper/src/c8y/tests.rs b/crates/core/tedge_mapper/src/c8y/tests.rs
index 58e12eaa..96ace4a8 100644
--- a/crates/core/tedge_mapper/src/c8y/tests.rs
+++ b/crates/core/tedge_mapper/src/c8y/tests.rs
@@ -3,10 +3,11 @@ use crate::core::{
size_threshold::SizeThreshold,
};
use anyhow::Result;
+use assert_json_diff::assert_json_include;
use assert_matches::assert_matches;
use c8y_api::{
http_proxy::{C8YHttpProxy, MockC8YHttpProxy},
- json_c8y::C8yUpdateSoftwareListResponse,
+ json_c8y::{C8yCreateEvent, C8yUpdateSoftwareListResponse},
};
use c8y_smartrest::{
error::SMCumulocityMapperError, operations::Operations,
@@ -681,7 +682,7 @@ async fn check_c8y_threshold_packet_size() -> Result<(), anyhow::Error> {
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
-async fn convert_event() -> Result<()> {
+async fn convert_event_with_known_fields_to_c8y_smartrest() -> Result<()> {
let size_threshold = SizeThreshold(16 * 1024);
let device_name = String::from("test");
let device_type = String::from("test_type");
@@ -704,9 +705,48 @@ async fn convert_event() -> Result<()> {
assert_eq!(converted_events.len(), 1);
let converted_event = converted_events.get(0).unwrap();
assert_eq!(converted_event.topic.name, "c8y/s/us");
+ dbg!(converted_event.payload_str()?);
assert!(converted_event
.payload_str()?
- .starts_with("400,click_event"));
+ .starts_with(r#"400,click_event,"Someone clicked","#));
+
+ Ok(())
+}
+
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+async fn convert_event_with_extra_fields_to_c8y_json() -> Result<()> {
+ let size_threshold = SizeThreshold(16 * 1024);
+ let device_name = String::from("test");
+ let device_type = String::from("test_type");
+ let operations = Operations::default();
+ let http_proxy = MockC8YHttpProxy::new();
+
+ let mut converter = CumulocityConverter::new(
+ size_threshold,
+ device_name,
+ device_type,
+ operations,
+ http_proxy,
+ );
+
+ let event_topic = "tedge/events/click_event";
+ let event_payload = r#"{ "text": "tick", "foo": "bar" }"#;
+ let event_message = Message::new(&Topic::new_unchecked(event_topic), event_payload);
+
+ let converted_events = converter.convert(&event_message).await;
+ assert_eq!(converted_events.len(), 1);
+ let converted_event = converted_events.get(0).unwrap();
+ assert_eq!(converted_event.topic.name, "c8y/event/events/create");
+ let converted_c8y_json = json!({
+ "type": "click_event",
+ "text": "tick",
+ "foo": "bar",
+ });
+ assert_eq!(converted_event.topic.name, "c8y/event/events/create");
+ assert_json_include!(
+ actual: serde_json::from_str::<serde_json::Value>(converted_event.payload_str()?)?,
+ expected: converted_c8y_json
+ );
Ok(())
}
@@ -722,12 +762,8 @@ async fn test_convert_big_event() {
let mut http_proxy = MockC8YHttpProxy::new();
http_proxy
.expect_send_event()
- .with(
- predicate::eq("click_event"),
- predicate::always(),
- predicate::always(),
- )
- .returning(|_, _, _| Ok("123".into()));
+ .with(predicate::always())
+ .returning(|_| Ok("123".into()));
let mut converter = CumulocityConverter::new(
size_threshold,
@@ -787,9 +823,7 @@ impl C8YHttpProxy for FakeC8YHttpProxy {
async fn send_event(
&mut self,
- _event_type: &str,
- _text: &str,
- _time: Option<String>,
+ _c8y_event: C8yCreateEvent,
) -> Result<String, SMCumulocityMapperError> {
Ok("123".into())
}