diff options
author | Albin Suresh <albin.suresh@softwareag.com> | 2022-05-11 20:38:58 +0530 |
---|---|---|
committer | Albin Suresh <albin.suresh@softwareag.com> | 2022-05-13 19:10:18 +0530 |
commit | bd98cc741db568b27d46dae4e4be3ca35d47b02f (patch) | |
tree | 45f2bb3f8e93476cec1b12f542fab209fa2cc95e /plugins/c8y_configuration_plugin | |
parent | 42859fa5a847179867f3dce2143cc533655bc2de (diff) |
Issue #1030 c8y_configuration_plugin integration tests
Diffstat (limited to 'plugins/c8y_configuration_plugin')
-rw-r--r-- | plugins/c8y_configuration_plugin/Cargo.toml | 5 | ||||
-rw-r--r-- | plugins/c8y_configuration_plugin/src/download.rs | 6 | ||||
-rw-r--r-- | plugins/c8y_configuration_plugin/src/main.rs | 106 | ||||
-rw-r--r-- | plugins/c8y_configuration_plugin/src/upload.rs | 91 |
4 files changed, 184 insertions, 24 deletions
diff --git a/plugins/c8y_configuration_plugin/Cargo.toml b/plugins/c8y_configuration_plugin/Cargo.toml index 75f6d895..d7fa4d91 100644 --- a/plugins/c8y_configuration_plugin/Cargo.toml +++ b/plugins/c8y_configuration_plugin/Cargo.toml @@ -33,5 +33,10 @@ tracing = { version = "0.1", features = ["attributes", "log"] } [dev-dependencies] assert_matches = "1.5" +mockall = "0.11" +mockito = "0.31" +mqtt_tests = { path = "../../crates/tests/mqtt_tests" } +serial_test = "0.5" tempfile = "3.3" test-case = "2.0" +toml = "0.5" diff --git a/plugins/c8y_configuration_plugin/src/download.rs b/plugins/c8y_configuration_plugin/src/download.rs index 766a9238..b293926c 100644 --- a/plugins/c8y_configuration_plugin/src/download.rs +++ b/plugins/c8y_configuration_plugin/src/download.rs @@ -1,7 +1,7 @@ use crate::error::ConfigManagementError; use crate::smartrest::TryIntoOperationStatusMessage; use crate::{error, PluginConfig}; -use c8y_api::http_proxy::{C8YHttpProxy, JwtAuthHttpProxy}; +use c8y_api::http_proxy::C8YHttpProxy; use c8y_smartrest::error::SmartRestSerializerError; use c8y_smartrest::smartrest_deserializer::SmartRestConfigDownloadRequest; use c8y_smartrest::smartrest_serializer::{ @@ -24,7 +24,7 @@ pub async fn handle_config_download_request( smartrest_request: SmartRestConfigDownloadRequest, tmp_dir: PathBuf, mqtt_client: &mut Connection, - http_client: &mut JwtAuthHttpProxy, + http_client: &mut impl C8YHttpProxy, ) -> Result<(), anyhow::Error> { let executing_message = DownloadConfigFileStatusMessage::executing()?; let () = mqtt_client.published.send(executing_message).await?; @@ -53,7 +53,7 @@ async fn download_config_file( plugin_config: &PluginConfig, smartrest_request: SmartRestConfigDownloadRequest, tmp_dir: PathBuf, - http_client: &mut JwtAuthHttpProxy, + http_client: &mut impl C8YHttpProxy, ) -> Result<(), anyhow::Error> { // Convert smartrest request to config download request struct let mut config_download_request = diff --git a/plugins/c8y_configuration_plugin/src/main.rs b/plugins/c8y_configuration_plugin/src/main.rs index 884c84c6..44307255 100644 --- a/plugins/c8y_configuration_plugin/src/main.rs +++ b/plugins/c8y_configuration_plugin/src/main.rs @@ -59,10 +59,7 @@ pub struct ConfigPluginOpt { pub config_file: PathBuf, } -async fn create_mqtt_client( - tedge_config: &TEdgeConfig, -) -> Result<mqtt_channel::Connection, anyhow::Error> { - let mqtt_port = tedge_config.query(MqttPortSetting)?.into(); +async fn create_mqtt_client(mqtt_port: u16) -> Result<mqtt_channel::Connection, anyhow::Error> { let mqtt_config = mqtt_channel::Config::default() .with_port(mqtt_port) .with_subscriptions(mqtt_channel::TopicFilter::new_unchecked( @@ -97,14 +94,28 @@ async fn main() -> Result<(), anyhow::Error> { let config_repository = tedge_config::TEdgeConfigRepository::new(tedge_config_location.clone()); let tedge_config = config_repository.load()?; + let plugin_config = PluginConfig::new(config_plugin_opt.config_file); + // Create required clients - let mut mqtt_client = create_mqtt_client(&tedge_config).await?; + let mqtt_port = tedge_config.query(MqttPortSetting)?.into(); let mut http_client = create_http_client(&tedge_config).await?; - let plugin_config = PluginConfig::new(config_plugin_opt.config_file); + let tmp_dir = tedge_config.query(TmpPathSetting)?.into(); + + run(plugin_config, mqtt_port, &mut http_client, tmp_dir).await +} + +async fn run( + plugin_config: PluginConfig, + mqtt_port: u16, + http_client: &mut impl C8YHttpProxy, + tmp_dir: PathBuf, +) -> Result<(), anyhow::Error> { + let mut mqtt_client = create_mqtt_client(mqtt_port).await?; // Publish supported configuration types let msg = plugin_config.to_supported_config_types_message()?; + debug!("Plugin init message: {:?}", msg); let () = mqtt_client.published.send(msg).await?; // Mqtt message loop @@ -116,14 +127,12 @@ async fn main() -> Result<(), anyhow::Error> { let config_download_request = SmartRestConfigDownloadRequest::from_smartrest(payload)?; - let tmp_dir = tedge_config.query(TmpPathSetting)?.into(); - handle_config_download_request( &plugin_config, config_download_request, - tmp_dir, + tmp_dir.clone(), &mut mqtt_client, - &mut http_client, + http_client, ) .await } @@ -136,7 +145,7 @@ async fn main() -> Result<(), anyhow::Error> { handle_config_upload_request( config_upload_request, &mut mqtt_client, - &mut http_client, + http_client, ) .await } @@ -185,3 +194,78 @@ fn create_operation_files(config_dir: &str) -> Result<(), anyhow::Error> { )?; Ok(()) } + +#[cfg(test)] +mod tests { + use std::{collections::HashSet, path::Path, time::Duration}; + + use crate::config::FileEntry; + + use super::*; + use c8y_api::http_proxy::MockC8YHttpProxy; + use mockall::predicate; + + const TEST_TIMEOUT_MS: Duration = Duration::from_millis(5000); + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + #[serial_test::serial] + async fn test_message_dispatch() -> anyhow::Result<()> { + let test_config_path = "/some/test/config"; + + let plugin_config = PluginConfig { + files: HashSet::from([FileEntry::new(test_config_path.to_string())]), + }; + + let broker = mqtt_tests::test_mqtt_broker(); + + let mut messages = broker.messages_published_on("c8y/s/us").await; + + let mut http_client = MockC8YHttpProxy::new(); + http_client + .expect_upload_config_file() + .with(predicate::eq(Path::new(test_config_path))) + .return_once(|_path| Ok("http://server/some/test/config/url".to_string())); + + let tmp_dir = tempfile::tempdir()?; + + // Run the plugin's runtime logic in an async task + tokio::spawn(async move { + let _ = run( + plugin_config, + broker.port, + &mut http_client, + tmp_dir.path().to_path_buf(), + ) + .await; + }); + + // Assert supported config types message(119) on plugin startup + mqtt_tests::assert_received_all_expected( + &mut messages, + TEST_TIMEOUT_MS, + &[format!("119,{test_config_path}")], + ) + .await; + + // Send a software upload request to the plugin + let _ = broker + .publish( + "c8y/s/ds", + format!("526,tedge-device,{test_config_path}").as_str(), + ) + .await?; + + // Assert the c8y_UploadConfigFile operation transitioning from EXECUTING(501) to SUCCESSFUL(503) with the uploaded config URL + mqtt_tests::assert_received_all_expected( + &mut messages, + TEST_TIMEOUT_MS, + &[ + "501,c8y_UploadConfigFile", + "503,c8y_UploadConfigFile,http://server/some/test/config/url", + ], + ) + .await; + + Ok(()) + } +} diff --git a/plugins/c8y_configuration_plugin/src/upload.rs b/plugins/c8y_configuration_plugin/src/upload.rs index 25a3292c..39302d92 100644 --- a/plugins/c8y_configuration_plugin/src/upload.rs +++ b/plugins/c8y_configuration_plugin/src/upload.rs @@ -1,6 +1,6 @@ use crate::smartrest::TryIntoOperationStatusMessage; use anyhow::Result; -use c8y_api::http_proxy::{C8YHttpProxy, JwtAuthHttpProxy}; +use c8y_api::http_proxy::C8YHttpProxy; use c8y_smartrest::error::SmartRestSerializerError; use c8y_smartrest::smartrest_serializer::SmartRest; use c8y_smartrest::{ @@ -11,7 +11,7 @@ use c8y_smartrest::{ }, }; use mqtt_channel::{Connection, SinkExt}; -use std::{fs::read_to_string, path::Path}; +use std::path::Path; struct UploadConfigFileStatusMessage {} @@ -45,7 +45,7 @@ impl TryIntoOperationStatusMessage for UploadConfigFileStatusMessage { pub async fn handle_config_upload_request( config_upload_request: SmartRestConfigUploadRequest, mqtt_client: &mut Connection, - http_client: &mut JwtAuthHttpProxy, + http_client: &mut impl C8YHttpProxy, ) -> Result<()> { // set config upload request to executing let msg = UploadConfigFileStatusMessage::executing()?; @@ -73,24 +73,26 @@ pub async fn handle_config_upload_request( async fn upload_config_file( config_file_path: &Path, - http_client: &mut JwtAuthHttpProxy, + http_client: &mut impl C8YHttpProxy, ) -> Result<String> { - // read the config file contents - let config_content = read_to_string(config_file_path)?; - // upload config file - let upload_event_url = http_client - .upload_config_file(config_file_path, &config_content) - .await?; + let upload_event_url = http_client.upload_config_file(config_file_path).await?; Ok(upload_event_url) } #[cfg(test)] mod tests { + use std::time::Duration; + use super::*; + use c8y_api::http_proxy::MockC8YHttpProxy; + use c8y_smartrest::topic::C8yTopic; + use mockall::predicate; use mqtt_channel::Topic; + const TEST_TIMEOUT_MS: Duration = Duration::from_millis(5000); + #[test] fn get_smartrest_executing() { let message = UploadConfigFileStatusMessage::executing().unwrap(); @@ -119,4 +121,73 @@ mod tests { "502,c8y_UploadConfigFile,\"failed reason\"\n" ); } + + #[tokio::test] + async fn test_upload_config_file() -> anyhow::Result<()> { + let config_path = Path::new("/some/temp/path"); + + let mut http_client = MockC8YHttpProxy::new(); + + http_client + .expect_upload_config_file() + .with(predicate::eq(config_path)) + .return_once(|_path| Ok("http://server/config/file/url".to_string())); + + assert_eq!( + upload_config_file(config_path, &mut http_client).await?, + "http://server/config/file/url" + ); + + Ok(()) + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + #[serial_test::serial] + async fn test_handle_config_upload_request() -> anyhow::Result<()> { + let config_path = Path::new("/some/test/config"); + + let broker = mqtt_tests::test_mqtt_broker(); + let mqtt_config = mqtt_channel::Config::default() + .with_port(broker.port) + .with_subscriptions(mqtt_channel::TopicFilter::new_unchecked( + C8yTopic::SmartRestRequest.as_str(), + )); + let mut mqtt_client = mqtt_channel::Connection::new(&mqtt_config).await?; + + let mut messages = broker.messages_published_on("c8y/s/us").await; + + let mut http_client = MockC8YHttpProxy::new(); + http_client + .expect_upload_config_file() + .with(predicate::eq(config_path)) + .return_once(|_path| Ok("http://server/config/file/url".to_string())); + + let config_upload_request = SmartRestConfigUploadRequest { + message_id: "526".to_string(), + device: "thin-edge-device".to_string(), + config_type: "/some/test/config".to_string(), + }; + + tokio::spawn(async move { + let _ = handle_config_upload_request( + config_upload_request, + &mut mqtt_client, + &mut http_client, + ) + .await; + }); + + // Assert the c8y_UploadConfigFile operation transitioning from EXECUTING(501) to SUCCESSFUL(503) with the uploaded config URL + mqtt_tests::assert_received_all_expected( + &mut messages, + TEST_TIMEOUT_MS, + &[ + "501,c8y_UploadConfigFile", + "503,c8y_UploadConfigFile,http://server/config/file/url", + ], + ) + .await; + + Ok(()) + } } |