summaryrefslogtreecommitdiffstats
path: root/plugins/c8y_configuration_plugin
diff options
context:
space:
mode:
authorAlbin Suresh <albin.suresh@softwareag.com>2022-05-11 20:38:58 +0530
committerAlbin Suresh <albin.suresh@softwareag.com>2022-05-13 19:10:18 +0530
commitbd98cc741db568b27d46dae4e4be3ca35d47b02f (patch)
tree45f2bb3f8e93476cec1b12f542fab209fa2cc95e /plugins/c8y_configuration_plugin
parent42859fa5a847179867f3dce2143cc533655bc2de (diff)
Issue #1030 c8y_configuration_plugin integration tests
Diffstat (limited to 'plugins/c8y_configuration_plugin')
-rw-r--r--plugins/c8y_configuration_plugin/Cargo.toml5
-rw-r--r--plugins/c8y_configuration_plugin/src/download.rs6
-rw-r--r--plugins/c8y_configuration_plugin/src/main.rs106
-rw-r--r--plugins/c8y_configuration_plugin/src/upload.rs91
4 files changed, 184 insertions, 24 deletions
diff --git a/plugins/c8y_configuration_plugin/Cargo.toml b/plugins/c8y_configuration_plugin/Cargo.toml
index 75f6d895..d7fa4d91 100644
--- a/plugins/c8y_configuration_plugin/Cargo.toml
+++ b/plugins/c8y_configuration_plugin/Cargo.toml
@@ -33,5 +33,10 @@ tracing = { version = "0.1", features = ["attributes", "log"] }
[dev-dependencies]
assert_matches = "1.5"
+mockall = "0.11"
+mockito = "0.31"
+mqtt_tests = { path = "../../crates/tests/mqtt_tests" }
+serial_test = "0.5"
tempfile = "3.3"
test-case = "2.0"
+toml = "0.5"
diff --git a/plugins/c8y_configuration_plugin/src/download.rs b/plugins/c8y_configuration_plugin/src/download.rs
index 766a9238..b293926c 100644
--- a/plugins/c8y_configuration_plugin/src/download.rs
+++ b/plugins/c8y_configuration_plugin/src/download.rs
@@ -1,7 +1,7 @@
use crate::error::ConfigManagementError;
use crate::smartrest::TryIntoOperationStatusMessage;
use crate::{error, PluginConfig};
-use c8y_api::http_proxy::{C8YHttpProxy, JwtAuthHttpProxy};
+use c8y_api::http_proxy::C8YHttpProxy;
use c8y_smartrest::error::SmartRestSerializerError;
use c8y_smartrest::smartrest_deserializer::SmartRestConfigDownloadRequest;
use c8y_smartrest::smartrest_serializer::{
@@ -24,7 +24,7 @@ pub async fn handle_config_download_request(
smartrest_request: SmartRestConfigDownloadRequest,
tmp_dir: PathBuf,
mqtt_client: &mut Connection,
- http_client: &mut JwtAuthHttpProxy,
+ http_client: &mut impl C8YHttpProxy,
) -> Result<(), anyhow::Error> {
let executing_message = DownloadConfigFileStatusMessage::executing()?;
let () = mqtt_client.published.send(executing_message).await?;
@@ -53,7 +53,7 @@ async fn download_config_file(
plugin_config: &PluginConfig,
smartrest_request: SmartRestConfigDownloadRequest,
tmp_dir: PathBuf,
- http_client: &mut JwtAuthHttpProxy,
+ http_client: &mut impl C8YHttpProxy,
) -> Result<(), anyhow::Error> {
// Convert smartrest request to config download request struct
let mut config_download_request =
diff --git a/plugins/c8y_configuration_plugin/src/main.rs b/plugins/c8y_configuration_plugin/src/main.rs
index 884c84c6..44307255 100644
--- a/plugins/c8y_configuration_plugin/src/main.rs
+++ b/plugins/c8y_configuration_plugin/src/main.rs
@@ -59,10 +59,7 @@ pub struct ConfigPluginOpt {
pub config_file: PathBuf,
}
-async fn create_mqtt_client(
- tedge_config: &TEdgeConfig,
-) -> Result<mqtt_channel::Connection, anyhow::Error> {
- let mqtt_port = tedge_config.query(MqttPortSetting)?.into();
+async fn create_mqtt_client(mqtt_port: u16) -> Result<mqtt_channel::Connection, anyhow::Error> {
let mqtt_config = mqtt_channel::Config::default()
.with_port(mqtt_port)
.with_subscriptions(mqtt_channel::TopicFilter::new_unchecked(
@@ -97,14 +94,28 @@ async fn main() -> Result<(), anyhow::Error> {
let config_repository = tedge_config::TEdgeConfigRepository::new(tedge_config_location.clone());
let tedge_config = config_repository.load()?;
+ let plugin_config = PluginConfig::new(config_plugin_opt.config_file);
+
// Create required clients
- let mut mqtt_client = create_mqtt_client(&tedge_config).await?;
+ let mqtt_port = tedge_config.query(MqttPortSetting)?.into();
let mut http_client = create_http_client(&tedge_config).await?;
- let plugin_config = PluginConfig::new(config_plugin_opt.config_file);
+ let tmp_dir = tedge_config.query(TmpPathSetting)?.into();
+
+ run(plugin_config, mqtt_port, &mut http_client, tmp_dir).await
+}
+
+async fn run(
+ plugin_config: PluginConfig,
+ mqtt_port: u16,
+ http_client: &mut impl C8YHttpProxy,
+ tmp_dir: PathBuf,
+) -> Result<(), anyhow::Error> {
+ let mut mqtt_client = create_mqtt_client(mqtt_port).await?;
// Publish supported configuration types
let msg = plugin_config.to_supported_config_types_message()?;
+ debug!("Plugin init message: {:?}", msg);
let () = mqtt_client.published.send(msg).await?;
// Mqtt message loop
@@ -116,14 +127,12 @@ async fn main() -> Result<(), anyhow::Error> {
let config_download_request =
SmartRestConfigDownloadRequest::from_smartrest(payload)?;
- let tmp_dir = tedge_config.query(TmpPathSetting)?.into();
-
handle_config_download_request(
&plugin_config,
config_download_request,
- tmp_dir,
+ tmp_dir.clone(),
&mut mqtt_client,
- &mut http_client,
+ http_client,
)
.await
}
@@ -136,7 +145,7 @@ async fn main() -> Result<(), anyhow::Error> {
handle_config_upload_request(
config_upload_request,
&mut mqtt_client,
- &mut http_client,
+ http_client,
)
.await
}
@@ -185,3 +194,78 @@ fn create_operation_files(config_dir: &str) -> Result<(), anyhow::Error> {
)?;
Ok(())
}
+
+#[cfg(test)]
+mod tests {
+ use std::{collections::HashSet, path::Path, time::Duration};
+
+ use crate::config::FileEntry;
+
+ use super::*;
+ use c8y_api::http_proxy::MockC8YHttpProxy;
+ use mockall::predicate;
+
+ const TEST_TIMEOUT_MS: Duration = Duration::from_millis(5000);
+
+ #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+ #[serial_test::serial]
+ async fn test_message_dispatch() -> anyhow::Result<()> {
+ let test_config_path = "/some/test/config";
+
+ let plugin_config = PluginConfig {
+ files: HashSet::from([FileEntry::new(test_config_path.to_string())]),
+ };
+
+ let broker = mqtt_tests::test_mqtt_broker();
+
+ let mut messages = broker.messages_published_on("c8y/s/us").await;
+
+ let mut http_client = MockC8YHttpProxy::new();
+ http_client
+ .expect_upload_config_file()
+ .with(predicate::eq(Path::new(test_config_path)))
+ .return_once(|_path| Ok("http://server/some/test/config/url".to_string()));
+
+ let tmp_dir = tempfile::tempdir()?;
+
+ // Run the plugin's runtime logic in an async task
+ tokio::spawn(async move {
+ let _ = run(
+ plugin_config,
+ broker.port,
+ &mut http_client,
+ tmp_dir.path().to_path_buf(),
+ )
+ .await;
+ });
+
+ // Assert supported config types message(119) on plugin startup
+ mqtt_tests::assert_received_all_expected(
+ &mut messages,
+ TEST_TIMEOUT_MS,
+ &[format!("119,{test_config_path}")],
+ )
+ .await;
+
+ // Send a software upload request to the plugin
+ let _ = broker
+ .publish(
+ "c8y/s/ds",
+ format!("526,tedge-device,{test_config_path}").as_str(),
+ )
+ .await?;
+
+ // Assert the c8y_UploadConfigFile operation transitioning from EXECUTING(501) to SUCCESSFUL(503) with the uploaded config URL
+ mqtt_tests::assert_received_all_expected(
+ &mut messages,
+ TEST_TIMEOUT_MS,
+ &[
+ "501,c8y_UploadConfigFile",
+ "503,c8y_UploadConfigFile,http://server/some/test/config/url",
+ ],
+ )
+ .await;
+
+ Ok(())
+ }
+}
diff --git a/plugins/c8y_configuration_plugin/src/upload.rs b/plugins/c8y_configuration_plugin/src/upload.rs
index 25a3292c..39302d92 100644
--- a/plugins/c8y_configuration_plugin/src/upload.rs
+++ b/plugins/c8y_configuration_plugin/src/upload.rs
@@ -1,6 +1,6 @@
use crate::smartrest::TryIntoOperationStatusMessage;
use anyhow::Result;
-use c8y_api::http_proxy::{C8YHttpProxy, JwtAuthHttpProxy};
+use c8y_api::http_proxy::C8YHttpProxy;
use c8y_smartrest::error::SmartRestSerializerError;
use c8y_smartrest::smartrest_serializer::SmartRest;
use c8y_smartrest::{
@@ -11,7 +11,7 @@ use c8y_smartrest::{
},
};
use mqtt_channel::{Connection, SinkExt};
-use std::{fs::read_to_string, path::Path};
+use std::path::Path;
struct UploadConfigFileStatusMessage {}
@@ -45,7 +45,7 @@ impl TryIntoOperationStatusMessage for UploadConfigFileStatusMessage {
pub async fn handle_config_upload_request(
config_upload_request: SmartRestConfigUploadRequest,
mqtt_client: &mut Connection,
- http_client: &mut JwtAuthHttpProxy,
+ http_client: &mut impl C8YHttpProxy,
) -> Result<()> {
// set config upload request to executing
let msg = UploadConfigFileStatusMessage::executing()?;
@@ -73,24 +73,26 @@ pub async fn handle_config_upload_request(
async fn upload_config_file(
config_file_path: &Path,
- http_client: &mut JwtAuthHttpProxy,
+ http_client: &mut impl C8YHttpProxy,
) -> Result<String> {
- // read the config file contents
- let config_content = read_to_string(config_file_path)?;
-
// upload config file
- let upload_event_url = http_client
- .upload_config_file(config_file_path, &config_content)
- .await?;
+ let upload_event_url = http_client.upload_config_file(config_file_path).await?;
Ok(upload_event_url)
}
#[cfg(test)]
mod tests {
+ use std::time::Duration;
+
use super::*;
+ use c8y_api::http_proxy::MockC8YHttpProxy;
+ use c8y_smartrest::topic::C8yTopic;
+ use mockall::predicate;
use mqtt_channel::Topic;
+ const TEST_TIMEOUT_MS: Duration = Duration::from_millis(5000);
+
#[test]
fn get_smartrest_executing() {
let message = UploadConfigFileStatusMessage::executing().unwrap();
@@ -119,4 +121,73 @@ mod tests {
"502,c8y_UploadConfigFile,\"failed reason\"\n"
);
}
+
+ #[tokio::test]
+ async fn test_upload_config_file() -> anyhow::Result<()> {
+ let config_path = Path::new("/some/temp/path");
+
+ let mut http_client = MockC8YHttpProxy::new();
+
+ http_client
+ .expect_upload_config_file()
+ .with(predicate::eq(config_path))
+ .return_once(|_path| Ok("http://server/config/file/url".to_string()));
+
+ assert_eq!(
+ upload_config_file(config_path, &mut http_client).await?,
+ "http://server/config/file/url"
+ );
+
+ Ok(())
+ }
+
+ #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+ #[serial_test::serial]
+ async fn test_handle_config_upload_request() -> anyhow::Result<()> {
+ let config_path = Path::new("/some/test/config");
+
+ let broker = mqtt_tests::test_mqtt_broker();
+ let mqtt_config = mqtt_channel::Config::default()
+ .with_port(broker.port)
+ .with_subscriptions(mqtt_channel::TopicFilter::new_unchecked(
+ C8yTopic::SmartRestRequest.as_str(),
+ ));
+ let mut mqtt_client = mqtt_channel::Connection::new(&mqtt_config).await?;
+
+ let mut messages = broker.messages_published_on("c8y/s/us").await;
+
+ let mut http_client = MockC8YHttpProxy::new();
+ http_client
+ .expect_upload_config_file()
+ .with(predicate::eq(config_path))
+ .return_once(|_path| Ok("http://server/config/file/url".to_string()));
+
+ let config_upload_request = SmartRestConfigUploadRequest {
+ message_id: "526".to_string(),
+ device: "thin-edge-device".to_string(),
+ config_type: "/some/test/config".to_string(),
+ };
+
+ tokio::spawn(async move {
+ let _ = handle_config_upload_request(
+ config_upload_request,
+ &mut mqtt_client,
+ &mut http_client,
+ )
+ .await;
+ });
+
+ // Assert the c8y_UploadConfigFile operation transitioning from EXECUTING(501) to SUCCESSFUL(503) with the uploaded config URL
+ mqtt_tests::assert_received_all_expected(
+ &mut messages,
+ TEST_TIMEOUT_MS,
+ &[
+ "501,c8y_UploadConfigFile",
+ "503,c8y_UploadConfigFile,http://server/config/file/url",
+ ],
+ )
+ .await;
+
+ Ok(())
+ }
}