From 2280acd7450c2c45434e2770db86c3f28b70debd Mon Sep 17 00:00:00 2001 From: Albin Suresh Date: Fri, 11 Feb 2022 15:28:14 +0530 Subject: [#809] Cumulocity mapper to send large events > 16K size with HTTP --- crates/core/c8y_api/src/http_proxy.rs | 55 +++++++++++------- crates/core/c8y_api/src/json_c8y.rs | 7 +++ crates/core/tedge_mapper/Cargo.toml | 2 +- crates/core/tedge_mapper/src/c8y/converter.rs | 38 +++++++++--- crates/core/tedge_mapper/src/c8y/mapper.rs | 5 +- crates/core/tedge_mapper/src/c8y/tests.rs | 84 ++++++++++++++++++++++++++- crates/core/tedge_mapper/src/core/error.rs | 5 +- 7 files changed, 160 insertions(+), 36 deletions(-) (limited to 'crates/core') diff --git a/crates/core/c8y_api/src/http_proxy.rs b/crates/core/c8y_api/src/http_proxy.rs index 9f853043..00260ca0 100644 --- a/crates/core/c8y_api/src/http_proxy.rs +++ b/crates/core/c8y_api/src/http_proxy.rs @@ -1,9 +1,12 @@ use crate::json_c8y::{ - C8yCreateEvent, C8yManagedObject, C8yUpdateSoftwareListResponse, InternalIdResponse, + C8yCreateEvent, C8yEventResponse, C8yManagedObject, C8yUpdateSoftwareListResponse, + InternalIdResponse, }; use async_trait::async_trait; use c8y_smartrest::{error::SMCumulocityMapperError, smartrest_deserializer::SmartRestJwtResponse}; +use futures::TryFutureExt; +use mockall::automock; use mqtt_channel::{Connection, PubChannel, StreamExt, Topic, TopicFilter}; use reqwest::Url; use std::time::Duration; @@ -13,12 +16,12 @@ use tedge_config::{ }; use time::{format_description, OffsetDateTime}; -use serde::{Deserialize, Serialize}; use tracing::{error, info, instrument}; const RETRY_TIMEOUT_SECS: u64 = 60; /// An HttpProxy handles http requests to C8y on behalf of the device. +#[automock] #[async_trait] pub trait C8YHttpProxy: Send + Sync { async fn init(&mut self) -> Result<(), SMCumulocityMapperError>; @@ -27,6 +30,13 @@ pub trait C8YHttpProxy: Send + Sync { async fn get_jwt_token(&mut self) -> Result; + async fn send_event( + &mut self, + event_type: &str, + text: &str, + time: Option, + ) -> Result; + async fn send_software_list_http( &mut self, c8y_software_list: &C8yUpdateSoftwareListResponse, @@ -118,13 +128,6 @@ impl C8yEndPoint { } } -#[derive(Debug, Deserialize, Serialize, PartialEq)] -#[serde(rename_all = "camelCase")] -/// used to retrieve the id of a log event -pub struct SmartRestLogEvent { - pub id: String, -} - /// An HttpProxy that uses MQTT to retrieve JWT tokens and authenticate the device /// /// - Keep the connection info to c8y and the internal Id of the device @@ -204,26 +207,24 @@ impl JwtAuthHttpProxy { Ok(internal_id) } - /// Make a POST request to /event/events and return the event id from response body. - /// The event id is used to upload the binary. fn create_log_event(&self) -> C8yCreateEvent { - let local = OffsetDateTime::now_utc(); + self.create_event("c8y_Logfile", "software-management", None) + } + fn create_event(&self, event_type: &str, text: &str, time: Option) -> C8yCreateEvent { let c8y_managed_object = C8yManagedObject { id: self.end_point.c8y_internal_id.clone(), }; - C8yCreateEvent::new( - c8y_managed_object, - "c8y_Logfile", - &local + let time = time.unwrap_or_else(|| { + OffsetDateTime::now_utc() .format(&format_description::well_known::Rfc3339) - .unwrap(), - "software-management", - ) + .unwrap() + }); + C8yCreateEvent::new(c8y_managed_object, event_type, time.as_str(), text) } - async fn get_event_id( + async fn send_event( &mut self, c8y_event: C8yCreateEvent, ) -> Result { @@ -240,7 +241,7 @@ impl JwtAuthHttpProxy { .build()?; let response = self.http_con.execute(request).await?; - let event_response_body = response.json::().await?; + let event_response_body = response.json::().await?; Ok(event_response_body.id) } @@ -293,6 +294,16 @@ impl C8YHttpProxy for JwtAuthHttpProxy { Ok(SmartRestJwtResponse::try_new(&token_smartrest)?) } + async fn send_event( + &mut self, + event_type: &str, + text: &str, + time: Option, + ) -> Result { + let c8y_event: C8yCreateEvent = self.create_event(event_type, text, time); + self.send_event(c8y_event).await + } + async fn send_software_list_http( &mut self, c8y_software_list: &C8yUpdateSoftwareListResponse, @@ -320,7 +331,7 @@ impl C8YHttpProxy for JwtAuthHttpProxy { let token = self.get_jwt_token().await?; let log_event = self.create_log_event(); - let event_response_id = self.get_event_id(log_event).await?; + let event_response_id = self.send_event(log_event).await?; let binary_upload_event_url = self .end_point .get_url_for_event_binary_upload(&event_response_id); diff --git a/crates/core/c8y_api/src/json_c8y.rs b/crates/core/c8y_api/src/json_c8y.rs index 8566a84c..f1f0d2b7 100644 --- a/crates/core/c8y_api/src/json_c8y.rs +++ b/crates/core/c8y_api/src/json_c8y.rs @@ -17,6 +17,13 @@ pub struct C8yCreateEvent { text: String, } +#[derive(Debug, Deserialize, Serialize, PartialEq)] +#[serde(rename_all = "camelCase")] +/// used to retrieve the id of a log event +pub struct C8yEventResponse { + pub id: String, +} + #[derive(Debug, Serialize, Deserialize, Clone)] #[serde(rename_all = "camelCase")] pub struct C8yManagedObject { diff --git a/crates/core/tedge_mapper/Cargo.toml b/crates/core/tedge_mapper/Cargo.toml index ffc8d6d3..ba44908d 100644 --- a/crates/core/tedge_mapper/Cargo.toml +++ b/crates/core/tedge_mapper/Cargo.toml @@ -37,7 +37,7 @@ csv = "1.1" download = { path = "../../common/download" } flockfile = { path = "../../common/flockfile" } futures = "0.3" -mockall = "0.10" +mockall = "0.11" mqtt_channel = { path = "../../common/mqtt_channel" } reqwest = { version = "0.11", default-features = false, features = ["json", "rustls-tls"] } serde = { version = "1.0", features = ["derive"] } diff --git a/crates/core/tedge_mapper/src/c8y/converter.rs b/crates/core/tedge_mapper/src/c8y/converter.rs index 899a763d..15b250a1 100644 --- a/crates/core/tedge_mapper/src/c8y/converter.rs +++ b/crates/core/tedge_mapper/src/c8y/converter.rs @@ -9,7 +9,7 @@ use c8y_api::{http_proxy::C8YHttpProxy, json_c8y::C8yUpdateSoftwareListResponse} use c8y_smartrest::{ alarm, error::SmartRestDeserializerError, - event::serialize_event, + event::{self}, operations::Operations, smartrest_deserializer::{SmartRestRestartRequest, SmartRestUpdateSoftware}, smartrest_serializer::{ @@ -144,12 +144,32 @@ where Ok(vec) } - fn try_convert_event(&mut self, input: &Message) -> Result, ConversionError> { + async fn try_convert_event( + &mut self, + input: &Message, + ) -> Result, ConversionError> { let tedge_event = ThinEdgeEvent::try_from(input.topic.name.as_str(), input.payload_str()?)?; - let smartrest_alarm = serialize_event(tedge_event)?; - let smartrest_topic = Topic::new_unchecked(SMARTREST_PUBLISH_TOPIC); + match self.size_threshold.validate(input.payload_str()?) { + // If the message size is well within the Cumulocity MQTT size limit, use MQTT to send the mapped event as well + Ok(()) => { + let smartrest_alarm = event::serialize_event(tedge_event)?; + let smartrest_topic = Topic::new_unchecked(SMARTREST_PUBLISH_TOPIC); - Ok(vec![Message::new(&smartrest_topic, smartrest_alarm)]) + Ok(vec![Message::new(&smartrest_topic, smartrest_alarm)]) + } + // If the message size is larger than the MQTT size limit, use HTTP to send the mapped event + Err(_) => { + let event_text = tedge_event + .data + .and_then(|data| data.message) + .unwrap_or_else(|| "generic event".into()); + let _ = self + .http_proxy + .send_event(tedge_event.name.as_str(), event_text.as_str(), None) + .await?; + Ok(vec![]) + } + } } } @@ -164,20 +184,22 @@ where &self.mapper_config } async fn try_convert(&mut self, message: &Message) -> Result, ConversionError> { - let () = self.size_threshold.validate(message.payload_str()?)?; - match &message.topic { topic if topic.name.starts_with("tedge/measurements") => { + let () = self.size_threshold.validate(message.payload_str()?)?; self.try_convert_measurement(message) } topic if topic.name.starts_with("tedge/alarms") => { + let () = self.size_threshold.validate(message.payload_str()?)?; self.alarm_converter.try_convert_alarm(message) } topic if topic.name.starts_with(INTERNAL_ALARMS_TOPIC) => { self.alarm_converter.process_internal_alarm(message); Ok(vec![]) } - topic if topic.name.starts_with(TEDGE_EVENTS_TOPIC) => self.try_convert_event(message), + topic if topic.name.starts_with(TEDGE_EVENTS_TOPIC) => { + self.try_convert_event(message).await + } topic => match topic.clone().try_into() { Ok(MapperSubscribeTopic::ResponseTopic(ResponseTopic::SoftwareListResponse)) => { debug!("Software list"); diff --git a/crates/core/tedge_mapper/src/c8y/mapper.rs b/crates/core/tedge_mapper/src/c8y/mapper.rs index 315449fe..d5ab3092 100644 --- a/crates/core/tedge_mapper/src/c8y/mapper.rs +++ b/crates/core/tedge_mapper/src/c8y/mapper.rs @@ -5,7 +5,7 @@ use crate::{ use agent_interface::topic::ResponseTopic; use async_trait::async_trait; -use c8y_api::http_proxy::JwtAuthHttpProxy; +use c8y_api::http_proxy::{C8YHttpProxy, JwtAuthHttpProxy}; use c8y_smartrest::operations::Operations; use mqtt_channel::{Config, TopicFilter}; use tedge_config::{ @@ -68,7 +68,8 @@ impl TEdgeComponent for CumulocityMapper { let size_threshold = SizeThreshold(16 * 1024); let operations = Operations::try_new("/etc/tedge/operations", "c8y")?; - let http_proxy = JwtAuthHttpProxy::try_new(&tedge_config).await?; + let mut http_proxy = JwtAuthHttpProxy::try_new(&tedge_config).await?; + http_proxy.init().await?; let device_name = tedge_config.query(DeviceIdSetting)?; let device_type = tedge_config.query(DeviceTypeSetting)?; let mqtt_port = tedge_config.query(MqttPortSetting)?.into(); diff --git a/crates/core/tedge_mapper/src/c8y/tests.rs b/crates/core/tedge_mapper/src/c8y/tests.rs index aaca2009..9477037b 100644 --- a/crates/core/tedge_mapper/src/c8y/tests.rs +++ b/crates/core/tedge_mapper/src/c8y/tests.rs @@ -1,11 +1,17 @@ use crate::core::{converter::Converter, mapper::create_mapper, size_threshold::SizeThreshold}; -use c8y_api::{http_proxy::C8YHttpProxy, json_c8y::C8yUpdateSoftwareListResponse}; +use anyhow::Result; +use c8y_api::{ + http_proxy::{C8YHttpProxy, JwtAuthHttpProxy, MockC8YHttpProxy}, + json_c8y::C8yUpdateSoftwareListResponse, +}; use c8y_smartrest::{ error::SMCumulocityMapperError, operations::Operations, smartrest_deserializer::SmartRestJwtResponse, }; -use mqtt_channel::{Message, Topic}; +use mockall::predicate; +use mqtt_channel::{Connection, Message, Topic, TopicFilter}; use mqtt_tests::test_mqtt_server::MqttProcessHandler; +use serde_json::json; use serial_test::serial; use std::time::Duration; use test_case::test_case; @@ -706,6 +712,71 @@ fn check_c8y_threshold_packet_size() -> Result<(), anyhow::Error> { Ok(()) } +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn test_convert_event() -> Result<()> { + let size_threshold = SizeThreshold(32 * 1024); + let device_name = String::from("test"); + let device_type = String::from("test_type"); + let operations = Operations::new(); + let http_proxy = MockC8YHttpProxy::new(); + + let mut converter = CumulocityConverter::new( + size_threshold, + device_name, + device_type, + operations, + http_proxy, + ); + + let event_topic = "tedge/events/click_event"; + let event_payload = r#"{ "message": "Someone clicked" }"#; + let event_message = Message::new(&Topic::new_unchecked(event_topic), event_payload); + + let converted_events = converter.convert(&event_message).await; + assert_eq!(converted_events.len(), 1); + let converted_event = converted_events.get(0).unwrap(); + assert_eq!(converted_event.topic.name, "c8y/s/us"); + assert!(converted_event + .payload_str()? + .starts_with("400,click_event")); + + Ok(()) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn test_convert_big_event() { + let mqtt_packet_limit = 16; + let size_threshold = SizeThreshold(mqtt_packet_limit * 1024); + let device_name = String::from("test"); + let device_type = String::from("test_type"); + let operations = Operations::new(); + + let mut http_proxy = MockC8YHttpProxy::new(); + http_proxy + .expect_send_event() + .with( + predicate::eq("click_event"), + predicate::always(), + predicate::always(), + ) + .returning(|_, _, _| Ok("123".into())); + + let mut converter = CumulocityConverter::new( + size_threshold, + device_name, + device_type, + operations, + http_proxy, + ); + + let event_topic = "tedge/events/click_event"; + let big_event_message = create_packet((mqtt_packet_limit + 1) * 1024); // Event payload > size_threshold + let big_event_payload = json!({ "message": big_event_message }).to_string(); + let big_event_message = Message::new(&Topic::new_unchecked(event_topic), big_event_payload); + + assert!(converter.convert(&big_event_message).await.is_empty()); +} + fn create_packet(size: usize) -> String { let data: String = "Some data!".into(); let loops = size / data.len(); @@ -745,6 +816,15 @@ impl C8YHttpProxy for FakeC8YHttpProxy { ) -> Result { Ok("fake/upload/url".into()) } + + async fn send_event( + &mut self, + _event_type: &str, + _text: &str, + _time: Option, + ) -> Result { + Ok("123".into()) + } } async fn start_c8y_mapper(mqtt_port: u16) -> Result, anyhow::Error> { diff --git a/crates/core/tedge_mapper/src/core/error.rs b/crates/core/tedge_mapper/src/core/error.rs index 9b13d134..c21792b4 100644 --- a/crates/core/tedge_mapper/src/core/error.rs +++ b/crates/core/tedge_mapper/src/core/error.rs @@ -33,7 +33,10 @@ pub enum ConversionError { FromCumulocityJsonError(#[from] c8y_translator::json::CumulocityJsonError), #[error(transparent)] - FromCumulocityCumulocityMapperError(#[from] CumulocityMapperError), + FromCumulocityMapperError(#[from] CumulocityMapperError), + + #[error(transparent)] + FromCumulocitySmartRestMapperError(#[from] c8y_smartrest::error::SMCumulocityMapperError), #[error(transparent)] FromThinEdgeJsonSerialization(#[from] ThinEdgeJsonSerializationError), -- cgit v1.2.3 From 7616196c806ab6fecf63812442a02f6460785e2f Mon Sep 17 00:00:00 2001 From: Albin Suresh Date: Thu, 17 Feb 2022 21:55:59 +0530 Subject: Unit test C8YHTTPProxy with mock httpserver crate --- crates/core/c8y_api/Cargo.toml | 4 +- crates/core/c8y_api/src/http_proxy.rs | 205 ++++++++++++++++++++++-------- crates/core/tedge_mapper/src/c8y/tests.rs | 2 +- 3 files changed, 159 insertions(+), 52 deletions(-) (limited to 'crates/core') diff --git a/crates/core/c8y_api/Cargo.toml b/crates/core/c8y_api/Cargo.toml index cf4a45c7..14ec1959 100644 --- a/crates/core/c8y_api/Cargo.toml +++ b/crates/core/c8y_api/Cargo.toml @@ -15,7 +15,7 @@ clock = { path = "../../common/clock" } csv = "1.1" download = { path = "../../common/download" } futures = "0.3" -mockall = "0.10" +mockall = "0.11" mqtt_channel = { path = "../../common/mqtt_channel" } reqwest = { version = "0.11", default-features = false, features = ["json", "rustls-tls"] } serde = { version = "1.0", features = ["derive"] } @@ -30,5 +30,7 @@ toml = "0.5" tracing = { version = "0.1", features = ["attributes", "log"] } [dev-dependencies] +anyhow = "1.0" +mockito = "0.30" tempfile = "3.3" test-case = "1.2" diff --git a/crates/core/c8y_api/src/http_proxy.rs b/crates/core/c8y_api/src/http_proxy.rs index 00260ca0..4c6dff98 100644 --- a/crates/core/c8y_api/src/http_proxy.rs +++ b/crates/core/c8y_api/src/http_proxy.rs @@ -5,7 +5,6 @@ use crate::json_c8y::{ use async_trait::async_trait; use c8y_smartrest::{error::SMCumulocityMapperError, smartrest_deserializer::SmartRestJwtResponse}; -use futures::TryFutureExt; use mockall::automock; use mqtt_channel::{Connection, PubChannel, StreamExt, Topic, TopicFilter}; use reqwest::Url; @@ -49,6 +48,7 @@ pub trait C8YHttpProxy: Send + Sync { } /// Define a C8y endpoint +#[derive(Debug)] pub struct C8yEndPoint { c8y_host: String, device_id: String, @@ -65,10 +65,18 @@ impl C8yEndPoint { } } + fn get_base_url(&self) -> String { + let mut url_get_id = String::new(); + if !self.c8y_host.starts_with("http") { + url_get_id.push_str("https://"); + } + url_get_id.push_str(&self.c8y_host); + + url_get_id + } + fn get_url_for_sw_list(&self) -> String { - let mut url_update_swlist = String::new(); - url_update_swlist.push_str("https://"); - url_update_swlist.push_str(&self.c8y_host); + let mut url_update_swlist = self.get_base_url(); url_update_swlist.push_str("/inventory/managedObjects/"); url_update_swlist.push_str(&self.c8y_internal_id); @@ -76,9 +84,7 @@ impl C8yEndPoint { } fn get_url_for_get_id(&self) -> String { - let mut url_get_id = String::new(); - url_get_id.push_str("https://"); - url_get_id.push_str(&self.c8y_host); + let mut url_get_id = self.get_base_url(); url_get_id.push_str("/identity/externalIds/c8y_Serial/"); url_get_id.push_str(&self.device_id); @@ -86,9 +92,7 @@ impl C8yEndPoint { } fn get_url_for_create_event(&self) -> String { - let mut url_create_event = String::new(); - url_create_event.push_str("https://"); - url_create_event.push_str(&self.c8y_host); + let mut url_create_event = self.get_base_url(); url_create_event.push_str("/event/events/"); url_create_event @@ -128,25 +132,67 @@ impl C8yEndPoint { } } +#[automock] +#[async_trait] +pub trait C8yJwtTokenRetriever: Send + Sync { + async fn get_jwt_token(&mut self) -> Result; +} + +pub struct C8yMqttJwtTokenRetriever { + mqtt_con: mqtt_channel::Connection, +} + +impl C8yMqttJwtTokenRetriever { + pub fn new(mqtt_con: mqtt_channel::Connection) -> Self { + C8yMqttJwtTokenRetriever { mqtt_con } + } +} + +#[async_trait] +impl C8yJwtTokenRetriever for C8yMqttJwtTokenRetriever { + async fn get_jwt_token(&mut self) -> Result { + let () = self + .mqtt_con + .published + .publish(mqtt_channel::Message::new( + &Topic::new_unchecked("c8y/s/uat"), + "".to_string(), + )) + .await?; + let token_smartrest = match tokio::time::timeout( + Duration::from_secs(10), + self.mqtt_con.received.next(), + ) + .await + { + Ok(Some(msg)) => msg.payload_str()?.to_string(), + Ok(None) => return Err(SMCumulocityMapperError::InvalidMqttMessage), + Err(_elapsed) => return Err(SMCumulocityMapperError::RequestTimeout), + }; + + Ok(SmartRestJwtResponse::try_new(&token_smartrest)?) + } +} + /// An HttpProxy that uses MQTT to retrieve JWT tokens and authenticate the device /// /// - Keep the connection info to c8y and the internal Id of the device /// - Handle JWT requests pub struct JwtAuthHttpProxy { - mqtt_con: mqtt_channel::Connection, + jwt_token_retriver: Box, http_con: reqwest::Client, end_point: C8yEndPoint, } impl JwtAuthHttpProxy { pub fn new( - mqtt_con: mqtt_channel::Connection, + jwt_token_retriver: Box, http_con: reqwest::Client, c8y_host: &str, device_id: &str, ) -> JwtAuthHttpProxy { JwtAuthHttpProxy { - mqtt_con, + jwt_token_retriver, http_con, end_point: C8yEndPoint { c8y_host: c8y_host.into(), @@ -174,31 +220,29 @@ impl JwtAuthHttpProxy { // Ignore errors on this connection let () = mqtt_con.errors.close(); + let jwt_token_retriver = Box::new(C8yMqttJwtTokenRetriever::new(mqtt_con)); + Ok(JwtAuthHttpProxy::new( - mqtt_con, http_con, &c8y_host, &device_id, + jwt_token_retriver, + http_con, + &c8y_host, + &device_id, )) } async fn try_get_and_set_internal_id(&mut self) -> Result<(), SMCumulocityMapperError> { - let token = self.get_jwt_token().await?; - let url_get_id = self.end_point.get_url_for_get_id(); - - self.end_point.c8y_internal_id = self - .try_get_internal_id(&url_get_id, &token.token()) - .await?; - + self.end_point.c8y_internal_id = self.try_get_internal_id().await?; Ok(()) } - async fn try_get_internal_id( - &self, - url_get_id: &str, - token: &str, - ) -> Result { + async fn try_get_internal_id(&mut self) -> Result { + let token = self.get_jwt_token().await?; + let url_get_id = self.end_point.get_url_for_get_id(); + let internal_id = self .http_con .get(url_get_id) - .bearer_auth(token) + .bearer_auth(token.token()) .send() .await?; let internal_id_response = internal_id.json::().await?; @@ -224,7 +268,7 @@ impl JwtAuthHttpProxy { C8yCreateEvent::new(c8y_managed_object, event_type, time.as_str(), text) } - async fn send_event( + async fn send_event_internal( &mut self, c8y_event: C8yCreateEvent, ) -> Result { @@ -241,6 +285,7 @@ impl JwtAuthHttpProxy { .build()?; let response = self.http_con.execute(request).await?; + let _ = response.error_for_status_ref()?; let event_response_body = response.json::().await?; Ok(event_response_body.id) @@ -272,26 +317,7 @@ impl C8YHttpProxy for JwtAuthHttpProxy { } async fn get_jwt_token(&mut self) -> Result { - let () = self - .mqtt_con - .published - .publish(mqtt_channel::Message::new( - &Topic::new_unchecked("c8y/s/uat"), - "".to_string(), - )) - .await?; - let token_smartrest = match tokio::time::timeout( - Duration::from_secs(10), - self.mqtt_con.received.next(), - ) - .await - { - Ok(Some(msg)) => msg.payload_str()?.to_string(), - Ok(None) => return Err(SMCumulocityMapperError::InvalidMqttMessage), - Err(_elapsed) => return Err(SMCumulocityMapperError::RequestTimeout), - }; - - Ok(SmartRestJwtResponse::try_new(&token_smartrest)?) + self.jwt_token_retriver.get_jwt_token().await } async fn send_event( @@ -301,7 +327,7 @@ impl C8YHttpProxy for JwtAuthHttpProxy { time: Option, ) -> Result { let c8y_event: C8yCreateEvent = self.create_event(event_type, text, time); - self.send_event(c8y_event).await + self.send_event_internal(c8y_event).await } async fn send_software_list_http( @@ -331,7 +357,7 @@ impl C8YHttpProxy for JwtAuthHttpProxy { let token = self.get_jwt_token().await?; let log_event = self.create_log_event(); - let event_response_id = self.send_event(log_event).await?; + let event_response_id = self.send_event_internal(log_event).await?; let binary_upload_event_url = self .end_point .get_url_for_event_binary_upload(&event_response_id); @@ -354,6 +380,9 @@ impl C8YHttpProxy for JwtAuthHttpProxy { #[cfg(test)] mod tests { use super::*; + use anyhow::Result; + use mockito::{mock, Matcher}; + use serde_json::json; use test_case::test_case; #[test] @@ -398,4 +427,80 @@ mod tests { let c8y = C8yEndPoint::new("test.test.com", "test_device", "internal-id"); assert!(!c8y.url_is_in_my_tenant_domain(url)); } + + #[tokio::test] + async fn get_internal_id() -> Result<()> { + let device_id = "test-device"; + let internal_device_id = "1234"; + + let _mock = mock("GET", "/identity/externalIds/c8y_Serial/test-device") + .with_status(200) + .with_body( + json!({ "externalId": device_id, "managedObject": { "id": internal_device_id } }) + .to_string(), + ) + .create(); + + // An JwtAuthHttpProxy ... + let mut jwt_token_retriver = Box::new(MockC8yJwtTokenRetriever::new()); + jwt_token_retriver + .expect_get_jwt_token() + .returning(|| Ok(SmartRestJwtResponse::default())); + + let http_client = reqwest::ClientBuilder::new().build().unwrap(); + let mut http_proxy = JwtAuthHttpProxy::new( + jwt_token_retriver, + http_client, + mockito::server_url().as_str(), + device_id, + ); + + assert_eq!(http_proxy.try_get_internal_id().await?, internal_device_id); + + Ok(()) + } + + #[tokio::test] + async fn send_event() -> anyhow::Result<()> { + let device_id = "test-device"; + let event_id = "456"; + + // Mock endpoint to return C8Y internal id + let _get_internal_id_mock = mock("GET", "/identity/externalIds/c8y_Serial/test-device") + .with_status(200) + .with_body( + json!({ "externalId": device_id, "managedObject": { "id": "123" } }).to_string(), + ) + .create(); + + let _create_event_mock = mock("POST", "/event/events/") + .match_body(Matcher::PartialJson( + json!({ "type": "clock_event", "text": "tick" }), + )) + .with_status(201) + .with_body(json!({ "id": event_id }).to_string()) + .create(); + + // An JwtAuthHttpProxy ... + let mut jwt_token_retriver = Box::new(MockC8yJwtTokenRetriever::new()); + jwt_token_retriver + .expect_get_jwt_token() + .returning(|| Ok(SmartRestJwtResponse::default())); + + let http_client = reqwest::ClientBuilder::new().build().unwrap(); + let mut http_proxy = JwtAuthHttpProxy::new( + jwt_token_retriver, + http_client, + mockito::server_url().as_str(), + device_id, + ); + + // ... creates the event and assert its id + assert_eq!( + http_proxy.send_event("clock_event", "tick", None).await?, + event_id + ); + + Ok(()) + } } diff --git a/crates/core/tedge_mapper/src/c8y/tests.rs b/crates/core/tedge_mapper/src/c8y/tests.rs index 9477037b..9f989eae 100644 --- a/crates/core/tedge_mapper/src/c8y/tests.rs +++ b/crates/core/tedge_mapper/src/c8y/tests.rs @@ -1,7 +1,7 @@ use crate::core::{converter::Converter, mapper::create_mapper, size_threshold::SizeThreshold}; use anyhow::Result; use c8y_api::{ - http_proxy::{C8YHttpProxy, JwtAuthHttpProxy, MockC8YHttpProxy}, + http_proxy::{C8YHttpProxy, C8yMqttJwtTokenRetriever, JwtAuthHttpProxy, MockC8YHttpProxy}, json_c8y::C8yUpdateSoftwareListResponse, }; use c8y_smartrest::{ -- cgit v1.2.3 From 5c5e38639769def866981f4257190ff76fae481e Mon Sep 17 00:00:00 2001 From: Albin Suresh Date: Wed, 23 Feb 2022 20:19:49 +0530 Subject: Improve error message of mqtt message size validation --- crates/core/tedge_mapper/src/az/converter.rs | 21 ++-- crates/core/tedge_mapper/src/c8y/converter.rs | 58 ++++++---- crates/core/tedge_mapper/src/c8y/mapper.rs | 3 +- crates/core/tedge_mapper/src/c8y/tests.rs | 128 +++++++-------------- crates/core/tedge_mapper/src/core/error.rs | 13 ++- .../core/tedge_mapper/src/core/size_threshold.rs | 18 ++- 6 files changed, 109 insertions(+), 132 deletions(-) (limited to 'crates/core') diff --git a/crates/core/tedge_mapper/src/az/converter.rs b/crates/core/tedge_mapper/src/az/converter.rs index b6c3fdd4..7989cfbd 100644 --- a/crates/core/tedge_mapper/src/az/converter.rs +++ b/crates/core/tedge_mapper/src/az/converter.rs @@ -37,11 +37,10 @@ impl Converter for AzureConverter { } async fn try_convert(&mut self, input: &Message) -> Result, Self::Error> { - let input = input.payload_str()?; let () = self.size_threshold.validate(input)?; let default_timestamp = self.add_timestamp.then(|| self.clock.now()); let mut serializer = ThinEdgeJsonSerializer::new_with_timestamp(default_timestamp); - let () = thin_edge_json::parser::parse_str(input, &mut serializer)?; + let () = thin_edge_json::parser::parse_str(input.payload_str()?, &mut serializer)?; let payload = serializer.into_string()?; Ok(vec![(Message::new(&self.mapper_config.out_topic, payload))]) @@ -52,11 +51,7 @@ impl Converter for AzureConverter { mod tests { use crate::{ az::converter::AzureConverter, - core::{ - converter::*, - error::ConversionError, - size_threshold::{SizeThreshold, SizeThresholdExceeded}, - }, + core::{converter::*, error::ConversionError, size_threshold::SizeThreshold}, }; use assert_json_diff::*; @@ -194,17 +189,17 @@ mod tests { async fn exceeding_threshold_returns_error() { let mut converter = AzureConverter::new(false, Box::new(TestClock), SizeThreshold(1)); + let _topic = "tedge/measurements".to_string(); let input = "ABC"; let result = converter.try_convert(&new_tedge_message(input)).await; assert_matches!( result, - Err(ConversionError::FromSizeThresholdExceeded( - SizeThresholdExceeded { - actual_size: 3, - threshold: 1 - } - )) + Err(ConversionError::SizeThresholdExceeded { + topic: _topic, + actual_size: 3, + threshold: 1 + }) ); } } diff --git a/crates/core/tedge_mapper/src/c8y/converter.rs b/crates/core/tedge_mapper/src/c8y/converter.rs index 15b250a1..1616acc8 100644 --- a/crates/core/tedge_mapper/src/c8y/converter.rs +++ b/crates/core/tedge_mapper/src/c8y/converter.rs @@ -29,6 +29,7 @@ use std::{ process::Stdio, }; use thin_edge_json::{alarm::ThinEdgeAlarm, event::ThinEdgeEvent}; +use time::{format_description::well_known::Rfc3339, OffsetDateTime}; use tracing::{debug, info, log::error}; use super::{ @@ -149,26 +150,43 @@ where input: &Message, ) -> Result, ConversionError> { let tedge_event = ThinEdgeEvent::try_from(input.topic.name.as_str(), input.payload_str()?)?; - match self.size_threshold.validate(input.payload_str()?) { - // If the message size is well within the Cumulocity MQTT size limit, use MQTT to send the mapped event as well - Ok(()) => { - let smartrest_alarm = event::serialize_event(tedge_event)?; - let smartrest_topic = Topic::new_unchecked(SMARTREST_PUBLISH_TOPIC); - Ok(vec![Message::new(&smartrest_topic, smartrest_alarm)]) - } + // If the message size is well within the Cumulocity MQTT size limit, use MQTT to send the mapped event as well + if input.payload_bytes().len() < self.size_threshold.0 { + let smartrest_alarm = event::serialize_event(tedge_event)?; + let smartrest_topic = Topic::new_unchecked(SMARTREST_PUBLISH_TOPIC); + + Ok(vec![Message::new(&smartrest_topic, smartrest_alarm)]) + } else { // If the message size is larger than the MQTT size limit, use HTTP to send the mapped event - Err(_) => { - let event_text = tedge_event - .data - .and_then(|data| data.message) - .unwrap_or_else(|| "generic event".into()); - let _ = self - .http_proxy - .send_event(tedge_event.name.as_str(), event_text.as_str(), None) - .await?; - Ok(vec![]) - } + let (event_text, event_time) = match tedge_event.data { + None => { + let message = tedge_event.name.clone(); + let time = OffsetDateTime::now_utc().format(&Rfc3339)?; + + (message, time) + } + Some(event_data) => { + let message = event_data + .message + .unwrap_or_else(|| tedge_event.name.clone()); + let time = event_data.time.map_or_else( + || OffsetDateTime::now_utc().format(&Rfc3339), + |timestamp| timestamp.format(&Rfc3339), + )?; + (message, time) + } + }; + + let _ = self + .http_proxy + .send_event( + tedge_event.name.as_str(), + event_text.as_str(), + Some(event_time), + ) + .await?; + Ok(vec![]) } } } @@ -186,11 +204,11 @@ where async fn try_convert(&mut self, message: &Message) -> Result, ConversionError> { match &message.topic { topic if topic.name.starts_with("tedge/measurements") => { - let () = self.size_threshold.validate(message.payload_str()?)?; + let () = self.size_threshold.validate(message)?; self.try_convert_measurement(message) } topic if topic.name.starts_with("tedge/alarms") => { - let () = self.size_threshold.validate(message.payload_str()?)?; + let () = self.size_threshold.validate(message)?; self.alarm_converter.try_convert_alarm(message) } topic if topic.name.starts_with(INTERNAL_ALARMS_TOPIC) => { diff --git a/crates/core/tedge_mapper/src/c8y/mapper.rs b/crates/core/tedge_mapper/src/c8y/mapper.rs index d5ab3092..a5df4d0b 100644 --- a/crates/core/tedge_mapper/src/c8y/mapper.rs +++ b/crates/core/tedge_mapper/src/c8y/mapper.rs @@ -16,6 +16,7 @@ use tracing::{info, info_span, Instrument}; use super::topic::C8yTopic; const CUMULOCITY_MAPPER_NAME: &str = "tedge-mapper-c8y"; +const MQTT_MESSAGE_SIZE_THRESHOLD: usize = 16 * 1024; pub struct CumulocityMapper {} @@ -65,7 +66,7 @@ impl CumulocityMapper { #[async_trait] impl TEdgeComponent for CumulocityMapper { async fn start(&self, tedge_config: TEdgeConfig) -> Result<(), anyhow::Error> { - let size_threshold = SizeThreshold(16 * 1024); + let size_threshold = SizeThreshold(MQTT_MESSAGE_SIZE_THRESHOLD); let operations = Operations::try_new("/etc/tedge/operations", "c8y")?; let mut http_proxy = JwtAuthHttpProxy::try_new(&tedge_config).await?; diff --git a/crates/core/tedge_mapper/src/c8y/tests.rs b/crates/core/tedge_mapper/src/c8y/tests.rs index 9f989eae..e4a68e29 100644 --- a/crates/core/tedge_mapper/src/c8y/tests.rs +++ b/crates/core/tedge_mapper/src/c8y/tests.rs @@ -1,7 +1,11 @@ -use crate::core::{converter::Converter, mapper::create_mapper, size_threshold::SizeThreshold}; +use crate::core::{ + converter::Converter, error::ConversionError, mapper::create_mapper, + size_threshold::SizeThreshold, +}; use anyhow::Result; +use assert_matches::assert_matches; use c8y_api::{ - http_proxy::{C8YHttpProxy, C8yMqttJwtTokenRetriever, JwtAuthHttpProxy, MockC8YHttpProxy}, + http_proxy::{C8YHttpProxy, MockC8YHttpProxy}, json_c8y::C8yUpdateSoftwareListResponse, }; use c8y_smartrest::{ @@ -9,7 +13,7 @@ use c8y_smartrest::{ smartrest_deserializer::SmartRestJwtResponse, }; use mockall::predicate; -use mqtt_channel::{Connection, Message, Topic, TopicFilter}; +use mqtt_channel::{Message, Topic}; use mqtt_tests::test_mqtt_server::MqttProcessHandler; use serde_json::json; use serial_test::serial; @@ -457,19 +461,7 @@ async fn c8y_mapper_syncs_pending_alarms_on_startup() { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] #[serial] async fn test_sync_alarms() { - let size_threshold = SizeThreshold(16 * 1024); - let device_name = String::from("test"); - let device_type = String::from("test_type"); - let operations = Operations::default(); - let http_proxy = FakeC8YHttpProxy {}; - - let mut converter = CumulocityConverter::new( - size_threshold, - device_name, - device_type, - operations, - http_proxy, - ); + let mut converter = create_c8y_converter(); let alarm_topic = "tedge/alarms/critical/temperature_alarm"; let alarm_payload = r#"{ "message": "Temperature very high" }"#; @@ -523,18 +515,7 @@ async fn test_sync_alarms() { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] #[serial] async fn convert_thin_edge_json_with_child_id() { - let device_name = String::from("test"); - let device_type = String::from("test"); - let operations = Operations::default(); - let http_proxy = FakeC8YHttpProxy {}; - - let mut converter = Box::new(CumulocityConverter::new( - SizeThreshold(16 * 1024), - device_name, - device_type, - operations, - http_proxy, - )); + let mut converter = create_c8y_converter(); let in_topic = "tedge/measurements/child1"; let in_payload = r#"{"temp": 1, "time": "2021-11-16T17:45:40.571760714+01:00"}"#; @@ -567,18 +548,7 @@ async fn convert_thin_edge_json_with_child_id() { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] #[serial] async fn convert_first_thin_edge_json_invalid_then_valid_with_child_id() { - let device_name = String::from("test"); - let device_type = String::from("test"); - let operations = Operations::default(); - let http_proxy = FakeC8YHttpProxy {}; - - let mut converter = Box::new(CumulocityConverter::new( - SizeThreshold(16 * 1024), - device_name, - device_type, - operations, - http_proxy, - )); + let mut converter = create_c8y_converter(); let in_topic = "tedge/measurements/child1"; let in_invalid_payload = r#"{"temp": invalid}"#; @@ -613,18 +583,7 @@ async fn convert_first_thin_edge_json_invalid_then_valid_with_child_id() { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] #[serial] async fn convert_two_thin_edge_json_messages_given_different_child_id() { - let device_name = String::from("test"); - let device_type = String::from("test"); - let operations = Operations::default(); - let http_proxy = FakeC8YHttpProxy {}; - - let mut converter = Box::new(CumulocityConverter::new( - SizeThreshold(16 * 1024), - device_name, - device_type, - operations, - http_proxy, - )); + let mut converter = create_c8y_converter(); let in_payload = r#"{"temp": 1, "time": "2021-11-16T17:45:40.571760714+01:00"}"#; // First message from "child1" @@ -688,36 +647,32 @@ fn extract_child_id(in_topic: &str, expected_child_id: Option) { } } -#[test] -fn check_c8y_threshold_packet_size() -> Result<(), anyhow::Error> { - let size_threshold = SizeThreshold(16 * 1024); - let device_name = String::from("test"); - let device_type = String::from("test"); - let operations = Operations::default(); - let http_proxy = FakeC8YHttpProxy {}; +#[tokio::test] +async fn check_c8y_threshold_packet_size() -> Result<(), anyhow::Error> { + let mut converter = create_c8y_converter(); - let converter = CumulocityConverter::new( - size_threshold, - device_name, - device_type, - operations, - http_proxy, - ); - let buffer = create_packet(1024 * 20); - let err = converter.size_threshold.validate(&buffer).unwrap_err(); - assert_eq!( - err.to_string(), - "The input size 20480 is too big. The threshold is 16384." + let alarm_topic = "tedge/alarms/critical/temperature_alarm"; + let big_message = create_packet(1024 * 20); + let alarm_payload = json!({ "message": big_message }).to_string(); + let alarm_message = Message::new(&Topic::new_unchecked(alarm_topic), alarm_payload); + + assert_matches!( + converter.try_convert(&alarm_message).await, + Err(ConversionError::SizeThresholdExceeded { + topic: _, + actual_size: 20494, + threshold: 16384 + }) ); Ok(()) } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn test_convert_event() -> Result<()> { - let size_threshold = SizeThreshold(32 * 1024); +async fn convert_event() -> Result<()> { + let size_threshold = SizeThreshold(16 * 1024); let device_name = String::from("test"); let device_type = String::from("test_type"); - let operations = Operations::new(); + let operations = Operations::default(); let http_proxy = MockC8YHttpProxy::new(); let mut converter = CumulocityConverter::new( @@ -749,7 +704,7 @@ async fn test_convert_big_event() { let size_threshold = SizeThreshold(mqtt_packet_limit * 1024); let device_name = String::from("test"); let device_type = String::from("test_type"); - let operations = Operations::new(); + let operations = Operations::default(); let mut http_proxy = MockC8YHttpProxy::new(); http_proxy @@ -828,26 +783,29 @@ impl C8YHttpProxy for FakeC8YHttpProxy { } async fn start_c8y_mapper(mqtt_port: u16) -> Result, anyhow::Error> { + let converter = create_c8y_converter(); + let mut mapper = create_mapper("c8y-mapper-test", mqtt_port, Box::new(converter)).await?; + + let mapper_task = tokio::spawn(async move { + let _ = mapper.run().await; + }); + Ok(mapper_task) +} + +fn create_c8y_converter() -> CumulocityConverter { + let size_threshold = SizeThreshold(16 * 1024); let device_name = "test-device".into(); let device_type = "test-device-type".into(); - let size_threshold = SizeThreshold(16 * 1024); let operations = Operations::default(); let http_proxy = FakeC8YHttpProxy {}; - let converter = Box::new(CumulocityConverter::new( + CumulocityConverter::new( size_threshold, device_name, device_type, operations, http_proxy, - )); - - let mut mapper = create_mapper("c8y-mapper-test", mqtt_port, converter).await?; - - let mapper_task = tokio::spawn(async move { - let _ = mapper.run().await; - }); - Ok(mapper_task) + ) } fn remove_whitespace(s: &str) -> String { diff --git a/crates/core/tedge_mapper/src/core/error.rs b/crates/core/tedge_mapper/src/core/error.rs index c21792b4..e8fac94b 100644 --- a/crates/core/tedge_mapper/src/core/error.rs +++ b/crates/core/tedge_mapper/src/core/error.rs @@ -1,4 +1,4 @@ -use crate::{c8y::error::CumulocityMapperError, core::size_threshold::SizeThresholdExceeded}; +use crate::c8y::error::CumulocityMapperError; use c8y_smartrest::error::OperationsError; use mqtt_channel::MqttError; @@ -54,8 +54,12 @@ pub enum ConversionError { #[error(transparent)] FromThinEdgeJsonParser(#[from] thin_edge_json::parser::ThinEdgeJsonParserError), - #[error(transparent)] - FromSizeThresholdExceeded(#[from] SizeThresholdExceeded), + #[error("The size of the message received on {topic} is {actual_size} which is greater than the threshold size of {threshold}.")] + SizeThresholdExceeded { + topic: String, + actual_size: usize, + threshold: usize, + }, #[error("The given Child ID '{id}' is invalid.")] InvalidChildId { id: String }, @@ -83,4 +87,7 @@ pub enum ConversionError { #[error(transparent)] FromUtf8Error(#[from] std::string::FromUtf8Error), + + #[error(transparent)] + FromTimeFormatError(#[from] time::error::Format), } diff --git a/crates/core/tedge_mapper/src/core/size_threshold.rs b/crates/core/tedge_mapper/src/core/size_threshold.rs index fb6e3d48..da1ee108 100644 --- a/crates/core/tedge_mapper/src/core/size_threshold.rs +++ b/crates/core/tedge_mapper/src/core/size_threshold.rs @@ -1,12 +1,17 @@ +use mqtt_channel::Message; + +use super::error::ConversionError; + #[derive(Debug)] pub struct SizeThreshold(pub usize); impl SizeThreshold { - pub fn validate(&self, input: &str) -> Result<(), SizeThresholdExceeded> { - let actual_size = input.len(); + pub fn validate(&self, input: &Message) -> Result<(), ConversionError> { + let actual_size = input.payload_bytes().len(); let threshold = self.0; if actual_size > threshold { - Err(SizeThresholdExceeded { + Err(ConversionError::SizeThresholdExceeded { + topic: input.topic.name.clone(), actual_size, threshold, }) @@ -15,10 +20,3 @@ impl SizeThreshold { } } } - -#[derive(thiserror::Error, Debug)] -#[error("The input size {actual_size} is too big. The threshold is {threshold}.")] -pub struct SizeThresholdExceeded { - pub actual_size: usize, - pub threshold: usize, -} -- cgit v1.2.3