summaryrefslogtreecommitdiffstats
path: root/crates/core/tedge_mapper/src/collectd/monitor.rs
diff options
context:
space:
mode:
Diffstat (limited to 'crates/core/tedge_mapper/src/collectd/monitor.rs')
-rw-r--r--crates/core/tedge_mapper/src/collectd/monitor.rs34
1 files changed, 24 insertions, 10 deletions
diff --git a/crates/core/tedge_mapper/src/collectd/monitor.rs b/crates/core/tedge_mapper/src/collectd/monitor.rs
index 93ea68cd..6063dca6 100644
--- a/crates/core/tedge_mapper/src/collectd/monitor.rs
+++ b/crates/core/tedge_mapper/src/collectd/monitor.rs
@@ -12,6 +12,9 @@ const DEFAULT_MAXIMUM_MESSAGE_DELAY: u32 = 400; // Heuristic delay that should w
const DEFAULT_MESSAGE_LEAP_LIMIT: u32 = 0;
const DEFAULT_MQTT_SOURCE_TOPIC: &str = "collectd/#";
const DEFAULT_MQTT_TARGET_TOPIC: &str = "tedge/measurements";
+const HEALTH_CHECK_TOPIC: &str = "tedge/health-check/tedge-mapper-collectd";
+const HEALTH_STATUS_TOPIC: &str = "tedge/health/tedge-mapper-collectd";
+const HEALTH_STATUS_UP: &str = r#"{"status": "up"}"#;
#[derive(Debug)]
pub struct DeviceMonitorConfig {
@@ -64,8 +67,13 @@ impl DeviceMonitor {
#[instrument(skip(self), name = "monitor")]
pub async fn run(&self) -> Result<(), DeviceMonitorError> {
- let input_topic = TopicFilter::new(self.device_monitor_config.mqtt_source_topic)?
+ let health_check_topic = TopicFilter::new_unchecked(HEALTH_CHECK_TOPIC);
+ let health_status_topic = Topic::new_unchecked(HEALTH_STATUS_TOPIC);
+
+ let mut input_topic = TopicFilter::new(self.device_monitor_config.mqtt_source_topic)?
.with_qos(QoS::AtMostOnce);
+ input_topic.add_all(health_check_topic.clone());
+
let mqtt_config = mqtt_channel::Config::new(
self.device_monitor_config.host.to_string(),
self.device_monitor_config.port,
@@ -90,19 +98,25 @@ impl DeviceMonitor {
});
let mut collectd_messages = mqtt_client.received;
+ let mut output_messages = mqtt_client.published.clone();
let input_join_handle = tokio::task::spawn(async move {
while let Some(message) = collectd_messages.next().await {
- match CollectdMessage::parse_from(&message) {
- Ok(collectd_message) => {
- for msg in collectd_message {
- let batch_input = BatchDriverInput::Event(msg);
- if let Err(err) = msg_send.send(batch_input).await {
- error!("Error while processing a collectd message: {}", err);
+ if health_check_topic.accept(&message) {
+ let health_message = Message::new(&health_status_topic, HEALTH_STATUS_UP);
+ let _ = output_messages.send(health_message).await;
+ } else {
+ match CollectdMessage::parse_from(&message) {
+ Ok(collectd_message) => {
+ for msg in collectd_message {
+ let batch_input = BatchDriverInput::Event(msg);
+ if let Err(err) = msg_send.send(batch_input).await {
+ error!("Error while processing a collectd message: {}", err);
+ }
}
}
- }
- Err(err) => {
- error!("Error while decoding a collectd message: {}", err);
+ Err(err) => {
+ error!("Error while decoding a collectd message: {}", err);
+ }
}
}
}