summaryrefslogtreecommitdiffstats
path: root/crates/core/tedge_watchdog/src/systemd_watchdog.rs
blob: 7782510370ac07745ff6827104b251640a5f82fb (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
use crate::error::WatchdogError;
use freedesktop_entry_parser::parse_entry;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use mqtt_channel::{Config, Message, PubChannel, Topic};
use nanoid::nanoid;
use serde::{Deserialize, Serialize};
use std::time::Instant;
use std::{
    path::PathBuf,
    process::{self, Command, ExitStatus, Stdio},
};
use tedge_config::{
    ConfigRepository, ConfigSettingAccessor, MqttBindAddressSetting, MqttPortSetting,
    TEdgeConfigLocation,
};
use tracing::{error, info, warn};

/// Health-check response expected on a service's `tedge/health/<service>` topic.
///
/// The watchdog deserializes this from the JSON payload of the response and uses
/// `pid` as the target of the `WATCHDOG=1` notification sent to systemd.
#[derive(Debug, Serialize, Deserialize)]
pub struct HealthStatus {
    /// Status string reported by the service (not inspected by the watchdog).
    status: String,
    /// Process id of the responding service; forwarded to `systemd-notify --pid`.
    pid: u32,
}

pub async fn start_watchdog(tedge_config_dir: PathBuf) -> Result<(), anyhow::Error> {
    // Send ready notification to systemd.
    notify_systemd(process::id(), "--ready")?;

    let tedge_services = vec![
        "tedge-mapper-c8y",
        "tedge-mapper-az",
        "tedge-mapper-collectd",
        "tedge-agent",
    ];

    let watchdog_tasks = FuturesUnordered::new();

    for service in tedge_services {
        match get_watchdog_sec(&format!("/lib/systemd/system/{service}.service")) {
            Ok(interval) => {
                let req_topic = format!("tedge/health-check/{service}");
                let res_topic = format!("tedge/health/{service}");
                let tedge_config_location =
                    tedge_config::TEdgeConfigLocation::from_custom_root(tedge_config_dir.clone());

                watchdog_tasks.push(tokio::spawn(async move {
                    monitor_tedge_service(
                        tedge_config_location,
                        service,
                        &req_topic,
                        &res_topic,
                        interval / 2,
                    )
                    .await
                }));
            }

            Err(_) => {
                warn!("Watchdog is not enabled for {}", service);
                continue;
            }
        }
    }
    futures::future::join_all(watchdog_tasks).await;
    Ok(())
}

async fn monitor_tedge_service(
    tedge_config_location: TEdgeConfigLocation,
    name: &str,
    req_topic: &str,
    res_topic: &str,
    interval: u64,
) -> Result<(), WatchdogError> {
    let client_id: &str = &format!("{}_{}", name, nanoid!());
    let mqtt_config = get_mqtt_config(tedge_config_location, client_id)?
        .with_subscriptions(res_topic.try_into()?);
    let client = mqtt_channel::Connection::new(&mqtt_config).await?;
    let mut received = client.received;
    let mut publisher = client.published;

    info!("Starting watchdog for {} service", name);

    loop {
        let message = Message::new(&Topic::new(req_topic)?, "");
        let _ = publisher
            .publish(message)
            .await
            .map_err(|e| warn!("Publish failed with error: {}", e));

        let start = Instant::now();

        match tokio::time::timeout(tokio::time::Duration::from_secs(interval), received.next())
            .await
        {
            Ok(Some(msg)) => {
                let message = msg.payload_str()?;

                let p: HealthStatus = serde_json::from_str(message)?;

                notify_systemd(p.pid, "WATCHDOG=1")?;
            }
            Ok(None) => {}
            Err(elapsed) => {
                warn!("The {name} failed with {elapsed}");
            }
        }

        let elapsed = start.elapsed();
        if elapsed < tokio::time::Duration::from_secs(interval) {
            tokio::time::sleep(tokio::time::Duration::from_secs(interval) - elapsed).await;
        }
    }
}

/// Builds the MQTT client configuration for a watchdog connection.
///
/// The broker host and port are read from the thin-edge configuration found at
/// `tedge_config_location`; `client_id` becomes the MQTT session name.
///
/// # Errors
/// Fails if the configuration cannot be loaded or either MQTT setting cannot be queried.
fn get_mqtt_config(
    tedge_config_location: TEdgeConfigLocation,
    client_id: &str,
) -> Result<Config, WatchdogError> {
    let loaded = tedge_config::TEdgeConfigRepository::new(tedge_config_location).load()?;
    let broker_host = loaded.query(MqttBindAddressSetting)?.to_string();

    let config = Config::default()
        .with_session_name(client_id)
        .with_host(broker_host)
        .with_port(loaded.query(MqttPortSetting)?.into());
    Ok(config)
}

/// Runs `systemd-notify <status> --pid=<pid>` with stdin detached.
///
/// Returns the exit status of the command; a non-zero exit is not treated as an
/// error here.
///
/// # Errors
/// Returns [`WatchdogError::CommandExecError`] if the command cannot be spawned.
fn notify_systemd(pid: u32, status: &str) -> Result<ExitStatus, WatchdogError> {
    const NOTIFY_CMD: &str = "systemd-notify";

    let mut command = Command::new(NOTIFY_CMD);
    command
        .arg(status)
        .arg(format!("--pid={pid}"))
        .stdin(Stdio::null());

    command.status().map_err(|from| WatchdogError::CommandExecError {
        cmd: NOTIFY_CMD.to_string(),
        from,
    })
}

/// Reads the `WatchdogSec=` value, in whole seconds, from the `[Service]` section of a
/// systemd unit file.
///
/// Accepts a bare integer (`30`) as well as a single-unit systemd time span in seconds
/// or minutes (`30s`, `30sec`, `2min`, `2m`, ...). Multi-part spans such as
/// `5min 20s`, and sub-second units, are rejected. A value of `0` is returned as-is;
/// the caller decides what it means.
///
/// # Errors
/// - whatever `parse_entry` reports when the file cannot be read or parsed;
/// - [`WatchdogError::NoWatchdogSec`] when the key is absent;
/// - [`WatchdogError::ParseWatchdogSecToInt`] when the value is not understood.
fn get_watchdog_sec(service_file: &str) -> Result<u64, WatchdogError> {
    let entry = parse_entry(service_file)?;
    let Some(interval) = entry.section("Service").attr("WatchdogSec") else {
        return Err(WatchdogError::NoWatchdogSec {
            file: service_file.to_string(),
        });
    };

    parse_watchdog_sec(interval).map_err(|e| {
        error!(
            "Failed to parse WatchdogSec as a number of seconds from {}",
            service_file
        );
        WatchdogError::ParseWatchdogSecToInt(e)
    })
}

/// Parses a single-token systemd time span (plain seconds, or seconds/minutes with a
/// unit suffix) into whole seconds.
fn parse_watchdog_sec(value: &str) -> Result<u64, std::num::ParseIntError> {
    // Longest suffixes first so that e.g. "minutes" is not mistaken for "<n>minute" + "s".
    const UNITS: [(&str, u64); 8] = [
        ("minutes", 60),
        ("seconds", 1),
        ("minute", 60),
        ("second", 1),
        ("min", 60),
        ("sec", 1),
        ("m", 60),
        ("s", 1),
    ];

    let value = value.trim();
    for (suffix, factor) in UNITS {
        if let Some(number) = value.strip_suffix(suffix) {
            // Unsupported units such as "ms" leave a non-numeric remainder ("100m")
            // and therefore surface as a parse error.
            return Ok(number.trim_end().parse::<u64>()?.saturating_mul(factor));
        }
    }
    value.parse::<u64>()
}