summaryrefslogtreecommitdiffstats
path: root/crates/core/tedge_agent/src/agent.rs
diff options
context:
space:
mode:
authorAlbin Suresh <albin.suresh@softwareag.com>2022-03-30 11:11:40 +0530
committerGitHub <noreply@github.com>2022-03-30 11:11:40 +0530
commit358437d14166fb32448f99f158963bedcf9236f1 (patch)
tree9f03ac20206a6c56f6b8295af2d5967338fd756d /crates/core/tedge_agent/src/agent.rs
parentf8d1f56977c371a6cab5af6b3deda1f4e9052897 (diff)
parent3a40a44e7d0ed69a5fd1270c58efc08f959b72d2 (diff)
Merge PR #1025 MQTT health endpoints for tedge-daemons
Closes #769 MQTT health endpoint for tedge-daemons
Diffstat (limited to 'crates/core/tedge_agent/src/agent.rs')
-rw-r--r--crates/core/tedge_agent/src/agent.rs72
1 files changed, 66 insertions, 6 deletions
diff --git a/crates/core/tedge_agent/src/agent.rs b/crates/core/tedge_agent/src/agent.rs
index 00c1c71a..0528cfb9 100644
--- a/crates/core/tedge_agent/src/agent.rs
+++ b/crates/core/tedge_agent/src/agent.rs
@@ -8,9 +8,10 @@ use crate::{
},
};
use agent_interface::{
- control_filter_topic, software_filter_topic, Jsonify, OperationStatus, RestartOperationRequest,
- RestartOperationResponse, SoftwareError, SoftwareListRequest, SoftwareListResponse,
- SoftwareRequestResponse, SoftwareType, SoftwareUpdateRequest, SoftwareUpdateResponse,
+ control_filter_topic, health_check_topic, software_filter_topic, Jsonify, OperationStatus,
+ RestartOperationRequest, RestartOperationResponse, SoftwareError, SoftwareListRequest,
+ SoftwareListResponse, SoftwareRequestResponse, SoftwareType, SoftwareUpdateRequest,
+ SoftwareUpdateResponse,
};
use flockfile::{check_another_instance_is_not_running, Flockfile};
use mqtt_channel::{Connection, Message, PubChannel, StreamExt, SubChannel, Topic, TopicFilter};
@@ -27,6 +28,7 @@ use tracing::{debug, error, info, instrument, warn};
const SM_PLUGINS: &str = "sm-plugins";
const AGENT_LOG_PATH: &str = "tedge/agent";
+const HEALTH_STATUS_UP: &str = r#"{"status": "up"}"#;
#[cfg(not(test))]
const INIT_COMMAND: &str = "init";
@@ -38,10 +40,12 @@ const INIT_COMMAND: &str = "echo";
pub struct SmAgentConfig {
pub errors_topic: Topic,
pub mqtt_config: mqtt_channel::Config,
+ pub request_topic_health: Topic,
pub request_topic_list: Topic,
pub request_topic_update: Topic,
pub request_topics: TopicFilter,
pub request_topic_restart: Topic,
+ pub response_topic_health: Topic,
pub response_topic_list: Topic,
pub response_topic_update: Topic,
pub response_topic_restart: Topic,
@@ -58,9 +62,17 @@ impl Default for SmAgentConfig {
let mqtt_config = mqtt_channel::Config::default();
- let request_topics = vec![software_filter_topic(), control_filter_topic()]
- .try_into()
- .expect("Invalid topic filter");
+ let request_topics = vec![
+ health_check_topic(),
+ software_filter_topic(),
+ control_filter_topic(),
+ ]
+ .try_into()
+ .expect("Invalid topic filter");
+
+ let request_topic_health = Topic::new_unchecked(health_check_topic());
+
+ let response_topic_health = Topic::new_unchecked("tedge/health/tedge-agent");
let request_topic_list =
Topic::new(SoftwareListRequest::topic_name()).expect("Invalid topic");
@@ -93,9 +105,11 @@ impl Default for SmAgentConfig {
Self {
errors_topic,
mqtt_config,
+ request_topic_health,
request_topic_list,
request_topic_update,
request_topics,
+ response_topic_health,
response_topic_list,
response_topic_update,
request_topic_restart,
@@ -281,6 +295,12 @@ impl SmAgent {
while let Some(message) = requests.next().await {
debug!("Request {:?}", message);
match &message.topic {
+ topic if topic == &self.config.request_topic_health => {
+ let health_message =
+ Message::new(&self.config.response_topic_health, HEALTH_STATUS_UP);
+ let _ = responses.publish(health_message).await;
+ }
+
topic if topic == &self.config.request_topic_list => {
let _success = self
.handle_software_list_request(
@@ -746,4 +766,44 @@ mod tests {
Ok(())
}
+
+ #[tokio::test]
+ /// test health check request response contract
+ async fn health_check() -> Result<(), AgentError> {
+ let (responses, mut response_sink) = mqtt_tests::output_stream();
+ let expected_responses = vec![message(
+ r#"tedge/health/tedge-agent"#,
+ r#"{"status": "up"}"#,
+ )];
+ let mut requests =
+ mqtt_tests::input_stream(vec![message(r#"tedge/health-check/tedge-agent"#, "")]).await;
+
+ let (dir, tedge_config_location) = create_temp_tedge_config().unwrap();
+
+ tokio::spawn(async move {
+ let mut agent = SmAgent::try_new(
+ "tedge_agent_test",
+ SmAgentConfig::try_new(tedge_config_location).unwrap(),
+ )
+ .unwrap();
+
+ let plugins = Arc::new(Mutex::new(
+ ExternalPlugins::open(
+ PathBuf::from(&dir.path()).join("sm-plugins"),
+ get_default_plugin(&agent.config.config_location).unwrap(),
+ Some("sudo".into()),
+ )
+ .unwrap(),
+ ));
+ let () = agent
+ .process_subscribed_messages(&mut requests, &mut response_sink, &plugins)
+ .await
+ .unwrap();
+ });
+
+ let responses = responses.collect().await;
+ assert_eq!(expected_responses, responses);
+
+ Ok(())
+ }
}