summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--Cargo.lock1
-rw-r--r--crates/core/tedge_agent/Cargo.toml1
-rw-r--r--crates/core/tedge_agent/src/agent.rs33
-rw-r--r--crates/core/tedge_mapper/src/collectd/monitor.rs11
-rw-r--r--crates/core/tedge_mapper/src/core/mapper.rs36
-rw-r--r--docs/src/howto-guides/020_monitor_tedge_health2
6 files changed, 66 insertions, 18 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 84d520aa..ef2e82a6 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2775,6 +2775,7 @@ version = "0.6.1"
dependencies = [
"agent_interface",
"anyhow",
+ "assert-json-diff",
"assert_cmd",
"async-trait",
"clap 3.1.6",
diff --git a/crates/core/tedge_agent/Cargo.toml b/crates/core/tedge_agent/Cargo.toml
index 76bde633..63a2ba5a 100644
--- a/crates/core/tedge_agent/Cargo.toml
+++ b/crates/core/tedge_agent/Cargo.toml
@@ -44,6 +44,7 @@ tracing = { version = "0.1", features = ["attributes", "log"] }
[dev-dependencies]
anyhow = "1.0"
assert_cmd = "2.0"
+assert-json-diff = "2.0"
once_cell = "1.8"
mqtt_tests = { path = "../../tests/mqtt_tests" }
predicates = "2.1"
diff --git a/crates/core/tedge_agent/src/agent.rs b/crates/core/tedge_agent/src/agent.rs
index f5a1be14..df41368f 100644
--- a/crates/core/tedge_agent/src/agent.rs
+++ b/crates/core/tedge_agent/src/agent.rs
@@ -16,6 +16,8 @@ use agent_interface::{
use flockfile::{check_another_instance_is_not_running, Flockfile};
use mqtt_channel::{Connection, Message, PubChannel, StreamExt, SubChannel, Topic, TopicFilter};
use plugin_sm::plugin_manager::{ExternalPlugins, Plugins};
+use serde_json::json;
+use std::process;
use std::{convert::TryInto, fmt::Debug, path::PathBuf, sync::Arc};
use tedge_config::{
ConfigRepository, ConfigSettingAccessor, ConfigSettingAccessorStringExt, LogPathDefaultSetting,
@@ -28,7 +30,6 @@ use tracing::{debug, error, info, instrument, warn};
const SM_PLUGINS: &str = "sm-plugins";
const AGENT_LOG_PATH: &str = "tedge/agent";
-const HEALTH_STATUS_UP: &str = r#"{"status": "up"}"#;
#[cfg(not(test))]
const INIT_COMMAND: &str = "init";
@@ -295,8 +296,13 @@ impl SmAgent {
debug!("Request {:?}", message);
match &message.topic {
topic if self.config.request_topics_health.accept_topic(topic) => {
+ let health_status = json!({
+ "status": "up",
+ "pid": process::id()
+ })
+ .to_string();
let health_message =
- Message::new(&self.config.response_topic_health, HEALTH_STATUS_UP);
+ Message::new(&self.config.response_topic_health, health_status);
let _ = responses.publish(health_message).await;
}
@@ -640,6 +646,9 @@ mod tests {
use std::io::Write;
use std::path::PathBuf;
+ use assert_json_diff::assert_json_include;
+ use serde_json::Value;
+
use super::*;
const SLASH_RUN_PATH_TEDGE_AGENT_RESTART: &str = "tedge_agent/tedge_agent_restart";
@@ -770,12 +779,11 @@ mod tests {
/// test health check request response contract
async fn health_check() -> Result<(), AgentError> {
let (responses, mut response_sink) = mqtt_tests::output_stream();
- let expected_responses = vec![message(
- r#"tedge/health/tedge-agent"#,
- r#"{"status": "up"}"#,
- )];
- let mut requests =
- mqtt_tests::input_stream(vec![message(r#"tedge/health-check/tedge-agent"#, "")]).await;
+ let mut requests = mqtt_tests::input_stream(vec![
+ message("tedge/health-check/tedge-agent", ""),
+ message("tedge/health-check", ""),
+ ])
+ .await;
let (dir, tedge_config_location) = create_temp_tedge_config().unwrap();
@@ -801,7 +809,14 @@ mod tests {
});
let responses = responses.collect().await;
- assert_eq!(expected_responses, responses);
+ assert_eq!(responses.len(), 2);
+
+ for response in responses {
+ assert_eq!(response.topic.name, "tedge/health/tedge-agent");
+ let health_status: Value = serde_json::from_slice(response.payload_bytes())?;
+ assert_json_include!(actual: &health_status, expected: json!({"status": "up"}));
+ assert!(health_status["pid"].is_number());
+ }
Ok(())
}
diff --git a/crates/core/tedge_mapper/src/collectd/monitor.rs b/crates/core/tedge_mapper/src/collectd/monitor.rs
index 3a755a0f..f927f887 100644
--- a/crates/core/tedge_mapper/src/collectd/monitor.rs
+++ b/crates/core/tedge_mapper/src/collectd/monitor.rs
@@ -1,5 +1,8 @@
+use std::process;
+
use batcher::{BatchConfigBuilder, BatchDriver, BatchDriverInput, BatchDriverOutput, Batcher};
use mqtt_channel::{Connection, Message, QoS, SinkExt, StreamExt, Topic, TopicFilter};
+use serde_json::json;
use tracing::{error, info, instrument};
use super::{batcher::MessageBatch, collectd::CollectdMessage, error::DeviceMonitorError};
@@ -15,7 +18,6 @@ const DEFAULT_MQTT_TARGET_TOPIC: &str = "tedge/measurements";
const COMMON_HEALTH_CHECK_TOPIC: &str = "tedge/health-check";
const HEALTH_CHECK_TOPIC: &str = "tedge/health-check/tedge-mapper-collectd";
const HEALTH_STATUS_TOPIC: &str = "tedge/health/tedge-mapper-collectd";
-const HEALTH_STATUS_UP: &str = r#"{"status": "up"}"#;
#[derive(Debug)]
pub struct DeviceMonitorConfig {
@@ -105,7 +107,12 @@ impl DeviceMonitor {
let input_join_handle = tokio::task::spawn(async move {
while let Some(message) = collectd_messages.next().await {
if health_check_topics.accept(&message) {
- let health_message = Message::new(&health_status_topic, HEALTH_STATUS_UP);
+ let health_status = json!({
+ "status": "up",
+ "pid": process::id()
+ })
+ .to_string();
+ let health_message = Message::new(&health_status_topic, health_status);
let _ = output_messages.send(health_message).await;
} else {
match CollectdMessage::parse_from(&message) {
diff --git a/crates/core/tedge_mapper/src/core/mapper.rs b/crates/core/tedge_mapper/src/core/mapper.rs
index 51c01078..9309a31e 100644
--- a/crates/core/tedge_mapper/src/core/mapper.rs
+++ b/crates/core/tedge_mapper/src/core/mapper.rs
@@ -1,4 +1,4 @@
-use std::time::Duration;
+use std::{process, time::Duration};
use crate::core::{converter::*, error::*};
@@ -6,10 +6,10 @@ use mqtt_channel::{
Connection, Message, MqttError, SinkExt, StreamExt, Topic, TopicFilter, UnboundedReceiver,
UnboundedSender,
};
+use serde_json::json;
use tracing::{error, info, instrument};
const SYNC_WINDOW: Duration = Duration::from_secs(3);
-const HEALTH_STATUS_UP: &str = r#"{"status": "up"}"#;
pub async fn create_mapper(
app_name: &str,
@@ -131,7 +131,12 @@ impl Mapper {
async fn process_message(&mut self, message: Message) {
if self.health_check_topics.accept(&message) {
- let health_message = Message::new(&self.health_status_topic, HEALTH_STATUS_UP);
+ let health_status = json!({
+ "status": "up",
+ "pid": process::id()
+ })
+ .to_string();
+ let health_message = Message::new(&self.health_status_topic, health_status);
let _ = self.output.send(health_message).await;
} else {
let converted_messages = self.converter.convert(&message).await;
@@ -145,8 +150,10 @@ impl Mapper {
#[cfg(test)]
mod tests {
use super::*;
+ use assert_json_diff::assert_json_include;
use async_trait::async_trait;
use mqtt_channel::{Message, Topic, TopicFilter};
+ use serde_json::Value;
use std::time::Duration;
use tokio::time::sleep;
@@ -219,15 +226,32 @@ mod tests {
let health_check_topic = format!("tedge/health-check/{name}");
let health_topic = format!("tedge/health/{name}");
- let actual = broker
+ let health_status = broker
.wait_for_response_on_publish(
&health_check_topic,
"",
&health_topic,
Duration::from_secs(1),
)
- .await;
- assert_eq!(actual.unwrap(), HEALTH_STATUS_UP);
+ .await
+ .expect("JSON status message");
+ let health_status: Value = serde_json::from_str(health_status.as_str())?;
+ assert_json_include!(actual: &health_status, expected: json!({"status": "up"}));
+ assert!(health_status["pid"].is_number());
+
+ let common_health_check_topic = "tedge/health-check";
+ let health_status = broker
+ .wait_for_response_on_publish(
+ &common_health_check_topic,
+ "",
+ &health_topic,
+ Duration::from_secs(1),
+ )
+ .await
+ .expect("JSON status message");
+ let health_status: Value = serde_json::from_str(health_status.as_str())?;
+ assert_json_include!(actual: &health_status, expected: json!({"status": "up"}));
+ assert!(health_status["pid"].is_number());
Ok(())
}
diff --git a/docs/src/howto-guides/020_monitor_tedge_health b/docs/src/howto-guides/020_monitor_tedge_health
index 19f50315..3cdea621 100644
--- a/docs/src/howto-guides/020_monitor_tedge_health
+++ b/docs/src/howto-guides/020_monitor_tedge_health
@@ -16,7 +16,7 @@ The daemon will then respond back on the topic:
with the following payload:
```json
-{ "status": "up" }
+{ "status": "up", "pid": <process id of the daemon> }
```
All daemons will also respond to health checks sent to the common health check endpoint `tedge/health-check`.