diff options
author | PradeepKiruvale <pradeepkumar.kj@softwareag.com> | 2022-07-29 12:55:03 +0530 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-07-29 12:55:03 +0530 |
commit | 64d300de6b872821e9f435908b941bdd5f191df1 (patch) | |
tree | 88b7925f743337cf8f05b1d90f92bfae51f6d0e9 | |
parent | 7305240fbc3578441aaec1a41e7a3faa264c9efb (diff) |
MQTT health endpoints for tedge plugin extensions (#1299)
* tedge watchdog for c8y-log-plugin and c8y-config-plugin
This PR also refactors the health check by removing the duplicate code. Pushed the duplicate code to one place and
reused it across all the thin-edge services.
Signed-off-by: Pradeep Kumar K J <pradeepkumar.kj@softwareag.com>
-rw-r--r-- | Cargo.lock | 4 | ||||
-rw-r--r-- | crates/common/mqtt_channel/src/topics.rs | 12 | ||||
-rw-r--r-- | crates/core/agent_interface/src/lib.rs | 6 | ||||
-rw-r--r-- | crates/core/agent_interface/src/messages.rs | 4 | ||||
-rw-r--r-- | crates/core/tedge_agent/Cargo.toml | 1 | ||||
-rw-r--r-- | crates/core/tedge_agent/src/agent.rs | 31 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/collectd/monitor.rs | 25 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/core/mapper.rs | 37 | ||||
-rw-r--r-- | crates/core/tedge_watchdog/src/systemd_watchdog.rs | 2 | ||||
-rw-r--r-- | crates/core/thin_edge_json/Cargo.toml | 1 | ||||
-rw-r--r-- | crates/core/thin_edge_json/src/health.rs | 29 | ||||
-rw-r--r-- | crates/core/thin_edge_json/src/lib.rs | 1 | ||||
-rw-r--r-- | plugins/c8y_configuration_plugin/Cargo.toml | 1 | ||||
-rw-r--r-- | plugins/c8y_configuration_plugin/src/main.rs | 129 | ||||
-rw-r--r-- | plugins/c8y_log_plugin/Cargo.toml | 1 | ||||
-rw-r--r-- | plugins/c8y_log_plugin/src/main.rs | 94 |
16 files changed, 215 insertions, 163 deletions
@@ -409,6 +409,7 @@ dependencies = [ "tedge_test_utils", "tedge_utils", "test-case", + "thin_edge_json", "thiserror", "tokio", "toml", @@ -438,6 +439,7 @@ dependencies = [ "tedge_utils", "tempfile", "test-case", + "thin_edge_json", "thiserror", "time", "tokio", @@ -2825,6 +2827,7 @@ dependencies = [ "tedge_config", "tedge_test_utils", "tedge_utils", + "thin_edge_json", "thiserror", "time", "tokio", @@ -3054,6 +3057,7 @@ dependencies = [ "criterion", "json-writer", "mockall", + "mqtt_channel", "proptest", "serde", "serde_json", diff --git a/crates/common/mqtt_channel/src/topics.rs b/crates/common/mqtt_channel/src/topics.rs index 156376dc..c9415e4e 100644 --- a/crates/common/mqtt_channel/src/topics.rs +++ b/crates/common/mqtt_channel/src/topics.rs @@ -168,6 +168,18 @@ impl From<Topic> for String { } } +impl TryInto<TopicFilter> for Vec<String> { + type Error = MqttError; + + fn try_into(self) -> Result<TopicFilter, Self::Error> { + let mut filter = TopicFilter::empty(); + for pattern in self.into_iter() { + filter.add(pattern.as_str())? + } + Ok(filter) + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/crates/core/agent_interface/src/lib.rs b/crates/core/agent_interface/src/lib.rs index 745201f2..147b2e88 100644 --- a/crates/core/agent_interface/src/lib.rs +++ b/crates/core/agent_interface/src/lib.rs @@ -6,9 +6,9 @@ pub mod topic; pub use download::*; pub use error::*; pub use messages::{ - control_filter_topic, health_check_topics, software_filter_topic, Jsonify, OperationStatus, - RestartOperationRequest, RestartOperationResponse, SoftwareListRequest, SoftwareListResponse, - SoftwareRequestResponse, SoftwareUpdateRequest, SoftwareUpdateResponse, + control_filter_topic, software_filter_topic, Jsonify, OperationStatus, RestartOperationRequest, + RestartOperationResponse, SoftwareListRequest, SoftwareListResponse, SoftwareRequestResponse, + SoftwareUpdateRequest, SoftwareUpdateResponse, }; pub use software::*; diff --git a/crates/core/agent_interface/src/messages.rs b/crates/core/agent_interface/src/messages.rs index eddf119e..e124bf78 100644 --- a/crates/core/agent_interface/src/messages.rs +++ b/crates/core/agent_interface/src/messages.rs @@ -25,10 +25,6 @@ where } } -pub fn health_check_topics() -> Vec<&'static str> { - vec!["tedge/health-check", "tedge/health-check/tedge-agent"] -} - pub const fn software_filter_topic() -> &'static str { "tedge/commands/req/software/#" } diff --git a/crates/core/tedge_agent/Cargo.toml b/crates/core/tedge_agent/Cargo.toml index 2127507e..d20b972a 100644 --- a/crates/core/tedge_agent/Cargo.toml +++ b/crates/core/tedge_agent/Cargo.toml @@ -34,6 +34,7 @@ serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" tedge_config = { path = "../../common/tedge_config" } tedge_utils = { path = "../../common/tedge_utils", features = ["logging"] } +thin_edge_json = { path = "../../core/thin_edge_json" } thiserror = "1.0" time = { version = "0.3", features = ["formatting"] } tokio = { version = "1.8", features = ["fs","process", "rt", "rt-multi-thread"] } diff --git a/crates/core/tedge_agent/src/agent.rs b/crates/core/tedge_agent/src/agent.rs index 243463f1..66573692 100644 --- a/crates/core/tedge_agent/src/agent.rs +++ b/crates/core/tedge_agent/src/agent.rs @@ -7,10 +7,9 @@ use crate::{ }, }; use agent_interface::{ - control_filter_topic, health_check_topics, software_filter_topic, Jsonify, OperationStatus, - RestartOperationRequest, RestartOperationResponse, SoftwareError, SoftwareListRequest, - SoftwareListResponse, SoftwareRequestResponse, SoftwareType, SoftwareUpdateRequest, - SoftwareUpdateResponse, + control_filter_topic, software_filter_topic, Jsonify, OperationStatus, RestartOperationRequest, + RestartOperationResponse, SoftwareError, SoftwareListRequest, SoftwareListResponse, + SoftwareRequestResponse, SoftwareType, SoftwareUpdateRequest, SoftwareUpdateResponse, }; use flockfile::{check_another_instance_is_not_running, Flockfile}; use mqtt_channel::{Connection, Message, PubChannel, StreamExt, SubChannel, Topic, TopicFilter}; @@ -18,8 +17,8 @@ use plugin_sm::{ operation_logs::{LogKind, OperationLogs}, plugin_manager::{ExternalPlugins, Plugins}, }; -use serde_json::json; -use std::process::{self, Command}; + +use std::process::Command; use std::{convert::TryInto, fmt::Debug, path::PathBuf, sync::Arc}; use tedge_config::{ ConfigRepository, ConfigSettingAccessor, ConfigSettingAccessorStringExt, LogPathSetting, @@ -27,7 +26,8 @@ use tedge_config::{ TEdgeConfigLocation, TmpPathSetting, DEFAULT_LOG_PATH, DEFAULT_RUN_PATH, }; use tedge_utils::file::create_directory_with_user_group; -use time::OffsetDateTime; +use thin_edge_json::health::{health_check_topics, send_health_status}; + use tokio::sync::Mutex; use tracing::{debug, error, info, instrument, warn}; @@ -70,9 +70,8 @@ impl Default for SmAgentConfig { .try_into() .expect("Invalid topic filter"); - let request_topics_health: TopicFilter = health_check_topics() - .try_into() - .expect("Invalid topic filter"); + let request_topics_health: TopicFilter = health_check_topics("tedge-agent"); + request_topics.add_all(request_topics_health.clone()); let response_topic_health = Topic::new_unchecked("tedge/health/tedge-agent"); @@ -295,15 +294,7 @@ impl SmAgent { debug!("Request {:?}", message); match &message.topic { topic if self.config.request_topics_health.accept_topic(topic) => { - let health_status = json!({ - "status": "up", - "pid": process::id(), - "time": OffsetDateTime::now_utc().unix_timestamp(), - }) - .to_string(); - let health_message = - Message::new(&self.config.response_topic_health, health_status); - let _ = responses.publish(health_message).await; + send_health_status(responses, "tedge-agent").await; } topic if topic == &self.config.request_topic_list => { @@ -667,7 +658,7 @@ mod tests { use std::path::PathBuf; use assert_json_diff::assert_json_include; - use serde_json::Value; + use serde_json::{json, Value}; use super::*; diff --git a/crates/core/tedge_mapper/src/collectd/monitor.rs b/crates/core/tedge_mapper/src/collectd/monitor.rs index b21f6dcf..2419ebb6 100644 --- a/crates/core/tedge_mapper/src/collectd/monitor.rs +++ b/crates/core/tedge_mapper/src/collectd/monitor.rs @@ -1,13 +1,9 @@ -use std::process; - +use super::{batcher::MessageBatch, collectd::CollectdMessage, error::DeviceMonitorError}; use batcher::{BatchConfigBuilder, BatchDriver, BatchDriverInput, BatchDriverOutput, Batcher}; use mqtt_channel::{Connection, Message, QoS, SinkExt, StreamExt, Topic, TopicFilter}; -use serde_json::json; -use time::OffsetDateTime; +use thin_edge_json::health::{health_check_topics, send_health_status}; use tracing::{error, info, instrument}; -use super::{batcher::MessageBatch, collectd::CollectdMessage, error::DeviceMonitorError}; - const DEFAULT_HOST: &str = "localhost"; const DEFAULT_PORT: u16 = 1883; const DEFAULT_MQTT_CLIENT_ID: &str = "collectd-mapper"; @@ -16,9 +12,6 @@ const DEFAULT_MAXIMUM_MESSAGE_DELAY: u32 = 400; // Heuristic delay that should w const DEFAULT_MESSAGE_LEAP_LIMIT: u32 = 0; const DEFAULT_MQTT_SOURCE_TOPIC: &str = "collectd/#"; const DEFAULT_MQTT_TARGET_TOPIC: &str = "tedge/measurements"; -const COMMON_HEALTH_CHECK_TOPIC: &str = "tedge/health-check"; -const HEALTH_CHECK_TOPIC: &str = "tedge/health-check/tedge-mapper-collectd"; -const HEALTH_STATUS_TOPIC: &str = "tedge/health/tedge-mapper-collectd"; #[derive(Debug)] pub struct DeviceMonitorConfig { @@ -71,10 +64,7 @@ impl DeviceMonitor { #[instrument(skip(self), name = "monitor")] pub async fn run(&self) -> Result<(), DeviceMonitorError> { - let health_check_topics: TopicFilter = vec![COMMON_HEALTH_CHECK_TOPIC, HEALTH_CHECK_TOPIC] - .try_into() - .expect("Valid health topics"); - let health_status_topic = Topic::new_unchecked(HEALTH_STATUS_TOPIC); + let health_check_topics: TopicFilter = health_check_topics("tedge-mapper-collectd"); let mut input_topic = TopicFilter::new(self.device_monitor_config.mqtt_source_topic)? .with_qos(QoS::AtMostOnce); @@ -108,14 +98,7 @@ impl DeviceMonitor { let input_join_handle = tokio::task::spawn(async move { while let Some(message) = collectd_messages.next().await { if health_check_topics.accept(&message) { - let health_status = json!({ - "status": "up", - "pid": process::id(), - "time": OffsetDateTime::now_utc().unix_timestamp(), - }) - .to_string(); - let health_message = Message::new(&health_status_topic, health_status); - let _ = output_messages.send(health_message).await; + send_health_status(&mut output_messages, "tedge-mapper-collectd").await; } else { match CollectdMessage::parse_from(&message) { Ok(collectd_message) => { diff --git a/crates/core/tedge_mapper/src/core/mapper.rs b/crates/core/tedge_mapper/src/core/mapper.rs index b9144ea5..f103126a 100644 --- a/crates/core/tedge_mapper/src/core/mapper.rs +++ b/crates/core/tedge_mapper/src/core/mapper.rs @@ -1,14 +1,15 @@ use crate::c8y::dynamic_discovery::*; use crate::core::{converter::*, error::*}; use mqtt_channel::{ - Connection, Message, MqttError, SinkExt, StreamExt, Topic, TopicFilter, UnboundedReceiver, + Connection, Message, MqttError, SinkExt, StreamExt, TopicFilter, UnboundedReceiver, UnboundedSender, }; -use serde_json::json; + use std::path::Path; -use std::{process, time::Duration}; +use std::time::Duration; use tedge_utils::fs_notify::{fs_notify_stream, pin_mut, FileEvent}; -use time::OffsetDateTime; +use thin_edge_json::health::{health_check_topics, send_health_status}; + use tracing::{error, info, instrument, warn}; const SYNC_WINDOW: Duration = Duration::from_secs(3); use std::result::Result::Ok; @@ -21,14 +22,7 @@ pub async fn create_mapper( ) -> Result<Mapper, anyhow::Error> { info!("{} starting", app_name); - let health_check_topics: TopicFilter = vec![ - "tedge/health-check", - format!("tedge/health-check/{}", app_name).as_str(), - ] - .try_into() - .expect("health check topics must be valid"); - - let health_status_topic = Topic::new_unchecked(format!("tedge/health/{}", app_name).as_str()); + let health_check_topics: TopicFilter = health_check_topics(app_name); let mapper_config = converter.get_mapper_config(); let mut topic_filter = mapper_config.in_topic_filter.clone(); @@ -40,11 +34,11 @@ pub async fn create_mapper( Mapper::subscribe_errors(mqtt_client.errors); Ok(Mapper::new( + app_name.to_string(), mqtt_client.received, mqtt_client.published, converter, health_check_topics, - health_status_topic, )) } @@ -63,27 +57,27 @@ pub fn mqtt_config( } pub struct Mapper { + mapper_name: String, input: UnboundedReceiver<Message>, output: UnboundedSender<Message>, converter: Box<dyn Converter<Error = ConversionError>>, health_check_topics: TopicFilter, - health_status_topic: Topic, } impl Mapper { pub fn new( + mapper_name: String, input: UnboundedReceiver<Message>, output: UnboundedSender<Message>, converter: Box<dyn Converter<Error = ConversionError>>, health_check_topics: TopicFilter, - health_status_topic: Topic, ) -> Self { Self { + mapper_name, input, output, converter, health_check_topics, - health_status_topic, } } @@ -129,14 +123,7 @@ impl Mapper { async fn process_message(&mut self, message: Message) { if self.health_check_topics.accept(&message) { - let health_status = json!({ - "status": "up", - "pid": process::id(), - "time": OffsetDateTime::now_utc().unix_timestamp(), - }) - .to_string(); - let health_message = Message::new(&self.health_status_topic, health_status); - let _ = self.output.send(health_message).await; + send_health_status(&mut self.output, &self.mapper_name).await; } else { let converted_messages = self.converter.convert(&message).await; @@ -241,6 +228,8 @@ mod tests { Ok(()) } + #[cfg(test)] + use serde_json::json; #[tokio::test] #[serial_test::serial] async fn health_check() -> Result<(), anyhow::Error> { diff --git a/crates/core/tedge_watchdog/src/systemd_watchdog.rs b/crates/core/tedge_watchdog/src/systemd_watchdog.rs index 43ed9af3..093bf393 100644 --- a/crates/core/tedge_watchdog/src/systemd_watchdog.rs +++ b/crates/core/tedge_watchdog/src/systemd_watchdog.rs @@ -68,6 +68,8 @@ async fn start_watchdog_for_tedge_services(tedge_config_dir: PathBuf) { "tedge-mapper-az", "tedge-mapper-collectd", "tedge-agent", + "c8y-log-plugin", + "c8y-configuration-plugin", ]; let watchdog_tasks = FuturesUnordered::new(); diff --git a/crates/core/thin_edge_json/Cargo.toml b/crates/core/thin_edge_json/Cargo.toml index 77602b38..1817629e 100644 --- a/crates/core/thin_edge_json/Cargo.toml +++ b/crates/core/thin_edge_json/Cargo.toml @@ -10,6 +10,7 @@ rust-version = "1.58.1" [dependencies] clock = { path = "../../common/clock" } json-writer = { path = "../../common/json_writer" } +mqtt_channel = { path = "../../common/mqtt_channel" } serde = { version = "1.0", features = ["derive"] } serde_json = "1" thiserror = "1.0" diff --git a/crates/core/thin_edge_json/src/health.rs b/crates/core/thin_edge_json/src/health.rs new file mode 100644 index 00000000..20f9d5a3 --- /dev/null +++ b/crates/core/thin_edge_json/src/health.rs @@ -0,0 +1,29 @@ +use std::process; + +use mqtt_channel::{Message, PubChannel, Topic, TopicFilter}; +use serde_json::json; +use time::OffsetDateTime; + +pub fn health_check_topics(daemon_name: &str) -> TopicFilter { + vec![ + "tedge/health-check".into(), + format!("tedge/health-check/{daemon_name}"), + ] + .try_into() + .expect("Invalid topic filter") +} + +pub async fn send_health_status(responses: &mut impl PubChannel, daemon_name: &str) { + let response_topic_health = + Topic::new_unchecked(format!("tedge/health/{daemon_name}").as_str()); + + let health_status = json!({ + "status": "up", + "pid": process::id(), + "time": OffsetDateTime::now_utc().unix_timestamp(), + }) + .to_string(); + + let health_message = Message::new(&response_topic_health, health_status); + let _ = responses.send(health_message).await; +} diff --git a/crates/core/thin_edge_json/src/lib.rs b/crates/core/thin_edge_json/src/lib.rs index 8915bf14..99716ad4 100644 --- a/crates/core/thin_edge_json/src/lib.rs +++ b/crates/core/thin_edge_json/src/lib.rs @@ -6,6 +6,7 @@ pub mod builder; pub mod data; pub mod event; pub mod group; +pub mod health; pub mod measurement; pub mod parser; pub mod serialize; diff --git a/plugins/c8y_configuration_plugin/Cargo.toml b/plugins/c8y_configuration_plugin/Cargo.toml index 8b0eeb50..b54ff0f0 100644 --- a/plugins/c8y_configuration_plugin/Cargo.toml +++ b/plugins/c8y_configuration_plugin/Cargo.toml @@ -26,6 +26,7 @@ serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" tedge_config = { path = "../../crates/common/tedge_config" } tedge_utils = { path = "../../crates/common/tedge_utils", features = ["logging"] } +thin_edge_json = { path = "../../crates/core/thin_edge_json" } thiserror = "1.0" tokio = { version = "1.9", default_features = false, features = [ "fs", "io-util", "macros", "rt-multi-thread","signal"] } toml = "0.5" diff --git a/plugins/c8y_configuration_plugin/src/main.rs b/plugins/c8y_configuration_plugin/src/main.rs index 173ebd3c..665d9f1c 100644 --- a/plugins/c8y_configuration_plugin/src/main.rs +++ b/plugins/c8y_configuration_plugin/src/main.rs @@ -13,13 +13,15 @@ use c8y_smartrest::smartrest_deserializer::{ }; use c8y_smartrest::topic::C8yTopic; use clap::Parser; -use mqtt_channel::{Message, SinkExt, StreamExt, Topic}; +use mqtt_channel::{Connection, Message, SinkExt, StreamExt, Topic}; use std::path::{Path, PathBuf}; use tedge_config::{ ConfigRepository, ConfigSettingAccessor, MqttPortSetting, TEdgeConfig, TmpPathSetting, DEFAULT_TEDGE_CONFIG_PATH, }; use tedge_utils::file::{create_directory_with_user_group, create_file_with_user_group}; +use thin_edge_json::health::{health_check_topics, send_health_status}; + use tracing::{debug, error, info}; pub const DEFAULT_PLUGIN_CONFIG_FILE_PATH: &str = "/etc/tedge/c8y/c8y-configuration-plugin.toml"; @@ -66,6 +68,7 @@ async fn create_mqtt_client(mqtt_port: u16) -> Result<mqtt_channel::Connection, mqtt_channel::TopicFilter::new_unchecked(C8yTopic::SmartRestRequest.as_str()); let _ = topic_filter .add_unchecked(format!("{CONFIG_CHANGE_TOPIC}/{DEFAULT_PLUGIN_CONFIG_TYPE}").as_str()); + let _ = topic_filter.add_all(health_check_topics("c8y-configuration-plugin")); let mqtt_config = mqtt_channel::Config::default() .with_port(mqtt_port) @@ -135,60 +138,85 @@ async fn run( let () = mqtt_client.published.send(msg).await?; // Mqtt message loop + process_mqtt_message( + &mut plugin_config, + &mut mqtt_client, + config_file_path, + http_client, + tmp_dir, + ) + .await?; + + mqtt_client.close().await; + + Ok(()) +} + +async fn process_mqtt_message( + plugin_config: &mut PluginConfig, + mqtt_client: &mut Connection, + config_file_path: &Path, + http_client: &mut impl C8YHttpProxy, + tmp_dir: PathBuf, +) -> Result<(), anyhow::Error> { + let health_check_topics = health_check_topics("c8y-configuration-plugin"); while let Some(message) = mqtt_client.received.next().await { debug!("Received {:?}", message); - if let Ok(payload) = message.payload_str() { - let result = if let "tedge/configuration_change/c8y-configuration-plugin" = - message.topic.name.as_str() - { - // Reload the plugin config file - plugin_config = PluginConfig::new(config_file_path); - // Resend the supported config types - let msg = plugin_config.to_supported_config_types_message()?; - mqtt_client.published.send(msg).await?; - Ok(()) - } else { - match payload.split(',').next().unwrap_or_default() { - "524" => { - let maybe_config_download_request = - SmartRestConfigDownloadRequest::from_smartrest(payload); - if let Ok(config_download_request) = maybe_config_download_request { - handle_config_download_request( - &plugin_config, - config_download_request, - tmp_dir.clone(), - &mut mqtt_client, - http_client, - ) - .await - } else { - error!("Incorrect Download SmartREST payload: {}", payload); - Ok(()) + if health_check_topics.accept(&message) { + send_health_status(&mut mqtt_client.published, "c8y-configuration-plugin").await; + } else if let Ok(payload) = message.payload_str() { + let result = match message.topic.name.as_str() { + "tedge/configuration_change/c8y-configuration-plugin" => { + // Reload the plugin config file + let plugin_config = PluginConfig::new(config_file_path); + // Resend the supported config types + let msg = plugin_config.to_supported_config_types_message()?; + mqtt_client.published.send(msg).await?; + Ok(()) + } + _ => { + match payload.split(',').next().unwrap_or_default() { + "524" => { + let maybe_config_download_request = + SmartRestConfigDownloadRequest::from_smartrest(payload); + if let Ok(config_download_request) = maybe_config_download_request { + handle_config_download_request( + plugin_config, + config_download_request, + tmp_dir.clone(), + mqtt_client, + http_client, + ) + .await + } else { + error!("Incorrect Download SmartREST payload: {}", payload); + Ok(()) + } } - } - "526" => { - // retrieve config file upload smartrest request from payload - let maybe_config_upload_request = - SmartRestConfigUploadRequest::from_smartrest(payload); - - if let Ok(config_upload_request) = maybe_config_upload_request { - // handle the config file upload request - handle_config_upload_request( - &plugin_config, - config_upload_request, - &mut mqtt_client, - http_client, - ) - .await - } else { - error!("Incorrect Upload SmartREST payload: {}", payload); + "526" => { + // retrieve config file upload smartrest request from payload + let maybe_config_upload_request = + SmartRestConfigUploadRequest::from_smartrest(payload); + + if let Ok(config_upload_request) = maybe_config_upload_request { + // handle the config file upload request + handle_config_upload_request( + plugin_config, + config_upload_request, + mqtt_client, + http_client, + ) + .await + } else { + error!("Incorrect Upload SmartREST payload: {}", payload); + Ok(()) + } + } + _ => { + // Ignore operation messages not meant for this plugin Ok(()) } } - _ => { - // Ignore operation messages not meant for this plugin - Ok(()) - } } }; @@ -197,9 +225,6 @@ async fn run( } } } - - mqtt_client.close().await; - Ok(()) } diff --git a/plugins/c8y_log_plugin/Cargo.toml b/plugins/c8y_log_plugin/Cargo.toml index 884add27..8a35e792 100644 --- a/plugins/c8y_log_plugin/Cargo.toml +++ b/plugins/c8y_log_plugin/Cargo.toml @@ -30,6 +30,7 @@ serde_json = "1.0" tedge_config = { path = "../../crates/common/tedge_config" } tedge_utils = { path = "../../crates/common/tedge_utils", features = ["logging"] } time = { version = "0.3" } +thin_edge_json = { path = "../../crates/core/thin_edge_json" } thiserror = "1.0" tokio = { version = "1.9", default_features = false, features = [ "fs", "io-util", "macros", "rt-multi-thread","signal"] } toml = "0.5" diff --git a/plugins/c8y_log_plugin/src/main.rs b/plugins/c8y_log_plugin/src/main.rs index 9ca5066d..5e7dcc4b 100644 --- a/plugins/c8y_log_plugin/src/main.rs +++ b/plugins/c8y_log_plugin/src/main.rs @@ -12,13 +12,14 @@ use clap::Parser; use config::LogPluginConfig; use inotify::{EventMask, EventStream}; use inotify::{Inotify, WatchMask}; -use mqtt_channel::{Connection, StreamExt}; +use mqtt_channel::{Connection, Message, StreamExt, TopicFilter}; use std::path::{Path, PathBuf}; use tedge_config::{ ConfigRepository, ConfigSettingAccessor, LogPathSetting, MqttPortSetting, TEdgeConfig, DEFAULT_TEDGE_CONFIG_PATH, }; use tedge_utils::file::{create_directory_with_user_group, create_file_with_user_group}; +use thin_edge_json::health::{health_check_topics, send_health_status}; use tracing::{error, info}; use crate::logfile_request::{ @@ -59,7 +60,9 @@ async fn create_mqtt_client( tedge_config: &TEdgeConfig, ) -> Result<mqtt_channel::Connection, anyhow::Error> { let mqtt_port = tedge_config.query(MqttPortSetting)?.into(); - let mut topics = mqtt_channel::TopicFilter::new_unchecked(C8yTopic::SmartRestRequest.as_str()); + let mut topics: TopicFilter = health_check_topics("c8y-log-plugin"); + + topics.add_unchecked(C8yTopic::SmartRestRequest.as_str()); // subscribing also to c8y bridge health topic to know when the bridge is up topics.add(C8Y_BRIDGE_HEALTH_TOPIC)?; @@ -99,49 +102,16 @@ async fn run( ) -> Result<(), anyhow::Error> { let mut plugin_config = LogPluginConfig::default(); let mut inotify_stream = create_inofity_file_watch_stream(config_file)?; + let health_check_topics = health_check_topics("c8y-log-plugin"); loop { tokio::select! { message = mqtt_client.received.next() => { if let Some(message) = message { - if is_c8y_bridge_up(&message) { - plugin_config = read_log_config(config_file); - let () = handle_dynamic_log_type_update(&plugin_config, mqtt_client).await?; - } - if let Ok(payload) = message.payload_str() { - let result = match payload.split(',').next().unwrap_or_default() { - "522" => { - info!("Log request received: {payload}"); - // retrieve smartrest object from payload - let maybe_smartrest_obj = SmartRestLogRequest::from_smartrest(payload); - if let Ok(smartrest_obj) = maybe_smartrest_obj { - handle_logfile_request_operation( - &smartrest_obj, - &plugin_config, - mqtt_client, - http_client, - ) - .await - } else { - error!("Incorrect SmartREST payload: {}", payload); - Ok(()) - } - } |