summaryrefslogtreecommitdiffstats
path: root/crates/core/tedge_watchdog/src
diff options
context:
space:
mode:
authorAlbin Suresh <albin.suresh@softwareag.com>2022-05-27 16:59:53 +0530
committerAlbin Suresh <albin.suresh@softwareag.com>2022-05-27 16:59:53 +0530
commita10c6c119899a8a9b36884e4e93e3f6bd873d793 (patch)
tree40bfb9f047545b7a4ece09e05f6cba2dab41bb35 /crates/core/tedge_watchdog/src
parente3775e430d3109081d3926ab4e7b13b05e1c2741 (diff)
Fix watchdog health check with timestamp validation
Diffstat (limited to 'crates/core/tedge_watchdog/src')
-rw-r--r--crates/core/tedge_watchdog/src/systemd_watchdog.rs56
1 files changed, 43 insertions, 13 deletions
diff --git a/crates/core/tedge_watchdog/src/systemd_watchdog.rs b/crates/core/tedge_watchdog/src/systemd_watchdog.rs
index 3fec6840..5ee6b916 100644
--- a/crates/core/tedge_watchdog/src/systemd_watchdog.rs
+++ b/crates/core/tedge_watchdog/src/systemd_watchdog.rs
@@ -1,5 +1,6 @@
use crate::error::WatchdogError;
use freedesktop_entry_parser::parse_entry;
+use futures::channel::mpsc;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use mqtt_channel::{Config, Message, PubChannel, Topic};
@@ -14,12 +15,14 @@ use tedge_config::{
ConfigRepository, ConfigSettingAccessor, MqttBindAddressSetting, MqttPortSetting,
TEdgeConfigLocation,
};
+use time::OffsetDateTime;
use tracing::{debug, error, info, warn};
-#[derive(Serialize, Deserialize)]
+#[derive(Debug, Serialize, Deserialize)]
pub struct HealthStatus {
status: String,
pid: u32,
+ time: i64,
}
pub async fn start_watchdog(tedge_config_dir: PathBuf) -> Result<(), anyhow::Error> {
@@ -90,20 +93,22 @@ async fn monitor_tedge_service(
let start = Instant::now();
- match tokio::time::timeout(tokio::time::Duration::from_secs(interval), received.next())
- .await
+ let request_timestamp = OffsetDateTime::now_utc().unix_timestamp();
+ match tokio::time::timeout(
+ tokio::time::Duration::from_secs(interval),
+ get_latest_health_status_message(request_timestamp, &mut received),
+ )
+ .await
{
- Ok(Some(msg)) => {
- let message = msg.payload_str()?;
-
- let p: HealthStatus = serde_json::from_str(message)?;
-
- debug!("Sending notification for {} with pid: {}", name, p.pid);
- notify_systemd(p.pid, "WATCHDOG=1")?;
+ Ok(health_status) => {
+ debug!(
+ "Sending notification for {} with pid: {}",
+ name, health_status.pid
+ );
+ notify_systemd(health_status.pid, "WATCHDOG=1")?;
}
- Ok(None) => {}
- Err(elapsed) => {
- warn!("The {name} failed with {elapsed}");
+ Err(_) => {
+ warn!("No health check response received from {name} in time");
}
}
@@ -114,6 +119,31 @@ async fn monitor_tedge_service(
}
}
+async fn get_latest_health_status_message(
+ request_timestamp: i64,
+ messages: &mut mpsc::UnboundedReceiver<Message>,
+) -> HealthStatus {
+ loop {
+ if let Some(message) = messages.next().await {
+ if let Ok(message) = message.payload_str() {
+ debug!("Health response received: {}", message);
+ if let Ok(health_status) = serde_json::from_str::<HealthStatus>(message) {
+ if health_status.time >= request_timestamp {
+ return health_status;
+ } else {
+ debug!(
+ "Ignoring stale health response: {:?} older than request time: {}",
+ health_status, request_timestamp
+ );
+ }
+ } else {
+ error!("Invalid health response received: {}", message);
+ }
+ }
+ }
+ }
+}
+
fn get_mqtt_config(
tedge_config_location: TEdgeConfigLocation,
client_id: &str,