diff options
author | Rina Fujino <18257209+rina23q@users.noreply.github.com> | 2022-04-21 00:49:24 +0200 |
---|---|---|
committer | Rina Fujino <18257209+rina23q@users.noreply.github.com> | 2022-04-21 00:49:24 +0200 |
commit | 37a35f6f7f7504dafab917a1926b17a7afbcffa4 (patch) | |
tree | 0308e43835fbfaf51d9f54d9a871347dcd2dd428 | |
parent | 24e35787d843002980fb622f89634d1a731ea860 (diff) |
Add downloading config files support
Signed-off-by: Rina Fujino <18257209+rina23q@users.noreply.github.com>
-rw-r--r-- | Cargo.lock | 4 | ||||
-rw-r--r-- | crates/core/c8y_smartrest/src/smartrest_deserializer.rs | 39 | ||||
-rw-r--r-- | crates/core/c8y_smartrest/src/smartrest_serializer.rs | 4 | ||||
-rw-r--r-- | plugins/c8y_configuration_plugin/Cargo.toml | 4 | ||||
-rw-r--r-- | plugins/c8y_configuration_plugin/src/config.rs | 2 | ||||
-rw-r--r-- | plugins/c8y_configuration_plugin/src/download.rs | 268 | ||||
-rw-r--r-- | plugins/c8y_configuration_plugin/src/error.rs | 28 | ||||
-rw-r--r-- | plugins/c8y_configuration_plugin/src/main.rs | 25 | ||||
-rw-r--r-- | plugins/c8y_configuration_plugin/src/smartrest.rs | 28 |
9 files changed, 391 insertions, 11 deletions
@@ -392,10 +392,14 @@ name = "c8y_configuration_plugin" version = "0.6.1" dependencies = [ "anyhow", + "assert_matches", "c8y_api", "c8y_smartrest", + "csv", + "download", "mqtt_channel", "serde", + "serde_json", "tedge_config", "tedge_utils", "tempfile", diff --git a/crates/core/c8y_smartrest/src/smartrest_deserializer.rs b/crates/core/c8y_smartrest/src/smartrest_deserializer.rs index c65f49b2..3f9c9567 100644 --- a/crates/core/c8y_smartrest/src/smartrest_deserializer.rs +++ b/crates/core/c8y_smartrest/src/smartrest_deserializer.rs @@ -216,8 +216,7 @@ impl SmartRestLogRequest { rdr.deserialize() .next() - .ok_or_else(|| panic!("empty request")) - .unwrap() // does already panic before this, so this unwrap is only required for type lineup + .ok_or(SmartRestDeserializerError::EmptyRequest)? .map_err(SmartRestDeserializerError::from) } } @@ -264,6 +263,29 @@ impl SmartRestConfigUploadRequest { } } +#[derive(Debug, Deserialize, Serialize, PartialEq)] +pub struct SmartRestConfigDownloadRequest { + pub message_id: String, + pub device: String, + pub url: String, + pub config_type: String, +} + +// TODO: make it generic. We have many from_smartrest() repeating the same code. +impl SmartRestConfigDownloadRequest { + pub fn from_smartrest(smartrest: &str) -> Result<Self, SmartRestDeserializerError> { + let mut rdr = ReaderBuilder::new() + .has_headers(false) + .flexible(true) + .from_reader(smartrest.as_bytes()); + + rdr.deserialize() + .next() + .ok_or(SmartRestDeserializerError::EmptyRequest)? + .map_err(SmartRestDeserializerError::from) + } +} + type JwtToken = String; #[derive(Debug, Deserialize, PartialEq)] @@ -620,6 +642,19 @@ mod tests { assert!(log.is_ok()); } + #[test] + fn deserialize_smartrest_config_download_request_operation() { + let smartrest = "524,deviceId,https://test.cumulocity.com/inventory/binaries/70208,/etc/tedge/tedge.toml".to_string(); + let request = SmartRestConfigDownloadRequest::from_smartrest(&smartrest).unwrap(); + let expected_output = SmartRestConfigDownloadRequest { + message_id: "524".to_string(), + device: "deviceId".to_string(), + url: "https://test.cumulocity.com/inventory/binaries/70208".to_string(), + config_type: "/etc/tedge/tedge.toml".to_string(), + }; + assert_eq!(request, expected_output); + } + #[test_case("/path/to/software-list-2021-10-27T10:44:44Z.log")] #[test_case("/path/to/tedge/agent/software-update-2021-10-25T07:45:41Z.log")] #[test_case("/path/to/another-variant-2021-10-25T07:45:41Z.log")] diff --git a/crates/core/c8y_smartrest/src/smartrest_serializer.rs b/crates/core/c8y_smartrest/src/smartrest_serializer.rs index b3ec3e44..cd3867b5 100644 --- a/crates/core/c8y_smartrest/src/smartrest_serializer.rs +++ b/crates/core/c8y_smartrest/src/smartrest_serializer.rs @@ -3,7 +3,7 @@ use agent_interface::{OperationStatus, SoftwareUpdateResponse}; use csv::{QuoteStyle, WriterBuilder}; use serde::{Deserialize, Serialize, Serializer}; -type SmartRest = String; +pub type SmartRest = String; #[derive(Debug)] pub enum CumulocitySupportedOperations { @@ -11,6 +11,7 @@ pub enum CumulocitySupportedOperations { C8yLogFileRequest, C8yRestartRequest, C8yUploadConfigFile, + C8yDownloadConfigFile, } impl From<CumulocitySupportedOperations> for &'static str { @@ -20,6 +21,7 @@ impl From<CumulocitySupportedOperations> for &'static str { CumulocitySupportedOperations::C8yLogFileRequest => "c8y_LogfileRequest", CumulocitySupportedOperations::C8yRestartRequest => "c8y_Restart", CumulocitySupportedOperations::C8yUploadConfigFile => "c8y_UploadConfigFile", + CumulocitySupportedOperations::C8yDownloadConfigFile => "c8y_DownloadConfigFile", } } } diff --git a/plugins/c8y_configuration_plugin/Cargo.toml b/plugins/c8y_configuration_plugin/Cargo.toml index 1a434fc3..818d1b42 100644 --- a/plugins/c8y_configuration_plugin/Cargo.toml +++ b/plugins/c8y_configuration_plugin/Cargo.toml @@ -11,8 +11,11 @@ description = "Thin.edge.io operation plugin for Cumulocity configuration manage anyhow = "1.0" c8y_api = { path = "../../crates/core/c8y_api" } c8y_smartrest = { path = "../../crates/core/c8y_smartrest" } +csv = "1.1" +download = { path = "../../crates/common/download" } mqtt_channel = { path = "../../crates/common/mqtt_channel" } serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" tedge_config = { path = "../../crates/common/tedge_config" } tedge_utils = { path = "../../crates/common/tedge_utils", features = ["logging"] } thiserror = "1.0" @@ -21,5 +24,6 @@ toml = "0.5" tracing = { version = "0.1", features = ["attributes", "log"] } [dev-dependencies] +assert_matches = "1.5" tempfile = "3.3" test-case = "2.0" diff --git a/plugins/c8y_configuration_plugin/src/config.rs b/plugins/c8y_configuration_plugin/src/config.rs index 8e444ee6..0485cf62 100644 --- a/plugins/c8y_configuration_plugin/src/config.rs +++ b/plugins/c8y_configuration_plugin/src/config.rs @@ -3,7 +3,7 @@ use std::fs; use std::path::PathBuf; use tracing::{info, warn}; -pub const PLUGIN_CONFIG_FILE: &str = "c8y_configuration_plugin.toml"; +pub const PLUGIN_CONFIG_FILE: &str = "c8y-configuration-plugin.toml"; #[derive(Deserialize, Debug, PartialEq)] #[serde(deny_unknown_fields)] diff --git a/plugins/c8y_configuration_plugin/src/download.rs b/plugins/c8y_configuration_plugin/src/download.rs new file mode 100644 index 00000000..49045c6d --- /dev/null +++ b/plugins/c8y_configuration_plugin/src/download.rs @@ -0,0 +1,268 @@ +use crate::error::ConfigDownloadError; +use crate::smartrest::GetSmartRestMessage; +use crate::{error, PluginConfig}; +use c8y_api::http_proxy::{C8YHttpProxy, JwtAuthHttpProxy}; +use c8y_smartrest::error::SmartRestSerializerError; +use c8y_smartrest::smartrest_deserializer::SmartRestConfigDownloadRequest; +use c8y_smartrest::smartrest_serializer::{ + CumulocitySupportedOperations, SmartRest, SmartRestSerializer, + SmartRestSetOperationToExecuting, SmartRestSetOperationToFailed, + SmartRestSetOperationToSuccessful, +}; +use download::{Auth, DownloadInfo, Downloader}; +use mqtt_channel::{Connection, Message, SinkExt, Topic}; +use serde::{Deserialize, Serialize}; +use serde_json::json; +use std::fs; +use std::path::PathBuf; +use tedge_config::{get_tedge_config, ConfigSettingAccessor, TmpPathDefaultSetting}; + +const BROADCASTING_TOPIC: &str = "filemanagement/changes"; + +pub async fn handle_config_download_request( + plugin_config: &PluginConfig, + smartrest_request: SmartRestConfigDownloadRequest, + mqtt_client: &mut Connection, + http_client: &mut JwtAuthHttpProxy, +) -> Result<(), anyhow::Error> { + let executing_message = GetDownloadConfigFileMessage::executing()?; + let () = mqtt_client.published.send(executing_message).await?; + + // Add validation if the config_type exists in + let changed_file = smartrest_request.config_type.clone(); + + match download_config_file(plugin_config, smartrest_request, http_client).await { + Ok(_) => { + let successful_message = GetDownloadConfigFileMessage::successful()?; + let () = mqtt_client.published.send(successful_message).await?; + + let notification_message = get_file_change_notification_message(changed_file); + let () = mqtt_client.published.send(notification_message).await?; + Ok(()) + } + Err(err) => { + let failed_message = GetDownloadConfigFileMessage::failed(err.to_string())?; + let () = mqtt_client.published.send(failed_message).await?; + Err(err) + } + } +} + +async fn download_config_file( + plugin_config: &PluginConfig, + smartrest_request: SmartRestConfigDownloadRequest, + http_client: &mut JwtAuthHttpProxy, +) -> Result<(), anyhow::Error> { + // Convert smartrest request to config download request struct + let mut config_download_request = + ConfigDownloadRequest::try_new(smartrest_request, plugin_config)?; + + // Confirm that the file has write access before any http request attempt + let () = config_download_request.has_write_access()?; + + // If the provided url is c8y, add auth + if http_client.url_is_in_my_tenant_domain(config_download_request.download_info.url()) { + let token = http_client.get_jwt_token().await?; + config_download_request.download_info.auth = Some(Auth::new_bearer(&token.token())); + } + + // Download a file to tmp dir + let downloader = config_download_request.create_downloader(); + let () = downloader + .download(&config_download_request.download_info) + .await?; + + // Move the downloaded file to the final destination + let () = config_download_request.move_file()?; + + Ok(()) +} + +#[derive(Debug, Clone, Deserialize, Serialize, PartialEq)] +pub struct ConfigDownloadRequest { + pub download_info: DownloadInfo, + pub destination_path: PathBuf, + pub tmp_dir: PathBuf, + pub file_name: String, +} + +impl ConfigDownloadRequest { + fn try_new( + request: SmartRestConfigDownloadRequest, + plugin_config: &PluginConfig, + ) -> Result<Self, ConfigDownloadError> { + // Check if the requested config type is in the plugin config list + if !plugin_config.files.contains(&request.config_type) { + return Err(ConfigDownloadError::InvalidRequestedConfigType { + path: request.config_type, + }); + } + + let destination_path = PathBuf::from(request.config_type); + let tedge_config = get_tedge_config()?; + let tmp_dir = tedge_config.query(TmpPathDefaultSetting)?.into(); + let file_name = Self::get_filename(destination_path.clone())?; + + Ok(Self { + download_info: DownloadInfo { + url: request.url, + auth: None, + }, + destination_path, + tmp_dir, + file_name, + }) + } + + fn get_filename(path: PathBuf) -> Result<String, ConfigDownloadError> { + let filename = path + .file_name() + .ok_or_else(|| ConfigDownloadError::FileNameNotFound { path: path.clone() })? + .to_str() + .ok_or_else(|| ConfigDownloadError::InvalidFileName { path: path.clone() })? + .to_string(); + Ok(filename) + } + + fn has_write_access(&self) -> Result<(), ConfigDownloadError> { + let metadata = fs::metadata(&self.destination_path).map_err(|_| { + ConfigDownloadError::FileNotAccessible { + path: self.destination_path.clone(), + } + })?; + if metadata.permissions().readonly() { + Err(error::ConfigDownloadError::ReadOnlyFile { + path: self.destination_path.clone(), + })? + } else { + Ok(()) + } + } + + fn create_downloader(&self) -> Downloader { + Downloader::new(&self.file_name, &None, &self.tmp_dir) + } + + fn move_file(&self) -> Result<(), ConfigDownloadError> { + let src = &self.tmp_dir.join(&self.file_name); + let dest = &self.destination_path; + let _ = fs::copy(src, dest).map_err(|_| ConfigDownloadError::FileCopyFailed { + src: src.to_path_buf(), + dest: dest.to_path_buf(), + })?; + Ok(()) + } +} + +pub fn get_file_change_notification_message(config_type: String) -> Message { + let notification = json!({ "changedFile": config_type }).to_string(); + Message::new(&Topic::new_unchecked(BROADCASTING_TOPIC), notification) +} + +struct GetDownloadConfigFileMessage {} + +impl GetSmartRestMessage for GetDownloadConfigFileMessage { + fn status_executing() -> Result<SmartRest, SmartRestSerializerError> { + SmartRestSetOperationToExecuting::new(CumulocitySupportedOperations::C8yDownloadConfigFile) + .to_smartrest() + } + + fn status_successful() -> Result<SmartRest, SmartRestSerializerError> { + SmartRestSetOperationToSuccessful::new(CumulocitySupportedOperations::C8yDownloadConfigFile) + .to_smartrest() + } + + fn status_failed(failure_reason: String) -> Result<SmartRest, SmartRestSerializerError> { + SmartRestSetOperationToFailed::new( + CumulocitySupportedOperations::C8yDownloadConfigFile, + failure_reason, + ) + .to_smartrest() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use assert_matches::*; + + #[test] + fn create_config_download_request() -> Result<(), anyhow::Error> { + let payload = "524,rina0005,https://test.cumulocity.com/inventory/binaries/70208,/etc/tedge/tedge.toml"; + let smartrest_request = SmartRestConfigDownloadRequest::from_smartrest(payload)?; + let plugin_config = PluginConfig { + files: vec![ + "/etc/tedge/tedge.toml".to_string(), + "/etc/tedge/mosquitto-conf/c8y-bridge.conf".to_string(), + "/etc/tedge/mosquitto-conf/tedge-mosquitto.conf".to_string(), + "/etc/mosquitto/mosquitto.conf".to_string(), + ], + }; + let config_download_request = + ConfigDownloadRequest::try_new(smartrest_request, &plugin_config)?; + assert_eq!( + config_download_request, + ConfigDownloadRequest { + download_info: DownloadInfo { + url: "https://test.cumulocity.com/inventory/binaries/70208".to_string(), + auth: None + }, + destination_path: PathBuf::from("/etc/tedge/tedge.toml"), + tmp_dir: PathBuf::from("/tmp"), + file_name: "tedge.toml".to_string() + } + ); + Ok(()) + } + + #[test] + fn requested_config_does_not_match_config_plugin() -> Result<(), anyhow::Error> { + let payload = "524,rina0005,https://test.cumulocity.com/inventory/binaries/70208,/etc/tedge/not_in_config.toml"; + let smartrest_request = SmartRestConfigDownloadRequest::from_smartrest(payload)?; + let plugin_config = PluginConfig { + files: vec![ + "/etc/tedge/tedge.toml".to_string(), + "/etc/tedge/mosquitto-conf/c8y-bridge.conf".to_string(), + "/etc/tedge/mosquitto-conf/tedge-mosquitto.conf".to_string(), + "/etc/mosquitto/mosquitto.conf".to_string(), + ], + }; + let config_download_request = + ConfigDownloadRequest::try_new(smartrest_request, &plugin_config); + assert_matches!( + config_download_request, + Err(ConfigDownloadError::InvalidRequestedConfigType { .. }) + ); + Ok(()) + } + + #[test] + fn get_smartrest_executing() { + let message = GetDownloadConfigFileMessage::executing().unwrap(); + assert_eq!(message.topic, Topic::new("c8y/s/us").unwrap()); + assert_eq!( + message.payload_str().unwrap(), + "501,c8y_DownloadConfigFile\n" + ); + } + + #[test] + fn get_smartrest_successful() { + let message = GetDownloadConfigFileMessage::successful().unwrap(); + assert_eq!(message.topic, Topic::new("c8y/s/us").unwrap()); + assert_eq!( + message.payload_str().unwrap(), + "503,c8y_DownloadConfigFile,\n" + ); + } + + #[test] + fn get_smartrest_failed() { + let message = GetDownloadConfigFileMessage::failed("failed reason".to_string()).unwrap(); + assert_eq!(message.topic, Topic::new("c8y/s/us").unwrap()); + assert_eq!( + message.payload_str().unwrap(), + "502,c8y_DownloadConfigFile,\"failed reason\"\n" + ); + } +} diff --git a/plugins/c8y_configuration_plugin/src/error.rs b/plugins/c8y_configuration_plugin/src/error.rs new file mode 100644 index 00000000..bb84dc30 --- /dev/null +++ b/plugins/c8y_configuration_plugin/src/error.rs @@ -0,0 +1,28 @@ +use std::path::PathBuf; + +#[derive(thiserror::Error, Debug)] +pub enum ConfigDownloadError { + #[error("The file is read-only {path:?}")] + ReadOnlyFile { path: PathBuf }, + + #[error("The file name is not found from {path:?}")] + FileNameNotFound { path: PathBuf }, + + #[error("The file name is invalid. {path:?}")] + InvalidFileName { path: PathBuf }, + + #[error("The file is not accessible. {path:?}")] + FileNotAccessible { path: PathBuf }, + + #[error("Failed to copy a file from {src:?} to {dest:?}")] + FileCopyFailed { src: PathBuf, dest: PathBuf }, + + #[error("The requested config_type {path} doesn't match the plugin config.")] + InvalidRequestedConfigType { path: String }, + + #[error(transparent)] + FromTEdgeConfig(#[from] tedge_config::TEdgeConfigError), + + #[error(transparent)] + FromConfigSetting(#[from] tedge_config::ConfigSettingError), +} diff --git a/plugins/c8y_configuration_plugin/src/main.rs b/plugins/c8y_configuration_plugin/src/main.rs index 30a8d289..a1f59a73 100644 --- a/plugins/c8y_configuration_plugin/src/main.rs +++ b/plugins/c8y_configuration_plugin/src/main.rs @@ -1,9 +1,13 @@ mod config; +mod download; +mod error; mod smartrest; use crate::config::PluginConfig; +use crate::download::handle_config_download_request; use anyhow::Result; use c8y_api::http_proxy::{C8YHttpProxy, JwtAuthHttpProxy}; +use c8y_smartrest::smartrest_deserializer::SmartRestConfigDownloadRequest; use c8y_smartrest::{ smartrest_deserializer::SmartRestConfigUploadRequest, smartrest_serializer::{ @@ -26,7 +30,7 @@ const CONFIG_ROOT_PATH: &str = "/etc/tedge/c8y"; const LOG_LEVEL_DEBUG: bool = false; #[cfg(debug_assertions)] -const LOG_LEVEL_DEBUG: bool = true; +const LOG_LEVEL_DEBUG: bool = false; async fn create_mqtt_client() -> Result<mqtt_channel::Connection, anyhow::Error> { let tedge_config = get_tedge_config()?; @@ -41,7 +45,6 @@ async fn create_mqtt_client() -> Result<mqtt_channel::Connection, anyhow::Error> Ok(mqtt_client) } -/// creates an http client pub async fn create_http_client() -> Result<JwtAuthHttpProxy, anyhow::Error> { let config = get_tedge_config()?; let mut http_proxy = JwtAuthHttpProxy::try_new(&config).await?; @@ -152,12 +155,20 @@ async fn main() -> Result<(), anyhow::Error> { while let Some(message) = mqtt_client.received.next().await { debug!("Received {:?}", message); if let Ok(payload) = message.payload_str() { - let result = match payload.split(',').next() { - Some("524") => { - debug!("{}", message.payload_str()?); - todo!() // c8y_DownloadConfigFile + let result = match payload.split(',').next().unwrap_or_default() { + "524" => { + debug!("{}", payload); + let config_download_request = + SmartRestConfigDownloadRequest::from_smartrest(payload)?; + handle_config_download_request( + &plugin_config, + config_download_request, + &mut mqtt_client, + &mut http_client, + ) + .await } - Some("526") => { + "526" => { debug!("{}", payload); // retrieve config file upload smartrest request from payload let config_upload_request = diff --git a/plugins/c8y_configuration_plugin/src/smartrest.rs b/plugins/c8y_configuration_plugin/src/smartrest.rs index f1255f97..5d525673 100644 --- a/plugins/c8y_configuration_plugin/src/smartrest.rs +++ b/plugins/c8y_configuration_plugin/src/smartrest.rs @@ -1,4 +1,6 @@ use crate::config::PluginConfig; +use c8y_smartrest::error::SmartRestSerializerError; +use c8y_smartrest::smartrest_serializer::SmartRest; use c8y_smartrest::topic::C8yTopic; use mqtt_channel::Message; @@ -20,6 +22,32 @@ impl PluginConfig { } } +pub trait GetSmartRestMessage { + fn executing() -> Result<Message, SmartRestSerializerError> { + let status = Self::status_executing()?; + Ok(Self::create_message(status)) + } + + fn successful() -> Result<Message, SmartRestSerializerError> { + let status = Self::status_successful()?; + Ok(Self::create_message(status)) + } + + fn failed(failure_reason: String) -> Result<Message, SmartRestSerializerError> { + let status = Self::status_failed(failure_reason)?; + Ok(Self::create_message(status)) + } + + fn create_message(payload: SmartRest) -> Message { + let topic = C8yTopic::SmartRestResponse.to_topic().unwrap(); // never fail + Message::new(&topic, payload) + } + + fn status_executing() -> Result<SmartRest, SmartRestSerializerError>; + fn status_successful() -> Result<SmartRest, SmartRestSerializerError>; + fn status_failed(failure_reason: String) -> Result<SmartRest, SmartRestSerializerError>; +} + #[cfg(test)] mod tests { use super::*; |