summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorPradeepKiruvale <pradeepkumar.kj@softwareag.com>2022-03-24 13:35:38 +0530
committerGitHub <noreply@github.com>2022-03-24 13:35:38 +0530
commit59142ef60484ddc7b103c841d34b308b9444f316 (patch)
treed7b8f89179d2244a1e116081b11a28e7c3a141fa
parentd9ca6aedfad3de7931964200f26e40fdd577b48a (diff)
[#761] tedge init (#993)
* tedge init #Closes 761 * add file/directory creation errors * add the clear session to az and collectd mappers * refactor tedgecomponent * remove redundant code * subscribe to az/collectd topics in init_session * remove duplicate init_sessions
-rw-r--r--Cargo.lock13
-rwxr-xr-xconfiguration/debian/tedge/postinst18
-rw-r--r--configuration/debian/tedge_agent/postinst8
-rw-r--r--configuration/debian/tedge_mapper/postinst13
-rw-r--r--crates/common/tedge_utils/Cargo.toml3
-rw-r--r--crates/common/tedge_utils/src/file.rs210
-rw-r--r--crates/common/tedge_utils/src/lib.rs1
-rw-r--r--crates/core/tedge/src/cli/mod.rs12
-rw-r--r--crates/core/tedge/src/main.rs38
-rw-r--r--crates/core/tedge_agent/src/agent.rs11
-rw-r--r--crates/core/tedge_agent/src/main.rs2
-rw-r--r--crates/core/tedge_mapper/src/az/converter.rs8
-rw-r--r--crates/core/tedge_mapper/src/az/mapper.rs20
-rw-r--r--crates/core/tedge_mapper/src/c8y/mapper.rs65
-rw-r--r--crates/core/tedge_mapper/src/collectd/mapper.rs20
-rw-r--r--crates/core/tedge_mapper/src/collectd/monitor.rs2
-rw-r--r--crates/core/tedge_mapper/src/core/component.rs35
-rw-r--r--crates/core/tedge_mapper/src/main.rs20
18 files changed, 401 insertions, 98 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 38b4d9c7..4da5a3bc 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2929,11 +2929,14 @@ dependencies = [
"anyhow",
"assert_matches",
"futures",
+ "nix",
"tempfile",
"thiserror",
"tokio",
"tracing",
"tracing-subscriber",
+ "users",
+ "whoami",
]
[[package]]
@@ -3598,6 +3601,16 @@ dependencies = [
]
[[package]]
+name = "whoami"
+version = "1.2.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "524b58fa5a20a2fb3014dd6358b70e6579692a56ef6fce928834e488f42f65e8"
+dependencies = [
+ "wasm-bindgen",
+ "web-sys",
+]
+
+[[package]]
name = "winapi"
version = "0.3.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
diff --git a/configuration/debian/tedge/postinst b/configuration/debian/tedge/postinst
index 2f0dfee1..788fb369 100755
--- a/configuration/debian/tedge/postinst
+++ b/configuration/debian/tedge/postinst
@@ -17,21 +17,6 @@ if ! getent passwd tedge >/dev/null; then
adduser --quiet --system --gecos "" --no-create-home --disabled-login --shell /sbin/nologin --ingroup tedge tedge
fi
-### Create directories
-# thin-edge.io uses `/etc/tedge` directory as its main configuration directory.
-# This directory contains mqtt bus configuration directory under `mosquitto-conf`
-# and device certificates under `device-certs` so both have to be created in the directory structure.
-install -g tedge -o tedge -m 755 -d /etc/tedge
-install -g tedge -o tedge -m 755 -d /etc/tedge/mosquitto-conf
-install -g mosquitto -o mosquitto -m 755 -d /etc/tedge/device-certs
-
-# Create a directory for the operations added by the user.
-install -g tedge -o tedge -m 755 -d /etc/tedge/operations
-install -g tedge -o tedge -m 755 -d /etc/tedge/plugins
-
-# Create directory for logs
-install -g tedge -o tedge -m 755 -d /var/log/tedge
-
### Add include to mosquitto.conf so tedge specific conf will be loaded
if ! grep -q "/etc/tedge/mosquitto-conf" "/etc/mosquitto/mosquitto.conf"; then
echo "include_dir /etc/tedge/mosquitto-conf" >>/etc/mosquitto/mosquitto.conf
@@ -44,5 +29,8 @@ if [ -f "/etc/sudoers.d/010_pi-nopasswd" ]; then
echo "%tedge-users ALL = (ALL) NOPASSWD: /usr/bin/tedge" >/etc/sudoers.d/tedge-users-nopasswd
fi
+# Initialize the tedge
+tedge --init
+
### Below are automatically created script parts by cargo deb (Search for cargo deb DEBHELPER)
#DEBHELPER#
diff --git a/configuration/debian/tedge_agent/postinst b/configuration/debian/tedge_agent/postinst
index 774c3b09..7430e3dc 100644
--- a/configuration/debian/tedge_agent/postinst
+++ b/configuration/debian/tedge_agent/postinst
@@ -11,6 +11,7 @@ fi
# Create user tedge-agent with no home(--no-create-home), no login(--shell) and in group tedge-agent(--ingroup)
if ! getent passwd tedge-agent >/dev/null; then
adduser --quiet --system --no-create-home --ingroup tedge-agent --shell /usr/sbin/nologin tedge-agent
+ adduser tedge-agent tedge
fi
### Create file in /etc/sudoers.d directory
@@ -21,13 +22,6 @@ if [ -f "/etc/sudoers.d/010_pi-nopasswd" ]; then
echo "%tedge-agent ALL = (ALL) NOPASSWD: /etc/tedge/sm-plugins/[a-zA-Z0-9]*, /bin/sync, /sbin/init" >/etc/sudoers.d/tedge-agent-nopasswd
fi
-### Create directories
-# Create tedge-agent state directory /etc/tedge/.agent
-install -g tedge-agent -o tedge-agent -m 754 -d /etc/tedge/.agent
-
-# Create /var/log/tedge/agent directory
-install -g tedge-agent -o tedge-agent -m 755 -d /var/log/tedge/agent
-
# Reenable the services only if systemctl is available
if command -v systemctl >/dev/null; then
### Enable the sm services if the device is connected to c8y cloud
diff --git a/configuration/debian/tedge_mapper/postinst b/configuration/debian/tedge_mapper/postinst
index dbfcd140..a8625f3f 100644
--- a/configuration/debian/tedge_mapper/postinst
+++ b/configuration/debian/tedge_mapper/postinst
@@ -11,19 +11,10 @@ fi
# Create user tedge-mapper with no home(--no-create-home), no login(--shell) and in group tedge(--ingroup)
if ! getent passwd tedge-mapper >/dev/null; then
adduser --quiet --system --no-create-home --ingroup tedge-mapper --shell /usr/sbin/nologin tedge-mapper
+ adduser tedge-mapper tedge
fi
-### Create supported cloud operations directories
-install -g tedge -o tedge -m 755 -d /etc/tedge/operations/c8y
-install -g tedge -o tedge -m 755 -d /etc/tedge/operations/az
-
-### Create operation file.
-# This allows thin-edge.io components to list and declare supported operations for the cloud provider.
-# Some of the examples for Cumulocity IoT supported opertations: https://cumulocity.com/api/10.11.0/#section/Device-management-library/Miscellaneous
-install -g tedge -o tedge -m 644 /dev/null /etc/tedge/operations/c8y/c8y_SoftwareUpdate
-install -g tedge -o tedge -m 644 /dev/null /etc/tedge/operations/c8y/c8y_Restart
-
### Initialize the sm mapper
runuser -u tedge-mapper -- tedge_mapper --init c8y
-
+runuser -u tedge-mapper -- tedge_mapper --init az
#DEBHELPER#
diff --git a/crates/common/tedge_utils/Cargo.toml b/crates/common/tedge_utils/Cargo.toml
index 2f06071f..776642df 100644
--- a/crates/common/tedge_utils/Cargo.toml
+++ b/crates/common/tedge_utils/Cargo.toml
@@ -15,13 +15,16 @@ logging = ["tracing", "tracing-subscriber"]
[dependencies]
anyhow = "1.0"
futures = "0.3"
+nix = "0.23.1"
tempfile = "3.2"
thiserror = "1.0"
tokio = { version = "1.12", default_features = false, features = [ "fs", "io-util", "macros", "signal"] }
tracing = { version = "0.1", features = [], optional = true }
tracing-subscriber = { version = "0.3", optional = true, features = [ "time" ] }
+users = "0.11.0"
[dev-dependencies]
assert_matches = "1.5"
+whoami = "1.2.1"
diff --git a/crates/common/tedge_utils/src/file.rs b/crates/common/tedge_utils/src/file.rs
new file mode 100644
index 00000000..cc819e05
--- /dev/null
+++ b/crates/common/tedge_utils/src/file.rs
@@ -0,0 +1,210 @@
+use nix::unistd::*;
+use std::fs::File;
+use std::os::linux::fs::MetadataExt;
+use std::os::unix::fs::PermissionsExt;
+use std::{fs, io};
+use users::{get_group_by_name, get_user_by_name};
+
+#[derive(thiserror::Error, Debug)]
+pub enum FileError {
+ #[error("Creating the directory failed: {dir:?}.")]
+ DirectoryCreateFailed { dir: String, from: std::io::Error },
+
+ #[error("Creating the file failed: {file:?}.")]
+ FileCreateFailed { file: String, from: std::io::Error },
+
+ #[error("Failed to change owner: {name:?}.")]
+ MetaDataError { name: String, from: std::io::Error },
+
+ #[error("User not found: {user:?}.")]
+ UserNotFound { user: String },
+
+ #[error("Group not found: {group:?}.")]
+ GroupNotFound { group: String },
+
+ #[error(transparent)]
+ Errno(#[from] nix::errno::Errno),
+}
+
+pub fn create_directory_with_user_group(
+ dir: &str,
+ user: &str,
+ group: &str,
+ mode: u32,
+) -> Result<(), FileError> {
+ match fs::create_dir(dir) {
+ Ok(_) => {
+ change_owner_and_permission(dir, user, group, mode)?;
+ }
+
+ Err(e) => {
+ if e.kind() == io::ErrorKind::AlreadyExists {
+ return Ok(());
+ } else {
+ return Err(FileError::DirectoryCreateFailed {
+ dir: dir.to_string(),
+ from: e,
+ });
+ }
+ }
+ }
+ Ok(())
+}
+
+pub fn create_file_with_user_group(
+ file: &str,
+ user: &str,
+ group: &str,
+ mode: u32,
+) -> Result<(), FileError> {
+ match File::create(file) {
+ Ok(_) => {
+ change_owner_and_permission(file, user, group, mode)?;
+ }
+ Err(e) => {
+ if e.kind() == io::ErrorKind::AlreadyExists {
+ return Ok(());
+ } else {
+ return Err(FileError::FileCreateFailed {
+ file: file.to_string(),
+ from: e,
+ });
+ }
+ }
+ }
+ Ok(())
+}
+
+fn change_owner_and_permission(
+ file: &str,
+ user: &str,
+ group: &str,
+ mode: u32,
+) -> Result<(), FileError> {
+ let ud = match get_user_by_name(user) {
+ Some(user) => user.uid(),
+ None => {
+ return Err(FileError::UserNotFound { user: user.into() });
+ }
+ };
+
+ let gd = match get_group_by_name(group) {
+ Some(group) => group.gid(),
+ None => {
+ return Err(FileError::GroupNotFound {
+ group: group.into(),
+ });
+ }
+ };
+
+ let uid = fs::metadata(file)
+ .map_err(|e| FileError::MetaDataError {
+ name: file.to_string(),
+ from: e,
+ })?
+ .st_uid();
+ let gid = fs::metadata(file)
+ .map_err(|e| FileError::MetaDataError {
+ name: file.to_string(),
+ from: e,
+ })?
+ .st_gid();
+
+ // if user and group is same as existing, then do not change
+ if (ud != uid) && (gd != gid) {
+ chown(
+ file,
+ Some(Uid::from_raw(ud.into())),
+ Some(Gid::from_raw(gd.into())),
+ )?;
+ }
+
+ let mut perm = fs::metadata(file)
+ .map_err(|e| FileError::MetaDataError {
+ name: file.to_string(),
+ from: e,
+ })?
+ .permissions();
+ perm.set_mode(mode);
+
+ fs::set_permissions(file, perm).map_err(|e| FileError::MetaDataError {
+ name: file.to_string(),
+ from: e,
+ })?;
+
+ Ok(())
+}
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use std::os::unix::fs::PermissionsExt;
+ use std::path::Path;
+
+ #[test]
+ fn create_file_correct_user_group() {
+ let user = whoami::username();
+ let _ = create_file_with_user_group("/tmp/fcreate_test", &user, &user, 0o644).unwrap();
+ assert!(Path::new("/tmp/fcreate_test").exists());
+ let meta = std::fs::metadata("/tmp/fcreate_test").unwrap();
+ let perm = meta.permissions();
+ println!("{:o}", perm.mode());
+ assert!(format!("{:o}", perm.mode()).contains("644"));
+ fs::remove_file("/tmp/fcreate_test").unwrap();
+ }
+
+ #[test]
+ fn create_file_wrong_user() {
+ let user = whoami::username();
+ let err = create_file_with_user_group("/tmp/fcreate_wrong_user", "test", &user, 0o775)
+ .unwrap_err();
+
+ assert!(err.to_string().contains("User not found"));
+ fs::remove_file("/tmp/fcreate_wrong_user").unwrap();
+ }
+
+ #[test]
+ fn create_file_wrong_group() {
+ let user = whoami::username();
+ let err = create_file_with_user_group("/tmp/fcreate_wrong_group", &user, "test", 0o775)
+ .unwrap_err();
+
+ assert!(err.to_string().contains("Group not found"));
+ fs::remove_file("/tmp/fcreate_wrong_group").unwrap();
+ }
+
+ #[test]
+ fn create_directory_with_correct_user_group() {
+ let user = whoami::username();
+ let _ =
+ create_directory_with_user_group("/tmp/fcreate_test_dir", &user, &user, 0o775).unwrap();
+
+ assert!(Path::new("/tmp/fcreate_test_dir").exists());
+ let meta = std::fs::metadata("/tmp/fcreate_test_dir").unwrap();
+ let perm = meta.permissions();
+ println!("{:o}", perm.mode());
+ assert!(format!("{:o}", perm.mode()).contains("775"));
+ fs::remove_dir("/tmp/fcreate_test_dir").unwrap();
+ }
+
+ #[test]
+ fn create_directory_with_wrong_user() {
+ let user = whoami::username();
+
+ let err = create_directory_with_user_group("/tmp/wrong_user_dir", "test", &user, 0o775)
+ .unwrap_err();
+
+ assert!(err.to_string().contains("User not found"));
+ fs::remove_dir("/tmp/wrong_user_dir").unwrap();
+ }
+
+ #[test]
+ fn create_directory_with_wrong_group() {
+ let user = whoami::username();
+
+ let err = create_directory_with_user_group("/tmp/wrong_group_dir", &user, "test", 0o775)
+ .unwrap_err();
+
+ assert!(err.to_string().contains("Group not found"));
+ fs::remove_dir("/tmp/wrong_group_dir").unwrap();
+ }
+}
diff --git a/crates/common/tedge_utils/src/lib.rs b/crates/common/tedge_utils/src/lib.rs
index ea37e166..254835cb 100644
--- a/crates/common/tedge_utils/src/lib.rs
+++ b/crates/common/tedge_utils/src/lib.rs
@@ -1,3 +1,4 @@
+pub mod file;
pub mod fs;
pub mod paths;
pub mod signals;
diff --git a/crates/core/tedge/src/cli/mod.rs b/crates/core/tedge/src/cli/mod.rs
index 37870b30..6c0e3811 100644
--- a/crates/core/tedge/src/cli/mod.rs
+++ b/crates/core/tedge/src/cli/mod.rs
@@ -13,14 +13,20 @@ mod mqtt;
#[clap(
name = clap::crate_name!(),
version = clap::crate_version!(),
- about = clap::crate_description!()
+ about = clap::crate_description!(),
+ arg_required_else_help(true)
)]
+
pub struct Opt {
- #[clap(subcommand)]
- pub tedge: TEdgeOpt,
+ /// Initialize the tedge
+ #[clap(long)]
+ pub init: bool,
#[clap(long = "config-dir", default_value = DEFAULT_TEDGE_CONFIG_PATH)]
pub config_dir: PathBuf,
+
+ #[clap(subcommand)]
+ pub tedge: Option<TEdgeOpt>,
}
#[derive(clap::Subcommand, Debug)]
diff --git a/crates/core/tedge/src/main.rs b/crates/core/tedge/src/main.rs
index 672947fb..5b384e35 100644
--- a/crates/core/tedge/src/main.rs
+++ b/crates/core/tedge/src/main.rs
@@ -4,24 +4,27 @@
use anyhow::Context;
use clap::Parser;
use tedge_users::UserManager;
-
mod cli;
mod command;
mod error;
mod system_services;
+use tedge_utils::file::create_directory_with_user_group;
type ConfigError = crate::error::TEdgeError;
use command::{BuildCommand, BuildContext};
fn main() -> anyhow::Result<()> {
- let user_manager = UserManager::new();
-
- let _user_guard = user_manager.become_user(tedge_users::TEDGE_USER)?;
-
let opt = cli::Opt::parse();
+ if opt.init {
+ initialize_tedge()?;
+ return Ok(());
+ }
+
+ let user_manager = UserManager::new();
let tedge_config_location = tedge_config::TEdgeConfigLocation::from_custom_root(opt.config_dir);
+ let _user_guard = user_manager.become_user(tedge_users::TEDGE_USER)?;
let config_repository = tedge_config::TEdgeConfigRepository::new(tedge_config_location.clone());
let build_context = BuildContext {
@@ -30,11 +33,24 @@ fn main() -> anyhow::Result<()> {
user_manager,
};
- let cmd = opt
- .tedge
- .build_command(build_context)
- .with_context(|| "missing configuration parameter")?;
+ if let Some(tedge_opt) = opt.tedge {
+ let cmd = tedge_opt
+ .build_command(build_context)
+ .with_context(|| "missing configuration parameter")?;
+
+ cmd.execute()
+ .with_context(|| format!("failed to {}", cmd.description()))
+ } else {
+ Ok(())
+ }
+}
- cmd.execute()
- .with_context(|| format!("failed to {}", cmd.description()))
+fn initialize_tedge() -> anyhow::Result<()> {
+ create_directory_with_user_group("/etc/tedge", "tedge", "tedge", 0o775)?;
+ create_directory_with_user_group("/var/log/tedge", "tedge", "tedge", 0o775)?;
+ create_directory_with_user_group("/etc/tedge/mosquitto-conf", "tedge", "tedge", 0o775)?;
+ create_directory_with_user_group("/etc/tedge/operations", "tedge", "tedge", 0o775)?;
+ create_directory_with_user_group("/etc/tedge/plugins", "tedge", "tedge", 0o775)?;
+ create_directory_with_user_group("/etc/tedge/device-certs", "mosquitto", "mosquitto", 0o775)?;
+ Ok(())
}
diff --git a/crates/core/tedge_agent/src/agent.rs b/crates/core/tedge_agent/src/agent.rs
index 4a7e3667..00c1c71a 100644
--- a/crates/core/tedge_agent/src/agent.rs
+++ b/crates/core/tedge_agent/src/agent.rs
@@ -21,6 +21,7 @@ use tedge_config::{
MqttBindAddressSetting, MqttPortSetting, RunPathDefaultSetting, SoftwarePluginDefaultSetting,
TEdgeConfigLocation, TmpPathDefaultSetting, DEFAULT_LOG_PATH, DEFAULT_RUN_PATH,
};
+use tedge_utils::file::create_directory_with_user_group;
use tokio::sync::Mutex;
use tracing::{debug, error, info, instrument, warn};
@@ -209,9 +210,17 @@ impl SmAgent {
}
#[instrument(skip(self), name = "sm-agent")]
- pub async fn init_session(&mut self) -> Result<(), AgentError> {
+ pub async fn init(&mut self) -> Result<(), anyhow::Error> {
+ create_directory_with_user_group("/etc/tedge/.agent", "tedge-agent", "tedge-agent", 0o775)?;
+ create_directory_with_user_group(
+ "/var/log/tedge/agent",
+ "tedge-agent",
+ "tedge-agent",
+ 0o775,
+ )?;
info!("Initializing the tedge agent session");
mqtt_channel::init_session(&self.config.mqtt_config).await?;
+
Ok(())
}
diff --git a/crates/core/tedge_agent/src/main.rs b/crates/core/tedge_agent/src/main.rs
index 4847553f..4e71de4b 100644
--- a/crates/core/tedge_agent/src/main.rs
+++ b/crates/core/tedge_agent/src/main.rs
@@ -53,7 +53,7 @@ async fn main() -> Result<(), anyhow::Error> {
SmAgentConfig::try_new(tedge_config_location)?,
)?;
if agent_opt.init {
- agent.init_session().await?;
+ agent.init().await?;
} else if agent_opt.clear {
agent.clear_session().await?;
} else {
diff --git a/crates/core/tedge_mapper/src/az/converter.rs b/crates/core/tedge_mapper/src/az/converter.rs
index 7989cfbd..f7f4b7a9 100644
--- a/crates/core/tedge_mapper/src/az/converter.rs
+++ b/crates/core/tedge_mapper/src/az/converter.rs
@@ -2,7 +2,7 @@ use crate::core::{converter::*, error::*, size_threshold::SizeThreshold};
use async_trait::async_trait;
use clock::Clock;
-use mqtt_channel::Message;
+use mqtt_channel::{Message, TopicFilter};
use thin_edge_json::serialize::ThinEdgeJsonSerializer;
pub struct AzureConverter {
@@ -15,7 +15,7 @@ pub struct AzureConverter {
impl AzureConverter {
pub fn new(add_timestamp: bool, clock: Box<dyn Clock>, size_threshold: SizeThreshold) -> Self {
let mapper_config = MapperConfig {
- in_topic_filter: make_valid_topic_filter_or_panic("tedge/measurements"),
+ in_topic_filter: Self::in_topic_filter(),
out_topic: make_valid_topic_or_panic("az/messages/events/"),
errors_topic: make_valid_topic_or_panic("tedge/errors"),
};
@@ -26,6 +26,10 @@ impl AzureConverter {
mapper_config,
}
}
+
+ pub fn in_topic_filter() -> TopicFilter {
+ make_valid_topic_filter_or_panic("tedge/measurements")
+ }
}
#[async_trait]
diff --git a/crates/core/tedge_mapper/src/az/mapper.rs b/crates/core/tedge_mapper/src/az/mapper.rs
index 542e6c1c..0d998eff 100644
--- a/crates/core/tedge_mapper/src/az/mapper.rs
+++ b/crates/core/tedge_mapper/src/az/mapper.rs
@@ -7,7 +7,8 @@ use async_trait::async_trait;
use clock::WallClock;
use tedge_config::{AzureMapperTimestamp, MqttBindAddressSetting, TEdgeConfig};
use tedge_config::{ConfigSettingAccessor, MqttPortSetting};
-use tracing::{info_span, Instrument};
+use tedge_utils::file::create_directory_with_user_group;
+use tracing::{info, info_span, Instrument};
const AZURE_MAPPER_NAME: &str = "tedge-mapper-az";
@@ -21,6 +22,23 @@ impl AzureMapper {
#[async_trait]
impl TEdgeComponent for AzureMapper {
+ fn session_name(&self) -> &str {
+ AZURE_MAPPER_NAME
+ }
+
+ async fn init(&self) -> Result<(), anyhow::Error> {
+ info!("Initialize tedge mapper az");
+ create_directory_with_user_group(
+ "/etc/tedge/operations/az",
+ "tedge-mapper",
+ "tedge-mapper",
+ 0o775,
+ )?;
+
+ self.init_session(AzureConverter::in_topic_filter()).await?;
+ Ok(())
+ }
+
async fn start(&self, tedge_config: TEdgeConfig) -> Result<(), anyhow::Error> {
let add_timestamp = tedge_config.query(AzureMapperTimestamp)?.is_set();
let mqtt_port = tedge_config.query(MqttPortSetting)?.into();
diff --git a/crates/core/tedge_mapper/src/c8y/mapper.rs b/crates/core/tedge_mapper/src/c8y/mapper.rs
index 07abc637..cb9a4f93 100644
--- a/crates/core/tedge_mapper/src/c8y/mapper.rs
+++ b/crates/core/tedge_mapper/src/c8y/mapper.rs
@@ -9,9 +9,10 @@ use c8y_api::http_proxy::{C8YHttpProxy, JwtAuthHttpProxy};
use c8y_smartrest::operations::Operations;
use mqtt_channel::TopicFilter;
use tedge_config::{
- ConfigRepository, ConfigSettingAccessor, DeviceIdSetting, DeviceTypeSetting,
- MqttBindAddressSetting, MqttPortSetting, TEdgeConfig,
+ ConfigSettingAccessor, DeviceIdSetting, DeviceTypeSetting, MqttBindAddressSetting,
+ MqttPortSetting, TEdgeConfig,
};
+use tedge_utils::file::*;
use tracing::{info, info_span, Instrument};
use super::topic::C8yTopic;
@@ -38,39 +39,23 @@ impl CumulocityMapper {
Ok(topic_filter)
}
+}
- pub async fn init_session(&mut self) -> Result<(), anyhow::Error> {
- info!("Initialize tedge mapper session");
- mqtt_channel::init_session(&self.get_mqtt_config()?).await?;
- Ok(())
- }
-
- pub async fn clear_session(&mut self) -> Result<(), anyhow::Error> {
- info!("Clear tedge mapper session");
- mqtt_channel::clear_session(&self.get_mqtt_config()?).await?;
- Ok(())
+#[async_trait]
+impl TEdgeComponent for CumulocityMapper {
+ fn session_name(&self) -> &str {
+ CUMULOCITY_MAPPER_NAME
}
- fn get_mqtt_config(&mut self) -> Result<mqtt_channel::Config, anyhow::Error> {
+ async fn init(&self) -> Result<(), anyhow::Error> {
+ info!("Initialize tedge mapper c8y");
+ create_directories()?;
let operations = Operations::try_new("/etc/tedge/operations", "c8y")?;
- let mqtt_topic = Self::subscriptions(&operations)?;
- let config_repository =
- tedge_config::TEdgeConfigRepository::new(tedge_config::TEdgeConfigLocation::default());
- let tedge_config = config_repository.load()?;
-
- let mqtt_config = mqtt_channel::Config::default()
- .with_host(tedge_config.query(MqttBindAddressSetting)?.to_string())
- .with_port(tedge_config.query(MqttPortSetting)?.into())
- .with_session_name(CUMULOCITY_MAPPER_NAME)
- .with_clean_session(false)
- .with_subscriptions(mqtt_topic);
-
- Ok(mqtt_config)
+ self.init_session(CumulocityMapper::subscriptions(&operations)?)
+ .await?;
+ Ok(())
}
-}
-#[async_trait]
-impl TEdgeComponent for CumulocityMapper {
async fn start(&self, tedge_config: TEdgeConfig) -> Result<(), anyhow::Error> {
let size_threshold = SizeThreshold(MQTT_MESSAGE_SIZE_THRESHOLD);
@@ -101,3 +86,25 @@ impl TEdgeComponent for CumulocityMapper {
Ok(())
}
}
+
+fn create_directories() -> Result<(), anyhow::Error> {
+ create_directory_with_user_group(
+ "/etc/tedge/operations/c8y",
+ "tedge-mapper",
+ "tedge-mapper",
+ 0o775,
+ )?;
+ create_file_with_user_group(
+ "/etc/tedge/operations/c8y/c8y_SoftwareUpdate",
+ "tedge-mapper",
+ "tedge-mapper",
+ 0o644,
+ )?;
+ create_file_with_user_group(
+ "/etc/tedge/operations/c8y/c8y_Restart",
+ "tedge-mapper",
+ "tedge-mapper",
+ 0o644,
+ )?;
+ Ok(())
+}
diff --git a/crates/core/tedge_mapper/src/collectd/mapper.rs b/crates/core/tedge_mapper/src/collectd/mapper.rs
index abae07af..a4c6f4c5 100644
--- a/crates/core/tedge_mapper/src/collectd/mapper.rs
+++ b/crates/core/tedge_mapper/src/collectd/mapper.rs
@@ -3,10 +3,11 @@ use crate::{
core::component::TEdgeComponent,
};
use async_trait::async_trait;
+use mqtt_channel::TopicFilter;
use tedge_config::{ConfigSettingAccessor, MqttBindAddressSetting, MqttPortSetting, TEdgeConfig};
-use tracing::{info_span, Instrument};
+use tracing::{info, info_span, Instrument};
-const APP_NAME: &str = "tedge-mapper-collectd";
+const COLLECTD_MAPPER_NAME: &str = "tedge-mapper-collectd";
pub struct CollectdMapper {}
@@ -18,6 +19,19 @@ impl CollectdMapper {
#[async_trait]
impl TEdgeComponent for CollectdMapper {
+ fn session_name(&self) -> &str {
+ COLLECTD_MAPPER_NAME
+ }
+
+ async fn init(&self) -> Result<(), anyhow::Error> {
+ info!("Initialize tedge mapper collectd");
+ self.init_session(TopicFilter::new(
+ DeviceMonitorConfig::default().mqtt_source_topic,
+ )?)
+ .await?;
+ Ok(())
+ }
+
async fn start(&self, tedge_config: TEdgeConfig) -> Result<(), anyhow::Error> {
let mqtt_port = tedge_config.query(MqttPortSetting)?.into();
let mqtt_host = tedge_config.query(MqttBindAddressSetting)?.to_string();
@@ -29,7 +43,7 @@ impl TEdgeComponent for CollectdMapper {
let device_monitor = DeviceMonitor::new(device_monitor_config);
device_monitor
.run()
- .instrument(info_span!(APP_NAME))
+ .instrument(info_span!(COLLECTD_MAPPER_NAME))
.await?;
Ok(())
diff --git a/crates/core/tedge_mapper/src/collectd/monitor.rs b/crates/core/tedge_mapper/src/collectd/monitor.rs
index c804deb5..93ea68cd 100644
--- a/crates/core/tedge_mapper/src/collectd/monitor.rs
+++ b/crates/core/tedge_mapper/src/collectd/monitor.rs
@@ -18,7 +18,7 @@ pub struct DeviceMonitorConfig {
host: String,
port: u16,
mqtt_client_id: &'static str,
- mqtt_source_topic: &'static str,
+ pub mqtt_source_topic: &'static str,
mqtt_target_topic: &'static str,
batching_window: u32,
maximum_message_delay: u32,
diff --git a/crates/core/tedge_mapper/src/core/component.rs b/crates/core/tedge_mapper/src/core/component.rs
index 6e5fedc6..786c78c8 100644
--- a/crates/core/tedge_mapper/src/core/component.rs
+++ b/crates/core/tedge_mapper/src/core/component.rs
@@ -1,7 +1,38 @@
use async_trait::async_trait;
-use tedge_config::TEdgeConfig;
+use mqtt_channel::TopicFilter;
+use tedge_config::{
+ ConfigRepository, ConfigSettingAccessor, MqttBindAddressSetting, MqttPortSetting, TEdgeConfig,
+};
+use tracing::info;
#[async_trait]
-pub trait TEdgeCompon