summaryrefslogtreecommitdiffstats
path: root/crates/core/tedge_watchdog/src
diff options
context:
space:
mode:
authorPradeepKiruvale <pradeepkumar.kj@softwareag.com>2022-04-06 20:27:20 +0530
committerGitHub <noreply@github.com>2022-04-06 20:27:20 +0530
commit3a5f9a1e1cec75e395115567acf17ddce2f6de59 (patch)
tree483ace318460b36b9cf3bc3c6e557e603d291922 /crates/core/tedge_watchdog/src
parent84765d85b6af4ecb1ca5ffa07629eb11c360dbed (diff)
[953] tedge watchdog (#1026)
* [953] tedge-systemd-watchdog * update service file to watchdog * Closes #953 thin-edge watchdog * health check for all the services * update document
Diffstat (limited to 'crates/core/tedge_watchdog/src')
-rw-r--r--crates/core/tedge_watchdog/src/error.rs30
-rw-r--r--crates/core/tedge_watchdog/src/main.rs35
-rw-r--r--crates/core/tedge_watchdog/src/systemd_watchdog.rs149
3 files changed, 214 insertions, 0 deletions
diff --git a/crates/core/tedge_watchdog/src/error.rs b/crates/core/tedge_watchdog/src/error.rs
new file mode 100644
index 00000000..ff5fb14b
--- /dev/null
+++ b/crates/core/tedge_watchdog/src/error.rs
@@ -0,0 +1,30 @@
+use mqtt_channel::MqttError;
+
+use tedge_config::{ConfigSettingError, TEdgeConfigError};
+
+#[derive(Debug, thiserror::Error)]
+pub enum WatchdogError {
+ #[error("Fail to run `{cmd}`: {from}")]
+ CommandExecError { cmd: String, from: std::io::Error },
+
+ #[error(transparent)]
+ FromTedgeConfigError(#[from] TEdgeConfigError),
+
+ #[error(transparent)]
+ FromConfigSettingError(#[from] ConfigSettingError),
+
+ #[error(transparent)]
+ FromMqttError(#[from] MqttError),
+
+ #[error(transparent)]
+ DeserializeError(#[from] serde_json::Error),
+
+ #[error(transparent)]
+ ParseWatchdogSecToInt(#[from] std::num::ParseIntError),
+
+ #[error(transparent)]
+ ParseSystemdFile(#[from] std::io::Error),
+
+ #[error("Did not find the WatchdogSec{file}")]
+ NoWatchdogSec { file: String },
+}
diff --git a/crates/core/tedge_watchdog/src/main.rs b/crates/core/tedge_watchdog/src/main.rs
new file mode 100644
index 00000000..e4478b2f
--- /dev/null
+++ b/crates/core/tedge_watchdog/src/main.rs
@@ -0,0 +1,35 @@
+use clap::Parser;
+use std::path::PathBuf;
+use tedge_config::DEFAULT_TEDGE_CONFIG_PATH;
+
+mod error;
+mod systemd_watchdog;
+
+#[derive(Debug, clap::Parser)]
+#[clap(
+name = clap::crate_name!(),
+version = clap::crate_version!(),
+about = clap::crate_description!()
+)]
+pub struct WatchdogOpt {
+ /// Turn-on the debug log level.
+ ///
+ /// If off only reports ERROR, WARN, and INFO
+ /// If on also reports DEBUG and TRACE
+ #[clap(long)]
+ pub debug: bool,
+
+ /// Start the watchdog from custom path
+ ///
+ /// WARNING: This is mostly used in testing.
+ #[clap(long = "config-dir", default_value = DEFAULT_TEDGE_CONFIG_PATH)]
+ pub config_dir: PathBuf,
+}
+
+#[tokio::main]
+async fn main() -> Result<(), anyhow::Error> {
+ let watchdog_opt = WatchdogOpt::parse();
+ tedge_utils::logging::initialise_tracing_subscriber(watchdog_opt.debug);
+
+ systemd_watchdog::start_watchdog(watchdog_opt.config_dir).await
+}
diff --git a/crates/core/tedge_watchdog/src/systemd_watchdog.rs b/crates/core/tedge_watchdog/src/systemd_watchdog.rs
new file mode 100644
index 00000000..8a6ad4b6
--- /dev/null
+++ b/crates/core/tedge_watchdog/src/systemd_watchdog.rs
@@ -0,0 +1,149 @@
+use crate::error::WatchdogError;
+use futures::stream::FuturesUnordered;
+use futures::StreamExt;
+use mqtt_channel::{Config, Message, PubChannel, Topic};
+use nanoid::nanoid;
+
+use freedesktop_entry_parser::parse_entry;
+use serde::{Deserialize, Serialize};
+use std::time::Instant;
+use std::{
+ path::PathBuf,
+ process::{self, Command, ExitStatus, Stdio},
+};
+use tedge_config::{
+ ConfigRepository, ConfigSettingAccessor, MqttBindAddressSetting, MqttPortSetting,
+ TEdgeConfigLocation,
+};
+
+#[derive(Serialize, Deserialize)]
+pub struct HealthStatus {
+ status: String,
+ pid: u32,
+}
+
+pub async fn start_watchdog(tedge_config_dir: PathBuf) -> Result<(), anyhow::Error> {
+ // Send ready notification to systemd.
+ notify_systemd(process::id(), "--ready")?;
+
+ let tedge_services = vec![
+ "tedge-mapper-c8y",
+ "tedge-mapper-az",
+ "tedge-mapper-collectd",
+ "tedge-agent",
+ ];
+
+ let watchdog_threads = FuturesUnordered::new();
+
+ for service in tedge_services {
+ match get_watchdog_sec(&format!("/lib/systemd/system/{service}.service")) {
+ Ok(interval) => {
+ let req_topic = format!("tedge/health-check/{service}");
+ let res_topic = format!("tedge/health/{service}");
+ let tedge_config_location =
+ tedge_config::TEdgeConfigLocation::from_custom_root(tedge_config_dir.clone());
+
+ watchdog_threads.push(tokio::spawn(async move {
+ monitor_tedge_service(
+ tedge_config_location,
+ service,
+ &req_topic,
+ &res_topic,
+ interval,
+ )
+ .await
+ }));
+ }
+
+ Err(_e) => continue, // Watchdog not enabled for this service
+ }
+ }
+ futures::future::join_all(watchdog_threads).await;
+ Ok(())
+}
+
+async fn monitor_tedge_service(
+ tedge_config_location: TEdgeConfigLocation,
+ name: &str,
+ req_topic: &str,
+ res_topic: &str,
+ interval: u64,
+) -> Result<(), WatchdogError> {
+ let client_id: &str = &format!("{}_{}", name, nanoid!());
+ let mqtt_config = get_mqtt_config(tedge_config_location, client_id)?
+ .with_subscriptions(res_topic.try_into()?);
+ let client = mqtt_channel::Connection::new(&mqtt_config).await?;
+ let mut received = client.received;
+ let mut publisher = client.published;
+
+ println!("Starting watchdog for {} service", name);
+
+ loop {
+ let message = Message::new(&Topic::new(req_topic)?, "");
+ let _ = publisher
+ .publish(message)
+ .await
+ .map_err(|e| eprintln!("Publish failed with error: {}", e));
+
+ let start = Instant::now();
+
+ match tokio::time::timeout(tokio::time::Duration::from_secs(interval), received.next())
+ .await
+ {
+ Ok(Some(msg)) => {
+ let message = msg.payload_str()?;
+
+ let p: HealthStatus = serde_json::from_str(message)?;
+
+ notify_systemd(p.pid, "WATCHDOG=1")?;
+ }
+ Ok(None) => {}
+ Err(elapsed) => {
+ eprintln!("The {name} failed with {elapsed}");
+ }
+ }
+ tokio::time::sleep(tokio::time::Duration::from_secs(interval) - start.elapsed()).await;
+ }
+}
+
+fn get_mqtt_config(
+ tedge_config_location: TEdgeConfigLocation,
+ client_id: &str,
+) -> Result<Config, WatchdogError> {
+ let config_repository = tedge_config::TEdgeConfigRepository::new(tedge_config_location);
+ let tedge_config = config_repository.load()?;
+ let mqtt_config = Config::default()
+ .with_session_name(client_id)
+ .with_host(tedge_config.query(MqttBindAddressSetting)?.to_string())
+ .with_port(tedge_config.query(MqttPortSetting)?.into());
+ Ok(mqtt_config)
+}
+
+fn notify_systemd(pid: u32, status: &str) -> Result<ExitStatus, WatchdogError> {
+ let pid_opt = format!("--pid={pid}");
+ Command::new("systemd-notify")
+ .args([status, &pid_opt])
+ .stdin(Stdio::null())
+ .status()
+ .map_err(|err| WatchdogError::CommandExecError {
+ cmd: String::from("systemd-notify"),
+ from: err,
+ })
+}
+
+fn get_watchdog_sec(service_file: &str) -> Result<u64, WatchdogError> {
+ let entry = parse_entry(service_file)?;
+ if let Some(interval) = entry.section("Service").attr("WatchdogSec") {
+ interval.parse().map_err({
+ eprintln!(
+ "Failed to parse the to WatchdogSec to integer from {}",
+ service_file
+ );
+ WatchdogError::ParseWatchdogSecToInt
+ })
+ } else {
+ Err(WatchdogError::NoWatchdogSec {
+ file: service_file.to_string(),
+ })
+ }
+}