diff options
author | PradeepKiruvale <pradeepkumar.kj@softwareag.com> | 2022-07-22 14:31:08 +0530 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-07-22 14:31:08 +0530 |
commit | 7e62504db494d9a2523c3001b2ad6d5fa84ce548 (patch) | |
tree | aa6f73d32f66339cc30ba2de07235c9921abb078 /crates/core | |
parent | cf00c1d358f2c9ba67ca5af46fd82fb9f2cf37a6 (diff) |
Extend events API to support child devices (#1243)
* Events for child devices
Extend events APIs to send the event messages from external/child device to device twin in the cloud.
Signed-off-by: Pradeep Kumar K J <pradeepkumar.kj@softwareag.com>
Diffstat (limited to 'crates/core')
-rw-r--r-- | crates/core/c8y_api/src/json_c8y.rs | 21 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/c8y/converter.rs | 57 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/c8y/tests.rs | 4 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/core/error.rs | 3 | ||||
-rw-r--r-- | crates/core/thin_edge_json/src/event.rs | 51 |
5 files changed, 119 insertions, 17 deletions
diff --git a/crates/core/c8y_api/src/json_c8y.rs b/crates/core/c8y_api/src/json_c8y.rs index be857adb..d50559a1 100644 --- a/crates/core/c8y_api/src/json_c8y.rs +++ b/crates/core/c8y_api/src/json_c8y.rs @@ -135,7 +135,7 @@ impl TryFrom<ThinEdgeEvent> for C8yCreateEvent { let event_type = event.name; let text; let time; - let extras; + let mut extras; match event.data { None => { text = event_type.clone(); @@ -148,6 +148,9 @@ impl TryFrom<ThinEdgeEvent> for C8yCreateEvent { extras = event_data.extras; } } + if let Some(source) = event.source { + update_the_external_source_event(&mut extras, &source)?; + } Ok(Self { source: None, @@ -191,6 +194,17 @@ fn combine_version_and_type( }, } } +fn update_the_external_source_event( + extras: &mut HashMap<String, Value>, + source: &str, +) -> Result<(), SMCumulocityMapperError> { + let mut value = serde_json::Map::new(); + value.insert("externalId".to_string(), source.into()); + value.insert("type".to_string(), "c8y_Serial".into()); + extras.insert("externalSource".into(), value.into()); + + Ok(()) +} #[cfg(test)] mod tests { @@ -353,6 +367,7 @@ mod tests { time: Some(datetime!(2021-04-23 19:00:00 +05:00)), extras: HashMap::new(), }), + source: None, }, C8yCreateEvent { source: None, @@ -371,6 +386,7 @@ mod tests { time: Some(datetime!(2021-04-23 19:00:00 +05:00)), extras: HashMap::new(), }), + source: None, }, C8yCreateEvent { source: None, @@ -389,6 +405,7 @@ mod tests { time: Some(datetime!(2021-04-23 19:00:00 +05:00)), extras: HashMap::new(), }), + source: None, }, C8yCreateEvent { source: None, @@ -419,6 +436,7 @@ mod tests { time: None, extras: HashMap::new(), }), + source: None, }; let actual_c8y_event = C8yCreateEvent::try_from(tedge_event)?; @@ -437,6 +455,7 @@ mod tests { let tedge_event = ThinEdgeEvent { name: "empty_event".into(), data: None, + source: None, }; let actual_c8y_event = C8yCreateEvent::try_from(tedge_event)?; diff --git a/crates/core/tedge_mapper/src/c8y/converter.rs b/crates/core/tedge_mapper/src/c8y/converter.rs index fd9223b5..9912a10b 100644 --- a/crates/core/tedge_mapper/src/c8y/converter.rs +++ b/crates/core/tedge_mapper/src/c8y/converter.rs @@ -92,6 +92,7 @@ where "tedge/alarms/+/+", "c8y-internal/alarms/+/+", "tedge/events/+", + "tedge/events/+/+", ] .try_into() .expect("topics that mapper should subscribe to"); @@ -143,6 +144,7 @@ where "tedge/alarms/+/+", "c8y-internal/alarms/+/+", "tedge/events/+", + "tedge/events/+/+", ] .try_into() .expect("topics that mapper should subscribe to"); @@ -185,7 +187,7 @@ where ) -> Result<Vec<Message>, ConversionError> { let mut vec: Vec<Message> = Vec::new(); - let maybe_child_id = get_child_id_from_topic(&input.topic.name)?; + let maybe_child_id = get_child_id_from_measurement_topic(&input.topic.name)?; let c8y_json_payload = match maybe_child_id { Some(child_id) => { // Need to check if the input Thin Edge JSON is valid before adding a child ID to list @@ -224,31 +226,41 @@ where &mut self, input: &Message, ) -> Result<Vec<Message>, ConversionError> { - let tedge_event = ThinEdgeEvent::try_from(input.topic.name.as_str(), input.payload_str()?)?; + let mut messages = Vec::new(); + + let tedge_event = ThinEdgeEvent::try_from(&input.topic.name, input.payload_str()?)?; + let child_id = tedge_event.source.clone(); + + let need_registration = self.register_external_device(&tedge_event, &mut messages); + let c8y_event = C8yCreateEvent::try_from(tedge_event)?; // If the message doesn't contain any fields other than `text` and `time`, convert to SmartREST let message = if c8y_event.extras.is_empty() { let smartrest_event = Self::serialize_to_smartrest(&c8y_event)?; let smartrest_topic = Topic::new_unchecked(SMARTREST_PUBLISH_TOPIC); - Message::new(&smartrest_topic, smartrest_event) } else { // If the message contains extra fields other than `text` and `time`, convert to Cumulocity JSON let cumulocity_event_json = serde_json::to_string(&c8y_event)?; let json_mqtt_topic = Topic::new_unchecked(C8Y_JSON_MQTT_EVENTS_TOPIC); - Message::new(&json_mqtt_topic, cumulocity_event_json) }; - // If the MQTT message size is well within the Cumulocity MQTT size limit, use MQTT to send the mapped event as well - if input.payload_bytes().len() < self.size_threshold.0 { - Ok(vec![message]) - } else { - // If the message size is larger than the MQTT size limit, use HTTP to send the mapped event + if self.can_send_over_mqtt(&message) { + // The message can be sent via MQTT + messages.push(message); + } else if !need_registration { + // The message must be sent over HTTP let _ = self.http_proxy.send_event(c8y_event).await?; - Ok(vec![]) + return Ok(vec![]); + } else { + // The message should be sent over HTTP but this cannot be done + return Err(ConversionError::ChildDeviceNotRegistered { + id: child_id.unwrap_or_else(|| "".into()), + }); } + Ok(messages) } fn serialize_to_smartrest(c8y_event: &C8yCreateEvent) -> Result<String, ConversionError> { @@ -260,6 +272,29 @@ where c8y_event.time.format(&Rfc3339)? )) } + + fn register_external_device( + &mut self, + tedge_event: &ThinEdgeEvent, + messages: &mut Vec<Message>, + ) -> bool { + if let Some(c_id) = tedge_event.source.clone() { + // Create the external source if it does not exists + if !self.children.contains(&c_id) { + self.children.insert(c_id.clone()); + messages.push(Message::new( + &Topic::new_unchecked(SMARTREST_PUBLISH_TOPIC), + format!("101,{c_id},{c_id},thin-edge.io-child"), + )); + return true; + } + } + false + } + + fn can_send_over_mqtt(&self, message: &Message) -> bool { + message.payload_bytes().len() < self.size_threshold.0 + } } #[async_trait] @@ -825,7 +860,7 @@ fn get_inventory_fragments(file_path: &str) -> Result<serde_json::Value, Convers } } -pub fn get_child_id_from_topic(topic: &str) -> Result<Option<String>, ConversionError> { +pub fn get_child_id_from_measurement_topic(topic: &str) -> Result<Option<String>, ConversionError> { match topic.strip_prefix("tedge/measurements/").map(String::from) { Some(maybe_id) if maybe_id.is_empty() => { Err(ConversionError::InvalidChildId { id: maybe_id }) diff --git a/crates/core/tedge_mapper/src/c8y/tests.rs b/crates/core/tedge_mapper/src/c8y/tests.rs index 78eee84c..03125102 100644 --- a/crates/core/tedge_mapper/src/c8y/tests.rs +++ b/crates/core/tedge_mapper/src/c8y/tests.rs @@ -23,7 +23,7 @@ use tedge_test_utils::fs::TempTedgeDir; use test_case::test_case; use tokio::task::JoinHandle; -use super::converter::{get_child_id_from_topic, CumulocityConverter}; +use super::converter::{get_child_id_from_measurement_topic, CumulocityConverter}; const TEST_TIMEOUT_MS: Duration = Duration::from_millis(5000); const MQTT_HOST: &str = "127.0.0.1"; @@ -640,7 +640,7 @@ async fn convert_two_thin_edge_json_messages_given_different_child_id() { #[test_case("tedge/measurements", None; "invalid child id (parent topic)")] #[test_case("foo/bar", None; "invalid child id (invalid topic)")] fn extract_child_id(in_topic: &str, expected_child_id: Option<String>) { - match get_child_id_from_topic(in_topic) { + match get_child_id_from_measurement_topic(in_topic) { Ok(maybe_id) => assert_eq!(maybe_id, expected_child_id), Err(crate::core::error::ConversionError::InvalidChildId { id }) => { assert_eq!(id, "".to_string()) diff --git a/crates/core/tedge_mapper/src/core/error.rs b/crates/core/tedge_mapper/src/core/error.rs index b703161a..2d2e0163 100644 --- a/crates/core/tedge_mapper/src/core/error.rs +++ b/crates/core/tedge_mapper/src/core/error.rs @@ -104,4 +104,7 @@ pub enum ConversionError { #[error(transparent)] FromOperationLogsError(#[from] plugin_sm::operation_logs::OperationLogsError), + + #[error("The given Child ID '{id}' is not registered with Cumulocity. To send the events to the child device, it has to be registered first.")] + ChildDeviceNotRegistered { id: String }, } diff --git a/crates/core/thin_edge_json/src/event.rs b/crates/core/thin_edge_json/src/event.rs index 42b1ed35..8e07625f 100644 --- a/crates/core/thin_edge_json/src/event.rs +++ b/crates/core/thin_edge_json/src/event.rs @@ -13,6 +13,7 @@ pub struct ThinEdgeEvent { pub name: String, #[serde(flatten)] pub data: Option<ThinEdgeEventData>, + pub source: Option<String>, } /// In-memory representation of ThinEdge JSON event payload @@ -29,6 +30,7 @@ pub struct ThinEdgeEventData { } pub mod error { + #[derive(thiserror::Error, Debug)] pub enum ThinEdgeJsonDeserializerError { #[error("Unsupported topic: {0}")] @@ -48,7 +50,7 @@ impl ThinEdgeEvent { mqtt_payload: &str, ) -> Result<Self, ThinEdgeJsonDeserializerError> { let topic_split: Vec<&str> = mqtt_topic.split('/').collect(); - if topic_split.len() == 3 { + if topic_split.len() == 3 || topic_split.len() == 4 { let event_name = topic_split[2]; if event_name.is_empty() { return Err(ThinEdgeJsonDeserializerError::EmptyEventName); @@ -60,9 +62,17 @@ impl ThinEdgeEvent { Some(serde_json::from_str(mqtt_payload)?) }; + // The 4th part of the topic name is the event source - if any + let external_source = if topic_split.len() == 4 { + Some(topic_split[3].to_string()) + } else { + None + }; + Ok(Self { name: event_name.into(), data: event_data, + source: external_source, }) } else { Err(ThinEdgeJsonDeserializerError::UnsupportedTopic( @@ -94,6 +104,7 @@ mod tests { time: Some(datetime!(2021-04-23 19:00:00 +05:00)), extras: HashMap::new(), }), + source: None, }; "event parsing" )] @@ -109,6 +120,7 @@ mod tests { time: None, extras: HashMap::new(), }), + source: None, }; "event parsing without timestamp" )] @@ -124,6 +136,7 @@ mod tests { time: Some(datetime!(2021-04-23 19:00:00 +05:00)), extras: HashMap::new(), }), + source: None, }; "event parsing without text" )] @@ -137,9 +150,41 @@ mod tests { time: None, extras: HashMap::new(), }), + source: None, }; "event parsing without text or timestamp" )] + #[test_case( + "tedge/events/click_event/external_source", + json!({ + "text": "Someone clicked", + "time": "2021-04-23T19:00:00+05:00", + }), + ThinEdgeEvent { + name: "click_event".into(), + data: Some(ThinEdgeEventData { + text: Some("Someone clicked".into()), + time: Some(datetime!(2021-04-23 19:00:00 +05:00)), + extras: HashMap::new(), + }), + source: Some("external_source".into()), + }; + "event parsing with external source" + )] + #[test_case( + "tedge/events/click_event/external_source", + json!({}), + ThinEdgeEvent { + name: "click_event".into(), + data: Some(ThinEdgeEventData { + text: None, + time: None, + extras: HashMap::new(), + }), + source: Some("external_source".into()), + }; + "event parsing empty payload with external source" + )] fn parse_thin_edge_event_json( event_topic: &str, event_payload: Value, @@ -159,8 +204,8 @@ mod tests { } #[test] - fn event_translation_more_than_three_topic_levels() { - let result = ThinEdgeEvent::try_from("tedge/events/page/click", "{}"); + fn event_translation_more_than_four_topic_levels() { + let result = ThinEdgeEvent::try_from("tedge/events/page/click/click", "{}"); assert_matches!( result, |