summaryrefslogtreecommitdiffstats
path: root/crates/core/tedge_mapper/src
diff options
context:
space:
mode:
authorRina Fujino <18257209+rina23q@users.noreply.github.com>2022-04-26 15:00:06 +0200
committerRina Fujino <18257209+rina23q@users.noreply.github.com>2022-04-26 15:00:06 +0200
commit224cb2f75bc0d897d2a266895eae48fc7107e83e (patch)
treedaca3d154535ac4f3840371ff462347689201d75 /crates/core/tedge_mapper/src
parentf5b2f5b224831cf47b454c202026c85aa1d0c7de (diff)
parent42dcd4ec556dba3ec2987895bd79c566c3c6546a (diff)
Merge branch 'main' into feature/1030/configuration-management
Diffstat (limited to 'crates/core/tedge_mapper/src')
-rw-r--r--crates/core/tedge_mapper/src/az/mapper.rs17
-rw-r--r--crates/core/tedge_mapper/src/c8y/mapper.rs140
-rw-r--r--crates/core/tedge_mapper/src/collectd/mapper.rs10
-rw-r--r--crates/core/tedge_mapper/src/core/component.rs6
-rw-r--r--crates/core/tedge_mapper/src/main.rs6
5 files changed, 152 insertions, 27 deletions
diff --git a/crates/core/tedge_mapper/src/az/mapper.rs b/crates/core/tedge_mapper/src/az/mapper.rs
index 0d998eff..7f608f08 100644
--- a/crates/core/tedge_mapper/src/az/mapper.rs
+++ b/crates/core/tedge_mapper/src/az/mapper.rs
@@ -1,3 +1,5 @@
+use std::path::Path;
+
use crate::{
az::converter::AzureConverter,
core::{component::TEdgeComponent, mapper::create_mapper, size_threshold::SizeThreshold},
@@ -26,12 +28,13 @@ impl TEdgeComponent for AzureMapper {
AZURE_MAPPER_NAME
}
- async fn init(&self) -> Result<(), anyhow::Error> {
+ async fn init(&self, cfg_dir: &Path) -> Result<(), anyhow::Error> {
info!("Initialize tedge mapper az");
+ let config_dir = cfg_dir.display().to_string();
create_directory_with_user_group(
- "/etc/tedge/operations/az",
- "tedge-mapper",
- "tedge-mapper",
+ &format!("{config_dir}/operations/az"),
+ "tedge",
+ "tedge",
0o775,
)?;
@@ -39,7 +42,11 @@ impl TEdgeComponent for AzureMapper {
Ok(())
}
- async fn start(&self, tedge_config: TEdgeConfig) -> Result<(), anyhow::Error> {
+ async fn start(
+ &self,
+ tedge_config: TEdgeConfig,
+ _config_dir: &Path,
+ ) -> Result<(), anyhow::Error> {
let add_timestamp = tedge_config.query(AzureMapperTimestamp)?.is_set();
let mqtt_port = tedge_config.query(MqttPortSetting)?.into();
let mqtt_host = tedge_config.query(MqttBindAddressSetting)?.to_string();
diff --git a/crates/core/tedge_mapper/src/c8y/mapper.rs b/crates/core/tedge_mapper/src/c8y/mapper.rs
index cb9a4f93..3842b3a5 100644
--- a/crates/core/tedge_mapper/src/c8y/mapper.rs
+++ b/crates/core/tedge_mapper/src/c8y/mapper.rs
@@ -1,3 +1,5 @@
+use std::path::Path;
+
use crate::{
c8y::converter::CumulocityConverter,
core::{component::TEdgeComponent, mapper::create_mapper, size_threshold::SizeThreshold},
@@ -47,19 +49,21 @@ impl TEdgeComponent for CumulocityMapper {
CUMULOCITY_MAPPER_NAME
}
- async fn init(&self) -> Result<(), anyhow::Error> {
+ async fn init(&self, cfg_dir: &Path) -> Result<(), anyhow::Error> {
info!("Initialize tedge mapper c8y");
- create_directories()?;
- let operations = Operations::try_new("/etc/tedge/operations", "c8y")?;
+ let config_dir = cfg_dir.display().to_string();
+ create_directories(&config_dir)?;
+ let operations = Operations::try_new(&format!("{config_dir}/operations"), "c8y")?;
self.init_session(CumulocityMapper::subscriptions(&operations)?)
.await?;
Ok(())
}
- async fn start(&self, tedge_config: TEdgeConfig) -> Result<(), anyhow::Error> {
+ async fn start(&self, tedge_config: TEdgeConfig, cfg_dir: &Path) -> Result<(), anyhow::Error> {
let size_threshold = SizeThreshold(MQTT_MESSAGE_SIZE_THRESHOLD);
+ let config_dir = cfg_dir.display().to_string();
- let operations = Operations::try_new("/etc/tedge/operations", "c8y")?;
+ let operations = Operations::try_new(format!("{config_dir}/operations"), "c8y")?;
let mut http_proxy = JwtAuthHttpProxy::try_new(&tedge_config).await?;
http_proxy.init().await?;
let device_name = tedge_config.query(DeviceIdSetting)?;
@@ -87,24 +91,130 @@ impl TEdgeComponent for CumulocityMapper {
}
}
-fn create_directories() -> Result<(), anyhow::Error> {
+fn create_directories(config_dir: &str) -> Result<(), anyhow::Error> {
create_directory_with_user_group(
- "/etc/tedge/operations/c8y",
- "tedge-mapper",
- "tedge-mapper",
+ &format!("{config_dir}/operations/c8y"),
+ "tedge",
+ "tedge",
0o775,
)?;
create_file_with_user_group(
- "/etc/tedge/operations/c8y/c8y_SoftwareUpdate",
- "tedge-mapper",
- "tedge-mapper",
+ &format!("{config_dir}/operations/c8y/c8y_SoftwareUpdate"),
+ "tedge",
+ "tedge",
0o644,
)?;
create_file_with_user_group(
- "/etc/tedge/operations/c8y/c8y_Restart",
- "tedge-mapper",
- "tedge-mapper",
+ &format!("{config_dir}/operations/c8y/c8y_Restart"),
+ "tedge",
+ "tedge",
0o644,
)?;
Ok(())
}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ use c8y_api::http_proxy::MockC8yJwtTokenRetriever;
+ use c8y_smartrest::smartrest_deserializer::SmartRestJwtResponse;
+ use mockito::mock;
+ use mqtt_tests::{assert_received_all_expected, test_mqtt_broker};
+ use serde_json::json;
+ use std::time::Duration;
+ use test_case::test_case;
+
+ const TEST_TIMEOUT_SECS: Duration = Duration::from_secs(5);
+ const CUMULOCITY_MAPPER_NAME_TEST: &str = "tedge-mapper-c8y-test";
+ const DEVICE_ID: &str = "test-device";
+ const DEVICE_NAME: &str = "test-user";
+ const DEVICE_TYPE: &str = "test-thin-edge.io";
+ const MQTT_HOST: &str = "127.0.0.1";
+
+ /// `test_tedge_mapper_with_mqtt_pub` will start tedge mapper and run the following tests:
+ ///
+ /// - receive pub message with wrong payload and expect an error message in tedge/errors topic
+ /// - receive pub message with a correct payload and expect to observe this on the right topic
+ #[test_case(
+ "tedge/measurements",
+ "tedge/errors",
+ "{",
+ "Invalid JSON: EOF while parsing an object at line 1 column 1: `{`"
+ )] // fail case
+ #[test_case(
+ "tedge/measurements",
+ "tedge/measurements",
+ r#"{"temperature": 12, "time": "2021-06-15T17:01:15.806181503+02:00"}"#,
+ r#"{"temperature": 12, "time": "2021-06-15T17:01:15.806181503+02:00"}"#
+ )] // successs case
+ #[tokio::test]
+ async fn test_tedge_mapper_with_mqtt_pub(
+ pub_topic: &str,
+ sub_topic: &str,
+ payload: &str,
+ expected_msg: &str,
+ ) -> Result<(), anyhow::Error> {
+ // Mock endpoint to return C8Y internal id
+ let _get_internal_id_mock = mock("GET", "/identity/externalIds/c8y_Serial/test-device")
+ .with_status(200)
+ .with_body(
+ json!({ "externalId": DEVICE_ID, "managedObject": { "id": "123" } }).to_string(),
+ )
+ .create();
+
+ let mut jwt_token_retriver = Box::new(MockC8yJwtTokenRetriever::new());
+ jwt_token_retriver
+ .expect_get_jwt_token()
+ .returning(|| Ok(SmartRestJwtResponse::default()));
+
+ let http_client = reqwest::ClientBuilder::new().build().unwrap();
+ let proxy = JwtAuthHttpProxy::new(
+ jwt_token_retriver,
+ http_client,
+ mockito::server_url().as_str(),
+ DEVICE_ID,
+ );
+
+ // mapper config
+ let size_threshold = SizeThreshold(MQTT_MESSAGE_SIZE_THRESHOLD);
+ let operations = Operations::default();
+
+ let converter = Box::new(CumulocityConverter::new(
+ size_threshold,
+ DEVICE_NAME.into(),
+ DEVICE_TYPE.into(),
+ operations,
+ proxy,
+ ));
+
+ let broker = test_mqtt_broker();
+
+ let mut mapper = create_mapper(
+ CUMULOCITY_MAPPER_NAME_TEST,
+ MQTT_HOST.into(),
+ broker.port,
+ converter,
+ )
+ .await?;
+
+ // subscribe to `sub_topic`
+ let mut messages = broker.messages_published_on(sub_topic).await;
+
+ // run tedge_mapper in background
+ tokio::spawn(async move {
+ mapper
+ .run()
+ .instrument(info_span!(CUMULOCITY_MAPPER_NAME_TEST))
+ .await
+ .unwrap();
+ });
+
+ // publish `payload` to `pub_topic`
+ let () = broker.publish(pub_topic, payload).await?;
+
+ // check the `messages` returned contain `expected_msg`
+ assert_received_all_expected(&mut messages, TEST_TIMEOUT_SECS, &[expected_msg]).await;
+ Ok(())
+ }
+}
diff --git a/crates/core/tedge_mapper/src/collectd/mapper.rs b/crates/core/tedge_mapper/src/collectd/mapper.rs
index a4c6f4c5..ce4b1201 100644
--- a/crates/core/tedge_mapper/src/collectd/mapper.rs
+++ b/crates/core/tedge_mapper/src/collectd/mapper.rs
@@ -1,3 +1,5 @@
+use std::path::Path;
+
use crate::{
collectd::monitor::{DeviceMonitor, DeviceMonitorConfig},
core::component::TEdgeComponent,
@@ -23,7 +25,7 @@ impl TEdgeComponent for CollectdMapper {
COLLECTD_MAPPER_NAME
}
- async fn init(&self) -> Result<(), anyhow::Error> {
+ async fn init(&self, _cfg_dir: &Path) -> Result<(), anyhow::Error> {
info!("Initialize tedge mapper collectd");
self.init_session(TopicFilter::new(
DeviceMonitorConfig::default().mqtt_source_topic,
@@ -32,7 +34,11 @@ impl TEdgeComponent for CollectdMapper {
Ok(())
}
- async fn start(&self, tedge_config: TEdgeConfig) -> Result<(), anyhow::Error> {
+ async fn start(
+ &self,
+ tedge_config: TEdgeConfig,
+ _config_dir: &Path,
+ ) -> Result<(), anyhow::Error> {
let mqtt_port = tedge_config.query(MqttPortSetting)?.into();
let mqtt_host = tedge_config.query(MqttBindAddressSetting)?.to_string();
diff --git a/crates/core/tedge_mapper/src/core/component.rs b/crates/core/tedge_mapper/src/core/component.rs
index 786c78c8..c39d797c 100644
--- a/crates/core/tedge_mapper/src/core/component.rs
+++ b/crates/core/tedge_mapper/src/core/component.rs
@@ -1,3 +1,5 @@
+use std::path::Path;
+
use async_trait::async_trait;
use mqtt_channel::TopicFilter;
use tedge_config::{
@@ -8,8 +10,8 @@ use tracing::info;
#[async_trait]
pub trait TEdgeComponent: Sync + Send {
fn session_name(&self) -> &str;
- async fn start(&self, tedge_config: TEdgeConfig) -> Result<(), anyhow::Error>;
- async fn init(&self) -> Result<(), anyhow::Error>;
+ async fn start(&self, tedge_config: TEdgeConfig, cfg_dir: &Path) -> Result<(), anyhow::Error>;
+ async fn init(&self, cfg_dir: &Path) -> Result<(), anyhow::Error>;
async fn init_session(&self, mqtt_topics: TopicFilter) -> Result<(), anyhow::Error> {
mqtt_channel::init_session(&self.get_mqtt_config()?.with_subscriptions(mqtt_topics))
.await?;
diff --git a/crates/core/tedge_mapper/src/main.rs b/crates/core/tedge_mapper/src/main.rs
index aabcff27..4b4a9f47 100644
--- a/crates/core/tedge_mapper/src/main.rs
+++ b/crates/core/tedge_mapper/src/main.rs
@@ -86,14 +86,14 @@ async fn main() -> anyhow::Result<()> {
// Run only one instance of a mapper
let _flock = check_another_instance_is_not_running(
&mapper_opt.name.to_string(),
- &config.query(RunPathDefaultSetting)?.into(),
+ &config.query(RunPathSetting)?.into(),
)?;
if mapper_opt.init {
- component.init().await
+ component.init(&mapper_opt.config_dir).await
} else if mapper_opt.clear {
component.clear_session().await
} else {
- component.start(config).await
+ component.start(config, &mapper_opt.config_dir).await
}
}