diff options
Diffstat (limited to 'crates/core/tedge_mapper/src/c8y/mapper.rs')
-rw-r--r-- | crates/core/tedge_mapper/src/c8y/mapper.rs | 140 |
1 files changed, 125 insertions, 15 deletions
diff --git a/crates/core/tedge_mapper/src/c8y/mapper.rs b/crates/core/tedge_mapper/src/c8y/mapper.rs index cb9a4f93..3842b3a5 100644 --- a/crates/core/tedge_mapper/src/c8y/mapper.rs +++ b/crates/core/tedge_mapper/src/c8y/mapper.rs @@ -1,3 +1,5 @@ +use std::path::Path; + use crate::{ c8y::converter::CumulocityConverter, core::{component::TEdgeComponent, mapper::create_mapper, size_threshold::SizeThreshold}, @@ -47,19 +49,21 @@ impl TEdgeComponent for CumulocityMapper { CUMULOCITY_MAPPER_NAME } - async fn init(&self) -> Result<(), anyhow::Error> { + async fn init(&self, cfg_dir: &Path) -> Result<(), anyhow::Error> { info!("Initialize tedge mapper c8y"); - create_directories()?; - let operations = Operations::try_new("/etc/tedge/operations", "c8y")?; + let config_dir = cfg_dir.display().to_string(); + create_directories(&config_dir)?; + let operations = Operations::try_new(&format!("{config_dir}/operations"), "c8y")?; self.init_session(CumulocityMapper::subscriptions(&operations)?) .await?; Ok(()) } - async fn start(&self, tedge_config: TEdgeConfig) -> Result<(), anyhow::Error> { + async fn start(&self, tedge_config: TEdgeConfig, cfg_dir: &Path) -> Result<(), anyhow::Error> { let size_threshold = SizeThreshold(MQTT_MESSAGE_SIZE_THRESHOLD); + let config_dir = cfg_dir.display().to_string(); - let operations = Operations::try_new("/etc/tedge/operations", "c8y")?; + let operations = Operations::try_new(format!("{config_dir}/operations"), "c8y")?; let mut http_proxy = JwtAuthHttpProxy::try_new(&tedge_config).await?; http_proxy.init().await?; let device_name = tedge_config.query(DeviceIdSetting)?; @@ -87,24 +91,130 @@ impl TEdgeComponent for CumulocityMapper { } } -fn create_directories() -> Result<(), anyhow::Error> { +fn create_directories(config_dir: &str) -> Result<(), anyhow::Error> { create_directory_with_user_group( - "/etc/tedge/operations/c8y", - "tedge-mapper", - "tedge-mapper", + &format!("{config_dir}/operations/c8y"), + "tedge", + "tedge", 0o775, )?; create_file_with_user_group( - "/etc/tedge/operations/c8y/c8y_SoftwareUpdate", - "tedge-mapper", - "tedge-mapper", + &format!("{config_dir}/operations/c8y/c8y_SoftwareUpdate"), + "tedge", + "tedge", 0o644, )?; create_file_with_user_group( - "/etc/tedge/operations/c8y/c8y_Restart", - "tedge-mapper", - "tedge-mapper", + &format!("{config_dir}/operations/c8y/c8y_Restart"), + "tedge", + "tedge", 0o644, )?; Ok(()) } + +#[cfg(test)] +mod tests { + use super::*; + + use c8y_api::http_proxy::MockC8yJwtTokenRetriever; + use c8y_smartrest::smartrest_deserializer::SmartRestJwtResponse; + use mockito::mock; + use mqtt_tests::{assert_received_all_expected, test_mqtt_broker}; + use serde_json::json; + use std::time::Duration; + use test_case::test_case; + + const TEST_TIMEOUT_SECS: Duration = Duration::from_secs(5); + const CUMULOCITY_MAPPER_NAME_TEST: &str = "tedge-mapper-c8y-test"; + const DEVICE_ID: &str = "test-device"; + const DEVICE_NAME: &str = "test-user"; + const DEVICE_TYPE: &str = "test-thin-edge.io"; + const MQTT_HOST: &str = "127.0.0.1"; + + /// `test_tedge_mapper_with_mqtt_pub` will start tedge mapper and run the following tests: + /// + /// - receive pub message with wrong payload and expect an error message in tedge/errors topic + /// - receive pub message with a correct payload and expect to observe this on the right topic + #[test_case( + "tedge/measurements", + "tedge/errors", + "{", + "Invalid JSON: EOF while parsing an object at line 1 column 1: `{`" + )] // fail case + #[test_case( + "tedge/measurements", + "tedge/measurements", + r#"{"temperature": 12, "time": "2021-06-15T17:01:15.806181503+02:00"}"#, + r#"{"temperature": 12, "time": "2021-06-15T17:01:15.806181503+02:00"}"# + )] // successs case + #[tokio::test] + async fn test_tedge_mapper_with_mqtt_pub( + pub_topic: &str, + sub_topic: &str, + payload: &str, + expected_msg: &str, + ) -> Result<(), anyhow::Error> { + // Mock endpoint to return C8Y internal id + let _get_internal_id_mock = mock("GET", "/identity/externalIds/c8y_Serial/test-device") + .with_status(200) + .with_body( + json!({ "externalId": DEVICE_ID, "managedObject": { "id": "123" } }).to_string(), + ) + .create(); + + let mut jwt_token_retriver = Box::new(MockC8yJwtTokenRetriever::new()); + jwt_token_retriver + .expect_get_jwt_token() + .returning(|| Ok(SmartRestJwtResponse::default())); + + let http_client = reqwest::ClientBuilder::new().build().unwrap(); + let proxy = JwtAuthHttpProxy::new( + jwt_token_retriver, + http_client, + mockito::server_url().as_str(), + DEVICE_ID, + ); + + // mapper config + let size_threshold = SizeThreshold(MQTT_MESSAGE_SIZE_THRESHOLD); + let operations = Operations::default(); + + let converter = Box::new(CumulocityConverter::new( + size_threshold, + DEVICE_NAME.into(), + DEVICE_TYPE.into(), + operations, + proxy, + )); + + let broker = test_mqtt_broker(); + + let mut mapper = create_mapper( + CUMULOCITY_MAPPER_NAME_TEST, + MQTT_HOST.into(), + broker.port, + converter, + ) + .await?; + + // subscribe to `sub_topic` + let mut messages = broker.messages_published_on(sub_topic).await; + + // run tedge_mapper in background + tokio::spawn(async move { + mapper + .run() + .instrument(info_span!(CUMULOCITY_MAPPER_NAME_TEST)) + .await + .unwrap(); + }); + + // publish `payload` to `pub_topic` + let () = broker.publish(pub_topic, payload).await?; + + // check the `messages` returned contain `expected_msg` + assert_received_all_expected(&mut messages, TEST_TIMEOUT_SECS, &[expected_msg]).await; + Ok(()) + } +} |