summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAlbin Suresh <albin.suresh@softwareag.com>2022-04-12 17:36:53 +0530
committerAlbin Suresh <albin.suresh@softwareag.com>2022-04-12 18:02:39 +0530
commit19aea1a260196593c82ac84954b7f844a2fd209b (patch)
tree79f85e4414a840605ef706fcad52263bace2cb49
parent6ff7da30dc2d1979b237ac9b014f741b65bfef88 (diff)
Issue #1030 Support c8y_UploadConfigFile operation
-rw-r--r--Cargo.lock1
-rw-r--r--crates/core/c8y_api/src/http_proxy.rs54
-rw-r--r--crates/core/c8y_smartrest/src/smartrest_deserializer.rs22
-rw-r--r--crates/core/c8y_smartrest/src/smartrest_serializer.rs2
-rw-r--r--crates/core/tedge_mapper/src/c8y/tests.rs10
-rw-r--r--plugins/c8y_configuration_plugin/Cargo.toml1
-rw-r--r--plugins/c8y_configuration_plugin/src/main.rs151
-rw-r--r--plugins/c8y_configuration_plugin/src/smartrest.rs7
8 files changed, 227 insertions, 21 deletions
diff --git a/Cargo.lock b/Cargo.lock
index bffb583e..8e309ced 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -392,6 +392,7 @@ name = "c8y_configuration_plugin"
version = "0.6.1"
dependencies = [
"anyhow",
+ "c8y_api",
"c8y_smartrest",
"mqtt_channel",
"serde",
diff --git a/crates/core/c8y_api/src/http_proxy.rs b/crates/core/c8y_api/src/http_proxy.rs
index 9e75faf3..53fe8611 100644
--- a/crates/core/c8y_api/src/http_proxy.rs
+++ b/crates/core/c8y_api/src/http_proxy.rs
@@ -8,7 +8,7 @@ use c8y_smartrest::{error::SMCumulocityMapperError, smartrest_deserializer::Smar
use mockall::automock;
use mqtt_channel::{Connection, PubChannel, StreamExt, Topic, TopicFilter};
use reqwest::Url;
-use std::{collections::HashMap, time::Duration};
+use std::{collections::HashMap, path::Path, time::Duration};
use tedge_config::{
C8yUrlSetting, ConfigSettingAccessor, ConfigSettingAccessorStringExt, DeviceIdSetting,
MqttBindAddressSetting, MqttPortSetting, TEdgeConfig,
@@ -43,6 +43,12 @@ pub trait C8YHttpProxy: Send + Sync {
&mut self,
log_content: &str,
) -> Result<String, SMCumulocityMapperError>;
+
+ async fn upload_config_file(
+ &mut self,
+ config_path: &Path,
+ config_content: &str,
+ ) -> Result<String, SMCumulocityMapperError>;
}
/// Define a C8y endpoint
@@ -266,6 +272,25 @@ impl JwtAuthHttpProxy {
)
}
+ fn create_event(
+ &self,
+ event_type: String,
+ event_text: Option<String>,
+ event_time: Option<OffsetDateTime>,
+ ) -> C8yCreateEvent {
+ let c8y_managed_object = C8yManagedObject {
+ id: self.end_point.c8y_internal_id.clone(),
+ };
+
+ C8yCreateEvent::new(
+ Some(c8y_managed_object),
+ event_type.clone(),
+ event_time.map_or(OffsetDateTime::now_utc(), |time| time),
+ event_text.map_or(event_type, |text| text),
+ HashMap::new(),
+ )
+ }
+
async fn send_event_internal(
&mut self,
c8y_event: C8yCreateEvent,
@@ -375,6 +400,33 @@ impl C8YHttpProxy for JwtAuthHttpProxy {
let _response = self.http_con.execute(request).await?;
Ok(binary_upload_event_url)
}
+
+ async fn upload_config_file(
+ &mut self,
+ config_path: &Path,
+ config_content: &str,
+ ) -> Result<String, SMCumulocityMapperError> {
+ let token = self.get_jwt_token().await?;
+
+ let config_file_event = self.create_event(config_path.display().to_string(), None, None);
+ let event_response_id = self.send_event_internal(config_file_event).await?;
+ let binary_upload_event_url = self
+ .end_point
+ .get_url_for_event_binary_upload(&event_response_id);
+
+ let request = self
+ .http_con
+ .post(&binary_upload_event_url)
+ .header("Accept", "application/json")
+ .header("Content-Type", "text/plain")
+ .body(config_content.to_string())
+ .bearer_auth(token.token())
+ .timeout(Duration::from_millis(10000))
+ .build()?;
+
+ let _response = self.http_con.execute(request).await?;
+ Ok(binary_upload_event_url)
+ }
}
#[cfg(test)]
diff --git a/crates/core/c8y_smartrest/src/smartrest_deserializer.rs b/crates/core/c8y_smartrest/src/smartrest_deserializer.rs
index 244d77bc..c65f49b2 100644
--- a/crates/core/c8y_smartrest/src/smartrest_deserializer.rs
+++ b/crates/core/c8y_smartrest/src/smartrest_deserializer.rs
@@ -242,6 +242,28 @@ impl SmartRestRestartRequest {
}
}
+#[derive(Debug, Deserialize, Serialize, PartialEq)]
+pub struct SmartRestConfigUploadRequest {
+ pub message_id: String,
+ pub device: String,
+ pub config_type: String,
+}
+
+impl SmartRestConfigUploadRequest {
+ pub fn from_smartrest(smartrest: &str) -> Result<Self, SmartRestDeserializerError> {
+ let mut rdr = ReaderBuilder::new()
+ .has_headers(false)
+ .flexible(true)
+ .from_reader(smartrest.as_bytes());
+
+ rdr.deserialize()
+ .next()
+ .ok_or_else(|| panic!("empty request"))
+ .unwrap() // does already panic before this, so this unwrap is only required for type lineup
+ .map_err(SmartRestDeserializerError::from)
+ }
+}
+
type JwtToken = String;
#[derive(Debug, Deserialize, PartialEq)]
diff --git a/crates/core/c8y_smartrest/src/smartrest_serializer.rs b/crates/core/c8y_smartrest/src/smartrest_serializer.rs
index 7f548d74..b3ec3e44 100644
--- a/crates/core/c8y_smartrest/src/smartrest_serializer.rs
+++ b/crates/core/c8y_smartrest/src/smartrest_serializer.rs
@@ -10,6 +10,7 @@ pub enum CumulocitySupportedOperations {
C8ySoftwareUpdate,
C8yLogFileRequest,
C8yRestartRequest,
+ C8yUploadConfigFile,
}
impl From<CumulocitySupportedOperations> for &'static str {
@@ -18,6 +19,7 @@ impl From<CumulocitySupportedOperations> for &'static str {
CumulocitySupportedOperations::C8ySoftwareUpdate => "c8y_SoftwareUpdate",
CumulocitySupportedOperations::C8yLogFileRequest => "c8y_LogfileRequest",
CumulocitySupportedOperations::C8yRestartRequest => "c8y_Restart",
+ CumulocitySupportedOperations::C8yUploadConfigFile => "c8y_UploadConfigFile",
}
}
}
diff --git a/crates/core/tedge_mapper/src/c8y/tests.rs b/crates/core/tedge_mapper/src/c8y/tests.rs
index bafe2b84..849c7563 100644
--- a/crates/core/tedge_mapper/src/c8y/tests.rs
+++ b/crates/core/tedge_mapper/src/c8y/tests.rs
@@ -18,7 +18,7 @@ use mqtt_channel::{Message, Topic};
use mqtt_tests::test_mqtt_server::MqttProcessHandler;
use serde_json::json;
use serial_test::serial;
-use std::time::Duration;
+use std::{path::Path, time::Duration};
use test_case::test_case;
use tokio::task::JoinHandle;
@@ -828,6 +828,14 @@ impl C8YHttpProxy for FakeC8YHttpProxy {
) -> Result<String, SMCumulocityMapperError> {
Ok("123".into())
}
+
+ async fn upload_config_file(
+ &mut self,
+ _config_path: &Path,
+ _config_content: &str,
+ ) -> Result<String, SMCumulocityMapperError> {
+ Ok("fake/upload/url".into())
+ }
}
async fn start_c8y_mapper(mqtt_port: u16) -> Result<JoinHandle<()>, anyhow::Error> {
diff --git a/plugins/c8y_configuration_plugin/Cargo.toml b/plugins/c8y_configuration_plugin/Cargo.toml
index 17a463e7..1a434fc3 100644
--- a/plugins/c8y_configuration_plugin/Cargo.toml
+++ b/plugins/c8y_configuration_plugin/Cargo.toml
@@ -9,6 +9,7 @@ description = "Thin.edge.io operation plugin for Cumulocity configuration manage
[dependencies]
anyhow = "1.0"
+c8y_api = { path = "../../crates/core/c8y_api" }
c8y_smartrest = { path = "../../crates/core/c8y_smartrest" }
mqtt_channel = { path = "../../crates/common/mqtt_channel" }
serde = { version = "1.0", features = ["derive"] }
diff --git a/plugins/c8y_configuration_plugin/src/main.rs b/plugins/c8y_configuration_plugin/src/main.rs
index ea98df54..30a8d289 100644
--- a/plugins/c8y_configuration_plugin/src/main.rs
+++ b/plugins/c8y_configuration_plugin/src/main.rs
@@ -2,11 +2,23 @@ mod config;
mod smartrest;
use crate::config::PluginConfig;
-use c8y_smartrest::topic::C8yTopic;
-use mqtt_channel::{SinkExt, StreamExt};
-use std::path::PathBuf;
+use anyhow::Result;
+use c8y_api::http_proxy::{C8YHttpProxy, JwtAuthHttpProxy};
+use c8y_smartrest::{
+ smartrest_deserializer::SmartRestConfigUploadRequest,
+ smartrest_serializer::{
+ CumulocitySupportedOperations, SmartRestSerializer, SmartRestSetOperationToExecuting,
+ SmartRestSetOperationToFailed, SmartRestSetOperationToSuccessful,
+ },
+ topic::C8yTopic,
+};
+use mqtt_channel::{Connection, Message, SinkExt, StreamExt};
+use std::{
+ fs::read_to_string,
+ path::{Path, PathBuf},
+};
use tedge_config::{get_tedge_config, ConfigSettingAccessor, MqttPortSetting};
-use tracing::{debug, error, info, instrument, warn};
+use tracing::{debug, error};
const CONFIG_ROOT_PATH: &str = "/etc/tedge/c8y";
@@ -29,12 +41,106 @@ async fn create_mqtt_client() -> Result<mqtt_channel::Connection, anyhow::Error>
Ok(mqtt_client)
}
+/// creates an http client
+pub async fn create_http_client() -> Result<JwtAuthHttpProxy, anyhow::Error> {
+ let config = get_tedge_config()?;
+ let mut http_proxy = JwtAuthHttpProxy::try_new(&config).await?;
+ let () = http_proxy.init().await?;
+ Ok(http_proxy)
+}
+
+/// returns a c8y message specifying to set the upload config file operation status to executing.
+///
+/// example message: '501,c8y_UploadConfigFile'
+pub fn get_upload_config_file_executing_message() -> Result<Message, anyhow::Error> {
+ let topic = C8yTopic::SmartRestResponse.to_topic()?;
+ let smartrest_set_operation_status =
+ SmartRestSetOperationToExecuting::new(CumulocitySupportedOperations::C8yUploadConfigFile)
+ .to_smartrest()?;
+ Ok(Message::new(&topic, smartrest_set_operation_status))
+}
+
+/// returns a c8y SmartREST message indicating the success of the upload config file operation.
+///
+/// example message: '503,c8y_UploadConfigFile,https://{c8y.url}/etc...'
+pub fn get_upload_config_file_successful_message(
+ binary_upload_event_url: &str,
+) -> Result<Message, anyhow::Error> {
+ let topic = C8yTopic::SmartRestResponse.to_topic()?;
+ let smartrest_set_operation_status =
+ SmartRestSetOperationToSuccessful::new(CumulocitySupportedOperations::C8yUploadConfigFile)
+ .with_response_parameter(binary_upload_event_url)
+ .to_smartrest()?;
+
+ Ok(Message::new(&topic, smartrest_set_operation_status))
+}
+
+/// returns a c8y SmartREST message indicating the failure of the upload config file operation.
+///
+/// example message: '503,c8y_UploadConfigFile,https://{c8y.url}/etc...'
+pub fn get_upload_config_file_failure_message(
+ failure_reason: String,
+) -> Result<Message, anyhow::Error> {
+ let topic = C8yTopic::SmartRestResponse.to_topic()?;
+ let smartrest_set_operation_status = SmartRestSetOperationToFailed::new(
+ CumulocitySupportedOperations::C8yUploadConfigFile,
+ failure_reason,
+ )
+ .to_smartrest()?;
+
+ Ok(Message::new(&topic, smartrest_set_operation_status))
+}
+
+async fn handle_config_upload_request(
+ config_upload_request: SmartRestConfigUploadRequest,
+ mqtt_client: &mut Connection,
+ http_client: &mut JwtAuthHttpProxy,
+) -> Result<()> {
+ // set config upload request to executing
+ let msg = get_upload_config_file_executing_message()?;
+ let () = mqtt_client.published.send(msg).await?;
+
+ let upload_result = upload_config_file(
+ Path::new(config_upload_request.config_type.as_str()),
+ http_client,
+ )
+ .await;
+ match upload_result {
+ Ok(upload_event_url) => {
+ let successful_message = get_upload_config_file_successful_message(&upload_event_url)?;
+ let () = mqtt_client.published.send(successful_message).await?;
+ }
+ Err(err) => {
+ let failed_message = get_upload_config_file_failure_message(err.to_string())?;
+ let () = mqtt_client.published.send(failed_message).await?;
+ }
+ }
+
+ Ok(())
+}
+
+async fn upload_config_file(
+ config_file_path: &Path,
+ http_client: &mut JwtAuthHttpProxy,
+) -> Result<String> {
+ // read the config file contents
+ let config_content = read_to_string(config_file_path)?;
+
+ // upload config file
+ let upload_event_url = http_client
+ .upload_config_file(config_file_path, &config_content)
+ .await?;
+
+ Ok(upload_event_url)
+}
+
#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
tedge_utils::logging::initialise_tracing_subscriber(LOG_LEVEL_DEBUG);
// Create required clients
let mut mqtt_client = create_mqtt_client().await?;
+ let mut http_client = create_http_client().await?;
let plugin_config = PluginConfig::new(PathBuf::from(CONFIG_ROOT_PATH));
@@ -45,16 +151,35 @@ async fn main() -> Result<(), anyhow::Error> {
// Mqtt message loop
while let Some(message) = mqtt_client.received.next().await {
debug!("Received {:?}", message);
- match message.payload_str()?.split(',').nth(0).unwrap_or_default() {
- "524" => {
- debug!("{}", message.payload_str()?);
- todo!() // c8y_DownloadConfigFile
- }
- "526" => {
- debug!("{}", message.payload_str()?);
- todo!() // c8y_UploadConfigFile
+ if let Ok(payload) = message.payload_str() {
+ let result = match payload.split(',').next() {
+ Some("524") => {
+ debug!("{}", message.payload_str()?);
+ todo!() // c8y_DownloadConfigFile
+ }
+ Some("526") => {
+ debug!("{}", payload);
+ // retrieve config file upload smartrest request from payload
+ let config_upload_request =
+ SmartRestConfigUploadRequest::from_smartrest(payload)?;
+
+ // handle the config file upload request
+ handle_config_upload_request(
+ config_upload_request,
+ &mut mqtt_client,
+ &mut http_client,
+ )
+ .await
+ }
+ _ => {
+ // Ignore operation messages not meant for this plugin
+ Ok(())
+ }
+ };
+
+ if let Err(err) = result {
+ error!("Handling of operation: '{}' failed with {}", payload, err);
}
- _ => {}
}
}
diff --git a/plugins/c8y_configuration_plugin/src/smartrest.rs b/plugins/c8y_configuration_plugin/src/smartrest.rs
index 6822f5a7..f1255f97 100644
--- a/plugins/c8y_configuration_plugin/src/smartrest.rs
+++ b/plugins/c8y_configuration_plugin/src/smartrest.rs
@@ -1,10 +1,5 @@
use crate::config::PluginConfig;
-use c8y_smartrest::{
- smartrest_serializer::{
- SmartRestSerializer, SmartRestSetOperationToExecuting, SmartRestSetOperationToSuccessful,
- },
- topic::C8yTopic,
-};
+use c8y_smartrest::topic::C8yTopic;
use mqtt_channel::Message;
impl PluginConfig {