summaryrefslogtreecommitdiffstats
path: root/crates/core
diff options
context:
space:
mode:
authorAlbin Suresh <albin.suresh@softwareag.com>2022-02-24 11:28:28 +0530
committerGitHub <noreply@github.com>2022-02-24 11:28:28 +0530
commit8701cf6c29a9b3295587a2cf9aad19eb66aa690b (patch)
treee49ce2e3f9de0c12a420e11c02aac11e02f75e5a /crates/core
parent4c4419400fdec8ed05a311ddc5790831f926ce07 (diff)
parent5c5e38639769def866981f4257190ff76fae481e (diff)
Merge pull request #859 from albinsuresh/feature/809/use-http-for-large-events
[#809] Cumulocity mapper to send large events > 16K size with HTTP
Diffstat (limited to 'crates/core')
-rw-r--r--crates/core/c8y_api/Cargo.toml4
-rw-r--r--crates/core/c8y_api/src/http_proxy.rs246
-rw-r--r--crates/core/c8y_api/src/json_c8y.rs7
-rw-r--r--crates/core/tedge_mapper/Cargo.toml2
-rw-r--r--crates/core/tedge_mapper/src/az/converter.rs21
-rw-r--r--crates/core/tedge_mapper/src/c8y/converter.rs56
-rw-r--r--crates/core/tedge_mapper/src/c8y/mapper.rs8
-rw-r--r--crates/core/tedge_mapper/src/c8y/tests.rs182
-rw-r--r--crates/core/tedge_mapper/src/core/error.rs18
-rw-r--r--crates/core/tedge_mapper/src/core/size_threshold.rs18
10 files changed, 385 insertions, 177 deletions
diff --git a/crates/core/c8y_api/Cargo.toml b/crates/core/c8y_api/Cargo.toml
index cf4a45c7..14ec1959 100644
--- a/crates/core/c8y_api/Cargo.toml
+++ b/crates/core/c8y_api/Cargo.toml
@@ -15,7 +15,7 @@ clock = { path = "../../common/clock" }
csv = "1.1"
download = { path = "../../common/download" }
futures = "0.3"
-mockall = "0.10"
+mockall = "0.11"
mqtt_channel = { path = "../../common/mqtt_channel" }
reqwest = { version = "0.11", default-features = false, features = ["json", "rustls-tls"] }
serde = { version = "1.0", features = ["derive"] }
@@ -30,5 +30,7 @@ toml = "0.5"
tracing = { version = "0.1", features = ["attributes", "log"] }
[dev-dependencies]
+anyhow = "1.0"
+mockito = "0.30"
tempfile = "3.3"
test-case = "1.2"
diff --git a/crates/core/c8y_api/src/http_proxy.rs b/crates/core/c8y_api/src/http_proxy.rs
index 9f853043..4c6dff98 100644
--- a/crates/core/c8y_api/src/http_proxy.rs
+++ b/crates/core/c8y_api/src/http_proxy.rs
@@ -1,9 +1,11 @@
use crate::json_c8y::{
- C8yCreateEvent, C8yManagedObject, C8yUpdateSoftwareListResponse, InternalIdResponse,
+ C8yCreateEvent, C8yEventResponse, C8yManagedObject, C8yUpdateSoftwareListResponse,
+ InternalIdResponse,
};
use async_trait::async_trait;
use c8y_smartrest::{error::SMCumulocityMapperError, smartrest_deserializer::SmartRestJwtResponse};
+use mockall::automock;
use mqtt_channel::{Connection, PubChannel, StreamExt, Topic, TopicFilter};
use reqwest::Url;
use std::time::Duration;
@@ -13,12 +15,12 @@ use tedge_config::{
};
use time::{format_description, OffsetDateTime};
-use serde::{Deserialize, Serialize};
use tracing::{error, info, instrument};
const RETRY_TIMEOUT_SECS: u64 = 60;
/// An HttpProxy handles http requests to C8y on behalf of the device.
+#[automock]
#[async_trait]
pub trait C8YHttpProxy: Send + Sync {
async fn init(&mut self) -> Result<(), SMCumulocityMapperError>;
@@ -27,6 +29,13 @@ pub trait C8YHttpProxy: Send + Sync {
async fn get_jwt_token(&mut self) -> Result<SmartRestJwtResponse, SMCumulocityMapperError>;
+ async fn send_event(
+ &mut self,
+ event_type: &str,
+ text: &str,
+ time: Option<String>,
+ ) -> Result<String, SMCumulocityMapperError>;
+
async fn send_software_list_http(
&mut self,
c8y_software_list: &C8yUpdateSoftwareListResponse,
@@ -39,6 +48,7 @@ pub trait C8YHttpProxy: Send + Sync {
}
/// Define a C8y endpoint
+#[derive(Debug)]
pub struct C8yEndPoint {
c8y_host: String,
device_id: String,
@@ -55,10 +65,18 @@ impl C8yEndPoint {
}
}
+ fn get_base_url(&self) -> String {
+ let mut url_get_id = String::new();
+ if !self.c8y_host.starts_with("http") {
+ url_get_id.push_str("https://");
+ }
+ url_get_id.push_str(&self.c8y_host);
+
+ url_get_id
+ }
+
fn get_url_for_sw_list(&self) -> String {
- let mut url_update_swlist = String::new();
- url_update_swlist.push_str("https://");
- url_update_swlist.push_str(&self.c8y_host);
+ let mut url_update_swlist = self.get_base_url();
url_update_swlist.push_str("/inventory/managedObjects/");
url_update_swlist.push_str(&self.c8y_internal_id);
@@ -66,9 +84,7 @@ impl C8yEndPoint {
}
fn get_url_for_get_id(&self) -> String {
- let mut url_get_id = String::new();
- url_get_id.push_str("https://");
- url_get_id.push_str(&self.c8y_host);
+ let mut url_get_id = self.get_base_url();
url_get_id.push_str("/identity/externalIds/c8y_Serial/");
url_get_id.push_str(&self.device_id);
@@ -76,9 +92,7 @@ impl C8yEndPoint {
}
fn get_url_for_create_event(&self) -> String {
- let mut url_create_event = String::new();
- url_create_event.push_str("https://");
- url_create_event.push_str(&self.c8y_host);
+ let mut url_create_event = self.get_base_url();
url_create_event.push_str("/event/events/");
url_create_event
@@ -118,11 +132,46 @@ impl C8yEndPoint {
}
}
-#[derive(Debug, Deserialize, Serialize, PartialEq)]
-#[serde(rename_all = "camelCase")]
-/// used to retrieve the id of a log event
-pub struct SmartRestLogEvent {
- pub id: String,
+#[automock]
+#[async_trait]
+pub trait C8yJwtTokenRetriever: Send + Sync {
+ async fn get_jwt_token(&mut self) -> Result<SmartRestJwtResponse, SMCumulocityMapperError>;
+}
+
+pub struct C8yMqttJwtTokenRetriever {
+ mqtt_con: mqtt_channel::Connection,
+}
+
+impl C8yMqttJwtTokenRetriever {
+ pub fn new(mqtt_con: mqtt_channel::Connection) -> Self {
+ C8yMqttJwtTokenRetriever { mqtt_con }
+ }
+}
+
+#[async_trait]
+impl C8yJwtTokenRetriever for C8yMqttJwtTokenRetriever {
+ async fn get_jwt_token(&mut self) -> Result<SmartRestJwtResponse, SMCumulocityMapperError> {
+ let () = self
+ .mqtt_con
+ .published
+ .publish(mqtt_channel::Message::new(
+ &Topic::new_unchecked("c8y/s/uat"),
+ "".to_string(),
+ ))
+ .await?;
+ let token_smartrest = match tokio::time::timeout(
+ Duration::from_secs(10),
+ self.mqtt_con.received.next(),
+ )
+ .await
+ {
+ Ok(Some(msg)) => msg.payload_str()?.to_string(),
+ Ok(None) => return Err(SMCumulocityMapperError::InvalidMqttMessage),
+ Err(_elapsed) => return Err(SMCumulocityMapperError::RequestTimeout),
+ };
+
+ Ok(SmartRestJwtResponse::try_new(&token_smartrest)?)
+ }
}
/// An HttpProxy that uses MQTT to retrieve JWT tokens and authenticate the device
@@ -130,20 +179,20 @@ pub struct SmartRestLogEvent {
/// - Keep the connection info to c8y and the internal Id of the device
/// - Handle JWT requests
pub struct JwtAuthHttpProxy {
- mqtt_con: mqtt_channel::Connection,
+ jwt_token_retriver: Box<dyn C8yJwtTokenRetriever>,
http_con: reqwest::Client,
end_point: C8yEndPoint,
}
impl JwtAuthHttpProxy {
pub fn new(
- mqtt_con: mqtt_channel::Connection,
+ jwt_token_retriver: Box<dyn C8yJwtTokenRetriever>,
http_con: reqwest::Client,
c8y_host: &str,
device_id: &str,
) -> JwtAuthHttpProxy {
JwtAuthHttpProxy {
- mqtt_con,
+ jwt_token_retriver,
http_con,
end_point: C8yEndPoint {
c8y_host: c8y_host.into(),
@@ -171,31 +220,29 @@ impl JwtAuthHttpProxy {
// Ignore errors on this connection
let () = mqtt_con.errors.close();
+ let jwt_token_retriver = Box::new(C8yMqttJwtTokenRetriever::new(mqtt_con));
+
Ok(JwtAuthHttpProxy::new(
- mqtt_con, http_con, &c8y_host, &device_id,
+ jwt_token_retriver,
+ http_con,
+ &c8y_host,
+ &device_id,
))
}
async fn try_get_and_set_internal_id(&mut self) -> Result<(), SMCumulocityMapperError> {
- let token = self.get_jwt_token().await?;
- let url_get_id = self.end_point.get_url_for_get_id();
-
- self.end_point.c8y_internal_id = self
- .try_get_internal_id(&url_get_id, &token.token())
- .await?;
-
+ self.end_point.c8y_internal_id = self.try_get_internal_id().await?;
Ok(())
}
- async fn try_get_internal_id(
- &self,
- url_get_id: &str,
- token: &str,
- ) -> Result<String, SMCumulocityMapperError> {
+ async fn try_get_internal_id(&mut self) -> Result<String, SMCumulocityMapperError> {
+ let token = self.get_jwt_token().await?;
+ let url_get_id = self.end_point.get_url_for_get_id();
+
let internal_id = self
.http_con
.get(url_get_id)
- .bearer_auth(token)
+ .bearer_auth(token.token())
.send()
.await?;
let internal_id_response = internal_id.json::<InternalIdResponse>().await?;
@@ -204,26 +251,24 @@ impl JwtAuthHttpProxy {
Ok(internal_id)
}
- /// Make a POST request to /event/events and return the event id from response body.
- /// The event id is used to upload the binary.
fn create_log_event(&self) -> C8yCreateEvent {
- let local = OffsetDateTime::now_utc();
+ self.create_event("c8y_Logfile", "software-management", None)
+ }
+ fn create_event(&self, event_type: &str, text: &str, time: Option<String>) -> C8yCreateEvent {
let c8y_managed_object = C8yManagedObject {
id: self.end_point.c8y_internal_id.clone(),
};
- C8yCreateEvent::new(
- c8y_managed_object,
- "c8y_Logfile",
- &local
+ let time = time.unwrap_or_else(|| {
+ OffsetDateTime::now_utc()
.format(&format_description::well_known::Rfc3339)
- .unwrap(),
- "software-management",
- )
+ .unwrap()
+ });
+ C8yCreateEvent::new(c8y_managed_object, event_type, time.as_str(), text)
}
- async fn get_event_id(
+ async fn send_event_internal(
&mut self,
c8y_event: C8yCreateEvent,
) -> Result<String, SMCumulocityMapperError> {
@@ -240,7 +285,8 @@ impl JwtAuthHttpProxy {
.build()?;
let response = self.http_con.execute(request).await?;
- let event_response_body = response.json::<SmartRestLogEvent>().await?;
+ let _ = response.error_for_status_ref()?;
+ let event_response_body = response.json::<C8yEventResponse>().await?;
Ok(event_response_body.id)
}
@@ -271,26 +317,17 @@ impl C8YHttpProxy for JwtAuthHttpProxy {
}
async fn get_jwt_token(&mut self) -> Result<SmartRestJwtResponse, SMCumulocityMapperError> {
- let () = self
- .mqtt_con
- .published
- .publish(mqtt_channel::Message::new(
- &Topic::new_unchecked("c8y/s/uat"),
- "".to_string(),
- ))
- .await?;
- let token_smartrest = match tokio::time::timeout(
- Duration::from_secs(10),
- self.mqtt_con.received.next(),
- )
- .await
- {
- Ok(Some(msg)) => msg.payload_str()?.to_string(),
- Ok(None) => return Err(SMCumulocityMapperError::InvalidMqttMessage),
- Err(_elapsed) => return Err(SMCumulocityMapperError::RequestTimeout),
- };
+ self.jwt_token_retriver.get_jwt_token().await
+ }
- Ok(SmartRestJwtResponse::try_new(&token_smartrest)?)
+ async fn send_event(
+ &mut self,
+ event_type: &str,
+ text: &str,
+ time: Option<String>,
+ ) -> Result<String, SMCumulocityMapperError> {
+ let c8y_event: C8yCreateEvent = self.create_event(event_type, text, time);
+ self.send_event_internal(c8y_event).await
}
async fn send_software_list_http(
@@ -320,7 +357,7 @@ impl C8YHttpProxy for JwtAuthHttpProxy {
let token = self.get_jwt_token().await?;
let log_event = self.create_log_event();
- let event_response_id = self.get_event_id(log_event).await?;
+ let event_response_id = self.send_event_internal(log_event).await?;
let binary_upload_event_url = self
.end_point
.get_url_for_event_binary_upload(&event_response_id);
@@ -343,6 +380,9 @@ impl C8YHttpProxy for JwtAuthHttpProxy {
#[cfg(test)]
mod tests {
use super::*;
+ use anyhow::Result;
+ use mockito::{mock, Matcher};
+ use serde_json::json;
use test_case::test_case;
#[test]
@@ -387,4 +427,80 @@ mod tests {
let c8y = C8yEndPoint::new("test.test.com", "test_device", "internal-id");
assert!(!c8y.url_is_in_my_tenant_domain(url));
}
+
+ #[tokio::test]
+ async fn get_internal_id() -> Result<()> {
+ let device_id = "test-device";
+ let internal_device_id = "1234";
+
+ let _mock = mock("GET", "/identity/externalIds/c8y_Serial/test-device")
+ .with_status(200)
+ .with_body(
+ json!({ "externalId": device_id, "managedObject": { "id": internal_device_id } })
+ .to_string(),
+ )
+ .create();
+
+ // An JwtAuthHttpProxy ...
+ let mut jwt_token_retriver = Box::new(MockC8yJwtTokenRetriever::new());
+ jwt_token_retriver
+ .expect_get_jwt_token()
+ .returning(|| Ok(SmartRestJwtResponse::default()));
+
+ let http_client = reqwest::ClientBuilder::new().build().unwrap();
+ let mut http_proxy = JwtAuthHttpProxy::new(
+ jwt_token_retriver,
+ http_client,
+ mockito::server_url().as_str(),
+ device_id,
+ );
+
+ assert_eq!(http_proxy.try_get_internal_id().await?, internal_device_id);
+
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn send_event() -> anyhow::Result<()> {
+ let device_id = "test-device";
+ let event_id = "456";
+
+ // Mock endpoint to return C8Y internal id
+ let _get_internal_id_mock = mock("GET", "/identity/externalIds/c8y_Serial/test-device")
+ .with_status(200)
+ .with_body(
+ json!({ "externalId": device_id, "managedObject": { "id": "123" } }).to_string(),
+ )
+ .create();
+
+ let _create_event_mock = mock("POST", "/event/events/")
+ .match_body(Matcher::PartialJson(
+ json!({ "type": "clock_event", "text": "tick" }),
+ ))
+ .with_status(201)
+ .with_body(json!({ "id": event_id }).to_string())
+ .create();
+
+ // An JwtAuthHttpProxy ...
+ let mut jwt_token_retriver = Box::new(MockC8yJwtTokenRetriever::new());
+ jwt_token_retriver
+ .expect_get_jwt_token()
+ .returning(|| Ok(SmartRestJwtResponse::default()));
+
+ let http_client = reqwest::ClientBuilder::new().build().unwrap();
+ let mut http_proxy = JwtAuthHttpProxy::new(
+ jwt_token_retriver,
+ http_client,
+ mockito::server_url().as_str(),
+ device_id,
+ );
+
+ // ... creates the event and assert its id
+ assert_eq!(
+ http_proxy.send_event("clock_event", "tick", None).await?,
+ event_id
+ );
+
+ Ok(())
+ }
}
diff --git a/crates/core/c8y_api/src/json_c8y.rs b/crates/core/c8y_api/src/json_c8y.rs
index 8566a84c..f1f0d2b7 100644
--- a/crates/core/c8y_api/src/json_c8y.rs
+++ b/crates/core/c8y_api/src/json_c8y.rs
@@ -17,6 +17,13 @@ pub struct C8yCreateEvent {
text: String,
}
+#[derive(Debug, Deserialize, Serialize, PartialEq)]
+#[serde(rename_all = "camelCase")]
+/// used to retrieve the id of a log event
+pub struct C8yEventResponse {
+ pub id: String,
+}
+
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct C8yManagedObject {
diff --git a/crates/core/tedge_mapper/Cargo.toml b/crates/core/tedge_mapper/Cargo.toml
index ffc8d6d3..ba44908d 100644
--- a/crates/core/tedge_mapper/Cargo.toml
+++ b/crates/core/tedge_mapper/Cargo.toml
@@ -37,7 +37,7 @@ csv = "1.1"
download = { path = "../../common/download" }
flockfile = { path = "../../common/flockfile" }
futures = "0.3"
-mockall = "0.10"
+mockall = "0.11"
mqtt_channel = { path = "../../common/mqtt_channel" }
reqwest = { version = "0.11", default-features = false, features = ["json", "rustls-tls"] }
serde = { version = "1.0", features = ["derive"] }
diff --git a/crates/core/tedge_mapper/src/az/converter.rs b/crates/core/tedge_mapper/src/az/converter.rs
index b6c3fdd4..7989cfbd 100644
--- a/crates/core/tedge_mapper/src/az/converter.rs
+++ b/crates/core/tedge_mapper/src/az/converter.rs
@@ -37,11 +37,10 @@ impl Converter for AzureConverter {
}
async fn try_convert(&mut self, input: &Message) -> Result<Vec<Message>, Self::Error> {
- let input = input.payload_str()?;
let () = self.size_threshold.validate(input)?;
let default_timestamp = self.add_timestamp.then(|| self.clock.now());
let mut serializer = ThinEdgeJsonSerializer::new_with_timestamp(default_timestamp);
- let () = thin_edge_json::parser::parse_str(input, &mut serializer)?;
+ let () = thin_edge_json::parser::parse_str(input.payload_str()?, &mut serializer)?;
let payload = serializer.into_string()?;
Ok(vec![(Message::new(&self.mapper_config.out_topic, payload))])
@@ -52,11 +51,7 @@ impl Converter for AzureConverter {
mod tests {
use crate::{
az::converter::AzureConverter,
- core::{
- converter::*,
- error::ConversionError,
- size_threshold::{SizeThreshold, SizeThresholdExceeded},
- },
+ core::{converter::*, error::ConversionError, size_threshold::SizeThreshold},
};
use assert_json_diff::*;
@@ -194,17 +189,17 @@ mod tests {
async fn exceeding_threshold_returns_error() {
let mut converter = AzureConverter::new(false, Box::new(TestClock), SizeThreshold(1));
+ let _topic = "tedge/measurements".to_string();
let input = "ABC";
let result = converter.try_convert(&new_tedge_message(input)).await;
assert_matches!(
result,
- Err(ConversionError::FromSizeThresholdExceeded(
- SizeThresholdExceeded {
- actual_size: 3,
- threshold: 1
- }
- ))
+ Err(ConversionError::SizeThresholdExceeded {
+ topic: _topic,
+ actual_size: 3,
+ threshold: 1
+ })
);
}
}
diff --git a/crates/core/tedge_mapper/src/c8y/converter.rs b/crates/core/tedge_mapper/src/c8y/converter.rs
index 899a763d..1616acc8 100644
--- a/crates/core/tedge_mapper/src/c8y/converter.rs
+++ b/crates/core/tedge_mapper/src/c8y/converter.rs
@@ -9,7 +9,7 @@ use c8y_api::{http_proxy::C8YHttpProxy, json_c8y::C8yUpdateSoftwareListResponse}
use c8y_smartrest::{
alarm,
error::SmartRestDeserializerError,
- event::serialize_event,
+ event::{self},
operations::Operations,
smartrest_deserializer::{SmartRestRestartRequest, SmartRestUpdateSoftware},
smartrest_serializer::{
@@ -29,6 +29,7 @@ use std::{
process::Stdio,
};
use thin_edge_json::{alarm::ThinEdgeAlarm, event::ThinEdgeEvent};
+use time::{format_description::well_known::Rfc3339, OffsetDateTime};
use tracing::{debug, info, log::error};
use super::{
@@ -144,12 +145,49 @@ where
Ok(vec)
}
- fn try_convert_event(&mut self, input: &Message) -> Result<Vec<Message>, ConversionError> {
+ async fn try_convert_event(
+ &mut self,
+ input: &Message,
+ ) -> Result<Vec<Message>, ConversionError> {
let tedge_event = ThinEdgeEvent::try_from(input.topic.name.as_str(), input.payload_str()?)?;
- let smartrest_alarm = serialize_event(tedge_event)?;
- let smartrest_topic = Topic::new_unchecked(SMARTREST_PUBLISH_TOPIC);
- Ok(vec![Message::new(&smartrest_topic, smartrest_alarm)])
+ // If the message size is well within the Cumulocity MQTT size limit, use MQTT to send the mapped event as well
+ if input.payload_bytes().len() < self.size_threshold.0 {
+ let smartrest_alarm = event::serialize_event(tedge_event)?;
+ let smartrest_topic = Topic::new_unchecked(SMARTREST_PUBLISH_TOPIC);
+
+ Ok(vec![Message::new(&smartrest_topic, smartrest_alarm)])
+ } else {
+ // If the message size is larger than the MQTT size limit, use HTTP to send the mapped event
+ let (event_text, event_time) = match tedge_event.data {
+ None => {
+ let message = tedge_event.name.clone();
+ let time = OffsetDateTime::now_utc().format(&Rfc3339)?;
+
+ (message, time)
+ }
+ Some(event_data) => {
+ let message = event_data
+ .message
+ .unwrap_or_else(|| tedge_event.name.clone());
+ let time = event_data.time.map_or_else(
+ || OffsetDateTime::now_utc().format(&Rfc3339),
+ |timestamp| timestamp.format(&Rfc3339),
+ )?;
+ (message, time)
+ }
+ };
+
+ let _ = self
+ .http_proxy
+ .send_event(
+ tedge_event.name.as_str(),
+ event_text.as_str(),
+ Some(event_time),
+ )
+ .await?;
+ Ok(vec![])
+ }
}
}
@@ -164,20 +202,22 @@ where
&self.mapper_config
}
async fn try_convert(&mut self, message: &Message) -> Result<Vec<Message>, ConversionError> {
- let () = self.size_threshold.validate(message.payload_str()?)?;
-
match &message.topic {
topic if topic.name.starts_with("tedge/measurements") => {
+ let () = self.size_threshold.validate(message)?;
self.try_convert_measurement(message)
}
topic if topic.name.starts_with("tedge/alarms") => {
+ let () = self.size_threshold.validate(message)?;
self.alarm_converter.try_convert_alarm(message)
}
topic if topic.name.starts_with(INTERNAL_ALARMS_TOPIC) => {
self.alarm_converter.process_internal_alarm(message);
Ok(vec![])
}
- topic if topic.name.starts_with(TEDGE_EVENTS_TOPIC) => self.try_convert_event(message),
+ topic if topic.name.starts_with(TEDGE_EVENTS_TOPIC) => {
+ self.try_convert_event(message).await
+ }
topic => match topic.clone().try_into() {
Ok(MapperSubscribeTopic::ResponseTopic(ResponseTopic::SoftwareListResponse)) => {
debug!("Software list");
diff --git a/crates/core/tedge_mapper/src/c8y/mapper.rs b/crates/core/tedge_mapper/src/c8y/mapper.rs
index 315449fe..a5df4d0b 100644
--- a/crates/core/tedge_mapper/src/c8y/mapper.rs
+++ b/crates/core/tedge_mapper/src/c8y/mapper.rs
@@ -5,7 +5,7 @@ use crate::{
use agent_interface::topic::ResponseTopic;
use async_trait::async_trait;
-use c8y_api::http_proxy::JwtAuthHttpProxy;
+use c8y_api::http_proxy::{C8YHttpProxy, JwtAuthHttpProxy};
use c8y_smartrest::operations::Operations;
use mqtt_channel::{Config, TopicFilter};
use tedge_config::{
@@ -16,6 +16,7 @@ use tracing::{info, info_span, Instrument};
use super::topic::C8yTopic;
const CUMULOCITY_MAPPER_NAME: &str = "tedge-mapper-c8y";
+const MQTT_MESSAGE_SIZE_THRESHOLD: usize = 16 * 1024;
pub struct CumulocityMapper {}
@@ -65,10 +66,11 @@ impl CumulocityMapper {
#[async_trait]
impl TEdgeComponent for CumulocityMapper {
async fn start(&self, tedge_config: TEdgeConfig) -> Result<(), anyhow::Error> {
- let size_threshold = SizeThreshold(16 * 1024);
+ let size_threshold = SizeThreshold(MQTT_MESSAGE_SIZE_THRESHOLD);
let operations = Operations::try_new("/etc/tedge/operations", "c8y")?;
- let http_proxy = JwtAuthHttpProxy::try_new(&tedge_config).await?;
+ let mut http_proxy = JwtAuthHttpProxy::try_new(&tedge_config).await?;
+ http_proxy.init().await?;
let device_name = tedge_config.query(DeviceIdSetting)?;
let device_type = tedge_config.query(DeviceTypeSetting)?;
let mqtt_port = tedge_config.query(MqttPortSetting)?.into();
diff --git a/crates/core/tedge_mapper/src/c8y/tests.rs b/crates/core/tedge_mapper/src/c8y/tests.rs
index b300bf0f..7cbed97a 100644
--- a/crates/core/tedge_mapper/src/c8y/tests.rs
+++ b/crates/core/tedge_mapper/src/c8y/tests.rs
@@ -1,11 +1,21 @@
-use crate::core::{converter::Converter, mapper::create_mapper, size_threshold::SizeThreshold};
-use c8y_api::{http_proxy::C8YHttpProxy, json_c8y::C8yUpdateSoftwareListResponse};
+use crate::core::{
+ converter::Converter, error::ConversionError, mapper::create_mapper,
+ size_threshold::SizeThreshold,
+};
+use anyhow::Result;
+use assert_matches::assert_matches;
+use c8y_api::{
+ http_proxy::{C8YHttpProxy, MockC8YHttpProxy},
+ json_c8y::C8yUpdateSoftwareListResponse,
+};
use c8y_smartrest::{
error::SMCumulocityMapperError, operations::Operations,
smartrest_deserializer::SmartRestJwtResponse,
};
+use mockall::predicate;
use mqtt_channel::{Message, Topic};
use mqtt_tests::test_mqtt_server::MqttProcessHandler;
+use serde_json::json;
use serial_test::serial;
use std::time::Duration;
use test_case::test_case;
@@ -463,19 +473,7 @@ async fn c8y_mapper_syncs_pending_alarms_on_startup() {
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[serial]
async fn test_sync_alarms() {
- let size_threshold = SizeThreshold(16 * 1024);
- let device_name = String::from("test");
- let device_type = String::from("test_type");
- let operations = Operations::default();
- let http_proxy = FakeC8YHttpProxy {};
-
- let mut converter = CumulocityConverter::new(
- size_threshold,
- device_name,
- device_type,
- operations,
- http_proxy,
- );
+ let mut converter = create_c8y_converter();
let alarm_topic = "tedge/alarms/critical/temperature_alarm";
let alarm_payload = r#"{ "message": "Temperature very high" }"#;
@@ -529,18 +527,7 @@ async fn test_sync_alarms() {
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[serial]
async fn convert_thin_edge_json_with_child_id() {
- let device_name = String::from("test");
- let device_type = String::from("test");
- let operations = Operations::default();
- let http_proxy = FakeC8YHttpProxy {};
-
- let mut converter = Box::new(CumulocityConverter::new(
- SizeThreshold(16 * 1024),
- device_name,
- device_type,
- operations,
- http_proxy,
- ));
+ let mut converter = create_c8y_converter();
let in_topic = "tedge/measurements/child1";
let in_payload = r#"{"temp": 1, "time": "2021-11-16T17:45:40.571760714+01:00"}"#;
@@ -573,18 +560,7 @@ async fn convert_thin_edge_json_with_child_id() {
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[serial]
async fn convert_first_thin_edge_json_invalid_then_valid_with_child_id() {
- let device_name = String::from("test");
- let device_type = String::from("test");
- let operations = Operations::default();
- let http_proxy = FakeC8YHttpProxy {};
-
- let mut converter = Box::new(CumulocityConverter::new(
- SizeThreshold(16 * 1024),
- device_name,
- device_type,
- operations,
- http_proxy,
- ));
+ let mut converter = create_c8y_converter();
let in_topic = "tedge/measurements/child1";
let in_invalid_payload = r#"{"temp": invalid}"#;
@@ -619,18 +595,7 @@ async fn convert_first_thin_edge_json_invalid_then_valid_with_child_id() {
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[serial]
async fn convert_two_thin_edge_json_messages_given_different_child_id() {
- let device_name = String::from("test");
- let device_type = String::from("test");
- let operations = Operations::default();
- let http_proxy = FakeC8YHttpProxy {};
-
- let mut converter = Box::new(CumulocityConverter::new(
- SizeThreshold(16 * 1024),
- device_name,
- device_type,
- operations,
- http_proxy,
- ));
+ let mut converter = create_c8y_converter();
let in_payload = r#"{"temp": 1, "time": "2021-11-16T17:45:40.571760714+01:00"}"#;
// First message from "child1"
@@ -694,30 +659,91 @@ fn extract_child_id(in_topic: &str, expected_child_id: Option<String>) {
}
}
-#[test]
-fn check_c8y_threshold_packet_size() -> Result<(), anyhow::Error> {
+#[tokio::test]
+async fn check_c8y_threshold_packet_size() -> Result<(), anyhow::Error> {
+ let mut converter = create_c8y_converter();
+
+ let alarm_topic = "tedge/alarms/critical/temperature_alarm";
+ let big_message = create_packet(1024 * 20);
+ let alarm_payload = json!({ "message": big_message }).to_string();
+ let alarm_message = Message::new(&Topic::new_unchecked(alarm_topic), alarm_payload);
+
+ assert_matches!(
+ converter.try_convert(&alarm_message).await,
+ Err(ConversionError::SizeThresholdExceeded {
+ topic: _,
+ actual_size: 20494,
+ threshold: 16384
+ })
+ );
+ Ok(())
+}
+
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+async fn convert_event() -> Result<()> {
let size_threshold = SizeThreshold(16 * 1024);
let device_name = String::from("test");
- let device_type = String::from("test");
+ let device_type = String::from("test_type");
let operations = Operations::default();
- let http_proxy = FakeC8YHttpProxy {};
+ let http_proxy = MockC8YHttpProxy::new();
- let converter = CumulocityConverter::new(
+ let mut converter = CumulocityConverter::new(
size_threshold,
device_name,
device_type,
operations,
http_proxy,
);
- let buffer = create_packet(1024 * 20)