diff options
author | Albin Suresh <albin.suresh@softwareag.com> | 2022-03-30 11:11:40 +0530 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-03-30 11:11:40 +0530 |
commit | 358437d14166fb32448f99f158963bedcf9236f1 (patch) | |
tree | 9f03ac20206a6c56f6b8295af2d5967338fd756d /crates/core | |
parent | f8d1f56977c371a6cab5af6b3deda1f4e9052897 (diff) | |
parent | 3a40a44e7d0ed69a5fd1270c58efc08f959b72d2 (diff) |
Merge PR #1025 MQTT health endpoints for tedge-daemons
Closes #769 MQTT health endpoint for tedge-daemons
Diffstat (limited to 'crates/core')
-rw-r--r-- | crates/core/agent_interface/src/lib.rs | 6 | ||||
-rw-r--r-- | crates/core/agent_interface/src/messages.rs | 4 | ||||
-rw-r--r-- | crates/core/tedge_agent/src/agent.rs | 72 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/collectd/monitor.rs | 34 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/core/mapper.rs | 88 |
5 files changed, 167 insertions, 37 deletions
diff --git a/crates/core/agent_interface/src/lib.rs b/crates/core/agent_interface/src/lib.rs index 147b2e88..fd5972c6 100644 --- a/crates/core/agent_interface/src/lib.rs +++ b/crates/core/agent_interface/src/lib.rs @@ -6,9 +6,9 @@ pub mod topic; pub use download::*; pub use error::*; pub use messages::{ - control_filter_topic, software_filter_topic, Jsonify, OperationStatus, RestartOperationRequest, - RestartOperationResponse, SoftwareListRequest, SoftwareListResponse, SoftwareRequestResponse, - SoftwareUpdateRequest, SoftwareUpdateResponse, + control_filter_topic, health_check_topic, software_filter_topic, Jsonify, OperationStatus, + RestartOperationRequest, RestartOperationResponse, SoftwareListRequest, SoftwareListResponse, + SoftwareRequestResponse, SoftwareUpdateRequest, SoftwareUpdateResponse, }; pub use software::*; diff --git a/crates/core/agent_interface/src/messages.rs b/crates/core/agent_interface/src/messages.rs index e124bf78..4df6f41d 100644 --- a/crates/core/agent_interface/src/messages.rs +++ b/crates/core/agent_interface/src/messages.rs @@ -25,6 +25,10 @@ where } } +pub const fn health_check_topic() -> &'static str { + "tedge/health-check/tedge-agent" +} + pub const fn software_filter_topic() -> &'static str { "tedge/commands/req/software/#" } diff --git a/crates/core/tedge_agent/src/agent.rs b/crates/core/tedge_agent/src/agent.rs index 00c1c71a..0528cfb9 100644 --- a/crates/core/tedge_agent/src/agent.rs +++ b/crates/core/tedge_agent/src/agent.rs @@ -8,9 +8,10 @@ use crate::{ }, }; use agent_interface::{ - control_filter_topic, software_filter_topic, Jsonify, OperationStatus, RestartOperationRequest, - RestartOperationResponse, SoftwareError, SoftwareListRequest, SoftwareListResponse, - SoftwareRequestResponse, SoftwareType, SoftwareUpdateRequest, SoftwareUpdateResponse, + control_filter_topic, health_check_topic, software_filter_topic, Jsonify, OperationStatus, + RestartOperationRequest, RestartOperationResponse, SoftwareError, SoftwareListRequest, + SoftwareListResponse, SoftwareRequestResponse, SoftwareType, SoftwareUpdateRequest, + SoftwareUpdateResponse, }; use flockfile::{check_another_instance_is_not_running, Flockfile}; use mqtt_channel::{Connection, Message, PubChannel, StreamExt, SubChannel, Topic, TopicFilter}; @@ -27,6 +28,7 @@ use tracing::{debug, error, info, instrument, warn}; const SM_PLUGINS: &str = "sm-plugins"; const AGENT_LOG_PATH: &str = "tedge/agent"; +const HEALTH_STATUS_UP: &str = r#"{"status": "up"}"#; #[cfg(not(test))] const INIT_COMMAND: &str = "init"; @@ -38,10 +40,12 @@ const INIT_COMMAND: &str = "echo"; pub struct SmAgentConfig { pub errors_topic: Topic, pub mqtt_config: mqtt_channel::Config, + pub request_topic_health: Topic, pub request_topic_list: Topic, pub request_topic_update: Topic, pub request_topics: TopicFilter, pub request_topic_restart: Topic, + pub response_topic_health: Topic, pub response_topic_list: Topic, pub response_topic_update: Topic, pub response_topic_restart: Topic, @@ -58,9 +62,17 @@ impl Default for SmAgentConfig { let mqtt_config = mqtt_channel::Config::default(); - let request_topics = vec![software_filter_topic(), control_filter_topic()] - .try_into() - .expect("Invalid topic filter"); + let request_topics = vec![ + health_check_topic(), + software_filter_topic(), + control_filter_topic(), + ] + .try_into() + .expect("Invalid topic filter"); + + let request_topic_health = Topic::new_unchecked(health_check_topic()); + + let response_topic_health = Topic::new_unchecked("tedge/health/tedge-agent"); let request_topic_list = Topic::new(SoftwareListRequest::topic_name()).expect("Invalid topic"); @@ -93,9 +105,11 @@ impl Default for SmAgentConfig { Self { errors_topic, mqtt_config, + request_topic_health, request_topic_list, request_topic_update, request_topics, + response_topic_health, response_topic_list, response_topic_update, request_topic_restart, @@ -281,6 +295,12 @@ impl SmAgent { while let Some(message) = requests.next().await { debug!("Request {:?}", message); match &message.topic { + topic if topic == &self.config.request_topic_health => { + let health_message = + Message::new(&self.config.response_topic_health, HEALTH_STATUS_UP); + let _ = responses.publish(health_message).await; + } + topic if topic == &self.config.request_topic_list => { let _success = self .handle_software_list_request( @@ -746,4 +766,44 @@ mod tests { Ok(()) } + + #[tokio::test] + /// test health check request response contract + async fn health_check() -> Result<(), AgentError> { + let (responses, mut response_sink) = mqtt_tests::output_stream(); + let expected_responses = vec![message( + r#"tedge/health/tedge-agent"#, + r#"{"status": "up"}"#, + )]; + let mut requests = + mqtt_tests::input_stream(vec![message(r#"tedge/health-check/tedge-agent"#, "")]).await; + + let (dir, tedge_config_location) = create_temp_tedge_config().unwrap(); + + tokio::spawn(async move { + let mut agent = SmAgent::try_new( + "tedge_agent_test", + SmAgentConfig::try_new(tedge_config_location).unwrap(), + ) + .unwrap(); + + let plugins = Arc::new(Mutex::new( + ExternalPlugins::open( + PathBuf::from(&dir.path()).join("sm-plugins"), + get_default_plugin(&agent.config.config_location).unwrap(), + Some("sudo".into()), + ) + .unwrap(), + )); + let () = agent + .process_subscribed_messages(&mut requests, &mut response_sink, &plugins) + .await + .unwrap(); + }); + + let responses = responses.collect().await; + assert_eq!(expected_responses, responses); + + Ok(()) + } } diff --git a/crates/core/tedge_mapper/src/collectd/monitor.rs b/crates/core/tedge_mapper/src/collectd/monitor.rs index 93ea68cd..6063dca6 100644 --- a/crates/core/tedge_mapper/src/collectd/monitor.rs +++ b/crates/core/tedge_mapper/src/collectd/monitor.rs @@ -12,6 +12,9 @@ const DEFAULT_MAXIMUM_MESSAGE_DELAY: u32 = 400; // Heuristic delay that should w const DEFAULT_MESSAGE_LEAP_LIMIT: u32 = 0; const DEFAULT_MQTT_SOURCE_TOPIC: &str = "collectd/#"; const DEFAULT_MQTT_TARGET_TOPIC: &str = "tedge/measurements"; +const HEALTH_CHECK_TOPIC: &str = "tedge/health-check/tedge-mapper-collectd"; +const HEALTH_STATUS_TOPIC: &str = "tedge/health/tedge-mapper-collectd"; +const HEALTH_STATUS_UP: &str = r#"{"status": "up"}"#; #[derive(Debug)] pub struct DeviceMonitorConfig { @@ -64,8 +67,13 @@ impl DeviceMonitor { #[instrument(skip(self), name = "monitor")] pub async fn run(&self) -> Result<(), DeviceMonitorError> { - let input_topic = TopicFilter::new(self.device_monitor_config.mqtt_source_topic)? + let health_check_topic = TopicFilter::new_unchecked(HEALTH_CHECK_TOPIC); + let health_status_topic = Topic::new_unchecked(HEALTH_STATUS_TOPIC); + + let mut input_topic = TopicFilter::new(self.device_monitor_config.mqtt_source_topic)? .with_qos(QoS::AtMostOnce); + input_topic.add_all(health_check_topic.clone()); + let mqtt_config = mqtt_channel::Config::new( self.device_monitor_config.host.to_string(), self.device_monitor_config.port, @@ -90,19 +98,25 @@ impl DeviceMonitor { }); let mut collectd_messages = mqtt_client.received; + let mut output_messages = mqtt_client.published.clone(); let input_join_handle = tokio::task::spawn(async move { while let Some(message) = collectd_messages.next().await { - match CollectdMessage::parse_from(&message) { - Ok(collectd_message) => { - for msg in collectd_message { - let batch_input = BatchDriverInput::Event(msg); - if let Err(err) = msg_send.send(batch_input).await { - error!("Error while processing a collectd message: {}", err); + if health_check_topic.accept(&message) { + let health_message = Message::new(&health_status_topic, HEALTH_STATUS_UP); + let _ = output_messages.send(health_message).await; + } else { + match CollectdMessage::parse_from(&message) { + Ok(collectd_message) => { + for msg in collectd_message { + let batch_input = BatchDriverInput::Event(msg); + if let Err(err) = msg_send.send(batch_input).await { + error!("Error while processing a collectd message: {}", err); + } } } - } - Err(err) => { - error!("Error while decoding a collectd message: {}", err); + Err(err) => { + error!("Error while decoding a collectd message: {}", err); + } } } } diff --git a/crates/core/tedge_mapper/src/core/mapper.rs b/crates/core/tedge_mapper/src/core/mapper.rs index f0691397..5208f04b 100644 --- a/crates/core/tedge_mapper/src/core/mapper.rs +++ b/crates/core/tedge_mapper/src/core/mapper.rs @@ -3,12 +3,13 @@ use std::time::Duration; use crate::core::{converter::*, error::*}; use mqtt_channel::{ - Connection, Message, MqttError, SinkExt, StreamExt, TopicFilter, UnboundedReceiver, + Connection, Message, MqttError, SinkExt, StreamExt, Topic, TopicFilter, UnboundedReceiver, UnboundedSender, }; use tracing::{error, info, instrument}; const SYNC_WINDOW: Duration = Duration::from_secs(3); +const HEALTH_STATUS_UP: &str = r#"{"status": "up"}"#; pub async fn create_mapper( app_name: &str, @@ -19,17 +20,16 @@ pub async fn create_mapper( info!("{} starting", app_name); let mapper_config = converter.get_mapper_config(); - let mqtt_client = Connection::new(&mqtt_config( - app_name, - &mqtt_host, - mqtt_port, - mapper_config.in_topic_filter.clone(), - )?) - .await?; + let mut topic_filter = mapper_config.in_topic_filter.clone(); + topic_filter.add(format!("tedge/health-check/{}", app_name).as_str())?; + + let mqtt_client = + Connection::new(&mqtt_config(app_name, &mqtt_host, mqtt_port, topic_filter)?).await?; Mapper::subscribe_errors(mqtt_client.errors); Ok(Mapper::new( + app_name.to_string(), mqtt_client.received, mqtt_client.published, converter, @@ -40,13 +40,13 @@ pub fn mqtt_config( name: &str, host: &str, port: u16, - topics: TopicFilter, + topic_filter: TopicFilter, ) -> Result<mqtt_channel::Config, anyhow::Error> { Ok(mqtt_channel::Config::default() .with_host(host) .with_port(port) .with_session_name(name) - .with_subscriptions(topics) + .with_subscriptions(topic_filter) .with_max_packet_size(10 * 1024 * 1024)) } @@ -54,18 +54,26 @@ pub struct Mapper { input: UnboundedReceiver<Message>, output: UnboundedSender<Message>, converter: Box<dyn Converter<Error = ConversionError>>, + health_check_topic: TopicFilter, + health_topic: Topic, } impl Mapper { pub fn new( + name: String, input: UnboundedReceiver<Message>, output: UnboundedSender<Message>, converter: Box<dyn Converter<Error = ConversionError>>, ) -> Self { + let health_check_topic = + TopicFilter::new_unchecked(format!("tedge/health-check/{}", name).as_str()); + let health_topic = Topic::new_unchecked(format!("tedge/health/{}", name).as_str()); Self { input, output, converter, + health_check_topic, + health_topic, } } @@ -114,9 +122,14 @@ impl Mapper { } async fn process_message(&mut self, message: Message) { - let converted_messages = self.converter.convert(&message).await; - for converted_message in converted_messages.into_iter() { - let _ = self.output.send(converted_message).await; + if self.health_check_topic.accept(&message) { + let health_message = Message::new(&self.health_topic, HEALTH_STATUS_UP); + let _ = self.output.send(health_message).await; + } else { + let converted_messages = self.converter.convert(&message).await; + for converted_message in converted_messages.into_iter() { + let _ = self.output.send(converted_message).await; + } } } } @@ -143,11 +156,12 @@ mod tests { .with_subscriptions(TopicFilter::new_unchecked("in_topic")); let mqtt_client = Connection::new(&mqtt_config).await?; - let mut mapper = Mapper { - input: mqtt_client.received, - output: mqtt_client.published, - converter: Box::new(UppercaseConverter::new()), - }; + let mut mapper = Mapper::new( + name.to_string(), + mqtt_client.received, + mqtt_client.published, + Box::new(UppercaseConverter::new()), + ); // Let's run the mapper in the background tokio::spawn(async move { @@ -177,6 +191,44 @@ mod tests { Ok(()) } + #[tokio::test] + #[serial_test::serial] + async fn health_check() -> Result<(), anyhow::Error> { + // Given an MQTT broker + let broker = mqtt_tests::test_mqtt_broker(); + + // Given a mapper + let name = "mapper_under_test"; + + let mut mapper = create_mapper( + name, + "localhost".to_string(), + broker.port, + Box::new(UppercaseConverter::new()), + ) + .await?; + + // Let's run the mapper in the background + tokio::spawn(async move { + let _ = mapper.run().await; + }); + sleep(Duration::from_secs(1)).await; + + let health_check_topic = format!("tedge/health-check/{name}"); + let health_topic = format!("tedge/health/{name}"); + let actual = broker + .wait_for_response_on_publish( + &health_check_topic, + "", + &health_topic, + Duration::from_secs(1), + ) + .await; + assert_eq!(actual.unwrap(), HEALTH_STATUS_UP); + + Ok(()) + } + struct UppercaseConverter { mapper_config: MapperConfig, } |