summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRina Fujino <18257209+rina23q@users.noreply.github.com>2022-04-21 00:49:24 +0200
committerRina Fujino <18257209+rina23q@users.noreply.github.com>2022-04-21 00:49:24 +0200
commit37a35f6f7f7504dafab917a1926b17a7afbcffa4 (patch)
tree0308e43835fbfaf51d9f54d9a871347dcd2dd428
parent24e35787d843002980fb622f89634d1a731ea860 (diff)
Add downloading config files support
Signed-off-by: Rina Fujino <18257209+rina23q@users.noreply.github.com>
-rw-r--r--Cargo.lock4
-rw-r--r--crates/core/c8y_smartrest/src/smartrest_deserializer.rs39
-rw-r--r--crates/core/c8y_smartrest/src/smartrest_serializer.rs4
-rw-r--r--plugins/c8y_configuration_plugin/Cargo.toml4
-rw-r--r--plugins/c8y_configuration_plugin/src/config.rs2
-rw-r--r--plugins/c8y_configuration_plugin/src/download.rs268
-rw-r--r--plugins/c8y_configuration_plugin/src/error.rs28
-rw-r--r--plugins/c8y_configuration_plugin/src/main.rs25
-rw-r--r--plugins/c8y_configuration_plugin/src/smartrest.rs28
9 files changed, 391 insertions, 11 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 8e309ced..87577319 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -392,10 +392,14 @@ name = "c8y_configuration_plugin"
version = "0.6.1"
dependencies = [
"anyhow",
+ "assert_matches",
"c8y_api",
"c8y_smartrest",
+ "csv",
+ "download",
"mqtt_channel",
"serde",
+ "serde_json",
"tedge_config",
"tedge_utils",
"tempfile",
diff --git a/crates/core/c8y_smartrest/src/smartrest_deserializer.rs b/crates/core/c8y_smartrest/src/smartrest_deserializer.rs
index c65f49b2..3f9c9567 100644
--- a/crates/core/c8y_smartrest/src/smartrest_deserializer.rs
+++ b/crates/core/c8y_smartrest/src/smartrest_deserializer.rs
@@ -216,8 +216,7 @@ impl SmartRestLogRequest {
rdr.deserialize()
.next()
- .ok_or_else(|| panic!("empty request"))
- .unwrap() // does already panic before this, so this unwrap is only required for type lineup
+ .ok_or(SmartRestDeserializerError::EmptyRequest)?
.map_err(SmartRestDeserializerError::from)
}
}
@@ -264,6 +263,29 @@ impl SmartRestConfigUploadRequest {
}
}
+#[derive(Debug, Deserialize, Serialize, PartialEq)]
+pub struct SmartRestConfigDownloadRequest {
+ pub message_id: String,
+ pub device: String,
+ pub url: String,
+ pub config_type: String,
+}
+
+// TODO: make it generic. We have many from_smartrest() repeating the same code.
+impl SmartRestConfigDownloadRequest {
+ pub fn from_smartrest(smartrest: &str) -> Result<Self, SmartRestDeserializerError> {
+ let mut rdr = ReaderBuilder::new()
+ .has_headers(false)
+ .flexible(true)
+ .from_reader(smartrest.as_bytes());
+
+ rdr.deserialize()
+ .next()
+ .ok_or(SmartRestDeserializerError::EmptyRequest)?
+ .map_err(SmartRestDeserializerError::from)
+ }
+}
+
type JwtToken = String;
#[derive(Debug, Deserialize, PartialEq)]
@@ -620,6 +642,19 @@ mod tests {
assert!(log.is_ok());
}
+ #[test]
+ fn deserialize_smartrest_config_download_request_operation() {
+ let smartrest = "524,deviceId,https://test.cumulocity.com/inventory/binaries/70208,/etc/tedge/tedge.toml".to_string();
+ let request = SmartRestConfigDownloadRequest::from_smartrest(&smartrest).unwrap();
+ let expected_output = SmartRestConfigDownloadRequest {
+ message_id: "524".to_string(),
+ device: "deviceId".to_string(),
+ url: "https://test.cumulocity.com/inventory/binaries/70208".to_string(),
+ config_type: "/etc/tedge/tedge.toml".to_string(),
+ };
+ assert_eq!(request, expected_output);
+ }
+
#[test_case("/path/to/software-list-2021-10-27T10:44:44Z.log")]
#[test_case("/path/to/tedge/agent/software-update-2021-10-25T07:45:41Z.log")]
#[test_case("/path/to/another-variant-2021-10-25T07:45:41Z.log")]
diff --git a/crates/core/c8y_smartrest/src/smartrest_serializer.rs b/crates/core/c8y_smartrest/src/smartrest_serializer.rs
index b3ec3e44..cd3867b5 100644
--- a/crates/core/c8y_smartrest/src/smartrest_serializer.rs
+++ b/crates/core/c8y_smartrest/src/smartrest_serializer.rs
@@ -3,7 +3,7 @@ use agent_interface::{OperationStatus, SoftwareUpdateResponse};
use csv::{QuoteStyle, WriterBuilder};
use serde::{Deserialize, Serialize, Serializer};
-type SmartRest = String;
+pub type SmartRest = String;
#[derive(Debug)]
pub enum CumulocitySupportedOperations {
@@ -11,6 +11,7 @@ pub enum CumulocitySupportedOperations {
C8yLogFileRequest,
C8yRestartRequest,
C8yUploadConfigFile,
+ C8yDownloadConfigFile,
}
impl From<CumulocitySupportedOperations> for &'static str {
@@ -20,6 +21,7 @@ impl From<CumulocitySupportedOperations> for &'static str {
CumulocitySupportedOperations::C8yLogFileRequest => "c8y_LogfileRequest",
CumulocitySupportedOperations::C8yRestartRequest => "c8y_Restart",
CumulocitySupportedOperations::C8yUploadConfigFile => "c8y_UploadConfigFile",
+ CumulocitySupportedOperations::C8yDownloadConfigFile => "c8y_DownloadConfigFile",
}
}
}
diff --git a/plugins/c8y_configuration_plugin/Cargo.toml b/plugins/c8y_configuration_plugin/Cargo.toml
index 1a434fc3..818d1b42 100644
--- a/plugins/c8y_configuration_plugin/Cargo.toml
+++ b/plugins/c8y_configuration_plugin/Cargo.toml
@@ -11,8 +11,11 @@ description = "Thin.edge.io operation plugin for Cumulocity configuration manage
anyhow = "1.0"
c8y_api = { path = "../../crates/core/c8y_api" }
c8y_smartrest = { path = "../../crates/core/c8y_smartrest" }
+csv = "1.1"
+download = { path = "../../crates/common/download" }
mqtt_channel = { path = "../../crates/common/mqtt_channel" }
serde = { version = "1.0", features = ["derive"] }
+serde_json = "1.0"
tedge_config = { path = "../../crates/common/tedge_config" }
tedge_utils = { path = "../../crates/common/tedge_utils", features = ["logging"] }
thiserror = "1.0"
@@ -21,5 +24,6 @@ toml = "0.5"
tracing = { version = "0.1", features = ["attributes", "log"] }
[dev-dependencies]
+assert_matches = "1.5"
tempfile = "3.3"
test-case = "2.0"
diff --git a/plugins/c8y_configuration_plugin/src/config.rs b/plugins/c8y_configuration_plugin/src/config.rs
index 8e444ee6..0485cf62 100644
--- a/plugins/c8y_configuration_plugin/src/config.rs
+++ b/plugins/c8y_configuration_plugin/src/config.rs
@@ -3,7 +3,7 @@ use std::fs;
use std::path::PathBuf;
use tracing::{info, warn};
-pub const PLUGIN_CONFIG_FILE: &str = "c8y_configuration_plugin.toml";
+pub const PLUGIN_CONFIG_FILE: &str = "c8y-configuration-plugin.toml";
#[derive(Deserialize, Debug, PartialEq)]
#[serde(deny_unknown_fields)]
diff --git a/plugins/c8y_configuration_plugin/src/download.rs b/plugins/c8y_configuration_plugin/src/download.rs
new file mode 100644
index 00000000..49045c6d
--- /dev/null
+++ b/plugins/c8y_configuration_plugin/src/download.rs
@@ -0,0 +1,268 @@
+use crate::error::ConfigDownloadError;
+use crate::smartrest::GetSmartRestMessage;
+use crate::{error, PluginConfig};
+use c8y_api::http_proxy::{C8YHttpProxy, JwtAuthHttpProxy};
+use c8y_smartrest::error::SmartRestSerializerError;
+use c8y_smartrest::smartrest_deserializer::SmartRestConfigDownloadRequest;
+use c8y_smartrest::smartrest_serializer::{
+ CumulocitySupportedOperations, SmartRest, SmartRestSerializer,
+ SmartRestSetOperationToExecuting, SmartRestSetOperationToFailed,
+ SmartRestSetOperationToSuccessful,
+};
+use download::{Auth, DownloadInfo, Downloader};
+use mqtt_channel::{Connection, Message, SinkExt, Topic};
+use serde::{Deserialize, Serialize};
+use serde_json::json;
+use std::fs;
+use std::path::PathBuf;
+use tedge_config::{get_tedge_config, ConfigSettingAccessor, TmpPathDefaultSetting};
+
+const BROADCASTING_TOPIC: &str = "filemanagement/changes";
+
+pub async fn handle_config_download_request(
+ plugin_config: &PluginConfig,
+ smartrest_request: SmartRestConfigDownloadRequest,
+ mqtt_client: &mut Connection,
+ http_client: &mut JwtAuthHttpProxy,
+) -> Result<(), anyhow::Error> {
+ let executing_message = GetDownloadConfigFileMessage::executing()?;
+ let () = mqtt_client.published.send(executing_message).await?;
+
+ // Add validation if the config_type exists in
+ let changed_file = smartrest_request.config_type.clone();
+
+ match download_config_file(plugin_config, smartrest_request, http_client).await {
+ Ok(_) => {
+ let successful_message = GetDownloadConfigFileMessage::successful()?;
+ let () = mqtt_client.published.send(successful_message).await?;
+
+ let notification_message = get_file_change_notification_message(changed_file);
+ let () = mqtt_client.published.send(notification_message).await?;
+ Ok(())
+ }
+ Err(err) => {
+ let failed_message = GetDownloadConfigFileMessage::failed(err.to_string())?;
+ let () = mqtt_client.published.send(failed_message).await?;
+ Err(err)
+ }
+ }
+}
+
+async fn download_config_file(
+ plugin_config: &PluginConfig,
+ smartrest_request: SmartRestConfigDownloadRequest,
+ http_client: &mut JwtAuthHttpProxy,
+) -> Result<(), anyhow::Error> {
+ // Convert smartrest request to config download request struct
+ let mut config_download_request =
+ ConfigDownloadRequest::try_new(smartrest_request, plugin_config)?;
+
+ // Confirm that the file has write access before any http request attempt
+ let () = config_download_request.has_write_access()?;
+
+ // If the provided url is c8y, add auth
+ if http_client.url_is_in_my_tenant_domain(config_download_request.download_info.url()) {
+ let token = http_client.get_jwt_token().await?;
+ config_download_request.download_info.auth = Some(Auth::new_bearer(&token.token()));
+ }
+
+ // Download a file to tmp dir
+ let downloader = config_download_request.create_downloader();
+ let () = downloader
+ .download(&config_download_request.download_info)
+ .await?;
+
+ // Move the downloaded file to the final destination
+ let () = config_download_request.move_file()?;
+
+ Ok(())
+}
+
+#[derive(Debug, Clone, Deserialize, Serialize, PartialEq)]
+pub struct ConfigDownloadRequest {
+ pub download_info: DownloadInfo,
+ pub destination_path: PathBuf,
+ pub tmp_dir: PathBuf,
+ pub file_name: String,
+}
+
+impl ConfigDownloadRequest {
+ fn try_new(
+ request: SmartRestConfigDownloadRequest,
+ plugin_config: &PluginConfig,
+ ) -> Result<Self, ConfigDownloadError> {
+ // Check if the requested config type is in the plugin config list
+ if !plugin_config.files.contains(&request.config_type) {
+ return Err(ConfigDownloadError::InvalidRequestedConfigType {
+ path: request.config_type,
+ });
+ }
+
+ let destination_path = PathBuf::from(request.config_type);
+ let tedge_config = get_tedge_config()?;
+ let tmp_dir = tedge_config.query(TmpPathDefaultSetting)?.into();
+ let file_name = Self::get_filename(destination_path.clone())?;
+
+ Ok(Self {
+ download_info: DownloadInfo {
+ url: request.url,
+ auth: None,
+ },
+ destination_path,
+ tmp_dir,
+ file_name,
+ })
+ }
+
+ fn get_filename(path: PathBuf) -> Result<String, ConfigDownloadError> {
+ let filename = path
+ .file_name()
+ .ok_or_else(|| ConfigDownloadError::FileNameNotFound { path: path.clone() })?
+ .to_str()
+ .ok_or_else(|| ConfigDownloadError::InvalidFileName { path: path.clone() })?
+ .to_string();
+ Ok(filename)
+ }
+
+ fn has_write_access(&self) -> Result<(), ConfigDownloadError> {
+ let metadata = fs::metadata(&self.destination_path).map_err(|_| {
+ ConfigDownloadError::FileNotAccessible {
+ path: self.destination_path.clone(),
+ }
+ })?;
+ if metadata.permissions().readonly() {
+ Err(error::ConfigDownloadError::ReadOnlyFile {
+ path: self.destination_path.clone(),
+ })?
+ } else {
+ Ok(())
+ }
+ }
+
+ fn create_downloader(&self) -> Downloader {
+ Downloader::new(&self.file_name, &None, &self.tmp_dir)
+ }
+
+ fn move_file(&self) -> Result<(), ConfigDownloadError> {
+ let src = &self.tmp_dir.join(&self.file_name);
+ let dest = &self.destination_path;
+ let _ = fs::copy(src, dest).map_err(|_| ConfigDownloadError::FileCopyFailed {
+ src: src.to_path_buf(),
+ dest: dest.to_path_buf(),
+ })?;
+ Ok(())
+ }
+}
+
+pub fn get_file_change_notification_message(config_type: String) -> Message {
+ let notification = json!({ "changedFile": config_type }).to_string();
+ Message::new(&Topic::new_unchecked(BROADCASTING_TOPIC), notification)
+}
+
+struct GetDownloadConfigFileMessage {}
+
+impl GetSmartRestMessage for GetDownloadConfigFileMessage {
+ fn status_executing() -> Result<SmartRest, SmartRestSerializerError> {
+ SmartRestSetOperationToExecuting::new(CumulocitySupportedOperations::C8yDownloadConfigFile)
+ .to_smartrest()
+ }
+
+ fn status_successful() -> Result<SmartRest, SmartRestSerializerError> {
+ SmartRestSetOperationToSuccessful::new(CumulocitySupportedOperations::C8yDownloadConfigFile)
+ .to_smartrest()
+ }
+
+ fn status_failed(failure_reason: String) -> Result<SmartRest, SmartRestSerializerError> {
+ SmartRestSetOperationToFailed::new(
+ CumulocitySupportedOperations::C8yDownloadConfigFile,
+ failure_reason,
+ )
+ .to_smartrest()
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use assert_matches::*;
+
+ #[test]
+ fn create_config_download_request() -> Result<(), anyhow::Error> {
+ let payload = "524,rina0005,https://test.cumulocity.com/inventory/binaries/70208,/etc/tedge/tedge.toml";
+ let smartrest_request = SmartRestConfigDownloadRequest::from_smartrest(payload)?;
+ let plugin_config = PluginConfig {
+ files: vec![
+ "/etc/tedge/tedge.toml".to_string(),
+ "/etc/tedge/mosquitto-conf/c8y-bridge.conf".to_string(),
+ "/etc/tedge/mosquitto-conf/tedge-mosquitto.conf".to_string(),
+ "/etc/mosquitto/mosquitto.conf".to_string(),
+ ],
+ };
+ let config_download_request =
+ ConfigDownloadRequest::try_new(smartrest_request, &plugin_config)?;
+ assert_eq!(
+ config_download_request,
+ ConfigDownloadRequest {
+ download_info: DownloadInfo {
+ url: "https://test.cumulocity.com/inventory/binaries/70208".to_string(),
+ auth: None
+ },
+ destination_path: PathBuf::from("/etc/tedge/tedge.toml"),
+ tmp_dir: PathBuf::from("/tmp"),
+ file_name: "tedge.toml".to_string()
+ }
+ );
+ Ok(())
+ }
+
+ #[test]
+ fn requested_config_does_not_match_config_plugin() -> Result<(), anyhow::Error> {
+ let payload = "524,rina0005,https://test.cumulocity.com/inventory/binaries/70208,/etc/tedge/not_in_config.toml";
+ let smartrest_request = SmartRestConfigDownloadRequest::from_smartrest(payload)?;
+ let plugin_config = PluginConfig {
+ files: vec![
+ "/etc/tedge/tedge.toml".to_string(),
+ "/etc/tedge/mosquitto-conf/c8y-bridge.conf".to_string(),
+ "/etc/tedge/mosquitto-conf/tedge-mosquitto.conf".to_string(),
+ "/etc/mosquitto/mosquitto.conf".to_string(),
+ ],
+ };
+ let config_download_request =
+ ConfigDownloadRequest::try_new(smartrest_request, &plugin_config);
+ assert_matches!(
+ config_download_request,
+ Err(ConfigDownloadError::InvalidRequestedConfigType { .. })
+ );
+ Ok(())
+ }
+
+ #[test]
+ fn get_smartrest_executing() {
+ let message = GetDownloadConfigFileMessage::executing().unwrap();
+ assert_eq!(message.topic, Topic::new("c8y/s/us").unwrap());
+ assert_eq!(
+ message.payload_str().unwrap(),
+ "501,c8y_DownloadConfigFile\n"
+ );
+ }
+
+ #[test]
+ fn get_smartrest_successful() {
+ let message = GetDownloadConfigFileMessage::successful().unwrap();
+ assert_eq!(message.topic, Topic::new("c8y/s/us").unwrap());
+ assert_eq!(
+ message.payload_str().unwrap(),
+ "503,c8y_DownloadConfigFile,\n"
+ );
+ }
+
+ #[test]
+ fn get_smartrest_failed() {
+ let message = GetDownloadConfigFileMessage::failed("failed reason".to_string()).unwrap();
+ assert_eq!(message.topic, Topic::new("c8y/s/us").unwrap());
+ assert_eq!(
+ message.payload_str().unwrap(),
+ "502,c8y_DownloadConfigFile,\"failed reason\"\n"
+ );
+ }
+}
diff --git a/plugins/c8y_configuration_plugin/src/error.rs b/plugins/c8y_configuration_plugin/src/error.rs
new file mode 100644
index 00000000..bb84dc30
--- /dev/null
+++ b/plugins/c8y_configuration_plugin/src/error.rs
@@ -0,0 +1,28 @@
+use std::path::PathBuf;
+
+#[derive(thiserror::Error, Debug)]
+pub enum ConfigDownloadError {
+ #[error("The file is read-only {path:?}")]
+ ReadOnlyFile { path: PathBuf },
+
+ #[error("The file name is not found from {path:?}")]
+ FileNameNotFound { path: PathBuf },
+
+ #[error("The file name is invalid. {path:?}")]
+ InvalidFileName { path: PathBuf },
+
+ #[error("The file is not accessible. {path:?}")]
+ FileNotAccessible { path: PathBuf },
+
+ #[error("Failed to copy a file from {src:?} to {dest:?}")]
+ FileCopyFailed { src: PathBuf, dest: PathBuf },
+
+ #[error("The requested config_type {path} doesn't match the plugin config.")]
+ InvalidRequestedConfigType { path: String },
+
+ #[error(transparent)]
+ FromTEdgeConfig(#[from] tedge_config::TEdgeConfigError),
+
+ #[error(transparent)]
+ FromConfigSetting(#[from] tedge_config::ConfigSettingError),
+}
diff --git a/plugins/c8y_configuration_plugin/src/main.rs b/plugins/c8y_configuration_plugin/src/main.rs
index 30a8d289..a1f59a73 100644
--- a/plugins/c8y_configuration_plugin/src/main.rs
+++ b/plugins/c8y_configuration_plugin/src/main.rs
@@ -1,9 +1,13 @@
mod config;
+mod download;
+mod error;
mod smartrest;
use crate::config::PluginConfig;
+use crate::download::handle_config_download_request;
use anyhow::Result;
use c8y_api::http_proxy::{C8YHttpProxy, JwtAuthHttpProxy};
+use c8y_smartrest::smartrest_deserializer::SmartRestConfigDownloadRequest;
use c8y_smartrest::{
smartrest_deserializer::SmartRestConfigUploadRequest,
smartrest_serializer::{
@@ -26,7 +30,7 @@ const CONFIG_ROOT_PATH: &str = "/etc/tedge/c8y";
const LOG_LEVEL_DEBUG: bool = false;
#[cfg(debug_assertions)]
-const LOG_LEVEL_DEBUG: bool = true;
+const LOG_LEVEL_DEBUG: bool = false;
async fn create_mqtt_client() -> Result<mqtt_channel::Connection, anyhow::Error> {
let tedge_config = get_tedge_config()?;
@@ -41,7 +45,6 @@ async fn create_mqtt_client() -> Result<mqtt_channel::Connection, anyhow::Error>
Ok(mqtt_client)
}
-/// creates an http client
pub async fn create_http_client() -> Result<JwtAuthHttpProxy, anyhow::Error> {
let config = get_tedge_config()?;
let mut http_proxy = JwtAuthHttpProxy::try_new(&config).await?;
@@ -152,12 +155,20 @@ async fn main() -> Result<(), anyhow::Error> {
while let Some(message) = mqtt_client.received.next().await {
debug!("Received {:?}", message);
if let Ok(payload) = message.payload_str() {
- let result = match payload.split(',').next() {
- Some("524") => {
- debug!("{}", message.payload_str()?);
- todo!() // c8y_DownloadConfigFile
+ let result = match payload.split(',').next().unwrap_or_default() {
+ "524" => {
+ debug!("{}", payload);
+ let config_download_request =
+ SmartRestConfigDownloadRequest::from_smartrest(payload)?;
+ handle_config_download_request(
+ &plugin_config,
+ config_download_request,
+ &mut mqtt_client,
+ &mut http_client,
+ )
+ .await
}
- Some("526") => {
+ "526" => {
debug!("{}", payload);
// retrieve config file upload smartrest request from payload
let config_upload_request =
diff --git a/plugins/c8y_configuration_plugin/src/smartrest.rs b/plugins/c8y_configuration_plugin/src/smartrest.rs
index f1255f97..5d525673 100644
--- a/plugins/c8y_configuration_plugin/src/smartrest.rs
+++ b/plugins/c8y_configuration_plugin/src/smartrest.rs
@@ -1,4 +1,6 @@
use crate::config::PluginConfig;
+use c8y_smartrest::error::SmartRestSerializerError;
+use c8y_smartrest::smartrest_serializer::SmartRest;
use c8y_smartrest::topic::C8yTopic;
use mqtt_channel::Message;
@@ -20,6 +22,32 @@ impl PluginConfig {
}
}
+pub trait GetSmartRestMessage {
+ fn executing() -> Result<Message, SmartRestSerializerError> {
+ let status = Self::status_executing()?;
+ Ok(Self::create_message(status))
+ }
+
+ fn successful() -> Result<Message, SmartRestSerializerError> {
+ let status = Self::status_successful()?;
+ Ok(Self::create_message(status))
+ }
+
+ fn failed(failure_reason: String) -> Result<Message, SmartRestSerializerError> {
+ let status = Self::status_failed(failure_reason)?;
+ Ok(Self::create_message(status))
+ }
+
+ fn create_message(payload: SmartRest) -> Message {
+ let topic = C8yTopic::SmartRestResponse.to_topic().unwrap(); // never fail
+ Message::new(&topic, payload)
+ }
+
+ fn status_executing() -> Result<SmartRest, SmartRestSerializerError>;
+ fn status_successful() -> Result<SmartRest, SmartRestSerializerError>;
+ fn status_failed(failure_reason: String) -> Result<SmartRest, SmartRestSerializerError>;
+}
+
#[cfg(test)]
mod tests {
use super::*;