From 34ffb846e67b7ccee34348e1d899c12b9bda3756 Mon Sep 17 00:00:00 2001 From: PradeepKiruvale Date: Mon, 11 Apr 2022 15:13:32 +0530 Subject: Closes #1040 use config-dir feature for initalize and run (#1059) --- crates/core/tedge_mapper/src/az/mapper.rs | 13 ++++++++++--- crates/core/tedge_mapper/src/c8y/mapper.rs | 22 +++++++++++++--------- crates/core/tedge_mapper/src/collectd/mapper.rs | 10 ++++++++-- crates/core/tedge_mapper/src/core/component.rs | 6 ++++-- crates/core/tedge_mapper/src/main.rs | 4 ++-- 5 files changed, 37 insertions(+), 18 deletions(-) (limited to 'crates/core/tedge_mapper/src') diff --git a/crates/core/tedge_mapper/src/az/mapper.rs b/crates/core/tedge_mapper/src/az/mapper.rs index 0d998eff..2971d2f9 100644 --- a/crates/core/tedge_mapper/src/az/mapper.rs +++ b/crates/core/tedge_mapper/src/az/mapper.rs @@ -1,3 +1,5 @@ +use std::path::Path; + use crate::{ az::converter::AzureConverter, core::{component::TEdgeComponent, mapper::create_mapper, size_threshold::SizeThreshold}, @@ -26,10 +28,11 @@ impl TEdgeComponent for AzureMapper { AZURE_MAPPER_NAME } - async fn init(&self) -> Result<(), anyhow::Error> { + async fn init(&self, cfg_dir: &Path) -> Result<(), anyhow::Error> { info!("Initialize tedge mapper az"); + let config_dir = cfg_dir.display().to_string(); create_directory_with_user_group( - "/etc/tedge/operations/az", + &format!("{config_dir}/operations/az"), "tedge-mapper", "tedge-mapper", 0o775, @@ -39,7 +42,11 @@ impl TEdgeComponent for AzureMapper { Ok(()) } - async fn start(&self, tedge_config: TEdgeConfig) -> Result<(), anyhow::Error> { + async fn start( + &self, + tedge_config: TEdgeConfig, + _config_dir: &Path, + ) -> Result<(), anyhow::Error> { let add_timestamp = tedge_config.query(AzureMapperTimestamp)?.is_set(); let mqtt_port = tedge_config.query(MqttPortSetting)?.into(); let mqtt_host = tedge_config.query(MqttBindAddressSetting)?.to_string(); diff --git a/crates/core/tedge_mapper/src/c8y/mapper.rs b/crates/core/tedge_mapper/src/c8y/mapper.rs index cb9a4f93..e5ae2d04 100644 --- a/crates/core/tedge_mapper/src/c8y/mapper.rs +++ b/crates/core/tedge_mapper/src/c8y/mapper.rs @@ -1,3 +1,5 @@ +use std::path::Path; + use crate::{ c8y::converter::CumulocityConverter, core::{component::TEdgeComponent, mapper::create_mapper, size_threshold::SizeThreshold}, @@ -47,19 +49,21 @@ impl TEdgeComponent for CumulocityMapper { CUMULOCITY_MAPPER_NAME } - async fn init(&self) -> Result<(), anyhow::Error> { + async fn init(&self, cfg_dir: &Path) -> Result<(), anyhow::Error> { info!("Initialize tedge mapper c8y"); - create_directories()?; - let operations = Operations::try_new("/etc/tedge/operations", "c8y")?; + let config_dir = cfg_dir.display().to_string(); + create_directories(&config_dir)?; + let operations = Operations::try_new(&format!("{config_dir}/operations"), "c8y")?; self.init_session(CumulocityMapper::subscriptions(&operations)?) .await?; Ok(()) } - async fn start(&self, tedge_config: TEdgeConfig) -> Result<(), anyhow::Error> { + async fn start(&self, tedge_config: TEdgeConfig, cfg_dir: &Path) -> Result<(), anyhow::Error> { let size_threshold = SizeThreshold(MQTT_MESSAGE_SIZE_THRESHOLD); + let config_dir = cfg_dir.display().to_string(); - let operations = Operations::try_new("/etc/tedge/operations", "c8y")?; + let operations = Operations::try_new(format!("{config_dir}/operations"), "c8y")?; let mut http_proxy = JwtAuthHttpProxy::try_new(&tedge_config).await?; http_proxy.init().await?; let device_name = tedge_config.query(DeviceIdSetting)?; @@ -87,21 +91,21 @@ impl TEdgeComponent for CumulocityMapper { } } -fn create_directories() -> Result<(), anyhow::Error> { +fn create_directories(config_dir: &str) -> Result<(), anyhow::Error> { create_directory_with_user_group( - "/etc/tedge/operations/c8y", + &format!("{config_dir}/operations/c8y"), "tedge-mapper", "tedge-mapper", 0o775, )?; create_file_with_user_group( - "/etc/tedge/operations/c8y/c8y_SoftwareUpdate", + &format!("{config_dir}/operations/c8y/c8y_SoftwareUpdate"), "tedge-mapper", "tedge-mapper", 0o644, )?; create_file_with_user_group( - "/etc/tedge/operations/c8y/c8y_Restart", + &format!("{config_dir}/operations/c8y/c8y_Restart"), "tedge-mapper", "tedge-mapper", 0o644, diff --git a/crates/core/tedge_mapper/src/collectd/mapper.rs b/crates/core/tedge_mapper/src/collectd/mapper.rs index a4c6f4c5..ce4b1201 100644 --- a/crates/core/tedge_mapper/src/collectd/mapper.rs +++ b/crates/core/tedge_mapper/src/collectd/mapper.rs @@ -1,3 +1,5 @@ +use std::path::Path; + use crate::{ collectd::monitor::{DeviceMonitor, DeviceMonitorConfig}, core::component::TEdgeComponent, @@ -23,7 +25,7 @@ impl TEdgeComponent for CollectdMapper { COLLECTD_MAPPER_NAME } - async fn init(&self) -> Result<(), anyhow::Error> { + async fn init(&self, _cfg_dir: &Path) -> Result<(), anyhow::Error> { info!("Initialize tedge mapper collectd"); self.init_session(TopicFilter::new( DeviceMonitorConfig::default().mqtt_source_topic, @@ -32,7 +34,11 @@ impl TEdgeComponent for CollectdMapper { Ok(()) } - async fn start(&self, tedge_config: TEdgeConfig) -> Result<(), anyhow::Error> { + async fn start( + &self, + tedge_config: TEdgeConfig, + _config_dir: &Path, + ) -> Result<(), anyhow::Error> { let mqtt_port = tedge_config.query(MqttPortSetting)?.into(); let mqtt_host = tedge_config.query(MqttBindAddressSetting)?.to_string(); diff --git a/crates/core/tedge_mapper/src/core/component.rs b/crates/core/tedge_mapper/src/core/component.rs index 786c78c8..c39d797c 100644 --- a/crates/core/tedge_mapper/src/core/component.rs +++ b/crates/core/tedge_mapper/src/core/component.rs @@ -1,3 +1,5 @@ +use std::path::Path; + use async_trait::async_trait; use mqtt_channel::TopicFilter; use tedge_config::{ @@ -8,8 +10,8 @@ use tracing::info; #[async_trait] pub trait TEdgeComponent: Sync + Send { fn session_name(&self) -> &str; - async fn start(&self, tedge_config: TEdgeConfig) -> Result<(), anyhow::Error>; - async fn init(&self) -> Result<(), anyhow::Error>; + async fn start(&self, tedge_config: TEdgeConfig, cfg_dir: &Path) -> Result<(), anyhow::Error>; + async fn init(&self, cfg_dir: &Path) -> Result<(), anyhow::Error>; async fn init_session(&self, mqtt_topics: TopicFilter) -> Result<(), anyhow::Error> { mqtt_channel::init_session(&self.get_mqtt_config()?.with_subscriptions(mqtt_topics)) .await?; diff --git a/crates/core/tedge_mapper/src/main.rs b/crates/core/tedge_mapper/src/main.rs index aabcff27..27339ba8 100644 --- a/crates/core/tedge_mapper/src/main.rs +++ b/crates/core/tedge_mapper/src/main.rs @@ -90,10 +90,10 @@ async fn main() -> anyhow::Result<()> { )?; if mapper_opt.init { - component.init().await + component.init(&mapper_opt.config_dir).await } else if mapper_opt.clear { component.clear_session().await } else { - component.start(config).await + component.start(config, &mapper_opt.config_dir).await } } -- cgit v1.2.3 From 3e86cb3670da49def33f24a1cf9d4a72e24b0d9d Mon Sep 17 00:00:00 2001 From: Rina Fujino <18257209+rina23q@users.noreply.github.com> Date: Mon, 11 Apr 2022 16:41:02 +0200 Subject: Remove default from some tedge config settigs Signed-off-by: Rina Fujino <18257209+rina23q@users.noreply.github.com> --- crates/core/tedge_mapper/src/main.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'crates/core/tedge_mapper/src') diff --git a/crates/core/tedge_mapper/src/main.rs b/crates/core/tedge_mapper/src/main.rs index 27339ba8..4b4a9f47 100644 --- a/crates/core/tedge_mapper/src/main.rs +++ b/crates/core/tedge_mapper/src/main.rs @@ -86,7 +86,7 @@ async fn main() -> anyhow::Result<()> { // Run only one instance of a mapper let _flock = check_another_instance_is_not_running( &mapper_opt.name.to_string(), - &config.query(RunPathDefaultSetting)?.into(), + &config.query(RunPathSetting)?.into(), )?; if mapper_opt.init { -- cgit v1.2.3 From 6fa381f99a8f5b16bcda963afb4034a31e086db6 Mon Sep 17 00:00:00 2001 From: initard Date: Fri, 8 Apr 2022 08:46:49 +0100 Subject: moving python tedge mapper tests to rust #958 - removed tedge_mapper_c8y_negative - removed tedge_mapper_c8y_positive - added the same tests in rust using mqtt_tests Signed-off-by: initard --- crates/core/tedge_mapper/src/c8y/mapper.rs | 106 +++++++++++++++++++++++++++++ 1 file changed, 106 insertions(+) (limited to 'crates/core/tedge_mapper/src') diff --git a/crates/core/tedge_mapper/src/c8y/mapper.rs b/crates/core/tedge_mapper/src/c8y/mapper.rs index cb9a4f93..98301dda 100644 --- a/crates/core/tedge_mapper/src/c8y/mapper.rs +++ b/crates/core/tedge_mapper/src/c8y/mapper.rs @@ -108,3 +108,109 @@ fn create_directories() -> Result<(), anyhow::Error> { )?; Ok(()) } + +#[cfg(test)] +mod tests { + use super::*; + + use c8y_api::http_proxy::MockC8yJwtTokenRetriever; + use c8y_smartrest::smartrest_deserializer::SmartRestJwtResponse; + use mockito::mock; + use mqtt_tests::{assert_received_all_expected, test_mqtt_broker}; + use serde_json::json; + use std::time::Duration; + use test_case::test_case; + + const TEST_TIMEOUT_SECS: Duration = Duration::from_secs(5); + const CUMULOCITY_MAPPER_NAME_TEST: &str = "tedge-mapper-c8y-test"; + const DEVICE_ID: &str = "test-device"; + const DEVICE_NAME: &str = "test-user"; + const DEVICE_TYPE: &str = "test-thin-edge.io"; + const MQTT_HOST: &str = "127.0.0.1"; + + /// `test_tedge_mapper_with_mqtt_pub` will start tedge mapper and run the following tests: + /// + /// - receive pub message with wrong payload and expect an error message in tedge/errors topic + /// - receive pub message with a correct payload and expect to observe this on the right topic + #[test_case( + "tedge/measurements", + "tedge/errors", + "{", + "Invalid JSON: EOF while parsing an object at line 1 column 1: `{`" + )] // fail case + #[test_case( + "tedge/measurements", + "tedge/measurements", + r#"{"temperature": 12, "time": "2021-06-15T17:01:15.806181503+02:00"}"#, + r#"{"temperature": 12, "time": "2021-06-15T17:01:15.806181503+02:00"}"# + )] // successs case + #[tokio::test] + async fn test_tedge_mapper_with_mqtt_pub( + pub_topic: &str, + sub_topic: &str, + payload: &str, + expected_msg: &str, + ) -> Result<(), anyhow::Error> { + // Mock endpoint to return C8Y internal id + let _get_internal_id_mock = mock("GET", "/identity/externalIds/c8y_Serial/test-device") + .with_status(200) + .with_body( + json!({ "externalId": DEVICE_ID, "managedObject": { "id": "123" } }).to_string(), + ) + .create(); + + let mut jwt_token_retriver = Box::new(MockC8yJwtTokenRetriever::new()); + jwt_token_retriver + .expect_get_jwt_token() + .returning(|| Ok(SmartRestJwtResponse::default())); + + let http_client = reqwest::ClientBuilder::new().build().unwrap(); + let proxy = JwtAuthHttpProxy::new( + jwt_token_retriver, + http_client, + mockito::server_url().as_str(), + DEVICE_ID, + ); + + // mapper config + let size_threshold = SizeThreshold(MQTT_MESSAGE_SIZE_THRESHOLD); + let operations = Operations::default(); + + let converter = Box::new(CumulocityConverter::new( + size_threshold, + DEVICE_NAME.into(), + DEVICE_TYPE.into(), + operations, + proxy, + )); + + let broker = test_mqtt_broker(); + + let mut mapper = create_mapper( + CUMULOCITY_MAPPER_NAME_TEST, + MQTT_HOST.into(), + broker.port, + converter, + ) + .await?; + + // subscribe to `sub_topic` + let mut messages = broker.messages_published_on(sub_topic).await; + + // run tedge_mapper in background + tokio::spawn(async move { + mapper + .run() + .instrument(info_span!(CUMULOCITY_MAPPER_NAME_TEST)) + .await + .unwrap(); + }); + + // publish `payload` to `pub_topic` + let () = broker.publish(pub_topic, payload).await?; + + // check the `messages` returned contain `expected_msg` + assert_received_all_expected(&mut messages, TEST_TIMEOUT_SECS, &[expected_msg]).await; + Ok(()) + } +} -- cgit v1.2.3 From 001347fd92e0b697a01a591ae68b3316be0d9ec1 Mon Sep 17 00:00:00 2001 From: PradeepKiruvale Date: Thu, 21 Apr 2022 15:03:09 +0530 Subject: Have a single unix user used for all the thin-edge daemons (#1085) * Closes #1031 single tedge user Signed-off-by: Pradeep Kumar K J * Add script to upgrade tedge from 0.6 Signed-off-by: Pradeep Kumar K J --- crates/core/tedge_mapper/src/az/mapper.rs | 4 ++-- crates/core/tedge_mapper/src/c8y/mapper.rs | 12 ++++++------ 2 files changed, 8 insertions(+), 8 deletions(-) (limited to 'crates/core/tedge_mapper/src') diff --git a/crates/core/tedge_mapper/src/az/mapper.rs b/crates/core/tedge_mapper/src/az/mapper.rs index 2971d2f9..7f608f08 100644 --- a/crates/core/tedge_mapper/src/az/mapper.rs +++ b/crates/core/tedge_mapper/src/az/mapper.rs @@ -33,8 +33,8 @@ impl TEdgeComponent for AzureMapper { let config_dir = cfg_dir.display().to_string(); create_directory_with_user_group( &format!("{config_dir}/operations/az"), - "tedge-mapper", - "tedge-mapper", + "tedge", + "tedge", 0o775, )?; diff --git a/crates/core/tedge_mapper/src/c8y/mapper.rs b/crates/core/tedge_mapper/src/c8y/mapper.rs index 3bb4d2dd..3842b3a5 100644 --- a/crates/core/tedge_mapper/src/c8y/mapper.rs +++ b/crates/core/tedge_mapper/src/c8y/mapper.rs @@ -94,20 +94,20 @@ impl TEdgeComponent for CumulocityMapper { fn create_directories(config_dir: &str) -> Result<(), anyhow::Error> { create_directory_with_user_group( &format!("{config_dir}/operations/c8y"), - "tedge-mapper", - "tedge-mapper", + "tedge", + "tedge", 0o775, )?; create_file_with_user_group( &format!("{config_dir}/operations/c8y/c8y_SoftwareUpdate"), - "tedge-mapper", - "tedge-mapper", + "tedge", + "tedge", 0o644, )?; create_file_with_user_group( &format!("{config_dir}/operations/c8y/c8y_Restart"), - "tedge-mapper", - "tedge-mapper", + "tedge", + "tedge", 0o644, )?; Ok(()) -- cgit v1.2.3