diff options
author | PradeepKiruvale <pradeepkumar.kj@softwareag.com> | 2022-03-24 13:35:38 +0530 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-03-24 13:35:38 +0530 |
commit | 59142ef60484ddc7b103c841d34b308b9444f316 (patch) | |
tree | d7b8f89179d2244a1e116081b11a28e7c3a141fa | |
parent | d9ca6aedfad3de7931964200f26e40fdd577b48a (diff) |
[#761] tedge init (#993)
* tedge init #Closes 761
* add file/directory creation errors
* add the clear session to az and collectd mappers
* refactor tedgecomponent
* remove redundant code
* subscribe to az/collectd topics in init_session
* remove duplicate init_sessions
-rw-r--r-- | Cargo.lock | 13 | ||||
-rwxr-xr-x | configuration/debian/tedge/postinst | 18 | ||||
-rw-r--r-- | configuration/debian/tedge_agent/postinst | 8 | ||||
-rw-r--r-- | configuration/debian/tedge_mapper/postinst | 13 | ||||
-rw-r--r-- | crates/common/tedge_utils/Cargo.toml | 3 | ||||
-rw-r--r-- | crates/common/tedge_utils/src/file.rs | 210 | ||||
-rw-r--r-- | crates/common/tedge_utils/src/lib.rs | 1 | ||||
-rw-r--r-- | crates/core/tedge/src/cli/mod.rs | 12 | ||||
-rw-r--r-- | crates/core/tedge/src/main.rs | 38 | ||||
-rw-r--r-- | crates/core/tedge_agent/src/agent.rs | 11 | ||||
-rw-r--r-- | crates/core/tedge_agent/src/main.rs | 2 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/az/converter.rs | 8 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/az/mapper.rs | 20 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/c8y/mapper.rs | 65 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/collectd/mapper.rs | 20 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/collectd/monitor.rs | 2 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/core/component.rs | 35 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/main.rs | 20 |
18 files changed, 401 insertions, 98 deletions
@@ -2929,11 +2929,14 @@ dependencies = [ "anyhow", "assert_matches", "futures", + "nix", "tempfile", "thiserror", "tokio", "tracing", "tracing-subscriber", + "users", + "whoami", ] [[package]] @@ -3598,6 +3601,16 @@ dependencies = [ ] [[package]] +name = "whoami" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "524b58fa5a20a2fb3014dd6358b70e6579692a56ef6fce928834e488f42f65e8" +dependencies = [ + "wasm-bindgen", + "web-sys", +] + +[[package]] name = "winapi" version = "0.3.9" source = "registry+https://github.com/rust-lang/crates.io-index" diff --git a/configuration/debian/tedge/postinst b/configuration/debian/tedge/postinst index 2f0dfee1..788fb369 100755 --- a/configuration/debian/tedge/postinst +++ b/configuration/debian/tedge/postinst @@ -17,21 +17,6 @@ if ! getent passwd tedge >/dev/null; then adduser --quiet --system --gecos "" --no-create-home --disabled-login --shell /sbin/nologin --ingroup tedge tedge fi -### Create directories -# thin-edge.io uses `/etc/tedge` directory as its main configuration directory. -# This directory contains mqtt bus configuration directory under `mosquitto-conf` -# and device certificates under `device-certs` so both have to be created in the directory structure. -install -g tedge -o tedge -m 755 -d /etc/tedge -install -g tedge -o tedge -m 755 -d /etc/tedge/mosquitto-conf -install -g mosquitto -o mosquitto -m 755 -d /etc/tedge/device-certs - -# Create a directory for the operations added by the user. -install -g tedge -o tedge -m 755 -d /etc/tedge/operations -install -g tedge -o tedge -m 755 -d /etc/tedge/plugins - -# Create directory for logs -install -g tedge -o tedge -m 755 -d /var/log/tedge - ### Add include to mosquitto.conf so tedge specific conf will be loaded if ! grep -q "/etc/tedge/mosquitto-conf" "/etc/mosquitto/mosquitto.conf"; then echo "include_dir /etc/tedge/mosquitto-conf" >>/etc/mosquitto/mosquitto.conf @@ -44,5 +29,8 @@ if [ -f "/etc/sudoers.d/010_pi-nopasswd" ]; then echo "%tedge-users ALL = (ALL) NOPASSWD: /usr/bin/tedge" >/etc/sudoers.d/tedge-users-nopasswd fi +# Initialize the tedge +tedge --init + ### Below are automatically created script parts by cargo deb (Search for cargo deb DEBHELPER) #DEBHELPER# diff --git a/configuration/debian/tedge_agent/postinst b/configuration/debian/tedge_agent/postinst index 774c3b09..7430e3dc 100644 --- a/configuration/debian/tedge_agent/postinst +++ b/configuration/debian/tedge_agent/postinst @@ -11,6 +11,7 @@ fi # Create user tedge-agent with no home(--no-create-home), no login(--shell) and in group tedge-agent(--ingroup) if ! getent passwd tedge-agent >/dev/null; then adduser --quiet --system --no-create-home --ingroup tedge-agent --shell /usr/sbin/nologin tedge-agent + adduser tedge-agent tedge fi ### Create file in /etc/sudoers.d directory @@ -21,13 +22,6 @@ if [ -f "/etc/sudoers.d/010_pi-nopasswd" ]; then echo "%tedge-agent ALL = (ALL) NOPASSWD: /etc/tedge/sm-plugins/[a-zA-Z0-9]*, /bin/sync, /sbin/init" >/etc/sudoers.d/tedge-agent-nopasswd fi -### Create directories -# Create tedge-agent state directory /etc/tedge/.agent -install -g tedge-agent -o tedge-agent -m 754 -d /etc/tedge/.agent - -# Create /var/log/tedge/agent directory -install -g tedge-agent -o tedge-agent -m 755 -d /var/log/tedge/agent - # Reenable the services only if systemctl is available if command -v systemctl >/dev/null; then ### Enable the sm services if the device is connected to c8y cloud diff --git a/configuration/debian/tedge_mapper/postinst b/configuration/debian/tedge_mapper/postinst index dbfcd140..a8625f3f 100644 --- a/configuration/debian/tedge_mapper/postinst +++ b/configuration/debian/tedge_mapper/postinst @@ -11,19 +11,10 @@ fi # Create user tedge-mapper with no home(--no-create-home), no login(--shell) and in group tedge(--ingroup) if ! getent passwd tedge-mapper >/dev/null; then adduser --quiet --system --no-create-home --ingroup tedge-mapper --shell /usr/sbin/nologin tedge-mapper + adduser tedge-mapper tedge fi -### Create supported cloud operations directories -install -g tedge -o tedge -m 755 -d /etc/tedge/operations/c8y -install -g tedge -o tedge -m 755 -d /etc/tedge/operations/az - -### Create operation file. -# This allows thin-edge.io components to list and declare supported operations for the cloud provider. -# Some of the examples for Cumulocity IoT supported opertations: https://cumulocity.com/api/10.11.0/#section/Device-management-library/Miscellaneous -install -g tedge -o tedge -m 644 /dev/null /etc/tedge/operations/c8y/c8y_SoftwareUpdate -install -g tedge -o tedge -m 644 /dev/null /etc/tedge/operations/c8y/c8y_Restart - ### Initialize the sm mapper runuser -u tedge-mapper -- tedge_mapper --init c8y - +runuser -u tedge-mapper -- tedge_mapper --init az #DEBHELPER# diff --git a/crates/common/tedge_utils/Cargo.toml b/crates/common/tedge_utils/Cargo.toml index 2f06071f..776642df 100644 --- a/crates/common/tedge_utils/Cargo.toml +++ b/crates/common/tedge_utils/Cargo.toml @@ -15,13 +15,16 @@ logging = ["tracing", "tracing-subscriber"] [dependencies] anyhow = "1.0" futures = "0.3" +nix = "0.23.1" tempfile = "3.2" thiserror = "1.0" tokio = { version = "1.12", default_features = false, features = [ "fs", "io-util", "macros", "signal"] } tracing = { version = "0.1", features = [], optional = true } tracing-subscriber = { version = "0.3", optional = true, features = [ "time" ] } +users = "0.11.0" [dev-dependencies] assert_matches = "1.5" +whoami = "1.2.1" diff --git a/crates/common/tedge_utils/src/file.rs b/crates/common/tedge_utils/src/file.rs new file mode 100644 index 00000000..cc819e05 --- /dev/null +++ b/crates/common/tedge_utils/src/file.rs @@ -0,0 +1,210 @@ +use nix::unistd::*; +use std::fs::File; +use std::os::linux::fs::MetadataExt; +use std::os::unix::fs::PermissionsExt; +use std::{fs, io}; +use users::{get_group_by_name, get_user_by_name}; + +#[derive(thiserror::Error, Debug)] +pub enum FileError { + #[error("Creating the directory failed: {dir:?}.")] + DirectoryCreateFailed { dir: String, from: std::io::Error }, + + #[error("Creating the file failed: {file:?}.")] + FileCreateFailed { file: String, from: std::io::Error }, + + #[error("Failed to change owner: {name:?}.")] + MetaDataError { name: String, from: std::io::Error }, + + #[error("User not found: {user:?}.")] + UserNotFound { user: String }, + + #[error("Group not found: {group:?}.")] + GroupNotFound { group: String }, + + #[error(transparent)] + Errno(#[from] nix::errno::Errno), +} + +pub fn create_directory_with_user_group( + dir: &str, + user: &str, + group: &str, + mode: u32, +) -> Result<(), FileError> { + match fs::create_dir(dir) { + Ok(_) => { + change_owner_and_permission(dir, user, group, mode)?; + } + + Err(e) => { + if e.kind() == io::ErrorKind::AlreadyExists { + return Ok(()); + } else { + return Err(FileError::DirectoryCreateFailed { + dir: dir.to_string(), + from: e, + }); + } + } + } + Ok(()) +} + +pub fn create_file_with_user_group( + file: &str, + user: &str, + group: &str, + mode: u32, +) -> Result<(), FileError> { + match File::create(file) { + Ok(_) => { + change_owner_and_permission(file, user, group, mode)?; + } + Err(e) => { + if e.kind() == io::ErrorKind::AlreadyExists { + return Ok(()); + } else { + return Err(FileError::FileCreateFailed { + file: file.to_string(), + from: e, + }); + } + } + } + Ok(()) +} + +fn change_owner_and_permission( + file: &str, + user: &str, + group: &str, + mode: u32, +) -> Result<(), FileError> { + let ud = match get_user_by_name(user) { + Some(user) => user.uid(), + None => { + return Err(FileError::UserNotFound { user: user.into() }); + } + }; + + let gd = match get_group_by_name(group) { + Some(group) => group.gid(), + None => { + return Err(FileError::GroupNotFound { + group: group.into(), + }); + } + }; + + let uid = fs::metadata(file) + .map_err(|e| FileError::MetaDataError { + name: file.to_string(), + from: e, + })? + .st_uid(); + let gid = fs::metadata(file) + .map_err(|e| FileError::MetaDataError { + name: file.to_string(), + from: e, + })? + .st_gid(); + + // if user and group is same as existing, then do not change + if (ud != uid) && (gd != gid) { + chown( + file, + Some(Uid::from_raw(ud.into())), + Some(Gid::from_raw(gd.into())), + )?; + } + + let mut perm = fs::metadata(file) + .map_err(|e| FileError::MetaDataError { + name: file.to_string(), + from: e, + })? + .permissions(); + perm.set_mode(mode); + + fs::set_permissions(file, perm).map_err(|e| FileError::MetaDataError { + name: file.to_string(), + from: e, + })?; + + Ok(()) +} +#[cfg(test)] +mod tests { + use super::*; + use std::os::unix::fs::PermissionsExt; + use std::path::Path; + + #[test] + fn create_file_correct_user_group() { + let user = whoami::username(); + let _ = create_file_with_user_group("/tmp/fcreate_test", &user, &user, 0o644).unwrap(); + assert!(Path::new("/tmp/fcreate_test").exists()); + let meta = std::fs::metadata("/tmp/fcreate_test").unwrap(); + let perm = meta.permissions(); + println!("{:o}", perm.mode()); + assert!(format!("{:o}", perm.mode()).contains("644")); + fs::remove_file("/tmp/fcreate_test").unwrap(); + } + + #[test] + fn create_file_wrong_user() { + let user = whoami::username(); + let err = create_file_with_user_group("/tmp/fcreate_wrong_user", "test", &user, 0o775) + .unwrap_err(); + + assert!(err.to_string().contains("User not found")); + fs::remove_file("/tmp/fcreate_wrong_user").unwrap(); + } + + #[test] + fn create_file_wrong_group() { + let user = whoami::username(); + let err = create_file_with_user_group("/tmp/fcreate_wrong_group", &user, "test", 0o775) + .unwrap_err(); + + assert!(err.to_string().contains("Group not found")); + fs::remove_file("/tmp/fcreate_wrong_group").unwrap(); + } + + #[test] + fn create_directory_with_correct_user_group() { + let user = whoami::username(); + let _ = + create_directory_with_user_group("/tmp/fcreate_test_dir", &user, &user, 0o775).unwrap(); + + assert!(Path::new("/tmp/fcreate_test_dir").exists()); + let meta = std::fs::metadata("/tmp/fcreate_test_dir").unwrap(); + let perm = meta.permissions(); + println!("{:o}", perm.mode()); + assert!(format!("{:o}", perm.mode()).contains("775")); + fs::remove_dir("/tmp/fcreate_test_dir").unwrap(); + } + + #[test] + fn create_directory_with_wrong_user() { + let user = whoami::username(); + + let err = create_directory_with_user_group("/tmp/wrong_user_dir", "test", &user, 0o775) + .unwrap_err(); + + assert!(err.to_string().contains("User not found")); + fs::remove_dir("/tmp/wrong_user_dir").unwrap(); + } + + #[test] + fn create_directory_with_wrong_group() { + let user = whoami::username(); + + let err = create_directory_with_user_group("/tmp/wrong_group_dir", &user, "test", 0o775) + .unwrap_err(); + + assert!(err.to_string().contains("Group not found")); + fs::remove_dir("/tmp/wrong_group_dir").unwrap(); + } +} diff --git a/crates/common/tedge_utils/src/lib.rs b/crates/common/tedge_utils/src/lib.rs index ea37e166..254835cb 100644 --- a/crates/common/tedge_utils/src/lib.rs +++ b/crates/common/tedge_utils/src/lib.rs @@ -1,3 +1,4 @@ +pub mod file; pub mod fs; pub mod paths; pub mod signals; diff --git a/crates/core/tedge/src/cli/mod.rs b/crates/core/tedge/src/cli/mod.rs index 37870b30..6c0e3811 100644 --- a/crates/core/tedge/src/cli/mod.rs +++ b/crates/core/tedge/src/cli/mod.rs @@ -13,14 +13,20 @@ mod mqtt; #[clap( name = clap::crate_name!(), version = clap::crate_version!(), - about = clap::crate_description!() + about = clap::crate_description!(), + arg_required_else_help(true) )] + pub struct Opt { - #[clap(subcommand)] - pub tedge: TEdgeOpt, + /// Initialize the tedge + #[clap(long)] + pub init: bool, #[clap(long = "config-dir", default_value = DEFAULT_TEDGE_CONFIG_PATH)] pub config_dir: PathBuf, + + #[clap(subcommand)] + pub tedge: Option<TEdgeOpt>, } #[derive(clap::Subcommand, Debug)] diff --git a/crates/core/tedge/src/main.rs b/crates/core/tedge/src/main.rs index 672947fb..5b384e35 100644 --- a/crates/core/tedge/src/main.rs +++ b/crates/core/tedge/src/main.rs @@ -4,24 +4,27 @@ use anyhow::Context; use clap::Parser; use tedge_users::UserManager; - mod cli; mod command; mod error; mod system_services; +use tedge_utils::file::create_directory_with_user_group; type ConfigError = crate::error::TEdgeError; use command::{BuildCommand, BuildContext}; fn main() -> anyhow::Result<()> { - let user_manager = UserManager::new(); - - let _user_guard = user_manager.become_user(tedge_users::TEDGE_USER)?; - let opt = cli::Opt::parse(); + if opt.init { + initialize_tedge()?; + return Ok(()); + } + + let user_manager = UserManager::new(); let tedge_config_location = tedge_config::TEdgeConfigLocation::from_custom_root(opt.config_dir); + let _user_guard = user_manager.become_user(tedge_users::TEDGE_USER)?; let config_repository = tedge_config::TEdgeConfigRepository::new(tedge_config_location.clone()); let build_context = BuildContext { @@ -30,11 +33,24 @@ fn main() -> anyhow::Result<()> { user_manager, }; - let cmd = opt - .tedge - .build_command(build_context) - .with_context(|| "missing configuration parameter")?; + if let Some(tedge_opt) = opt.tedge { + let cmd = tedge_opt + .build_command(build_context) + .with_context(|| "missing configuration parameter")?; + + cmd.execute() + .with_context(|| format!("failed to {}", cmd.description())) + } else { + Ok(()) + } +} - cmd.execute() - .with_context(|| format!("failed to {}", cmd.description())) +fn initialize_tedge() -> anyhow::Result<()> { + create_directory_with_user_group("/etc/tedge", "tedge", "tedge", 0o775)?; + create_directory_with_user_group("/var/log/tedge", "tedge", "tedge", 0o775)?; + create_directory_with_user_group("/etc/tedge/mosquitto-conf", "tedge", "tedge", 0o775)?; + create_directory_with_user_group("/etc/tedge/operations", "tedge", "tedge", 0o775)?; + create_directory_with_user_group("/etc/tedge/plugins", "tedge", "tedge", 0o775)?; + create_directory_with_user_group("/etc/tedge/device-certs", "mosquitto", "mosquitto", 0o775)?; + Ok(()) } diff --git a/crates/core/tedge_agent/src/agent.rs b/crates/core/tedge_agent/src/agent.rs index 4a7e3667..00c1c71a 100644 --- a/crates/core/tedge_agent/src/agent.rs +++ b/crates/core/tedge_agent/src/agent.rs @@ -21,6 +21,7 @@ use tedge_config::{ MqttBindAddressSetting, MqttPortSetting, RunPathDefaultSetting, SoftwarePluginDefaultSetting, TEdgeConfigLocation, TmpPathDefaultSetting, DEFAULT_LOG_PATH, DEFAULT_RUN_PATH, }; +use tedge_utils::file::create_directory_with_user_group; use tokio::sync::Mutex; use tracing::{debug, error, info, instrument, warn}; @@ -209,9 +210,17 @@ impl SmAgent { } #[instrument(skip(self), name = "sm-agent")] - pub async fn init_session(&mut self) -> Result<(), AgentError> { + pub async fn init(&mut self) -> Result<(), anyhow::Error> { + create_directory_with_user_group("/etc/tedge/.agent", "tedge-agent", "tedge-agent", 0o775)?; + create_directory_with_user_group( + "/var/log/tedge/agent", + "tedge-agent", + "tedge-agent", + 0o775, + )?; info!("Initializing the tedge agent session"); mqtt_channel::init_session(&self.config.mqtt_config).await?; + Ok(()) } diff --git a/crates/core/tedge_agent/src/main.rs b/crates/core/tedge_agent/src/main.rs index 4847553f..4e71de4b 100644 --- a/crates/core/tedge_agent/src/main.rs +++ b/crates/core/tedge_agent/src/main.rs @@ -53,7 +53,7 @@ async fn main() -> Result<(), anyhow::Error> { SmAgentConfig::try_new(tedge_config_location)?, )?; if agent_opt.init { - agent.init_session().await?; + agent.init().await?; } else if agent_opt.clear { agent.clear_session().await?; } else { diff --git a/crates/core/tedge_mapper/src/az/converter.rs b/crates/core/tedge_mapper/src/az/converter.rs index 7989cfbd..f7f4b7a9 100644 --- a/crates/core/tedge_mapper/src/az/converter.rs +++ b/crates/core/tedge_mapper/src/az/converter.rs @@ -2,7 +2,7 @@ use crate::core::{converter::*, error::*, size_threshold::SizeThreshold}; use async_trait::async_trait; use clock::Clock; -use mqtt_channel::Message; +use mqtt_channel::{Message, TopicFilter}; use thin_edge_json::serialize::ThinEdgeJsonSerializer; pub struct AzureConverter { @@ -15,7 +15,7 @@ pub struct AzureConverter { impl AzureConverter { pub fn new(add_timestamp: bool, clock: Box<dyn Clock>, size_threshold: SizeThreshold) -> Self { let mapper_config = MapperConfig { - in_topic_filter: make_valid_topic_filter_or_panic("tedge/measurements"), + in_topic_filter: Self::in_topic_filter(), out_topic: make_valid_topic_or_panic("az/messages/events/"), errors_topic: make_valid_topic_or_panic("tedge/errors"), }; @@ -26,6 +26,10 @@ impl AzureConverter { mapper_config, } } + + pub fn in_topic_filter() -> TopicFilter { + make_valid_topic_filter_or_panic("tedge/measurements") + } } #[async_trait] diff --git a/crates/core/tedge_mapper/src/az/mapper.rs b/crates/core/tedge_mapper/src/az/mapper.rs index 542e6c1c..0d998eff 100644 --- a/crates/core/tedge_mapper/src/az/mapper.rs +++ b/crates/core/tedge_mapper/src/az/mapper.rs @@ -7,7 +7,8 @@ use async_trait::async_trait; use clock::WallClock; use tedge_config::{AzureMapperTimestamp, MqttBindAddressSetting, TEdgeConfig}; use tedge_config::{ConfigSettingAccessor, MqttPortSetting}; -use tracing::{info_span, Instrument}; +use tedge_utils::file::create_directory_with_user_group; +use tracing::{info, info_span, Instrument}; const AZURE_MAPPER_NAME: &str = "tedge-mapper-az"; @@ -21,6 +22,23 @@ impl AzureMapper { #[async_trait] impl TEdgeComponent for AzureMapper { + fn session_name(&self) -> &str { + AZURE_MAPPER_NAME + } + + async fn init(&self) -> Result<(), anyhow::Error> { + info!("Initialize tedge mapper az"); + create_directory_with_user_group( + "/etc/tedge/operations/az", + "tedge-mapper", + "tedge-mapper", + 0o775, + )?; + + self.init_session(AzureConverter::in_topic_filter()).await?; + Ok(()) + } + async fn start(&self, tedge_config: TEdgeConfig) -> Result<(), anyhow::Error> { let add_timestamp = tedge_config.query(AzureMapperTimestamp)?.is_set(); let mqtt_port = tedge_config.query(MqttPortSetting)?.into(); diff --git a/crates/core/tedge_mapper/src/c8y/mapper.rs b/crates/core/tedge_mapper/src/c8y/mapper.rs index 07abc637..cb9a4f93 100644 --- a/crates/core/tedge_mapper/src/c8y/mapper.rs +++ b/crates/core/tedge_mapper/src/c8y/mapper.rs @@ -9,9 +9,10 @@ use c8y_api::http_proxy::{C8YHttpProxy, JwtAuthHttpProxy}; use c8y_smartrest::operations::Operations; use mqtt_channel::TopicFilter; use tedge_config::{ - ConfigRepository, ConfigSettingAccessor, DeviceIdSetting, DeviceTypeSetting, - MqttBindAddressSetting, MqttPortSetting, TEdgeConfig, + ConfigSettingAccessor, DeviceIdSetting, DeviceTypeSetting, MqttBindAddressSetting, + MqttPortSetting, TEdgeConfig, }; +use tedge_utils::file::*; use tracing::{info, info_span, Instrument}; use super::topic::C8yTopic; @@ -38,39 +39,23 @@ impl CumulocityMapper { Ok(topic_filter) } +} - pub async fn init_session(&mut self) -> Result<(), anyhow::Error> { - info!("Initialize tedge mapper session"); - mqtt_channel::init_session(&self.get_mqtt_config()?).await?; - Ok(()) - } - - pub async fn clear_session(&mut self) -> Result<(), anyhow::Error> { - info!("Clear tedge mapper session"); - mqtt_channel::clear_session(&self.get_mqtt_config()?).await?; - Ok(()) +#[async_trait] +impl TEdgeComponent for CumulocityMapper { + fn session_name(&self) -> &str { + CUMULOCITY_MAPPER_NAME } - fn get_mqtt_config(&mut self) -> Result<mqtt_channel::Config, anyhow::Error> { + async fn init(&self) -> Result<(), anyhow::Error> { + info!("Initialize tedge mapper c8y"); + create_directories()?; let operations = Operations::try_new("/etc/tedge/operations", "c8y")?; - let mqtt_topic = Self::subscriptions(&operations)?; - let config_repository = - tedge_config::TEdgeConfigRepository::new(tedge_config::TEdgeConfigLocation::default()); - let tedge_config = config_repository.load()?; - - let mqtt_config = mqtt_channel::Config::default() - .with_host(tedge_config.query(MqttBindAddressSetting)?.to_string()) - .with_port(tedge_config.query(MqttPortSetting)?.into()) - .with_session_name(CUMULOCITY_MAPPER_NAME) - .with_clean_session(false) - .with_subscriptions(mqtt_topic); - - Ok(mqtt_config) + self.init_session(CumulocityMapper::subscriptions(&operations)?) + .await?; + Ok(()) } -} -#[async_trait] -impl TEdgeComponent for CumulocityMapper { async fn start(&self, tedge_config: TEdgeConfig) -> Result<(), anyhow::Error> { let size_threshold = SizeThreshold(MQTT_MESSAGE_SIZE_THRESHOLD); @@ -101,3 +86,25 @@ impl TEdgeComponent for CumulocityMapper { Ok(()) } } + +fn create_directories() -> Result<(), anyhow::Error> { + create_directory_with_user_group( + "/etc/tedge/operations/c8y", + "tedge-mapper", + "tedge-mapper", + 0o775, + )?; + create_file_with_user_group( + "/etc/tedge/operations/c8y/c8y_SoftwareUpdate", + "tedge-mapper", + "tedge-mapper", + 0o644, + )?; + create_file_with_user_group( + "/etc/tedge/operations/c8y/c8y_Restart", + "tedge-mapper", + "tedge-mapper", + 0o644, + )?; + Ok(()) +} diff --git a/crates/core/tedge_mapper/src/collectd/mapper.rs b/crates/core/tedge_mapper/src/collectd/mapper.rs index abae07af..a4c6f4c5 100644 --- a/crates/core/tedge_mapper/src/collectd/mapper.rs +++ b/crates/core/tedge_mapper/src/collectd/mapper.rs @@ -3,10 +3,11 @@ use crate::{ core::component::TEdgeComponent, }; use async_trait::async_trait; +use mqtt_channel::TopicFilter; use tedge_config::{ConfigSettingAccessor, MqttBindAddressSetting, MqttPortSetting, TEdgeConfig}; -use tracing::{info_span, Instrument}; +use tracing::{info, info_span, Instrument}; -const APP_NAME: &str = "tedge-mapper-collectd"; +const COLLECTD_MAPPER_NAME: &str = "tedge-mapper-collectd"; pub struct CollectdMapper {} @@ -18,6 +19,19 @@ impl CollectdMapper { #[async_trait] impl TEdgeComponent for CollectdMapper { + fn session_name(&self) -> &str { + COLLECTD_MAPPER_NAME + } + + async fn init(&self) -> Result<(), anyhow::Error> { + info!("Initialize tedge mapper collectd"); + self.init_session(TopicFilter::new( + DeviceMonitorConfig::default().mqtt_source_topic, + )?) + .await?; + Ok(()) + } + async fn start(&self, tedge_config: TEdgeConfig) -> Result<(), anyhow::Error> { let mqtt_port = tedge_config.query(MqttPortSetting)?.into(); let mqtt_host = tedge_config.query(MqttBindAddressSetting)?.to_string(); @@ -29,7 +43,7 @@ impl TEdgeComponent for CollectdMapper { let device_monitor = DeviceMonitor::new(device_monitor_config); device_monitor .run() - .instrument(info_span!(APP_NAME)) + .instrument(info_span!(COLLECTD_MAPPER_NAME)) .await?; Ok(()) diff --git a/crates/core/tedge_mapper/src/collectd/monitor.rs b/crates/core/tedge_mapper/src/collectd/monitor.rs index c804deb5..93ea68cd 100644 --- a/crates/core/tedge_mapper/src/collectd/monitor.rs +++ b/crates/core/tedge_mapper/src/collectd/monitor.rs @@ -18,7 +18,7 @@ pub struct DeviceMonitorConfig { host: String, port: u16, mqtt_client_id: &'static str, - mqtt_source_topic: &'static str, + pub mqtt_source_topic: &'static str, mqtt_target_topic: &'static str, batching_window: u32, maximum_message_delay: u32, diff --git a/crates/core/tedge_mapper/src/core/component.rs b/crates/core/tedge_mapper/src/core/component.rs index 6e5fedc6..786c78c8 100644 --- a/crates/core/tedge_mapper/src/core/component.rs +++ b/crates/core/tedge_mapper/src/core/component.rs @@ -1,7 +1,38 @@ use async_trait::async_trait; -use tedge_config::TEdgeConfig; +use mqtt_channel::TopicFilter; +use tedge_config::{ + ConfigRepository, ConfigSettingAccessor, MqttBindAddressSetting, MqttPortSetting, TEdgeConfig, +}; +use tracing::info; #[async_trait] -pub trait TEdgeCompon |