summaryrefslogtreecommitdiffstats
path: root/crates/core
diff options
context:
space:
mode:
authorAlbin Suresh <albin.suresh@softwareag.com>2022-03-30 11:11:40 +0530
committerGitHub <noreply@github.com>2022-03-30 11:11:40 +0530
commit358437d14166fb32448f99f158963bedcf9236f1 (patch)
tree9f03ac20206a6c56f6b8295af2d5967338fd756d /crates/core
parentf8d1f56977c371a6cab5af6b3deda1f4e9052897 (diff)
parent3a40a44e7d0ed69a5fd1270c58efc08f959b72d2 (diff)
Merge PR #1025 MQTT health endpoints for tedge-daemons
Closes #769 MQTT health endpoint for tedge-daemons
Diffstat (limited to 'crates/core')
-rw-r--r--crates/core/agent_interface/src/lib.rs6
-rw-r--r--crates/core/agent_interface/src/messages.rs4
-rw-r--r--crates/core/tedge_agent/src/agent.rs72
-rw-r--r--crates/core/tedge_mapper/src/collectd/monitor.rs34
-rw-r--r--crates/core/tedge_mapper/src/core/mapper.rs88
5 files changed, 167 insertions, 37 deletions
diff --git a/crates/core/agent_interface/src/lib.rs b/crates/core/agent_interface/src/lib.rs
index 147b2e88..fd5972c6 100644
--- a/crates/core/agent_interface/src/lib.rs
+++ b/crates/core/agent_interface/src/lib.rs
@@ -6,9 +6,9 @@ pub mod topic;
pub use download::*;
pub use error::*;
pub use messages::{
- control_filter_topic, software_filter_topic, Jsonify, OperationStatus, RestartOperationRequest,
- RestartOperationResponse, SoftwareListRequest, SoftwareListResponse, SoftwareRequestResponse,
- SoftwareUpdateRequest, SoftwareUpdateResponse,
+ control_filter_topic, health_check_topic, software_filter_topic, Jsonify, OperationStatus,
+ RestartOperationRequest, RestartOperationResponse, SoftwareListRequest, SoftwareListResponse,
+ SoftwareRequestResponse, SoftwareUpdateRequest, SoftwareUpdateResponse,
};
pub use software::*;
diff --git a/crates/core/agent_interface/src/messages.rs b/crates/core/agent_interface/src/messages.rs
index e124bf78..4df6f41d 100644
--- a/crates/core/agent_interface/src/messages.rs
+++ b/crates/core/agent_interface/src/messages.rs
@@ -25,6 +25,10 @@ where
}
}
+pub const fn health_check_topic() -> &'static str {
+ "tedge/health-check/tedge-agent"
+}
+
pub const fn software_filter_topic() -> &'static str {
"tedge/commands/req/software/#"
}
diff --git a/crates/core/tedge_agent/src/agent.rs b/crates/core/tedge_agent/src/agent.rs
index 00c1c71a..0528cfb9 100644
--- a/crates/core/tedge_agent/src/agent.rs
+++ b/crates/core/tedge_agent/src/agent.rs
@@ -8,9 +8,10 @@ use crate::{
},
};
use agent_interface::{
- control_filter_topic, software_filter_topic, Jsonify, OperationStatus, RestartOperationRequest,
- RestartOperationResponse, SoftwareError, SoftwareListRequest, SoftwareListResponse,
- SoftwareRequestResponse, SoftwareType, SoftwareUpdateRequest, SoftwareUpdateResponse,
+ control_filter_topic, health_check_topic, software_filter_topic, Jsonify, OperationStatus,
+ RestartOperationRequest, RestartOperationResponse, SoftwareError, SoftwareListRequest,
+ SoftwareListResponse, SoftwareRequestResponse, SoftwareType, SoftwareUpdateRequest,
+ SoftwareUpdateResponse,
};
use flockfile::{check_another_instance_is_not_running, Flockfile};
use mqtt_channel::{Connection, Message, PubChannel, StreamExt, SubChannel, Topic, TopicFilter};
@@ -27,6 +28,7 @@ use tracing::{debug, error, info, instrument, warn};
const SM_PLUGINS: &str = "sm-plugins";
const AGENT_LOG_PATH: &str = "tedge/agent";
+const HEALTH_STATUS_UP: &str = r#"{"status": "up"}"#;
#[cfg(not(test))]
const INIT_COMMAND: &str = "init";
@@ -38,10 +40,12 @@ const INIT_COMMAND: &str = "echo";
pub struct SmAgentConfig {
pub errors_topic: Topic,
pub mqtt_config: mqtt_channel::Config,
+ pub request_topic_health: Topic,
pub request_topic_list: Topic,
pub request_topic_update: Topic,
pub request_topics: TopicFilter,
pub request_topic_restart: Topic,
+ pub response_topic_health: Topic,
pub response_topic_list: Topic,
pub response_topic_update: Topic,
pub response_topic_restart: Topic,
@@ -58,9 +62,17 @@ impl Default for SmAgentConfig {
let mqtt_config = mqtt_channel::Config::default();
- let request_topics = vec![software_filter_topic(), control_filter_topic()]
- .try_into()
- .expect("Invalid topic filter");
+ let request_topics = vec![
+ health_check_topic(),
+ software_filter_topic(),
+ control_filter_topic(),
+ ]
+ .try_into()
+ .expect("Invalid topic filter");
+
+ let request_topic_health = Topic::new_unchecked(health_check_topic());
+
+ let response_topic_health = Topic::new_unchecked("tedge/health/tedge-agent");
let request_topic_list =
Topic::new(SoftwareListRequest::topic_name()).expect("Invalid topic");
@@ -93,9 +105,11 @@ impl Default for SmAgentConfig {
Self {
errors_topic,
mqtt_config,
+ request_topic_health,
request_topic_list,
request_topic_update,
request_topics,
+ response_topic_health,
response_topic_list,
response_topic_update,
request_topic_restart,
@@ -281,6 +295,12 @@ impl SmAgent {
while let Some(message) = requests.next().await {
debug!("Request {:?}", message);
match &message.topic {
+ topic if topic == &self.config.request_topic_health => {
+ let health_message =
+ Message::new(&self.config.response_topic_health, HEALTH_STATUS_UP);
+ let _ = responses.publish(health_message).await;
+ }
+
topic if topic == &self.config.request_topic_list => {
let _success = self
.handle_software_list_request(
@@ -746,4 +766,44 @@ mod tests {
Ok(())
}
+
+ #[tokio::test]
+ /// test health check request response contract
+ async fn health_check() -> Result<(), AgentError> {
+ let (responses, mut response_sink) = mqtt_tests::output_stream();
+ let expected_responses = vec![message(
+ r#"tedge/health/tedge-agent"#,
+ r#"{"status": "up"}"#,
+ )];
+ let mut requests =
+ mqtt_tests::input_stream(vec![message(r#"tedge/health-check/tedge-agent"#, "")]).await;
+
+ let (dir, tedge_config_location) = create_temp_tedge_config().unwrap();
+
+ tokio::spawn(async move {
+ let mut agent = SmAgent::try_new(
+ "tedge_agent_test",
+ SmAgentConfig::try_new(tedge_config_location).unwrap(),
+ )
+ .unwrap();
+
+ let plugins = Arc::new(Mutex::new(
+ ExternalPlugins::open(
+ PathBuf::from(&dir.path()).join("sm-plugins"),
+ get_default_plugin(&agent.config.config_location).unwrap(),
+ Some("sudo".into()),
+ )
+ .unwrap(),
+ ));
+ let () = agent
+ .process_subscribed_messages(&mut requests, &mut response_sink, &plugins)
+ .await
+ .unwrap();
+ });
+
+ let responses = responses.collect().await;
+ assert_eq!(expected_responses, responses);
+
+ Ok(())
+ }
}
diff --git a/crates/core/tedge_mapper/src/collectd/monitor.rs b/crates/core/tedge_mapper/src/collectd/monitor.rs
index 93ea68cd..6063dca6 100644
--- a/crates/core/tedge_mapper/src/collectd/monitor.rs
+++ b/crates/core/tedge_mapper/src/collectd/monitor.rs
@@ -12,6 +12,9 @@ const DEFAULT_MAXIMUM_MESSAGE_DELAY: u32 = 400; // Heuristic delay that should w
const DEFAULT_MESSAGE_LEAP_LIMIT: u32 = 0;
const DEFAULT_MQTT_SOURCE_TOPIC: &str = "collectd/#";
const DEFAULT_MQTT_TARGET_TOPIC: &str = "tedge/measurements";
+const HEALTH_CHECK_TOPIC: &str = "tedge/health-check/tedge-mapper-collectd";
+const HEALTH_STATUS_TOPIC: &str = "tedge/health/tedge-mapper-collectd";
+const HEALTH_STATUS_UP: &str = r#"{"status": "up"}"#;
#[derive(Debug)]
pub struct DeviceMonitorConfig {
@@ -64,8 +67,13 @@ impl DeviceMonitor {
#[instrument(skip(self), name = "monitor")]
pub async fn run(&self) -> Result<(), DeviceMonitorError> {
- let input_topic = TopicFilter::new(self.device_monitor_config.mqtt_source_topic)?
+ let health_check_topic = TopicFilter::new_unchecked(HEALTH_CHECK_TOPIC);
+ let health_status_topic = Topic::new_unchecked(HEALTH_STATUS_TOPIC);
+
+ let mut input_topic = TopicFilter::new(self.device_monitor_config.mqtt_source_topic)?
.with_qos(QoS::AtMostOnce);
+ input_topic.add_all(health_check_topic.clone());
+
let mqtt_config = mqtt_channel::Config::new(
self.device_monitor_config.host.to_string(),
self.device_monitor_config.port,
@@ -90,19 +98,25 @@ impl DeviceMonitor {
});
let mut collectd_messages = mqtt_client.received;
+ let mut output_messages = mqtt_client.published.clone();
let input_join_handle = tokio::task::spawn(async move {
while let Some(message) = collectd_messages.next().await {
- match CollectdMessage::parse_from(&message) {
- Ok(collectd_message) => {
- for msg in collectd_message {
- let batch_input = BatchDriverInput::Event(msg);
- if let Err(err) = msg_send.send(batch_input).await {
- error!("Error while processing a collectd message: {}", err);
+ if health_check_topic.accept(&message) {
+ let health_message = Message::new(&health_status_topic, HEALTH_STATUS_UP);
+ let _ = output_messages.send(health_message).await;
+ } else {
+ match CollectdMessage::parse_from(&message) {
+ Ok(collectd_message) => {
+ for msg in collectd_message {
+ let batch_input = BatchDriverInput::Event(msg);
+ if let Err(err) = msg_send.send(batch_input).await {
+ error!("Error while processing a collectd message: {}", err);
+ }
}
}
- }
- Err(err) => {
- error!("Error while decoding a collectd message: {}", err);
+ Err(err) => {
+ error!("Error while decoding a collectd message: {}", err);
+ }
}
}
}
diff --git a/crates/core/tedge_mapper/src/core/mapper.rs b/crates/core/tedge_mapper/src/core/mapper.rs
index f0691397..5208f04b 100644
--- a/crates/core/tedge_mapper/src/core/mapper.rs
+++ b/crates/core/tedge_mapper/src/core/mapper.rs
@@ -3,12 +3,13 @@ use std::time::Duration;
use crate::core::{converter::*, error::*};
use mqtt_channel::{
- Connection, Message, MqttError, SinkExt, StreamExt, TopicFilter, UnboundedReceiver,
+ Connection, Message, MqttError, SinkExt, StreamExt, Topic, TopicFilter, UnboundedReceiver,
UnboundedSender,
};
use tracing::{error, info, instrument};
const SYNC_WINDOW: Duration = Duration::from_secs(3);
+const HEALTH_STATUS_UP: &str = r#"{"status": "up"}"#;
pub async fn create_mapper(
app_name: &str,
@@ -19,17 +20,16 @@ pub async fn create_mapper(
info!("{} starting", app_name);
let mapper_config = converter.get_mapper_config();
- let mqtt_client = Connection::new(&mqtt_config(
- app_name,
- &mqtt_host,
- mqtt_port,
- mapper_config.in_topic_filter.clone(),
- )?)
- .await?;
+ let mut topic_filter = mapper_config.in_topic_filter.clone();
+ topic_filter.add(format!("tedge/health-check/{}", app_name).as_str())?;
+
+ let mqtt_client =
+ Connection::new(&mqtt_config(app_name, &mqtt_host, mqtt_port, topic_filter)?).await?;
Mapper::subscribe_errors(mqtt_client.errors);
Ok(Mapper::new(
+ app_name.to_string(),
mqtt_client.received,
mqtt_client.published,
converter,
@@ -40,13 +40,13 @@ pub fn mqtt_config(
name: &str,
host: &str,
port: u16,
- topics: TopicFilter,
+ topic_filter: TopicFilter,
) -> Result<mqtt_channel::Config, anyhow::Error> {
Ok(mqtt_channel::Config::default()
.with_host(host)
.with_port(port)
.with_session_name(name)
- .with_subscriptions(topics)
+ .with_subscriptions(topic_filter)
.with_max_packet_size(10 * 1024 * 1024))
}
@@ -54,18 +54,26 @@ pub struct Mapper {
input: UnboundedReceiver<Message>,
output: UnboundedSender<Message>,
converter: Box<dyn Converter<Error = ConversionError>>,
+ health_check_topic: TopicFilter,
+ health_topic: Topic,
}
impl Mapper {
pub fn new(
+ name: String,
input: UnboundedReceiver<Message>,
output: UnboundedSender<Message>,
converter: Box<dyn Converter<Error = ConversionError>>,
) -> Self {
+ let health_check_topic =
+ TopicFilter::new_unchecked(format!("tedge/health-check/{}", name).as_str());
+ let health_topic = Topic::new_unchecked(format!("tedge/health/{}", name).as_str());
Self {
input,
output,
converter,
+ health_check_topic,
+ health_topic,
}
}
@@ -114,9 +122,14 @@ impl Mapper {
}
async fn process_message(&mut self, message: Message) {
- let converted_messages = self.converter.convert(&message).await;
- for converted_message in converted_messages.into_iter() {
- let _ = self.output.send(converted_message).await;
+ if self.health_check_topic.accept(&message) {
+ let health_message = Message::new(&self.health_topic, HEALTH_STATUS_UP);
+ let _ = self.output.send(health_message).await;
+ } else {
+ let converted_messages = self.converter.convert(&message).await;
+ for converted_message in converted_messages.into_iter() {
+ let _ = self.output.send(converted_message).await;
+ }
}
}
}
@@ -143,11 +156,12 @@ mod tests {
.with_subscriptions(TopicFilter::new_unchecked("in_topic"));
let mqtt_client = Connection::new(&mqtt_config).await?;
- let mut mapper = Mapper {
- input: mqtt_client.received,
- output: mqtt_client.published,
- converter: Box::new(UppercaseConverter::new()),
- };
+ let mut mapper = Mapper::new(
+ name.to_string(),
+ mqtt_client.received,
+ mqtt_client.published,
+ Box::new(UppercaseConverter::new()),
+ );
// Let's run the mapper in the background
tokio::spawn(async move {
@@ -177,6 +191,44 @@ mod tests {
Ok(())
}
+ #[tokio::test]
+ #[serial_test::serial]
+ async fn health_check() -> Result<(), anyhow::Error> {
+ // Given an MQTT broker
+ let broker = mqtt_tests::test_mqtt_broker();
+
+ // Given a mapper
+ let name = "mapper_under_test";
+
+ let mut mapper = create_mapper(
+ name,
+ "localhost".to_string(),
+ broker.port,
+ Box::new(UppercaseConverter::new()),
+ )
+ .await?;
+
+ // Let's run the mapper in the background
+ tokio::spawn(async move {
+ let _ = mapper.run().await;
+ });
+ sleep(Duration::from_secs(1)).await;
+
+ let health_check_topic = format!("tedge/health-check/{name}");
+ let health_topic = format!("tedge/health/{name}");
+ let actual = broker
+ .wait_for_response_on_publish(
+ &health_check_topic,
+ "",
+ &health_topic,
+ Duration::from_secs(1),
+ )
+ .await;
+ assert_eq!(actual.unwrap(), HEALTH_STATUS_UP);
+
+ Ok(())
+ }
+
struct UppercaseConverter {
mapper_config: MapperConfig,
}