summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorPradeepKiruvale <pradeepkumar.kj@softwareag.com>2022-07-29 12:55:03 +0530
committerGitHub <noreply@github.com>2022-07-29 12:55:03 +0530
commit64d300de6b872821e9f435908b941bdd5f191df1 (patch)
tree88b7925f743337cf8f05b1d90f92bfae51f6d0e9
parent7305240fbc3578441aaec1a41e7a3faa264c9efb (diff)
MQTT health endpoints for tedge plugin extensions (#1299)
* tedge watchdog for c8y-log-plugin and c8y-config-plugin This PR also refactors the health check by removing the duplicate code. Pushed the duplicate code to one place and reused it across all the thin-edge services. Signed-off-by: Pradeep Kumar K J <pradeepkumar.kj@softwareag.com>
-rw-r--r--Cargo.lock4
-rw-r--r--crates/common/mqtt_channel/src/topics.rs12
-rw-r--r--crates/core/agent_interface/src/lib.rs6
-rw-r--r--crates/core/agent_interface/src/messages.rs4
-rw-r--r--crates/core/tedge_agent/Cargo.toml1
-rw-r--r--crates/core/tedge_agent/src/agent.rs31
-rw-r--r--crates/core/tedge_mapper/src/collectd/monitor.rs25
-rw-r--r--crates/core/tedge_mapper/src/core/mapper.rs37
-rw-r--r--crates/core/tedge_watchdog/src/systemd_watchdog.rs2
-rw-r--r--crates/core/thin_edge_json/Cargo.toml1
-rw-r--r--crates/core/thin_edge_json/src/health.rs29
-rw-r--r--crates/core/thin_edge_json/src/lib.rs1
-rw-r--r--plugins/c8y_configuration_plugin/Cargo.toml1
-rw-r--r--plugins/c8y_configuration_plugin/src/main.rs129
-rw-r--r--plugins/c8y_log_plugin/Cargo.toml1
-rw-r--r--plugins/c8y_log_plugin/src/main.rs94
16 files changed, 215 insertions, 163 deletions
diff --git a/Cargo.lock b/Cargo.lock
index dd521590..042a2c4f 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -409,6 +409,7 @@ dependencies = [
"tedge_test_utils",
"tedge_utils",
"test-case",
+ "thin_edge_json",
"thiserror",
"tokio",
"toml",
@@ -438,6 +439,7 @@ dependencies = [
"tedge_utils",
"tempfile",
"test-case",
+ "thin_edge_json",
"thiserror",
"time",
"tokio",
@@ -2825,6 +2827,7 @@ dependencies = [
"tedge_config",
"tedge_test_utils",
"tedge_utils",
+ "thin_edge_json",
"thiserror",
"time",
"tokio",
@@ -3054,6 +3057,7 @@ dependencies = [
"criterion",
"json-writer",
"mockall",
+ "mqtt_channel",
"proptest",
"serde",
"serde_json",
diff --git a/crates/common/mqtt_channel/src/topics.rs b/crates/common/mqtt_channel/src/topics.rs
index 156376dc..c9415e4e 100644
--- a/crates/common/mqtt_channel/src/topics.rs
+++ b/crates/common/mqtt_channel/src/topics.rs
@@ -168,6 +168,18 @@ impl From<Topic> for String {
}
}
+impl TryInto<TopicFilter> for Vec<String> {
+ type Error = MqttError;
+
+ fn try_into(self) -> Result<TopicFilter, Self::Error> {
+ let mut filter = TopicFilter::empty();
+ for pattern in self.into_iter() {
+ filter.add(pattern.as_str())?
+ }
+ Ok(filter)
+ }
+}
+
#[cfg(test)]
mod tests {
use super::*;
diff --git a/crates/core/agent_interface/src/lib.rs b/crates/core/agent_interface/src/lib.rs
index 745201f2..147b2e88 100644
--- a/crates/core/agent_interface/src/lib.rs
+++ b/crates/core/agent_interface/src/lib.rs
@@ -6,9 +6,9 @@ pub mod topic;
pub use download::*;
pub use error::*;
pub use messages::{
- control_filter_topic, health_check_topics, software_filter_topic, Jsonify, OperationStatus,
- RestartOperationRequest, RestartOperationResponse, SoftwareListRequest, SoftwareListResponse,
- SoftwareRequestResponse, SoftwareUpdateRequest, SoftwareUpdateResponse,
+ control_filter_topic, software_filter_topic, Jsonify, OperationStatus, RestartOperationRequest,
+ RestartOperationResponse, SoftwareListRequest, SoftwareListResponse, SoftwareRequestResponse,
+ SoftwareUpdateRequest, SoftwareUpdateResponse,
};
pub use software::*;
diff --git a/crates/core/agent_interface/src/messages.rs b/crates/core/agent_interface/src/messages.rs
index eddf119e..e124bf78 100644
--- a/crates/core/agent_interface/src/messages.rs
+++ b/crates/core/agent_interface/src/messages.rs
@@ -25,10 +25,6 @@ where
}
}
-pub fn health_check_topics() -> Vec<&'static str> {
- vec!["tedge/health-check", "tedge/health-check/tedge-agent"]
-}
-
pub const fn software_filter_topic() -> &'static str {
"tedge/commands/req/software/#"
}
diff --git a/crates/core/tedge_agent/Cargo.toml b/crates/core/tedge_agent/Cargo.toml
index 2127507e..d20b972a 100644
--- a/crates/core/tedge_agent/Cargo.toml
+++ b/crates/core/tedge_agent/Cargo.toml
@@ -34,6 +34,7 @@ serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
tedge_config = { path = "../../common/tedge_config" }
tedge_utils = { path = "../../common/tedge_utils", features = ["logging"] }
+thin_edge_json = { path = "../../core/thin_edge_json" }
thiserror = "1.0"
time = { version = "0.3", features = ["formatting"] }
tokio = { version = "1.8", features = ["fs","process", "rt", "rt-multi-thread"] }
diff --git a/crates/core/tedge_agent/src/agent.rs b/crates/core/tedge_agent/src/agent.rs
index 243463f1..66573692 100644
--- a/crates/core/tedge_agent/src/agent.rs
+++ b/crates/core/tedge_agent/src/agent.rs
@@ -7,10 +7,9 @@ use crate::{
},
};
use agent_interface::{
- control_filter_topic, health_check_topics, software_filter_topic, Jsonify, OperationStatus,
- RestartOperationRequest, RestartOperationResponse, SoftwareError, SoftwareListRequest,
- SoftwareListResponse, SoftwareRequestResponse, SoftwareType, SoftwareUpdateRequest,
- SoftwareUpdateResponse,
+ control_filter_topic, software_filter_topic, Jsonify, OperationStatus, RestartOperationRequest,
+ RestartOperationResponse, SoftwareError, SoftwareListRequest, SoftwareListResponse,
+ SoftwareRequestResponse, SoftwareType, SoftwareUpdateRequest, SoftwareUpdateResponse,
};
use flockfile::{check_another_instance_is_not_running, Flockfile};
use mqtt_channel::{Connection, Message, PubChannel, StreamExt, SubChannel, Topic, TopicFilter};
@@ -18,8 +17,8 @@ use plugin_sm::{
operation_logs::{LogKind, OperationLogs},
plugin_manager::{ExternalPlugins, Plugins},
};
-use serde_json::json;
-use std::process::{self, Command};
+
+use std::process::Command;
use std::{convert::TryInto, fmt::Debug, path::PathBuf, sync::Arc};
use tedge_config::{
ConfigRepository, ConfigSettingAccessor, ConfigSettingAccessorStringExt, LogPathSetting,
@@ -27,7 +26,8 @@ use tedge_config::{
TEdgeConfigLocation, TmpPathSetting, DEFAULT_LOG_PATH, DEFAULT_RUN_PATH,
};
use tedge_utils::file::create_directory_with_user_group;
-use time::OffsetDateTime;
+use thin_edge_json::health::{health_check_topics, send_health_status};
+
use tokio::sync::Mutex;
use tracing::{debug, error, info, instrument, warn};
@@ -70,9 +70,8 @@ impl Default for SmAgentConfig {
.try_into()
.expect("Invalid topic filter");
- let request_topics_health: TopicFilter = health_check_topics()
- .try_into()
- .expect("Invalid topic filter");
+ let request_topics_health: TopicFilter = health_check_topics("tedge-agent");
+
request_topics.add_all(request_topics_health.clone());
let response_topic_health = Topic::new_unchecked("tedge/health/tedge-agent");
@@ -295,15 +294,7 @@ impl SmAgent {
debug!("Request {:?}", message);
match &message.topic {
topic if self.config.request_topics_health.accept_topic(topic) => {
- let health_status = json!({
- "status": "up",
- "pid": process::id(),
- "time": OffsetDateTime::now_utc().unix_timestamp(),
- })
- .to_string();
- let health_message =
- Message::new(&self.config.response_topic_health, health_status);
- let _ = responses.publish(health_message).await;
+ send_health_status(responses, "tedge-agent").await;
}
topic if topic == &self.config.request_topic_list => {
@@ -667,7 +658,7 @@ mod tests {
use std::path::PathBuf;
use assert_json_diff::assert_json_include;
- use serde_json::Value;
+ use serde_json::{json, Value};
use super::*;
diff --git a/crates/core/tedge_mapper/src/collectd/monitor.rs b/crates/core/tedge_mapper/src/collectd/monitor.rs
index b21f6dcf..2419ebb6 100644
--- a/crates/core/tedge_mapper/src/collectd/monitor.rs
+++ b/crates/core/tedge_mapper/src/collectd/monitor.rs
@@ -1,13 +1,9 @@
-use std::process;
-
+use super::{batcher::MessageBatch, collectd::CollectdMessage, error::DeviceMonitorError};
use batcher::{BatchConfigBuilder, BatchDriver, BatchDriverInput, BatchDriverOutput, Batcher};
use mqtt_channel::{Connection, Message, QoS, SinkExt, StreamExt, Topic, TopicFilter};
-use serde_json::json;
-use time::OffsetDateTime;
+use thin_edge_json::health::{health_check_topics, send_health_status};
use tracing::{error, info, instrument};
-use super::{batcher::MessageBatch, collectd::CollectdMessage, error::DeviceMonitorError};
-
const DEFAULT_HOST: &str = "localhost";
const DEFAULT_PORT: u16 = 1883;
const DEFAULT_MQTT_CLIENT_ID: &str = "collectd-mapper";
@@ -16,9 +12,6 @@ const DEFAULT_MAXIMUM_MESSAGE_DELAY: u32 = 400; // Heuristic delay that should w
const DEFAULT_MESSAGE_LEAP_LIMIT: u32 = 0;
const DEFAULT_MQTT_SOURCE_TOPIC: &str = "collectd/#";
const DEFAULT_MQTT_TARGET_TOPIC: &str = "tedge/measurements";
-const COMMON_HEALTH_CHECK_TOPIC: &str = "tedge/health-check";
-const HEALTH_CHECK_TOPIC: &str = "tedge/health-check/tedge-mapper-collectd";
-const HEALTH_STATUS_TOPIC: &str = "tedge/health/tedge-mapper-collectd";
#[derive(Debug)]
pub struct DeviceMonitorConfig {
@@ -71,10 +64,7 @@ impl DeviceMonitor {
#[instrument(skip(self), name = "monitor")]
pub async fn run(&self) -> Result<(), DeviceMonitorError> {
- let health_check_topics: TopicFilter = vec![COMMON_HEALTH_CHECK_TOPIC, HEALTH_CHECK_TOPIC]
- .try_into()
- .expect("Valid health topics");
- let health_status_topic = Topic::new_unchecked(HEALTH_STATUS_TOPIC);
+ let health_check_topics: TopicFilter = health_check_topics("tedge-mapper-collectd");
let mut input_topic = TopicFilter::new(self.device_monitor_config.mqtt_source_topic)?
.with_qos(QoS::AtMostOnce);
@@ -108,14 +98,7 @@ impl DeviceMonitor {
let input_join_handle = tokio::task::spawn(async move {
while let Some(message) = collectd_messages.next().await {
if health_check_topics.accept(&message) {
- let health_status = json!({
- "status": "up",
- "pid": process::id(),
- "time": OffsetDateTime::now_utc().unix_timestamp(),
- })
- .to_string();
- let health_message = Message::new(&health_status_topic, health_status);
- let _ = output_messages.send(health_message).await;
+ send_health_status(&mut output_messages, "tedge-mapper-collectd").await;
} else {
match CollectdMessage::parse_from(&message) {
Ok(collectd_message) => {
diff --git a/crates/core/tedge_mapper/src/core/mapper.rs b/crates/core/tedge_mapper/src/core/mapper.rs
index b9144ea5..f103126a 100644
--- a/crates/core/tedge_mapper/src/core/mapper.rs
+++ b/crates/core/tedge_mapper/src/core/mapper.rs
@@ -1,14 +1,15 @@
use crate::c8y::dynamic_discovery::*;
use crate::core::{converter::*, error::*};
use mqtt_channel::{
- Connection, Message, MqttError, SinkExt, StreamExt, Topic, TopicFilter, UnboundedReceiver,
+ Connection, Message, MqttError, SinkExt, StreamExt, TopicFilter, UnboundedReceiver,
UnboundedSender,
};
-use serde_json::json;
+
use std::path::Path;
-use std::{process, time::Duration};
+use std::time::Duration;
use tedge_utils::fs_notify::{fs_notify_stream, pin_mut, FileEvent};
-use time::OffsetDateTime;
+use thin_edge_json::health::{health_check_topics, send_health_status};
+
use tracing::{error, info, instrument, warn};
const SYNC_WINDOW: Duration = Duration::from_secs(3);
use std::result::Result::Ok;
@@ -21,14 +22,7 @@ pub async fn create_mapper(
) -> Result<Mapper, anyhow::Error> {
info!("{} starting", app_name);
- let health_check_topics: TopicFilter = vec![
- "tedge/health-check",
- format!("tedge/health-check/{}", app_name).as_str(),
- ]
- .try_into()
- .expect("health check topics must be valid");
-
- let health_status_topic = Topic::new_unchecked(format!("tedge/health/{}", app_name).as_str());
+ let health_check_topics: TopicFilter = health_check_topics(app_name);
let mapper_config = converter.get_mapper_config();
let mut topic_filter = mapper_config.in_topic_filter.clone();
@@ -40,11 +34,11 @@ pub async fn create_mapper(
Mapper::subscribe_errors(mqtt_client.errors);
Ok(Mapper::new(
+ app_name.to_string(),
mqtt_client.received,
mqtt_client.published,
converter,
health_check_topics,
- health_status_topic,
))
}
@@ -63,27 +57,27 @@ pub fn mqtt_config(
}
pub struct Mapper {
+ mapper_name: String,
input: UnboundedReceiver<Message>,
output: UnboundedSender<Message>,
converter: Box<dyn Converter<Error = ConversionError>>,
health_check_topics: TopicFilter,
- health_status_topic: Topic,
}
impl Mapper {
pub fn new(
+ mapper_name: String,
input: UnboundedReceiver<Message>,
output: UnboundedSender<Message>,
converter: Box<dyn Converter<Error = ConversionError>>,
health_check_topics: TopicFilter,
- health_status_topic: Topic,
) -> Self {
Self {
+ mapper_name,
input,
output,
converter,
health_check_topics,
- health_status_topic,
}
}
@@ -129,14 +123,7 @@ impl Mapper {
async fn process_message(&mut self, message: Message) {
if self.health_check_topics.accept(&message) {
- let health_status = json!({
- "status": "up",
- "pid": process::id(),
- "time": OffsetDateTime::now_utc().unix_timestamp(),
- })
- .to_string();
- let health_message = Message::new(&self.health_status_topic, health_status);
- let _ = self.output.send(health_message).await;
+ send_health_status(&mut self.output, &self.mapper_name).await;
} else {
let converted_messages = self.converter.convert(&message).await;
@@ -241,6 +228,8 @@ mod tests {
Ok(())
}
+ #[cfg(test)]
+ use serde_json::json;
#[tokio::test]
#[serial_test::serial]
async fn health_check() -> Result<(), anyhow::Error> {
diff --git a/crates/core/tedge_watchdog/src/systemd_watchdog.rs b/crates/core/tedge_watchdog/src/systemd_watchdog.rs
index 43ed9af3..093bf393 100644
--- a/crates/core/tedge_watchdog/src/systemd_watchdog.rs
+++ b/crates/core/tedge_watchdog/src/systemd_watchdog.rs
@@ -68,6 +68,8 @@ async fn start_watchdog_for_tedge_services(tedge_config_dir: PathBuf) {
"tedge-mapper-az",
"tedge-mapper-collectd",
"tedge-agent",
+ "c8y-log-plugin",
+ "c8y-configuration-plugin",
];
let watchdog_tasks = FuturesUnordered::new();
diff --git a/crates/core/thin_edge_json/Cargo.toml b/crates/core/thin_edge_json/Cargo.toml
index 77602b38..1817629e 100644
--- a/crates/core/thin_edge_json/Cargo.toml
+++ b/crates/core/thin_edge_json/Cargo.toml
@@ -10,6 +10,7 @@ rust-version = "1.58.1"
[dependencies]
clock = { path = "../../common/clock" }
json-writer = { path = "../../common/json_writer" }
+mqtt_channel = { path = "../../common/mqtt_channel" }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1"
thiserror = "1.0"
diff --git a/crates/core/thin_edge_json/src/health.rs b/crates/core/thin_edge_json/src/health.rs
new file mode 100644
index 00000000..20f9d5a3
--- /dev/null
+++ b/crates/core/thin_edge_json/src/health.rs
@@ -0,0 +1,29 @@
+use std::process;
+
+use mqtt_channel::{Message, PubChannel, Topic, TopicFilter};
+use serde_json::json;
+use time::OffsetDateTime;
+
+pub fn health_check_topics(daemon_name: &str) -> TopicFilter {
+ vec![
+ "tedge/health-check".into(),
+ format!("tedge/health-check/{daemon_name}"),
+ ]
+ .try_into()
+ .expect("Invalid topic filter")
+}
+
+pub async fn send_health_status(responses: &mut impl PubChannel, daemon_name: &str) {
+ let response_topic_health =
+ Topic::new_unchecked(format!("tedge/health/{daemon_name}").as_str());
+
+ let health_status = json!({
+ "status": "up",
+ "pid": process::id(),
+ "time": OffsetDateTime::now_utc().unix_timestamp(),
+ })
+ .to_string();
+
+ let health_message = Message::new(&response_topic_health, health_status);
+ let _ = responses.send(health_message).await;
+}
diff --git a/crates/core/thin_edge_json/src/lib.rs b/crates/core/thin_edge_json/src/lib.rs
index 8915bf14..99716ad4 100644
--- a/crates/core/thin_edge_json/src/lib.rs
+++ b/crates/core/thin_edge_json/src/lib.rs
@@ -6,6 +6,7 @@ pub mod builder;
pub mod data;
pub mod event;
pub mod group;
+pub mod health;
pub mod measurement;
pub mod parser;
pub mod serialize;
diff --git a/plugins/c8y_configuration_plugin/Cargo.toml b/plugins/c8y_configuration_plugin/Cargo.toml
index 8b0eeb50..b54ff0f0 100644
--- a/plugins/c8y_configuration_plugin/Cargo.toml
+++ b/plugins/c8y_configuration_plugin/Cargo.toml
@@ -26,6 +26,7 @@ serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
tedge_config = { path = "../../crates/common/tedge_config" }
tedge_utils = { path = "../../crates/common/tedge_utils", features = ["logging"] }
+thin_edge_json = { path = "../../crates/core/thin_edge_json" }
thiserror = "1.0"
tokio = { version = "1.9", default_features = false, features = [ "fs", "io-util", "macros", "rt-multi-thread","signal"] }
toml = "0.5"
diff --git a/plugins/c8y_configuration_plugin/src/main.rs b/plugins/c8y_configuration_plugin/src/main.rs
index 173ebd3c..665d9f1c 100644
--- a/plugins/c8y_configuration_plugin/src/main.rs
+++ b/plugins/c8y_configuration_plugin/src/main.rs
@@ -13,13 +13,15 @@ use c8y_smartrest::smartrest_deserializer::{
};
use c8y_smartrest::topic::C8yTopic;
use clap::Parser;
-use mqtt_channel::{Message, SinkExt, StreamExt, Topic};
+use mqtt_channel::{Connection, Message, SinkExt, StreamExt, Topic};
use std::path::{Path, PathBuf};
use tedge_config::{
ConfigRepository, ConfigSettingAccessor, MqttPortSetting, TEdgeConfig, TmpPathSetting,
DEFAULT_TEDGE_CONFIG_PATH,
};
use tedge_utils::file::{create_directory_with_user_group, create_file_with_user_group};
+use thin_edge_json::health::{health_check_topics, send_health_status};
+
use tracing::{debug, error, info};
pub const DEFAULT_PLUGIN_CONFIG_FILE_PATH: &str = "/etc/tedge/c8y/c8y-configuration-plugin.toml";
@@ -66,6 +68,7 @@ async fn create_mqtt_client(mqtt_port: u16) -> Result<mqtt_channel::Connection,
mqtt_channel::TopicFilter::new_unchecked(C8yTopic::SmartRestRequest.as_str());
let _ = topic_filter
.add_unchecked(format!("{CONFIG_CHANGE_TOPIC}/{DEFAULT_PLUGIN_CONFIG_TYPE}").as_str());
+ let _ = topic_filter.add_all(health_check_topics("c8y-configuration-plugin"));
let mqtt_config = mqtt_channel::Config::default()
.with_port(mqtt_port)
@@ -135,60 +138,85 @@ async fn run(
let () = mqtt_client.published.send(msg).await?;
// Mqtt message loop
+ process_mqtt_message(
+ &mut plugin_config,
+ &mut mqtt_client,
+ config_file_path,
+ http_client,
+ tmp_dir,
+ )
+ .await?;
+
+ mqtt_client.close().await;
+
+ Ok(())
+}
+
+async fn process_mqtt_message(
+ plugin_config: &mut PluginConfig,
+ mqtt_client: &mut Connection,
+ config_file_path: &Path,
+ http_client: &mut impl C8YHttpProxy,
+ tmp_dir: PathBuf,
+) -> Result<(), anyhow::Error> {
+ let health_check_topics = health_check_topics("c8y-configuration-plugin");
while let Some(message) = mqtt_client.received.next().await {
debug!("Received {:?}", message);
- if let Ok(payload) = message.payload_str() {
- let result = if let "tedge/configuration_change/c8y-configuration-plugin" =
- message.topic.name.as_str()
- {
- // Reload the plugin config file
- plugin_config = PluginConfig::new(config_file_path);
- // Resend the supported config types
- let msg = plugin_config.to_supported_config_types_message()?;
- mqtt_client.published.send(msg).await?;
- Ok(())
- } else {
- match payload.split(',').next().unwrap_or_default() {
- "524" => {
- let maybe_config_download_request =
- SmartRestConfigDownloadRequest::from_smartrest(payload);
- if let Ok(config_download_request) = maybe_config_download_request {
- handle_config_download_request(
- &plugin_config,
- config_download_request,
- tmp_dir.clone(),
- &mut mqtt_client,
- http_client,
- )
- .await
- } else {
- error!("Incorrect Download SmartREST payload: {}", payload);
- Ok(())
+ if health_check_topics.accept(&message) {
+ send_health_status(&mut mqtt_client.published, "c8y-configuration-plugin").await;
+ } else if let Ok(payload) = message.payload_str() {
+ let result = match message.topic.name.as_str() {
+ "tedge/configuration_change/c8y-configuration-plugin" => {
+ // Reload the plugin config file
+ let plugin_config = PluginConfig::new(config_file_path);
+ // Resend the supported config types
+ let msg = plugin_config.to_supported_config_types_message()?;
+ mqtt_client.published.send(msg).await?;
+ Ok(())
+ }
+ _ => {
+ match payload.split(',').next().unwrap_or_default() {
+ "524" => {
+ let maybe_config_download_request =
+ SmartRestConfigDownloadRequest::from_smartrest(payload);
+ if let Ok(config_download_request) = maybe_config_download_request {
+ handle_config_download_request(
+ plugin_config,
+ config_download_request,
+ tmp_dir.clone(),
+ mqtt_client,
+ http_client,
+ )
+ .await
+ } else {
+ error!("Incorrect Download SmartREST payload: {}", payload);
+ Ok(())
+ }
}
- }
- "526" => {
- // retrieve config file upload smartrest request from payload
- let maybe_config_upload_request =
- SmartRestConfigUploadRequest::from_smartrest(payload);
-
- if let Ok(config_upload_request) = maybe_config_upload_request {
- // handle the config file upload request
- handle_config_upload_request(
- &plugin_config,
- config_upload_request,
- &mut mqtt_client,
- http_client,
- )
- .await
- } else {
- error!("Incorrect Upload SmartREST payload: {}", payload);
+ "526" => {
+ // retrieve config file upload smartrest request from payload
+ let maybe_config_upload_request =
+ SmartRestConfigUploadRequest::from_smartrest(payload);
+
+ if let Ok(config_upload_request) = maybe_config_upload_request {
+ // handle the config file upload request
+ handle_config_upload_request(
+ plugin_config,
+ config_upload_request,
+ mqtt_client,
+ http_client,
+ )
+ .await
+ } else {
+ error!("Incorrect Upload SmartREST payload: {}", payload);
+ Ok(())
+ }
+ }
+ _ => {
+ // Ignore operation messages not meant for this plugin
Ok(())
}
}
- _ => {
- // Ignore operation messages not meant for this plugin
- Ok(())
- }
}
};
@@ -197,9 +225,6 @@ async fn run(
}
}
}
-
- mqtt_client.close().await;
-
Ok(())
}
diff --git a/plugins/c8y_log_plugin/Cargo.toml b/plugins/c8y_log_plugin/Cargo.toml
index 884add27..8a35e792 100644
--- a/plugins/c8y_log_plugin/Cargo.toml
+++ b/plugins/c8y_log_plugin/Cargo.toml
@@ -30,6 +30,7 @@ serde_json = "1.0"
tedge_config = { path = "../../crates/common/tedge_config" }
tedge_utils = { path = "../../crates/common/tedge_utils", features = ["logging"] }
time = { version = "0.3" }
+thin_edge_json = { path = "../../crates/core/thin_edge_json" }
thiserror = "1.0"
tokio = { version = "1.9", default_features = false, features = [ "fs", "io-util", "macros", "rt-multi-thread","signal"] }
toml = "0.5"
diff --git a/plugins/c8y_log_plugin/src/main.rs b/plugins/c8y_log_plugin/src/main.rs
index 9ca5066d..5e7dcc4b 100644
--- a/plugins/c8y_log_plugin/src/main.rs
+++ b/plugins/c8y_log_plugin/src/main.rs
@@ -12,13 +12,14 @@ use clap::Parser;
use config::LogPluginConfig;
use inotify::{EventMask, EventStream};
use inotify::{Inotify, WatchMask};
-use mqtt_channel::{Connection, StreamExt};
+use mqtt_channel::{Connection, Message, StreamExt, TopicFilter};
use std::path::{Path, PathBuf};
use tedge_config::{
ConfigRepository, ConfigSettingAccessor, LogPathSetting, MqttPortSetting, TEdgeConfig,
DEFAULT_TEDGE_CONFIG_PATH,
};
use tedge_utils::file::{create_directory_with_user_group, create_file_with_user_group};
+use thin_edge_json::health::{health_check_topics, send_health_status};
use tracing::{error, info};
use crate::logfile_request::{
@@ -59,7 +60,9 @@ async fn create_mqtt_client(
tedge_config: &TEdgeConfig,
) -> Result<mqtt_channel::Connection, anyhow::Error> {
let mqtt_port = tedge_config.query(MqttPortSetting)?.into();
- let mut topics = mqtt_channel::TopicFilter::new_unchecked(C8yTopic::SmartRestRequest.as_str());
+ let mut topics: TopicFilter = health_check_topics("c8y-log-plugin");
+
+ topics.add_unchecked(C8yTopic::SmartRestRequest.as_str());
// subscribing also to c8y bridge health topic to know when the bridge is up
topics.add(C8Y_BRIDGE_HEALTH_TOPIC)?;
@@ -99,49 +102,16 @@ async fn run(
) -> Result<(), anyhow::Error> {
let mut plugin_config = LogPluginConfig::default();
let mut inotify_stream = create_inofity_file_watch_stream(config_file)?;
+ let health_check_topics = health_check_topics("c8y-log-plugin");
loop {
tokio::select! {
message = mqtt_client.received.next() => {
if let Some(message) = message {
- if is_c8y_bridge_up(&message) {
- plugin_config = read_log_config(config_file);
- let () = handle_dynamic_log_type_update(&plugin_config, mqtt_client).await?;
- }
- if let Ok(payload) = message.payload_str() {
- let result = match payload.split(',').next().unwrap_or_default() {
- "522" => {
- info!("Log request received: {payload}");
- // retrieve smartrest object from payload
- let maybe_smartrest_obj = SmartRestLogRequest::from_smartrest(payload);
- if let Ok(smartrest_obj) = maybe_smartrest_obj {
- handle_logfile_request_operation(
- &smartrest_obj,
- &plugin_config,
- mqtt_client,
- http_client,
- )
- .await
- } else {
-