diff options
author | Albin Suresh <albin.suresh@softwareag.com> | 2022-03-30 11:11:40 +0530 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-03-30 11:11:40 +0530 |
commit | 358437d14166fb32448f99f158963bedcf9236f1 (patch) | |
tree | 9f03ac20206a6c56f6b8295af2d5967338fd756d /crates/core/tedge_agent | |
parent | f8d1f56977c371a6cab5af6b3deda1f4e9052897 (diff) | |
parent | 3a40a44e7d0ed69a5fd1270c58efc08f959b72d2 (diff) |
Merge PR #1025 MQTT health endpoints for tedge-daemons
Closes #769 MQTT health endpoint for tedge-daemons
Diffstat (limited to 'crates/core/tedge_agent')
-rw-r--r-- | crates/core/tedge_agent/src/agent.rs | 72 |
1 files changed, 66 insertions, 6 deletions
diff --git a/crates/core/tedge_agent/src/agent.rs b/crates/core/tedge_agent/src/agent.rs index 00c1c71a..0528cfb9 100644 --- a/crates/core/tedge_agent/src/agent.rs +++ b/crates/core/tedge_agent/src/agent.rs @@ -8,9 +8,10 @@ use crate::{ }, }; use agent_interface::{ - control_filter_topic, software_filter_topic, Jsonify, OperationStatus, RestartOperationRequest, - RestartOperationResponse, SoftwareError, SoftwareListRequest, SoftwareListResponse, - SoftwareRequestResponse, SoftwareType, SoftwareUpdateRequest, SoftwareUpdateResponse, + control_filter_topic, health_check_topic, software_filter_topic, Jsonify, OperationStatus, + RestartOperationRequest, RestartOperationResponse, SoftwareError, SoftwareListRequest, + SoftwareListResponse, SoftwareRequestResponse, SoftwareType, SoftwareUpdateRequest, + SoftwareUpdateResponse, }; use flockfile::{check_another_instance_is_not_running, Flockfile}; use mqtt_channel::{Connection, Message, PubChannel, StreamExt, SubChannel, Topic, TopicFilter}; @@ -27,6 +28,7 @@ use tracing::{debug, error, info, instrument, warn}; const SM_PLUGINS: &str = "sm-plugins"; const AGENT_LOG_PATH: &str = "tedge/agent"; +const HEALTH_STATUS_UP: &str = r#"{"status": "up"}"#; #[cfg(not(test))] const INIT_COMMAND: &str = "init"; @@ -38,10 +40,12 @@ const INIT_COMMAND: &str = "echo"; pub struct SmAgentConfig { pub errors_topic: Topic, pub mqtt_config: mqtt_channel::Config, + pub request_topic_health: Topic, pub request_topic_list: Topic, pub request_topic_update: Topic, pub request_topics: TopicFilter, pub request_topic_restart: Topic, + pub response_topic_health: Topic, pub response_topic_list: Topic, pub response_topic_update: Topic, pub response_topic_restart: Topic, @@ -58,9 +62,17 @@ impl Default for SmAgentConfig { let mqtt_config = mqtt_channel::Config::default(); - let request_topics = vec![software_filter_topic(), control_filter_topic()] - .try_into() - .expect("Invalid topic filter"); + let request_topics = vec![ + health_check_topic(), + software_filter_topic(), + control_filter_topic(), + ] + .try_into() + .expect("Invalid topic filter"); + + let request_topic_health = Topic::new_unchecked(health_check_topic()); + + let response_topic_health = Topic::new_unchecked("tedge/health/tedge-agent"); let request_topic_list = Topic::new(SoftwareListRequest::topic_name()).expect("Invalid topic"); @@ -93,9 +105,11 @@ impl Default for SmAgentConfig { Self { errors_topic, mqtt_config, + request_topic_health, request_topic_list, request_topic_update, request_topics, + response_topic_health, response_topic_list, response_topic_update, request_topic_restart, @@ -281,6 +295,12 @@ impl SmAgent { while let Some(message) = requests.next().await { debug!("Request {:?}", message); match &message.topic { + topic if topic == &self.config.request_topic_health => { + let health_message = + Message::new(&self.config.response_topic_health, HEALTH_STATUS_UP); + let _ = responses.publish(health_message).await; + } + topic if topic == &self.config.request_topic_list => { let _success = self .handle_software_list_request( @@ -746,4 +766,44 @@ mod tests { Ok(()) } + + #[tokio::test] + /// test health check request response contract + async fn health_check() -> Result<(), AgentError> { + let (responses, mut response_sink) = mqtt_tests::output_stream(); + let expected_responses = vec![message( + r#"tedge/health/tedge-agent"#, + r#"{"status": "up"}"#, + )]; + let mut requests = + mqtt_tests::input_stream(vec![message(r#"tedge/health-check/tedge-agent"#, "")]).await; + + let (dir, tedge_config_location) = create_temp_tedge_config().unwrap(); + + tokio::spawn(async move { + let mut agent = SmAgent::try_new( + "tedge_agent_test", + SmAgentConfig::try_new(tedge_config_location).unwrap(), + ) + .unwrap(); + + let plugins = Arc::new(Mutex::new( + ExternalPlugins::open( + PathBuf::from(&dir.path()).join("sm-plugins"), + get_default_plugin(&agent.config.config_location).unwrap(), + Some("sudo".into()), + ) + .unwrap(), + )); + let () = agent + .process_subscribed_messages(&mut requests, &mut response_sink, &plugins) + .await + .unwrap(); + }); + + let responses = responses.collect().await; + assert_eq!(expected_responses, responses); + + Ok(()) + } } |