summaryrefslogtreecommitdiffstats
path: root/crates/core
diff options
context:
space:
mode:
authorPradeepKiruvale <pradeepkumar.kj@softwareag.com>2022-07-22 14:31:08 +0530
committerGitHub <noreply@github.com>2022-07-22 14:31:08 +0530
commit7e62504db494d9a2523c3001b2ad6d5fa84ce548 (patch)
treeaa6f73d32f66339cc30ba2de07235c9921abb078 /crates/core
parentcf00c1d358f2c9ba67ca5af46fd82fb9f2cf37a6 (diff)
Extend events API to support child devices (#1243)
* Events for child devices Extend events APIs to send the event messages from external/child device to device twin in the cloud. Signed-off-by: Pradeep Kumar K J <pradeepkumar.kj@softwareag.com>
Diffstat (limited to 'crates/core')
-rw-r--r--crates/core/c8y_api/src/json_c8y.rs21
-rw-r--r--crates/core/tedge_mapper/src/c8y/converter.rs57
-rw-r--r--crates/core/tedge_mapper/src/c8y/tests.rs4
-rw-r--r--crates/core/tedge_mapper/src/core/error.rs3
-rw-r--r--crates/core/thin_edge_json/src/event.rs51
5 files changed, 119 insertions, 17 deletions
diff --git a/crates/core/c8y_api/src/json_c8y.rs b/crates/core/c8y_api/src/json_c8y.rs
index be857adb..d50559a1 100644
--- a/crates/core/c8y_api/src/json_c8y.rs
+++ b/crates/core/c8y_api/src/json_c8y.rs
@@ -135,7 +135,7 @@ impl TryFrom<ThinEdgeEvent> for C8yCreateEvent {
let event_type = event.name;
let text;
let time;
- let extras;
+ let mut extras;
match event.data {
None => {
text = event_type.clone();
@@ -148,6 +148,9 @@ impl TryFrom<ThinEdgeEvent> for C8yCreateEvent {
extras = event_data.extras;
}
}
+ if let Some(source) = event.source {
+ update_the_external_source_event(&mut extras, &source)?;
+ }
Ok(Self {
source: None,
@@ -191,6 +194,17 @@ fn combine_version_and_type(
},
}
}
+fn update_the_external_source_event(
+ extras: &mut HashMap<String, Value>,
+ source: &str,
+) -> Result<(), SMCumulocityMapperError> {
+ let mut value = serde_json::Map::new();
+ value.insert("externalId".to_string(), source.into());
+ value.insert("type".to_string(), "c8y_Serial".into());
+ extras.insert("externalSource".into(), value.into());
+
+ Ok(())
+}
#[cfg(test)]
mod tests {
@@ -353,6 +367,7 @@ mod tests {
time: Some(datetime!(2021-04-23 19:00:00 +05:00)),
extras: HashMap::new(),
}),
+ source: None,
},
C8yCreateEvent {
source: None,
@@ -371,6 +386,7 @@ mod tests {
time: Some(datetime!(2021-04-23 19:00:00 +05:00)),
extras: HashMap::new(),
}),
+ source: None,
},
C8yCreateEvent {
source: None,
@@ -389,6 +405,7 @@ mod tests {
time: Some(datetime!(2021-04-23 19:00:00 +05:00)),
extras: HashMap::new(),
}),
+ source: None,
},
C8yCreateEvent {
source: None,
@@ -419,6 +436,7 @@ mod tests {
time: None,
extras: HashMap::new(),
}),
+ source: None,
};
let actual_c8y_event = C8yCreateEvent::try_from(tedge_event)?;
@@ -437,6 +455,7 @@ mod tests {
let tedge_event = ThinEdgeEvent {
name: "empty_event".into(),
data: None,
+ source: None,
};
let actual_c8y_event = C8yCreateEvent::try_from(tedge_event)?;
diff --git a/crates/core/tedge_mapper/src/c8y/converter.rs b/crates/core/tedge_mapper/src/c8y/converter.rs
index fd9223b5..9912a10b 100644
--- a/crates/core/tedge_mapper/src/c8y/converter.rs
+++ b/crates/core/tedge_mapper/src/c8y/converter.rs
@@ -92,6 +92,7 @@ where
"tedge/alarms/+/+",
"c8y-internal/alarms/+/+",
"tedge/events/+",
+ "tedge/events/+/+",
]
.try_into()
.expect("topics that mapper should subscribe to");
@@ -143,6 +144,7 @@ where
"tedge/alarms/+/+",
"c8y-internal/alarms/+/+",
"tedge/events/+",
+ "tedge/events/+/+",
]
.try_into()
.expect("topics that mapper should subscribe to");
@@ -185,7 +187,7 @@ where
) -> Result<Vec<Message>, ConversionError> {
let mut vec: Vec<Message> = Vec::new();
- let maybe_child_id = get_child_id_from_topic(&input.topic.name)?;
+ let maybe_child_id = get_child_id_from_measurement_topic(&input.topic.name)?;
let c8y_json_payload = match maybe_child_id {
Some(child_id) => {
// Need to check if the input Thin Edge JSON is valid before adding a child ID to list
@@ -224,31 +226,41 @@ where
&mut self,
input: &Message,
) -> Result<Vec<Message>, ConversionError> {
- let tedge_event = ThinEdgeEvent::try_from(input.topic.name.as_str(), input.payload_str()?)?;
+ let mut messages = Vec::new();
+
+ let tedge_event = ThinEdgeEvent::try_from(&input.topic.name, input.payload_str()?)?;
+ let child_id = tedge_event.source.clone();
+
+ let need_registration = self.register_external_device(&tedge_event, &mut messages);
+
let c8y_event = C8yCreateEvent::try_from(tedge_event)?;
// If the message doesn't contain any fields other than `text` and `time`, convert to SmartREST
let message = if c8y_event.extras.is_empty() {
let smartrest_event = Self::serialize_to_smartrest(&c8y_event)?;
let smartrest_topic = Topic::new_unchecked(SMARTREST_PUBLISH_TOPIC);
-
Message::new(&smartrest_topic, smartrest_event)
} else {
// If the message contains extra fields other than `text` and `time`, convert to Cumulocity JSON
let cumulocity_event_json = serde_json::to_string(&c8y_event)?;
let json_mqtt_topic = Topic::new_unchecked(C8Y_JSON_MQTT_EVENTS_TOPIC);
-
Message::new(&json_mqtt_topic, cumulocity_event_json)
};
- // If the MQTT message size is well within the Cumulocity MQTT size limit, use MQTT to send the mapped event as well
- if input.payload_bytes().len() < self.size_threshold.0 {
- Ok(vec![message])
- } else {
- // If the message size is larger than the MQTT size limit, use HTTP to send the mapped event
+ if self.can_send_over_mqtt(&message) {
+ // The message can be sent via MQTT
+ messages.push(message);
+ } else if !need_registration {
+ // The message must be sent over HTTP
let _ = self.http_proxy.send_event(c8y_event).await?;
- Ok(vec![])
+ return Ok(vec![]);
+ } else {
+ // The message should be sent over HTTP but this cannot be done
+ return Err(ConversionError::ChildDeviceNotRegistered {
+ id: child_id.unwrap_or_else(|| "".into()),
+ });
}
+ Ok(messages)
}
fn serialize_to_smartrest(c8y_event: &C8yCreateEvent) -> Result<String, ConversionError> {
@@ -260,6 +272,29 @@ where
c8y_event.time.format(&Rfc3339)?
))
}
+
+ fn register_external_device(
+ &mut self,
+ tedge_event: &ThinEdgeEvent,
+ messages: &mut Vec<Message>,
+ ) -> bool {
+ if let Some(c_id) = tedge_event.source.clone() {
+ // Create the external source if it does not exists
+ if !self.children.contains(&c_id) {
+ self.children.insert(c_id.clone());
+ messages.push(Message::new(
+ &Topic::new_unchecked(SMARTREST_PUBLISH_TOPIC),
+ format!("101,{c_id},{c_id},thin-edge.io-child"),
+ ));
+ return true;
+ }
+ }
+ false
+ }
+
+ fn can_send_over_mqtt(&self, message: &Message) -> bool {
+ message.payload_bytes().len() < self.size_threshold.0
+ }
}
#[async_trait]
@@ -825,7 +860,7 @@ fn get_inventory_fragments(file_path: &str) -> Result<serde_json::Value, Convers
}
}
-pub fn get_child_id_from_topic(topic: &str) -> Result<Option<String>, ConversionError> {
+pub fn get_child_id_from_measurement_topic(topic: &str) -> Result<Option<String>, ConversionError> {
match topic.strip_prefix("tedge/measurements/").map(String::from) {
Some(maybe_id) if maybe_id.is_empty() => {
Err(ConversionError::InvalidChildId { id: maybe_id })
diff --git a/crates/core/tedge_mapper/src/c8y/tests.rs b/crates/core/tedge_mapper/src/c8y/tests.rs
index 78eee84c..03125102 100644
--- a/crates/core/tedge_mapper/src/c8y/tests.rs
+++ b/crates/core/tedge_mapper/src/c8y/tests.rs
@@ -23,7 +23,7 @@ use tedge_test_utils::fs::TempTedgeDir;
use test_case::test_case;
use tokio::task::JoinHandle;
-use super::converter::{get_child_id_from_topic, CumulocityConverter};
+use super::converter::{get_child_id_from_measurement_topic, CumulocityConverter};
const TEST_TIMEOUT_MS: Duration = Duration::from_millis(5000);
const MQTT_HOST: &str = "127.0.0.1";
@@ -640,7 +640,7 @@ async fn convert_two_thin_edge_json_messages_given_different_child_id() {
#[test_case("tedge/measurements", None; "invalid child id (parent topic)")]
#[test_case("foo/bar", None; "invalid child id (invalid topic)")]
fn extract_child_id(in_topic: &str, expected_child_id: Option<String>) {
- match get_child_id_from_topic(in_topic) {
+ match get_child_id_from_measurement_topic(in_topic) {
Ok(maybe_id) => assert_eq!(maybe_id, expected_child_id),
Err(crate::core::error::ConversionError::InvalidChildId { id }) => {
assert_eq!(id, "".to_string())
diff --git a/crates/core/tedge_mapper/src/core/error.rs b/crates/core/tedge_mapper/src/core/error.rs
index b703161a..2d2e0163 100644
--- a/crates/core/tedge_mapper/src/core/error.rs
+++ b/crates/core/tedge_mapper/src/core/error.rs
@@ -104,4 +104,7 @@ pub enum ConversionError {
#[error(transparent)]
FromOperationLogsError(#[from] plugin_sm::operation_logs::OperationLogsError),
+
+ #[error("The given Child ID '{id}' is not registered with Cumulocity. To send the events to the child device, it has to be registered first.")]
+ ChildDeviceNotRegistered { id: String },
}
diff --git a/crates/core/thin_edge_json/src/event.rs b/crates/core/thin_edge_json/src/event.rs
index 42b1ed35..8e07625f 100644
--- a/crates/core/thin_edge_json/src/event.rs
+++ b/crates/core/thin_edge_json/src/event.rs
@@ -13,6 +13,7 @@ pub struct ThinEdgeEvent {
pub name: String,
#[serde(flatten)]
pub data: Option<ThinEdgeEventData>,
+ pub source: Option<String>,
}
/// In-memory representation of ThinEdge JSON event payload
@@ -29,6 +30,7 @@ pub struct ThinEdgeEventData {
}
pub mod error {
+
#[derive(thiserror::Error, Debug)]
pub enum ThinEdgeJsonDeserializerError {
#[error("Unsupported topic: {0}")]
@@ -48,7 +50,7 @@ impl ThinEdgeEvent {
mqtt_payload: &str,
) -> Result<Self, ThinEdgeJsonDeserializerError> {
let topic_split: Vec<&str> = mqtt_topic.split('/').collect();
- if topic_split.len() == 3 {
+ if topic_split.len() == 3 || topic_split.len() == 4 {
let event_name = topic_split[2];
if event_name.is_empty() {
return Err(ThinEdgeJsonDeserializerError::EmptyEventName);
@@ -60,9 +62,17 @@ impl ThinEdgeEvent {
Some(serde_json::from_str(mqtt_payload)?)
};
+ // The 4th part of the topic name is the event source - if any
+ let external_source = if topic_split.len() == 4 {
+ Some(topic_split[3].to_string())
+ } else {
+ None
+ };
+
Ok(Self {
name: event_name.into(),
data: event_data,
+ source: external_source,
})
} else {
Err(ThinEdgeJsonDeserializerError::UnsupportedTopic(
@@ -94,6 +104,7 @@ mod tests {
time: Some(datetime!(2021-04-23 19:00:00 +05:00)),
extras: HashMap::new(),
}),
+ source: None,
};
"event parsing"
)]
@@ -109,6 +120,7 @@ mod tests {
time: None,
extras: HashMap::new(),
}),
+ source: None,
};
"event parsing without timestamp"
)]
@@ -124,6 +136,7 @@ mod tests {
time: Some(datetime!(2021-04-23 19:00:00 +05:00)),
extras: HashMap::new(),
}),
+ source: None,
};
"event parsing without text"
)]
@@ -137,9 +150,41 @@ mod tests {
time: None,
extras: HashMap::new(),
}),
+ source: None,
};
"event parsing without text or timestamp"
)]
+ #[test_case(
+ "tedge/events/click_event/external_source",
+ json!({
+ "text": "Someone clicked",
+ "time": "2021-04-23T19:00:00+05:00",
+ }),
+ ThinEdgeEvent {
+ name: "click_event".into(),
+ data: Some(ThinEdgeEventData {
+ text: Some("Someone clicked".into()),
+ time: Some(datetime!(2021-04-23 19:00:00 +05:00)),
+ extras: HashMap::new(),
+ }),
+ source: Some("external_source".into()),
+ };
+ "event parsing with external source"
+ )]
+ #[test_case(
+ "tedge/events/click_event/external_source",
+ json!({}),
+ ThinEdgeEvent {
+ name: "click_event".into(),
+ data: Some(ThinEdgeEventData {
+ text: None,
+ time: None,
+ extras: HashMap::new(),
+ }),
+ source: Some("external_source".into()),
+ };
+ "event parsing empty payload with external source"
+ )]
fn parse_thin_edge_event_json(
event_topic: &str,
event_payload: Value,
@@ -159,8 +204,8 @@ mod tests {
}
#[test]
- fn event_translation_more_than_three_topic_levels() {
- let result = ThinEdgeEvent::try_from("tedge/events/page/click", "{}");
+ fn event_translation_more_than_four_topic_levels() {
+ let result = ThinEdgeEvent::try_from("tedge/events/page/click/click", "{}");
assert_matches!(
result,