diff options
author | PradeepKiruvale <pradeepkumar.kj@softwareag.com> | 2022-04-06 20:27:20 +0530 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-04-06 20:27:20 +0530 |
commit | 3a5f9a1e1cec75e395115567acf17ddce2f6de59 (patch) | |
tree | 483ace318460b36b9cf3bc3c6e557e603d291922 /crates/core/tedge_watchdog/src | |
parent | 84765d85b6af4ecb1ca5ffa07629eb11c360dbed (diff) |
[953] tedge watchdog (#1026)
* [953] tedge-systemd-watchdog
* update service file to watchdog
* Closes #953 thin-edge watchdog
* health check for all the services
* update document
Diffstat (limited to 'crates/core/tedge_watchdog/src')
-rw-r--r-- | crates/core/tedge_watchdog/src/error.rs | 30 | ||||
-rw-r--r-- | crates/core/tedge_watchdog/src/main.rs | 35 | ||||
-rw-r--r-- | crates/core/tedge_watchdog/src/systemd_watchdog.rs | 149 |
3 files changed, 214 insertions, 0 deletions
diff --git a/crates/core/tedge_watchdog/src/error.rs b/crates/core/tedge_watchdog/src/error.rs new file mode 100644 index 00000000..ff5fb14b --- /dev/null +++ b/crates/core/tedge_watchdog/src/error.rs @@ -0,0 +1,30 @@ +use mqtt_channel::MqttError; + +use tedge_config::{ConfigSettingError, TEdgeConfigError}; + +#[derive(Debug, thiserror::Error)] +pub enum WatchdogError { + #[error("Fail to run `{cmd}`: {from}")] + CommandExecError { cmd: String, from: std::io::Error }, + + #[error(transparent)] + FromTedgeConfigError(#[from] TEdgeConfigError), + + #[error(transparent)] + FromConfigSettingError(#[from] ConfigSettingError), + + #[error(transparent)] + FromMqttError(#[from] MqttError), + + #[error(transparent)] + DeserializeError(#[from] serde_json::Error), + + #[error(transparent)] + ParseWatchdogSecToInt(#[from] std::num::ParseIntError), + + #[error(transparent)] + ParseSystemdFile(#[from] std::io::Error), + + #[error("Did not find the WatchdogSec{file}")] + NoWatchdogSec { file: String }, +} diff --git a/crates/core/tedge_watchdog/src/main.rs b/crates/core/tedge_watchdog/src/main.rs new file mode 100644 index 00000000..e4478b2f --- /dev/null +++ b/crates/core/tedge_watchdog/src/main.rs @@ -0,0 +1,35 @@ +use clap::Parser; +use std::path::PathBuf; +use tedge_config::DEFAULT_TEDGE_CONFIG_PATH; + +mod error; +mod systemd_watchdog; + +#[derive(Debug, clap::Parser)] +#[clap( +name = clap::crate_name!(), +version = clap::crate_version!(), +about = clap::crate_description!() +)] +pub struct WatchdogOpt { + /// Turn-on the debug log level. + /// + /// If off only reports ERROR, WARN, and INFO + /// If on also reports DEBUG and TRACE + #[clap(long)] + pub debug: bool, + + /// Start the watchdog from custom path + /// + /// WARNING: This is mostly used in testing. + #[clap(long = "config-dir", default_value = DEFAULT_TEDGE_CONFIG_PATH)] + pub config_dir: PathBuf, +} + +#[tokio::main] +async fn main() -> Result<(), anyhow::Error> { + let watchdog_opt = WatchdogOpt::parse(); + tedge_utils::logging::initialise_tracing_subscriber(watchdog_opt.debug); + + systemd_watchdog::start_watchdog(watchdog_opt.config_dir).await +} diff --git a/crates/core/tedge_watchdog/src/systemd_watchdog.rs b/crates/core/tedge_watchdog/src/systemd_watchdog.rs new file mode 100644 index 00000000..8a6ad4b6 --- /dev/null +++ b/crates/core/tedge_watchdog/src/systemd_watchdog.rs @@ -0,0 +1,149 @@ +use crate::error::WatchdogError; +use futures::stream::FuturesUnordered; +use futures::StreamExt; +use mqtt_channel::{Config, Message, PubChannel, Topic}; +use nanoid::nanoid; + +use freedesktop_entry_parser::parse_entry; +use serde::{Deserialize, Serialize}; +use std::time::Instant; +use std::{ + path::PathBuf, + process::{self, Command, ExitStatus, Stdio}, +}; +use tedge_config::{ + ConfigRepository, ConfigSettingAccessor, MqttBindAddressSetting, MqttPortSetting, + TEdgeConfigLocation, +}; + +#[derive(Serialize, Deserialize)] +pub struct HealthStatus { + status: String, + pid: u32, +} + +pub async fn start_watchdog(tedge_config_dir: PathBuf) -> Result<(), anyhow::Error> { + // Send ready notification to systemd. + notify_systemd(process::id(), "--ready")?; + + let tedge_services = vec![ + "tedge-mapper-c8y", + "tedge-mapper-az", + "tedge-mapper-collectd", + "tedge-agent", + ]; + + let watchdog_threads = FuturesUnordered::new(); + + for service in tedge_services { + match get_watchdog_sec(&format!("/lib/systemd/system/{service}.service")) { + Ok(interval) => { + let req_topic = format!("tedge/health-check/{service}"); + let res_topic = format!("tedge/health/{service}"); + let tedge_config_location = + tedge_config::TEdgeConfigLocation::from_custom_root(tedge_config_dir.clone()); + + watchdog_threads.push(tokio::spawn(async move { + monitor_tedge_service( + tedge_config_location, + service, + &req_topic, + &res_topic, + interval, + ) + .await + })); + } + + Err(_e) => continue, // Watchdog not enabled for this service + } + } + futures::future::join_all(watchdog_threads).await; + Ok(()) +} + +async fn monitor_tedge_service( + tedge_config_location: TEdgeConfigLocation, + name: &str, + req_topic: &str, + res_topic: &str, + interval: u64, +) -> Result<(), WatchdogError> { + let client_id: &str = &format!("{}_{}", name, nanoid!()); + let mqtt_config = get_mqtt_config(tedge_config_location, client_id)? + .with_subscriptions(res_topic.try_into()?); + let client = mqtt_channel::Connection::new(&mqtt_config).await?; + let mut received = client.received; + let mut publisher = client.published; + + println!("Starting watchdog for {} service", name); + + loop { + let message = Message::new(&Topic::new(req_topic)?, ""); + let _ = publisher + .publish(message) + .await + .map_err(|e| eprintln!("Publish failed with error: {}", e)); + + let start = Instant::now(); + + match tokio::time::timeout(tokio::time::Duration::from_secs(interval), received.next()) + .await + { + Ok(Some(msg)) => { + let message = msg.payload_str()?; + + let p: HealthStatus = serde_json::from_str(message)?; + + notify_systemd(p.pid, "WATCHDOG=1")?; + } + Ok(None) => {} + Err(elapsed) => { + eprintln!("The {name} failed with {elapsed}"); + } + } + tokio::time::sleep(tokio::time::Duration::from_secs(interval) - start.elapsed()).await; + } +} + +fn get_mqtt_config( + tedge_config_location: TEdgeConfigLocation, + client_id: &str, +) -> Result<Config, WatchdogError> { + let config_repository = tedge_config::TEdgeConfigRepository::new(tedge_config_location); + let tedge_config = config_repository.load()?; + let mqtt_config = Config::default() + .with_session_name(client_id) + .with_host(tedge_config.query(MqttBindAddressSetting)?.to_string()) + .with_port(tedge_config.query(MqttPortSetting)?.into()); + Ok(mqtt_config) +} + +fn notify_systemd(pid: u32, status: &str) -> Result<ExitStatus, WatchdogError> { + let pid_opt = format!("--pid={pid}"); + Command::new("systemd-notify") + .args([status, &pid_opt]) + .stdin(Stdio::null()) + .status() + .map_err(|err| WatchdogError::CommandExecError { + cmd: String::from("systemd-notify"), + from: err, + }) +} + +fn get_watchdog_sec(service_file: &str) -> Result<u64, WatchdogError> { + let entry = parse_entry(service_file)?; + if let Some(interval) = entry.section("Service").attr("WatchdogSec") { + interval.parse().map_err({ + eprintln!( + "Failed to parse the to WatchdogSec to integer from {}", + service_file + ); + WatchdogError::ParseWatchdogSecToInt + }) + } else { + Err(WatchdogError::NoWatchdogSec { + file: service_file.to_string(), + }) + } +} |