From 964e15417ff4bb70fdd6cf9a2b66a592a470839c Mon Sep 17 00:00:00 2001 From: Albin Suresh Date: Wed, 23 Mar 2022 23:05:44 +0530 Subject: Closes #769 MQTT health endpoint for tedge-mapper --- crates/core/tedge_mapper/src/core/mapper.rs | 88 +++++++++++++++++++++++------ 1 file changed, 70 insertions(+), 18 deletions(-) (limited to 'crates/core/tedge_mapper/src') diff --git a/crates/core/tedge_mapper/src/core/mapper.rs b/crates/core/tedge_mapper/src/core/mapper.rs index f0691397..5208f04b 100644 --- a/crates/core/tedge_mapper/src/core/mapper.rs +++ b/crates/core/tedge_mapper/src/core/mapper.rs @@ -3,12 +3,13 @@ use std::time::Duration; use crate::core::{converter::*, error::*}; use mqtt_channel::{ - Connection, Message, MqttError, SinkExt, StreamExt, TopicFilter, UnboundedReceiver, + Connection, Message, MqttError, SinkExt, StreamExt, Topic, TopicFilter, UnboundedReceiver, UnboundedSender, }; use tracing::{error, info, instrument}; const SYNC_WINDOW: Duration = Duration::from_secs(3); +const HEALTH_STATUS_UP: &str = r#"{"status": "up"}"#; pub async fn create_mapper( app_name: &str, @@ -19,17 +20,16 @@ pub async fn create_mapper( info!("{} starting", app_name); let mapper_config = converter.get_mapper_config(); - let mqtt_client = Connection::new(&mqtt_config( - app_name, - &mqtt_host, - mqtt_port, - mapper_config.in_topic_filter.clone(), - )?) - .await?; + let mut topic_filter = mapper_config.in_topic_filter.clone(); + topic_filter.add(format!("tedge/health-check/{}", app_name).as_str())?; + + let mqtt_client = + Connection::new(&mqtt_config(app_name, &mqtt_host, mqtt_port, topic_filter)?).await?; Mapper::subscribe_errors(mqtt_client.errors); Ok(Mapper::new( + app_name.to_string(), mqtt_client.received, mqtt_client.published, converter, @@ -40,13 +40,13 @@ pub fn mqtt_config( name: &str, host: &str, port: u16, - topics: TopicFilter, + topic_filter: TopicFilter, ) -> Result { Ok(mqtt_channel::Config::default() .with_host(host) .with_port(port) .with_session_name(name) - .with_subscriptions(topics) + .with_subscriptions(topic_filter) .with_max_packet_size(10 * 1024 * 1024)) } @@ -54,18 +54,26 @@ pub struct Mapper { input: UnboundedReceiver, output: UnboundedSender, converter: Box>, + health_check_topic: TopicFilter, + health_topic: Topic, } impl Mapper { pub fn new( + name: String, input: UnboundedReceiver, output: UnboundedSender, converter: Box>, ) -> Self { + let health_check_topic = + TopicFilter::new_unchecked(format!("tedge/health-check/{}", name).as_str()); + let health_topic = Topic::new_unchecked(format!("tedge/health/{}", name).as_str()); Self { input, output, converter, + health_check_topic, + health_topic, } } @@ -114,9 +122,14 @@ impl Mapper { } async fn process_message(&mut self, message: Message) { - let converted_messages = self.converter.convert(&message).await; - for converted_message in converted_messages.into_iter() { - let _ = self.output.send(converted_message).await; + if self.health_check_topic.accept(&message) { + let health_message = Message::new(&self.health_topic, HEALTH_STATUS_UP); + let _ = self.output.send(health_message).await; + } else { + let converted_messages = self.converter.convert(&message).await; + for converted_message in converted_messages.into_iter() { + let _ = self.output.send(converted_message).await; + } } } } @@ -143,11 +156,12 @@ mod tests { .with_subscriptions(TopicFilter::new_unchecked("in_topic")); let mqtt_client = Connection::new(&mqtt_config).await?; - let mut mapper = Mapper { - input: mqtt_client.received, - output: mqtt_client.published, - converter: Box::new(UppercaseConverter::new()), - }; + let mut mapper = Mapper::new( + name.to_string(), + mqtt_client.received, + mqtt_client.published, + Box::new(UppercaseConverter::new()), + ); // Let's run the mapper in the background tokio::spawn(async move { @@ -177,6 +191,44 @@ mod tests { Ok(()) } + #[tokio::test] + #[serial_test::serial] + async fn health_check() -> Result<(), anyhow::Error> { + // Given an MQTT broker + let broker = mqtt_tests::test_mqtt_broker(); + + // Given a mapper + let name = "mapper_under_test"; + + let mut mapper = create_mapper( + name, + "localhost".to_string(), + broker.port, + Box::new(UppercaseConverter::new()), + ) + .await?; + + // Let's run the mapper in the background + tokio::spawn(async move { + let _ = mapper.run().await; + }); + sleep(Duration::from_secs(1)).await; + + let health_check_topic = format!("tedge/health-check/{name}"); + let health_topic = format!("tedge/health/{name}"); + let actual = broker + .wait_for_response_on_publish( + &health_check_topic, + "", + &health_topic, + Duration::from_secs(1), + ) + .await; + assert_eq!(actual.unwrap(), HEALTH_STATUS_UP); + + Ok(()) + } + struct UppercaseConverter { mapper_config: MapperConfig, } -- cgit v1.2.3 From 8bb8ac8d525119b3f9c98b2835f21d64901a0135 Mon Sep 17 00:00:00 2001 From: Albin Suresh Date: Tue, 29 Mar 2022 16:52:10 +0530 Subject: MQTT health endpoint for collectd-mapper --- crates/core/tedge_mapper/src/collectd/monitor.rs | 34 +++++++++++++++++------- 1 file changed, 24 insertions(+), 10 deletions(-) (limited to 'crates/core/tedge_mapper/src') diff --git a/crates/core/tedge_mapper/src/collectd/monitor.rs b/crates/core/tedge_mapper/src/collectd/monitor.rs index c804deb5..578ababa 100644 --- a/crates/core/tedge_mapper/src/collectd/monitor.rs +++ b/crates/core/tedge_mapper/src/collectd/monitor.rs @@ -12,6 +12,9 @@ const DEFAULT_MAXIMUM_MESSAGE_DELAY: u32 = 400; // Heuristic delay that should w const DEFAULT_MESSAGE_LEAP_LIMIT: u32 = 0; const DEFAULT_MQTT_SOURCE_TOPIC: &str = "collectd/#"; const DEFAULT_MQTT_TARGET_TOPIC: &str = "tedge/measurements"; +const HEALTH_CHECK_TOPIC: &str = "tedge/health-check/tedge-mapper-collectd"; +const HEALTH_STATUS_TOPIC: &str = "tedge/health/tedge-mapper-collectd"; +const HEALTH_STATUS_UP: &str = r#"{"status": "up"}"#; #[derive(Debug)] pub struct DeviceMonitorConfig { @@ -64,8 +67,13 @@ impl DeviceMonitor { #[instrument(skip(self), name = "monitor")] pub async fn run(&self) -> Result<(), DeviceMonitorError> { - let input_topic = TopicFilter::new(self.device_monitor_config.mqtt_source_topic)? + let health_check_topic = TopicFilter::new_unchecked(HEALTH_CHECK_TOPIC); + let health_status_topic = Topic::new_unchecked(HEALTH_STATUS_TOPIC); + + let mut input_topic = TopicFilter::new(self.device_monitor_config.mqtt_source_topic)? .with_qos(QoS::AtMostOnce); + input_topic.add_all(health_check_topic.clone()); + let mqtt_config = mqtt_channel::Config::new( self.device_monitor_config.host.to_string(), self.device_monitor_config.port, @@ -90,19 +98,25 @@ impl DeviceMonitor { }); let mut collectd_messages = mqtt_client.received; + let mut output_messages = mqtt_client.published.clone(); let input_join_handle = tokio::task::spawn(async move { while let Some(message) = collectd_messages.next().await { - match CollectdMessage::parse_from(&message) { - Ok(collectd_message) => { - for msg in collectd_message { - let batch_input = BatchDriverInput::Event(msg); - if let Err(err) = msg_send.send(batch_input).await { - error!("Error while processing a collectd message: {}", err); + if health_check_topic.accept(&message) { + let health_message = Message::new(&health_status_topic, HEALTH_STATUS_UP); + let _ = output_messages.send(health_message).await; + } else { + match CollectdMessage::parse_from(&message) { + Ok(collectd_message) => { + for msg in collectd_message { + let batch_input = BatchDriverInput::Event(msg); + if let Err(err) = msg_send.send(batch_input).await { + error!("Error while processing a collectd message: {}", err); + } } } - } - Err(err) => { - error!("Error while decoding a collectd message: {}", err); + Err(err) => { + error!("Error while decoding a collectd message: {}", err); + } } } } -- cgit v1.2.3