summaryrefslogtreecommitdiffstats
path: root/crates/core
diff options
context:
space:
mode:
authorAlbin Suresh <albin.suresh@softwareag.com>2022-02-17 21:55:59 +0530
committerAlbin Suresh <albin.suresh@softwareag.com>2022-02-23 22:47:50 +0530
commit7616196c806ab6fecf63812442a02f6460785e2f (patch)
tree4d12a9e4005998e9c14a9c7e12a6c5af998af7cf /crates/core
parent2280acd7450c2c45434e2770db86c3f28b70debd (diff)
Unit test C8YHTTPProxy with mock httpserver crate
Diffstat (limited to 'crates/core')
-rw-r--r--crates/core/c8y_api/Cargo.toml4
-rw-r--r--crates/core/c8y_api/src/http_proxy.rs205
-rw-r--r--crates/core/tedge_mapper/src/c8y/tests.rs2
3 files changed, 159 insertions, 52 deletions
diff --git a/crates/core/c8y_api/Cargo.toml b/crates/core/c8y_api/Cargo.toml
index cf4a45c7..14ec1959 100644
--- a/crates/core/c8y_api/Cargo.toml
+++ b/crates/core/c8y_api/Cargo.toml
@@ -15,7 +15,7 @@ clock = { path = "../../common/clock" }
csv = "1.1"
download = { path = "../../common/download" }
futures = "0.3"
-mockall = "0.10"
+mockall = "0.11"
mqtt_channel = { path = "../../common/mqtt_channel" }
reqwest = { version = "0.11", default-features = false, features = ["json", "rustls-tls"] }
serde = { version = "1.0", features = ["derive"] }
@@ -30,5 +30,7 @@ toml = "0.5"
tracing = { version = "0.1", features = ["attributes", "log"] }
[dev-dependencies]
+anyhow = "1.0"
+mockito = "0.30"
tempfile = "3.3"
test-case = "1.2"
diff --git a/crates/core/c8y_api/src/http_proxy.rs b/crates/core/c8y_api/src/http_proxy.rs
index 00260ca0..4c6dff98 100644
--- a/crates/core/c8y_api/src/http_proxy.rs
+++ b/crates/core/c8y_api/src/http_proxy.rs
@@ -5,7 +5,6 @@ use crate::json_c8y::{
use async_trait::async_trait;
use c8y_smartrest::{error::SMCumulocityMapperError, smartrest_deserializer::SmartRestJwtResponse};
-use futures::TryFutureExt;
use mockall::automock;
use mqtt_channel::{Connection, PubChannel, StreamExt, Topic, TopicFilter};
use reqwest::Url;
@@ -49,6 +48,7 @@ pub trait C8YHttpProxy: Send + Sync {
}
/// Define a C8y endpoint
+#[derive(Debug)]
pub struct C8yEndPoint {
c8y_host: String,
device_id: String,
@@ -65,10 +65,18 @@ impl C8yEndPoint {
}
}
+ fn get_base_url(&self) -> String {
+ let mut url_get_id = String::new();
+ if !self.c8y_host.starts_with("http") {
+ url_get_id.push_str("https://");
+ }
+ url_get_id.push_str(&self.c8y_host);
+
+ url_get_id
+ }
+
fn get_url_for_sw_list(&self) -> String {
- let mut url_update_swlist = String::new();
- url_update_swlist.push_str("https://");
- url_update_swlist.push_str(&self.c8y_host);
+ let mut url_update_swlist = self.get_base_url();
url_update_swlist.push_str("/inventory/managedObjects/");
url_update_swlist.push_str(&self.c8y_internal_id);
@@ -76,9 +84,7 @@ impl C8yEndPoint {
}
fn get_url_for_get_id(&self) -> String {
- let mut url_get_id = String::new();
- url_get_id.push_str("https://");
- url_get_id.push_str(&self.c8y_host);
+ let mut url_get_id = self.get_base_url();
url_get_id.push_str("/identity/externalIds/c8y_Serial/");
url_get_id.push_str(&self.device_id);
@@ -86,9 +92,7 @@ impl C8yEndPoint {
}
fn get_url_for_create_event(&self) -> String {
- let mut url_create_event = String::new();
- url_create_event.push_str("https://");
- url_create_event.push_str(&self.c8y_host);
+ let mut url_create_event = self.get_base_url();
url_create_event.push_str("/event/events/");
url_create_event
@@ -128,25 +132,67 @@ impl C8yEndPoint {
}
}
+#[automock]
+#[async_trait]
+pub trait C8yJwtTokenRetriever: Send + Sync {
+ async fn get_jwt_token(&mut self) -> Result<SmartRestJwtResponse, SMCumulocityMapperError>;
+}
+
+pub struct C8yMqttJwtTokenRetriever {
+ mqtt_con: mqtt_channel::Connection,
+}
+
+impl C8yMqttJwtTokenRetriever {
+ pub fn new(mqtt_con: mqtt_channel::Connection) -> Self {
+ C8yMqttJwtTokenRetriever { mqtt_con }
+ }
+}
+
+#[async_trait]
+impl C8yJwtTokenRetriever for C8yMqttJwtTokenRetriever {
+ async fn get_jwt_token(&mut self) -> Result<SmartRestJwtResponse, SMCumulocityMapperError> {
+ let () = self
+ .mqtt_con
+ .published
+ .publish(mqtt_channel::Message::new(
+ &Topic::new_unchecked("c8y/s/uat"),
+ "".to_string(),
+ ))
+ .await?;
+ let token_smartrest = match tokio::time::timeout(
+ Duration::from_secs(10),
+ self.mqtt_con.received.next(),
+ )
+ .await
+ {
+ Ok(Some(msg)) => msg.payload_str()?.to_string(),
+ Ok(None) => return Err(SMCumulocityMapperError::InvalidMqttMessage),
+ Err(_elapsed) => return Err(SMCumulocityMapperError::RequestTimeout),
+ };
+
+ Ok(SmartRestJwtResponse::try_new(&token_smartrest)?)
+ }
+}
+
/// An HttpProxy that uses MQTT to retrieve JWT tokens and authenticate the device
///
/// - Keep the connection info to c8y and the internal Id of the device
/// - Handle JWT requests
pub struct JwtAuthHttpProxy {
- mqtt_con: mqtt_channel::Connection,
+ jwt_token_retriver: Box<dyn C8yJwtTokenRetriever>,
http_con: reqwest::Client,
end_point: C8yEndPoint,
}
impl JwtAuthHttpProxy {
pub fn new(
- mqtt_con: mqtt_channel::Connection,
+ jwt_token_retriver: Box<dyn C8yJwtTokenRetriever>,
http_con: reqwest::Client,
c8y_host: &str,
device_id: &str,
) -> JwtAuthHttpProxy {
JwtAuthHttpProxy {
- mqtt_con,
+ jwt_token_retriver,
http_con,
end_point: C8yEndPoint {
c8y_host: c8y_host.into(),
@@ -174,31 +220,29 @@ impl JwtAuthHttpProxy {
// Ignore errors on this connection
let () = mqtt_con.errors.close();
+ let jwt_token_retriver = Box::new(C8yMqttJwtTokenRetriever::new(mqtt_con));
+
Ok(JwtAuthHttpProxy::new(
- mqtt_con, http_con, &c8y_host, &device_id,
+ jwt_token_retriver,
+ http_con,
+ &c8y_host,
+ &device_id,
))
}
async fn try_get_and_set_internal_id(&mut self) -> Result<(), SMCumulocityMapperError> {
- let token = self.get_jwt_token().await?;
- let url_get_id = self.end_point.get_url_for_get_id();
-
- self.end_point.c8y_internal_id = self
- .try_get_internal_id(&url_get_id, &token.token())
- .await?;
-
+ self.end_point.c8y_internal_id = self.try_get_internal_id().await?;
Ok(())
}
- async fn try_get_internal_id(
- &self,
- url_get_id: &str,
- token: &str,
- ) -> Result<String, SMCumulocityMapperError> {
+ async fn try_get_internal_id(&mut self) -> Result<String, SMCumulocityMapperError> {
+ let token = self.get_jwt_token().await?;
+ let url_get_id = self.end_point.get_url_for_get_id();
+
let internal_id = self
.http_con
.get(url_get_id)
- .bearer_auth(token)
+ .bearer_auth(token.token())
.send()
.await?;
let internal_id_response = internal_id.json::<InternalIdResponse>().await?;
@@ -224,7 +268,7 @@ impl JwtAuthHttpProxy {
C8yCreateEvent::new(c8y_managed_object, event_type, time.as_str(), text)
}
- async fn send_event(
+ async fn send_event_internal(
&mut self,
c8y_event: C8yCreateEvent,
) -> Result<String, SMCumulocityMapperError> {
@@ -241,6 +285,7 @@ impl JwtAuthHttpProxy {
.build()?;
let response = self.http_con.execute(request).await?;
+ let _ = response.error_for_status_ref()?;
let event_response_body = response.json::<C8yEventResponse>().await?;
Ok(event_response_body.id)
@@ -272,26 +317,7 @@ impl C8YHttpProxy for JwtAuthHttpProxy {
}
async fn get_jwt_token(&mut self) -> Result<SmartRestJwtResponse, SMCumulocityMapperError> {
- let () = self
- .mqtt_con
- .published
- .publish(mqtt_channel::Message::new(
- &Topic::new_unchecked("c8y/s/uat"),
- "".to_string(),
- ))
- .await?;
- let token_smartrest = match tokio::time::timeout(
- Duration::from_secs(10),
- self.mqtt_con.received.next(),
- )
- .await
- {
- Ok(Some(msg)) => msg.payload_str()?.to_string(),
- Ok(None) => return Err(SMCumulocityMapperError::InvalidMqttMessage),
- Err(_elapsed) => return Err(SMCumulocityMapperError::RequestTimeout),
- };
-
- Ok(SmartRestJwtResponse::try_new(&token_smartrest)?)
+ self.jwt_token_retriver.get_jwt_token().await
}
async fn send_event(
@@ -301,7 +327,7 @@ impl C8YHttpProxy for JwtAuthHttpProxy {
time: Option<String>,
) -> Result<String, SMCumulocityMapperError> {
let c8y_event: C8yCreateEvent = self.create_event(event_type, text, time);
- self.send_event(c8y_event).await
+ self.send_event_internal(c8y_event).await
}
async fn send_software_list_http(
@@ -331,7 +357,7 @@ impl C8YHttpProxy for JwtAuthHttpProxy {
let token = self.get_jwt_token().await?;
let log_event = self.create_log_event();
- let event_response_id = self.send_event(log_event).await?;
+ let event_response_id = self.send_event_internal(log_event).await?;
let binary_upload_event_url = self
.end_point
.get_url_for_event_binary_upload(&event_response_id);
@@ -354,6 +380,9 @@ impl C8YHttpProxy for JwtAuthHttpProxy {
#[cfg(test)]
mod tests {
use super::*;
+ use anyhow::Result;
+ use mockito::{mock, Matcher};
+ use serde_json::json;
use test_case::test_case;
#[test]
@@ -398,4 +427,80 @@ mod tests {
let c8y = C8yEndPoint::new("test.test.com", "test_device", "internal-id");
assert!(!c8y.url_is_in_my_tenant_domain(url));
}
+
+ #[tokio::test]
+ async fn get_internal_id() -> Result<()> {
+ let device_id = "test-device";
+ let internal_device_id = "1234";
+
+ let _mock = mock("GET", "/identity/externalIds/c8y_Serial/test-device")
+ .with_status(200)
+ .with_body(
+ json!({ "externalId": device_id, "managedObject": { "id": internal_device_id } })
+ .to_string(),
+ )
+ .create();
+
+ // An JwtAuthHttpProxy ...
+ let mut jwt_token_retriver = Box::new(MockC8yJwtTokenRetriever::new());
+ jwt_token_retriver
+ .expect_get_jwt_token()
+ .returning(|| Ok(SmartRestJwtResponse::default()));
+
+ let http_client = reqwest::ClientBuilder::new().build().unwrap();
+ let mut http_proxy = JwtAuthHttpProxy::new(
+ jwt_token_retriver,
+ http_client,
+ mockito::server_url().as_str(),
+ device_id,
+ );
+
+ assert_eq!(http_proxy.try_get_internal_id().await?, internal_device_id);
+
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn send_event() -> anyhow::Result<()> {
+ let device_id = "test-device";
+ let event_id = "456";
+
+ // Mock endpoint to return C8Y internal id
+ let _get_internal_id_mock = mock("GET", "/identity/externalIds/c8y_Serial/test-device")
+ .with_status(200)
+ .with_body(
+ json!({ "externalId": device_id, "managedObject": { "id": "123" } }).to_string(),
+ )
+ .create();
+
+ let _create_event_mock = mock("POST", "/event/events/")
+ .match_body(Matcher::PartialJson(
+ json!({ "type": "clock_event", "text": "tick" }),
+ ))
+ .with_status(201)
+ .with_body(json!({ "id": event_id }).to_string())
+ .create();
+
+ // An JwtAuthHttpProxy ...
+ let mut jwt_token_retriver = Box::new(MockC8yJwtTokenRetriever::new());
+ jwt_token_retriver
+ .expect_get_jwt_token()
+ .returning(|| Ok(SmartRestJwtResponse::default()));
+
+ let http_client = reqwest::ClientBuilder::new().build().unwrap();
+ let mut http_proxy = JwtAuthHttpProxy::new(
+ jwt_token_retriver,
+ http_client,
+ mockito::server_url().as_str(),
+ device_id,
+ );
+
+ // ... creates the event and assert its id
+ assert_eq!(
+ http_proxy.send_event("clock_event", "tick", None).await?,
+ event_id
+ );
+
+ Ok(())
+ }
}
diff --git a/crates/core/tedge_mapper/src/c8y/tests.rs b/crates/core/tedge_mapper/src/c8y/tests.rs
index 9477037b..9f989eae 100644
--- a/crates/core/tedge_mapper/src/c8y/tests.rs
+++ b/crates/core/tedge_mapper/src/c8y/tests.rs
@@ -1,7 +1,7 @@
use crate::core::{converter::Converter, mapper::create_mapper, size_threshold::SizeThreshold};
use anyhow::Result;
use c8y_api::{
- http_proxy::{C8YHttpProxy, JwtAuthHttpProxy, MockC8YHttpProxy},
+ http_proxy::{C8YHttpProxy, C8yMqttJwtTokenRetriever, JwtAuthHttpProxy, MockC8YHttpProxy},
json_c8y::C8yUpdateSoftwareListResponse,
};
use c8y_smartrest::{