summaryrefslogtreecommitdiffstats
path: root/crates/core
diff options
context:
space:
mode:
authorAlbin Suresh <albin.suresh@softwareag.com>2022-03-07 17:42:04 +0530
committerAlbin Suresh <albin.suresh@softwareag.com>2022-03-11 12:40:45 +0530
commit59fd1aa5ec16cc6a8cd89fc8f3541cf33f763ff3 (patch)
tree128406c2a056d723574d8496cc1f033efb9484c8 /crates/core
parentc02fd97270b026536ed4d3e12f793299e9e76b70 (diff)
Closes #893 Support custom fields and fragments in Thin Edge JSON events
Diffstat (limited to 'crates/core')
-rw-r--r--crates/core/c8y_api/Cargo.toml2
-rw-r--r--crates/core/c8y_api/src/http_proxy.rs47
-rw-r--r--crates/core/c8y_api/src/json_c8y.rs183
-rw-r--r--crates/core/c8y_smartrest/src/event.rs141
-rw-r--r--crates/core/c8y_smartrest/src/lib.rs1
-rw-r--r--crates/core/tedge/src/cli/connect/bridge_config_c8y.rs2
-rw-r--r--crates/core/tedge_mapper/src/c8y/converter.rs63
-rw-r--r--crates/core/tedge_mapper/src/c8y/mapper.rs2
-rw-r--r--crates/core/tedge_mapper/src/c8y/tests.rs58
-rw-r--r--crates/core/thin_edge_json/src/event.rs62
10 files changed, 338 insertions, 223 deletions
diff --git a/crates/core/c8y_api/Cargo.toml b/crates/core/c8y_api/Cargo.toml
index 130d7b42..3b812fe9 100644
--- a/crates/core/c8y_api/Cargo.toml
+++ b/crates/core/c8y_api/Cargo.toml
@@ -31,6 +31,8 @@ tracing = { version = "0.1", features = ["attributes", "log"] }
[dev-dependencies]
anyhow = "1.0"
+assert_matches = "1.5"
mockito = "0.30"
tempfile = "3.3"
test-case = "2.0"
+time = { version = "0.3", features = ["macros"] }
diff --git a/crates/core/c8y_api/src/http_proxy.rs b/crates/core/c8y_api/src/http_proxy.rs
index d8047e53..9e75faf3 100644
--- a/crates/core/c8y_api/src/http_proxy.rs
+++ b/crates/core/c8y_api/src/http_proxy.rs
@@ -8,12 +8,12 @@ use c8y_smartrest::{error::SMCumulocityMapperError, smartrest_deserializer::Smar
use mockall::automock;
use mqtt_channel::{Connection, PubChannel, StreamExt, Topic, TopicFilter};
use reqwest::Url;
-use std::time::Duration;
+use std::{collections::HashMap, time::Duration};
use tedge_config::{
C8yUrlSetting, ConfigSettingAccessor, ConfigSettingAccessorStringExt, DeviceIdSetting,
MqttBindAddressSetting, MqttPortSetting, TEdgeConfig,
};
-use time::{format_description, OffsetDateTime};
+use time::OffsetDateTime;
use tracing::{error, info, instrument};
@@ -31,9 +31,7 @@ pub trait C8YHttpProxy: Send + Sync {
async fn send_event(
&mut self,
- event_type: &str,
- text: &str,
- time: Option<String>,
+ c8y_event: C8yCreateEvent,
) -> Result<String, SMCumulocityMapperError>;
async fn send_software_list_http(
@@ -255,20 +253,17 @@ impl JwtAuthHttpProxy {
}
fn create_log_event(&self) -> C8yCreateEvent {
- self.create_event("c8y_Logfile", "software-management", None)
- }
-
- fn create_event(&self, event_type: &str, text: &str, time: Option<String>) -> C8yCreateEvent {
let c8y_managed_object = C8yManagedObject {
id: self.end_point.c8y_internal_id.clone(),
};
- let time = time.unwrap_or_else(|| {
- OffsetDateTime::now_utc()
- .format(&format_description::well_known::Rfc3339)
- .unwrap()
- });
- C8yCreateEvent::new(c8y_managed_object, event_type, time.as_str(), text)
+ C8yCreateEvent::new(
+ Some(c8y_managed_object),
+ "c8y_Logfile".to_string(),
+ OffsetDateTime::now_utc(),
+ "software-management".to_string(),
+ HashMap::new(),
+ )
}
async fn send_event_internal(
@@ -325,11 +320,13 @@ impl C8YHttpProxy for JwtAuthHttpProxy {
async fn send_event(
&mut self,
- event_type: &str,
- text: &str,
- time: Option<String>,
+ mut c8y_event: C8yCreateEvent,
) -> Result<String, SMCumulocityMapperError> {
- let c8y_event: C8yCreateEvent = self.create_event(event_type, text, time);
+ if c8y_event.source.is_none() {
+ c8y_event.source = Some(C8yManagedObject {
+ id: self.end_point.c8y_internal_id.clone(),
+ });
+ }
self.send_event_internal(c8y_event).await
}
@@ -498,11 +495,15 @@ mod tests {
device_id,
);
- // ... creates the event and assert its id
- assert_eq!(
- http_proxy.send_event("clock_event", "tick", None).await?,
- event_id
+ let c8y_event = C8yCreateEvent::new(
+ None,
+ "clock_event".to_string(),
+ OffsetDateTime::now_utc(),
+ "tick".to_string(),
+ HashMap::new(),
);
+ // ... creates the event and assert its id
+ assert_eq!(http_proxy.send_event(c8y_event).await?, event_id);
Ok(())
}
diff --git a/crates/core/c8y_api/src/json_c8y.rs b/crates/core/c8y_api/src/json_c8y.rs
index f1f0d2b7..be857adb 100644
--- a/crates/core/c8y_api/src/json_c8y.rs
+++ b/crates/core/c8y_api/src/json_c8y.rs
@@ -1,20 +1,34 @@
+use std::collections::HashMap;
+
use agent_interface::{
Jsonify, SoftwareListResponse, SoftwareModule, SoftwareType, SoftwareVersion,
};
+use c8y_smartrest::error::SMCumulocityMapperError;
use download::DownloadInfo;
use serde::{Deserialize, Serialize};
+use serde_json::Value;
+use thin_edge_json::event::ThinEdgeEvent;
+use time::OffsetDateTime;
const EMPTY_STRING: &str = "";
-#[derive(Debug, Serialize, Deserialize)]
+#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct C8yCreateEvent {
- source: C8yManagedObject,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub source: Option<C8yManagedObject>,
+
#[serde(rename = "type")]
- event_type: String,
- time: String,
- text: String,
+ pub event_type: String,
+
+ #[serde(with = "time::serde::rfc3339")]
+ pub time: OffsetDateTime,
+
+ pub text: String,
+
+ #[serde(flatten)]
+ pub extras: HashMap<String, Value>,
}
#[derive(Debug, Deserialize, Serialize, PartialEq)]
@@ -24,7 +38,7 @@ pub struct C8yEventResponse {
pub id: String,
}
-#[derive(Debug, Serialize, Deserialize, Clone)]
+#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct C8yManagedObject {
pub id: String,
@@ -97,13 +111,51 @@ impl From<&SoftwareListResponse> for C8yUpdateSoftwareListResponse {
}
impl C8yCreateEvent {
- pub fn new(source: C8yManagedObject, event_type: &str, time: &str, text: &str) -> Self {
+ pub fn new(
+ source: Option<C8yManagedObject>,
+ event_type: String,
+ time: OffsetDateTime,
+ text: String,
+ extras: HashMap<String, Value>,
+ ) -> Self {
Self {
source,
- event_type: event_type.into(),
- time: time.into(),
- text: text.into(),
+ event_type,
+ time,
+ text,
+ extras,
+ }
+ }
+}
+
+impl TryFrom<ThinEdgeEvent> for C8yCreateEvent {
+ type Error = SMCumulocityMapperError;
+
+ fn try_from(event: ThinEdgeEvent) -> Result<Self, SMCumulocityMapperError> {
+ let event_type = event.name;
+ let text;
+ let time;
+ let extras;
+ match event.data {
+ None => {
+ text = event_type.clone();
+ time = OffsetDateTime::now_utc();
+ extras = HashMap::new();
+ }
+ Some(event_data) => {
+ text = event_data.text.unwrap_or_else(|| event_type.clone());
+ time = event_data.time.unwrap_or_else(OffsetDateTime::now_utc);
+ extras = event_data.extras;
+ }
}
+
+ Ok(Self {
+ source: None,
+ event_type,
+ time,
+ text,
+ extras,
+ })
}
}
@@ -142,6 +194,12 @@ fn combine_version_and_type(
#[cfg(test)]
mod tests {
+ use anyhow::Result;
+ use assert_matches::assert_matches;
+ use test_case::test_case;
+ use thin_edge_json::event::ThinEdgeEventData;
+ use time::macros::datetime;
+
use super::*;
#[test]
@@ -286,4 +344,109 @@ mod tests {
EMPTY_STRING
);
}
+
+ #[test_case(
+ ThinEdgeEvent {
+ name: "click_event".into(),
+ data: Some(ThinEdgeEventData {
+ text: Some("Someone clicked".into()),
+ time: Some(datetime!(2021-04-23 19:00:00 +05:00)),
+ extras: HashMap::new(),
+ }),
+ },
+ C8yCreateEvent {
+ source: None,
+ event_type: "click_event".into(),
+ time: datetime!(2021-04-23 19:00:00 +05:00),
+ text: "Someone clicked".into(),
+ extras: HashMap::new(),
+ }
+ ;"event translation"
+ )]
+ #[test_case(
+ ThinEdgeEvent {
+ name: "click_event".into(),
+ data: Some(ThinEdgeEventData {
+ text: None,
+ time: Some(datetime!(2021-04-23 19:00:00 +05:00)),
+ extras: HashMap::new(),
+ }),
+ },
+ C8yCreateEvent {
+ source: None,
+ event_type: "click_event".into(),
+ time: datetime!(2021-04-23 19:00:00 +05:00),
+ text: "click_event".into(),
+ extras: HashMap::new(),
+ }
+ ;"event translation without text"
+ )]
+ #[test_case(
+ ThinEdgeEvent {
+ name: "click_event".into(),
+ data: Some(ThinEdgeEventData {
+ text: Some("Someone, clicked, it".into()),
+ time: Some(datetime!(2021-04-23 19:00:00 +05:00)),
+ extras: HashMap::new(),
+ }),
+ },
+ C8yCreateEvent {
+ source: None,
+ event_type: "click_event".into(),
+ time: datetime!(2021-04-23 19:00:00 +05:00),
+ text: "Someone, clicked, it".into(),
+ extras: HashMap::new(),
+ }
+ ;"event translation with commas in text"
+ )]
+ fn check_event_translation(
+ tedge_event: ThinEdgeEvent,
+ expected_c8y_event: C8yCreateEvent,
+ ) -> Result<()> {
+ let actual_c8y_event = C8yCreateEvent::try_from(tedge_event)?;
+
+ assert_eq!(expected_c8y_event, actual_c8y_event);
+
+ Ok(())
+ }
+
+ #[test]
+ fn event_translation_empty_json_payload_generates_timestamp() -> Result<()> {
+ let tedge_event = ThinEdgeEvent {
+ name: "empty_event".into(),
+ data: Some(ThinEdgeEventData {
+ text: None,
+ time: None,
+ extras: HashMap::new(),
+ }),
+ };
+
+ let actual_c8y_event = C8yCreateEvent::try_from(tedge_event)?;
+
+ assert_eq!(actual_c8y_event.event_type, "empty_event".to_string());
+ assert_eq!(actual_c8y_event.text, "empty_event".to_string());
+ assert_matches!(actual_c8y_event.time, _);
+ assert_matches!(actual_c8y_event.source, None);
+ assert!(actual_c8y_event.extras.is_empty());
+
+ Ok(())
+ }
+
+ #[test]
+ fn event_translation_empty_payload() -> Result<()> {
+ let tedge_event = ThinEdgeEvent {
+ name: "empty_event".into(),
+ data: None,
+ };
+
+ let actual_c8y_event = C8yCreateEvent::try_from(tedge_event)?;
+
+ assert_eq!(actual_c8y_event.event_type, "empty_event".to_string());
+ assert_eq!(actual_c8y_event.text, "empty_event".to_string());
+ assert!(actual_c8y_event.time < OffsetDateTime::now_utc());
+ assert_matches!(actual_c8y_event.source, None);
+ assert!(actual_c8y_event.extras.is_empty());
+
+ Ok(())
+ }
}
diff --git a/crates/core/c8y_smartrest/src/event.rs b/crates/core/c8y_smartrest/src/event.rs
deleted file mode 100644
index ef2e9662..00000000
--- a/crates/core/c8y_smartrest/src/event.rs
+++ /dev/null
@@ -1,141 +0,0 @@
-use thin_edge_json::event::ThinEdgeEvent;
-use time::{format_description::well_known::Rfc3339, OffsetDateTime};
-
-use crate::error::SmartRestSerializerError;
-
-const CREATE_EVENT_SMARTREST_CODE: u16 = 400;
-
-/// Converts from thin-edge event to C8Y event SmartREST message
-pub fn serialize_event(event: ThinEdgeEvent) -> Result<String, SmartRestSerializerError> {
- let current_timestamp = OffsetDateTime::now_utc();
- match event.data {
- None => Ok(format!(
- "{CREATE_EVENT_SMARTREST_CODE},{},{},{}",
- event.name,
- event.name,
- current_timestamp.format(&Rfc3339)?
- )),
- Some(event_data) => {
- let smartrest_message = format!(
- "{CREATE_EVENT_SMARTREST_CODE},{},\"{}\",{}",
- event.name.clone(),
- event_data.text.unwrap_or(event.name),
- event_data.time.map_or_else(
- || current_timestamp.format(&Rfc3339),
- |timestamp| timestamp.format(&Rfc3339)
- )?
- );
-
- Ok(smartrest_message)
- }
- }
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
- use anyhow::Result;
- use assert_matches::assert_matches;
- use serde::Deserialize;
- use test_case::test_case;
- use thin_edge_json::event::ThinEdgeEventData;
- use time::macros::datetime;
-
- #[test_case(
- ThinEdgeEvent {
- name: "click_event".into(),
- data: Some(ThinEdgeEventData {
- text: Some("Someone clicked".into()),
- time: Some(datetime!(2021-04-23 19:00:00 +05:00)),
- }),
- },
- "400,click_event,\"Someone clicked\",2021-04-23T19:00:00+05:00"
- ;"event translation"
- )]
- #[test_case(
- ThinEdgeEvent {
- name: "click_event".into(),
- data: Some(ThinEdgeEventData {
- text: None,
- time: Some(datetime!(2021-04-23 19:00:00 +05:00)),
- }),
- },
- "400,click_event,\"click_event\",2021-04-23T19:00:00+05:00"
- ;"event translation without message"
- )]
- #[test_case(
- ThinEdgeEvent {
- name: "click_event".into(),
- data: Some(ThinEdgeEventData {
- text: Some("Someone, clicked, it".into()),
- time: Some(datetime!(2021-04-23 19:00:00 +05:00)),
- }),
- },
- "400,click_event,\"Someone, clicked, it\",2021-04-23T19:00:00+05:00"
- ;"event translation with commas in message"
- )]
- fn check_event_translation(event: ThinEdgeEvent, expected_smartrest_msg: &str) {
- let result = serialize_event(event);
-
- assert_eq!(result.unwrap(), expected_smartrest_msg);
- }
-
- #[derive(Debug, Deserialize)]
- struct SmartRestEvent {
- pub code: i32,
- pub name: String,
- pub message: Option<String>,
- pub time: Option<String>,
- }
-
- #[test]
- fn event_translation_empty_json_payload_generates_timestamp() -> Result<()> {
- let event = ThinEdgeEvent {
- name: "empty_event".into(),
- data: Some(ThinEdgeEventData {
- text: None,
- time: None,
- }),
- };
-
- let smartrest_message = serialize_event(event)?;
- let mut reader = csv::ReaderBuilder::new()
- .has_headers(false)
- .from_reader(smartrest_message.as_bytes());
- let mut iter = reader.deserialize();
- let result = iter.next();
-
- assert!(result.is_some());
- let smartrest_event: SmartRestEvent = result.expect("One entry expected")?;
- assert_eq!(smartrest_event.code, 400);
- assert_eq!(smartrest_event.name, "empty_event".to_string());
- assert_eq!(smartrest_event.message, Some("empty_event".to_string()));
- assert_matches!(smartrest_event.time, Some(_));
-
- Ok(())
- }
-
- #[test]
- fn event_translation_empty_payload() -> Result<()> {
- let event = ThinEdgeEvent {
- name: "empty_event".into(),
- data: None,
- };
-
- let smartrest_message = serialize_event(event)?;
- let mut reader = csv::ReaderBuilder::new()
- .has_headers(false)
- .from_reader(smartrest_message.as_bytes());
- let mut iter = reader.deserialize();
- let result = iter.next();
-
- assert!(result.is_some());
- let smartrest_event: SmartRestEvent = result.expect("One entry expected")?;
- assert_eq!(smartrest_event.code, 400);
- assert_eq!(smartrest_event.name, "empty_event".to_string());
- assert_eq!(smartrest_event.message, Some("empty_event".to_string()));
- assert_matches!(smartrest_event.time, Some(_));
-
- Ok(())
- }
-}
diff --git a/crates/core/c8y_smartrest/src/lib.rs b/crates/core/c8y_smartrest/src/lib.rs
index b6338b06..ddb3bd2c 100644
--- a/crates/core/c8y_smartrest/src/lib.rs
+++ b/crates/core/c8y_smartrest/src/lib.rs
@@ -1,6 +1,5 @@
pub mod alarm;
pub mod error;
-pub mod event;
pub mod operations;
pub mod smartrest_deserializer;
pub mod smartrest_serializer;
diff --git a/crates/core/tedge/src/cli/connect/bridge_config_c8y.rs b/crates/core/tedge/src/cli/connect/bridge_config_c8y.rs
index 2ceb7007..ff880e65 100644
--- a/crates/core/tedge/src/cli/connect/bridge_config_c8y.rs
+++ b/crates/core/tedge/src/cli/connect/bridge_config_c8y.rs
@@ -67,6 +67,7 @@ impl From<BridgeConfigC8yParams> for BridgeConfig {
// c8y JSON
r#"inventory/managedObjects/update/# out 2 c8y/ """#.into(),
r#"measurement/measurements/create out 2 c8y/ """#.into(),
+ r#"event/events/create out 2 c8y/ """#.into(),
r#"error in 2 c8y/ """#.into(),
// c8y JWT token retrieval
r#"s/uat/# out 2 c8y/ """#.into(),
@@ -128,6 +129,7 @@ fn test_bridge_config_from_c8y_params() -> anyhow::Result<()> {
// c8y JSON
r#"inventory/managedObjects/update/# out 2 c8y/ """#.into(),
r#"measurement/measurements/create out 2 c8y/ """#.into(),
+ r#"event/events/create out 2 c8y/ """#.into(),
r#"error in 2 c8y/ """#.into(),
// c8y JWT token retrieval
r#"s/uat/# out 2 c8y/ """#.into(),
diff --git a/crates/core/tedge_mapper/src/c8y/converter.rs b/crates/core/tedge_mapper/src/c8y/converter.rs
index 6a658a1c..c5ab7b96 100644
--- a/crates/core/tedge_mapper/src/c8y/converter.rs
+++ b/crates/core/tedge_mapper/src/c8y/converter.rs
@@ -5,11 +5,13 @@ use agent_interface::{
RestartOperationResponse, SoftwareListRequest, SoftwareListResponse, SoftwareUpdateResponse,
};
use async_trait::async_trait;
-use c8y_api::{http_proxy::C8YHttpProxy, json_c8y::C8yUpdateSoftwareListResponse};
+use c8y_api::{
+ http_proxy::C8YHttpProxy,
+ json_c8y::{C8yCreateEvent, C8yUpdateSoftwareListResponse},
+};
use c8y_smartrest::{
alarm,
error::SmartRestDeserializerError,
- event::{self},
operations::Operations,
smartrest_deserializer::{SmartRestRestartRequest, SmartRestUpdateSoftware},
smartrest_serializer::{
@@ -29,7 +31,6 @@ use std::{
process::Stdio,
};
use thin_edge_json::{alarm::ThinEdgeAlarm, event::ThinEdgeEvent};
-use time::{format_description::well_known::Rfc3339, OffsetDateTime};
use tracing::{debug, info, log::error};
use super::{
@@ -47,6 +48,9 @@ const SMARTREST_PUBLISH_TOPIC: &str = "c8y/s/us";
const TEDGE_ALARMS_TOPIC: &str = "tedge/alarms/";
const INTERNAL_ALARMS_TOPIC: &str = "c8y-internal/alarms/";
const TEDGE_EVENTS_TOPIC: &str = "tedge/events/";
+const C8Y_JSON_MQTT_EVENTS_TOPIC: &str = "c8y/event/events/create";
+
+const CREATE_EVENT_SMARTREST_CODE: u16 = 400;
#[derive(Debug)]
pub struct CumulocityConverter<Proxy>
@@ -150,43 +154,38 @@ where
input: &Message,
) -> Result<Vec<Message>, ConversionError> {
let tedge_event = ThinEdgeEvent::try_from(input.topic.name.as_str(), input.payload_str()?)?;
+ let c8y_event = C8yCreateEvent::try_from(tedge_event)?;
- // If the message size is well within the Cumulocity MQTT size limit, use MQTT to send the mapped event as well
- if input.payload_bytes().len() < self.size_threshold.0 {
- let smartrest_alarm = event::serialize_event(tedge_event)?;
+ // If the message doesn't contain any fields other than `text` and `time`, convert to SmartREST
+ let message = if c8y_event.extras.is_empty() {
+ let smartrest_event = Self::serialize_to_smartrest(&c8y_event);
let smartrest_topic = Topic::new_unchecked(SMARTREST_PUBLISH_TOPIC);
- Ok(vec![Message::new(&smartrest_topic, smartrest_alarm)])
+ Message::new(&smartrest_topic, smartrest_event)
} else {
- // If the message size is larger than the MQTT size limit, use HTTP to send the mapped event
- let (event_text, event_time) = match tedge_event.data {
- None => {
- let message = tedge_event.name.clone();
- let time = OffsetDateTime::now_utc().format(&Rfc3339)?;
+ // If the message contains extra fields other than `text` and `time`, convert to Cumulocity JSON
+ let cumulocity_event_json = serde_json::to_string(&c8y_event)?;
+ let json_mqtt_topic = Topic::new_unchecked(C8Y_JSON_MQTT_EVENTS_TOPIC);
- (message, time)
- }
- Some(event_data) => {
- let message = event_data.text.unwrap_or_else(|| tedge_event.name.clone());
- let time = event_data.time.map_or_else(
- || OffsetDateTime::now_utc().format(&Rfc3339),
- |timestamp| timestamp.format(&Rfc3339),
- )?;
- (message, time)
- }
- };
-
- let _ = self
- .http_proxy
- .send_event(
- tedge_event.name.as_str(),
- event_text.as_str(),
- Some(event_time),
- )
- .await?;
+ Message::new(&json_mqtt_topic, cumulocity_event_json)
+ };
+
+ // If the MQTT message size is well within the Cumulocity MQTT size limit, use MQTT to send the mapped event as well
+ if input.payload_bytes().len() < self.size_threshold.0 {
+ Ok(vec![message])
+ } else {
+ // If the message size is larger than the MQTT size limit, use HTTP to send the mapped event
+ let _ = self.http_proxy.send_event(c8y_event).await?;
Ok(vec![])
}
}
+
+ fn serialize_to_smartrest(c8y_event: &C8yCreateEvent) -> String {
+ format!(
+ "{},{},\"{}\",{}",
+ CREATE_EVENT_SMARTREST_CODE, c8y_event.event_type, c8y_event.text, c8y_event.time
+ )
+ }
}
#[async_trait]
diff --git a/crates/core/tedge_mapper/src/c8y/mapper.rs b/crates/core/tedge_mapper/src/c8y/mapper.rs
index def4808c..07abc637 100644
--- a/crates/core/tedge_mapper/src/c8y/mapper.rs
+++ b/crates/core/tedge_mapper/src/c8y/mapper.rs
@@ -17,7 +17,7 @@ use tracing::{info, info_span, Instrument};
use super::topic::C8yTopic;
const CUMULOCITY_MAPPER_NAME: &str = "tedge-mapper-c8y";
-const MQTT_MESSAGE_SIZE_THRESHOLD: usize = 16 * 1024;
+const MQTT_MESSAGE_SIZE_THRESHOLD: usize = 16184;
pub struct CumulocityMapper {}
diff --git a/crates/core/tedge_mapper/src/c8y/tests.rs b/crates/core/tedge_mapper/src/c8y/tests.rs
index 58e12eaa..96ace4a8 100644
--- a/crates/core/tedge_mapper/src/c8y/tests.rs
+++ b/crates/core/tedge_mapper/src/c8y/tests.rs
@@ -3,10 +3,11 @@ use crate::core::{
size_threshold::SizeThreshold,
};
use anyhow::Result;
+use assert_json_diff::assert_json_include;
use assert_matches::assert_matches;
use c8y_api::{
http_proxy::{C8YHttpProxy, MockC8YHttpProxy},
- json_c8y::C8yUpdateSoftwareListResponse,
+ json_c8y::{C8yCreateEvent, C8yUpdateSoftwareListResponse},
};
use c8y_smartrest::{
error::SMCumulocityMapperError, operations::Operations,
@@ -681,7 +682,7 @@ async fn check_c8y_threshold_packet_size() -> Result<(), anyhow::Error> {
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
-async fn convert_event() -> Result<()> {
+async fn convert_event_with_known_fields_to_c8y_smartrest() -> Result<()> {
let size_threshold = SizeThreshold(16 * 1024);
let device_name = String::from("test");
let device_type = String::from("test_type");
@@ -704,9 +705,48 @@ async fn convert_event() -> Result<()> {
assert_eq!(converted_events.len(), 1);
let converted_event = converted_events.get(0).unwrap();
assert_eq!(converted_event.topic.name, "c8y/s/us");
+ dbg!(converted_event.payload_str()?);
assert!(converted_event
.payload_str()?
- .starts_with("400,click_event"));
+ .starts_with(r#"400,click_event,"Someone clicked","#));
+
+ Ok(())
+}
+
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+async fn convert_event_with_extra_fields_to_c8y_json() -> Result<()> {
+ let size_threshold = SizeThreshold(16 * 1024);
+ let device_name = String::from("test");
+ let device_type = String::from("test_type");
+ let operations = Operations::default();
+ let http_proxy = MockC8YHttpProxy::new();
+
+ let mut converter = CumulocityConverter::new(
+ size_threshold,
+ device_name,
+ device_type,
+ operations,
+ http_proxy,
+ );
+
+ let event_topic = "tedge/events/click_event";
+ let event_payload = r#"{ "text": "tick", "foo": "bar" }"#;
+ let event_message = Message::new(&Topic::new_unchecked(event_topic), event_payload);
+
+ let converted_events = converter.convert(&event_message).await;
+ assert_eq!(converted_events.len(), 1);
+ let converted_event = converted_events.get(0).unwrap();
+ assert_eq!(converted_event.topic.name, "c8y/event/events/create");
+ let converted_c8y_json = json!({
+ "type": "click_event",
+ "text": "tick",
+ "foo": "bar",
+ });
+ assert_eq!(converted_event.topic.name, "c8y/event/events/create");
+ assert_json_include!(
+ actual: serde_json::from_str::<serde_json::Value>(converted_event.payload_str()?)?,
+ expected: converted_c8y_json
+ );
Ok(())
}
@@ -722,12 +762,8 @@ async fn test_convert_big_event() {
let mut http_proxy = MockC8YHttpProxy::new();
http_proxy
.expect_send_event()
- .with(
- predicate::eq("click_event"),
- predicate::always(),
- predicate::always(),
- )
- .returning(|_, _, _| Ok("123".into()));
+ .with(predicate::always())
+ .returning(|_| Ok("123".into()));
let mut converter = CumulocityConverter::new(
size_threshold,
@@ -787,9 +823,7 @@ impl C8YHttpProxy for FakeC8YHttpProxy {
async fn send_event(
&mut self,
- _event_type: &str,
- _text: &str,
- _time: Option<String>,
+ _c8y_event: C8yCreateEvent,
) -> Result<String, SMCumulocityMapperError> {
Ok("123".into())
}
diff --git a/crates/core/thin_edge_json/src/event.rs b/crates/core/thin_edge_json/src/event.rs
index 474128ed..42b1ed35 100644
--- a/crates/core/thin_edge_json/src/event.rs
+++ b/crates/core/thin_edge_json/src/event.rs
@@ -1,23 +1,31 @@
+use std::collections::HashMap;
+
use clock::Timestamp;
-use serde::Deserialize;
+use serde::{Deserialize, Serialize};
+use serde_json::Value;
use self::error::ThinEdgeJsonDeserializerError;
/// In-memory representation of ThinEdge JSON event.
-#[derive(Debug, Deserialize, PartialEq)]
+#[derive(Debug, Serialize, Deserialize, PartialEq)]
pub struct ThinEdgeEvent {
+ #[serde(rename = "type")]
pub name: String,
+ #[serde(flatten)]
pub data: Option<ThinEdgeEventData>,
}
/// In-memory representation of ThinEdge JSON event payload
-#[derive(Debug, Deserialize, PartialEq)]
+#[derive(Debug, Serialize, Deserialize, PartialEq)]
pub struct ThinEdgeEventData {
pub text: Option<String>,
#[serde(default)]
#[serde(with = "clock::serde::rfc3339::option")]
pub time: Option<Timestamp>,
+
+ #[serde(flatten)]
+ pub extras: HashMap<String, Value>,
}
pub mod error {
@@ -84,6 +92,7 @@ mod tests {
data: Some(ThinEdgeEventData {
text: Some("Someone clicked".into()),
time: Some(datetime!(2021-04-23 19:00:00 +05:00)),
+ extras: HashMap::new(),
}),
};
"event parsing"
@@ -98,6 +107,7 @@ mod tests {
data: Some(ThinEdgeEventData {
text: Some("Someone clicked".into()),
time: None,
+ extras: HashMap::new(),
}),
};
"event parsing without timestamp"
@@ -112,6 +122,7 @@ mod tests {
data: Some(ThinEdgeEventData {
text: None,
time: Some(datetime!(2021-04-23 19:00:00 +05:00)),
+ extras: HashMap::new(),
}),
};
"event parsing without text"
@@ -124,6 +135,7 @@ mod tests {
data: Some(ThinEdgeEventData {
text: None,
time: None,
+ extras: HashMap::new(),
}),
};
"event parsing without text or timestamp"
@@ -158,9 +170,53 @@ mod tests {
#[test]
fn event_translation_empty_payload() -> Result<()> {
+ let event_data = ThinEdgeEventData {
+ text: Some("foo".to_string()),
+ time: Some(datetime!(2021-04-23 19:00:00 +05:00)),
+ extras: HashMap::new(),
+ };
+
+ let serialized = serde_json::to_string(&event_data).unwrap();
+ println!("serialized = {}", serialized);
+
+ Ok(())
+ }
+
+ #[test]