diff options
author | Albin Suresh <albin.suresh@softwareag.com> | 2022-03-31 11:18:24 +0530 |
---|---|---|
committer | Albin Suresh <albin.suresh@softwareag.com> | 2022-03-31 12:13:45 +0530 |
commit | 83a5f2835d0db87dc66ff52d0681f39c063d9f3f (patch) | |
tree | 0d59a4e33e07bc5dd0d1b38b1e59d5d2d59c1848 | |
parent | ea8c0d823bf4d9acd22ac9c84d4b2c7a9c273374 (diff) |
Include PID in health status message of tedge-daemons
-rw-r--r-- | Cargo.lock | 1 | ||||
-rw-r--r-- | crates/core/tedge_agent/Cargo.toml | 1 | ||||
-rw-r--r-- | crates/core/tedge_agent/src/agent.rs | 33 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/collectd/monitor.rs | 11 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/core/mapper.rs | 36 | ||||
-rw-r--r-- | docs/src/howto-guides/020_monitor_tedge_health | 2 |
6 files changed, 66 insertions, 18 deletions
@@ -2775,6 +2775,7 @@ version = "0.6.1" dependencies = [ "agent_interface", "anyhow", + "assert-json-diff", "assert_cmd", "async-trait", "clap 3.1.6", diff --git a/crates/core/tedge_agent/Cargo.toml b/crates/core/tedge_agent/Cargo.toml index 76bde633..63a2ba5a 100644 --- a/crates/core/tedge_agent/Cargo.toml +++ b/crates/core/tedge_agent/Cargo.toml @@ -44,6 +44,7 @@ tracing = { version = "0.1", features = ["attributes", "log"] } [dev-dependencies] anyhow = "1.0" assert_cmd = "2.0" +assert-json-diff = "2.0" once_cell = "1.8" mqtt_tests = { path = "../../tests/mqtt_tests" } predicates = "2.1" diff --git a/crates/core/tedge_agent/src/agent.rs b/crates/core/tedge_agent/src/agent.rs index f5a1be14..df41368f 100644 --- a/crates/core/tedge_agent/src/agent.rs +++ b/crates/core/tedge_agent/src/agent.rs @@ -16,6 +16,8 @@ use agent_interface::{ use flockfile::{check_another_instance_is_not_running, Flockfile}; use mqtt_channel::{Connection, Message, PubChannel, StreamExt, SubChannel, Topic, TopicFilter}; use plugin_sm::plugin_manager::{ExternalPlugins, Plugins}; +use serde_json::json; +use std::process; use std::{convert::TryInto, fmt::Debug, path::PathBuf, sync::Arc}; use tedge_config::{ ConfigRepository, ConfigSettingAccessor, ConfigSettingAccessorStringExt, LogPathDefaultSetting, @@ -28,7 +30,6 @@ use tracing::{debug, error, info, instrument, warn}; const SM_PLUGINS: &str = "sm-plugins"; const AGENT_LOG_PATH: &str = "tedge/agent"; -const HEALTH_STATUS_UP: &str = r#"{"status": "up"}"#; #[cfg(not(test))] const INIT_COMMAND: &str = "init"; @@ -295,8 +296,13 @@ impl SmAgent { debug!("Request {:?}", message); match &message.topic { topic if self.config.request_topics_health.accept_topic(topic) => { + let health_status = json!({ + "status": "up", + "pid": process::id() + }) + .to_string(); let health_message = - Message::new(&self.config.response_topic_health, HEALTH_STATUS_UP); + Message::new(&self.config.response_topic_health, health_status); let _ = responses.publish(health_message).await; } @@ -640,6 +646,9 @@ mod tests { use std::io::Write; use std::path::PathBuf; + use assert_json_diff::assert_json_include; + use serde_json::Value; + use super::*; const SLASH_RUN_PATH_TEDGE_AGENT_RESTART: &str = "tedge_agent/tedge_agent_restart"; @@ -770,12 +779,11 @@ mod tests { /// test health check request response contract async fn health_check() -> Result<(), AgentError> { let (responses, mut response_sink) = mqtt_tests::output_stream(); - let expected_responses = vec![message( - r#"tedge/health/tedge-agent"#, - r#"{"status": "up"}"#, - )]; - let mut requests = - mqtt_tests::input_stream(vec![message(r#"tedge/health-check/tedge-agent"#, "")]).await; + let mut requests = mqtt_tests::input_stream(vec![ + message("tedge/health-check/tedge-agent", ""), + message("tedge/health-check", ""), + ]) + .await; let (dir, tedge_config_location) = create_temp_tedge_config().unwrap(); @@ -801,7 +809,14 @@ mod tests { }); let responses = responses.collect().await; - assert_eq!(expected_responses, responses); + assert_eq!(responses.len(), 2); + + for response in responses { + assert_eq!(response.topic.name, "tedge/health/tedge-agent"); + let health_status: Value = serde_json::from_slice(response.payload_bytes())?; + assert_json_include!(actual: &health_status, expected: json!({"status": "up"})); + assert!(health_status["pid"].is_number()); + } Ok(()) } diff --git a/crates/core/tedge_mapper/src/collectd/monitor.rs b/crates/core/tedge_mapper/src/collectd/monitor.rs index 3a755a0f..f927f887 100644 --- a/crates/core/tedge_mapper/src/collectd/monitor.rs +++ b/crates/core/tedge_mapper/src/collectd/monitor.rs @@ -1,5 +1,8 @@ +use std::process; + use batcher::{BatchConfigBuilder, BatchDriver, BatchDriverInput, BatchDriverOutput, Batcher}; use mqtt_channel::{Connection, Message, QoS, SinkExt, StreamExt, Topic, TopicFilter}; +use serde_json::json; use tracing::{error, info, instrument}; use super::{batcher::MessageBatch, collectd::CollectdMessage, error::DeviceMonitorError}; @@ -15,7 +18,6 @@ const DEFAULT_MQTT_TARGET_TOPIC: &str = "tedge/measurements"; const COMMON_HEALTH_CHECK_TOPIC: &str = "tedge/health-check"; const HEALTH_CHECK_TOPIC: &str = "tedge/health-check/tedge-mapper-collectd"; const HEALTH_STATUS_TOPIC: &str = "tedge/health/tedge-mapper-collectd"; -const HEALTH_STATUS_UP: &str = r#"{"status": "up"}"#; #[derive(Debug)] pub struct DeviceMonitorConfig { @@ -105,7 +107,12 @@ impl DeviceMonitor { let input_join_handle = tokio::task::spawn(async move { while let Some(message) = collectd_messages.next().await { if health_check_topics.accept(&message) { - let health_message = Message::new(&health_status_topic, HEALTH_STATUS_UP); + let health_status = json!({ + "status": "up", + "pid": process::id() + }) + .to_string(); + let health_message = Message::new(&health_status_topic, health_status); let _ = output_messages.send(health_message).await; } else { match CollectdMessage::parse_from(&message) { diff --git a/crates/core/tedge_mapper/src/core/mapper.rs b/crates/core/tedge_mapper/src/core/mapper.rs index 51c01078..9309a31e 100644 --- a/crates/core/tedge_mapper/src/core/mapper.rs +++ b/crates/core/tedge_mapper/src/core/mapper.rs @@ -1,4 +1,4 @@ -use std::time::Duration; +use std::{process, time::Duration}; use crate::core::{converter::*, error::*}; @@ -6,10 +6,10 @@ use mqtt_channel::{ Connection, Message, MqttError, SinkExt, StreamExt, Topic, TopicFilter, UnboundedReceiver, UnboundedSender, }; +use serde_json::json; use tracing::{error, info, instrument}; const SYNC_WINDOW: Duration = Duration::from_secs(3); -const HEALTH_STATUS_UP: &str = r#"{"status": "up"}"#; pub async fn create_mapper( app_name: &str, @@ -131,7 +131,12 @@ impl Mapper { async fn process_message(&mut self, message: Message) { if self.health_check_topics.accept(&message) { - let health_message = Message::new(&self.health_status_topic, HEALTH_STATUS_UP); + let health_status = json!({ + "status": "up", + "pid": process::id() + }) + .to_string(); + let health_message = Message::new(&self.health_status_topic, health_status); let _ = self.output.send(health_message).await; } else { let converted_messages = self.converter.convert(&message).await; @@ -145,8 +150,10 @@ impl Mapper { #[cfg(test)] mod tests { use super::*; + use assert_json_diff::assert_json_include; use async_trait::async_trait; use mqtt_channel::{Message, Topic, TopicFilter}; + use serde_json::Value; use std::time::Duration; use tokio::time::sleep; @@ -219,15 +226,32 @@ mod tests { let health_check_topic = format!("tedge/health-check/{name}"); let health_topic = format!("tedge/health/{name}"); - let actual = broker + let health_status = broker .wait_for_response_on_publish( &health_check_topic, "", &health_topic, Duration::from_secs(1), ) - .await; - assert_eq!(actual.unwrap(), HEALTH_STATUS_UP); + .await + .expect("JSON status message"); + let health_status: Value = serde_json::from_str(health_status.as_str())?; + assert_json_include!(actual: &health_status, expected: json!({"status": "up"})); + assert!(health_status["pid"].is_number()); + + let common_health_check_topic = "tedge/health-check"; + let health_status = broker + .wait_for_response_on_publish( + &common_health_check_topic, + "", + &health_topic, + Duration::from_secs(1), + ) + .await + .expect("JSON status message"); + let health_status: Value = serde_json::from_str(health_status.as_str())?; + assert_json_include!(actual: &health_status, expected: json!({"status": "up"})); + assert!(health_status["pid"].is_number()); Ok(()) } diff --git a/docs/src/howto-guides/020_monitor_tedge_health b/docs/src/howto-guides/020_monitor_tedge_health index 19f50315..3cdea621 100644 --- a/docs/src/howto-guides/020_monitor_tedge_health +++ b/docs/src/howto-guides/020_monitor_tedge_health @@ -16,7 +16,7 @@ The daemon will then respond back on the topic: with the following payload: ```json -{ "status": "up" } +{ "status": "up", "pid": <process id of the daemon> } ``` All daemons will also respond to health checks sent to the common health check endpoint `tedge/health-check`. |