summaryrefslogtreecommitdiffstats
path: root/crates/core/c8y_api/src/http_proxy.rs
diff options
context:
space:
mode:
Diffstat (limited to 'crates/core/c8y_api/src/http_proxy.rs')
-rw-r--r--crates/core/c8y_api/src/http_proxy.rs246
1 files changed, 181 insertions, 65 deletions
diff --git a/crates/core/c8y_api/src/http_proxy.rs b/crates/core/c8y_api/src/http_proxy.rs
index 9f853043..4c6dff98 100644
--- a/crates/core/c8y_api/src/http_proxy.rs
+++ b/crates/core/c8y_api/src/http_proxy.rs
@@ -1,9 +1,11 @@
use crate::json_c8y::{
- C8yCreateEvent, C8yManagedObject, C8yUpdateSoftwareListResponse, InternalIdResponse,
+ C8yCreateEvent, C8yEventResponse, C8yManagedObject, C8yUpdateSoftwareListResponse,
+ InternalIdResponse,
};
use async_trait::async_trait;
use c8y_smartrest::{error::SMCumulocityMapperError, smartrest_deserializer::SmartRestJwtResponse};
+use mockall::automock;
use mqtt_channel::{Connection, PubChannel, StreamExt, Topic, TopicFilter};
use reqwest::Url;
use std::time::Duration;
@@ -13,12 +15,12 @@ use tedge_config::{
};
use time::{format_description, OffsetDateTime};
-use serde::{Deserialize, Serialize};
use tracing::{error, info, instrument};
const RETRY_TIMEOUT_SECS: u64 = 60;
/// An HttpProxy handles http requests to C8y on behalf of the device.
+#[automock]
#[async_trait]
pub trait C8YHttpProxy: Send + Sync {
async fn init(&mut self) -> Result<(), SMCumulocityMapperError>;
@@ -27,6 +29,13 @@ pub trait C8YHttpProxy: Send + Sync {
async fn get_jwt_token(&mut self) -> Result<SmartRestJwtResponse, SMCumulocityMapperError>;
+ async fn send_event(
+ &mut self,
+ event_type: &str,
+ text: &str,
+ time: Option<String>,
+ ) -> Result<String, SMCumulocityMapperError>;
+
async fn send_software_list_http(
&mut self,
c8y_software_list: &C8yUpdateSoftwareListResponse,
@@ -39,6 +48,7 @@ pub trait C8YHttpProxy: Send + Sync {
}
/// Define a C8y endpoint
+#[derive(Debug)]
pub struct C8yEndPoint {
c8y_host: String,
device_id: String,
@@ -55,10 +65,18 @@ impl C8yEndPoint {
}
}
+ fn get_base_url(&self) -> String {
+ let mut url_get_id = String::new();
+ if !self.c8y_host.starts_with("http") {
+ url_get_id.push_str("https://");
+ }
+ url_get_id.push_str(&self.c8y_host);
+
+ url_get_id
+ }
+
fn get_url_for_sw_list(&self) -> String {
- let mut url_update_swlist = String::new();
- url_update_swlist.push_str("https://");
- url_update_swlist.push_str(&self.c8y_host);
+ let mut url_update_swlist = self.get_base_url();
url_update_swlist.push_str("/inventory/managedObjects/");
url_update_swlist.push_str(&self.c8y_internal_id);
@@ -66,9 +84,7 @@ impl C8yEndPoint {
}
fn get_url_for_get_id(&self) -> String {
- let mut url_get_id = String::new();
- url_get_id.push_str("https://");
- url_get_id.push_str(&self.c8y_host);
+ let mut url_get_id = self.get_base_url();
url_get_id.push_str("/identity/externalIds/c8y_Serial/");
url_get_id.push_str(&self.device_id);
@@ -76,9 +92,7 @@ impl C8yEndPoint {
}
fn get_url_for_create_event(&self) -> String {
- let mut url_create_event = String::new();
- url_create_event.push_str("https://");
- url_create_event.push_str(&self.c8y_host);
+ let mut url_create_event = self.get_base_url();
url_create_event.push_str("/event/events/");
url_create_event
@@ -118,11 +132,46 @@ impl C8yEndPoint {
}
}
-#[derive(Debug, Deserialize, Serialize, PartialEq)]
-#[serde(rename_all = "camelCase")]
-/// used to retrieve the id of a log event
-pub struct SmartRestLogEvent {
- pub id: String,
+#[automock]
+#[async_trait]
+pub trait C8yJwtTokenRetriever: Send + Sync {
+ async fn get_jwt_token(&mut self) -> Result<SmartRestJwtResponse, SMCumulocityMapperError>;
+}
+
+pub struct C8yMqttJwtTokenRetriever {
+ mqtt_con: mqtt_channel::Connection,
+}
+
+impl C8yMqttJwtTokenRetriever {
+ pub fn new(mqtt_con: mqtt_channel::Connection) -> Self {
+ C8yMqttJwtTokenRetriever { mqtt_con }
+ }
+}
+
+#[async_trait]
+impl C8yJwtTokenRetriever for C8yMqttJwtTokenRetriever {
+ async fn get_jwt_token(&mut self) -> Result<SmartRestJwtResponse, SMCumulocityMapperError> {
+ let () = self
+ .mqtt_con
+ .published
+ .publish(mqtt_channel::Message::new(
+ &Topic::new_unchecked("c8y/s/uat"),
+ "".to_string(),
+ ))
+ .await?;
+ let token_smartrest = match tokio::time::timeout(
+ Duration::from_secs(10),
+ self.mqtt_con.received.next(),
+ )
+ .await
+ {
+ Ok(Some(msg)) => msg.payload_str()?.to_string(),
+ Ok(None) => return Err(SMCumulocityMapperError::InvalidMqttMessage),
+ Err(_elapsed) => return Err(SMCumulocityMapperError::RequestTimeout),
+ };
+
+ Ok(SmartRestJwtResponse::try_new(&token_smartrest)?)
+ }
}
/// An HttpProxy that uses MQTT to retrieve JWT tokens and authenticate the device
@@ -130,20 +179,20 @@ pub struct SmartRestLogEvent {
/// - Keep the connection info to c8y and the internal Id of the device
/// - Handle JWT requests
pub struct JwtAuthHttpProxy {
- mqtt_con: mqtt_channel::Connection,
+ jwt_token_retriver: Box<dyn C8yJwtTokenRetriever>,
http_con: reqwest::Client,
end_point: C8yEndPoint,
}
impl JwtAuthHttpProxy {
pub fn new(
- mqtt_con: mqtt_channel::Connection,
+ jwt_token_retriver: Box<dyn C8yJwtTokenRetriever>,
http_con: reqwest::Client,
c8y_host: &str,
device_id: &str,
) -> JwtAuthHttpProxy {
JwtAuthHttpProxy {
- mqtt_con,
+ jwt_token_retriver,
http_con,
end_point: C8yEndPoint {
c8y_host: c8y_host.into(),
@@ -171,31 +220,29 @@ impl JwtAuthHttpProxy {
// Ignore errors on this connection
let () = mqtt_con.errors.close();
+ let jwt_token_retriver = Box::new(C8yMqttJwtTokenRetriever::new(mqtt_con));
+
Ok(JwtAuthHttpProxy::new(
- mqtt_con, http_con, &c8y_host, &device_id,
+ jwt_token_retriver,
+ http_con,
+ &c8y_host,
+ &device_id,
))
}
async fn try_get_and_set_internal_id(&mut self) -> Result<(), SMCumulocityMapperError> {
- let token = self.get_jwt_token().await?;
- let url_get_id = self.end_point.get_url_for_get_id();
-
- self.end_point.c8y_internal_id = self
- .try_get_internal_id(&url_get_id, &token.token())
- .await?;
-
+ self.end_point.c8y_internal_id = self.try_get_internal_id().await?;
Ok(())
}
- async fn try_get_internal_id(
- &self,
- url_get_id: &str,
- token: &str,
- ) -> Result<String, SMCumulocityMapperError> {
+ async fn try_get_internal_id(&mut self) -> Result<String, SMCumulocityMapperError> {
+ let token = self.get_jwt_token().await?;
+ let url_get_id = self.end_point.get_url_for_get_id();
+
let internal_id = self
.http_con
.get(url_get_id)
- .bearer_auth(token)
+ .bearer_auth(token.token())
.send()
.await?;
let internal_id_response = internal_id.json::<InternalIdResponse>().await?;
@@ -204,26 +251,24 @@ impl JwtAuthHttpProxy {
Ok(internal_id)
}
- /// Make a POST request to /event/events and return the event id from response body.
- /// The event id is used to upload the binary.
fn create_log_event(&self) -> C8yCreateEvent {
- let local = OffsetDateTime::now_utc();
+ self.create_event("c8y_Logfile", "software-management", None)
+ }
+ fn create_event(&self, event_type: &str, text: &str, time: Option<String>) -> C8yCreateEvent {
let c8y_managed_object = C8yManagedObject {
id: self.end_point.c8y_internal_id.clone(),
};
- C8yCreateEvent::new(
- c8y_managed_object,
- "c8y_Logfile",
- &local
+ let time = time.unwrap_or_else(|| {
+ OffsetDateTime::now_utc()
.format(&format_description::well_known::Rfc3339)
- .unwrap(),
- "software-management",
- )
+ .unwrap()
+ });
+ C8yCreateEvent::new(c8y_managed_object, event_type, time.as_str(), text)
}
- async fn get_event_id(
+ async fn send_event_internal(
&mut self,
c8y_event: C8yCreateEvent,
) -> Result<String, SMCumulocityMapperError> {
@@ -240,7 +285,8 @@ impl JwtAuthHttpProxy {
.build()?;
let response = self.http_con.execute(request).await?;
- let event_response_body = response.json::<SmartRestLogEvent>().await?;
+ let _ = response.error_for_status_ref()?;
+ let event_response_body = response.json::<C8yEventResponse>().await?;
Ok(event_response_body.id)
}
@@ -271,26 +317,17 @@ impl C8YHttpProxy for JwtAuthHttpProxy {
}
async fn get_jwt_token(&mut self) -> Result<SmartRestJwtResponse, SMCumulocityMapperError> {
- let () = self
- .mqtt_con
- .published
- .publish(mqtt_channel::Message::new(
- &Topic::new_unchecked("c8y/s/uat"),
- "".to_string(),
- ))
- .await?;
- let token_smartrest = match tokio::time::timeout(
- Duration::from_secs(10),
- self.mqtt_con.received.next(),
- )
- .await
- {
- Ok(Some(msg)) => msg.payload_str()?.to_string(),
- Ok(None) => return Err(SMCumulocityMapperError::InvalidMqttMessage),
- Err(_elapsed) => return Err(SMCumulocityMapperError::RequestTimeout),
- };
+ self.jwt_token_retriver.get_jwt_token().await
+ }
- Ok(SmartRestJwtResponse::try_new(&token_smartrest)?)
+ async fn send_event(
+ &mut self,
+ event_type: &str,
+ text: &str,
+ time: Option<String>,
+ ) -> Result<String, SMCumulocityMapperError> {
+ let c8y_event: C8yCreateEvent = self.create_event(event_type, text, time);
+ self.send_event_internal(c8y_event).await
}
async fn send_software_list_http(
@@ -320,7 +357,7 @@ impl C8YHttpProxy for JwtAuthHttpProxy {
let token = self.get_jwt_token().await?;
let log_event = self.create_log_event();
- let event_response_id = self.get_event_id(log_event).await?;
+ let event_response_id = self.send_event_internal(log_event).await?;
let binary_upload_event_url = self
.end_point
.get_url_for_event_binary_upload(&event_response_id);
@@ -343,6 +380,9 @@ impl C8YHttpProxy for JwtAuthHttpProxy {
#[cfg(test)]
mod tests {
use super::*;
+ use anyhow::Result;
+ use mockito::{mock, Matcher};
+ use serde_json::json;
use test_case::test_case;
#[test]
@@ -387,4 +427,80 @@ mod tests {
let c8y = C8yEndPoint::new("test.test.com", "test_device", "internal-id");
assert!(!c8y.url_is_in_my_tenant_domain(url));
}
+
+ #[tokio::test]
+ async fn get_internal_id() -> Result<()> {
+ let device_id = "test-device";
+ let internal_device_id = "1234";
+
+ let _mock = mock("GET", "/identity/externalIds/c8y_Serial/test-device")
+ .with_status(200)
+ .with_body(
+ json!({ "externalId": device_id, "managedObject": { "id": internal_device_id } })
+ .to_string(),
+ )
+ .create();
+
+ // An JwtAuthHttpProxy ...
+ let mut jwt_token_retriver = Box::new(MockC8yJwtTokenRetriever::new());
+ jwt_token_retriver
+ .expect_get_jwt_token()
+ .returning(|| Ok(SmartRestJwtResponse::default()));
+
+ let http_client = reqwest::ClientBuilder::new().build().unwrap();
+ let mut http_proxy = JwtAuthHttpProxy::new(
+ jwt_token_retriver,
+ http_client,
+ mockito::server_url().as_str(),
+ device_id,
+ );
+
+ assert_eq!(http_proxy.try_get_internal_id().await?, internal_device_id);
+
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn send_event() -> anyhow::Result<()> {
+ let device_id = "test-device";
+ let event_id = "456";
+
+ // Mock endpoint to return C8Y internal id
+ let _get_internal_id_mock = mock("GET", "/identity/externalIds/c8y_Serial/test-device")
+ .with_status(200)
+ .with_body(
+ json!({ "externalId": device_id, "managedObject": { "id": "123" } }).to_string(),
+ )
+ .create();
+
+ let _create_event_mock = mock("POST", "/event/events/")
+ .match_body(Matcher::PartialJson(
+ json!({ "type": "clock_event", "text": "tick" }),
+ ))
+ .with_status(201)
+ .with_body(json!({ "id": event_id }).to_string())
+ .create();
+
+ // An JwtAuthHttpProxy ...
+ let mut jwt_token_retriver = Box::new(MockC8yJwtTokenRetriever::new());
+ jwt_token_retriver
+ .expect_get_jwt_token()
+ .returning(|| Ok(SmartRestJwtResponse::default()));
+
+ let http_client = reqwest::ClientBuilder::new().build().unwrap();
+ let mut http_proxy = JwtAuthHttpProxy::new(
+ jwt_token_retriver,
+ http_client,
+ mockito::server_url().as_str(),
+ device_id,
+ );
+
+ // ... creates the event and assert its id
+ assert_eq!(
+ http_proxy.send_event("clock_event", "tick", None).await?,
+ event_id
+ );
+
+ Ok(())
+ }
}