diff options
author | Didier Wenzek <didier.wenzek@acidalie.com> | 2021-09-06 09:47:39 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-09-06 09:47:39 +0100 |
commit | 0e008473f4acef7bbdd5fe62af78384d9d1ff97a (patch) | |
tree | 70a72c319d89fe6406c70b8fafdb871e8ac59cf6 | |
parent | 32522ff526ca83d5eb213f334de709c4f55c39cd (diff) |
[CIT-459] Use the batcher to batch collectd message (#401)
* [CIT-459] impl Batchable for CollectdMessage
* [CIT-459] Use the batcher to batch collectd messages
* [CIT-459] Cargo fmt
* [CIT-459] Fix typos
* [CIT-459] Use the monitor configuration to set the batcher parameters
* [CIT-459] Propagate the end of stream along the baching pipeline
* [CIT-459] Remove unused `mut`
* [CIT-459] Set default values for batching window and max delay
* [CIT-459] Fix the default maximum message delay
Co-authored-by: Wenzek <diw@softwareag.com>
-rw-r--r-- | Cargo.lock | 1 | ||||
-rw-r--r-- | mapper/tedge_mapper/Cargo.toml | 1 | ||||
-rw-r--r-- | mapper/tedge_mapper/src/collectd_mapper/batcher.rs | 456 | ||||
-rw-r--r-- | mapper/tedge_mapper/src/collectd_mapper/collectd.rs | 81 | ||||
-rw-r--r-- | mapper/tedge_mapper/src/collectd_mapper/monitor.rs | 119 |
5 files changed, 183 insertions, 475 deletions
@@ -2599,6 +2599,7 @@ dependencies = [ "assert-json-diff", "assert_matches", "async-trait", + "batcher", "c8y_translator_lib", "chrono", "clock", diff --git a/mapper/tedge_mapper/Cargo.toml b/mapper/tedge_mapper/Cargo.toml index 28bb6fbb..a88388ab 100644 --- a/mapper/tedge_mapper/Cargo.toml +++ b/mapper/tedge_mapper/Cargo.toml @@ -27,6 +27,7 @@ stop-on-upgrade = false [dependencies] anyhow = "1.0" async-trait = "0.1" +batcher = {path = "../../common/batcher" } c8y_translator_lib = {path = "../cumulocity/c8y_translator_lib" } chrono = "0.4" clock = {path = "../../common/clock" } diff --git a/mapper/tedge_mapper/src/collectd_mapper/batcher.rs b/mapper/tedge_mapper/src/collectd_mapper/batcher.rs index a49b92df..2f9429bc 100644 --- a/mapper/tedge_mapper/src/collectd_mapper/batcher.rs +++ b/mapper/tedge_mapper/src/collectd_mapper/batcher.rs @@ -1,20 +1,14 @@ -use clock::{Clock, Timestamp}; -use mqtt_client::{Message, MqttClient, MqttMessageStream, Topic, TopicFilter}; -use std::sync::Arc; +use clock::Timestamp; +use mqtt_client::Payload; use thin_edge_json::{ group::{MeasurementGroup, MeasurementGrouper}, measurement::MeasurementVisitor, serialize::ThinEdgeJsonSerializer, }; -use tokio::{ - select, - sync::mpsc::{UnboundedReceiver, UnboundedSender}, - time, - time::Duration, -}; -use tracing::{error, log::warn}; use crate::collectd_mapper::{collectd::CollectdMessage, error::DeviceMonitorError}; +use chrono::Local; +use thin_edge_json::group::MeasurementGrouperError; #[derive(Debug)] pub struct MessageBatch { @@ -22,6 +16,31 @@ pub struct MessageBatch { } impl MessageBatch { + pub fn thin_edge_json_bytes( + messages: Vec<CollectdMessage>, + ) -> Result<Payload, DeviceMonitorError> { + let mut messages = messages.into_iter(); + + if let Some(first_message) = messages.next() { + let timestamp = first_message.timestamp.with_timezone(Local::now().offset()); + let mut batch = MessageBatch::start_batch(first_message, timestamp)?; + for message in messages { + batch.add_to_batch(message)?; + } + let measurements = batch.end_batch()?; + + let mut tedge_json_serializer = ThinEdgeJsonSerializer::new(); + measurements.accept(&mut tedge_json_serializer)?; + + let payload = tedge_json_serializer.bytes()?; + Ok(payload) + } else { + Err(DeviceMonitorError::InvalidThinEdgeJsonError( + MeasurementGrouperError::UnexpectedEnd, + )) + } + } + fn start_batch( collectd_message: CollectdMessage, timestamp: Timestamp, @@ -49,188 +68,29 @@ impl MessageBatch { } } -pub struct MessageBatcher { - sender: UnboundedSender<MeasurementGroup>, - mqtt_client: Arc<dyn MqttClient>, - source_topic_filter: TopicFilter, - batching_window: Duration, - clock: Arc<dyn Clock>, -} - -impl MessageBatcher { - pub fn new( - sender: UnboundedSender<MeasurementGroup>, - mqtt_client: Arc<dyn MqttClient>, - batching_window: Duration, - source_topic_filter: TopicFilter, - clock: Arc<dyn Clock>, - ) -> Self { - Self { - sender, - mqtt_client, - source_topic_filter, - batching_window, - clock, - } - } - - pub async fn run(&self) -> Result<(), DeviceMonitorError> { - let mut messages = self - .mqtt_client - .subscribe(self.source_topic_filter.clone()) - .await?; - - loop { - match self.receive_message(messages.as_mut()).await { - Some((message, timestamp)) => { - // Build a message batch until the batching window times out and return the batch - let message_batch_result = self - .build_message_batch_with_timeout(message, timestamp, messages.as_mut()) - .await; - - match message_batch_result { - Ok(message_batch) => { - // Send the current batch to the batch processor - let _ = self.sender.send(message_batch).map_err(|err| { - error!("Error while publishing a message batch: {}", err) - }); - } - Err(err) => { - error!("Error while building a message batch: {}", err); - } - } - } - None => { - //If the message batching loop returns, it means the MQTT connection has closed - error!("MQTT connection closed. Retrying..."); - } - } - } - } - - async fn build_message_batch_with_timeout( - &self, - first_message: Message, - first_message_timestamp: Timestamp, - messages: &mut dyn MqttMessageStream, - ) -> Result<MeasurementGroup, DeviceMonitorError> { - let collectd_message = CollectdMessage::parse_from(&first_message)?; - let mut message_batch = - MessageBatch::start_batch(collectd_message, first_message_timestamp)?; - - // Creates a sleep timer future handler and does not await here - // for sleep to finish, but inside the select loop - let sleep = time::sleep(self.batching_window); - tokio::pin!(sleep); - - loop { - select! { - _ = &mut sleep => { - break; - } - maybe_message = self.receive_message(messages) => { - match maybe_message { - Some((message, _timestamp)) => { - let collectd_message = match CollectdMessage::parse_from(&message) { - Ok(message) => message, - Err(err) => { - error!("Error parsing collectd message: {}", err); - continue; // Even if one message is faulty, we skip that one and keep building the batch - }, - }; - message_batch.add_to_batch(collectd_message)?; - } - None => break - } - } - - } - } - - Ok(message_batch.end_batch()?) - } - - async fn receive_message( - &self, - messages: &mut dyn MqttMessageStream, - ) -> Option<(Message, Timestamp)> { - messages - .next() - .await - .map(|message| (message, self.clock.now())) - } -} - -pub struct MessageBatchPublisher { - receiver: UnboundedReceiver<MeasurementGroup>, - mqtt_client: Arc<dyn MqttClient>, - target_topic: Topic, -} - -impl MessageBatchPublisher { - pub fn new( - receiver: UnboundedReceiver<MeasurementGroup>, - mqtt_client: Arc<dyn MqttClient>, - target_topic: Topic, - ) -> Self { - Self { - receiver, - mqtt_client, - target_topic, - } - } - - pub async fn run(&mut self) { - while let Some(message_group) = self.receiver.recv().await { - if let Err(err) = self.publish_as_mqtt_message(message_group).await { - error!("Error publishing the measurement batch: {}", err); - } - } - - warn!("MQTT message channel closed. Can not proceed"); - } - - async fn publish_as_mqtt_message( - &mut self, - message_group: MeasurementGroup, - ) -> Result<(), DeviceMonitorError> { - let mut tedge_json_serializer = ThinEdgeJsonSerializer::new(); - message_group.accept(&mut tedge_json_serializer)?; - - let tedge_message = Message::new(&self.target_topic, tedge_json_serializer.bytes()?); - - self.mqtt_client.publish(tedge_message).await?; - - Ok(()) - } -} - #[cfg(test)] mod tests { - use super::*; use assert_matches::assert_matches; - use clock::WallClock; - use futures::future::{pending, ready}; - use mockall::Sequence; - use mqtt_client::{MockMqttClient, MockMqttErrorStream, MockMqttMessageStream, QoS}; - use tokio::time::{self, Instant}; + use chrono::{TimeZone, Utc}; + use clock::{Clock, WallClock}; #[test] fn test_message_batch_processor() -> anyhow::Result<()> { - let collectd_message = CollectdMessage::new("temperature", "value", 32.5); + let timestamp = Utc.ymd(2015, 5, 15).and_hms_milli(0, 0, 1, 444); + let collectd_message = CollectdMessage::new("temperature", "value", 32.5, timestamp); let mut message_batch = MessageBatch::start_batch(collectd_message, WallClock.now())?; - let collectd_message = CollectdMessage::new("coordinate", "x", 50.0); + let collectd_message = CollectdMessage::new("coordinate", "x", 50.0, timestamp); message_batch.add_to_batch(collectd_message)?; - let collectd_message = CollectdMessage::new("coordinate", "y", 70.0); + let collectd_message = CollectdMessage::new("coordinate", "y", 70.0, timestamp); message_batch.add_to_batch(collectd_message)?; - let collectd_message = CollectdMessage::new("pressure", "value", 98.2); + let collectd_message = CollectdMessage::new("pressure", "value", 98.2, timestamp); message_batch.add_to_batch(collectd_message)?; - let collectd_message = CollectdMessage::new("coordinate", "z", 90.0); + let collectd_message = CollectdMessage::new("coordinate", "z", 90.0, timestamp); message_batch.add_to_batch(collectd_message)?; let message_group = message_batch.end_batch()?; @@ -260,246 +120,4 @@ mod tests { Ok(()) } - - #[tokio::test] - async fn batch_publisher() -> anyhow::Result<()> { - let mut message_grouper = MeasurementGrouper::new(); - - message_grouper.visit_start_group("temperature")?; - message_grouper.visit_measurement("value", 32.5)?; - message_grouper.visit_end_group()?; - - let message_group = message_grouper.end()?; - - let (_sender, receiver) = tokio::sync::mpsc::unbounded_channel::<MeasurementGroup>(); - - let mut mqtt_client = MockMqttClient::new(); - mqtt_client.expect_publish().times(1).returning(|message| { - assert_eq!(message.topic.name, "tedge/measurements"); // The test assertion happens here - Ok(()) - }); - - let mut publisher = MessageBatchPublisher::new( - receiver, - Arc::new(mqtt_client), - Topic::new("tedge/measurements")?, - ); - publisher.publish_as_mqtt_message(message_group).await?; - - Ok(()) - } - - #[tokio::test] - async fn batching_with_window_timeout() -> anyhow::Result<()> { - let (sender, _receiver) = tokio::sync::mpsc::unbounded_channel::<MeasurementGroup>(); - - let mqtt_client = build_mock_mqtt_client(); - - let mut seq = Sequence::new(); // To control the order of mock returns - let mut message_stream = MockMqttMessageStream::default(); - message_stream - .expect_next() - .times(1) - .in_sequence(&mut seq) // The first value to be returned by this mock stream - .returning(|| { - time::pause(); - let topic = Topic::new("collectd/localhost/temperature/value").unwrap(); - let message = Message::new(&topic, "123456789:32.5"); - Box::pin(ready(Some(message))) - }); - - message_stream - .expect_next() - .times(1) - .in_sequence(&mut seq) // The second value to be returned by this mock stream - .returning(|| { - Box::pin(async { - time::advance(Duration::from_millis(100)).await; // Advance time, but stay within the batching window so that this message is part of the batch - let topic = Topic::new("collectd/localhost/pressure/value").unwrap(); - let message = Message::new(&topic, "123456789:98.2"); - Some(message) - }) - }); - - message_stream - .expect_next() - .times(1) - .in_sequence(&mut seq) // The second value to be returned by this mock stream - .returning(|| { - Box::pin(async { - time::advance(Duration::from_millis(1000)).await; // Advance time beyond the batching window so that upcoming messages arrive after the window is closed - time::resume(); - let topic = Topic::new("collectd/localhost/dummy/value").unwrap(); - let message = Message::new(&topic, "123456789:98.2"); - Some(message) - }) - }); - - // This third message from this stream will not even be read as the batching window has closed with the previous message - message_stream - .expect_next() - .times(0) - .in_sequence(&mut seq) // The third value to be returned by this mock stream - .returning(|| { - println!("Third message time: {:?}", Instant::now()); - Box::pin(async { - let topic = Topic::new("collectd/localhost/speed/value").unwrap(); - let message = Message::new(&topic, "123456789:350"); - Some(message) - }) - }); - - // Block the stream from the 4th invocation onwards - message_stream - .expect_next() - .returning(|| Box::pin(pending())); // Block the stream with a pending future - - let clock = Arc::new(WallClock); - let builder = MessageBatcher::new( - sender, - Arc::new(mqtt_client), - Duration::from_millis(500), - TopicFilter::new("collectd/#")?.qos(QoS::AtMostOnce), - clock.clone(), - ); - - let first_message = message_stream.next().await.unwrap(); - - let message_group = builder - .build_message_batch_with_timeout(first_message, clock.now(), &mut message_stream) - .await?; - - assert_eq!( - message_group.get_measurement_value(Some("temperature"), "value"), - Some(32.5) - ); - assert_eq!( - message_group.get_measurement_value(Some("pressure"), "value"), - Some(98.2) - ); - assert_eq!( - message_group.get_measurement_value(Some("speed"), "value"), - None // This measurement isn't included in the batch because it came after the batching window - ); - - Ok(()) - } - - #[tokio::test] - async fn batching_with_invalid_messages_within_a_batch() -> anyhow::Result<()> { - let (sender, _receiver) = tokio::sync::mpsc::unbounded_channel::<MeasurementGroup>(); - - let mqtt_client = build_mock_mqtt_client(); - - let mut message_stream = build_message_stream_from_messages(vec![ - ("collectd/localhost/temperature/value", 32.5), - ("collectd/pressure/value", 98.0), // Erroneous collectd message with invalid topic - ("collectd/localhost/speed/value", 350.0), - ]); - - let first_message = message_stream.next().await.unwrap(); - let clock = WallClock; - let builder = MessageBatcher::new( - sender, - Arc::new(mqtt_client), - Duration::from_millis(1000), - TopicFilter::new("collectd/#")?.qos(QoS::AtMostOnce), - Arc::new(clock.clone()), - ); - let message_group = builder - .build_message_batch_with_timeout(first_message, clock.now(), &mut message_stream) - .await?; - - assert_eq!( - message_group.get_measurement_value(Some("temperature"), "value"), - Some(32.5) - ); - assert_eq!( - message_group.get_measurement_value(Some("pressure"), "value"), - None // This measurement isn't included in the batch because the value was erroneous - ); - assert_eq!( - message_group.get_measurement_value(Some("speed"), "value"), - Some(350.0) // This measurement is included in the batch even though the last message was erroneous - ); - - Ok(()) - } - - #[tokio::test] - async fn batching_with_erroneous_first_message() -> anyhow::Result<()> { - let (sender, _receiver) = tokio::sync::mpsc::unbounded_channel::<MeasurementGroup>(); - - let mqtt_client = build_mock_mqtt_client(); - - let mut message_stream = build_message_stream_from_messages(vec![]); - - let topic = Topic::new("collectd/host/group/key")?; - let invalid_collectd_message = Message::new(&topic, "123456789"); // Invalid payload - - let clock = Arc::new(WallClock); - let builder = MessageBatcher::new( - sender, - Arc::new(mqtt_client), - Duration::from_millis(1000), - TopicFilter::new("collectd/#")?.qos(QoS::AtMostOnce), - clock.clone(), - ); - let result = builder - .build_message_batch_with_timeout( - invalid_collectd_message, - clock.now(), - &mut message_stream, - ) - .await; - - assert_matches!( - result, - Err(DeviceMonitorError::InvalidCollectdMeasurementError(_)) - ); - - Ok(()) - } - - fn build_mock_mqtt_client() -> MockMqttClient { - let mut mqtt_client = MockMqttClient::new(); - - mqtt_client.expect_subscribe_errors().returning(|| { - let error_stream = MockMqttErrorStream::default(); - Box::new(error_stream) - }); - - mqtt_client.expect_subscribe().returning(|_| { - let message_stream = MockMqttMessageStream::default(); - Ok(Box::new(message_stream)) - }); - - mqtt_client - } - - fn build_message_stream_from_messages( - message_map: Vec<(&'static str, f64)>, - ) -> MockMqttMessageStream { - let mut seq = Sequence::new(); // To control the order of mock returns - let mut message_stream = MockMqttMessageStream::default(); - - for message in message_map { - message_stream - .expect_next() - .times(1) - .in_sequence(&mut seq) // The third value to be returned by this mock stream - .returning(move || { - let topic = Topic::new(message.0).unwrap(); - let message = Message::new(&topic, format!("123456789:{}", message.1)); - Box::pin(ready(Some(message))) - }); - } - - // Block the stream from the 4th invocation onwards - message_stream - .expect_next() - .returning(|| Box::pin(pending())); // Block the stream with a pending future - - message_stream - } } diff --git a/mapper/tedge_mapper/src/collectd_mapper/collectd.rs b/mapper/tedge_mapper/src/collectd_mapper/collectd.rs index a15474e5..9d387314 100644 --- a/mapper/tedge_mapper/src/collectd_mapper/collectd.rs +++ b/mapper/tedge_mapper/src/collectd_mapper/collectd.rs @@ -1,10 +1,13 @@ +use batcher::Batchable; +use chrono::{DateTime, NaiveDateTime, Utc}; use mqtt_client::Message; use thin_edge_json::measurement::MeasurementVisitor; #[derive(Debug)] -pub struct CollectdMessage<'a> { - pub metric_group_key: &'a str, - pub metric_key: &'a str, +pub struct CollectdMessage { + pub metric_group_key: String, + pub metric_key: String, + pub timestamp: DateTime<Utc>, pub metric_value: f64, } @@ -23,24 +26,34 @@ pub enum CollectdError { NonUTF8MeasurementPayload(Vec<u8>), } -impl<'a> CollectdMessage<'a> { +impl CollectdMessage { pub fn accept<T>(&self, visitor: &mut T) -> Result<(), T::Error> where T: MeasurementVisitor, { - visitor.visit_grouped_measurement(self.metric_group_key, self.metric_key, self.metric_value) + visitor.visit_grouped_measurement( + &self.metric_group_key, + &self.metric_key, + self.metric_value, + ) } #[cfg(test)] - pub fn new(metric_group_key: &'a str, metric_key: &'a str, metric_value: f64) -> Self { + pub fn new( + metric_group_key: &str, + metric_key: &str, + metric_value: f64, + timestamp: DateTime<Utc>, + ) -> Self { Self { - metric_group_key, - metric_key, + metric_group_key: metric_group_key.to_string(), + metric_key: metric_key.to_string(), + timestamp, metric_value, } } - pub fn parse_from(mqtt_message: &'a Message) -> Result<Self, CollectdError> { + pub fn parse_from(mqtt_message: &Message) -> Result<Self, CollectdError> { let topic = mqtt_message.topic.name.as_str(); let collectd_topic = match CollectdTopic::from_str(topic) { Ok(collectd_topic) => collectd_topic, @@ -57,15 +70,16 @@ impl<'a> CollectdMessage<'a> { .map_err(|err| CollectdError::InvalidMeasurementPayload(topic.into(), err))?; Ok(CollectdMessage { - metric_group_key: collectd_topic.metric_group_key, - metric_key: collectd_topic.metric_key, + metric_group_key: collectd_topic.metric_group_key.to_string(), + metric_key: collectd_topic.metric_key.to_string(), + timestamp: collectd_payload.timestamp(), metric_value: collectd_payload.metric_value, }) } } -#[derive(Debug)] -struct CollectdTopic<'a> { +#[derive(Debug, Eq, PartialEq, Hash)] +pub struct CollectdTopic<'a> { metric_group_key: &'a str, metric_key: &'a str, } @@ -93,7 +107,7 @@ impl<'a> CollectdTopic<'a> { #[derive(Debug)] struct CollectdPayload { - _timestamp: f64, + timestamp: f64, metric_value: f64, } @@ -113,12 +127,12 @@ impl CollectdPayload { fn parse_from(payload: &str) -> Result<Self, CollectdPayloadError> { let mut iter = payload.split(':'); - let _timestamp = iter.next().ok_or_else(|| { + let timestamp = iter.next().ok_or_else(|| { CollectdPayloadError::InvalidMeasurementPayloadFormat(payload.to_string()) })?; - let _timestamp = _timestamp.parse::<f64>().map_err(|_err| { - CollectdPayloadError::InvalidMeasurementTimestamp(_timestamp.to_string()) + let timestamp = timestamp.parse::<f64>().map_err(|_err| { + CollectdPayloadError::InvalidMeasurementTimestamp(timestamp.to_string()) })?; let metric_value = iter.next().ok_or_else(|| { @@ -131,7 +145,7 @@ impl CollectdPayload { match iter.next() { None => Ok(CollectdPayload { - _timestamp, + timestamp, metric_value, }), Some(_) => Err(CollectdPayloadError::InvalidMeasurementPayloadFormat( @@ -139,11 +153,30 @@ impl CollectdPayload { )), } } + + pub fn timestamp(&self) -> DateTime<Utc> { + let timestamp = self.timestamp.trunc() as i64; + let nanoseconds = (self.timestamp.fract() * 1.0e9) as u32; + DateTime::<Utc>::from_utc(NaiveDateTime::from_timestamp(timestamp, nanoseconds), Utc) + } +} + +impl Batchable for CollectdMessage { + type Key = String; + + fn key(&self) -> Self::Key { + format!("{}/{}", &self.metric_group_key, &self.metric_key) + } + + fn event_time(&self) -> DateTime<Utc> { + self.timestamp + } } #[cfg(test)] mod tests { use assert_matches::assert_matches; + use chrono::TimeZone; use mqtt_client::Topic; use super::*; @@ -158,29 +191,39 @@ mod tests { let CollectdMessage { metric_group_key, metric_key, + timestamp, metric_value, } = collectd_message; assert_eq!(metric_group_key, "temperature"); assert_eq!(metric_key, "value"); + assert_eq!( + timestamp, + Utc.ymd(1973, 11, 29).and_hms_milli(21, 33, 09, 0) + ); assert_eq!(metric_value, 32.5); } #[test] fn collectd_null_terminated_message_parsing() { let topic = Topic::new("collectd/localhost/temperature/value").unwrap(); - let mqtt_message = Message::new(&topic, "123456789:32.5\u{0}"); + let mqtt_message = Message::new(&topic, "123456789.125:32.5\u{0}"); let collectd_message = CollectdMessage::parse_from(&mqtt_message).unwrap(); let CollectdMessage { metric_group_key, metric_key, + timestamp, metric_value, } = collectd_message; assert_eq!(metric_group_key, "temperature"); assert_eq!(metric_key, "value"); + assert_eq!( + timestamp, + Utc.ymd(1973, 11, 29).and_hms_milli(21, 33, 09, 125) + ); assert_eq!(metric_value, 32.5); } diff --git a/mapper/tedge_mapper/src/collectd_mapper/monitor.rs b/mapper/tedge_mapper/src/collectd_mapper/monitor.rs index e7189b42..7ffaf3ac 100644 --- a/mapper/tedge_mapper/src/collectd_mapper/monitor.rs +++ b/mapper/tedge_mapper/src/collectd_mapper/monitor.rs @@ -1,23 +1,22 @@ -use clock::WallClock; -use mqtt_client::{Client, MqttClient}; +use mqtt_client::{Client, Message, MqttClient}; use std::sync::Arc; use tracing::{instrument, log::error}; -use crate::collectd_mapper::{ - batcher::{MessageBatchPublisher, MessageBatcher}, - error::DeviceMonitorError, -}; +use crate::collectd_mapper::batcher::MessageBatch; +use crate::collectd_mapper::collectd::CollectdMessage; +use crate::collectd_mapper::error::DeviceMonitorError; +use batcher::{BatchConfigBuilder, BatchDriver, BatchDriverInput, BatchDriverOutput, Batcher}; +use mqtt_client::{QoS, Topic, TopicFilter}; const DEFAULT_HOST: &str = "localhost"; const DEFAULT_PORT: u16 = 1883; const DEFAULT_MQTT_CLIENT_ID: &str = "collectd-mapper"; -const DEFAULT_BATCHING_WINDOW: u64 = 200; +const DEFAULT_BATCHING_WINDOW: u32 = 500; +const DEFAULT_MAXIMUM_MESSAGE_DELAY: u32 = 200; +const DEFAULT_MESSAGE_LEAP_LIMIT: u32 = 0; const DEFAULT_MQTT_SOURCE_TOPIC: &str = "collectd/#"; const DEFAULT_MQTT_TARGET_TOPIC: &str = "tedge/measurements"; -use mqtt_client::{QoS, Topic, TopicFilter}; -use std::time::Duration; - #[derive(Debug)] pub struct DeviceMonitorConfig { host: &'static str, @@ -25,7 +24,9 @@ pub struct DeviceMonitorConfig { mqtt_client_id: &'static str, mqtt_source_topic: &'static str, mqtt_target_topic: &'static str, - batching_window: u64, + batching_window: u32, + maximum_message_delay: u32, + message_leap_limit: u32, } impl Default for DeviceMonitorConfig { @@ -37,6 +38,8 @@ impl Default for DeviceMonitorConfig { mqtt_source_topic: DEFAULT_MQTT_SOURCE_TOPIC, mqtt_target_topic: DEFAULT_MQTT_TARGET_TOPIC, batching_window: DEFAULT_BATCHING_WINDOW, + maximum_message_delay: DEFAULT_MAXIMUM_MESSAGE_DELAY, + message_leap_limit: DEFAULT_MESSAGE_LEAP_LIMIT, } } } @@ -61,50 +64,92 @@ impl DeviceMonitor { #[instrument(name = "monitor")] pub async fn run(&self) -> Result<(), DeviceMonitorError> { - let config = mqtt_client::Config::new( + let mqtt_config = mqtt_client::Config::new( self.device_monitor_config.host, self.device_monitor_config.port, ) .queue_capacity(1024); - let mqtt_client: Arc<dyn MqttClient> = - Arc::new(Client::connect(self.device_monitor_config.mqtt_client_id, &config).await?); - - let (sender, receiver) = - tokio::sync::mpsc::unbounded_channel::<thin_edge_json::group::MeasurementGroup>(); - - let message_batch_producer = MessageBatcher::new( - sender, - mqtt_client.clone(), - Duration::from_millis(self.device_monitor_config.batching_window), - TopicFilter::new(self.device_monitor_config.mqtt_source_topic)?.qos(QoS::AtMostOnce), - Arc::new(WallClock), + let mqtt_client: Arc<dyn MqttClient> = Arc::new( + Client::connect(self.device_monitor_config.mqtt_client_id, &mqtt_config).await?, ); - let join_handle1 = tokio::task::spawn(async move { - match message_batch_producer.run().await { + + let batch_config = BatchConfigBuilder::new() + .event_jitter(self.device_monitor_config.batching_window) + .delivery_jitter(self.device_monitor_config.maximum_message_delay) + .message_leap_limit(self.device_monitor_config.message_leap_limit) + .build(); + let (msg_send, msg_recv) = tokio::sync::mpsc::channel(100); + let (batch_send, mut batch_recv) = tokio::sync::mpsc::channel(100); + let driver = BatchDriver::new(Batcher::new(batch_config), msg_recv, batch_send); + let driver_join_handle = tokio::task::spawn(async move { + match driver.run().await { Ok(_) => error!("Unexpected end of message batcher thread"), Err(err) => error!("Error in message batcher thread: {}", err), } }); - let mut message_batch_consumer = MessageBatchPublisher::new( - receiver, - mqtt_client.clone(), - Topic::new(self.device_monitor_config.mqtt_target_topic)?, - ); - let join_handle2 = tokio::task::spawn(async move { - message_batch_consumer.run().await; + let input_mqtt_client = mqtt_client.clone(); + let input_topic = + TopicFilter::new(self.device_monitor_config.mqtt_source_topic)?.qos(QoS::AtMostOnce); + let mut collectd_messages = input_mqtt_client.subscribe(input_topic).await?; + let input_join_handle = tokio::task::spawn(async move { + while let Some(message) = collectd_messages.next().await { + match CollectdMessage::parse_from(&message) { + Ok(collectd_message) => { + let batch_input = BatchDriverInput::Event(collectd_message); + if let Err(err) = msg_send.send(batch_input).await { + error!("Error while processing a collectd message: {}", err); + } + } + Err(err) => { + error!("Error while decoding a collectd message: {}", err); + } + } + } + // The MQTT connection has been closed by the process itself. + log::info!("Stop batching"); + let eof = BatchDriverInput::Flush; + msg_send.send(eof).await + }); + + let output_mqtt_client = mqtt_client.clone(); + let output_topic = Topic::new(self.device_monitor_config.mqtt_target_topic)?; + |