diff options
Diffstat (limited to 'crates/core/tedge_mapper/src')
22 files changed, 3375 insertions, 0 deletions
diff --git a/crates/core/tedge_mapper/src/az_converter.rs b/crates/core/tedge_mapper/src/az_converter.rs new file mode 100644 index 00000000..ad4eada8 --- /dev/null +++ b/crates/core/tedge_mapper/src/az_converter.rs @@ -0,0 +1,201 @@ +use crate::converter::*; +use crate::error::*; +use crate::size_threshold::SizeThreshold; +use clock::Clock; +use mqtt_client::Message; +use thin_edge_json::serialize::ThinEdgeJsonSerializer; + +pub struct AzureConverter { + pub(crate) add_timestamp: bool, + pub(crate) clock: Box<dyn Clock>, + pub(crate) size_threshold: SizeThreshold, + pub(crate) mapper_config: MapperConfig, +} + +impl AzureConverter { + pub fn new(add_timestamp: bool, clock: Box<dyn Clock>, size_threshold: SizeThreshold) -> Self { + let mapper_config = MapperConfig { + in_topic_filter: make_valid_topic_filter_or_panic("tedge/measurements"), + out_topic: make_valid_topic_or_panic("az/messages/events/"), + errors_topic: make_valid_topic_or_panic("tedge/errors"), + }; + AzureConverter { + add_timestamp, + clock, + size_threshold, + mapper_config, + } + } +} + +impl Converter for AzureConverter { + type Error = ConversionError; + + fn get_mapper_config(&self) -> &MapperConfig { + &self.mapper_config + } + + fn try_convert(&mut self, input: &Message) -> Result<Vec<Message>, Self::Error> { + let input = input.payload_str()?; + let () = self.size_threshold.validate(input)?; + let default_timestamp = self.add_timestamp.then(|| self.clock.now()); + let mut serializer = ThinEdgeJsonSerializer::new_with_timestamp(default_timestamp); + let () = thin_edge_json::parser::parse_str(input, &mut serializer)?; + + let payload = serializer.into_string()?; + Ok(vec![(Message::new(&self.mapper_config.out_topic, payload))]) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::size_threshold::SizeThresholdExceeded; + use assert_json_diff::*; + use assert_matches::*; + use chrono::{FixedOffset, TimeZone}; + use mqtt_client::Topic; + use serde_json::json; + + struct TestClock; + + impl Clock for TestClock { + fn now(&self) -> clock::Timestamp { + FixedOffset::east(5 * 3600).ymd(2021, 4, 8).and_hms(0, 0, 0) + } + } + + #[test] + fn converting_invalid_json_is_invalid() { + let mut converter = + AzureConverter::new(false, Box::new(TestClock), SizeThreshold(255 * 1024)); + + let input = "This is not Thin Edge JSON"; + let result = converter.try_convert(&new_tedge_message(input)); + + assert_matches!(result, Err(ConversionError::FromThinEdgeJsonParser(_))) + } + + fn new_tedge_message(input: &str) -> Message { + Message::new(&Topic::new_unchecked("tedge/measurements"), input) + } + + fn extract_first_message_payload(mut messages: Vec<Message>) -> String { + messages.pop().unwrap().payload_str().unwrap().to_string() + } + + #[test] + fn converting_input_without_timestamp_produces_output_without_timestamp_given_add_timestamp_is_false( + ) { + let mut converter = + AzureConverter::new(false, Box::new(TestClock), SizeThreshold(255 * 1024)); + + let input = r#"{ + "temperature": 23.0 + }"#; + + let expected_output = json!({ + "temperature": 23.0 + }); + + let output = converter.convert(&new_tedge_message(input)); + + assert_json_eq!( + serde_json::from_str::<serde_json::Value>(&extract_first_message_payload(output)) + .unwrap(), + expected_output + ); + } + + #[test] + fn converting_input_with_timestamp_produces_output_with_timestamp_given_add_timestamp_is_false() + { + let mut converter = + AzureConverter::new(false, Box::new(TestClock), SizeThreshold(255 * 1024)); + + let input = r#"{ + "time" : "2013-06-22T17:03:14.000+02:00", + "temperature": 23.0 + }"#; + + let expected_output = json!({ + "time" : "2013-06-22T17:03:14+02:00", + "temperature": 23.0 + }); + + let output = converter.convert(&new_tedge_message(input)); + + assert_json_eq!( + serde_json::from_str::<serde_json::Value>(&extract_first_message_payload(output)) + .unwrap(), + expected_output + ); + } + + #[test] + fn converting_input_with_timestamp_produces_output_with_timestamp_given_add_timestamp_is_true() + { + let mut converter = + AzureConverter::new(true, Box::new(TestClock), SizeThreshold(255 * 1024)); + + let input = r#"{ + "time" : "2013-06-22T17:03:14.000+02:00", + "temperature": 23.0 + }"#; + + let expected_output = json!({ + "time" : "2013-06-22T17:03:14+02:00", + "temperature": 23.0 + }); + + let output = converter.convert(&new_tedge_message(input)); + + assert_json_eq!( + serde_json::from_str::<serde_json::Value>(&extract_first_message_payload(output)) + .unwrap(), + expected_output + ); + } + + #[test] + fn converting_input_without_timestamp_produces_output_with_timestamp_given_add_timestamp_is_true( + ) { + let mut converter = + AzureConverter::new(true, Box::new(TestClock), SizeThreshold(255 * 1024)); + + let input = r#"{ + "temperature": 23.0 + }"#; + + let expected_output = json!({ + "temperature": 23.0, + "time": "2021-04-08T00:00:00+05:00" + }); + + let output = converter.convert(&new_tedge_message(input)); + + assert_json_eq!( + serde_json::from_str::<serde_json::Value>(&extract_first_message_payload(output)) + .unwrap(), + expected_output + ); + } + + #[test] + fn exceeding_threshold_returns_error() { + let mut converter = AzureConverter::new(false, Box::new(TestClock), SizeThreshold(1)); + + let input = "ABC"; + let result = converter.try_convert(&new_tedge_message(input)); + + assert_matches!( + result, + Err(ConversionError::FromSizeThresholdExceeded( + SizeThresholdExceeded { + actual_size: 3, + threshold: 1 + } + )) + ); + } +} diff --git a/crates/core/tedge_mapper/src/az_mapper.rs b/crates/core/tedge_mapper/src/az_mapper.rs new file mode 100644 index 00000000..4d2e2af2 --- /dev/null +++ b/crates/core/tedge_mapper/src/az_mapper.rs @@ -0,0 +1,39 @@ +use crate::az_converter::AzureConverter; +use crate::component::TEdgeComponent; +use crate::mapper::*; +use crate::size_threshold::SizeThreshold; +use async_trait::async_trait; +use clock::WallClock; +use tedge_config::ConfigSettingAccessor; +use tedge_config::{AzureMapperTimestamp, TEdgeConfig}; +use tracing::{info_span, Instrument}; + +const AZURE_MAPPER_NAME: &str = "tedge-mapper-az"; + +pub struct AzureMapper {} + +impl AzureMapper { + pub fn new() -> AzureMapper { + AzureMapper {} + } +} + +#[async_trait] +impl TEdgeComponent for AzureMapper { + async fn start(&self, tedge_config: TEdgeConfig) -> Result<(), anyhow::Error> { + let add_timestamp = tedge_config.query(AzureMapperTimestamp)?.is_set(); + let clock = Box::new(WallClock); + let size_threshold = SizeThreshold(255 * 1024); + + let converter = Box::new(AzureConverter::new(add_timestamp, clock, size_threshold)); + + let mut mapper = create_mapper(AZURE_MAPPER_NAME, &tedge_config, converter).await?; + + mapper + .run() + .instrument(info_span!(AZURE_MAPPER_NAME)) + .await?; + + Ok(()) + } +} diff --git a/crates/core/tedge_mapper/src/c8y_converter.rs b/crates/core/tedge_mapper/src/c8y_converter.rs new file mode 100644 index 00000000..f8b9b6c3 --- /dev/null +++ b/crates/core/tedge_mapper/src/c8y_converter.rs @@ -0,0 +1,251 @@ +use crate::converter::*; +use crate::error::*; +use crate::size_threshold::SizeThreshold; +use c8y_translator::json; +use mqtt_client::{Message, Topic}; +use std::collections::HashSet; + +const SMARTREST_PUBLISH_TOPIC: &str = "c8y/s/us"; + +pub struct CumulocityConverter { + pub(crate) size_threshold: SizeThreshold, + children: HashSet<String>, + pub(crate) mapper_config: MapperConfig, +} + +impl CumulocityConverter { + pub fn new(size_threshold: SizeThreshold) -> Self { + let mut topic_fiter = make_valid_topic_filter_or_panic("tedge/measurements"); + let () = topic_fiter + .add("tedge/measurements/+") + .expect("invalid topic filter"); + + let mapper_config = MapperConfig { + in_topic_filter: topic_fiter, + out_topic: make_valid_topic_or_panic("c8y/measurement/measurements/create"), + errors_topic: make_valid_topic_or_panic("tedge/errors"), + }; + + let children: HashSet<String> = HashSet::new(); + CumulocityConverter { + size_threshold, + children, + mapper_config, + } + } +} + +impl Converter for CumulocityConverter { + type Error = ConversionError; + + fn get_mapper_config(&self) -> &MapperConfig { + &self.mapper_config + } + + fn try_convert(&mut self, input: &Message) -> Result<Vec<Message>, ConversionError> { + let () = self.size_threshold.validate(input.payload_str()?)?; + + let mut vec: Vec<Message> = Vec::new(); + + let maybe_child_id = get_child_id_from_topic(&input.topic.name)?; + match maybe_child_id { + Some(child_id) => { + // Need to check if the input Thin Edge JSON is valid before adding a child ID to list + let c8y_json_child_payload = + json::from_thin_edge_json_with_child(input.payload_str()?, child_id.as_str())?; + + if !self.children.contains(child_id.as_str()) { + self.children.insert(child_id.clone()); + vec.push(Message::new( + &Topic::new_unchecked(SMARTREST_PUBLISH_TOPIC), + format!("101,{},{},thin-edge.io-child", child_id, child_id), + )); + } + + vec.push(Message::new( + &self.mapper_config.out_topic, + c8y_json_child_payload, + )); + } + None => { + let c8y_json_payload = json::from_thin_edge_json(input.payload_str()?)?; + vec.push(Message::new( + &self.mapper_config.out_topic, + c8y_json_payload, + )); + } + } + Ok(vec) + } +} + +fn get_child_id_from_topic(topic: &str) -> Result<Option<String>, ConversionError> { + match topic.strip_prefix("tedge/measurements/").map(String::from) { + Some(maybe_id) if maybe_id.is_empty() => { + Err(ConversionError::InvalidChildId { id: maybe_id }) + } + option => Ok(option), + } +} + +#[cfg(test)] +mod test { + use super::*; + use test_case::test_case; + + #[test_case("tedge/measurements/test", Some("test".to_string()); "valid child id")] + #[test_case("tedge/measurements/", None; "returns an error (empty value)")] + #[test_case("tedge/measurements", None; "invalid child id (parent topic)")] + #[test_case("foo/bar", None; "invalid child id (invalid topic)")] + fn extract_child_id(in_topic: &str, expected_child_id: Option<String>) { + match get_child_id_from_topic(in_topic) { + Ok(maybe_id) => assert_eq!(maybe_id, expected_child_id), + Err(ConversionError::InvalidChildId { id }) => { + assert_eq!(id, "".to_string()) + } + _ => { + panic!("Unexpected error type") + } + } + } + + #[test] + fn convert_thin_edge_json_with_child_id() { + let mut converter = Box::new(CumulocityConverter::new(SizeThreshold(16 * 1024))); + let in_topic = "tedge/measurements/child1"; + let in_payload = r#"{"temp": 1, "time": "2021-11-16T17:45:40.571760714+01:00"}"#; + let in_message = Message::new(&Topic::new_unchecked(in_topic), in_payload); + + let expected_smart_rest_message = Message::new( + &Topic::new_unchecked("c8y/s/us"), + "101,child1,child1,thin-edge.io-child", + ); + let expected_c8y_json_message = Message::new( + &Topic::new_unchecked("c8y/measurement/measurements/create"), + r#"{"type":"ThinEdgeMeasurement","externalSource":{"externalId":"child1","type":"c8y_Serial"},"temp":{"temp":{"value":1.0}},"time":"2021-11-16T17:45:40.571760714+01:00"}"#, + ); + + // Test the first output messages contains SmartREST and C8Y JSON. + let out_first_messages = converter.convert(&in_message); + assert_eq!( + out_first_messages, + vec![ + expected_smart_rest_message, + expected_c8y_json_message.clone() + ] + ); + + // Test the second output messages doesn't contain SmartREST child device creation. + let out_second_messages = converter.convert(&in_message); + assert_eq!(out_second_messages, vec![expected_c8y_json_message.clone()]); + } + + #[test] + fn convert_first_thin_edge_json_invalid_then_valid_with_child_id() { + let mut converter = Box::new(CumulocityConverter::new(SizeThreshold(16 * 1024))); + let in_topic = "tedge/measurements/child1"; + let in_invalid_payload = r#"{"temp": invalid}"#; + let in_valid_payload = r#"{"temp": 1, "time": "2021-11-16T17:45:40.571760714+01:00"}"#; + let in_first_message = Message::new(&Topic::new_unchecked(in_topic), in_invalid_payload); + let in_second_message = Message::new(&Topic::new_unchecked(in_topic), in_valid_payload); + + // First convert invalid Thin Edge JSON message. + let out_first_messages = converter.convert(&in_first_message); + let expected_error_message = Message::new( + &Topic::new_unchecked("tedge/errors"), + r#"Invalid JSON: expected value at line 1 column 10: `invalid}`"#, + ); + assert_eq!(out_first_messages, vec![expected_error_message]); + + // Second convert valid Thin Edge JSON message. + let out_second_messages = converter.convert(&in_second_message); + let expected_smart_rest_message = Message::new( + &Topic::new_unchecked("c8y/s/us"), + "101,child1,child1,thin-edge.io-child", + ); + let expected_c8y_json_message = Message::new( + &Topic::new_unchecked("c8y/measurement/measurements/create"), + r#"{"type":"ThinEdgeMeasurement","externalSource":{"externalId":"child1","type":"c8y_Serial"},"temp":{"temp":{"value":1.0}},"time":"2021-11-16T17:45:40.571760714+01:00"}"#, + ); + assert_eq!( + out_second_messages, + vec![ + expected_smart_rest_message, + expected_c8y_json_message.clone() + ] + ); + } + + #[test] + fn convert_two_thin_edge_json_messages_given_different_child_id() { + let mut converter = Box::new(CumulocityConverter::new(SizeThreshold(16 * 1024))); + let in_payload = r#"{"temp": 1, "time": "2021-11-16T17:45:40.571760714+01:00"}"#; + + // First message from "child1" + let in_first_message = Message::new( + &Topic::new_unchecked("tedge/measurements/child1"), + in_payload, + ); + let out_first_messages = converter.convert(&in_first_message); + let expected_first_smart_rest_message = Message::new( + &Topic::new_unchecked("c8y/s/us"), + "101,child1,child1,thin-edge.io-child", + ); + let expected_first_c8y_json_message = Message::new( + &Topic::new_unchecked("c8y/measurement/measurements/create"), + r#"{"type":"ThinEdgeMeasurement","externalSource":{"externalId":"child1","type":"c8y_Serial"},"temp":{"temp":{"value":1.0}},"time":"2021-11-16T17:45:40.571760714+01:00"}"#, + ); + assert_eq!( + out_first_messages, + vec![ + expected_first_smart_rest_message, + expected_first_c8y_json_message + ] + ); + + // Second message from "child2" + let in_second_message = Message::new( + &Topic::new_unchecked("tedge/measurements/child2"), + in_payload, + ); + let out_second_messages = converter.convert(&in_second_message); + let expected_second_smart_rest_message = Message::new( + &Topic::new_unchecked("c8y/s/us"), + "101,child2,child2,thin-edge.io-child", + ); + let expected_second_c8y_json_message = Message::new( + &Topic::new_unchecked("c8y/measurement/measurements/create"), + r#"{"type":"ThinEdgeMeasurement","externalSource":{"externalId":"child2","type":"c8y_Serial"},"temp":{"temp":{"value":1.0}},"time":"2021-11-16T17:45:40.571760714+01:00"}"#, + ); + assert_eq!( + out_second_messages, + vec![ + expected_second_smart_rest_message, + expected_second_c8y_json_message + ] + ); + } + + #[test] + fn check_c8y_threshold_packet_size() -> Result<(), anyhow::Error> { + let size_threshold = SizeThreshold(16 * 1024); + let converter = CumulocityConverter::new(size_threshold); + let buffer = create_packet(1024 * 20); + let err = converter.size_threshold.validate(&buffer).unwrap_err(); + assert_eq!( + err.to_string(), + "The input size 20480 is too big. The threshold is 16384." + ); + Ok(()) + } + + fn create_packet(size: usize) -> String { + let data: String = "Some data!".into(); + let loops = size / data.len(); + let mut buffer = String::with_capacity(size); + for _ in 0..loops { + buffer.push_str("Some data!"); + } + buffer + } +} diff --git a/crates/core/tedge_mapper/src/c8y_mapper.rs b/crates/core/tedge_mapper/src/c8y_mapper.rs new file mode 100644 index 00000000..c96645e2 --- /dev/null +++ b/crates/core/tedge_mapper/src/c8y_mapper.rs @@ -0,0 +1,35 @@ +use crate::c8y_converter::CumulocityConverter; +use crate::component::TEdgeComponent; +use crate::mapper::*; +use crate::size_threshold::SizeThreshold; +use async_trait::async_trait; +use tedge_config::TEdgeConfig; +use tracing::{info_span, Instrument}; + +const CUMULOCITY_MAPPER_NAME: &str = "tedge-mapper-c8y"; + +pub struct CumulocityMapper {} + +impl CumulocityMapper { + pub fn new() -> CumulocityMapper { + CumulocityMapper {} + } +} + +#[async_trait] +impl TEdgeComponent for CumulocityMapper { + async fn start(&self, tedge_config: TEdgeConfig) -> Result<(), anyhow::Error> { + let size_threshold = SizeThreshold(16 * 1024); + + let converter = Box::new(CumulocityConverter::new(size_threshold)); + + let mut mapper = create_mapper(CUMULOCITY_MAPPER_NAME, &tedge_config, converter).await?; + + mapper + .run() + .instrument(info_span!(CUMULOCITY_MAPPER_NAME)) + .await?; + + Ok(()) + } +} diff --git a/crates/core/tedge_mapper/src/collectd_mapper/batcher.rs b/crates/core/tedge_mapper/src/collectd_mapper/batcher.rs new file mode 100644 index 00000000..74aae691 --- /dev/null +++ b/crates/core/tedge_mapper/src/collectd_mapper/batcher.rs @@ -0,0 +1,123 @@ +use clock::Timestamp; +use mqtt_client::Payload; +use thin_edge_json::{ + group::{MeasurementGroup, MeasurementGrouper}, + measurement::MeasurementVisitor, + serialize::ThinEdgeJsonSerializer, +}; + +use crate::collectd_mapper::{collectd::CollectdMessage, error::DeviceMonitorError}; +use chrono::Local; +use thin_edge_json::group::MeasurementGrouperError; + +#[derive(Debug)] +pub struct MessageBatch { + message_grouper: MeasurementGrouper, +} + +impl MessageBatch { + pub fn thin_edge_json_bytes( + messages: Vec<CollectdMessage>, + ) -> Result<Payload, DeviceMonitorError> { + let mut messages = messages.into_iter(); + + if let Some(first_message) = messages.next() { + let timestamp = first_message.timestamp.with_timezone(Local::now().offset()); + let mut batch = MessageBatch::start_batch(first_message, timestamp)?; + for message in messages { + batch.add_to_batch(message)?; + } + let measurements = batch.end_batch()?; + + let mut tedge_json_serializer = ThinEdgeJsonSerializer::new(); + measurements.accept(&mut tedge_json_serializer)?; + + let payload = tedge_json_serializer.bytes()?; + Ok(payload) + } else { + Err(DeviceMonitorError::FromInvalidThinEdgeJson( + MeasurementGrouperError::UnexpectedEnd, + )) + } + } + + fn start_batch( + collectd_message: CollectdMessage, + timestamp: Timestamp, + ) -> Result<Self, DeviceMonitorError> { + let mut message_grouper = MeasurementGrouper::new(); + message_grouper.visit_timestamp(timestamp)?; + + let mut message_batch = Self { message_grouper }; + + message_batch.add_to_batch(collectd_message)?; + + Ok(message_batch) + } + + fn add_to_batch( + &mut self, + collectd_message: CollectdMessage, + ) -> Result<(), DeviceMonitorError> { + collectd_message.accept(&mut self.message_grouper)?; + Ok(()) + } + + fn end_batch(self) -> Result<MeasurementGroup, DeviceMonitorError> { + Ok(self.message_grouper.end()?) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use assert_matches::assert_matches; + use chrono::{TimeZone, Utc}; + use clock::{Clock, WallClock}; + + #[test] + fn test_message_batch_processor() -> anyhow::Result<()> { + let timestamp = Utc.ymd(2015, 5, 15).and_hms_milli(0, 0, 1, 444); + let collectd_message = CollectdMessage::new("temperature", "value", 32.5, timestamp); + let mut message_batch = MessageBatch::start_batch(collectd_message, WallClock.now())?; + + let collectd_message = CollectdMessage::new("coordinate", "x", 50.0, timestamp); + message_batch.add_to_batch(collectd_message)?; + + let collectd_message = CollectdMessage::new("coordinate", "y", 70.0, timestamp); + message_batch.add_to_batch(collectd_message)?; + + let collectd_message = CollectdMessage::new("pressure", "value", 98.2, timestamp); + message_batch.add_to_batch(collectd_message)?; + + let collectd_message = CollectdMessage::new("coordinate", "z", 90.0, timestamp); + message_batch.add_to_batch(collectd_message)?; + + let message_group = message_batch.end_batch()?; + + assert_matches!(message_group.timestamp(), Some(_)); + + assert_eq!( + message_group.get_measurement_value(Some("temperature"), "value"), + Some(32.5) + ); + assert_eq!( + message_group.get_measurement_value(Some("pressure"), "value"), + Some(98.2) + ); + assert_eq!( + message_group.get_measurement_value(Some("coordinate"), "x"), + Some(50.0) + ); + assert_eq!( + message_group.get_measurement_value(Some("coordinate"), "y"), + Some(70.0) + ); + assert_eq!( + message_group.get_measurement_value(Some("coordinate"), "z"), + Some(90.0) + ); + + Ok(()) + } +} diff --git a/crates/core/tedge_mapper/src/collectd_mapper/collectd.rs b/crates/core/tedge_mapper/src/collectd_mapper/collectd.rs new file mode 100644 index 00000000..9d387314 --- /dev/null +++ b/crates/core/tedge_mapper/src/collectd_mapper/collectd.rs @@ -0,0 +1,323 @@ +use batcher::Batchable; +use chrono::{DateTime, NaiveDateTime, Utc}; +use mqtt_client::Message; +use thin_edge_json::measurement::MeasurementVisitor; + +#[derive(Debug)] +pub struct CollectdMessage { + pub metric_group_key: String, + pub metric_key: String, + pub timestamp: DateTime<Utc>, + pub metric_value: f64, +} + +#[derive(thiserror::Error, Debug)] +pub enum CollectdError { + #[error( + "Message received on invalid collectd topic: {0}. \ + Collectd message topics must be in the format collectd/<hostname>/<metric-plugin-name>/<metric-key>" + )] + InvalidMeasurementTopic(String), + + #[error("Invalid payload received on topic: {0}. Error: {1}")] + InvalidMeasurementPayload(String, CollectdPayloadError), + + #[error("Non UTF-8 payload: {0:?}")] + NonUTF8MeasurementPayload(Vec<u8>), +} + +impl CollectdMessage { + pub fn accept<T>(&self, visitor: &mut T) -> Result<(), T::Error> + where + T: MeasurementVisitor, + { + visitor.visit_grouped_measurement( + &self.metric_group_key, + &self.metric_key, + self.metric_value, + ) + } + + #[cfg(test)] + pub fn new( + metric_group_key: &str, + metric_key: &str, + metric_value: f64, + timestamp: DateTime<Utc>, + ) -> Self { + Self { + metric_group_key: metric_group_key.to_string(), + metric_key: metric_key.to_string(), + timestamp, + metric_value, + } + } + + pub fn parse_from(mqtt_message: &Message) -> Result<Self, CollectdError> { + let topic = mqtt_message.topic.name.as_str(); + let collectd_topic = match C |