summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--crates/core/c8y_api/src/json_c8y.rs21
-rw-r--r--crates/core/tedge_mapper/src/c8y/converter.rs57
-rw-r--r--crates/core/tedge_mapper/src/c8y/tests.rs4
-rw-r--r--crates/core/tedge_mapper/src/core/error.rs3
-rw-r--r--crates/core/thin_edge_json/src/event.rs51
-rw-r--r--docs/src/tutorials/send-events.md63
6 files changed, 182 insertions, 17 deletions
diff --git a/crates/core/c8y_api/src/json_c8y.rs b/crates/core/c8y_api/src/json_c8y.rs
index be857adb..d50559a1 100644
--- a/crates/core/c8y_api/src/json_c8y.rs
+++ b/crates/core/c8y_api/src/json_c8y.rs
@@ -135,7 +135,7 @@ impl TryFrom<ThinEdgeEvent> for C8yCreateEvent {
let event_type = event.name;
let text;
let time;
- let extras;
+ let mut extras;
match event.data {
None => {
text = event_type.clone();
@@ -148,6 +148,9 @@ impl TryFrom<ThinEdgeEvent> for C8yCreateEvent {
extras = event_data.extras;
}
}
+ if let Some(source) = event.source {
+ update_the_external_source_event(&mut extras, &source)?;
+ }
Ok(Self {
source: None,
@@ -191,6 +194,17 @@ fn combine_version_and_type(
},
}
}
+fn update_the_external_source_event(
+ extras: &mut HashMap<String, Value>,
+ source: &str,
+) -> Result<(), SMCumulocityMapperError> {
+ let mut value = serde_json::Map::new();
+ value.insert("externalId".to_string(), source.into());
+ value.insert("type".to_string(), "c8y_Serial".into());
+ extras.insert("externalSource".into(), value.into());
+
+ Ok(())
+}
#[cfg(test)]
mod tests {
@@ -353,6 +367,7 @@ mod tests {
time: Some(datetime!(2021-04-23 19:00:00 +05:00)),
extras: HashMap::new(),
}),
+ source: None,
},
C8yCreateEvent {
source: None,
@@ -371,6 +386,7 @@ mod tests {
time: Some(datetime!(2021-04-23 19:00:00 +05:00)),
extras: HashMap::new(),
}),
+ source: None,
},
C8yCreateEvent {
source: None,
@@ -389,6 +405,7 @@ mod tests {
time: Some(datetime!(2021-04-23 19:00:00 +05:00)),
extras: HashMap::new(),
}),
+ source: None,
},
C8yCreateEvent {
source: None,
@@ -419,6 +436,7 @@ mod tests {
time: None,
extras: HashMap::new(),
}),
+ source: None,
};
let actual_c8y_event = C8yCreateEvent::try_from(tedge_event)?;
@@ -437,6 +455,7 @@ mod tests {
let tedge_event = ThinEdgeEvent {
name: "empty_event".into(),
data: None,
+ source: None,
};
let actual_c8y_event = C8yCreateEvent::try_from(tedge_event)?;
diff --git a/crates/core/tedge_mapper/src/c8y/converter.rs b/crates/core/tedge_mapper/src/c8y/converter.rs
index fd9223b5..9912a10b 100644
--- a/crates/core/tedge_mapper/src/c8y/converter.rs
+++ b/crates/core/tedge_mapper/src/c8y/converter.rs
@@ -92,6 +92,7 @@ where
"tedge/alarms/+/+",
"c8y-internal/alarms/+/+",
"tedge/events/+",
+ "tedge/events/+/+",
]
.try_into()
.expect("topics that mapper should subscribe to");
@@ -143,6 +144,7 @@ where
"tedge/alarms/+/+",
"c8y-internal/alarms/+/+",
"tedge/events/+",
+ "tedge/events/+/+",
]
.try_into()
.expect("topics that mapper should subscribe to");
@@ -185,7 +187,7 @@ where
) -> Result<Vec<Message>, ConversionError> {
let mut vec: Vec<Message> = Vec::new();
- let maybe_child_id = get_child_id_from_topic(&input.topic.name)?;
+ let maybe_child_id = get_child_id_from_measurement_topic(&input.topic.name)?;
let c8y_json_payload = match maybe_child_id {
Some(child_id) => {
// Need to check if the input Thin Edge JSON is valid before adding a child ID to list
@@ -224,31 +226,41 @@ where
&mut self,
input: &Message,
) -> Result<Vec<Message>, ConversionError> {
- let tedge_event = ThinEdgeEvent::try_from(input.topic.name.as_str(), input.payload_str()?)?;
+ let mut messages = Vec::new();
+
+ let tedge_event = ThinEdgeEvent::try_from(&input.topic.name, input.payload_str()?)?;
+ let child_id = tedge_event.source.clone();
+
+ let need_registration = self.register_external_device(&tedge_event, &mut messages);
+
let c8y_event = C8yCreateEvent::try_from(tedge_event)?;
// If the message doesn't contain any fields other than `text` and `time`, convert to SmartREST
let message = if c8y_event.extras.is_empty() {
let smartrest_event = Self::serialize_to_smartrest(&c8y_event)?;
let smartrest_topic = Topic::new_unchecked(SMARTREST_PUBLISH_TOPIC);
-
Message::new(&smartrest_topic, smartrest_event)
} else {
// If the message contains extra fields other than `text` and `time`, convert to Cumulocity JSON
let cumulocity_event_json = serde_json::to_string(&c8y_event)?;
let json_mqtt_topic = Topic::new_unchecked(C8Y_JSON_MQTT_EVENTS_TOPIC);
-
Message::new(&json_mqtt_topic, cumulocity_event_json)
};
- // If the MQTT message size is well within the Cumulocity MQTT size limit, use MQTT to send the mapped event as well
- if input.payload_bytes().len() < self.size_threshold.0 {
- Ok(vec![message])
- } else {
- // If the message size is larger than the MQTT size limit, use HTTP to send the mapped event
+ if self.can_send_over_mqtt(&message) {
+ // The message can be sent via MQTT
+ messages.push(message);
+ } else if !need_registration {
+ // The message must be sent over HTTP
let _ = self.http_proxy.send_event(c8y_event).await?;
- Ok(vec![])
+ return Ok(vec![]);
+ } else {
+ // The message should be sent over HTTP but this cannot be done
+ return Err(ConversionError::ChildDeviceNotRegistered {
+ id: child_id.unwrap_or_else(|| "".into()),
+ });
}
+ Ok(messages)
}
fn serialize_to_smartrest(c8y_event: &C8yCreateEvent) -> Result<String, ConversionError> {
@@ -260,6 +272,29 @@ where
c8y_event.time.format(&Rfc3339)?
))
}
+
+ fn register_external_device(
+ &mut self,
+ tedge_event: &ThinEdgeEvent,
+ messages: &mut Vec<Message>,
+ ) -> bool {
+ if let Some(c_id) = tedge_event.source.clone() {
+ // Create the external source if it does not exists
+ if !self.children.contains(&c_id) {
+ self.children.insert(c_id.clone());
+ messages.push(Message::new(
+ &Topic::new_unchecked(SMARTREST_PUBLISH_TOPIC),
+ format!("101,{c_id},{c_id},thin-edge.io-child"),
+ ));
+ return true;
+ }
+ }
+ false
+ }
+
+ fn can_send_over_mqtt(&self, message: &Message) -> bool {
+ message.payload_bytes().len() < self.size_threshold.0
+ }
}
#[async_trait]
@@ -825,7 +860,7 @@ fn get_inventory_fragments(file_path: &str) -> Result<serde_json::Value, Convers
}
}
-pub fn get_child_id_from_topic(topic: &str) -> Result<Option<String>, ConversionError> {
+pub fn get_child_id_from_measurement_topic(topic: &str) -> Result<Option<String>, ConversionError> {
match topic.strip_prefix("tedge/measurements/").map(String::from) {
Some(maybe_id) if maybe_id.is_empty() => {
Err(ConversionError::InvalidChildId { id: maybe_id })
diff --git a/crates/core/tedge_mapper/src/c8y/tests.rs b/crates/core/tedge_mapper/src/c8y/tests.rs
index 78eee84c..03125102 100644
--- a/crates/core/tedge_mapper/src/c8y/tests.rs
+++ b/crates/core/tedge_mapper/src/c8y/tests.rs
@@ -23,7 +23,7 @@ use tedge_test_utils::fs::TempTedgeDir;
use test_case::test_case;
use tokio::task::JoinHandle;
-use super::converter::{get_child_id_from_topic, CumulocityConverter};
+use super::converter::{get_child_id_from_measurement_topic, CumulocityConverter};
const TEST_TIMEOUT_MS: Duration = Duration::from_millis(5000);
const MQTT_HOST: &str = "127.0.0.1";
@@ -640,7 +640,7 @@ async fn convert_two_thin_edge_json_messages_given_different_child_id() {
#[test_case("tedge/measurements", None; "invalid child id (parent topic)")]
#[test_case("foo/bar", None; "invalid child id (invalid topic)")]
fn extract_child_id(in_topic: &str, expected_child_id: Option<String>) {
- match get_child_id_from_topic(in_topic) {
+ match get_child_id_from_measurement_topic(in_topic) {
Ok(maybe_id) => assert_eq!(maybe_id, expected_child_id),
Err(crate::core::error::ConversionError::InvalidChildId { id }) => {
assert_eq!(id, "".to_string())
diff --git a/crates/core/tedge_mapper/src/core/error.rs b/crates/core/tedge_mapper/src/core/error.rs
index b703161a..2d2e0163 100644
--- a/crates/core/tedge_mapper/src/core/error.rs
+++ b/crates/core/tedge_mapper/src/core/error.rs
@@ -104,4 +104,7 @@ pub enum ConversionError {
#[error(transparent)]
FromOperationLogsError(#[from] plugin_sm::operation_logs::OperationLogsError),
+
+ #[error("The given Child ID '{id}' is not registered with Cumulocity. To send the events to the child device, it has to be registered first.")]
+ ChildDeviceNotRegistered { id: String },
}
diff --git a/crates/core/thin_edge_json/src/event.rs b/crates/core/thin_edge_json/src/event.rs
index 42b1ed35..8e07625f 100644
--- a/crates/core/thin_edge_json/src/event.rs
+++ b/crates/core/thin_edge_json/src/event.rs
@@ -13,6 +13,7 @@ pub struct ThinEdgeEvent {
pub name: String,
#[serde(flatten)]
pub data: Option<ThinEdgeEventData>,
+ pub source: Option<String>,
}
/// In-memory representation of ThinEdge JSON event payload
@@ -29,6 +30,7 @@ pub struct ThinEdgeEventData {
}
pub mod error {
+
#[derive(thiserror::Error, Debug)]
pub enum ThinEdgeJsonDeserializerError {
#[error("Unsupported topic: {0}")]
@@ -48,7 +50,7 @@ impl ThinEdgeEvent {
mqtt_payload: &str,
) -> Result<Self, ThinEdgeJsonDeserializerError> {
let topic_split: Vec<&str> = mqtt_topic.split('/').collect();
- if topic_split.len() == 3 {
+ if topic_split.len() == 3 || topic_split.len() == 4 {
let event_name = topic_split[2];
if event_name.is_empty() {
return Err(ThinEdgeJsonDeserializerError::EmptyEventName);
@@ -60,9 +62,17 @@ impl ThinEdgeEvent {
Some(serde_json::from_str(mqtt_payload)?)
};
+ // The 4th part of the topic name is the event source - if any
+ let external_source = if topic_split.len() == 4 {
+ Some(topic_split[3].to_string())
+ } else {
+ None
+ };
+
Ok(Self {
name: event_name.into(),
data: event_data,
+ source: external_source,
})
} else {
Err(ThinEdgeJsonDeserializerError::UnsupportedTopic(
@@ -94,6 +104,7 @@ mod tests {
time: Some(datetime!(2021-04-23 19:00:00 +05:00)),
extras: HashMap::new(),
}),
+ source: None,
};
"event parsing"
)]
@@ -109,6 +120,7 @@ mod tests {
time: None,
extras: HashMap::new(),
}),
+ source: None,
};
"event parsing without timestamp"
)]
@@ -124,6 +136,7 @@ mod tests {
time: Some(datetime!(2021-04-23 19:00:00 +05:00)),
extras: HashMap::new(),
}),
+ source: None,
};
"event parsing without text"
)]
@@ -137,9 +150,41 @@ mod tests {
time: None,
extras: HashMap::new(),
}),
+ source: None,
};
"event parsing without text or timestamp"
)]
+ #[test_case(
+ "tedge/events/click_event/external_source",
+ json!({
+ "text": "Someone clicked",
+ "time": "2021-04-23T19:00:00+05:00",
+ }),
+ ThinEdgeEvent {
+ name: "click_event".into(),
+ data: Some(ThinEdgeEventData {
+ text: Some("Someone clicked".into()),
+ time: Some(datetime!(2021-04-23 19:00:00 +05:00)),
+ extras: HashMap::new(),
+ }),
+ source: Some("external_source".into()),
+ };
+ "event parsing with external source"
+ )]
+ #[test_case(
+ "tedge/events/click_event/external_source",
+ json!({}),
+ ThinEdgeEvent {
+ name: "click_event".into(),
+ data: Some(ThinEdgeEventData {
+ text: None,
+ time: None,
+ extras: HashMap::new(),
+ }),
+ source: Some("external_source".into()),
+ };
+ "event parsing empty payload with external source"
+ )]
fn parse_thin_edge_event_json(
event_topic: &str,
event_payload: Value,
@@ -159,8 +204,8 @@ mod tests {
}
#[test]
- fn event_translation_more_than_three_topic_levels() {
- let result = ThinEdgeEvent::try_from("tedge/events/page/click", "{}");
+ fn event_translation_more_than_four_topic_levels() {
+ let result = ThinEdgeEvent::try_from("tedge/events/page/click/click", "{}");
assert_matches!(
result,
diff --git a/docs/src/tutorials/send-events.md b/docs/src/tutorials/send-events.md
index 31306d64..5a658539 100644
--- a/docs/src/tutorials/send-events.md
+++ b/docs/src/tutorials/send-events.md
@@ -85,3 +85,66 @@ The Cumulocity JSON mapping of the same event would be as follows:
> Note: Mapped events will be sent to Cumulocity via MQTT if the incoming Thin Edge JSON event payload size is less than 16K bytes. If higher, HTTP will be used.
Find more information about events data model in Cumulocity [here](https://cumulocity.com/guides/concepts/domain-model/#events).
+
+## Sending an event for a child/external device to the cloud
+
+An event for a child/external device can be triggered on thin-edge.io by sending an MQTT message in Thin Edge JSON format to certain MQTT topics.
+
+The scheme of the topic to publish the event data is as follows:
+
+`tedge/events/<event-type>/<child-device-id>`
+
+The payload format must be as follows:
+
+```json
+{
+ "type":"<event type>",
+ "text": "<event text>",
+ "time": "<Timestamp in ISO-8601 format>"
+}
+```
+
+Here is a sample event triggered for a `login_event` event type for the `external_sensor` child device:
+
+Command to send the event from a external device as below:
+
+```shell
+$ sudo tedge mqtt pub tedge/events/login_event/external_sensor '{
+ "type":"login_event",
+ "text":"A user just logged in",
+ "time":"2021-01-01T05:30:45+00:00"
+}'
+```
+
+Payload:
+
+```json
+{
+ "type": "login_event",
+ "text": "A user just logged in",
+ "time": "2021-01-01T05:30:45+00:00"
+}
+```
+### Mapping of events to cloud-specific data format
+
+If the child/external device is connected to some supported IoT cloud platform, an event that is triggered locally on thin-edge.io will be forwarded to the connected cloud platform as well.
+The mapping of thin-edge events data to its respective cloud-native representation will be done by the corresponding cloud mapper process.
+
+#### Cumulocity cloud data mapping
+
+The Cumulocity mapper will convert Thin Edge JSON events into its Cumulocity JSON equivalent and sends them to the Cumulocity cloud.
+
+The translated payload will be in the below format.
+
+```json
+{
+ "type": "login_event",
+ "text": "A user just logged in",
+ "time": "2021-01-01T05:30:45+00:00",
+ "externalSource":{
+ "externalId": "external_sensor",
+ "type": "c8y_Serial"
+ }
+}
+```
+Here the `externalId` will be derived from the `child-device-id` of the `child device event topic`.