summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAlbin Suresh <albin.suresh@softwareag.com>2022-05-31 12:38:28 +0530
committerGitHub <noreply@github.com>2022-05-31 12:38:28 +0530
commitaacb5ff692e4ae0c6f70673e6aa9798b7d13cb71 (patch)
tree3d21f1c1eb134439801e110cf1f0aab61164eaa1
parent505b038e76e5ccf7da9c0323962f52ac734b76f8 (diff)
parent34a3f2f29ee2e6eaeadf5a1f41f85f841ef3d4bb (diff)
Merge PR #1170 Fix tedge watchdog timeout misalignment with monitored services
Fix tedge watchdog timeout misalignment with monitored services
-rw-r--r--Cargo.lock1
-rw-r--r--crates/core/tedge_agent/src/agent.rs4
-rw-r--r--crates/core/tedge_mapper/src/collectd/monitor.rs4
-rw-r--r--crates/core/tedge_mapper/src/core/mapper.rs7
-rw-r--r--crates/core/tedge_watchdog/Cargo.toml3
-rw-r--r--crates/core/tedge_watchdog/src/systemd_watchdog.rs96
-rw-r--r--docs/src/howto-guides/021_enable_tedge_watchdog_using_systemd.md54
7 files changed, 128 insertions, 41 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 27e25416..da66fb41 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3066,6 +3066,7 @@ dependencies = [
"tedge_config",
"tedge_utils",
"thiserror",
+ "time",
"tokio",
"tracing",
]
diff --git a/crates/core/tedge_agent/src/agent.rs b/crates/core/tedge_agent/src/agent.rs
index b7d946b0..755fc7af 100644
--- a/crates/core/tedge_agent/src/agent.rs
+++ b/crates/core/tedge_agent/src/agent.rs
@@ -27,6 +27,7 @@ use tedge_config::{
TEdgeConfigLocation, TmpPathSetting, DEFAULT_LOG_PATH, DEFAULT_RUN_PATH,
};
use tedge_utils::file::create_directory_with_user_group;
+use time::OffsetDateTime;
use tokio::sync::Mutex;
use tracing::{debug, error, info, instrument, warn};
@@ -296,7 +297,8 @@ impl SmAgent {
topic if self.config.request_topics_health.accept_topic(topic) => {
let health_status = json!({
"status": "up",
- "pid": process::id()
+ "pid": process::id(),
+ "time": OffsetDateTime::now_utc().unix_timestamp(),
})
.to_string();
let health_message =
diff --git a/crates/core/tedge_mapper/src/collectd/monitor.rs b/crates/core/tedge_mapper/src/collectd/monitor.rs
index f927f887..b21f6dcf 100644
--- a/crates/core/tedge_mapper/src/collectd/monitor.rs
+++ b/crates/core/tedge_mapper/src/collectd/monitor.rs
@@ -3,6 +3,7 @@ use std::process;
use batcher::{BatchConfigBuilder, BatchDriver, BatchDriverInput, BatchDriverOutput, Batcher};
use mqtt_channel::{Connection, Message, QoS, SinkExt, StreamExt, Topic, TopicFilter};
use serde_json::json;
+use time::OffsetDateTime;
use tracing::{error, info, instrument};
use super::{batcher::MessageBatch, collectd::CollectdMessage, error::DeviceMonitorError};
@@ -109,7 +110,8 @@ impl DeviceMonitor {
if health_check_topics.accept(&message) {
let health_status = json!({
"status": "up",
- "pid": process::id()
+ "pid": process::id(),
+ "time": OffsetDateTime::now_utc().unix_timestamp(),
})
.to_string();
let health_message = Message::new(&health_status_topic, health_status);
diff --git a/crates/core/tedge_mapper/src/core/mapper.rs b/crates/core/tedge_mapper/src/core/mapper.rs
index 9309a31e..fb5713f4 100644
--- a/crates/core/tedge_mapper/src/core/mapper.rs
+++ b/crates/core/tedge_mapper/src/core/mapper.rs
@@ -7,6 +7,7 @@ use mqtt_channel::{
UnboundedSender,
};
use serde_json::json;
+use time::OffsetDateTime;
use tracing::{error, info, instrument};
const SYNC_WINDOW: Duration = Duration::from_secs(3);
@@ -133,7 +134,8 @@ impl Mapper {
if self.health_check_topics.accept(&message) {
let health_status = json!({
"status": "up",
- "pid": process::id()
+ "pid": process::id(),
+ "time": OffsetDateTime::now_utc().unix_timestamp(),
})
.to_string();
let health_message = Message::new(&self.health_status_topic, health_status);
@@ -242,7 +244,7 @@ mod tests {
let common_health_check_topic = "tedge/health-check";
let health_status = broker
.wait_for_response_on_publish(
- &common_health_check_topic,
+ common_health_check_topic,
"",
&health_topic,
Duration::from_secs(1),
@@ -252,6 +254,7 @@ mod tests {
let health_status: Value = serde_json::from_str(health_status.as_str())?;
assert_json_include!(actual: &health_status, expected: json!({"status": "up"}));
assert!(health_status["pid"].is_number());
+ assert!(health_status["time"].is_number());
Ok(())
}
diff --git a/crates/core/tedge_watchdog/Cargo.toml b/crates/core/tedge_watchdog/Cargo.toml
index 7374eb5c..da3647c0 100644
--- a/crates/core/tedge_watchdog/Cargo.toml
+++ b/crates/core/tedge_watchdog/Cargo.toml
@@ -28,5 +28,6 @@ freedesktop_entry_parser = "1.3.0"
tedge_config = { path = "../../common/tedge_config" }
tedge_utils = { path = "../../common/tedge_utils", features = ["logging"] }
thiserror ="1.0.30"
-tokio = { version = "1.12", features = ["sync", "time"] }
+time = { version = "0.3", features = ["formatting", "serde-well-known"] }
+tokio = { version = "1.12", features = ["sync", "time", "rt-multi-thread"] }
tracing = { version = "0.1", features = ["attributes", "log"] }
diff --git a/crates/core/tedge_watchdog/src/systemd_watchdog.rs b/crates/core/tedge_watchdog/src/systemd_watchdog.rs
index 77825103..8682a9c1 100644
--- a/crates/core/tedge_watchdog/src/systemd_watchdog.rs
+++ b/crates/core/tedge_watchdog/src/systemd_watchdog.rs
@@ -1,5 +1,6 @@
use crate::error::WatchdogError;
use freedesktop_entry_parser::parse_entry;
+use futures::channel::mpsc;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use mqtt_channel::{Config, Message, PubChannel, Topic};
@@ -14,12 +15,14 @@ use tedge_config::{
ConfigRepository, ConfigSettingAccessor, MqttBindAddressSetting, MqttPortSetting,
TEdgeConfigLocation,
};
-use tracing::{error, info, warn};
+use time::OffsetDateTime;
+use tracing::{debug, error, info, warn};
-#[derive(Serialize, Deserialize)]
+#[derive(Debug, Serialize, Deserialize)]
pub struct HealthStatus {
status: String,
pid: u32,
+ time: i64,
}
pub async fn start_watchdog(tedge_config_dir: PathBuf) -> Result<(), anyhow::Error> {
@@ -49,7 +52,7 @@ pub async fn start_watchdog(tedge_config_dir: PathBuf) -> Result<(), anyhow::Err
service,
&req_topic,
&res_topic,
- interval / 2,
+ interval / 4,
)
.await
}));
@@ -90,19 +93,22 @@ async fn monitor_tedge_service(
let start = Instant::now();
- match tokio::time::timeout(tokio::time::Duration::from_secs(interval), received.next())
- .await
+ let request_timestamp = OffsetDateTime::now_utc().unix_timestamp();
+ match tokio::time::timeout(
+ tokio::time::Duration::from_secs(interval),
+ get_latest_health_status_message(request_timestamp, &mut received),
+ )
+ .await
{
- Ok(Some(msg)) => {
- let message = msg.payload_str()?;
-
- let p: HealthStatus = serde_json::from_str(message)?;
-
- notify_systemd(p.pid, "WATCHDOG=1")?;
+ Ok(health_status) => {
+ debug!(
+ "Sending notification for {} with pid: {}",
+ name, health_status.pid
+ );
+ notify_systemd(health_status.pid, "WATCHDOG=1")?;
}
- Ok(None) => {}
- Err(elapsed) => {
- warn!("The {name} failed with {elapsed}");
+ Err(_) => {
+ warn!("No health check response received from {name} in time");
}
}
@@ -113,6 +119,31 @@ async fn monitor_tedge_service(
}
}
+async fn get_latest_health_status_message(
+ request_timestamp: i64,
+ messages: &mut mpsc::UnboundedReceiver<Message>,
+) -> HealthStatus {
+ loop {
+ if let Some(message) = messages.next().await {
+ if let Ok(message) = message.payload_str() {
+ debug!("Health response received: {}", message);
+ if let Ok(health_status) = serde_json::from_str::<HealthStatus>(message) {
+ if health_status.time >= request_timestamp {
+ return health_status;
+ } else {
+ debug!(
+ "Ignoring stale health response: {:?} older than request time: {}",
+ health_status, request_timestamp
+ );
+ }
+ } else {
+ error!("Invalid health response received: {}", message);
+ }
+ }
+ }
+ }
+}
+
fn get_mqtt_config(
tedge_config_location: TEdgeConfigLocation,
client_id: &str,
@@ -157,3 +188,40 @@ fn get_watchdog_sec(service_file: &str) -> Result<u64, WatchdogError> {
})
}
}
+
+#[cfg(test)]
+mod tests {
+ use anyhow::Result;
+ use serde_json::json;
+
+ use super::*;
+
+ #[tokio::test]
+ async fn test_get_latest_health_status_message() -> Result<()> {
+ let (mut sender, mut receiver) = mpsc::unbounded::<Message>();
+ let health_topic = Topic::new("tedge/health/test-service").expect("Valid topic");
+
+ for x in 1..5i64 {
+ let health_status = json!({
+ "status": "up",
+ "pid": 123u32,
+ "time": x,
+ })
+ .to_string();
+ let health_message = Message::new(&health_topic, health_status);
+ sender.publish(health_message).await?;
+ }
+
+ let health_status = get_latest_health_status_message(3, &mut receiver).await;
+ assert_eq!(health_status.time, 3);
+
+ let timeout_error = tokio::time::timeout(
+ tokio::time::Duration::from_secs(1),
+ get_latest_health_status_message(5, &mut receiver),
+ )
+ .await;
+ assert!(timeout_error.is_err());
+
+ Ok(())
+ }
+}
diff --git a/docs/src/howto-guides/021_enable_tedge_watchdog_using_systemd.md b/docs/src/howto-guides/021_enable_tedge_watchdog_using_systemd.md
index 50c39967..c7a188fa 100644
--- a/docs/src/howto-guides/021_enable_tedge_watchdog_using_systemd.md
+++ b/docs/src/howto-guides/021_enable_tedge_watchdog_using_systemd.md
@@ -2,24 +2,31 @@
## Introduction
-The systemd watchdog feature enables systemd to detect when a service is unhealthy or unresponsive and attempt to fix it by restarting that service.
+The systemd watchdog feature enables systemd to detect when a service is unhealthy or unresponsive and
+attempt to fix it by restarting that service.
To detect if a service is healthy or not, systemd relies on periodic health notifications from that service at regular intervals.
-If the service fails to send that notification within a time threshold, then systemd will assume that service to be unhealthy and restart it.
+If the service fails to send that notification within a time threshold,
+then systemd will assume that service to be unhealthy and restart it.
This document describes how the systemd watchdog mechanism can be enabled for thin-edge services.
-## Enabling the `watchdog` feature in `systemd`
+## Enabling the systemd watchdog feature for a tedge service
-Enabling systemd `watchdog` for a `thin-edge.io` service (tedge_agent, tedge_mapper_c8y/az/collectd)
-using the `systemd` is a two-step process.
+Enabling systemd watchdog for a `thin-edge.io` service (tedge-agent, tedge-mapper-c8y/az/collectd) is a two-step process.
-### Step 1: Enable the `watchdog` feature in the `systemd` service file
-For example to enable the `watchdog` feature for `tedge-mapper-c8y` service, update systemd service file as shown below.
+### Step 1: Enable the watchdog feature in the systemd service file
-Add `tedge-watchdog.service` in `After` under `[Unit]` section.
-Add `WatchdogSec=5` under `[Service]` section.
+For example, to enable the watchdog feature for `tedge-mapper-c8y` service,
+update the systemd service file as shown below:
-The sample service file after updating looks as below.
+> Note: The systemd service file for tedge services are usually present in `/lib/systemd/system` directory,
+> like `/lib/systemd/system/tedge-mapper-c8y.service`.
+
+Add `tedge-watchdog.service` as an `After` service dependency under `[Unit]` section.
+Add the watchdog interval as `WatchdogSec=30` under `[Service]` section.
+Update the restart condition as `Restart=always` under `[Service]` section.
+
+Here is the updated service file for `tedge-mapper-c8y` service:
```shell
[Unit]
@@ -29,19 +36,16 @@ After=syslog.target network.target mosquitto.service tedge-watchdog.service
[Service]
User=tedge-mapper
ExecStart=/usr/bin/tedge_mapper c8y
-Restart=on-failure
+Restart=always
RestartPreventExitStatus=255
-WatchdogSec=5
+WatchdogSec=30
```
-> Note: The systemd service file for tedge services are usually present
-in `/lib/systemd/system` directory, like `/lib/systemd/system/tedge-mapper-c8y.service`.
-
### Step 2: Start the `tedge-watchdog` service
The `tedge-watchdog` service is responsible for periodically checking the health of
-all tedge services for which the watchdog feature is enabled, and send systemd
-watchdog notifications on their behalf to systemd.
+all tedge services for which the watchdog feature is enabled,
+and send systemd watchdog notifications on their behalf to systemd.
Start and enable the `tedge-watchdog` service as follows:
@@ -50,16 +54,22 @@ systemctl start tedge-watchdog.service
systemctl enable tedge-watchdog.service
```
-Now, the `tedge-watchdog` service will be keep sending health check messages to the monitored services periodically within their configured `WatchdogSec` interval.
+Once started, the `tedge-watchdog` service will keep checking the health of the monitored tedge services
+by periodically sending health check messages to them within their configured `WatchdogSec` interval.
-The health check request for service is published to `tedge/health-check/<service-name>` topic and the health status response from that service is expected on `tedge/health/<service-name>` topic.
+The health check request for service is published to `tedge/health-check/<service-name>` topic and
+the health status response from that service is expected on `tedge/health/<service-name>` topic.
-Once the health status response is received from a particular service, the `tedge-watchdog` service will send the watchdog notification on behalf of that service to systemd.
+Once the health status response is received from a particular service,
+the `tedge-watchdog` service will send the [systemd notification](https://www.freedesktop.org/software/systemd/man/sd_notify.html#) to systemd on behalf of that monitored service.
## Debugging
-One can observe the message exchange between the `service` and the `watchdog` by subscribing to `tedge/health/#` and `tedge/health-check/#` topics.
+
+One can observe the message exchange between the `service` and the `watchdog`
+by subscribing to `tedge/health/#` and `tedge/health-check/#` topics.
For more info check [here](./020_monitor_tedge_health.md)
-> Note: If the watchdog service did not send the notification to the systemd within `WatchdogSec`, then the systemd will kill the existing service process and restarts it.
+> Note: If the watchdog service does not send the notification to the systemd within `WatchdogSec` interval for a service,
+> then systemd restarts that service by killing the old process and spawning a new one to replace it.
> Note: [Here](https://www.medo64.com/2019/01/systemd-watchdog-for-any-service/) is an example about using `systemd watchdog` feature.