summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--crates/core/agent_interface/src/lib.rs2
-rw-r--r--crates/core/agent_interface/src/messages.rs4
-rw-r--r--crates/core/tedge_agent/Cargo.toml2
-rw-r--r--crates/core/tedge_agent/src/agent.rs23
-rw-r--r--crates/core/tedge_mapper/src/collectd/monitor.rs9
-rw-r--r--crates/core/tedge_mapper/src/core/mapper.rs49
-rw-r--r--docs/src/howto-guides/020_monitor_tedge_health2
7 files changed, 49 insertions, 42 deletions
diff --git a/crates/core/agent_interface/src/lib.rs b/crates/core/agent_interface/src/lib.rs
index fd5972c6..745201f2 100644
--- a/crates/core/agent_interface/src/lib.rs
+++ b/crates/core/agent_interface/src/lib.rs
@@ -6,7 +6,7 @@ pub mod topic;
pub use download::*;
pub use error::*;
pub use messages::{
- control_filter_topic, health_check_topic, software_filter_topic, Jsonify, OperationStatus,
+ control_filter_topic, health_check_topics, software_filter_topic, Jsonify, OperationStatus,
RestartOperationRequest, RestartOperationResponse, SoftwareListRequest, SoftwareListResponse,
SoftwareRequestResponse, SoftwareUpdateRequest, SoftwareUpdateResponse,
};
diff --git a/crates/core/agent_interface/src/messages.rs b/crates/core/agent_interface/src/messages.rs
index 4df6f41d..eddf119e 100644
--- a/crates/core/agent_interface/src/messages.rs
+++ b/crates/core/agent_interface/src/messages.rs
@@ -25,8 +25,8 @@ where
}
}
-pub const fn health_check_topic() -> &'static str {
- "tedge/health-check/tedge-agent"
+pub fn health_check_topics() -> Vec<&'static str> {
+ vec!["tedge/health-check", "tedge/health-check/tedge-agent"]
}
pub const fn software_filter_topic() -> &'static str {
diff --git a/crates/core/tedge_agent/Cargo.toml b/crates/core/tedge_agent/Cargo.toml
index b95a8593..76bde633 100644
--- a/crates/core/tedge_agent/Cargo.toml
+++ b/crates/core/tedge_agent/Cargo.toml
@@ -37,7 +37,7 @@ tedge_config = { path = "../../common/tedge_config" }
tedge_utils = { path = "../../common/tedge_utils", features = ["logging"] }
thiserror = "1.0"
time = { version = "0.3", features = ["formatting"] }
-tokio = { version = "1.8", features = ["fs","process", "rt"] }
+tokio = { version = "1.8", features = ["fs","process", "rt", "rt-multi-thread"] }
toml = "0.5"
tracing = { version = "0.1", features = ["attributes", "log"] }
diff --git a/crates/core/tedge_agent/src/agent.rs b/crates/core/tedge_agent/src/agent.rs
index 0528cfb9..f5a1be14 100644
--- a/crates/core/tedge_agent/src/agent.rs
+++ b/crates/core/tedge_agent/src/agent.rs
@@ -8,7 +8,7 @@ use crate::{
},
};
use agent_interface::{
- control_filter_topic, health_check_topic, software_filter_topic, Jsonify, OperationStatus,
+ control_filter_topic, health_check_topics, software_filter_topic, Jsonify, OperationStatus,
RestartOperationRequest, RestartOperationResponse, SoftwareError, SoftwareListRequest,
SoftwareListResponse, SoftwareRequestResponse, SoftwareType, SoftwareUpdateRequest,
SoftwareUpdateResponse,
@@ -40,7 +40,7 @@ const INIT_COMMAND: &str = "echo";
pub struct SmAgentConfig {
pub errors_topic: Topic,
pub mqtt_config: mqtt_channel::Config,
- pub request_topic_health: Topic,
+ pub request_topics_health: TopicFilter,
pub request_topic_list: Topic,
pub request_topic_update: Topic,
pub request_topics: TopicFilter,
@@ -62,15 +62,14 @@ impl Default for SmAgentConfig {
let mqtt_config = mqtt_channel::Config::default();
- let request_topics = vec![
- health_check_topic(),
- software_filter_topic(),
- control_filter_topic(),
- ]
- .try_into()
- .expect("Invalid topic filter");
+ let mut request_topics: TopicFilter = vec![software_filter_topic(), control_filter_topic()]
+ .try_into()
+ .expect("Invalid topic filter");
- let request_topic_health = Topic::new_unchecked(health_check_topic());
+ let request_topics_health: TopicFilter = health_check_topics()
+ .try_into()
+ .expect("Invalid topic filter");
+ request_topics.add_all(request_topics_health.clone());
let response_topic_health = Topic::new_unchecked("tedge/health/tedge-agent");
@@ -105,7 +104,7 @@ impl Default for SmAgentConfig {
Self {
errors_topic,
mqtt_config,
- request_topic_health,
+ request_topics_health,
request_topic_list,
request_topic_update,
request_topics,
@@ -295,7 +294,7 @@ impl SmAgent {
while let Some(message) = requests.next().await {
debug!("Request {:?}", message);
match &message.topic {
- topic if topic == &self.config.request_topic_health => {
+ topic if self.config.request_topics_health.accept_topic(topic) => {
let health_message =
Message::new(&self.config.response_topic_health, HEALTH_STATUS_UP);
let _ = responses.publish(health_message).await;
diff --git a/crates/core/tedge_mapper/src/collectd/monitor.rs b/crates/core/tedge_mapper/src/collectd/monitor.rs
index 6063dca6..3a755a0f 100644
--- a/crates/core/tedge_mapper/src/collectd/monitor.rs
+++ b/crates/core/tedge_mapper/src/collectd/monitor.rs
@@ -12,6 +12,7 @@ const DEFAULT_MAXIMUM_MESSAGE_DELAY: u32 = 400; // Heuristic delay that should w
const DEFAULT_MESSAGE_LEAP_LIMIT: u32 = 0;
const DEFAULT_MQTT_SOURCE_TOPIC: &str = "collectd/#";
const DEFAULT_MQTT_TARGET_TOPIC: &str = "tedge/measurements";
+const COMMON_HEALTH_CHECK_TOPIC: &str = "tedge/health-check";
const HEALTH_CHECK_TOPIC: &str = "tedge/health-check/tedge-mapper-collectd";
const HEALTH_STATUS_TOPIC: &str = "tedge/health/tedge-mapper-collectd";
const HEALTH_STATUS_UP: &str = r#"{"status": "up"}"#;
@@ -67,12 +68,14 @@ impl DeviceMonitor {
#[instrument(skip(self), name = "monitor")]
pub async fn run(&self) -> Result<(), DeviceMonitorError> {
- let health_check_topic = TopicFilter::new_unchecked(HEALTH_CHECK_TOPIC);
+ let health_check_topics: TopicFilter = vec![COMMON_HEALTH_CHECK_TOPIC, HEALTH_CHECK_TOPIC]
+ .try_into()
+ .expect("Valid health topics");
let health_status_topic = Topic::new_unchecked(HEALTH_STATUS_TOPIC);
let mut input_topic = TopicFilter::new(self.device_monitor_config.mqtt_source_topic)?
.with_qos(QoS::AtMostOnce);
- input_topic.add_all(health_check_topic.clone());
+ input_topic.add_all(health_check_topics.clone());
let mqtt_config = mqtt_channel::Config::new(
self.device_monitor_config.host.to_string(),
@@ -101,7 +104,7 @@ impl DeviceMonitor {
let mut output_messages = mqtt_client.published.clone();
let input_join_handle = tokio::task::spawn(async move {
while let Some(message) = collectd_messages.next().await {
- if health_check_topic.accept(&message) {
+ if health_check_topics.accept(&message) {
let health_message = Message::new(&health_status_topic, HEALTH_STATUS_UP);
let _ = output_messages.send(health_message).await;
} else {
diff --git a/crates/core/tedge_mapper/src/core/mapper.rs b/crates/core/tedge_mapper/src/core/mapper.rs
index 5208f04b..51c01078 100644
--- a/crates/core/tedge_mapper/src/core/mapper.rs
+++ b/crates/core/tedge_mapper/src/core/mapper.rs
@@ -19,9 +19,18 @@ pub async fn create_mapper(
) -> Result<Mapper, anyhow::Error> {
info!("{} starting", app_name);
+ let health_check_topics: TopicFilter = vec![
+ "tedge/health-check",
+ format!("tedge/health-check/{}", app_name).as_str(),
+ ]
+ .try_into()
+ .expect("health check topics must be valid");
+
+ let health_status_topic = Topic::new_unchecked(format!("tedge/health/{}", app_name).as_str());
+
let mapper_config = converter.get_mapper_config();
let mut topic_filter = mapper_config.in_topic_filter.clone();
- topic_filter.add(format!("tedge/health-check/{}", app_name).as_str())?;
+ topic_filter.add_all(health_check_topics.clone());
let mqtt_client =
Connection::new(&mqtt_config(app_name, &mqtt_host, mqtt_port, topic_filter)?).await?;
@@ -29,10 +38,11 @@ pub async fn create_mapper(
Mapper::subscribe_errors(mqtt_client.errors);
Ok(Mapper::new(
- app_name.to_string(),
mqtt_client.received,
mqtt_client.published,
converter,
+ health_check_topics,
+ health_status_topic,
))
}
@@ -54,26 +64,24 @@ pub struct Mapper {
input: UnboundedReceiver<Message>,
output: UnboundedSender<Message>,
converter: Box<dyn Converter<Error = ConversionError>>,
- health_check_topic: TopicFilter,
- health_topic: Topic,
+ health_check_topics: TopicFilter,
+ health_status_topic: Topic,
}
impl Mapper {
pub fn new(
- name: String,
input: UnboundedReceiver<Message>,
output: UnboundedSender<Message>,
converter: Box<dyn Converter<Error = ConversionError>>,
+ health_check_topics: TopicFilter,
+ health_status_topic: Topic,
) -> Self {
- let health_check_topic =
- TopicFilter::new_unchecked(format!("tedge/health-check/{}", name).as_str());
- let health_topic = Topic::new_unchecked(format!("tedge/health/{}", name).as_str());
Self {
input,
output,
converter,
- health_check_topic,
- health_topic,
+ health_check_topics,
+ health_status_topic,
}
}
@@ -122,8 +130,8 @@ impl Mapper {
}
async fn process_message(&mut self, message: Message) {
- if self.health_check_topic.accept(&message) {
- let health_message = Message::new(&self.health_topic, HEALTH_STATUS_UP);
+ if self.health_check_topics.accept(&message) {
+ let health_message = Message::new(&self.health_status_topic, HEALTH_STATUS_UP);
let _ = self.output.send(health_message).await;
} else {
let converted_messages = self.converter.convert(&message).await;
@@ -150,18 +158,13 @@ mod tests {
// Given a mapper
let name = "mapper_under_test";
- let mqtt_config = mqtt_channel::Config::default()
- .with_port(broker.port)
- .with_session_name(name)
- .with_subscriptions(TopicFilter::new_unchecked("in_topic"));
- let mqtt_client = Connection::new(&mqtt_config).await?;
-
- let mut mapper = Mapper::new(
- name.to_string(),
- mqtt_client.received,
- mqtt_client.published,
+ let mut mapper = create_mapper(
+ name,
+ "localhost".into(),
+ broker.port,
Box::new(UppercaseConverter::new()),
- );
+ )
+ .await?;
// Let's run the mapper in the background
tokio::spawn(async move {
diff --git a/docs/src/howto-guides/020_monitor_tedge_health b/docs/src/howto-guides/020_monitor_tedge_health
index dc886aec..19f50315 100644
--- a/docs/src/howto-guides/020_monitor_tedge_health
+++ b/docs/src/howto-guides/020_monitor_tedge_health
@@ -19,6 +19,8 @@ with the following payload:
{ "status": "up" }
```
+All daemons will also respond to health checks sent to the common health check endpoint `tedge/health-check`.
+
## Supported MQTT topic endpoints
The following endpoints are currently supported by various tedge daemons: