diff options
Diffstat (limited to 'crates/core/tedge_mapper/src/c8y/converter.rs')
-rw-r--r-- | crates/core/tedge_mapper/src/c8y/converter.rs | 56 |
1 files changed, 48 insertions, 8 deletions
diff --git a/crates/core/tedge_mapper/src/c8y/converter.rs b/crates/core/tedge_mapper/src/c8y/converter.rs index 899a763d..1616acc8 100644 --- a/crates/core/tedge_mapper/src/c8y/converter.rs +++ b/crates/core/tedge_mapper/src/c8y/converter.rs @@ -9,7 +9,7 @@ use c8y_api::{http_proxy::C8YHttpProxy, json_c8y::C8yUpdateSoftwareListResponse} use c8y_smartrest::{ alarm, error::SmartRestDeserializerError, - event::serialize_event, + event::{self}, operations::Operations, smartrest_deserializer::{SmartRestRestartRequest, SmartRestUpdateSoftware}, smartrest_serializer::{ @@ -29,6 +29,7 @@ use std::{ process::Stdio, }; use thin_edge_json::{alarm::ThinEdgeAlarm, event::ThinEdgeEvent}; +use time::{format_description::well_known::Rfc3339, OffsetDateTime}; use tracing::{debug, info, log::error}; use super::{ @@ -144,12 +145,49 @@ where Ok(vec) } - fn try_convert_event(&mut self, input: &Message) -> Result<Vec<Message>, ConversionError> { + async fn try_convert_event( + &mut self, + input: &Message, + ) -> Result<Vec<Message>, ConversionError> { let tedge_event = ThinEdgeEvent::try_from(input.topic.name.as_str(), input.payload_str()?)?; - let smartrest_alarm = serialize_event(tedge_event)?; - let smartrest_topic = Topic::new_unchecked(SMARTREST_PUBLISH_TOPIC); - Ok(vec![Message::new(&smartrest_topic, smartrest_alarm)]) + // If the message size is well within the Cumulocity MQTT size limit, use MQTT to send the mapped event as well + if input.payload_bytes().len() < self.size_threshold.0 { + let smartrest_alarm = event::serialize_event(tedge_event)?; + let smartrest_topic = Topic::new_unchecked(SMARTREST_PUBLISH_TOPIC); + + Ok(vec![Message::new(&smartrest_topic, smartrest_alarm)]) + } else { + // If the message size is larger than the MQTT size limit, use HTTP to send the mapped event + let (event_text, event_time) = match tedge_event.data { + None => { + let message = tedge_event.name.clone(); + let time = OffsetDateTime::now_utc().format(&Rfc3339)?; + + (message, time) + } + Some(event_data) => { + let message = event_data + .message + .unwrap_or_else(|| tedge_event.name.clone()); + let time = event_data.time.map_or_else( + || OffsetDateTime::now_utc().format(&Rfc3339), + |timestamp| timestamp.format(&Rfc3339), + )?; + (message, time) + } + }; + + let _ = self + .http_proxy + .send_event( + tedge_event.name.as_str(), + event_text.as_str(), + Some(event_time), + ) + .await?; + Ok(vec![]) + } } } @@ -164,20 +202,22 @@ where &self.mapper_config } async fn try_convert(&mut self, message: &Message) -> Result<Vec<Message>, ConversionError> { - let () = self.size_threshold.validate(message.payload_str()?)?; - match &message.topic { topic if topic.name.starts_with("tedge/measurements") => { + let () = self.size_threshold.validate(message)?; self.try_convert_measurement(message) } topic if topic.name.starts_with("tedge/alarms") => { + let () = self.size_threshold.validate(message)?; self.alarm_converter.try_convert_alarm(message) } topic if topic.name.starts_with(INTERNAL_ALARMS_TOPIC) => { self.alarm_converter.process_internal_alarm(message); Ok(vec![]) } - topic if topic.name.starts_with(TEDGE_EVENTS_TOPIC) => self.try_convert_event(message), + topic if topic.name.starts_with(TEDGE_EVENTS_TOPIC) => { + self.try_convert_event(message).await + } topic => match topic.clone().try_into() { Ok(MapperSubscribeTopic::ResponseTopic(ResponseTopic::SoftwareListResponse)) => { debug!("Software list"); |