diff options
author | Albin Suresh <albin.suresh@softwareag.com> | 2022-05-27 16:59:53 +0530 |
---|---|---|
committer | Albin Suresh <albin.suresh@softwareag.com> | 2022-05-27 16:59:53 +0530 |
commit | a10c6c119899a8a9b36884e4e93e3f6bd873d793 (patch) | |
tree | 40bfb9f047545b7a4ece09e05f6cba2dab41bb35 /crates/core/tedge_watchdog/src | |
parent | e3775e430d3109081d3926ab4e7b13b05e1c2741 (diff) |
Fix watchdog health check with timestamp validation
Diffstat (limited to 'crates/core/tedge_watchdog/src')
-rw-r--r-- | crates/core/tedge_watchdog/src/systemd_watchdog.rs | 56 |
1 files changed, 43 insertions, 13 deletions
diff --git a/crates/core/tedge_watchdog/src/systemd_watchdog.rs b/crates/core/tedge_watchdog/src/systemd_watchdog.rs index 3fec6840..5ee6b916 100644 --- a/crates/core/tedge_watchdog/src/systemd_watchdog.rs +++ b/crates/core/tedge_watchdog/src/systemd_watchdog.rs @@ -1,5 +1,6 @@ use crate::error::WatchdogError; use freedesktop_entry_parser::parse_entry; +use futures::channel::mpsc; use futures::stream::FuturesUnordered; use futures::StreamExt; use mqtt_channel::{Config, Message, PubChannel, Topic}; @@ -14,12 +15,14 @@ use tedge_config::{ ConfigRepository, ConfigSettingAccessor, MqttBindAddressSetting, MqttPortSetting, TEdgeConfigLocation, }; +use time::OffsetDateTime; use tracing::{debug, error, info, warn}; -#[derive(Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize)] pub struct HealthStatus { status: String, pid: u32, + time: i64, } pub async fn start_watchdog(tedge_config_dir: PathBuf) -> Result<(), anyhow::Error> { @@ -90,20 +93,22 @@ async fn monitor_tedge_service( let start = Instant::now(); - match tokio::time::timeout(tokio::time::Duration::from_secs(interval), received.next()) - .await + let request_timestamp = OffsetDateTime::now_utc().unix_timestamp(); + match tokio::time::timeout( + tokio::time::Duration::from_secs(interval), + get_latest_health_status_message(request_timestamp, &mut received), + ) + .await { - Ok(Some(msg)) => { - let message = msg.payload_str()?; - - let p: HealthStatus = serde_json::from_str(message)?; - - debug!("Sending notification for {} with pid: {}", name, p.pid); - notify_systemd(p.pid, "WATCHDOG=1")?; + Ok(health_status) => { + debug!( + "Sending notification for {} with pid: {}", + name, health_status.pid + ); + notify_systemd(health_status.pid, "WATCHDOG=1")?; } - Ok(None) => {} - Err(elapsed) => { - warn!("The {name} failed with {elapsed}"); + Err(_) => { + warn!("No health check response received from {name} in time"); } } @@ -114,6 +119,31 @@ async fn monitor_tedge_service( } } +async fn get_latest_health_status_message( + request_timestamp: i64, + messages: &mut mpsc::UnboundedReceiver<Message>, +) -> HealthStatus { + loop { + if let Some(message) = messages.next().await { + if let Ok(message) = message.payload_str() { + debug!("Health response received: {}", message); + if let Ok(health_status) = serde_json::from_str::<HealthStatus>(message) { + if health_status.time >= request_timestamp { + return health_status; + } else { + debug!( + "Ignoring stale health response: {:?} older than request time: {}", + health_status, request_timestamp + ); + } + } else { + error!("Invalid health response received: {}", message); + } + } + } + } +} + fn get_mqtt_config( tedge_config_location: TEdgeConfigLocation, client_id: &str, |