diff options
author | Albin Suresh <albin.suresh@softwareag.com> | 2022-03-30 11:11:40 +0530 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-03-30 11:11:40 +0530 |
commit | 358437d14166fb32448f99f158963bedcf9236f1 (patch) | |
tree | 9f03ac20206a6c56f6b8295af2d5967338fd756d /crates/core/tedge_mapper/src/collectd/monitor.rs | |
parent | f8d1f56977c371a6cab5af6b3deda1f4e9052897 (diff) | |
parent | 3a40a44e7d0ed69a5fd1270c58efc08f959b72d2 (diff) |
Merge PR #1025 MQTT health endpoints for tedge-daemons
Closes #769 MQTT health endpoint for tedge-daemons
Diffstat (limited to 'crates/core/tedge_mapper/src/collectd/monitor.rs')
-rw-r--r-- | crates/core/tedge_mapper/src/collectd/monitor.rs | 34 |
1 files changed, 24 insertions, 10 deletions
diff --git a/crates/core/tedge_mapper/src/collectd/monitor.rs b/crates/core/tedge_mapper/src/collectd/monitor.rs index 93ea68cd..6063dca6 100644 --- a/crates/core/tedge_mapper/src/collectd/monitor.rs +++ b/crates/core/tedge_mapper/src/collectd/monitor.rs @@ -12,6 +12,9 @@ const DEFAULT_MAXIMUM_MESSAGE_DELAY: u32 = 400; // Heuristic delay that should w const DEFAULT_MESSAGE_LEAP_LIMIT: u32 = 0; const DEFAULT_MQTT_SOURCE_TOPIC: &str = "collectd/#"; const DEFAULT_MQTT_TARGET_TOPIC: &str = "tedge/measurements"; +const HEALTH_CHECK_TOPIC: &str = "tedge/health-check/tedge-mapper-collectd"; +const HEALTH_STATUS_TOPIC: &str = "tedge/health/tedge-mapper-collectd"; +const HEALTH_STATUS_UP: &str = r#"{"status": "up"}"#; #[derive(Debug)] pub struct DeviceMonitorConfig { @@ -64,8 +67,13 @@ impl DeviceMonitor { #[instrument(skip(self), name = "monitor")] pub async fn run(&self) -> Result<(), DeviceMonitorError> { - let input_topic = TopicFilter::new(self.device_monitor_config.mqtt_source_topic)? + let health_check_topic = TopicFilter::new_unchecked(HEALTH_CHECK_TOPIC); + let health_status_topic = Topic::new_unchecked(HEALTH_STATUS_TOPIC); + + let mut input_topic = TopicFilter::new(self.device_monitor_config.mqtt_source_topic)? .with_qos(QoS::AtMostOnce); + input_topic.add_all(health_check_topic.clone()); + let mqtt_config = mqtt_channel::Config::new( self.device_monitor_config.host.to_string(), self.device_monitor_config.port, @@ -90,19 +98,25 @@ impl DeviceMonitor { }); let mut collectd_messages = mqtt_client.received; + let mut output_messages = mqtt_client.published.clone(); let input_join_handle = tokio::task::spawn(async move { while let Some(message) = collectd_messages.next().await { - match CollectdMessage::parse_from(&message) { - Ok(collectd_message) => { - for msg in collectd_message { - let batch_input = BatchDriverInput::Event(msg); - if let Err(err) = msg_send.send(batch_input).await { - error!("Error while processing a collectd message: {}", err); + if health_check_topic.accept(&message) { + let health_message = Message::new(&health_status_topic, HEALTH_STATUS_UP); + let _ = output_messages.send(health_message).await; + } else { + match CollectdMessage::parse_from(&message) { + Ok(collectd_message) => { + for msg in collectd_message { + let batch_input = BatchDriverInput::Event(msg); + if let Err(err) = msg_send.send(batch_input).await { + error!("Error while processing a collectd message: {}", err); + } } } - } - Err(err) => { - error!("Error while decoding a collectd message: {}", err); + Err(err) => { + error!("Error while decoding a collectd message: {}", err); + } } } } |