summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorinitard <solo@softwareag.com>2022-06-30 15:55:46 +0100
committerinitard <alex.solomes@softwareag.com>2022-07-29 13:12:28 +0100
commit72b8bf44c15df2ca890f8dc40cbc9111ba8bc9e2 (patch)
tree31920b615c0d3e6c7092dd4b0305acf6123c3f07
parent7e29063c59ab37d75333c68ead11cd369562d343 (diff)
implementing inotify crate for c8y_configuration_plugin
Signed-off-by: initard <solo@softwareag.com>
-rw-r--r--plugins/c8y_configuration_plugin/Cargo.toml2
-rw-r--r--plugins/c8y_configuration_plugin/src/main.rs183
2 files changed, 103 insertions, 82 deletions
diff --git a/plugins/c8y_configuration_plugin/Cargo.toml b/plugins/c8y_configuration_plugin/Cargo.toml
index b54ff0f0..20a611a3 100644
--- a/plugins/c8y_configuration_plugin/Cargo.toml
+++ b/plugins/c8y_configuration_plugin/Cargo.toml
@@ -25,7 +25,7 @@ mqtt_channel = { path = "../../crates/common/mqtt_channel" }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
tedge_config = { path = "../../crates/common/tedge_config" }
-tedge_utils = { path = "../../crates/common/tedge_utils", features = ["logging"] }
+tedge_utils = { path = "../../crates/common/tedge_utils", features = ["logging", "fs-notify"] }
thin_edge_json = { path = "../../crates/core/thin_edge_json" }
thiserror = "1.0"
tokio = { version = "1.9", default_features = false, features = [ "fs", "io-util", "macros", "rt-multi-thread","signal"] }
diff --git a/plugins/c8y_configuration_plugin/src/main.rs b/plugins/c8y_configuration_plugin/src/main.rs
index 665d9f1c..162e9e71 100644
--- a/plugins/c8y_configuration_plugin/src/main.rs
+++ b/plugins/c8y_configuration_plugin/src/main.rs
@@ -22,9 +22,10 @@ use tedge_config::{
use tedge_utils::file::{create_directory_with_user_group, create_file_with_user_group};
use thin_edge_json::health::{health_check_topics, send_health_status};
+use tedge_utils::fs_notify::{fs_notify_stream, pin_mut, FileEvent};
use tracing::{debug, error, info};
-pub const DEFAULT_PLUGIN_CONFIG_FILE_PATH: &str = "/etc/tedge/c8y/c8y-configuration-plugin.toml";
+pub const DEFAULT_PLUGIN_CONFIG_FILE: &str = "c8y/c8y-configuration-plugin.toml";
pub const DEFAULT_PLUGIN_CONFIG_TYPE: &str = "c8y-configuration-plugin";
pub const CONFIG_CHANGE_TOPIC: &str = "tedge/configuration_change";
@@ -58,16 +59,11 @@ pub struct ConfigPluginOpt {
#[clap(long = "config-dir", default_value = DEFAULT_TEDGE_CONFIG_PATH)]
pub config_dir: PathBuf,
-
- #[clap(long = "config-file", default_value = DEFAULT_PLUGIN_CONFIG_FILE_PATH)]
- pub config_file: PathBuf,
}
async fn create_mqtt_client(mqtt_port: u16) -> Result<mqtt_channel::Connection, anyhow::Error> {
let mut topic_filter =
mqtt_channel::TopicFilter::new_unchecked(C8yTopic::SmartRestRequest.as_str());
- let _ = topic_filter
- .add_unchecked(format!("{CONFIG_CHANGE_TOPIC}/{DEFAULT_PLUGIN_CONFIG_TYPE}").as_str());
let _ = topic_filter.add_all(health_check_topics("c8y-configuration-plugin"));
let mqtt_config = mqtt_channel::Config::default()
@@ -98,7 +94,7 @@ async fn main() -> Result<(), anyhow::Error> {
// Load tedge config from the provided location
let tedge_config_location =
- tedge_config::TEdgeConfigLocation::from_custom_root(config_plugin_opt.config_dir);
+ tedge_config::TEdgeConfigLocation::from_custom_root(&config_plugin_opt.config_dir);
let config_repository = tedge_config::TEdgeConfigRepository::new(tedge_config_location.clone());
let tedge_config = config_repository.load()?;
@@ -110,7 +106,8 @@ async fn main() -> Result<(), anyhow::Error> {
mqtt_port,
&mut http_client,
tmp_dir,
- &config_plugin_opt.config_file,
+ &config_plugin_opt.config_dir,
+ DEFAULT_PLUGIN_CONFIG_FILE,
)
.await
}
@@ -119,9 +116,11 @@ async fn run(
mqtt_port: u16,
http_client: &mut impl C8YHttpProxy,
tmp_dir: PathBuf,
- config_file_path: &Path,
+ config_dir: &Path,
+ config_file: &str,
) -> Result<(), anyhow::Error> {
- let mut plugin_config = PluginConfig::new(config_file_path);
+ let config_file_path = config_dir.join(config_file);
+ let mut plugin_config = PluginConfig::new(&config_file_path);
let mut mqtt_client = create_mqtt_client(mqtt_port).await?;
@@ -137,22 +136,45 @@ async fn run(
);
let () = mqtt_client.published.send(msg).await?;
- // Mqtt message loop
- process_mqtt_message(
- &mut plugin_config,
- &mut mqtt_client,
- config_file_path,
- http_client,
- tmp_dir,
- )
- .await?;
-
- mqtt_client.close().await;
-
- Ok(())
+ let fs_notification_stream = fs_notify_stream(&[(
+ config_dir,
+ Some(config_file.to_string()),
+ &[FileEvent::Modified, FileEvent::Deleted, FileEvent::Created],
+ )])?;
+ pin_mut!(fs_notification_stream);
+
+ loop {
+ tokio::select! {
+ message = mqtt_client.received.next() => {
+ if let Some(message) = message {
+ process_mqtt_message(
+ message,
+ &mut plugin_config,
+ &mut mqtt_client,
+ &config_file_path,
+ http_client,
+ tmp_dir.clone(),
+ )
+ .await?;
+ } else {
+ // message is None and the connection has been closed
+ return Ok(())
+ }
+ }
+ Some(Ok((path, mask))) = fs_notification_stream.next() => {
+ match mask {
+ FileEvent::Modified | FileEvent::Deleted | FileEvent::Created => {
+ plugin_config = PluginConfig::new(&path);
+ let message = plugin_config.to_supported_config_types_message()?;
+ mqtt_client.published.send(message).await?;
+ },
+ }
+ }}
+ }
}
async fn process_mqtt_message(
+ message: Message,
plugin_config: &mut PluginConfig,
mqtt_client: &mut Connection,
config_file_path: &Path,
@@ -160,69 +182,67 @@ async fn process_mqtt_message(
tmp_dir: PathBuf,
) -> Result<(), anyhow::Error> {
let health_check_topics = health_check_topics("c8y-configuration-plugin");
- while let Some(message) = mqtt_client.received.next().await {
- debug!("Received {:?}", message);
- if health_check_topics.accept(&message) {
- send_health_status(&mut mqtt_client.published, "c8y-configuration-plugin").await;
- } else if let Ok(payload) = message.payload_str() {
- let result = match message.topic.name.as_str() {
- "tedge/configuration_change/c8y-configuration-plugin" => {
- // Reload the plugin config file
- let plugin_config = PluginConfig::new(config_file_path);
- // Resend the supported config types
- let msg = plugin_config.to_supported_config_types_message()?;
- mqtt_client.published.send(msg).await?;
- Ok(())
- }
- _ => {
- match payload.split(',').next().unwrap_or_default() {
- "524" => {
- let maybe_config_download_request =
- SmartRestConfigDownloadRequest::from_smartrest(payload);
- if let Ok(config_download_request) = maybe_config_download_request {
- handle_config_download_request(
- plugin_config,
- config_download_request,
- tmp_dir.clone(),
- mqtt_client,
- http_client,
- )
- .await
- } else {
- error!("Incorrect Download SmartREST payload: {}", payload);
- Ok(())
- }
- }
- "526" => {
- // retrieve config file upload smartrest request from payload
- let maybe_config_upload_request =
- SmartRestConfigUploadRequest::from_smartrest(payload);
-
- if let Ok(config_upload_request) = maybe_config_upload_request {
- // handle the config file upload request
- handle_config_upload_request(
- plugin_config,
- config_upload_request,
- mqtt_client,
- http_client,
- )
- .await
- } else {
- error!("Incorrect Upload SmartREST payload: {}", payload);
- Ok(())
- }
+ debug!("Received {:?}", message);
+ if health_check_topics.accept(&message) {
+ send_health_status(&mut mqtt_client.published, "c8y-configuration-plugin").await;
+ } else if let Ok(payload) = message.payload_str() {
+ let result = match message.topic.name.as_str() {
+ "tedge/configuration_change/c8y-configuration-plugin" => {
+ // Reload the plugin config file
+ let plugin_config = PluginConfig::new(config_file_path);
+ // Resend the supported config types
+ let msg = plugin_config.to_supported_config_types_message()?;
+ mqtt_client.published.send(msg).await?;
+ Ok(())
+ }
+ _ => {
+ match payload.split(',').next().unwrap_or_default() {
+ "524" => {
+ let maybe_config_download_request =
+ SmartRestConfigDownloadRequest::from_smartrest(payload);
+ if let Ok(config_download_request) = maybe_config_download_request {
+ handle_config_download_request(
+ plugin_config,
+ config_download_request,
+ tmp_dir.clone(),
+ mqtt_client,
+ http_client,
+ )
+ .await
+ } else {
+ error!("Incorrect Download SmartREST payload: {}", payload);
+ Ok(())
}
- _ => {
- // Ignore operation messages not meant for this plugin
+ }
+ "526" => {
+ // retrieve config file upload smartrest request from payload
+ let maybe_config_upload_request =
+ SmartRestConfigUploadRequest::from_smartrest(payload);
+
+ if let Ok(config_upload_request) = maybe_config_upload_request {
+ // handle the config file upload request
+ handle_config_upload_request(
+ plugin_config,
+ config_upload_request,
+ mqtt_client,
+ http_client,
+ )
+ .await
+ } else {
+ error!("Incorrect Upload SmartREST payload: {}", payload);
Ok(())
}
}
+ _ => {
+ // Ignore operation messages not meant for this plugin
+ Ok(())
+ }
}
- };
-
- if let Err(err) = result {
- error!("Handling of operation: '{payload}' failed with {err}");
}
+ };
+
+ if let Err(err) = result {
+ error!("Handling of operation: '{payload}' failed with {err}");
}
}
Ok(())
@@ -315,7 +335,8 @@ mod tests {
broker.port,
&mut http_client,
tmp_dir.path().to_path_buf(),
- PathBuf::from(test_config_path).as_path(),
+ tmp_dir.path(),
+ test_config_path,
)
.await;
});