summaryrefslogtreecommitdiffstats
path: root/crates/core/tedge_mapper/src/c8y/converter.rs
diff options
context:
space:
mode:
Diffstat (limited to 'crates/core/tedge_mapper/src/c8y/converter.rs')
-rw-r--r--crates/core/tedge_mapper/src/c8y/converter.rs56
1 files changed, 48 insertions, 8 deletions
diff --git a/crates/core/tedge_mapper/src/c8y/converter.rs b/crates/core/tedge_mapper/src/c8y/converter.rs
index 899a763d..1616acc8 100644
--- a/crates/core/tedge_mapper/src/c8y/converter.rs
+++ b/crates/core/tedge_mapper/src/c8y/converter.rs
@@ -9,7 +9,7 @@ use c8y_api::{http_proxy::C8YHttpProxy, json_c8y::C8yUpdateSoftwareListResponse}
use c8y_smartrest::{
alarm,
error::SmartRestDeserializerError,
- event::serialize_event,
+ event::{self},
operations::Operations,
smartrest_deserializer::{SmartRestRestartRequest, SmartRestUpdateSoftware},
smartrest_serializer::{
@@ -29,6 +29,7 @@ use std::{
process::Stdio,
};
use thin_edge_json::{alarm::ThinEdgeAlarm, event::ThinEdgeEvent};
+use time::{format_description::well_known::Rfc3339, OffsetDateTime};
use tracing::{debug, info, log::error};
use super::{
@@ -144,12 +145,49 @@ where
Ok(vec)
}
- fn try_convert_event(&mut self, input: &Message) -> Result<Vec<Message>, ConversionError> {
+ async fn try_convert_event(
+ &mut self,
+ input: &Message,
+ ) -> Result<Vec<Message>, ConversionError> {
let tedge_event = ThinEdgeEvent::try_from(input.topic.name.as_str(), input.payload_str()?)?;
- let smartrest_alarm = serialize_event(tedge_event)?;
- let smartrest_topic = Topic::new_unchecked(SMARTREST_PUBLISH_TOPIC);
- Ok(vec![Message::new(&smartrest_topic, smartrest_alarm)])
+ // If the message size is well within the Cumulocity MQTT size limit, use MQTT to send the mapped event as well
+ if input.payload_bytes().len() < self.size_threshold.0 {
+ let smartrest_alarm = event::serialize_event(tedge_event)?;
+ let smartrest_topic = Topic::new_unchecked(SMARTREST_PUBLISH_TOPIC);
+
+ Ok(vec![Message::new(&smartrest_topic, smartrest_alarm)])
+ } else {
+ // If the message size is larger than the MQTT size limit, use HTTP to send the mapped event
+ let (event_text, event_time) = match tedge_event.data {
+ None => {
+ let message = tedge_event.name.clone();
+ let time = OffsetDateTime::now_utc().format(&Rfc3339)?;
+
+ (message, time)
+ }
+ Some(event_data) => {
+ let message = event_data
+ .message
+ .unwrap_or_else(|| tedge_event.name.clone());
+ let time = event_data.time.map_or_else(
+ || OffsetDateTime::now_utc().format(&Rfc3339),
+ |timestamp| timestamp.format(&Rfc3339),
+ )?;
+ (message, time)
+ }
+ };
+
+ let _ = self
+ .http_proxy
+ .send_event(
+ tedge_event.name.as_str(),
+ event_text.as_str(),
+ Some(event_time),
+ )
+ .await?;
+ Ok(vec![])
+ }
}
}
@@ -164,20 +202,22 @@ where
&self.mapper_config
}
async fn try_convert(&mut self, message: &Message) -> Result<Vec<Message>, ConversionError> {
- let () = self.size_threshold.validate(message.payload_str()?)?;
-
match &message.topic {
topic if topic.name.starts_with("tedge/measurements") => {
+ let () = self.size_threshold.validate(message)?;
self.try_convert_measurement(message)
}
topic if topic.name.starts_with("tedge/alarms") => {
+ let () = self.size_threshold.validate(message)?;
self.alarm_converter.try_convert_alarm(message)
}
topic if topic.name.starts_with(INTERNAL_ALARMS_TOPIC) => {
self.alarm_converter.process_internal_alarm(message);
Ok(vec![])
}
- topic if topic.name.starts_with(TEDGE_EVENTS_TOPIC) => self.try_convert_event(message),
+ topic if topic.name.starts_with(TEDGE_EVENTS_TOPIC) => {
+ self.try_convert_event(message).await
+ }
topic => match topic.clone().try_into() {
Ok(MapperSubscribeTopic::ResponseTopic(ResponseTopic::SoftwareListResponse)) => {
debug!("Software list");