summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDidier Wenzek <didier.wenzek@acidalie.com>2021-09-06 09:47:39 +0100
committerGitHub <noreply@github.com>2021-09-06 09:47:39 +0100
commit0e008473f4acef7bbdd5fe62af78384d9d1ff97a (patch)
tree70a72c319d89fe6406c70b8fafdb871e8ac59cf6
parent32522ff526ca83d5eb213f334de709c4f55c39cd (diff)
[CIT-459] Use the batcher to batch collectd message (#401)
* [CIT-459] impl Batchable for CollectdMessage * [CIT-459] Use the batcher to batch collectd messages * [CIT-459] Cargo fmt * [CIT-459] Fix typos * [CIT-459] Use the monitor configuration to set the batcher parameters * [CIT-459] Propagate the end of stream along the baching pipeline * [CIT-459] Remove unused `mut` * [CIT-459] Set default values for batching window and max delay * [CIT-459] Fix the default maximum message delay Co-authored-by: Wenzek <diw@softwareag.com>
-rw-r--r--Cargo.lock1
-rw-r--r--mapper/tedge_mapper/Cargo.toml1
-rw-r--r--mapper/tedge_mapper/src/collectd_mapper/batcher.rs456
-rw-r--r--mapper/tedge_mapper/src/collectd_mapper/collectd.rs81
-rw-r--r--mapper/tedge_mapper/src/collectd_mapper/monitor.rs119
5 files changed, 183 insertions, 475 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 808cf626..58cac3ba 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2599,6 +2599,7 @@ dependencies = [
"assert-json-diff",
"assert_matches",
"async-trait",
+ "batcher",
"c8y_translator_lib",
"chrono",
"clock",
diff --git a/mapper/tedge_mapper/Cargo.toml b/mapper/tedge_mapper/Cargo.toml
index 28bb6fbb..a88388ab 100644
--- a/mapper/tedge_mapper/Cargo.toml
+++ b/mapper/tedge_mapper/Cargo.toml
@@ -27,6 +27,7 @@ stop-on-upgrade = false
[dependencies]
anyhow = "1.0"
async-trait = "0.1"
+batcher = {path = "../../common/batcher" }
c8y_translator_lib = {path = "../cumulocity/c8y_translator_lib" }
chrono = "0.4"
clock = {path = "../../common/clock" }
diff --git a/mapper/tedge_mapper/src/collectd_mapper/batcher.rs b/mapper/tedge_mapper/src/collectd_mapper/batcher.rs
index a49b92df..2f9429bc 100644
--- a/mapper/tedge_mapper/src/collectd_mapper/batcher.rs
+++ b/mapper/tedge_mapper/src/collectd_mapper/batcher.rs
@@ -1,20 +1,14 @@
-use clock::{Clock, Timestamp};
-use mqtt_client::{Message, MqttClient, MqttMessageStream, Topic, TopicFilter};
-use std::sync::Arc;
+use clock::Timestamp;
+use mqtt_client::Payload;
use thin_edge_json::{
group::{MeasurementGroup, MeasurementGrouper},
measurement::MeasurementVisitor,
serialize::ThinEdgeJsonSerializer,
};
-use tokio::{
- select,
- sync::mpsc::{UnboundedReceiver, UnboundedSender},
- time,
- time::Duration,
-};
-use tracing::{error, log::warn};
use crate::collectd_mapper::{collectd::CollectdMessage, error::DeviceMonitorError};
+use chrono::Local;
+use thin_edge_json::group::MeasurementGrouperError;
#[derive(Debug)]
pub struct MessageBatch {
@@ -22,6 +16,31 @@ pub struct MessageBatch {
}
impl MessageBatch {
+ pub fn thin_edge_json_bytes(
+ messages: Vec<CollectdMessage>,
+ ) -> Result<Payload, DeviceMonitorError> {
+ let mut messages = messages.into_iter();
+
+ if let Some(first_message) = messages.next() {
+ let timestamp = first_message.timestamp.with_timezone(Local::now().offset());
+ let mut batch = MessageBatch::start_batch(first_message, timestamp)?;
+ for message in messages {
+ batch.add_to_batch(message)?;
+ }
+ let measurements = batch.end_batch()?;
+
+ let mut tedge_json_serializer = ThinEdgeJsonSerializer::new();
+ measurements.accept(&mut tedge_json_serializer)?;
+
+ let payload = tedge_json_serializer.bytes()?;
+ Ok(payload)
+ } else {
+ Err(DeviceMonitorError::InvalidThinEdgeJsonError(
+ MeasurementGrouperError::UnexpectedEnd,
+ ))
+ }
+ }
+
fn start_batch(
collectd_message: CollectdMessage,
timestamp: Timestamp,
@@ -49,188 +68,29 @@ impl MessageBatch {
}
}
-pub struct MessageBatcher {
- sender: UnboundedSender<MeasurementGroup>,
- mqtt_client: Arc<dyn MqttClient>,
- source_topic_filter: TopicFilter,
- batching_window: Duration,
- clock: Arc<dyn Clock>,
-}
-
-impl MessageBatcher {
- pub fn new(
- sender: UnboundedSender<MeasurementGroup>,
- mqtt_client: Arc<dyn MqttClient>,
- batching_window: Duration,
- source_topic_filter: TopicFilter,
- clock: Arc<dyn Clock>,
- ) -> Self {
- Self {
- sender,
- mqtt_client,
- source_topic_filter,
- batching_window,
- clock,
- }
- }
-
- pub async fn run(&self) -> Result<(), DeviceMonitorError> {
- let mut messages = self
- .mqtt_client
- .subscribe(self.source_topic_filter.clone())
- .await?;
-
- loop {
- match self.receive_message(messages.as_mut()).await {
- Some((message, timestamp)) => {
- // Build a message batch until the batching window times out and return the batch
- let message_batch_result = self
- .build_message_batch_with_timeout(message, timestamp, messages.as_mut())
- .await;
-
- match message_batch_result {
- Ok(message_batch) => {
- // Send the current batch to the batch processor
- let _ = self.sender.send(message_batch).map_err(|err| {
- error!("Error while publishing a message batch: {}", err)
- });
- }
- Err(err) => {
- error!("Error while building a message batch: {}", err);
- }
- }
- }
- None => {
- //If the message batching loop returns, it means the MQTT connection has closed
- error!("MQTT connection closed. Retrying...");
- }
- }
- }
- }
-
- async fn build_message_batch_with_timeout(
- &self,
- first_message: Message,
- first_message_timestamp: Timestamp,
- messages: &mut dyn MqttMessageStream,
- ) -> Result<MeasurementGroup, DeviceMonitorError> {
- let collectd_message = CollectdMessage::parse_from(&first_message)?;
- let mut message_batch =
- MessageBatch::start_batch(collectd_message, first_message_timestamp)?;
-
- // Creates a sleep timer future handler and does not await here
- // for sleep to finish, but inside the select loop
- let sleep = time::sleep(self.batching_window);
- tokio::pin!(sleep);
-
- loop {
- select! {
- _ = &mut sleep => {
- break;
- }
- maybe_message = self.receive_message(messages) => {
- match maybe_message {
- Some((message, _timestamp)) => {
- let collectd_message = match CollectdMessage::parse_from(&message) {
- Ok(message) => message,
- Err(err) => {
- error!("Error parsing collectd message: {}", err);
- continue; // Even if one message is faulty, we skip that one and keep building the batch
- },
- };
- message_batch.add_to_batch(collectd_message)?;
- }
- None => break
- }
- }
-
- }
- }
-
- Ok(message_batch.end_batch()?)
- }
-
- async fn receive_message(
- &self,
- messages: &mut dyn MqttMessageStream,
- ) -> Option<(Message, Timestamp)> {
- messages
- .next()
- .await
- .map(|message| (message, self.clock.now()))
- }
-}
-
-pub struct MessageBatchPublisher {
- receiver: UnboundedReceiver<MeasurementGroup>,
- mqtt_client: Arc<dyn MqttClient>,
- target_topic: Topic,
-}
-
-impl MessageBatchPublisher {
- pub fn new(
- receiver: UnboundedReceiver<MeasurementGroup>,
- mqtt_client: Arc<dyn MqttClient>,
- target_topic: Topic,
- ) -> Self {
- Self {
- receiver,
- mqtt_client,
- target_topic,
- }
- }
-
- pub async fn run(&mut self) {
- while let Some(message_group) = self.receiver.recv().await {
- if let Err(err) = self.publish_as_mqtt_message(message_group).await {
- error!("Error publishing the measurement batch: {}", err);
- }
- }
-
- warn!("MQTT message channel closed. Can not proceed");
- }
-
- async fn publish_as_mqtt_message(
- &mut self,
- message_group: MeasurementGroup,
- ) -> Result<(), DeviceMonitorError> {
- let mut tedge_json_serializer = ThinEdgeJsonSerializer::new();
- message_group.accept(&mut tedge_json_serializer)?;
-
- let tedge_message = Message::new(&self.target_topic, tedge_json_serializer.bytes()?);
-
- self.mqtt_client.publish(tedge_message).await?;
-
- Ok(())
- }
-}
-
#[cfg(test)]
mod tests {
-
use super::*;
use assert_matches::assert_matches;
- use clock::WallClock;
- use futures::future::{pending, ready};
- use mockall::Sequence;
- use mqtt_client::{MockMqttClient, MockMqttErrorStream, MockMqttMessageStream, QoS};
- use tokio::time::{self, Instant};
+ use chrono::{TimeZone, Utc};
+ use clock::{Clock, WallClock};
#[test]
fn test_message_batch_processor() -> anyhow::Result<()> {
- let collectd_message = CollectdMessage::new("temperature", "value", 32.5);
+ let timestamp = Utc.ymd(2015, 5, 15).and_hms_milli(0, 0, 1, 444);
+ let collectd_message = CollectdMessage::new("temperature", "value", 32.5, timestamp);
let mut message_batch = MessageBatch::start_batch(collectd_message, WallClock.now())?;
- let collectd_message = CollectdMessage::new("coordinate", "x", 50.0);
+ let collectd_message = CollectdMessage::new("coordinate", "x", 50.0, timestamp);
message_batch.add_to_batch(collectd_message)?;
- let collectd_message = CollectdMessage::new("coordinate", "y", 70.0);
+ let collectd_message = CollectdMessage::new("coordinate", "y", 70.0, timestamp);
message_batch.add_to_batch(collectd_message)?;
- let collectd_message = CollectdMessage::new("pressure", "value", 98.2);
+ let collectd_message = CollectdMessage::new("pressure", "value", 98.2, timestamp);
message_batch.add_to_batch(collectd_message)?;
- let collectd_message = CollectdMessage::new("coordinate", "z", 90.0);
+ let collectd_message = CollectdMessage::new("coordinate", "z", 90.0, timestamp);
message_batch.add_to_batch(collectd_message)?;
let message_group = message_batch.end_batch()?;
@@ -260,246 +120,4 @@ mod tests {
Ok(())
}
-
- #[tokio::test]
- async fn batch_publisher() -> anyhow::Result<()> {
- let mut message_grouper = MeasurementGrouper::new();
-
- message_grouper.visit_start_group("temperature")?;
- message_grouper.visit_measurement("value", 32.5)?;
- message_grouper.visit_end_group()?;
-
- let message_group = message_grouper.end()?;
-
- let (_sender, receiver) = tokio::sync::mpsc::unbounded_channel::<MeasurementGroup>();
-
- let mut mqtt_client = MockMqttClient::new();
- mqtt_client.expect_publish().times(1).returning(|message| {
- assert_eq!(message.topic.name, "tedge/measurements"); // The test assertion happens here
- Ok(())
- });
-
- let mut publisher = MessageBatchPublisher::new(
- receiver,
- Arc::new(mqtt_client),
- Topic::new("tedge/measurements")?,
- );
- publisher.publish_as_mqtt_message(message_group).await?;
-
- Ok(())
- }
-
- #[tokio::test]
- async fn batching_with_window_timeout() -> anyhow::Result<()> {
- let (sender, _receiver) = tokio::sync::mpsc::unbounded_channel::<MeasurementGroup>();
-
- let mqtt_client = build_mock_mqtt_client();
-
- let mut seq = Sequence::new(); // To control the order of mock returns
- let mut message_stream = MockMqttMessageStream::default();
- message_stream
- .expect_next()
- .times(1)
- .in_sequence(&mut seq) // The first value to be returned by this mock stream
- .returning(|| {
- time::pause();
- let topic = Topic::new("collectd/localhost/temperature/value").unwrap();
- let message = Message::new(&topic, "123456789:32.5");
- Box::pin(ready(Some(message)))
- });
-
- message_stream
- .expect_next()
- .times(1)
- .in_sequence(&mut seq) // The second value to be returned by this mock stream
- .returning(|| {
- Box::pin(async {
- time::advance(Duration::from_millis(100)).await; // Advance time, but stay within the batching window so that this message is part of the batch
- let topic = Topic::new("collectd/localhost/pressure/value").unwrap();
- let message = Message::new(&topic, "123456789:98.2");
- Some(message)
- })
- });
-
- message_stream
- .expect_next()
- .times(1)
- .in_sequence(&mut seq) // The second value to be returned by this mock stream
- .returning(|| {
- Box::pin(async {
- time::advance(Duration::from_millis(1000)).await; // Advance time beyond the batching window so that upcoming messages arrive after the window is closed
- time::resume();
- let topic = Topic::new("collectd/localhost/dummy/value").unwrap();
- let message = Message::new(&topic, "123456789:98.2");
- Some(message)
- })
- });
-
- // This third message from this stream will not even be read as the batching window has closed with the previous message
- message_stream
- .expect_next()
- .times(0)
- .in_sequence(&mut seq) // The third value to be returned by this mock stream
- .returning(|| {
- println!("Third message time: {:?}", Instant::now());
- Box::pin(async {
- let topic = Topic::new("collectd/localhost/speed/value").unwrap();
- let message = Message::new(&topic, "123456789:350");
- Some(message)
- })
- });
-
- // Block the stream from the 4th invocation onwards
- message_stream
- .expect_next()
- .returning(|| Box::pin(pending())); // Block the stream with a pending future
-
- let clock = Arc::new(WallClock);
- let builder = MessageBatcher::new(
- sender,
- Arc::new(mqtt_client),
- Duration::from_millis(500),
- TopicFilter::new("collectd/#")?.qos(QoS::AtMostOnce),
- clock.clone(),
- );
-
- let first_message = message_stream.next().await.unwrap();
-
- let message_group = builder
- .build_message_batch_with_timeout(first_message, clock.now(), &mut message_stream)
- .await?;
-
- assert_eq!(
- message_group.get_measurement_value(Some("temperature"), "value"),
- Some(32.5)
- );
- assert_eq!(
- message_group.get_measurement_value(Some("pressure"), "value"),
- Some(98.2)
- );
- assert_eq!(
- message_group.get_measurement_value(Some("speed"), "value"),
- None // This measurement isn't included in the batch because it came after the batching window
- );
-
- Ok(())
- }
-
- #[tokio::test]
- async fn batching_with_invalid_messages_within_a_batch() -> anyhow::Result<()> {
- let (sender, _receiver) = tokio::sync::mpsc::unbounded_channel::<MeasurementGroup>();
-
- let mqtt_client = build_mock_mqtt_client();
-
- let mut message_stream = build_message_stream_from_messages(vec![
- ("collectd/localhost/temperature/value", 32.5),
- ("collectd/pressure/value", 98.0), // Erroneous collectd message with invalid topic
- ("collectd/localhost/speed/value", 350.0),
- ]);
-
- let first_message = message_stream.next().await.unwrap();
- let clock = WallClock;
- let builder = MessageBatcher::new(
- sender,
- Arc::new(mqtt_client),
- Duration::from_millis(1000),
- TopicFilter::new("collectd/#")?.qos(QoS::AtMostOnce),
- Arc::new(clock.clone()),
- );
- let message_group = builder
- .build_message_batch_with_timeout(first_message, clock.now(), &mut message_stream)
- .await?;
-
- assert_eq!(
- message_group.get_measurement_value(Some("temperature"), "value"),
- Some(32.5)
- );
- assert_eq!(
- message_group.get_measurement_value(Some("pressure"), "value"),
- None // This measurement isn't included in the batch because the value was erroneous
- );
- assert_eq!(
- message_group.get_measurement_value(Some("speed"), "value"),
- Some(350.0) // This measurement is included in the batch even though the last message was erroneous
- );
-
- Ok(())
- }
-
- #[tokio::test]
- async fn batching_with_erroneous_first_message() -> anyhow::Result<()> {
- let (sender, _receiver) = tokio::sync::mpsc::unbounded_channel::<MeasurementGroup>();
-
- let mqtt_client = build_mock_mqtt_client();
-
- let mut message_stream = build_message_stream_from_messages(vec![]);
-
- let topic = Topic::new("collectd/host/group/key")?;
- let invalid_collectd_message = Message::new(&topic, "123456789"); // Invalid payload
-
- let clock = Arc::new(WallClock);
- let builder = MessageBatcher::new(
- sender,
- Arc::new(mqtt_client),
- Duration::from_millis(1000),
- TopicFilter::new("collectd/#")?.qos(QoS::AtMostOnce),
- clock.clone(),
- );
- let result = builder
- .build_message_batch_with_timeout(
- invalid_collectd_message,
- clock.now(),
- &mut message_stream,
- )
- .await;
-
- assert_matches!(
- result,
- Err(DeviceMonitorError::InvalidCollectdMeasurementError(_))
- );
-
- Ok(())
- }
-
- fn build_mock_mqtt_client() -> MockMqttClient {
- let mut mqtt_client = MockMqttClient::new();
-
- mqtt_client.expect_subscribe_errors().returning(|| {
- let error_stream = MockMqttErrorStream::default();
- Box::new(error_stream)
- });
-
- mqtt_client.expect_subscribe().returning(|_| {
- let message_stream = MockMqttMessageStream::default();
- Ok(Box::new(message_stream))
- });
-
- mqtt_client
- }
-
- fn build_message_stream_from_messages(
- message_map: Vec<(&'static str, f64)>,
- ) -> MockMqttMessageStream {
- let mut seq = Sequence::new(); // To control the order of mock returns
- let mut message_stream = MockMqttMessageStream::default();
-
- for message in message_map {
- message_stream
- .expect_next()
- .times(1)
- .in_sequence(&mut seq) // The third value to be returned by this mock stream
- .returning(move || {
- let topic = Topic::new(message.0).unwrap();
- let message = Message::new(&topic, format!("123456789:{}", message.1));
- Box::pin(ready(Some(message)))
- });
- }
-
- // Block the stream from the 4th invocation onwards
- message_stream
- .expect_next()
- .returning(|| Box::pin(pending())); // Block the stream with a pending future
-
- message_stream
- }
}
diff --git a/mapper/tedge_mapper/src/collectd_mapper/collectd.rs b/mapper/tedge_mapper/src/collectd_mapper/collectd.rs
index a15474e5..9d387314 100644
--- a/mapper/tedge_mapper/src/collectd_mapper/collectd.rs
+++ b/mapper/tedge_mapper/src/collectd_mapper/collectd.rs
@@ -1,10 +1,13 @@
+use batcher::Batchable;
+use chrono::{DateTime, NaiveDateTime, Utc};
use mqtt_client::Message;
use thin_edge_json::measurement::MeasurementVisitor;
#[derive(Debug)]
-pub struct CollectdMessage<'a> {
- pub metric_group_key: &'a str,
- pub metric_key: &'a str,
+pub struct CollectdMessage {
+ pub metric_group_key: String,
+ pub metric_key: String,
+ pub timestamp: DateTime<Utc>,
pub metric_value: f64,
}
@@ -23,24 +26,34 @@ pub enum CollectdError {
NonUTF8MeasurementPayload(Vec<u8>),
}
-impl<'a> CollectdMessage<'a> {
+impl CollectdMessage {
pub fn accept<T>(&self, visitor: &mut T) -> Result<(), T::Error>
where
T: MeasurementVisitor,
{
- visitor.visit_grouped_measurement(self.metric_group_key, self.metric_key, self.metric_value)
+ visitor.visit_grouped_measurement(
+ &self.metric_group_key,
+ &self.metric_key,
+ self.metric_value,
+ )
}
#[cfg(test)]
- pub fn new(metric_group_key: &'a str, metric_key: &'a str, metric_value: f64) -> Self {
+ pub fn new(
+ metric_group_key: &str,
+ metric_key: &str,
+ metric_value: f64,
+ timestamp: DateTime<Utc>,
+ ) -> Self {
Self {
- metric_group_key,
- metric_key,
+ metric_group_key: metric_group_key.to_string(),
+ metric_key: metric_key.to_string(),
+ timestamp,
metric_value,
}
}
- pub fn parse_from(mqtt_message: &'a Message) -> Result<Self, CollectdError> {
+ pub fn parse_from(mqtt_message: &Message) -> Result<Self, CollectdError> {
let topic = mqtt_message.topic.name.as_str();
let collectd_topic = match CollectdTopic::from_str(topic) {
Ok(collectd_topic) => collectd_topic,
@@ -57,15 +70,16 @@ impl<'a> CollectdMessage<'a> {
.map_err(|err| CollectdError::InvalidMeasurementPayload(topic.into(), err))?;
Ok(CollectdMessage {
- metric_group_key: collectd_topic.metric_group_key,
- metric_key: collectd_topic.metric_key,
+ metric_group_key: collectd_topic.metric_group_key.to_string(),
+ metric_key: collectd_topic.metric_key.to_string(),
+ timestamp: collectd_payload.timestamp(),
metric_value: collectd_payload.metric_value,
})
}
}
-#[derive(Debug)]
-struct CollectdTopic<'a> {
+#[derive(Debug, Eq, PartialEq, Hash)]
+pub struct CollectdTopic<'a> {
metric_group_key: &'a str,
metric_key: &'a str,
}
@@ -93,7 +107,7 @@ impl<'a> CollectdTopic<'a> {
#[derive(Debug)]
struct CollectdPayload {
- _timestamp: f64,
+ timestamp: f64,
metric_value: f64,
}
@@ -113,12 +127,12 @@ impl CollectdPayload {
fn parse_from(payload: &str) -> Result<Self, CollectdPayloadError> {
let mut iter = payload.split(':');
- let _timestamp = iter.next().ok_or_else(|| {
+ let timestamp = iter.next().ok_or_else(|| {
CollectdPayloadError::InvalidMeasurementPayloadFormat(payload.to_string())
})?;
- let _timestamp = _timestamp.parse::<f64>().map_err(|_err| {
- CollectdPayloadError::InvalidMeasurementTimestamp(_timestamp.to_string())
+ let timestamp = timestamp.parse::<f64>().map_err(|_err| {
+ CollectdPayloadError::InvalidMeasurementTimestamp(timestamp.to_string())
})?;
let metric_value = iter.next().ok_or_else(|| {
@@ -131,7 +145,7 @@ impl CollectdPayload {
match iter.next() {
None => Ok(CollectdPayload {
- _timestamp,
+ timestamp,
metric_value,
}),
Some(_) => Err(CollectdPayloadError::InvalidMeasurementPayloadFormat(
@@ -139,11 +153,30 @@ impl CollectdPayload {
)),
}
}
+
+ pub fn timestamp(&self) -> DateTime<Utc> {
+ let timestamp = self.timestamp.trunc() as i64;
+ let nanoseconds = (self.timestamp.fract() * 1.0e9) as u32;
+ DateTime::<Utc>::from_utc(NaiveDateTime::from_timestamp(timestamp, nanoseconds), Utc)
+ }
+}
+
+impl Batchable for CollectdMessage {
+ type Key = String;
+
+ fn key(&self) -> Self::Key {
+ format!("{}/{}", &self.metric_group_key, &self.metric_key)
+ }
+
+ fn event_time(&self) -> DateTime<Utc> {
+ self.timestamp
+ }
}
#[cfg(test)]
mod tests {
use assert_matches::assert_matches;
+ use chrono::TimeZone;
use mqtt_client::Topic;
use super::*;
@@ -158,29 +191,39 @@ mod tests {
let CollectdMessage {
metric_group_key,
metric_key,
+ timestamp,
metric_value,
} = collectd_message;
assert_eq!(metric_group_key, "temperature");
assert_eq!(metric_key, "value");
+ assert_eq!(
+ timestamp,
+ Utc.ymd(1973, 11, 29).and_hms_milli(21, 33, 09, 0)
+ );
assert_eq!(metric_value, 32.5);
}
#[test]
fn collectd_null_terminated_message_parsing() {
let topic = Topic::new("collectd/localhost/temperature/value").unwrap();
- let mqtt_message = Message::new(&topic, "123456789:32.5\u{0}");
+ let mqtt_message = Message::new(&topic, "123456789.125:32.5\u{0}");
let collectd_message = CollectdMessage::parse_from(&mqtt_message).unwrap();
let CollectdMessage {
metric_group_key,
metric_key,
+ timestamp,
metric_value,
} = collectd_message;
assert_eq!(metric_group_key, "temperature");
assert_eq!(metric_key, "value");
+ assert_eq!(
+ timestamp,
+ Utc.ymd(1973, 11, 29).and_hms_milli(21, 33, 09, 125)
+ );
assert_eq!(metric_value, 32.5);
}
diff --git a/mapper/tedge_mapper/src/collectd_mapper/monitor.rs b/mapper/tedge_mapper/src/collectd_mapper/monitor.rs
index e7189b42..7ffaf3ac 100644
--- a/mapper/tedge_mapper/src/collectd_mapper/monitor.rs
+++ b/mapper/tedge_mapper/src/collectd_mapper/monitor.rs
@@ -1,23 +1,22 @@
-use clock::WallClock;
-use mqtt_client::{Client, MqttClient};
+use mqtt_client::{Client, Message, MqttClient};
use std::sync::Arc;
use tracing::{instrument, log::error};
-use crate::collectd_mapper::{
- batcher::{MessageBatchPublisher, MessageBatcher},
- error::DeviceMonitorError,
-};
+use crate::collectd_mapper::batcher::MessageBatch;
+use crate::collectd_mapper::collectd::CollectdMessage;
+use crate::collectd_mapper::error::DeviceMonitorError;
+use batcher::{BatchConfigBuilder, BatchDriver, BatchDriverInput, BatchDriverOutput, Batcher};
+use mqtt_client::{QoS, Topic, TopicFilter};
const DEFAULT_HOST: &str = "localhost";
const DEFAULT_PORT: u16 = 1883;
const DEFAULT_MQTT_CLIENT_ID: &str = "collectd-mapper";
-const DEFAULT_BATCHING_WINDOW: u64 = 200;
+const DEFAULT_BATCHING_WINDOW: u32 = 500;
+const DEFAULT_MAXIMUM_MESSAGE_DELAY: u32 = 200;
+const DEFAULT_MESSAGE_LEAP_LIMIT: u32 = 0;
const DEFAULT_MQTT_SOURCE_TOPIC: &str = "collectd/#";
const DEFAULT_MQTT_TARGET_TOPIC: &str = "tedge/measurements";
-use mqtt_client::{QoS, Topic, TopicFilter};
-use std::time::Duration;
-
#[derive(Debug)]
pub struct DeviceMonitorConfig {
host: &'static str,
@@ -25,7 +24,9 @@ pub struct DeviceMonitorConfig {
mqtt_client_id: &'static str,
mqtt_source_topic: &'static str,
mqtt_target_topic: &'static str,
- batching_window: u64,
+ batching_window: u32,
+ maximum_message_delay: u32,
+ message_leap_limit: u32,
}
impl Default for DeviceMonitorConfig {
@@ -37,6 +38,8 @@ impl Default for DeviceMonitorConfig {
mqtt_source_topic: DEFAULT_MQTT_SOURCE_TOPIC,
mqtt_target_topic: DEFAULT_MQTT_TARGET_TOPIC,
batching_window: DEFAULT_BATCHING_WINDOW,
+ maximum_message_delay: DEFAULT_MAXIMUM_MESSAGE_DELAY,
+ message_leap_limit: DEFAULT_MESSAGE_LEAP_LIMIT,
}
}
}
@@ -61,50 +64,92 @@ impl DeviceMonitor {
#[instrument(name = "monitor")]
pub async fn run(&self) -> Result<(), DeviceMonitorError> {
- let config = mqtt_client::Config::new(
+ let mqtt_config = mqtt_client::Config::new(
self.device_monitor_config.host,
self.device_monitor_config.port,
)
.queue_capacity(1024);
- let mqtt_client: Arc<dyn MqttClient> =
- Arc::new(Client::connect(self.device_monitor_config.mqtt_client_id, &config).await?);
-
- let (sender, receiver) =
- tokio::sync::mpsc::unbounded_channel::<thin_edge_json::group::MeasurementGroup>();
-
- let message_batch_producer = MessageBatcher::new(
- sender,
- mqtt_client.clone(),
- Duration::from_millis(self.device_monitor_config.batching_window),
- TopicFilter::new(self.device_monitor_config.mqtt_source_topic)?.qos(QoS::AtMostOnce),
- Arc::new(WallClock),
+ let mqtt_client: Arc<dyn MqttClient> = Arc::new(
+ Client::connect(self.device_monitor_config.mqtt_client_id, &mqtt_config).await?,
);
- let join_handle1 = tokio::task::spawn(async move {
- match message_batch_producer.run().await {
+
+ let batch_config = BatchConfigBuilder::new()
+ .event_jitter(self.device_monitor_config.batching_window)
+ .delivery_jitter(self.device_monitor_config.maximum_message_delay)
+ .message_leap_limit(self.device_monitor_config.message_leap_limit)
+ .build();
+ let (msg_send, msg_recv) = tokio::sync::mpsc::channel(100);
+ let (batch_send, mut batch_recv) = tokio::sync::mpsc::channel(100);
+ let driver = BatchDriver::new(Batcher::new(batch_config), msg_recv, batch_send);
+ let driver_join_handle = tokio::task::spawn(async move {
+ match driver.run().await {
Ok(_) => error!("Unexpected end of message batcher thread"),
Err(err) => error!("Error in message batcher thread: {}", err),
}
});
- let mut message_batch_consumer = MessageBatchPublisher::new(
- receiver,
- mqtt_client.clone(),
- Topic::new(self.device_monitor_config.mqtt_target_topic)?,
- );
- let join_handle2 = tokio::task::spawn(async move {
- message_batch_consumer.run().await;
+ let input_mqtt_client = mqtt_client.clone();
+ let input_topic =
+ TopicFilter::new(self.device_monitor_config.mqtt_source_topic)?.qos(QoS::AtMostOnce);
+ let mut collectd_messages = input_mqtt_client.subscribe(input_topic).await?;
+ let input_join_handle = tokio::task::spawn(async move {
+ while let Some(message) = collectd_messages.next().await {
+ match CollectdMessage::parse_from(&message) {
+ Ok(collectd_message) => {
+ let batch_input = BatchDriverInput::Event(collectd_message);
+ if let Err(err) = msg_send.send(batch_input).await {
+ error!("Error while processing a collectd message: {}", err);
+ }
+ }
+ Err(err) => {
+ error!("Error while decoding a collectd message: {}", err);
+ }
+ }
+ }
+ // The MQTT connection has been closed by the process itself.
+ log::info!("Stop batching");
+ let eof = BatchDriverInput::Flush;
+ msg_send.send(eof).await
+ });
+
+ let output_mqtt_client = mqtt_client.clone();
+ let output_topic = Topic::new(self.device_monitor_config.mqtt_target_topic)?;
+