diff options
author | Albin Suresh <albin.suresh@softwareag.com> | 2022-04-12 17:36:53 +0530 |
---|---|---|
committer | Albin Suresh <albin.suresh@softwareag.com> | 2022-04-12 18:02:39 +0530 |
commit | 19aea1a260196593c82ac84954b7f844a2fd209b (patch) | |
tree | 79f85e4414a840605ef706fcad52263bace2cb49 | |
parent | 6ff7da30dc2d1979b237ac9b014f741b65bfef88 (diff) |
Issue #1030 Support c8y_UploadConfigFile operation
-rw-r--r-- | Cargo.lock | 1 | ||||
-rw-r--r-- | crates/core/c8y_api/src/http_proxy.rs | 54 | ||||
-rw-r--r-- | crates/core/c8y_smartrest/src/smartrest_deserializer.rs | 22 | ||||
-rw-r--r-- | crates/core/c8y_smartrest/src/smartrest_serializer.rs | 2 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/c8y/tests.rs | 10 | ||||
-rw-r--r-- | plugins/c8y_configuration_plugin/Cargo.toml | 1 | ||||
-rw-r--r-- | plugins/c8y_configuration_plugin/src/main.rs | 151 | ||||
-rw-r--r-- | plugins/c8y_configuration_plugin/src/smartrest.rs | 7 |
8 files changed, 227 insertions, 21 deletions
@@ -392,6 +392,7 @@ name = "c8y_configuration_plugin" version = "0.6.1" dependencies = [ "anyhow", + "c8y_api", "c8y_smartrest", "mqtt_channel", "serde", diff --git a/crates/core/c8y_api/src/http_proxy.rs b/crates/core/c8y_api/src/http_proxy.rs index 9e75faf3..53fe8611 100644 --- a/crates/core/c8y_api/src/http_proxy.rs +++ b/crates/core/c8y_api/src/http_proxy.rs @@ -8,7 +8,7 @@ use c8y_smartrest::{error::SMCumulocityMapperError, smartrest_deserializer::Smar use mockall::automock; use mqtt_channel::{Connection, PubChannel, StreamExt, Topic, TopicFilter}; use reqwest::Url; -use std::{collections::HashMap, time::Duration}; +use std::{collections::HashMap, path::Path, time::Duration}; use tedge_config::{ C8yUrlSetting, ConfigSettingAccessor, ConfigSettingAccessorStringExt, DeviceIdSetting, MqttBindAddressSetting, MqttPortSetting, TEdgeConfig, @@ -43,6 +43,12 @@ pub trait C8YHttpProxy: Send + Sync { &mut self, log_content: &str, ) -> Result<String, SMCumulocityMapperError>; + + async fn upload_config_file( + &mut self, + config_path: &Path, + config_content: &str, + ) -> Result<String, SMCumulocityMapperError>; } /// Define a C8y endpoint @@ -266,6 +272,25 @@ impl JwtAuthHttpProxy { ) } + fn create_event( + &self, + event_type: String, + event_text: Option<String>, + event_time: Option<OffsetDateTime>, + ) -> C8yCreateEvent { + let c8y_managed_object = C8yManagedObject { + id: self.end_point.c8y_internal_id.clone(), + }; + + C8yCreateEvent::new( + Some(c8y_managed_object), + event_type.clone(), + event_time.map_or(OffsetDateTime::now_utc(), |time| time), + event_text.map_or(event_type, |text| text), + HashMap::new(), + ) + } + async fn send_event_internal( &mut self, c8y_event: C8yCreateEvent, @@ -375,6 +400,33 @@ impl C8YHttpProxy for JwtAuthHttpProxy { let _response = self.http_con.execute(request).await?; Ok(binary_upload_event_url) } + + async fn upload_config_file( + &mut self, + config_path: &Path, + config_content: &str, + ) -> Result<String, SMCumulocityMapperError> { + let token = self.get_jwt_token().await?; + + let config_file_event = self.create_event(config_path.display().to_string(), None, None); + let event_response_id = self.send_event_internal(config_file_event).await?; + let binary_upload_event_url = self + .end_point + .get_url_for_event_binary_upload(&event_response_id); + + let request = self + .http_con + .post(&binary_upload_event_url) + .header("Accept", "application/json") + .header("Content-Type", "text/plain") + .body(config_content.to_string()) + .bearer_auth(token.token()) + .timeout(Duration::from_millis(10000)) + .build()?; + + let _response = self.http_con.execute(request).await?; + Ok(binary_upload_event_url) + } } #[cfg(test)] diff --git a/crates/core/c8y_smartrest/src/smartrest_deserializer.rs b/crates/core/c8y_smartrest/src/smartrest_deserializer.rs index 244d77bc..c65f49b2 100644 --- a/crates/core/c8y_smartrest/src/smartrest_deserializer.rs +++ b/crates/core/c8y_smartrest/src/smartrest_deserializer.rs @@ -242,6 +242,28 @@ impl SmartRestRestartRequest { } } +#[derive(Debug, Deserialize, Serialize, PartialEq)] +pub struct SmartRestConfigUploadRequest { + pub message_id: String, + pub device: String, + pub config_type: String, +} + +impl SmartRestConfigUploadRequest { + pub fn from_smartrest(smartrest: &str) -> Result<Self, SmartRestDeserializerError> { + let mut rdr = ReaderBuilder::new() + .has_headers(false) + .flexible(true) + .from_reader(smartrest.as_bytes()); + + rdr.deserialize() + .next() + .ok_or_else(|| panic!("empty request")) + .unwrap() // does already panic before this, so this unwrap is only required for type lineup + .map_err(SmartRestDeserializerError::from) + } +} + type JwtToken = String; #[derive(Debug, Deserialize, PartialEq)] diff --git a/crates/core/c8y_smartrest/src/smartrest_serializer.rs b/crates/core/c8y_smartrest/src/smartrest_serializer.rs index 7f548d74..b3ec3e44 100644 --- a/crates/core/c8y_smartrest/src/smartrest_serializer.rs +++ b/crates/core/c8y_smartrest/src/smartrest_serializer.rs @@ -10,6 +10,7 @@ pub enum CumulocitySupportedOperations { C8ySoftwareUpdate, C8yLogFileRequest, C8yRestartRequest, + C8yUploadConfigFile, } impl From<CumulocitySupportedOperations> for &'static str { @@ -18,6 +19,7 @@ impl From<CumulocitySupportedOperations> for &'static str { CumulocitySupportedOperations::C8ySoftwareUpdate => "c8y_SoftwareUpdate", CumulocitySupportedOperations::C8yLogFileRequest => "c8y_LogfileRequest", CumulocitySupportedOperations::C8yRestartRequest => "c8y_Restart", + CumulocitySupportedOperations::C8yUploadConfigFile => "c8y_UploadConfigFile", } } } diff --git a/crates/core/tedge_mapper/src/c8y/tests.rs b/crates/core/tedge_mapper/src/c8y/tests.rs index bafe2b84..849c7563 100644 --- a/crates/core/tedge_mapper/src/c8y/tests.rs +++ b/crates/core/tedge_mapper/src/c8y/tests.rs @@ -18,7 +18,7 @@ use mqtt_channel::{Message, Topic}; use mqtt_tests::test_mqtt_server::MqttProcessHandler; use serde_json::json; use serial_test::serial; -use std::time::Duration; +use std::{path::Path, time::Duration}; use test_case::test_case; use tokio::task::JoinHandle; @@ -828,6 +828,14 @@ impl C8YHttpProxy for FakeC8YHttpProxy { ) -> Result<String, SMCumulocityMapperError> { Ok("123".into()) } + + async fn upload_config_file( + &mut self, + _config_path: &Path, + _config_content: &str, + ) -> Result<String, SMCumulocityMapperError> { + Ok("fake/upload/url".into()) + } } async fn start_c8y_mapper(mqtt_port: u16) -> Result<JoinHandle<()>, anyhow::Error> { diff --git a/plugins/c8y_configuration_plugin/Cargo.toml b/plugins/c8y_configuration_plugin/Cargo.toml index 17a463e7..1a434fc3 100644 --- a/plugins/c8y_configuration_plugin/Cargo.toml +++ b/plugins/c8y_configuration_plugin/Cargo.toml @@ -9,6 +9,7 @@ description = "Thin.edge.io operation plugin for Cumulocity configuration manage [dependencies] anyhow = "1.0" +c8y_api = { path = "../../crates/core/c8y_api" } c8y_smartrest = { path = "../../crates/core/c8y_smartrest" } mqtt_channel = { path = "../../crates/common/mqtt_channel" } serde = { version = "1.0", features = ["derive"] } diff --git a/plugins/c8y_configuration_plugin/src/main.rs b/plugins/c8y_configuration_plugin/src/main.rs index ea98df54..30a8d289 100644 --- a/plugins/c8y_configuration_plugin/src/main.rs +++ b/plugins/c8y_configuration_plugin/src/main.rs @@ -2,11 +2,23 @@ mod config; mod smartrest; use crate::config::PluginConfig; -use c8y_smartrest::topic::C8yTopic; -use mqtt_channel::{SinkExt, StreamExt}; -use std::path::PathBuf; +use anyhow::Result; +use c8y_api::http_proxy::{C8YHttpProxy, JwtAuthHttpProxy}; +use c8y_smartrest::{ + smartrest_deserializer::SmartRestConfigUploadRequest, + smartrest_serializer::{ + CumulocitySupportedOperations, SmartRestSerializer, SmartRestSetOperationToExecuting, + SmartRestSetOperationToFailed, SmartRestSetOperationToSuccessful, + }, + topic::C8yTopic, +}; +use mqtt_channel::{Connection, Message, SinkExt, StreamExt}; +use std::{ + fs::read_to_string, + path::{Path, PathBuf}, +}; use tedge_config::{get_tedge_config, ConfigSettingAccessor, MqttPortSetting}; -use tracing::{debug, error, info, instrument, warn}; +use tracing::{debug, error}; const CONFIG_ROOT_PATH: &str = "/etc/tedge/c8y"; @@ -29,12 +41,106 @@ async fn create_mqtt_client() -> Result<mqtt_channel::Connection, anyhow::Error> Ok(mqtt_client) } +/// creates an http client +pub async fn create_http_client() -> Result<JwtAuthHttpProxy, anyhow::Error> { + let config = get_tedge_config()?; + let mut http_proxy = JwtAuthHttpProxy::try_new(&config).await?; + let () = http_proxy.init().await?; + Ok(http_proxy) +} + +/// returns a c8y message specifying to set the upload config file operation status to executing. +/// +/// example message: '501,c8y_UploadConfigFile' +pub fn get_upload_config_file_executing_message() -> Result<Message, anyhow::Error> { + let topic = C8yTopic::SmartRestResponse.to_topic()?; + let smartrest_set_operation_status = + SmartRestSetOperationToExecuting::new(CumulocitySupportedOperations::C8yUploadConfigFile) + .to_smartrest()?; + Ok(Message::new(&topic, smartrest_set_operation_status)) +} + +/// returns a c8y SmartREST message indicating the success of the upload config file operation. +/// +/// example message: '503,c8y_UploadConfigFile,https://{c8y.url}/etc...' +pub fn get_upload_config_file_successful_message( + binary_upload_event_url: &str, +) -> Result<Message, anyhow::Error> { + let topic = C8yTopic::SmartRestResponse.to_topic()?; + let smartrest_set_operation_status = + SmartRestSetOperationToSuccessful::new(CumulocitySupportedOperations::C8yUploadConfigFile) + .with_response_parameter(binary_upload_event_url) + .to_smartrest()?; + + Ok(Message::new(&topic, smartrest_set_operation_status)) +} + +/// returns a c8y SmartREST message indicating the failure of the upload config file operation. +/// +/// example message: '503,c8y_UploadConfigFile,https://{c8y.url}/etc...' +pub fn get_upload_config_file_failure_message( + failure_reason: String, +) -> Result<Message, anyhow::Error> { + let topic = C8yTopic::SmartRestResponse.to_topic()?; + let smartrest_set_operation_status = SmartRestSetOperationToFailed::new( + CumulocitySupportedOperations::C8yUploadConfigFile, + failure_reason, + ) + .to_smartrest()?; + + Ok(Message::new(&topic, smartrest_set_operation_status)) +} + +async fn handle_config_upload_request( + config_upload_request: SmartRestConfigUploadRequest, + mqtt_client: &mut Connection, + http_client: &mut JwtAuthHttpProxy, +) -> Result<()> { + // set config upload request to executing + let msg = get_upload_config_file_executing_message()?; + let () = mqtt_client.published.send(msg).await?; + + let upload_result = upload_config_file( + Path::new(config_upload_request.config_type.as_str()), + http_client, + ) + .await; + match upload_result { + Ok(upload_event_url) => { + let successful_message = get_upload_config_file_successful_message(&upload_event_url)?; + let () = mqtt_client.published.send(successful_message).await?; + } + Err(err) => { + let failed_message = get_upload_config_file_failure_message(err.to_string())?; + let () = mqtt_client.published.send(failed_message).await?; + } + } + + Ok(()) +} + +async fn upload_config_file( + config_file_path: &Path, + http_client: &mut JwtAuthHttpProxy, +) -> Result<String> { + // read the config file contents + let config_content = read_to_string(config_file_path)?; + + // upload config file + let upload_event_url = http_client + .upload_config_file(config_file_path, &config_content) + .await?; + + Ok(upload_event_url) +} + #[tokio::main] async fn main() -> Result<(), anyhow::Error> { tedge_utils::logging::initialise_tracing_subscriber(LOG_LEVEL_DEBUG); // Create required clients let mut mqtt_client = create_mqtt_client().await?; + let mut http_client = create_http_client().await?; let plugin_config = PluginConfig::new(PathBuf::from(CONFIG_ROOT_PATH)); @@ -45,16 +151,35 @@ async fn main() -> Result<(), anyhow::Error> { // Mqtt message loop while let Some(message) = mqtt_client.received.next().await { debug!("Received {:?}", message); - match message.payload_str()?.split(',').nth(0).unwrap_or_default() { - "524" => { - debug!("{}", message.payload_str()?); - todo!() // c8y_DownloadConfigFile - } - "526" => { - debug!("{}", message.payload_str()?); - todo!() // c8y_UploadConfigFile + if let Ok(payload) = message.payload_str() { + let result = match payload.split(',').next() { + Some("524") => { + debug!("{}", message.payload_str()?); + todo!() // c8y_DownloadConfigFile + } + Some("526") => { + debug!("{}", payload); + // retrieve config file upload smartrest request from payload + let config_upload_request = + SmartRestConfigUploadRequest::from_smartrest(payload)?; + + // handle the config file upload request + handle_config_upload_request( + config_upload_request, + &mut mqtt_client, + &mut http_client, + ) + .await + } + _ => { + // Ignore operation messages not meant for this plugin + Ok(()) + } + }; + + if let Err(err) = result { + error!("Handling of operation: '{}' failed with {}", payload, err); } - _ => {} } } diff --git a/plugins/c8y_configuration_plugin/src/smartrest.rs b/plugins/c8y_configuration_plugin/src/smartrest.rs index 6822f5a7..f1255f97 100644 --- a/plugins/c8y_configuration_plugin/src/smartrest.rs +++ b/plugins/c8y_configuration_plugin/src/smartrest.rs @@ -1,10 +1,5 @@ use crate::config::PluginConfig; -use c8y_smartrest::{ - smartrest_serializer::{ - SmartRestSerializer, SmartRestSetOperationToExecuting, SmartRestSetOperationToSuccessful, - }, - topic::C8yTopic, -}; +use c8y_smartrest::topic::C8yTopic; use mqtt_channel::Message; impl PluginConfig { |