summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--.gitignore1
-rw-r--r--Cargo.lock30
-rw-r--r--Cargo.toml1
-rw-r--r--common/mqtt_client/src/lib.rs8
-rw-r--r--mapper/dm_agent/Cargo.toml27
-rw-r--r--mapper/dm_agent/src/batcher.rs392
-rw-r--r--mapper/dm_agent/src/collectd.rs250
-rw-r--r--mapper/dm_agent/src/main.rs31
-rw-r--r--mapper/dm_agent/src/monitor.rs50
-rw-r--r--mapper/dm_agent/src/mqtt.rs86
-rw-r--r--mapper/thin_edge_json/src/group.rs56
11 files changed, 932 insertions, 0 deletions
diff --git a/.gitignore b/.gitignore
index 1325842c..05894aae 100644
--- a/.gitignore
+++ b/.gitignore
@@ -6,6 +6,7 @@
**/*.rs.bk
.idea/
+*.iml
.tmp/
.venv/
diff --git a/Cargo.lock b/Cargo.lock
index 6b0a8d1c..768e96f7 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -140,6 +140,17 @@ dependencies = [
]
[[package]]
+name = "async-trait"
+version = "0.1.50"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0b98e84bbb4cbcdd97da190ba0c58a1bb0de2c1fdf67d159e192ed766aeca722"
+dependencies = [
+ "proc-macro2 1.0.26",
+ "quote 1.0.9",
+ "syn 1.0.68",
+]
+
+[[package]]
name = "atty"
version = "0.2.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1827,6 +1838,25 @@ dependencies = [
]
[[package]]
+name = "tedge_dm_agent"
+version = "0.1.0"
+dependencies = [
+ "anyhow",
+ "assert_matches",
+ "async-trait",
+ "chrono",
+ "futures",
+ "mockall",
+ "mqtt_client",
+ "thin_edge_json",
+ "thiserror",
+ "tokio",
+ "tokio-test",
+ "tracing",
+ "tracing-subscriber",
+]
+
+[[package]]
name = "tedge_mapper"
version = "0.1.0"
dependencies = [
diff --git a/Cargo.toml b/Cargo.toml
index e94ebcb2..93613c2e 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -8,6 +8,7 @@ members = [
"mapper/tedge_mapper",
"mapper/cumulocity/c8y_translator_lib",
"mapper/thin_edge_json",
+ "mapper/dm_agent"
]
[profile.release]
diff --git a/common/mqtt_client/src/lib.rs b/common/mqtt_client/src/lib.rs
index 3dd05c3f..20f22d55 100644
--- a/common/mqtt_client/src/lib.rs
+++ b/common/mqtt_client/src/lib.rs
@@ -370,6 +370,14 @@ impl Config {
}
}
+ /// Update queue_capcity.
+ pub fn queue_capacity(self, queue_capacity: usize) -> Self {
+ Self {
+ queue_capacity,
+ ..self
+ }
+ }
+
/// Enable `clean_session`.
pub fn clean_session(self) -> Self {
Self {
diff --git a/mapper/dm_agent/Cargo.toml b/mapper/dm_agent/Cargo.toml
new file mode 100644
index 00000000..84371007
--- /dev/null
+++ b/mapper/dm_agent/Cargo.toml
@@ -0,0 +1,27 @@
+[package]
+name = "tedge_dm_agent"
+version = "0.1.0"
+edition = "2018"
+authors = ["thin-edge.io team <info@thin-edge.io>"]
+license = "Apache-2.0"
+readme = "README.md"
+description = "The daemon providing monitoring and remote management capabilities to tedge"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+thin_edge_json = {path = "../thin_edge_json" }
+mqtt_client = {path = "../../common/mqtt_client" }
+chrono = "0.4"
+futures = "0.3"
+tokio = { version = "1.1", features = ["rt", "sync", "time"] }
+anyhow = "1.0"
+thiserror = "1.0"
+tracing = { version = "0.1", features = ["attributes", "log"] }
+tracing-subscriber = "0.2"
+mockall = "0.9"
+async-trait = "0.1"
+
+[dev-dependencies]
+assert_matches = "1.4"
+tokio-test = "0.4.1"
diff --git a/mapper/dm_agent/src/batcher.rs b/mapper/dm_agent/src/batcher.rs
new file mode 100644
index 00000000..3cd7665d
--- /dev/null
+++ b/mapper/dm_agent/src/batcher.rs
@@ -0,0 +1,392 @@
+use crate::mqtt::{MqttClient, MqttMessageStream};
+use mqtt_client::{Message, QoS, Topic, TopicFilter};
+use std::{sync::Arc, time::Duration};
+use thin_edge_json::{
+ group::MeasurementGrouper,
+ measurement::current_timestamp,
+ measurement::FlatMeasurementVisitor,
+ serialize::{ThinEdgeJsonSerializationError, ThinEdgeJsonSerializer},
+};
+use tokio::{
+ select,
+ sync::mpsc::{error::SendError, UnboundedReceiver, UnboundedSender},
+ time::{interval, Interval},
+};
+use tracing::{error, log::warn};
+
+use crate::collectd::{self, CollectdMessage};
+
+const DEFAULT_STATS_COLLECTION_WINDOW: u64 = 1000;
+
+const SOURCE_TOPIC: &str = "collectd/#";
+const TARGET_TOPIC: &str = "tedge/measurements";
+
+#[derive(thiserror::Error, Debug)]
+pub enum DeviceMonitorError {
+ #[error(transparent)]
+ MqttClientError(#[from] Arc<mqtt_client::Error>),
+
+ #[error(transparent)]
+ InvalidCollectdMeasurementError(#[from] collectd::CollectdError),
+
+ #[error(transparent)]
+ InvalidThinEdgeJsonError(#[from] thin_edge_json::group::MeasurementGrouperError),
+
+ #[error(transparent)]
+ ThinEdgeJsonSerializationError(#[from] ThinEdgeJsonSerializationError),
+
+ #[error(transparent)]
+ BatchingError(#[from] SendError<MeasurementGrouper>),
+}
+
+impl From<mqtt_client::Error> for DeviceMonitorError {
+ fn from(error: mqtt_client::Error) -> Self {
+ Self::MqttClientError(Arc::new(error))
+ }
+}
+
+#[derive(Debug)]
+pub struct MessageBatch {
+ message_grouper: MeasurementGrouper,
+}
+
+impl MessageBatch {
+ pub fn start_batch(message: Message) -> Result<Self, DeviceMonitorError> {
+ let mut message_grouper = MeasurementGrouper::new();
+ message_grouper.timestamp(&current_timestamp())?;
+
+ let mut message_batch = Self { message_grouper };
+
+ message_batch.add_to_batch(message)?;
+
+ Ok(message_batch)
+ }
+
+ pub fn add_to_batch(&mut self, message: Message) -> Result<(), DeviceMonitorError> {
+ let collectd_message = CollectdMessage::parse_from(&message)?;
+
+ self.message_grouper.measurement(
+ Some(collectd_message.metric_group_key),
+ collectd_message.metric_key,
+ collectd_message.metric_value,
+ )?;
+
+ Ok(())
+ }
+
+ pub fn end_batch(self) -> MeasurementGrouper {
+ self.message_grouper
+ }
+}
+
+pub struct MessageBatcher {
+ sender: UnboundedSender<MeasurementGrouper>,
+ mqtt_client: Arc<dyn MqttClient>,
+ topic_filter: TopicFilter,
+}
+
+impl MessageBatcher {
+ pub fn new(
+ sender: UnboundedSender<MeasurementGrouper>,
+ mqtt_client: Arc<dyn MqttClient>,
+ ) -> Result<Self, DeviceMonitorError> {
+ let topic_filter = TopicFilter::new(SOURCE_TOPIC)?.qos(QoS::AtMostOnce);
+ Ok(Self {
+ sender,
+ mqtt_client,
+ topic_filter,
+ })
+ }
+
+ pub async fn run(&self) -> Result<(), DeviceMonitorError> {
+ let mut messages = self
+ .mqtt_client
+ .subscribe(self.topic_filter.clone())
+ .await?;
+
+ let batching_window = Duration::from_millis(DEFAULT_STATS_COLLECTION_WINDOW);
+
+ loop {
+ match messages.next().await {
+ Some(message) => {
+ // Build a message batch until the batching window times out and return the batch
+ let message_batch_result = self
+ .build_message_batch_with_timeout(
+ message,
+ messages.as_mut(),
+ interval(batching_window),
+ )
+ .await;
+
+ match message_batch_result {
+ Ok(message_batch) => {
+ //Send the current batch to the batch processor
+ let _ = self.sender.send(message_batch).map_err(|err| {
+ error!("Error while publishing a message batch: {}", err)
+ });
+ }
+ Err(err) => {
+ error!("Error while building a message batch: {}", err);
+ }
+ }
+ }
+ None => {
+ //If the message batching loop returns, it means the MQTT connection has closed
+ error!("MQTT connection closed. Retrying...");
+ }
+ }
+ }
+ }
+
+ async fn build_message_batch_with_timeout(
+ &self,
+ first_message: Message,
+ messages: &mut dyn MqttMessageStream,
+ mut timeout: Interval,
+ ) -> Result<MeasurementGrouper, DeviceMonitorError> {
+ let mut message_batch = MessageBatch::start_batch(first_message)?;
+ timeout.tick().await; // The first tick starts the timeout window
+
+ loop {
+ select! {
+ maybe_message = messages.next() => {
+ match maybe_message {
+ Some(message) => message_batch.add_to_batch(message)?,
+ None => break
+ }
+ }
+
+ _result = timeout.tick() => {
+ break;
+ }
+ }
+ }
+
+ Ok(message_batch.end_batch())
+ }
+}
+
+pub struct MessageBatchPublisher {
+ receiver: UnboundedReceiver<MeasurementGrouper>,
+ mqtt_client: Arc<dyn MqttClient>,
+ topic: Topic,
+}
+
+impl MessageBatchPublisher {
+ pub fn new(
+ receiver: UnboundedReceiver<MeasurementGrouper>,
+ mqtt_client: Arc<dyn MqttClient>,
+ ) -> Result<Self, DeviceMonitorError> {
+ let topic = Topic::new(TARGET_TOPIC)?;
+
+ Ok(Self {
+ receiver,
+ mqtt_client,
+ topic,
+ })
+ }
+
+ pub async fn run(&mut self) {
+ while let Some(message_grouper) = self.receiver.recv().await {
+ if let Err(err) = self.publish_as_mqtt_message(message_grouper).await {
+ error!("Error publishing the measurement batch: {}", err);
+ }
+ }
+
+ warn!("MQTT message channel closed. Can not proceed");
+ }
+
+ async fn publish_as_mqtt_message(
+ &mut self,
+ message_grouper: MeasurementGrouper,
+ ) -> Result<(), DeviceMonitorError> {
+ let mut tedge_json_serializer = ThinEdgeJsonSerializer::new();
+ message_grouper.accept(&mut tedge_json_serializer)?;
+
+ let tedge_message = Message::new(&self.topic, tedge_json_serializer.bytes()?);
+
+ self.mqtt_client.publish(tedge_message).await?;
+
+ Ok(())
+ }
+}
+
+#[cfg(test)]
+mod tests {
+
+ use crate::mqtt::MockMqttClient;
+ use crate::mqtt::MockMqttErrorStream;
+ use crate::mqtt::MockMqttMessageStream;
+ use assert_matches::assert_matches;
+ use futures::future::{pending, ready};
+ use mockall::Sequence;
+ use tokio::time::sleep;
+
+ use super::*;
+
+ #[test]
+ fn test_message_batch_processor() -> anyhow::Result<()> {
+ let topic = Topic::new("collectd/localhost/temperature/value").unwrap();
+ let collectd_message = Message::new(&topic, "123456789:32.5");
+ let mut message_batch = MessageBatch::start_batch(collectd_message)?;
+
+ let topic = Topic::new("collectd/localhost/coordinate/x").unwrap();
+ let collectd_message = Message::new(&topic, "123456789:50");
+ message_batch.add_to_batch(collectd_message)?;
+
+ let topic = Topic::new("collectd/localhost/coordinate/y").unwrap();
+ let collectd_message = Message::new(&topic, "123456789:70");
+ message_batch.add_to_batch(collectd_message)?;
+
+ let topic = Topic::new("collectd/localhost/pressure/value").unwrap();
+ let collectd_message = Message::new(&topic, "123456789:98.2");
+ message_batch.add_to_batch(collectd_message)?;
+
+ let topic = Topic::new("collectd/localhost/coordinate/z").unwrap();
+ let collectd_message = Message::new(&topic, "123456789:90");
+ message_batch.add_to_batch(collectd_message)?;
+
+ let message_grouper = message_batch.end_batch();
+
+ assert_matches!(message_grouper.timestamp, Some(_));
+
+ assert_eq!(
+ message_grouper.get_measurement_value(Some("temperature"), "value"),
+ Some(32.5)
+ );
+ assert_eq!(
+ message_grouper.get_measurement_value(Some("pressure"), "value"),
+ Some(98.2)
+ );
+ assert_eq!(
+ message_grouper.get_measurement_value(Some("coordinate"), "x"),
+ Some(50.0)
+ );
+ assert_eq!(
+ message_grouper.get_measurement_value(Some("coordinate"), "y"),
+ Some(70.0)
+ );
+ assert_eq!(
+ message_grouper.get_measurement_value(Some("coordinate"), "z"),
+ Some(90.0)
+ );
+
+ Ok(())
+ }
+
+ #[test]
+ fn invalid_collectd_message_format() {
+ let topic = Topic::new("collectd/host/group/key").unwrap();
+ let invalid_collectd_message = Message::new(&topic, "123456789");
+ let result = MessageBatch::start_batch(invalid_collectd_message);
+
+ assert_matches!(
+ result,
+ Err(DeviceMonitorError::InvalidCollectdMeasurementError(_))
+ );
+ }
+
+ #[tokio::test]
+ async fn batch_publisher() {
+ let mut message_grouper = MeasurementGrouper::new();
+ message_grouper
+ .measurement(Some("temperature"), "value", 32.5)
+ .unwrap();
+
+ let (_sender, receiver) = tokio::sync::mpsc::unbounded_channel::<MeasurementGrouper>();
+
+ let mut mqtt_client = MockMqttClient::new();
+ mqtt_client.expect_publish().times(1).returning(|message| {
+ assert_eq!(message.topic.name, TARGET_TOPIC.to_string()); //The test assertion happens here
+ Ok(123)
+ });
+
+ let mut publisher = MessageBatchPublisher::new(receiver, Arc::new(mqtt_client)).unwrap();
+ publisher
+ .publish_as_mqtt_message(message_grouper)
+ .await
+ .unwrap();
+ }
+
+ //TODO Control the timeout better with mocked clocks
+ #[tokio::test]
+ async fn batching_with_window_timeout() {
+ let (sender, _receiver) = tokio::sync::mpsc::unbounded_channel::<MeasurementGrouper>();
+
+ let mut mqtt_client = MockMqttClient::new();
+
+ mqtt_client.expect_subscribe_errors().returning(|| {
+ let error_stream = MockMqttErrorStream::default();
+ Box::new(error_stream)
+ });
+
+ mqtt_client.expect_subscribe().returning(|_| {
+ let message_stream = MockMqttMessageStream::default();
+ Ok(Box::new(message_stream))
+ });
+
+ let mut seq = Sequence::new(); //To control the order of mock returns
+ let mut message_stream = MockMqttMessageStream::default();
+ message_stream
+ .expect_next()
+ .times(1)
+ .in_sequence(&mut seq) //The first value to be returned by this mock stream
+ .returning(|| {
+ let topic = Topic::new("collectd/localhost/temperature/value").unwrap();
+ let message = Message::new(&topic, "123456789:32.5");
+ Box::pin(ready(Some(message)))
+ });
+
+ message_stream
+ .expect_next()
+ .times(1)
+ .in_sequence(&mut seq) //The second value to be returend by this mock stream
+ .returning(|| {
+ let topic = Topic::new("collectd/localhost/pressure/value").unwrap();
+ let message = Message::new(&topic, "123456789:98.2");
+ Box::pin(ready(Some(message)))
+ });
+
+ //The third message from this stream will be returned only after the batching window
+ message_stream
+ .expect_next()
+ .times(1)
+ .in_sequence(&mut seq) //The third value to be returend by this mock stream
+ .returning(|| {
+ Box::pin(async {
+ sleep(Duration::from_millis(1000)).await; //Sleep for a duration greater than the batching window
+ let topic = Topic::new("collectd/localhost/speed/value").unwrap();
+ let message = Message::new(&topic, "123456789:350");
+ Some(message)
+ })
+ });
+
+ //Block the stream from the 4th invocation onwards
+ message_stream
+ .expect_next()
+ .returning(|| Box::pin(pending())); //Block the stream with a pending future
+
+ let mut timeout = interval(Duration::from_millis(500));
+ timeout.tick().await; // The first tick starts the timeout window
+
+ let first_message = message_stream.next().await.unwrap();
+ let builder = MessageBatcher::new(sender, Arc::new(mqtt_client)).unwrap();
+ let message_grouper = builder
+ .build_message_batch_with_timeout(first_message, &mut message_stream, timeout)
+ .await
+ .unwrap();
+
+ assert_eq!(
+ message_grouper.get_measurement_value(Some("temperature"), "value"),
+ Some(32.5)
+ );
+ assert_eq!(
+ message_grouper.get_measurement_value(Some("pressure"), "value"),
+ Some(98.2)
+ );
+ assert_eq!(
+ message_grouper.get_measurement_value(Some("speed"), "value"),
+ None //This measurement isn't included in the batch because it came after the batching window
+ );
+ }
+}
diff --git a/mapper/dm_agent/src/collectd.rs b/mapper/dm_agent/src/collectd.rs
new file mode 100644
index 00000000..ad87e94c
--- /dev/null
+++ b/mapper/dm_agent/src/collectd.rs
@@ -0,0 +1,250 @@
+use std::str::from_utf8;
+
+use mqtt_client::Message;
+
+#[derive(Debug)]
+pub struct CollectdMessage<'a> {
+ pub metric_group_key: &'a str,
+ pub metric_key: &'a str,
+ pub metric_value: f64,
+}
+
+#[derive(thiserror::Error, Debug)]
+pub enum CollectdError {
+ #[error(
+ "Message received on invalid collectd topic: {0}. \
+ Collectd message topics must be in the format collectd/<hostname>/<metric-plugin-name>/<metric-key>"
+ )]
+ InvalidMeasurementTopic(String),
+
+ #[error("Invalid payload received on topic: {0}. Error: {1}")]
+ InvalidMeasurementPayload(String, CollectdPayloadError),
+}
+
+impl<'a> CollectdMessage<'a> {
+ pub fn parse_from(mqtt_message: &'a Message) -> Result<Self, CollectdError> {
+ let topic = mqtt_message.topic.name.as_str();
+ let collectd_topic = match CollectdTopic::from_str(topic) {
+ Ok(collectd_topic) => collectd_topic,
+ Err(_) => {
+ return Err(CollectdError::InvalidMeasurementTopic(topic.into()));
+ }
+ };
+
+ let collectd_payload = CollectdPayload::parse_from(mqtt_message.payload.as_slice())
+ .map_err(|err| CollectdError::InvalidMeasurementPayload(topic.into(), err))?;
+
+ Ok(CollectdMessage {
+ metric_group_key: collectd_topic.metric_group_key,
+ metric_key: collectd_topic.metric_key,
+ metric_value: collectd_payload.metric_value,
+ })
+ }
+}
+
+#[derive(Debug)]
+struct CollectdTopic<'a> {
+ metric_group_key: &'a str,
+ metric_key: &'a str,
+}
+
+#[derive(Debug)]
+struct InvalidCollectdTopicName;
+
+impl<'a> CollectdTopic<'a> {
+ fn from_str(topic_name: &'a str) -> Result<Self, InvalidCollectdTopicName> {
+ let mut iter = topic_name.split('/');
+ let _collectd_prefix = iter.next().ok_or(InvalidCollectdTopicName)?;
+ let _hostname = iter.next().ok_or(InvalidCollectdTopicName)?;
+ let metric_group_key = iter.next().ok_or(InvalidCollectdTopicName)?;
+ let metric_key = iter.next().ok_or(InvalidCollectdTopicName)?;
+
+ match iter.next() {
+ None => Ok(CollectdTopic {
+ metric_group_key,
+ metric_key,
+ }),
+ Some(_) => Err(InvalidCollectdTopicName),
+ }
+ }
+}
+
+#[derive(Debug)]
+struct CollectdPayload {
+ _timestamp: f64,
+ metric_value: f64,
+}
+
+#[derive(thiserror::Error, Debug)]
+pub enum CollectdPayloadError {
+ #[error("Non UTF-8 payload: {0:?}")]
+ NonUTF8MeasurementPayload(Vec<u8>),
+
+ #[error("Invalid payload: {0}. Expected payload format: <timestamp>:<value>")]
+ InvalidMeasurementPayloadFormat(String),
+
+ #[error("Invalid measurement timestamp: {0}. Epoch time value expected")]
+ InvalidMeasurementTimestamp(String),
+
+ #[error("Invalid measurement value: {0}. Must be a number")]
+ InvalidMeasurementValue(String),
+}
+
+impl CollectdPayload {
+ fn parse_from(payload: &[u8]) -> Result<Self, CollectdPayloadError> {
+ let payload = from_utf8(payload)
+ .map_err(|_err| CollectdPayloadError::NonUTF8MeasurementPayload(payload.into()))?;
+ let mut iter = payload.split(':');
+
+ let _timestamp =
+ iter.next()
+ .ok_or(CollectdPayloadError::InvalidMeasurementPayloadFormat(
+ payload.to_string(),
+ ))?;
+ let _timestamp = _timestamp.parse::<f64>().map_err(|_err| {
+ CollectdPayloadError::InvalidMeasurementTimestamp(_timestamp.to_string())
+ })?;
+
+ let metric_value =
+ iter.next()
+ .ok_or(CollectdPayloadError::InvalidMeasurementPayloadFormat(
+ payload.to_string(),
+ ))?;
+ let metric_value = metric_value
+ .trim_end_matches(char::from(0)) //Trim \u{0} character from the end of the MQTT payload
+ .parse::<f64>()
+ .map_err(|_err| {
+ CollectdPayloadError::InvalidMeasurementValue(metric_value.to_string())
+ })?;
+
+ match iter.next() {
+ None => Ok(CollectdPayload {
+ _timestamp,
+ metric_value,
+ }),
+ Some(_) => Err(CollectdPayloadError::InvalidMeasurementPayloadFormat(
+ payload.to_string(),
+ )),
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use assert_matches::assert_matches;
+ use mqtt_client::Topic;
+
+ use super::*;
+
+ #[test]
+ fn collectd_message_parsing() {
+ let topic = Topic::new("collectd/localhost/temperature/value").unwrap();
+ let mqtt_message = Message::new(&topic, "123456789:32.5");
+
+ let collectd_message = CollectdMessage::parse_from(&mqtt_message).unwrap();
+
+ let CollectdMessage {
+ metric_group_key,
+ metric_key,
+ metric_value,
+ } = collectd_message;
+
+ assert_eq!(metric_group_key, "temperature");
+ assert_eq!(metric_key, "value");
+ assert_eq!(metric_value, 32.5);
+ }
+
+ #[test]
+ fn invalid_collectd_message_topic() {
+ let topic = Topic::new("collectd/less/level").unwrap();
+ let mqtt_message = Message::new(&topic, "123456789:32.5");
+
+ let result = CollectdMessage::parse_from(&mqtt_message);
+
+ assert_matches!(result, Err(CollectdError::InvalidMeasurementTopic(_)));
+ }
+
+ #[test]
+ fn invalid_collectd_message_payload() {
+ let topic = Topic::new("collectd/host/group/key").unwrap();
+ let invalid_collectd_message = Message::new(&topic, "123456789");
+
+ let result = CollectdMessage::parse_from(&invalid_collectd_message);
+
+ assert_matches!(result, Err(CollectdError::InvalidMeasurementPayload(_, _)));
+ }
+
+ #[test]
+ fn invalid_collectd_topic_less_levels() {
+ let result = CollectdTopic::from_str("collectd/less/levels");
+
+ assert_matches!(result, Err(InvalidCollectdTopicName));
+ }
+
+ #[test]
+ fn invalid_collectd_topic_more_levels() {
+ let result = CollectdTopic::from_str("collectd/more/levels/than/needed");
+
+ assert_matches!(result, Err(InvalidCollectdTopicName));
+ }
+
+ #[test]
+ fn invalid_collectd_payload_no_seperator() {
+ let payload: Vec<u8> = "123456789".into();
+ let result = CollectdPayload::parse_from(&payload);
+
+ assert_matches!(
+ result,
+ Err(CollectdPayloadError::InvalidMeasurementPayloadFormat(_))
+ );
+ }
+
+ #[test]
+ fn invalid_collectd_payload_more_seperators() {
+ let payload: Vec<u8> = "123456789:98.6:abc".into();
+ let result = CollectdPayload::parse_from(&payload);
+
+ assert_matches!(
+ result,
+ Err(CollectdPayloadError::InvalidMeasurementPayloadFormat(_))
+ );
+ }
+
+ #[test]
+ fn invalid_collectd_metric_value() {
+ let payload: Vec<u8> = "123456789:abc".into();
+ let result = CollectdPayload::parse_from(&payload);
+
+ assert_matches!(
+ result,
+ Err(CollectdPayloadError::InvalidMeasurementValue(_))
+ );
+ }
+
+ #[test]
+ fn invalid_collectd_metric_timestamp() {
+ let payload: Vec<u8> = "abc:98.6".into();
+ let result = CollectdPayload::parse_from(&payload);
+
+ assert_matches!(
+ result,
+ Err(CollectdPayloadError::InvalidMeasurementTimestamp(_))
+ );
+ }
+
+ #[test]
+ fn very_large_metric_value() {
+ let payload: Vec<u8> = format!("123456789:{}", u128::MAX).into();
+ let collectd_payload = CollectdPayload::parse_from(&payload).unwrap();
+
+ assert_eq!(collectd_payload.metric_value, u128::MAX as f64);
+ }
+
+ #[test]
+ fn very_small_metric_value() {
+ let payload: Vec<u8> = format!("123456789:{}", i128::MIN).into();
+ let collectd_payload = CollectdPayload::parse_from(&payload).unwrap();
+
+ assert_eq!(collectd_payload.metric_value, i128::MIN as f64);
+ }
+}
diff --git a/mapper/dm_agent/src/main.rs b/mapper/dm_agent/src/main.rs
new file mode 100644
index 00000000..b1f19827
--- /dev/null
+++ b/mapper/dm_agent/src/main.rs
@@ -0,0 +1,31 @@
+mod batcher;
+mod collectd;
+mod monitor;
+mod mqtt;
+
+use tracing::{debug_span, info, Instrument};
+
+use crate::monitor::DeviceMonitor;
+
+const APP_NAME: &str = "tedge-dm-agent";
+const DEFAULT_LOG_LEVEL: &str = "warn";
+const TIME_FORMAT: &str = "%Y-%m-%dT%H:%M:%S%.3f%:z";
+
+#[tokio::main]
+async fn main() -> anyhow::Result<()> {
+ let filter = std::env::var("RUST_LOG").unwrap_or_else(|_| DEFAULT_LOG_LEVEL.into());
+ tracing_subscriber::fmt()
+ .with_timer(tracing_subscriber::fmt::time::ChronoUtc::with_format(
+ TIME_FORMAT.into(),
+ ))
+ .with_env_filter(filter)
+ .with_span_events(tracing_subscriber::fmt::format::FmtSpan::CLOSE)
+ .init();
+
+ info!("{} starting!", APP_NAME);
+ DeviceMonitor::run()
+ .instrument(debug_span!(APP_NAME))
+ .await?;
+
+ Ok(())
+}
diff --git a/mapper/dm_agent/src/monitor.rs b/mapper/dm_agent/src/monitor.rs
new file mode 100644
index 00000000..5fde4ac8
--- /dev/null
+++ b/mapper/dm_agent/src/monitor.rs
@@ -0,0 +1,50 @@
+use thin_edge_json::group::MeasurementGrouper;
+use tracing::{instrument, log::error};
+
+use crate::{
+ batcher::{DeviceMonitorError, MessageBatchPublisher, MessageBatcher},
+ mqtt::{MqttClient, MqttClientImpl},
+};
+
+const DEFAULT_HOST: &str = "localhost";
+const DEFAULT_PORT: u16 = 1883;
+const CLIENT_ID: &str = "tedge-dm-agent";
+
+#[derive(Debug)]
+pub struct DeviceMonitor;
+
+impl DeviceMonitor {
+ #[instrument(name = "monitor")]
+ pub async fn run() -> Result<(), DeviceMonitorError> {
+ let config = mqtt_client::Config::new(DEFAULT_HOST, DEFAULT_PORT).queue_capacity(1024);
+ let mqtt_client = MqttClientImpl::connect(CLIENT_ID, &config).await?;
+
+ let (sender, receiver) = tokio::sync::mpsc::unbounded_channel::<MeasurementGrouper>();
+
+ let message_batch_producer = MessageBatcher::new(sender, mqtt_client.clone())?;
+ let join_handle1 = tokio::task::spawn(async move {
+ match message_batch_producer.run().await {
+ Ok(_) => error!("Unexpected end of message batcher thread"),
+ Err(err) => error!("Error in message batcher thread: {}", err),
+ }
+ });
+
+ let mut message_batch_consumer = MessageBatchPublisher::new(receiver, mqtt_client.clone())?;
+ let join_handle2 = tokio::task::spawn(async move {
+ message_batch_consumer.run().await;
+ });
+
+ let mut errors = mqtt_client.subscribe_errors();
+ let join_handle3 = tokio::task::spawn(async move {
+ while let Some(error) = errors.next().await {
+ error!("MQTT error: {}", error);
+ }
+ });
+
+ let _ = join_handle1.await;
+ let _ = join_handle2.await;
+ let _ = join_handle3.await;
+
+ Ok(())
+ }
+}
diff --git a/mapper/dm_agent/src/mqtt.rs b/mapper/dm_agent/src/mqtt.rs
new file mode 100644
index 00000000..0d18be41
--- /dev/null
+++ b/mapper/dm_agent/src/mqtt.rs
@@ -0,0 +1,86 @@
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use mockall::automock;
+use mqtt_client::{Client, ErrorStream, Message, MessageId, MessageStream, TopicFilter};
+
+#[automock]
+#[async_trait]
+pub trait MqttClient: Send + Sync {
+ fn subscribe_errors(&self) -> Box<dyn MqttErrorStream>;
+
+ async fn subscribe(
+ &self,
+ filter: TopicFilter,
+ ) -> Result<Box<dyn MqttMessageStream>, mqtt_client::Error>;
+
+ async fn publish(&self, message: Message) -> Result<MessageId, mqtt_client::Error>;
+}
+
+#[async_trait]
+#[automock]
+pub trait MqttMessageStream: Send + Sync {
+ async fn next(&mut self) -> Option<Message>;
+}
+
+pub struct MqttMessageStreamImpl {
+ message_stream: MessageStream,
+}
+
+#[async_trait]
+impl MqttMessageStream for MqttMessageStreamImpl {
+ async fn next(&mut self) -> Option<Message> {
+ self.message_stream.next().await
+ }
+}
+
+#[automock]
+#[async_trait]
+pub trait MqttErrorStream: Send + Sync {
+ async fn next(&mut self) -> Option<Arc<mqtt_client::Error>>;