diff options
author | Albin Suresh <albin.suresh@softwareag.com> | 2022-02-17 21:55:59 +0530 |
---|---|---|
committer | Albin Suresh <albin.suresh@softwareag.com> | 2022-02-23 22:47:50 +0530 |
commit | 7616196c806ab6fecf63812442a02f6460785e2f (patch) | |
tree | 4d12a9e4005998e9c14a9c7e12a6c5af998af7cf /crates/core | |
parent | 2280acd7450c2c45434e2770db86c3f28b70debd (diff) |
Unit test C8YHTTPProxy with mock httpserver crate
Diffstat (limited to 'crates/core')
-rw-r--r-- | crates/core/c8y_api/Cargo.toml | 4 | ||||
-rw-r--r-- | crates/core/c8y_api/src/http_proxy.rs | 205 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/c8y/tests.rs | 2 |
3 files changed, 159 insertions, 52 deletions
diff --git a/crates/core/c8y_api/Cargo.toml b/crates/core/c8y_api/Cargo.toml index cf4a45c7..14ec1959 100644 --- a/crates/core/c8y_api/Cargo.toml +++ b/crates/core/c8y_api/Cargo.toml @@ -15,7 +15,7 @@ clock = { path = "../../common/clock" } csv = "1.1" download = { path = "../../common/download" } futures = "0.3" -mockall = "0.10" +mockall = "0.11" mqtt_channel = { path = "../../common/mqtt_channel" } reqwest = { version = "0.11", default-features = false, features = ["json", "rustls-tls"] } serde = { version = "1.0", features = ["derive"] } @@ -30,5 +30,7 @@ toml = "0.5" tracing = { version = "0.1", features = ["attributes", "log"] } [dev-dependencies] +anyhow = "1.0" +mockito = "0.30" tempfile = "3.3" test-case = "1.2" diff --git a/crates/core/c8y_api/src/http_proxy.rs b/crates/core/c8y_api/src/http_proxy.rs index 00260ca0..4c6dff98 100644 --- a/crates/core/c8y_api/src/http_proxy.rs +++ b/crates/core/c8y_api/src/http_proxy.rs @@ -5,7 +5,6 @@ use crate::json_c8y::{ use async_trait::async_trait; use c8y_smartrest::{error::SMCumulocityMapperError, smartrest_deserializer::SmartRestJwtResponse}; -use futures::TryFutureExt; use mockall::automock; use mqtt_channel::{Connection, PubChannel, StreamExt, Topic, TopicFilter}; use reqwest::Url; @@ -49,6 +48,7 @@ pub trait C8YHttpProxy: Send + Sync { } /// Define a C8y endpoint +#[derive(Debug)] pub struct C8yEndPoint { c8y_host: String, device_id: String, @@ -65,10 +65,18 @@ impl C8yEndPoint { } } + fn get_base_url(&self) -> String { + let mut url_get_id = String::new(); + if !self.c8y_host.starts_with("http") { + url_get_id.push_str("https://"); + } + url_get_id.push_str(&self.c8y_host); + + url_get_id + } + fn get_url_for_sw_list(&self) -> String { - let mut url_update_swlist = String::new(); - url_update_swlist.push_str("https://"); - url_update_swlist.push_str(&self.c8y_host); + let mut url_update_swlist = self.get_base_url(); url_update_swlist.push_str("/inventory/managedObjects/"); url_update_swlist.push_str(&self.c8y_internal_id); @@ -76,9 +84,7 @@ impl C8yEndPoint { } fn get_url_for_get_id(&self) -> String { - let mut url_get_id = String::new(); - url_get_id.push_str("https://"); - url_get_id.push_str(&self.c8y_host); + let mut url_get_id = self.get_base_url(); url_get_id.push_str("/identity/externalIds/c8y_Serial/"); url_get_id.push_str(&self.device_id); @@ -86,9 +92,7 @@ impl C8yEndPoint { } fn get_url_for_create_event(&self) -> String { - let mut url_create_event = String::new(); - url_create_event.push_str("https://"); - url_create_event.push_str(&self.c8y_host); + let mut url_create_event = self.get_base_url(); url_create_event.push_str("/event/events/"); url_create_event @@ -128,25 +132,67 @@ impl C8yEndPoint { } } +#[automock] +#[async_trait] +pub trait C8yJwtTokenRetriever: Send + Sync { + async fn get_jwt_token(&mut self) -> Result<SmartRestJwtResponse, SMCumulocityMapperError>; +} + +pub struct C8yMqttJwtTokenRetriever { + mqtt_con: mqtt_channel::Connection, +} + +impl C8yMqttJwtTokenRetriever { + pub fn new(mqtt_con: mqtt_channel::Connection) -> Self { + C8yMqttJwtTokenRetriever { mqtt_con } + } +} + +#[async_trait] +impl C8yJwtTokenRetriever for C8yMqttJwtTokenRetriever { + async fn get_jwt_token(&mut self) -> Result<SmartRestJwtResponse, SMCumulocityMapperError> { + let () = self + .mqtt_con + .published + .publish(mqtt_channel::Message::new( + &Topic::new_unchecked("c8y/s/uat"), + "".to_string(), + )) + .await?; + let token_smartrest = match tokio::time::timeout( + Duration::from_secs(10), + self.mqtt_con.received.next(), + ) + .await + { + Ok(Some(msg)) => msg.payload_str()?.to_string(), + Ok(None) => return Err(SMCumulocityMapperError::InvalidMqttMessage), + Err(_elapsed) => return Err(SMCumulocityMapperError::RequestTimeout), + }; + + Ok(SmartRestJwtResponse::try_new(&token_smartrest)?) + } +} + /// An HttpProxy that uses MQTT to retrieve JWT tokens and authenticate the device /// /// - Keep the connection info to c8y and the internal Id of the device /// - Handle JWT requests pub struct JwtAuthHttpProxy { - mqtt_con: mqtt_channel::Connection, + jwt_token_retriver: Box<dyn C8yJwtTokenRetriever>, http_con: reqwest::Client, end_point: C8yEndPoint, } impl JwtAuthHttpProxy { pub fn new( - mqtt_con: mqtt_channel::Connection, + jwt_token_retriver: Box<dyn C8yJwtTokenRetriever>, http_con: reqwest::Client, c8y_host: &str, device_id: &str, ) -> JwtAuthHttpProxy { JwtAuthHttpProxy { - mqtt_con, + jwt_token_retriver, http_con, end_point: C8yEndPoint { c8y_host: c8y_host.into(), @@ -174,31 +220,29 @@ impl JwtAuthHttpProxy { // Ignore errors on this connection let () = mqtt_con.errors.close(); + let jwt_token_retriver = Box::new(C8yMqttJwtTokenRetriever::new(mqtt_con)); + Ok(JwtAuthHttpProxy::new( - mqtt_con, http_con, &c8y_host, &device_id, + jwt_token_retriver, + http_con, + &c8y_host, + &device_id, )) } async fn try_get_and_set_internal_id(&mut self) -> Result<(), SMCumulocityMapperError> { - let token = self.get_jwt_token().await?; - let url_get_id = self.end_point.get_url_for_get_id(); - - self.end_point.c8y_internal_id = self - .try_get_internal_id(&url_get_id, &token.token()) - .await?; - + self.end_point.c8y_internal_id = self.try_get_internal_id().await?; Ok(()) } - async fn try_get_internal_id( - &self, - url_get_id: &str, - token: &str, - ) -> Result<String, SMCumulocityMapperError> { + async fn try_get_internal_id(&mut self) -> Result<String, SMCumulocityMapperError> { + let token = self.get_jwt_token().await?; + let url_get_id = self.end_point.get_url_for_get_id(); + let internal_id = self .http_con .get(url_get_id) - .bearer_auth(token) + .bearer_auth(token.token()) .send() .await?; let internal_id_response = internal_id.json::<InternalIdResponse>().await?; @@ -224,7 +268,7 @@ impl JwtAuthHttpProxy { C8yCreateEvent::new(c8y_managed_object, event_type, time.as_str(), text) } - async fn send_event( + async fn send_event_internal( &mut self, c8y_event: C8yCreateEvent, ) -> Result<String, SMCumulocityMapperError> { @@ -241,6 +285,7 @@ impl JwtAuthHttpProxy { .build()?; let response = self.http_con.execute(request).await?; + let _ = response.error_for_status_ref()?; let event_response_body = response.json::<C8yEventResponse>().await?; Ok(event_response_body.id) @@ -272,26 +317,7 @@ impl C8YHttpProxy for JwtAuthHttpProxy { } async fn get_jwt_token(&mut self) -> Result<SmartRestJwtResponse, SMCumulocityMapperError> { - let () = self - .mqtt_con - .published - .publish(mqtt_channel::Message::new( - &Topic::new_unchecked("c8y/s/uat"), - "".to_string(), - )) - .await?; - let token_smartrest = match tokio::time::timeout( - Duration::from_secs(10), - self.mqtt_con.received.next(), - ) - .await - { - Ok(Some(msg)) => msg.payload_str()?.to_string(), - Ok(None) => return Err(SMCumulocityMapperError::InvalidMqttMessage), - Err(_elapsed) => return Err(SMCumulocityMapperError::RequestTimeout), - }; - - Ok(SmartRestJwtResponse::try_new(&token_smartrest)?) + self.jwt_token_retriver.get_jwt_token().await } async fn send_event( @@ -301,7 +327,7 @@ impl C8YHttpProxy for JwtAuthHttpProxy { time: Option<String>, ) -> Result<String, SMCumulocityMapperError> { let c8y_event: C8yCreateEvent = self.create_event(event_type, text, time); - self.send_event(c8y_event).await + self.send_event_internal(c8y_event).await } async fn send_software_list_http( @@ -331,7 +357,7 @@ impl C8YHttpProxy for JwtAuthHttpProxy { let token = self.get_jwt_token().await?; let log_event = self.create_log_event(); - let event_response_id = self.send_event(log_event).await?; + let event_response_id = self.send_event_internal(log_event).await?; let binary_upload_event_url = self .end_point .get_url_for_event_binary_upload(&event_response_id); @@ -354,6 +380,9 @@ impl C8YHttpProxy for JwtAuthHttpProxy { #[cfg(test)] mod tests { use super::*; + use anyhow::Result; + use mockito::{mock, Matcher}; + use serde_json::json; use test_case::test_case; #[test] @@ -398,4 +427,80 @@ mod tests { let c8y = C8yEndPoint::new("test.test.com", "test_device", "internal-id"); assert!(!c8y.url_is_in_my_tenant_domain(url)); } + + #[tokio::test] + async fn get_internal_id() -> Result<()> { + let device_id = "test-device"; + let internal_device_id = "1234"; + + let _mock = mock("GET", "/identity/externalIds/c8y_Serial/test-device") + .with_status(200) + .with_body( + json!({ "externalId": device_id, "managedObject": { "id": internal_device_id } }) + .to_string(), + ) + .create(); + + // An JwtAuthHttpProxy ... + let mut jwt_token_retriver = Box::new(MockC8yJwtTokenRetriever::new()); + jwt_token_retriver + .expect_get_jwt_token() + .returning(|| Ok(SmartRestJwtResponse::default())); + + let http_client = reqwest::ClientBuilder::new().build().unwrap(); + let mut http_proxy = JwtAuthHttpProxy::new( + jwt_token_retriver, + http_client, + mockito::server_url().as_str(), + device_id, + ); + + assert_eq!(http_proxy.try_get_internal_id().await?, internal_device_id); + + Ok(()) + } + + #[tokio::test] + async fn send_event() -> anyhow::Result<()> { + let device_id = "test-device"; + let event_id = "456"; + + // Mock endpoint to return C8Y internal id + let _get_internal_id_mock = mock("GET", "/identity/externalIds/c8y_Serial/test-device") + .with_status(200) + .with_body( + json!({ "externalId": device_id, "managedObject": { "id": "123" } }).to_string(), + ) + .create(); + + let _create_event_mock = mock("POST", "/event/events/") + .match_body(Matcher::PartialJson( + json!({ "type": "clock_event", "text": "tick" }), + )) + .with_status(201) + .with_body(json!({ "id": event_id }).to_string()) + .create(); + + // An JwtAuthHttpProxy ... + let mut jwt_token_retriver = Box::new(MockC8yJwtTokenRetriever::new()); + jwt_token_retriver + .expect_get_jwt_token() + .returning(|| Ok(SmartRestJwtResponse::default())); + + let http_client = reqwest::ClientBuilder::new().build().unwrap(); + let mut http_proxy = JwtAuthHttpProxy::new( + jwt_token_retriver, + http_client, + mockito::server_url().as_str(), + device_id, + ); + + // ... creates the event and assert its id + assert_eq!( + http_proxy.send_event("clock_event", "tick", None).await?, + event_id + ); + + Ok(()) + } } diff --git a/crates/core/tedge_mapper/src/c8y/tests.rs b/crates/core/tedge_mapper/src/c8y/tests.rs index 9477037b..9f989eae 100644 --- a/crates/core/tedge_mapper/src/c8y/tests.rs +++ b/crates/core/tedge_mapper/src/c8y/tests.rs @@ -1,7 +1,7 @@ use crate::core::{converter::Converter, mapper::create_mapper, size_threshold::SizeThreshold}; use anyhow::Result; use c8y_api::{ - http_proxy::{C8YHttpProxy, JwtAuthHttpProxy, MockC8YHttpProxy}, + http_proxy::{C8YHttpProxy, C8yMqttJwtTokenRetriever, JwtAuthHttpProxy, MockC8YHttpProxy}, json_c8y::C8yUpdateSoftwareListResponse, }; use c8y_smartrest::{ |