summaryrefslogtreecommitdiffstats
path: root/crates/core/tedge_mapper/src/c8y/converter.rs
diff options
context:
space:
mode:
Diffstat (limited to 'crates/core/tedge_mapper/src/c8y/converter.rs')
-rw-r--r--crates/core/tedge_mapper/src/c8y/converter.rs58
1 files changed, 38 insertions, 20 deletions
diff --git a/crates/core/tedge_mapper/src/c8y/converter.rs b/crates/core/tedge_mapper/src/c8y/converter.rs
index 15b250a1..1616acc8 100644
--- a/crates/core/tedge_mapper/src/c8y/converter.rs
+++ b/crates/core/tedge_mapper/src/c8y/converter.rs
@@ -29,6 +29,7 @@ use std::{
process::Stdio,
};
use thin_edge_json::{alarm::ThinEdgeAlarm, event::ThinEdgeEvent};
+use time::{format_description::well_known::Rfc3339, OffsetDateTime};
use tracing::{debug, info, log::error};
use super::{
@@ -149,26 +150,43 @@ where
input: &Message,
) -> Result<Vec<Message>, ConversionError> {
let tedge_event = ThinEdgeEvent::try_from(input.topic.name.as_str(), input.payload_str()?)?;
- match self.size_threshold.validate(input.payload_str()?) {
- // If the message size is well within the Cumulocity MQTT size limit, use MQTT to send the mapped event as well
- Ok(()) => {
- let smartrest_alarm = event::serialize_event(tedge_event)?;
- let smartrest_topic = Topic::new_unchecked(SMARTREST_PUBLISH_TOPIC);
- Ok(vec![Message::new(&smartrest_topic, smartrest_alarm)])
- }
+ // If the message size is well within the Cumulocity MQTT size limit, use MQTT to send the mapped event as well
+ if input.payload_bytes().len() < self.size_threshold.0 {
+ let smartrest_alarm = event::serialize_event(tedge_event)?;
+ let smartrest_topic = Topic::new_unchecked(SMARTREST_PUBLISH_TOPIC);
+
+ Ok(vec![Message::new(&smartrest_topic, smartrest_alarm)])
+ } else {
// If the message size is larger than the MQTT size limit, use HTTP to send the mapped event
- Err(_) => {
- let event_text = tedge_event
- .data
- .and_then(|data| data.message)
- .unwrap_or_else(|| "generic event".into());
- let _ = self
- .http_proxy
- .send_event(tedge_event.name.as_str(), event_text.as_str(), None)
- .await?;
- Ok(vec![])
- }
+ let (event_text, event_time) = match tedge_event.data {
+ None => {
+ let message = tedge_event.name.clone();
+ let time = OffsetDateTime::now_utc().format(&Rfc3339)?;
+
+ (message, time)
+ }
+ Some(event_data) => {
+ let message = event_data
+ .message
+ .unwrap_or_else(|| tedge_event.name.clone());
+ let time = event_data.time.map_or_else(
+ || OffsetDateTime::now_utc().format(&Rfc3339),
+ |timestamp| timestamp.format(&Rfc3339),
+ )?;
+ (message, time)
+ }
+ };
+
+ let _ = self
+ .http_proxy
+ .send_event(
+ tedge_event.name.as_str(),
+ event_text.as_str(),
+ Some(event_time),
+ )
+ .await?;
+ Ok(vec![])
}
}
}
@@ -186,11 +204,11 @@ where
async fn try_convert(&mut self, message: &Message) -> Result<Vec<Message>, ConversionError> {
match &message.topic {
topic if topic.name.starts_with("tedge/measurements") => {
- let () = self.size_threshold.validate(message.payload_str()?)?;
+ let () = self.size_threshold.validate(message)?;
self.try_convert_measurement(message)
}
topic if topic.name.starts_with("tedge/alarms") => {
- let () = self.size_threshold.validate(message.payload_str()?)?;
+ let () = self.size_threshold.validate(message)?;
self.alarm_converter.try_convert_alarm(message)
}
topic if topic.name.starts_with(INTERNAL_ALARMS_TOPIC) => {