summaryrefslogtreecommitdiffstats
path: root/crates/core/tedge_mapper/src
diff options
context:
space:
mode:
authorAlbin Suresh <albin.suresh@softwareag.com>2022-03-30 11:11:40 +0530
committerGitHub <noreply@github.com>2022-03-30 11:11:40 +0530
commit358437d14166fb32448f99f158963bedcf9236f1 (patch)
tree9f03ac20206a6c56f6b8295af2d5967338fd756d /crates/core/tedge_mapper/src
parentf8d1f56977c371a6cab5af6b3deda1f4e9052897 (diff)
parent3a40a44e7d0ed69a5fd1270c58efc08f959b72d2 (diff)
Merge PR #1025 MQTT health endpoints for tedge-daemons
Closes #769 MQTT health endpoint for tedge-daemons
Diffstat (limited to 'crates/core/tedge_mapper/src')
-rw-r--r--crates/core/tedge_mapper/src/collectd/monitor.rs34
-rw-r--r--crates/core/tedge_mapper/src/core/mapper.rs88
2 files changed, 94 insertions, 28 deletions
diff --git a/crates/core/tedge_mapper/src/collectd/monitor.rs b/crates/core/tedge_mapper/src/collectd/monitor.rs
index 93ea68cd..6063dca6 100644
--- a/crates/core/tedge_mapper/src/collectd/monitor.rs
+++ b/crates/core/tedge_mapper/src/collectd/monitor.rs
@@ -12,6 +12,9 @@ const DEFAULT_MAXIMUM_MESSAGE_DELAY: u32 = 400; // Heuristic delay that should w
const DEFAULT_MESSAGE_LEAP_LIMIT: u32 = 0;
const DEFAULT_MQTT_SOURCE_TOPIC: &str = "collectd/#";
const DEFAULT_MQTT_TARGET_TOPIC: &str = "tedge/measurements";
+const HEALTH_CHECK_TOPIC: &str = "tedge/health-check/tedge-mapper-collectd";
+const HEALTH_STATUS_TOPIC: &str = "tedge/health/tedge-mapper-collectd";
+const HEALTH_STATUS_UP: &str = r#"{"status": "up"}"#;
#[derive(Debug)]
pub struct DeviceMonitorConfig {
@@ -64,8 +67,13 @@ impl DeviceMonitor {
#[instrument(skip(self), name = "monitor")]
pub async fn run(&self) -> Result<(), DeviceMonitorError> {
- let input_topic = TopicFilter::new(self.device_monitor_config.mqtt_source_topic)?
+ let health_check_topic = TopicFilter::new_unchecked(HEALTH_CHECK_TOPIC);
+ let health_status_topic = Topic::new_unchecked(HEALTH_STATUS_TOPIC);
+
+ let mut input_topic = TopicFilter::new(self.device_monitor_config.mqtt_source_topic)?
.with_qos(QoS::AtMostOnce);
+ input_topic.add_all(health_check_topic.clone());
+
let mqtt_config = mqtt_channel::Config::new(
self.device_monitor_config.host.to_string(),
self.device_monitor_config.port,
@@ -90,19 +98,25 @@ impl DeviceMonitor {
});
let mut collectd_messages = mqtt_client.received;
+ let mut output_messages = mqtt_client.published.clone();
let input_join_handle = tokio::task::spawn(async move {
while let Some(message) = collectd_messages.next().await {
- match CollectdMessage::parse_from(&message) {
- Ok(collectd_message) => {
- for msg in collectd_message {
- let batch_input = BatchDriverInput::Event(msg);
- if let Err(err) = msg_send.send(batch_input).await {
- error!("Error while processing a collectd message: {}", err);
+ if health_check_topic.accept(&message) {
+ let health_message = Message::new(&health_status_topic, HEALTH_STATUS_UP);
+ let _ = output_messages.send(health_message).await;
+ } else {
+ match CollectdMessage::parse_from(&message) {
+ Ok(collectd_message) => {
+ for msg in collectd_message {
+ let batch_input = BatchDriverInput::Event(msg);
+ if let Err(err) = msg_send.send(batch_input).await {
+ error!("Error while processing a collectd message: {}", err);
+ }
}
}
- }
- Err(err) => {
- error!("Error while decoding a collectd message: {}", err);
+ Err(err) => {
+ error!("Error while decoding a collectd message: {}", err);
+ }
}
}
}
diff --git a/crates/core/tedge_mapper/src/core/mapper.rs b/crates/core/tedge_mapper/src/core/mapper.rs
index f0691397..5208f04b 100644
--- a/crates/core/tedge_mapper/src/core/mapper.rs
+++ b/crates/core/tedge_mapper/src/core/mapper.rs
@@ -3,12 +3,13 @@ use std::time::Duration;
use crate::core::{converter::*, error::*};
use mqtt_channel::{
- Connection, Message, MqttError, SinkExt, StreamExt, TopicFilter, UnboundedReceiver,
+ Connection, Message, MqttError, SinkExt, StreamExt, Topic, TopicFilter, UnboundedReceiver,
UnboundedSender,
};
use tracing::{error, info, instrument};
const SYNC_WINDOW: Duration = Duration::from_secs(3);
+const HEALTH_STATUS_UP: &str = r#"{"status": "up"}"#;
pub async fn create_mapper(
app_name: &str,
@@ -19,17 +20,16 @@ pub async fn create_mapper(
info!("{} starting", app_name);
let mapper_config = converter.get_mapper_config();
- let mqtt_client = Connection::new(&mqtt_config(
- app_name,
- &mqtt_host,
- mqtt_port,
- mapper_config.in_topic_filter.clone(),
- )?)
- .await?;
+ let mut topic_filter = mapper_config.in_topic_filter.clone();
+ topic_filter.add(format!("tedge/health-check/{}", app_name).as_str())?;
+
+ let mqtt_client =
+ Connection::new(&mqtt_config(app_name, &mqtt_host, mqtt_port, topic_filter)?).await?;
Mapper::subscribe_errors(mqtt_client.errors);
Ok(Mapper::new(
+ app_name.to_string(),
mqtt_client.received,
mqtt_client.published,
converter,
@@ -40,13 +40,13 @@ pub fn mqtt_config(
name: &str,
host: &str,
port: u16,
- topics: TopicFilter,
+ topic_filter: TopicFilter,
) -> Result<mqtt_channel::Config, anyhow::Error> {
Ok(mqtt_channel::Config::default()
.with_host(host)
.with_port(port)
.with_session_name(name)
- .with_subscriptions(topics)
+ .with_subscriptions(topic_filter)
.with_max_packet_size(10 * 1024 * 1024))
}
@@ -54,18 +54,26 @@ pub struct Mapper {
input: UnboundedReceiver<Message>,
output: UnboundedSender<Message>,
converter: Box<dyn Converter<Error = ConversionError>>,
+ health_check_topic: TopicFilter,
+ health_topic: Topic,
}
impl Mapper {
pub fn new(
+ name: String,
input: UnboundedReceiver<Message>,
output: UnboundedSender<Message>,
converter: Box<dyn Converter<Error = ConversionError>>,
) -> Self {
+ let health_check_topic =
+ TopicFilter::new_unchecked(format!("tedge/health-check/{}", name).as_str());
+ let health_topic = Topic::new_unchecked(format!("tedge/health/{}", name).as_str());
Self {
input,
output,
converter,
+ health_check_topic,
+ health_topic,
}
}
@@ -114,9 +122,14 @@ impl Mapper {
}
async fn process_message(&mut self, message: Message) {
- let converted_messages = self.converter.convert(&message).await;
- for converted_message in converted_messages.into_iter() {
- let _ = self.output.send(converted_message).await;
+ if self.health_check_topic.accept(&message) {
+ let health_message = Message::new(&self.health_topic, HEALTH_STATUS_UP);
+ let _ = self.output.send(health_message).await;
+ } else {
+ let converted_messages = self.converter.convert(&message).await;
+ for converted_message in converted_messages.into_iter() {
+ let _ = self.output.send(converted_message).await;
+ }
}
}
}
@@ -143,11 +156,12 @@ mod tests {
.with_subscriptions(TopicFilter::new_unchecked("in_topic"));
let mqtt_client = Connection::new(&mqtt_config).await?;
- let mut mapper = Mapper {
- input: mqtt_client.received,
- output: mqtt_client.published,
- converter: Box::new(UppercaseConverter::new()),
- };
+ let mut mapper = Mapper::new(
+ name.to_string(),
+ mqtt_client.received,
+ mqtt_client.published,
+ Box::new(UppercaseConverter::new()),
+ );
// Let's run the mapper in the background
tokio::spawn(async move {
@@ -177,6 +191,44 @@ mod tests {
Ok(())
}
+ #[tokio::test]
+ #[serial_test::serial]
+ async fn health_check() -> Result<(), anyhow::Error> {
+ // Given an MQTT broker
+ let broker = mqtt_tests::test_mqtt_broker();
+
+ // Given a mapper
+ let name = "mapper_under_test";
+
+ let mut mapper = create_mapper(
+ name,
+ "localhost".to_string(),
+ broker.port,
+ Box::new(UppercaseConverter::new()),
+ )
+ .await?;
+
+ // Let's run the mapper in the background
+ tokio::spawn(async move {
+ let _ = mapper.run().await;
+ });
+ sleep(Duration::from_secs(1)).await;
+
+ let health_check_topic = format!("tedge/health-check/{name}");
+ let health_topic = format!("tedge/health/{name}");
+ let actual = broker
+ .wait_for_response_on_publish(
+ &health_check_topic,
+ "",
+ &health_topic,
+ Duration::from_secs(1),
+ )
+ .await;
+ assert_eq!(actual.unwrap(), HEALTH_STATUS_UP);
+
+ Ok(())
+ }
+
struct UppercaseConverter {
mapper_config: MapperConfig,
}