summaryrefslogtreecommitdiffstats
path: root/crates/core/tedge_mapper
diff options
context:
space:
mode:
Diffstat (limited to 'crates/core/tedge_mapper')
-rw-r--r--crates/core/tedge_mapper/Cargo.toml65
-rw-r--r--crates/core/tedge_mapper/src/az_converter.rs201
-rw-r--r--crates/core/tedge_mapper/src/az_mapper.rs39
-rw-r--r--crates/core/tedge_mapper/src/c8y_converter.rs251
-rw-r--r--crates/core/tedge_mapper/src/c8y_mapper.rs35
-rw-r--r--crates/core/tedge_mapper/src/collectd_mapper/batcher.rs123
-rw-r--r--crates/core/tedge_mapper/src/collectd_mapper/collectd.rs323
-rw-r--r--crates/core/tedge_mapper/src/collectd_mapper/error.rs21
-rw-r--r--crates/core/tedge_mapper/src/collectd_mapper/mapper.rs34
-rw-r--r--crates/core/tedge_mapper/src/collectd_mapper/mod.rs5
-rw-r--r--crates/core/tedge_mapper/src/collectd_mapper/monitor.rs156
-rw-r--r--crates/core/tedge_mapper/src/component.rs7
-rw-r--r--crates/core/tedge_mapper/src/converter.rs43
-rw-r--r--crates/core/tedge_mapper/src/error.rs46
-rw-r--r--crates/core/tedge_mapper/src/main.rs85
-rw-r--r--crates/core/tedge_mapper/src/mapper.rs184
-rw-r--r--crates/core/tedge_mapper/src/size_threshold.rs23
-rw-r--r--crates/core/tedge_mapper/src/sm_c8y_mapper/error.rs52
-rw-r--r--crates/core/tedge_mapper/src/sm_c8y_mapper/json_c8y.rs278
-rw-r--r--crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs867
-rw-r--r--crates/core/tedge_mapper/src/sm_c8y_mapper/mod.rs6
-rw-r--r--crates/core/tedge_mapper/src/sm_c8y_mapper/tests.rs442
-rw-r--r--crates/core/tedge_mapper/src/sm_c8y_mapper/topic.rs154
23 files changed, 3440 insertions, 0 deletions
diff --git a/crates/core/tedge_mapper/Cargo.toml b/crates/core/tedge_mapper/Cargo.toml
new file mode 100644
index 00000000..04d92d6d
--- /dev/null
+++ b/crates/core/tedge_mapper/Cargo.toml
@@ -0,0 +1,65 @@
+[package]
+name = "tedge_mapper"
+version = "0.4.3"
+authors = ["thin-edge.io team <info@thin-edge.io>"]
+edition = "2018"
+license = "Apache-2.0"
+description = "tedge_mapper is the mapper that translates thin-edge.io data model to c8y/az data model."
+
+[package.metadata.deb]
+depends = "tedge"
+maintainer-scripts = "configuration/debian/tedge_mapper"
+assets = [
+ ["../../configuration/init/systemd/tedge-mapper-az.service", "/lib/systemd/system/tedge-mapper-az.service", "644"],
+ ["../../configuration/init/systemd/tedge-mapper-c8y.service", "/lib/systemd/system/tedge-mapper-c8y.service", "644"],
+ ["../../configuration/init/systemd/tedge-mapper-collectd.service", "/lib/systemd/system/tedge-mapper-collectd.service", "644"],
+ ["../../configuration/init/systemd/tedge-mapper-sm-c8y.service", "/lib/systemd/system/tedge-mapper-sm-c8y.service", "644"],
+ ["../../configuration/contrib/collectd/collectd.conf", "/etc/tedge/contrib/collectd/", "644"],
+ ["target/release/tedge_mapper", "/usr/bin/tedge_mapper", "755"],
+]
+
+[package.metadata.deb.systemd-units]
+unit-scripts = "../../configuration/init/systemd"
+enable = false
+start = false
+stop-on-upgrade = false
+
+[dependencies]
+anyhow = "1.0"
+async-trait = "0.1"
+batcher = { path = "../../common/batcher" }
+c8y_smartrest = { path = "../c8y_smartrest" }
+c8y_translator = { path = "../c8y_translator" }
+chrono = "0.4"
+clock = { path = "../../common/clock" }
+csv = "1.1"
+flockfile = { path = "../../common/flockfile" }
+futures = "0.3"
+json_sm = { path = "../json_sm"}
+mockall = "0.10"
+mqtt_client = { path = "../../common/mqtt_client" }
+reqwest = { version = "0.11", default-features = false, features = ["json", "rustls-tls"] }
+serde = { version = "1.0", features = ["derive"] }
+serde_json = "1.0"
+structopt = "0.3"
+tedge_config = { path = "../../common/tedge_config" }
+tedge_users = { path = "../../common/tedge_users" }
+tedge_utils = { path = "../../common/tedge_utils", features = ["logging"] }
+thin_edge_json = { path = "../thin_edge_json" }
+thiserror = "1.0"
+tokio = { version = "1.8", features = ["rt", "sync", "time"] }
+tracing = { version = "0.1", features = ["attributes", "log"] }
+
+[dev-dependencies]
+assert_matches = "1.5"
+assert-json-diff = "2.0"
+serde = "1.0"
+mqtt_tests = { path = "../../tests/mqtt_tests" }
+serde_json = "1.0"
+serial_test = "0.5"
+tempfile = "3.2"
+test-case = "1.2"
+tokio-test = "0.4"
+
+[features]
+integration-test = []
diff --git a/crates/core/tedge_mapper/src/az_converter.rs b/crates/core/tedge_mapper/src/az_converter.rs
new file mode 100644
index 00000000..ad4eada8
--- /dev/null
+++ b/crates/core/tedge_mapper/src/az_converter.rs
@@ -0,0 +1,201 @@
+use crate::converter::*;
+use crate::error::*;
+use crate::size_threshold::SizeThreshold;
+use clock::Clock;
+use mqtt_client::Message;
+use thin_edge_json::serialize::ThinEdgeJsonSerializer;
+
+pub struct AzureConverter {
+ pub(crate) add_timestamp: bool,
+ pub(crate) clock: Box<dyn Clock>,
+ pub(crate) size_threshold: SizeThreshold,
+ pub(crate) mapper_config: MapperConfig,
+}
+
+impl AzureConverter {
+ pub fn new(add_timestamp: bool, clock: Box<dyn Clock>, size_threshold: SizeThreshold) -> Self {
+ let mapper_config = MapperConfig {
+ in_topic_filter: make_valid_topic_filter_or_panic("tedge/measurements"),
+ out_topic: make_valid_topic_or_panic("az/messages/events/"),
+ errors_topic: make_valid_topic_or_panic("tedge/errors"),
+ };
+ AzureConverter {
+ add_timestamp,
+ clock,
+ size_threshold,
+ mapper_config,
+ }
+ }
+}
+
+impl Converter for AzureConverter {
+ type Error = ConversionError;
+
+ fn get_mapper_config(&self) -> &MapperConfig {
+ &self.mapper_config
+ }
+
+ fn try_convert(&mut self, input: &Message) -> Result<Vec<Message>, Self::Error> {
+ let input = input.payload_str()?;
+ let () = self.size_threshold.validate(input)?;
+ let default_timestamp = self.add_timestamp.then(|| self.clock.now());
+ let mut serializer = ThinEdgeJsonSerializer::new_with_timestamp(default_timestamp);
+ let () = thin_edge_json::parser::parse_str(input, &mut serializer)?;
+
+ let payload = serializer.into_string()?;
+ Ok(vec![(Message::new(&self.mapper_config.out_topic, payload))])
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::size_threshold::SizeThresholdExceeded;
+ use assert_json_diff::*;
+ use assert_matches::*;
+ use chrono::{FixedOffset, TimeZone};
+ use mqtt_client::Topic;
+ use serde_json::json;
+
+ struct TestClock;
+
+ impl Clock for TestClock {
+ fn now(&self) -> clock::Timestamp {
+ FixedOffset::east(5 * 3600).ymd(2021, 4, 8).and_hms(0, 0, 0)
+ }
+ }
+
+ #[test]
+ fn converting_invalid_json_is_invalid() {
+ let mut converter =
+ AzureConverter::new(false, Box::new(TestClock), SizeThreshold(255 * 1024));
+
+ let input = "This is not Thin Edge JSON";
+ let result = converter.try_convert(&new_tedge_message(input));
+
+ assert_matches!(result, Err(ConversionError::FromThinEdgeJsonParser(_)))
+ }
+
+ fn new_tedge_message(input: &str) -> Message {
+ Message::new(&Topic::new_unchecked("tedge/measurements"), input)
+ }
+
+ fn extract_first_message_payload(mut messages: Vec<Message>) -> String {
+ messages.pop().unwrap().payload_str().unwrap().to_string()
+ }
+
+ #[test]
+ fn converting_input_without_timestamp_produces_output_without_timestamp_given_add_timestamp_is_false(
+ ) {
+ let mut converter =
+ AzureConverter::new(false, Box::new(TestClock), SizeThreshold(255 * 1024));
+
+ let input = r#"{
+ "temperature": 23.0
+ }"#;
+
+ let expected_output = json!({
+ "temperature": 23.0
+ });
+
+ let output = converter.convert(&new_tedge_message(input));
+
+ assert_json_eq!(
+ serde_json::from_str::<serde_json::Value>(&extract_first_message_payload(output))
+ .unwrap(),
+ expected_output
+ );
+ }
+
+ #[test]
+ fn converting_input_with_timestamp_produces_output_with_timestamp_given_add_timestamp_is_false()
+ {
+ let mut converter =
+ AzureConverter::new(false, Box::new(TestClock), SizeThreshold(255 * 1024));
+
+ let input = r#"{
+ "time" : "2013-06-22T17:03:14.000+02:00",
+ "temperature": 23.0
+ }"#;
+
+ let expected_output = json!({
+ "time" : "2013-06-22T17:03:14+02:00",
+ "temperature": 23.0
+ });
+
+ let output = converter.convert(&new_tedge_message(input));
+
+ assert_json_eq!(
+ serde_json::from_str::<serde_json::Value>(&extract_first_message_payload(output))
+ .unwrap(),
+ expected_output
+ );
+ }
+
+ #[test]
+ fn converting_input_with_timestamp_produces_output_with_timestamp_given_add_timestamp_is_true()
+ {
+ let mut converter =
+ AzureConverter::new(true, Box::new(TestClock), SizeThreshold(255 * 1024));
+
+ let input = r#"{
+ "time" : "2013-06-22T17:03:14.000+02:00",
+ "temperature": 23.0
+ }"#;
+
+ let expected_output = json!({
+ "time" : "2013-06-22T17:03:14+02:00",
+ "temperature": 23.0
+ });
+
+ let output = converter.convert(&new_tedge_message(input));
+
+ assert_json_eq!(
+ serde_json::from_str::<serde_json::Value>(&extract_first_message_payload(output))
+ .unwrap(),
+ expected_output
+ );
+ }
+
+ #[test]
+ fn converting_input_without_timestamp_produces_output_with_timestamp_given_add_timestamp_is_true(
+ ) {
+ let mut converter =
+ AzureConverter::new(true, Box::new(TestClock), SizeThreshold(255 * 1024));
+
+ let input = r#"{
+ "temperature": 23.0
+ }"#;
+
+ let expected_output = json!({
+ "temperature": 23.0,
+ "time": "2021-04-08T00:00:00+05:00"
+ });
+
+ let output = converter.convert(&new_tedge_message(input));
+
+ assert_json_eq!(
+ serde_json::from_str::<serde_json::Value>(&extract_first_message_payload(output))
+ .unwrap(),
+ expected_output
+ );
+ }
+
+ #[test]
+ fn exceeding_threshold_returns_error() {
+ let mut converter = AzureConverter::new(false, Box::new(TestClock), SizeThreshold(1));
+
+ let input = "ABC";
+ let result = converter.try_convert(&new_tedge_message(input));
+
+ assert_matches!(
+ result,
+ Err(ConversionError::FromSizeThresholdExceeded(
+ SizeThresholdExceeded {
+ actual_size: 3,
+ threshold: 1
+ }
+ ))
+ );
+ }
+}
diff --git a/crates/core/tedge_mapper/src/az_mapper.rs b/crates/core/tedge_mapper/src/az_mapper.rs
new file mode 100644
index 00000000..4d2e2af2
--- /dev/null
+++ b/crates/core/tedge_mapper/src/az_mapper.rs
@@ -0,0 +1,39 @@
+use crate::az_converter::AzureConverter;
+use crate::component::TEdgeComponent;
+use crate::mapper::*;
+use crate::size_threshold::SizeThreshold;
+use async_trait::async_trait;
+use clock::WallClock;
+use tedge_config::ConfigSettingAccessor;
+use tedge_config::{AzureMapperTimestamp, TEdgeConfig};
+use tracing::{info_span, Instrument};
+
+const AZURE_MAPPER_NAME: &str = "tedge-mapper-az";
+
+pub struct AzureMapper {}
+
+impl AzureMapper {
+ pub fn new() -> AzureMapper {
+ AzureMapper {}
+ }
+}
+
+#[async_trait]
+impl TEdgeComponent for AzureMapper {
+ async fn start(&self, tedge_config: TEdgeConfig) -> Result<(), anyhow::Error> {
+ let add_timestamp = tedge_config.query(AzureMapperTimestamp)?.is_set();
+ let clock = Box::new(WallClock);
+ let size_threshold = SizeThreshold(255 * 1024);
+
+ let converter = Box::new(AzureConverter::new(add_timestamp, clock, size_threshold));
+
+ let mut mapper = create_mapper(AZURE_MAPPER_NAME, &tedge_config, converter).await?;
+
+ mapper
+ .run()
+ .instrument(info_span!(AZURE_MAPPER_NAME))
+ .await?;
+
+ Ok(())
+ }
+}
diff --git a/crates/core/tedge_mapper/src/c8y_converter.rs b/crates/core/tedge_mapper/src/c8y_converter.rs
new file mode 100644
index 00000000..f8b9b6c3
--- /dev/null
+++ b/crates/core/tedge_mapper/src/c8y_converter.rs
@@ -0,0 +1,251 @@
+use crate::converter::*;
+use crate::error::*;
+use crate::size_threshold::SizeThreshold;
+use c8y_translator::json;
+use mqtt_client::{Message, Topic};
+use std::collections::HashSet;
+
+const SMARTREST_PUBLISH_TOPIC: &str = "c8y/s/us";
+
+pub struct CumulocityConverter {
+ pub(crate) size_threshold: SizeThreshold,
+ children: HashSet<String>,
+ pub(crate) mapper_config: MapperConfig,
+}
+
+impl CumulocityConverter {
+ pub fn new(size_threshold: SizeThreshold) -> Self {
+ let mut topic_fiter = make_valid_topic_filter_or_panic("tedge/measurements");
+ let () = topic_fiter
+ .add("tedge/measurements/+")
+ .expect("invalid topic filter");
+
+ let mapper_config = MapperConfig {
+ in_topic_filter: topic_fiter,
+ out_topic: make_valid_topic_or_panic("c8y/measurement/measurements/create"),
+ errors_topic: make_valid_topic_or_panic("tedge/errors"),
+ };
+
+ let children: HashSet<String> = HashSet::new();
+ CumulocityConverter {
+ size_threshold,
+ children,
+ mapper_config,
+ }
+ }
+}
+
+impl Converter for CumulocityConverter {
+ type Error = ConversionError;
+
+ fn get_mapper_config(&self) -> &MapperConfig {
+ &self.mapper_config
+ }
+
+ fn try_convert(&mut self, input: &Message) -> Result<Vec<Message>, ConversionError> {
+ let () = self.size_threshold.validate(input.payload_str()?)?;
+
+ let mut vec: Vec<Message> = Vec::new();
+
+ let maybe_child_id = get_child_id_from_topic(&input.topic.name)?;
+ match maybe_child_id {
+ Some(child_id) => {
+ // Need to check if the input Thin Edge JSON is valid before adding a child ID to list
+ let c8y_json_child_payload =
+ json::from_thin_edge_json_with_child(input.payload_str()?, child_id.as_str())?;
+
+ if !self.children.contains(child_id.as_str()) {
+ self.children.insert(child_id.clone());
+ vec.push(Message::new(
+ &Topic::new_unchecked(SMARTREST_PUBLISH_TOPIC),
+ format!("101,{},{},thin-edge.io-child", child_id, child_id),
+ ));
+ }
+
+ vec.push(Message::new(
+ &self.mapper_config.out_topic,
+ c8y_json_child_payload,
+ ));
+ }
+ None => {
+ let c8y_json_payload = json::from_thin_edge_json(input.payload_str()?)?;
+ vec.push(Message::new(
+ &self.mapper_config.out_topic,
+ c8y_json_payload,
+ ));
+ }
+ }
+ Ok(vec)
+ }
+}
+
+fn get_child_id_from_topic(topic: &str) -> Result<Option<String>, ConversionError> {
+ match topic.strip_prefix("tedge/measurements/").map(String::from) {
+ Some(maybe_id) if maybe_id.is_empty() => {
+ Err(ConversionError::InvalidChildId { id: maybe_id })
+ }
+ option => Ok(option),
+ }
+}
+
+#[cfg(test)]
+mod test {
+ use super::*;
+ use test_case::test_case;
+
+ #[test_case("tedge/measurements/test", Some("test".to_string()); "valid child id")]
+ #[test_case("tedge/measurements/", None; "returns an error (empty value)")]
+ #[test_case("tedge/measurements", None; "invalid child id (parent topic)")]
+ #[test_case("foo/bar", None; "invalid child id (invalid topic)")]
+ fn extract_child_id(in_topic: &str, expected_child_id: Option<String>) {
+ match get_child_id_from_topic(in_topic) {
+ Ok(maybe_id) => assert_eq!(maybe_id, expected_child_id),
+ Err(ConversionError::InvalidChildId { id }) => {
+ assert_eq!(id, "".to_string())
+ }
+ _ => {
+ panic!("Unexpected error type")
+ }
+ }
+ }
+
+ #[test]
+ fn convert_thin_edge_json_with_child_id() {
+ let mut converter = Box::new(CumulocityConverter::new(SizeThreshold(16 * 1024)));
+ let in_topic = "tedge/measurements/child1";
+ let in_payload = r#"{"temp": 1, "time": "2021-11-16T17:45:40.571760714+01:00"}"#;
+ let in_message = Message::new(&Topic::new_unchecked(in_topic), in_payload);
+
+ let expected_smart_rest_message = Message::new(
+ &Topic::new_unchecked("c8y/s/us"),
+ "101,child1,child1,thin-edge.io-child",
+ );
+ let expected_c8y_json_message = Message::new(
+ &Topic::new_unchecked("c8y/measurement/measurements/create"),
+ r#"{"type":"ThinEdgeMeasurement","externalSource":{"externalId":"child1","type":"c8y_Serial"},"temp":{"temp":{"value":1.0}},"time":"2021-11-16T17:45:40.571760714+01:00"}"#,
+ );
+
+ // Test the first output messages contains SmartREST and C8Y JSON.
+ let out_first_messages = converter.convert(&in_message);
+ assert_eq!(
+ out_first_messages,
+ vec![
+ expected_smart_rest_message,
+ expected_c8y_json_message.clone()
+ ]
+ );
+
+ // Test the second output messages doesn't contain SmartREST child device creation.
+ let out_second_messages = converter.convert(&in_message);
+ assert_eq!(out_second_messages, vec![expected_c8y_json_message.clone()]);
+ }
+
+ #[test]
+ fn convert_first_thin_edge_json_invalid_then_valid_with_child_id() {
+ let mut converter = Box::new(CumulocityConverter::new(SizeThreshold(16 * 1024)));
+ let in_topic = "tedge/measurements/child1";
+ let in_invalid_payload = r#"{"temp": invalid}"#;
+ let in_valid_payload = r#"{"temp": 1, "time": "2021-11-16T17:45:40.571760714+01:00"}"#;
+ let in_first_message = Message::new(&Topic::new_unchecked(in_topic), in_invalid_payload);
+ let in_second_message = Message::new(&Topic::new_unchecked(in_topic), in_valid_payload);
+
+ // First convert invalid Thin Edge JSON message.
+ let out_first_messages = converter.convert(&in_first_message);
+ let expected_error_message = Message::new(
+ &Topic::new_unchecked("tedge/errors"),
+ r#"Invalid JSON: expected value at line 1 column 10: `invalid}`"#,
+ );
+ assert_eq!(out_first_messages, vec![expected_error_message]);
+
+ // Second convert valid Thin Edge JSON message.
+ let out_second_messages = converter.convert(&in_second_message);
+ let expected_smart_rest_message = Message::new(
+ &Topic::new_unchecked("c8y/s/us"),
+ "101,child1,child1,thin-edge.io-child",
+ );
+ let expected_c8y_json_message = Message::new(
+ &Topic::new_unchecked("c8y/measurement/measurements/create"),
+ r#"{"type":"ThinEdgeMeasurement","externalSource":{"externalId":"child1","type":"c8y_Serial"},"temp":{"temp":{"value":1.0}},"time":"2021-11-16T17:45:40.571760714+01:00"}"#,
+ );
+ assert_eq!(
+ out_second_messages,
+ vec![
+ expected_smart_rest_message,
+ expected_c8y_json_message.clone()
+ ]
+ );
+ }
+
+ #[test]
+ fn convert_two_thin_edge_json_messages_given_different_child_id() {
+ let mut converter = Box::new(CumulocityConverter::new(SizeThreshold(16 * 1024)));
+ let in_payload = r#"{"temp": 1, "time": "2021-11-16T17:45:40.571760714+01:00"}"#;
+
+ // First message from "child1"
+ let in_first_message = Message::new(
+ &Topic::new_unchecked("tedge/measurements/child1"),
+ in_payload,
+ );
+ let out_first_messages = converter.convert(&in_first_message);
+ let expected_first_smart_rest_message = Message::new(
+ &Topic::new_unchecked("c8y/s/us"),
+ "101,child1,child1,thin-edge.io-child",
+ );
+ let expected_first_c8y_json_message = Message::new(
+ &Topic::new_unchecked("c8y/measurement/measurements/create"),
+ r#"{"type":"ThinEdgeMeasurement","externalSource":{"externalId":"child1","type":"c8y_Serial"},"temp":{"temp":{"value":1.0}},"time":"2021-11-16T17:45:40.571760714+01:00"}"#,
+ );
+ assert_eq!(
+ out_first_messages,
+ vec![
+ expected_first_smart_rest_message,
+ expected_first_c8y_json_message
+ ]
+ );
+
+ // Second message from "child2"
+ let in_second_message = Message::new(
+ &Topic::new_unchecked("tedge/measurements/child2"),
+ in_payload,
+ );
+ let out_second_messages = converter.convert(&in_second_message);
+ let expected_second_smart_rest_message = Message::new(
+ &Topic::new_unchecked("c8y/s/us"),
+ "101,child2,child2,thin-edge.io-child",
+ );
+ let expected_second_c8y_json_message = Message::new(
+ &Topic::new_unchecked("c8y/measurement/measurements/create"),
+ r#"{"type":"ThinEdgeMeasurement","externalSource":{"externalId":"child2","type":"c8y_Serial"},"temp":{"temp":{"value":1.0}},"time":"2021-11-16T17:45:40.571760714+01:00"}"#,
+ );
+ assert_eq!(
+ out_second_messages,
+ vec![
+ expected_second_smart_rest_message,
+ expected_second_c8y_json_message
+ ]
+ );
+ }
+
+ #[test]
+ fn check_c8y_threshold_packet_size() -> Result<(), anyhow::Error> {
+ let size_threshold = SizeThreshold(16 * 1024);
+ let converter = CumulocityConverter::new(size_threshold);
+ let buffer = create_packet(1024 * 20);
+ let err = converter.size_threshold.validate(&buffer).unwrap_err();
+ assert_eq!(
+ err.to_string(),
+ "The input size 20480 is too big. The threshold is 16384."
+ );
+ Ok(())
+ }
+
+ fn create_packet(size: usize) -> String {
+ let data: String = "Some data!".into();
+ let loops = size / data.len();
+ let mut buffer = String::with_capacity(size);
+ for _ in 0..loops {
+ buffer.push_str("Some data!");
+ }
+ buffer
+ }
+}
diff --git a/crates/core/tedge_mapper/src/c8y_mapper.rs b/crates/core/tedge_mapper/src/c8y_mapper.rs
new file mode 100644
index 00000000..c96645e2
--- /dev/null
+++ b/crates/core/tedge_mapper/src/c8y_mapper.rs
@@ -0,0 +1,35 @@
+use crate::c8y_converter::CumulocityConverter;
+use crate::component::TEdgeComponent;
+use crate::mapper::*;
+use crate::size_threshold::SizeThreshold;
+use async_trait::async_trait;
+use tedge_config::TEdgeConfig;
+use tracing::{info_span, Instrument};
+
+const CUMULOCITY_MAPPER_NAME: &str = "tedge-mapper-c8y";
+
+pub struct CumulocityMapper {}
+
+impl CumulocityMapper {
+ pub fn new() -> CumulocityMapper {
+ CumulocityMapper {}
+ }
+}
+
+#[async_trait]
+impl TEdgeComponent for CumulocityMapper {
+ async fn start(&self, tedge_config: TEdgeConfig) -> Result<(), anyhow::Error> {
+ let size_threshold = SizeThreshold(16 * 1024);
+
+ let converter = Box::new(CumulocityConverter::new(size_threshold));
+
+ let mut mapper = create_mapper(CUMULOCITY_MAPPER_NAME, &tedge_config, converter).await?;
+
+ mapper
+ .run()
+ .instrument(info_span!(CUMULOCITY_MAPPER_NAME))
+ .await?;
+
+ Ok(())
+ }
+}
diff --git a/crates/core/tedge_mapper/src/collectd_mapper/batcher.rs b/crates/core/tedge_mapper/src/collectd_mapper/batcher.rs
new file mode 100644
index 00000000..74aae691
--- /dev/null
+++ b/crates/core/tedge_mapper/src/collectd_mapper/batcher.rs
@@ -0,0 +1,123 @@
+use clock::Timestamp;
+use mqtt_client::Payload;
+use thin_edge_json::{
+ group::{MeasurementGroup, MeasurementGrouper},
+ measurement::MeasurementVisitor,
+ serialize::ThinEdgeJsonSerializer,
+};
+
+use crate::collectd_mapper::{collectd::CollectdMessage, error::DeviceMonitorError};
+use chrono::Local;
+use thin_edge_json::group::MeasurementGrouperError;
+
+#[derive(Debug)]
+pub struct MessageBatch {
+ message_grouper: MeasurementGrouper,
+}
+
+impl MessageBatch {
+ pub fn thin_edge_json_bytes(
+ messages: Vec<CollectdMessage>,
+ ) -> Result<Payload, DeviceMonitorError> {
+ let mut messages = messages.into_iter();
+
+ if let Some(first_message) = messages.next() {
+ let timestamp = first_message.timestamp.with_timezone(Local::now().offset());
+ let mut batch = MessageBatch::start_batch(first_message, timestamp)?;
+ for message in messages {
+ batch.add_to_batch(message)?;
+ }
+ let measurements = batch.end_batch()?;
+
+ let mut tedge_json_serializer = ThinEdgeJsonSerializer::new();
+ measurements.accept(&mut tedge_json_serializer)?;
+
+ let payload = tedge_json_serializer.bytes()?;
+ Ok(payload)
+ } else {
+ Err(DeviceMonitorError::FromInvalidThinEdgeJson(
+ MeasurementGrouperError::UnexpectedEnd,
+ ))
+ }
+ }
+
+ fn start_batch(
+ collectd_message: CollectdMessage,
+ timestamp: Timestamp,
+ ) -> Result<Self, DeviceMonitorError> {
+ let mut message_grouper = MeasurementGrouper::new();
+ message_grouper.visit_timestamp(timestamp)?;
+
+ let mut message_batch = Self { message_grouper };
+
+ message_batch.add_to_batch(collectd_message)?;
+
+ Ok(message_batch)
+ }
+
+ fn add_to_batch(
+ &mut self,
+ collectd_message: CollectdMessage,
+ ) -> Result<(), DeviceMonitorError> {
+ collectd_message.accept(&mut self.message_grouper)?;
+ Ok(())
+ }
+
+ fn end_batch(self) -> Result<MeasurementGroup, DeviceMonitorError> {
+ Ok(self.message_grouper.end()?)
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use assert_matches::assert_matches;
+ use chrono::{TimeZone, Utc};
+ use clock::{Clock, WallClock};
+
+ #[test]
+ fn test_message_batch_processor() -> anyhow::Result<()> {
+ let timestamp = Utc.ymd(2015, 5, 15).and_hms_milli(0, 0, 1, 444);
+ let collectd_message = CollectdMessage::new("temperature", "value", 32.5, timestamp);
+ let mut message_batch = MessageBatch::start_batch(collectd_message, WallClock.now())?;
+
+ let collectd_message = CollectdMessage::new("coordinate", "x", 50.0, timestamp);
+ message_batch.add_to_batch(collectd_message)?;
+
+ let collectd_message = CollectdMessage::new("coordinate", "y", 70.0, timestamp);
+ message_batch.add_to_batch(collectd_message)?;
+
+ let collectd_message = CollectdMessage::new("pressure", "value", 98.2, timestamp);
+ message_batch.add_to_batch(collectd_message)?;
+
+ let collectd_message = CollectdMessage::new("coordinate", "z", 90.0, timestamp);
+ message_batch.add_to_batch(collectd_message)?;
+
+ let message_group = message_batch.end_batch()?;
+
+ assert_matches!(message_group.timestamp(), Some(_));
+
+ assert_eq!(
+ message_group.get_measurement_value(Some("temperature"), "value"),
+ Some(32.5)
+ );
+ assert_eq!(
+ message_group.get_measurement_value(Some("pressure"), "value"),
+ Some(98.2)
+ );
+ assert_eq!(
+ message_group.get_measurement_value(Some("coordinate"), "x"),
+ Some(50.0)
+ );
+ assert_eq!(
+ message_group.get_measurement_value(Some("coordinate"), "y"),
+ Some(70.0)
+ );
+ assert_eq!(
+ message_group.get_measurement_value(Some("coordinate"), "z"),
+ Some(90.0)
+ );
+
+ Ok(())
+ }
+}
diff --git a/crates/core/tedge_mapper/src/collectd_mapper/collectd.rs b/crates/core/tedge_mapper/src/collectd_mapper/collectd.rs
new file mode 100644
index 00000000..9d387314
--- /dev/null
+++ b/crates/core/tedge_mapper/src/collectd_mapper/collectd.rs
@@ -0,0 +1,323 @@</