diff options
Diffstat (limited to 'plugins/c8y_configuration_plugin/src/main.rs')
-rw-r--r-- | plugins/c8y_configuration_plugin/src/main.rs | 183 |
1 files changed, 102 insertions, 81 deletions
diff --git a/plugins/c8y_configuration_plugin/src/main.rs b/plugins/c8y_configuration_plugin/src/main.rs index 665d9f1c..162e9e71 100644 --- a/plugins/c8y_configuration_plugin/src/main.rs +++ b/plugins/c8y_configuration_plugin/src/main.rs @@ -22,9 +22,10 @@ use tedge_config::{ use tedge_utils::file::{create_directory_with_user_group, create_file_with_user_group}; use thin_edge_json::health::{health_check_topics, send_health_status}; +use tedge_utils::fs_notify::{fs_notify_stream, pin_mut, FileEvent}; use tracing::{debug, error, info}; -pub const DEFAULT_PLUGIN_CONFIG_FILE_PATH: &str = "/etc/tedge/c8y/c8y-configuration-plugin.toml"; +pub const DEFAULT_PLUGIN_CONFIG_FILE: &str = "c8y/c8y-configuration-plugin.toml"; pub const DEFAULT_PLUGIN_CONFIG_TYPE: &str = "c8y-configuration-plugin"; pub const CONFIG_CHANGE_TOPIC: &str = "tedge/configuration_change"; @@ -58,16 +59,11 @@ pub struct ConfigPluginOpt { #[clap(long = "config-dir", default_value = DEFAULT_TEDGE_CONFIG_PATH)] pub config_dir: PathBuf, - - #[clap(long = "config-file", default_value = DEFAULT_PLUGIN_CONFIG_FILE_PATH)] - pub config_file: PathBuf, } async fn create_mqtt_client(mqtt_port: u16) -> Result<mqtt_channel::Connection, anyhow::Error> { let mut topic_filter = mqtt_channel::TopicFilter::new_unchecked(C8yTopic::SmartRestRequest.as_str()); - let _ = topic_filter - .add_unchecked(format!("{CONFIG_CHANGE_TOPIC}/{DEFAULT_PLUGIN_CONFIG_TYPE}").as_str()); let _ = topic_filter.add_all(health_check_topics("c8y-configuration-plugin")); let mqtt_config = mqtt_channel::Config::default() @@ -98,7 +94,7 @@ async fn main() -> Result<(), anyhow::Error> { // Load tedge config from the provided location let tedge_config_location = - tedge_config::TEdgeConfigLocation::from_custom_root(config_plugin_opt.config_dir); + tedge_config::TEdgeConfigLocation::from_custom_root(&config_plugin_opt.config_dir); let config_repository = tedge_config::TEdgeConfigRepository::new(tedge_config_location.clone()); let tedge_config = config_repository.load()?; @@ -110,7 +106,8 @@ async fn main() -> Result<(), anyhow::Error> { mqtt_port, &mut http_client, tmp_dir, - &config_plugin_opt.config_file, + &config_plugin_opt.config_dir, + DEFAULT_PLUGIN_CONFIG_FILE, ) .await } @@ -119,9 +116,11 @@ async fn run( mqtt_port: u16, http_client: &mut impl C8YHttpProxy, tmp_dir: PathBuf, - config_file_path: &Path, + config_dir: &Path, + config_file: &str, ) -> Result<(), anyhow::Error> { - let mut plugin_config = PluginConfig::new(config_file_path); + let config_file_path = config_dir.join(config_file); + let mut plugin_config = PluginConfig::new(&config_file_path); let mut mqtt_client = create_mqtt_client(mqtt_port).await?; @@ -137,22 +136,45 @@ async fn run( ); let () = mqtt_client.published.send(msg).await?; - // Mqtt message loop - process_mqtt_message( - &mut plugin_config, - &mut mqtt_client, - config_file_path, - http_client, - tmp_dir, - ) - .await?; - - mqtt_client.close().await; - - Ok(()) + let fs_notification_stream = fs_notify_stream(&[( + config_dir, + Some(config_file.to_string()), + &[FileEvent::Modified, FileEvent::Deleted, FileEvent::Created], + )])?; + pin_mut!(fs_notification_stream); + + loop { + tokio::select! { + message = mqtt_client.received.next() => { + if let Some(message) = message { + process_mqtt_message( + message, + &mut plugin_config, + &mut mqtt_client, + &config_file_path, + http_client, + tmp_dir.clone(), + ) + .await?; + } else { + // message is None and the connection has been closed + return Ok(()) + } + } + Some(Ok((path, mask))) = fs_notification_stream.next() => { + match mask { + FileEvent::Modified | FileEvent::Deleted | FileEvent::Created => { + plugin_config = PluginConfig::new(&path); + let message = plugin_config.to_supported_config_types_message()?; + mqtt_client.published.send(message).await?; + }, + } + }} + } } async fn process_mqtt_message( + message: Message, plugin_config: &mut PluginConfig, mqtt_client: &mut Connection, config_file_path: &Path, @@ -160,69 +182,67 @@ async fn process_mqtt_message( tmp_dir: PathBuf, ) -> Result<(), anyhow::Error> { let health_check_topics = health_check_topics("c8y-configuration-plugin"); - while let Some(message) = mqtt_client.received.next().await { - debug!("Received {:?}", message); - if health_check_topics.accept(&message) { - send_health_status(&mut mqtt_client.published, "c8y-configuration-plugin").await; - } else if let Ok(payload) = message.payload_str() { - let result = match message.topic.name.as_str() { - "tedge/configuration_change/c8y-configuration-plugin" => { - // Reload the plugin config file - let plugin_config = PluginConfig::new(config_file_path); - // Resend the supported config types - let msg = plugin_config.to_supported_config_types_message()?; - mqtt_client.published.send(msg).await?; - Ok(()) - } - _ => { - match payload.split(',').next().unwrap_or_default() { - "524" => { - let maybe_config_download_request = - SmartRestConfigDownloadRequest::from_smartrest(payload); - if let Ok(config_download_request) = maybe_config_download_request { - handle_config_download_request( - plugin_config, - config_download_request, - tmp_dir.clone(), - mqtt_client, - http_client, - ) - .await - } else { - error!("Incorrect Download SmartREST payload: {}", payload); - Ok(()) - } - } - "526" => { - // retrieve config file upload smartrest request from payload - let maybe_config_upload_request = - SmartRestConfigUploadRequest::from_smartrest(payload); - - if let Ok(config_upload_request) = maybe_config_upload_request { - // handle the config file upload request - handle_config_upload_request( - plugin_config, - config_upload_request, - mqtt_client, - http_client, - ) - .await - } else { - error!("Incorrect Upload SmartREST payload: {}", payload); - Ok(()) - } + debug!("Received {:?}", message); + if health_check_topics.accept(&message) { + send_health_status(&mut mqtt_client.published, "c8y-configuration-plugin").await; + } else if let Ok(payload) = message.payload_str() { + let result = match message.topic.name.as_str() { + "tedge/configuration_change/c8y-configuration-plugin" => { + // Reload the plugin config file + let plugin_config = PluginConfig::new(config_file_path); + // Resend the supported config types + let msg = plugin_config.to_supported_config_types_message()?; + mqtt_client.published.send(msg).await?; + Ok(()) + } + _ => { + match payload.split(',').next().unwrap_or_default() { + "524" => { + let maybe_config_download_request = + SmartRestConfigDownloadRequest::from_smartrest(payload); + if let Ok(config_download_request) = maybe_config_download_request { + handle_config_download_request( + plugin_config, + config_download_request, + tmp_dir.clone(), + mqtt_client, + http_client, + ) + .await + } else { + error!("Incorrect Download SmartREST payload: {}", payload); + Ok(()) } - _ => { - // Ignore operation messages not meant for this plugin + } + "526" => { + // retrieve config file upload smartrest request from payload + let maybe_config_upload_request = + SmartRestConfigUploadRequest::from_smartrest(payload); + + if let Ok(config_upload_request) = maybe_config_upload_request { + // handle the config file upload request + handle_config_upload_request( + plugin_config, + config_upload_request, + mqtt_client, + http_client, + ) + .await + } else { + error!("Incorrect Upload SmartREST payload: {}", payload); Ok(()) } } + _ => { + // Ignore operation messages not meant for this plugin + Ok(()) + } } - }; - - if let Err(err) = result { - error!("Handling of operation: '{payload}' failed with {err}"); } + }; + + if let Err(err) = result { + error!("Handling of operation: '{payload}' failed with {err}"); } } Ok(()) @@ -315,7 +335,8 @@ mod tests { broker.port, &mut http_client, tmp_dir.path().to_path_buf(), - PathBuf::from(test_config_path).as_path(), + tmp_dir.path(), + test_config_path, ) .await; }); |