summaryrefslogtreecommitdiffstats
path: root/crates/core
diff options
context:
space:
mode:
authorinitard <solo@softwareag.com>2022-04-08 08:46:49 +0100
committerinitard <solo@softwareag.com>2022-04-13 09:48:40 +0100
commit6fa381f99a8f5b16bcda963afb4034a31e086db6 (patch)
treea77841a36e53ea0f3da3f67d0f9ca72dc5abed19 /crates/core
parent924980f9f850ee6c08f207d71de983cf54a46f32 (diff)
moving python tedge mapper tests to rust #958
- removed tedge_mapper_c8y_negative - removed tedge_mapper_c8y_positive - added the same tests in rust using mqtt_tests Signed-off-by: initard <solo@softwareag.com>
Diffstat (limited to 'crates/core')
-rw-r--r--crates/core/tedge_mapper/Cargo.toml1
-rw-r--r--crates/core/tedge_mapper/src/c8y/mapper.rs106
2 files changed, 107 insertions, 0 deletions
diff --git a/crates/core/tedge_mapper/Cargo.toml b/crates/core/tedge_mapper/Cargo.toml
index 64bdbd86..7fe78242 100644
--- a/crates/core/tedge_mapper/Cargo.toml
+++ b/crates/core/tedge_mapper/Cargo.toml
@@ -57,6 +57,7 @@ tracing = { version = "0.1", features = ["attributes", "log"] }
assert_matches = "1.5"
assert-json-diff = "2.0"
serde = "1.0"
+mockito = "0.31"
mqtt_tests = { path = "../../tests/mqtt_tests" }
serde_json = "1.0"
serial_test = "0.5"
diff --git a/crates/core/tedge_mapper/src/c8y/mapper.rs b/crates/core/tedge_mapper/src/c8y/mapper.rs
index cb9a4f93..98301dda 100644
--- a/crates/core/tedge_mapper/src/c8y/mapper.rs
+++ b/crates/core/tedge_mapper/src/c8y/mapper.rs
@@ -108,3 +108,109 @@ fn create_directories() -> Result<(), anyhow::Error> {
)?;
Ok(())
}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ use c8y_api::http_proxy::MockC8yJwtTokenRetriever;
+ use c8y_smartrest::smartrest_deserializer::SmartRestJwtResponse;
+ use mockito::mock;
+ use mqtt_tests::{assert_received_all_expected, test_mqtt_broker};
+ use serde_json::json;
+ use std::time::Duration;
+ use test_case::test_case;
+
+ const TEST_TIMEOUT_SECS: Duration = Duration::from_secs(5);
+ const CUMULOCITY_MAPPER_NAME_TEST: &str = "tedge-mapper-c8y-test";
+ const DEVICE_ID: &str = "test-device";
+ const DEVICE_NAME: &str = "test-user";
+ const DEVICE_TYPE: &str = "test-thin-edge.io";
+ const MQTT_HOST: &str = "127.0.0.1";
+
+ /// `test_tedge_mapper_with_mqtt_pub` will start tedge mapper and run the following tests:
+ ///
+ /// - receive pub message with wrong payload and expect an error message in tedge/errors topic
+ /// - receive pub message with a correct payload and expect to observe this on the right topic
+ #[test_case(
+ "tedge/measurements",
+ "tedge/errors",
+ "{",
+ "Invalid JSON: EOF while parsing an object at line 1 column 1: `{`"
+ )] // fail case
+ #[test_case(
+ "tedge/measurements",
+ "tedge/measurements",
+ r#"{"temperature": 12, "time": "2021-06-15T17:01:15.806181503+02:00"}"#,
+ r#"{"temperature": 12, "time": "2021-06-15T17:01:15.806181503+02:00"}"#
+ )] // successs case
+ #[tokio::test]
+ async fn test_tedge_mapper_with_mqtt_pub(
+ pub_topic: &str,
+ sub_topic: &str,
+ payload: &str,
+ expected_msg: &str,
+ ) -> Result<(), anyhow::Error> {
+ // Mock endpoint to return C8Y internal id
+ let _get_internal_id_mock = mock("GET", "/identity/externalIds/c8y_Serial/test-device")
+ .with_status(200)
+ .with_body(
+ json!({ "externalId": DEVICE_ID, "managedObject": { "id": "123" } }).to_string(),
+ )
+ .create();
+
+ let mut jwt_token_retriver = Box::new(MockC8yJwtTokenRetriever::new());
+ jwt_token_retriver
+ .expect_get_jwt_token()
+ .returning(|| Ok(SmartRestJwtResponse::default()));
+
+ let http_client = reqwest::ClientBuilder::new().build().unwrap();
+ let proxy = JwtAuthHttpProxy::new(
+ jwt_token_retriver,
+ http_client,
+ mockito::server_url().as_str(),
+ DEVICE_ID,
+ );
+
+ // mapper config
+ let size_threshold = SizeThreshold(MQTT_MESSAGE_SIZE_THRESHOLD);
+ let operations = Operations::default();
+
+ let converter = Box::new(CumulocityConverter::new(
+ size_threshold,
+ DEVICE_NAME.into(),
+ DEVICE_TYPE.into(),
+ operations,
+ proxy,
+ ));
+
+ let broker = test_mqtt_broker();
+
+ let mut mapper = create_mapper(
+ CUMULOCITY_MAPPER_NAME_TEST,
+ MQTT_HOST.into(),
+ broker.port,
+ converter,
+ )
+ .await?;
+
+ // subscribe to `sub_topic`
+ let mut messages = broker.messages_published_on(sub_topic).await;
+
+ // run tedge_mapper in background
+ tokio::spawn(async move {
+ mapper
+ .run()
+ .instrument(info_span!(CUMULOCITY_MAPPER_NAME_TEST))
+ .await
+ .unwrap();
+ });
+
+ // publish `payload` to `pub_topic`
+ let () = broker.publish(pub_topic, payload).await?;
+
+ // check the `messages` returned contain `expected_msg`
+ assert_received_all_expected(&mut messages, TEST_TIMEOUT_SECS, &[expected_msg]).await;
+ Ok(())
+ }
+}