diff options
author | Albin Suresh <albin.suresh@softwareag.com> | 2022-05-31 12:38:28 +0530 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-05-31 12:38:28 +0530 |
commit | aacb5ff692e4ae0c6f70673e6aa9798b7d13cb71 (patch) | |
tree | 3d21f1c1eb134439801e110cf1f0aab61164eaa1 | |
parent | 505b038e76e5ccf7da9c0323962f52ac734b76f8 (diff) | |
parent | 34a3f2f29ee2e6eaeadf5a1f41f85f841ef3d4bb (diff) |
Merge PR #1170 Fix tedge watchdog timeout misalignment with monitored services
Fix tedge watchdog timeout misalignment with monitored services
-rw-r--r-- | Cargo.lock | 1 | ||||
-rw-r--r-- | crates/core/tedge_agent/src/agent.rs | 4 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/collectd/monitor.rs | 4 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/core/mapper.rs | 7 | ||||
-rw-r--r-- | crates/core/tedge_watchdog/Cargo.toml | 3 | ||||
-rw-r--r-- | crates/core/tedge_watchdog/src/systemd_watchdog.rs | 96 | ||||
-rw-r--r-- | docs/src/howto-guides/021_enable_tedge_watchdog_using_systemd.md | 54 |
7 files changed, 128 insertions, 41 deletions
@@ -3066,6 +3066,7 @@ dependencies = [ "tedge_config", "tedge_utils", "thiserror", + "time", "tokio", "tracing", ] diff --git a/crates/core/tedge_agent/src/agent.rs b/crates/core/tedge_agent/src/agent.rs index b7d946b0..755fc7af 100644 --- a/crates/core/tedge_agent/src/agent.rs +++ b/crates/core/tedge_agent/src/agent.rs @@ -27,6 +27,7 @@ use tedge_config::{ TEdgeConfigLocation, TmpPathSetting, DEFAULT_LOG_PATH, DEFAULT_RUN_PATH, }; use tedge_utils::file::create_directory_with_user_group; +use time::OffsetDateTime; use tokio::sync::Mutex; use tracing::{debug, error, info, instrument, warn}; @@ -296,7 +297,8 @@ impl SmAgent { topic if self.config.request_topics_health.accept_topic(topic) => { let health_status = json!({ "status": "up", - "pid": process::id() + "pid": process::id(), + "time": OffsetDateTime::now_utc().unix_timestamp(), }) .to_string(); let health_message = diff --git a/crates/core/tedge_mapper/src/collectd/monitor.rs b/crates/core/tedge_mapper/src/collectd/monitor.rs index f927f887..b21f6dcf 100644 --- a/crates/core/tedge_mapper/src/collectd/monitor.rs +++ b/crates/core/tedge_mapper/src/collectd/monitor.rs @@ -3,6 +3,7 @@ use std::process; use batcher::{BatchConfigBuilder, BatchDriver, BatchDriverInput, BatchDriverOutput, Batcher}; use mqtt_channel::{Connection, Message, QoS, SinkExt, StreamExt, Topic, TopicFilter}; use serde_json::json; +use time::OffsetDateTime; use tracing::{error, info, instrument}; use super::{batcher::MessageBatch, collectd::CollectdMessage, error::DeviceMonitorError}; @@ -109,7 +110,8 @@ impl DeviceMonitor { if health_check_topics.accept(&message) { let health_status = json!({ "status": "up", - "pid": process::id() + "pid": process::id(), + "time": OffsetDateTime::now_utc().unix_timestamp(), }) .to_string(); let health_message = Message::new(&health_status_topic, health_status); diff --git a/crates/core/tedge_mapper/src/core/mapper.rs b/crates/core/tedge_mapper/src/core/mapper.rs index 9309a31e..fb5713f4 100644 --- a/crates/core/tedge_mapper/src/core/mapper.rs +++ b/crates/core/tedge_mapper/src/core/mapper.rs @@ -7,6 +7,7 @@ use mqtt_channel::{ UnboundedSender, }; use serde_json::json; +use time::OffsetDateTime; use tracing::{error, info, instrument}; const SYNC_WINDOW: Duration = Duration::from_secs(3); @@ -133,7 +134,8 @@ impl Mapper { if self.health_check_topics.accept(&message) { let health_status = json!({ "status": "up", - "pid": process::id() + "pid": process::id(), + "time": OffsetDateTime::now_utc().unix_timestamp(), }) .to_string(); let health_message = Message::new(&self.health_status_topic, health_status); @@ -242,7 +244,7 @@ mod tests { let common_health_check_topic = "tedge/health-check"; let health_status = broker .wait_for_response_on_publish( - &common_health_check_topic, + common_health_check_topic, "", &health_topic, Duration::from_secs(1), @@ -252,6 +254,7 @@ mod tests { let health_status: Value = serde_json::from_str(health_status.as_str())?; assert_json_include!(actual: &health_status, expected: json!({"status": "up"})); assert!(health_status["pid"].is_number()); + assert!(health_status["time"].is_number()); Ok(()) } diff --git a/crates/core/tedge_watchdog/Cargo.toml b/crates/core/tedge_watchdog/Cargo.toml index 7374eb5c..da3647c0 100644 --- a/crates/core/tedge_watchdog/Cargo.toml +++ b/crates/core/tedge_watchdog/Cargo.toml @@ -28,5 +28,6 @@ freedesktop_entry_parser = "1.3.0" tedge_config = { path = "../../common/tedge_config" } tedge_utils = { path = "../../common/tedge_utils", features = ["logging"] } thiserror ="1.0.30" -tokio = { version = "1.12", features = ["sync", "time"] } +time = { version = "0.3", features = ["formatting", "serde-well-known"] } +tokio = { version = "1.12", features = ["sync", "time", "rt-multi-thread"] } tracing = { version = "0.1", features = ["attributes", "log"] } diff --git a/crates/core/tedge_watchdog/src/systemd_watchdog.rs b/crates/core/tedge_watchdog/src/systemd_watchdog.rs index 77825103..8682a9c1 100644 --- a/crates/core/tedge_watchdog/src/systemd_watchdog.rs +++ b/crates/core/tedge_watchdog/src/systemd_watchdog.rs @@ -1,5 +1,6 @@ use crate::error::WatchdogError; use freedesktop_entry_parser::parse_entry; +use futures::channel::mpsc; use futures::stream::FuturesUnordered; use futures::StreamExt; use mqtt_channel::{Config, Message, PubChannel, Topic}; @@ -14,12 +15,14 @@ use tedge_config::{ ConfigRepository, ConfigSettingAccessor, MqttBindAddressSetting, MqttPortSetting, TEdgeConfigLocation, }; -use tracing::{error, info, warn}; +use time::OffsetDateTime; +use tracing::{debug, error, info, warn}; -#[derive(Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize)] pub struct HealthStatus { status: String, pid: u32, + time: i64, } pub async fn start_watchdog(tedge_config_dir: PathBuf) -> Result<(), anyhow::Error> { @@ -49,7 +52,7 @@ pub async fn start_watchdog(tedge_config_dir: PathBuf) -> Result<(), anyhow::Err service, &req_topic, &res_topic, - interval / 2, + interval / 4, ) .await })); @@ -90,19 +93,22 @@ async fn monitor_tedge_service( let start = Instant::now(); - match tokio::time::timeout(tokio::time::Duration::from_secs(interval), received.next()) - .await + let request_timestamp = OffsetDateTime::now_utc().unix_timestamp(); + match tokio::time::timeout( + tokio::time::Duration::from_secs(interval), + get_latest_health_status_message(request_timestamp, &mut received), + ) + .await { - Ok(Some(msg)) => { - let message = msg.payload_str()?; - - let p: HealthStatus = serde_json::from_str(message)?; - - notify_systemd(p.pid, "WATCHDOG=1")?; + Ok(health_status) => { + debug!( + "Sending notification for {} with pid: {}", + name, health_status.pid + ); + notify_systemd(health_status.pid, "WATCHDOG=1")?; } - Ok(None) => {} - Err(elapsed) => { - warn!("The {name} failed with {elapsed}"); + Err(_) => { + warn!("No health check response received from {name} in time"); } } @@ -113,6 +119,31 @@ async fn monitor_tedge_service( } } +async fn get_latest_health_status_message( + request_timestamp: i64, + messages: &mut mpsc::UnboundedReceiver<Message>, +) -> HealthStatus { + loop { + if let Some(message) = messages.next().await { + if let Ok(message) = message.payload_str() { + debug!("Health response received: {}", message); + if let Ok(health_status) = serde_json::from_str::<HealthStatus>(message) { + if health_status.time >= request_timestamp { + return health_status; + } else { + debug!( + "Ignoring stale health response: {:?} older than request time: {}", + health_status, request_timestamp + ); + } + } else { + error!("Invalid health response received: {}", message); + } + } + } + } +} + fn get_mqtt_config( tedge_config_location: TEdgeConfigLocation, client_id: &str, @@ -157,3 +188,40 @@ fn get_watchdog_sec(service_file: &str) -> Result<u64, WatchdogError> { }) } } + +#[cfg(test)] +mod tests { + use anyhow::Result; + use serde_json::json; + + use super::*; + + #[tokio::test] + async fn test_get_latest_health_status_message() -> Result<()> { + let (mut sender, mut receiver) = mpsc::unbounded::<Message>(); + let health_topic = Topic::new("tedge/health/test-service").expect("Valid topic"); + + for x in 1..5i64 { + let health_status = json!({ + "status": "up", + "pid": 123u32, + "time": x, + }) + .to_string(); + let health_message = Message::new(&health_topic, health_status); + sender.publish(health_message).await?; + } + + let health_status = get_latest_health_status_message(3, &mut receiver).await; + assert_eq!(health_status.time, 3); + + let timeout_error = tokio::time::timeout( + tokio::time::Duration::from_secs(1), + get_latest_health_status_message(5, &mut receiver), + ) + .await; + assert!(timeout_error.is_err()); + + Ok(()) + } +} diff --git a/docs/src/howto-guides/021_enable_tedge_watchdog_using_systemd.md b/docs/src/howto-guides/021_enable_tedge_watchdog_using_systemd.md index 50c39967..c7a188fa 100644 --- a/docs/src/howto-guides/021_enable_tedge_watchdog_using_systemd.md +++ b/docs/src/howto-guides/021_enable_tedge_watchdog_using_systemd.md @@ -2,24 +2,31 @@ ## Introduction -The systemd watchdog feature enables systemd to detect when a service is unhealthy or unresponsive and attempt to fix it by restarting that service. +The systemd watchdog feature enables systemd to detect when a service is unhealthy or unresponsive and +attempt to fix it by restarting that service. To detect if a service is healthy or not, systemd relies on periodic health notifications from that service at regular intervals. -If the service fails to send that notification within a time threshold, then systemd will assume that service to be unhealthy and restart it. +If the service fails to send that notification within a time threshold, +then systemd will assume that service to be unhealthy and restart it. This document describes how the systemd watchdog mechanism can be enabled for thin-edge services. -## Enabling the `watchdog` feature in `systemd` +## Enabling the systemd watchdog feature for a tedge service -Enabling systemd `watchdog` for a `thin-edge.io` service (tedge_agent, tedge_mapper_c8y/az/collectd) -using the `systemd` is a two-step process. +Enabling systemd watchdog for a `thin-edge.io` service (tedge-agent, tedge-mapper-c8y/az/collectd) is a two-step process. -### Step 1: Enable the `watchdog` feature in the `systemd` service file -For example to enable the `watchdog` feature for `tedge-mapper-c8y` service, update systemd service file as shown below. +### Step 1: Enable the watchdog feature in the systemd service file -Add `tedge-watchdog.service` in `After` under `[Unit]` section. -Add `WatchdogSec=5` under `[Service]` section. +For example, to enable the watchdog feature for `tedge-mapper-c8y` service, +update the systemd service file as shown below: -The sample service file after updating looks as below. +> Note: The systemd service file for tedge services are usually present in `/lib/systemd/system` directory, +> like `/lib/systemd/system/tedge-mapper-c8y.service`. + +Add `tedge-watchdog.service` as an `After` service dependency under `[Unit]` section. +Add the watchdog interval as `WatchdogSec=30` under `[Service]` section. +Update the restart condition as `Restart=always` under `[Service]` section. + +Here is the updated service file for `tedge-mapper-c8y` service: ```shell [Unit] @@ -29,19 +36,16 @@ After=syslog.target network.target mosquitto.service tedge-watchdog.service [Service] User=tedge-mapper ExecStart=/usr/bin/tedge_mapper c8y -Restart=on-failure +Restart=always RestartPreventExitStatus=255 -WatchdogSec=5 +WatchdogSec=30 ``` -> Note: The systemd service file for tedge services are usually present -in `/lib/systemd/system` directory, like `/lib/systemd/system/tedge-mapper-c8y.service`. - ### Step 2: Start the `tedge-watchdog` service The `tedge-watchdog` service is responsible for periodically checking the health of -all tedge services for which the watchdog feature is enabled, and send systemd -watchdog notifications on their behalf to systemd. +all tedge services for which the watchdog feature is enabled, +and send systemd watchdog notifications on their behalf to systemd. Start and enable the `tedge-watchdog` service as follows: @@ -50,16 +54,22 @@ systemctl start tedge-watchdog.service systemctl enable tedge-watchdog.service ``` -Now, the `tedge-watchdog` service will be keep sending health check messages to the monitored services periodically within their configured `WatchdogSec` interval. +Once started, the `tedge-watchdog` service will keep checking the health of the monitored tedge services +by periodically sending health check messages to them within their configured `WatchdogSec` interval. -The health check request for service is published to `tedge/health-check/<service-name>` topic and the health status response from that service is expected on `tedge/health/<service-name>` topic. +The health check request for service is published to `tedge/health-check/<service-name>` topic and +the health status response from that service is expected on `tedge/health/<service-name>` topic. -Once the health status response is received from a particular service, the `tedge-watchdog` service will send the watchdog notification on behalf of that service to systemd. +Once the health status response is received from a particular service, +the `tedge-watchdog` service will send the [systemd notification](https://www.freedesktop.org/software/systemd/man/sd_notify.html#) to systemd on behalf of that monitored service. ## Debugging -One can observe the message exchange between the `service` and the `watchdog` by subscribing to `tedge/health/#` and `tedge/health-check/#` topics. + +One can observe the message exchange between the `service` and the `watchdog` +by subscribing to `tedge/health/#` and `tedge/health-check/#` topics. For more info check [here](./020_monitor_tedge_health.md) -> Note: If the watchdog service did not send the notification to the systemd within `WatchdogSec`, then the systemd will kill the existing service process and restarts it. +> Note: If the watchdog service does not send the notification to the systemd within `WatchdogSec` interval for a service, +> then systemd restarts that service by killing the old process and spawning a new one to replace it. > Note: [Here](https://www.medo64.com/2019/01/systemd-watchdog-for-any-service/) is an example about using `systemd watchdog` feature. |