summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorLukasz Woznicki <75632179+makr11st@users.noreply.github.com>2021-07-06 14:25:23 +0100
committerGitHub <noreply@github.com>2021-07-06 14:25:23 +0100
commitb35efa7692fc9d04a7ec43390e04adf5ed71100d (patch)
treee7c7216d516cfa11366ea15c980a0f5b784a0ff6
parent8bc81842f61a0f3605e5a9a1b28c6a36300b2040 (diff)
[CIT-380] Integrate collectd_mapper with tedge_mapper (#320)
* Move collectd_mapper to tedge_mapper * Add tedge-mapper-collectd service * Rename cli arg from dm to collectd Signed-off-by: Lukasz Woznicki <lukasz.woznicki@softwareag.com>
-rw-r--r--Cargo.lock45
-rw-r--r--Cargo.toml1
-rw-r--r--configuration/contrib/collectd/collectd.conf2
-rw-r--r--configuration/debian/collectd_mapper/postinst15
-rw-r--r--configuration/init/systemd/collectd-mapper.service12
-rw-r--r--configuration/init/systemd/tedge-mapper-collectd.service12
-rw-r--r--docs/src/tutorials/device-monitoring.md22
-rw-r--r--mapper/collectd_mapper/Cargo.toml43
-rw-r--r--mapper/collectd_mapper/src/error.rs28
-rw-r--r--mapper/collectd_mapper/src/main.rs69
-rw-r--r--mapper/tedge_mapper/Cargo.toml20
-rw-r--r--mapper/tedge_mapper/src/collectd_mapper/batcher.rs (renamed from mapper/collectd_mapper/src/batcher.rs)28
-rw-r--r--mapper/tedge_mapper/src/collectd_mapper/collectd.rs (renamed from mapper/collectd_mapper/src/collectd.rs)4
-rw-r--r--mapper/tedge_mapper/src/collectd_mapper/error.rs21
-rw-r--r--mapper/tedge_mapper/src/collectd_mapper/mapper.rs34
-rw-r--r--mapper/tedge_mapper/src/collectd_mapper/mod.rs5
-rw-r--r--mapper/tedge_mapper/src/collectd_mapper/monitor.rs (renamed from mapper/collectd_mapper/src/monitor.rs)2
-rw-r--r--mapper/tedge_mapper/src/error.rs5
-rw-r--r--mapper/tedge_mapper/src/main.rs56
-rw-r--r--tests/PySys/monitoring/monitoring_smaller_interval/run.py12
-rw-r--r--tests/PySys/monitoring/monitoring_with_collectd/run.py10
-rw-r--r--tests/PySys/monitoring/monitoring_with_simulated_messages/run.py8
22 files changed, 152 insertions, 302 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 3db60ea6..cb58ff9f 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -367,28 +367,6 @@ dependencies = [
]
[[package]]
-name = "collectd_mapper"
-version = "0.2.1"
-dependencies = [
- "anyhow",
- "assert_matches",
- "async-trait",
- "chrono",
- "clock",
- "futures",
- "mockall",
- "mqtt_client",
- "tedge_config",
- "tedge_users",
- "thin_edge_json",
- "thiserror",
- "tokio",
- "tokio-test",
- "tracing",
- "tracing-subscriber",
-]
-
-[[package]]
name = "colored"
version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -2012,24 +1990,6 @@ dependencies = [
]
[[package]]
-name = "strum"
-version = "0.21.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "aaf86bbcfd1fa9670b7a129f64fc0c9fcbbfe4f1bc4210e9e98fe71ffc12cde2"
-
-[[package]]
-name = "strum_macros"
-version = "0.21.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d06aaeeee809dbc59eb4556183dd927df67db1540de5be8d3ec0b6636358a5ec"
-dependencies = [
- "heck",
- "proc-macro2 1.0.26",
- "quote 1.0.9",
- "syn 1.0.68",
-]
-
-[[package]]
name = "syn"
version = "0.15.44"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -2114,11 +2074,12 @@ dependencies = [
"clock",
"env_logger",
"flockfile",
+ "futures",
"log",
+ "mockall",
"mqtt_client",
"serde_json",
- "strum",
- "strum_macros",
+ "structopt",
"tedge_config",
"tedge_users",
"thin_edge_json",
diff --git a/Cargo.toml b/Cargo.toml
index cf0bc978..9d28db1f 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -10,7 +10,6 @@ members = [
"tedge",
"tedge_config",
"mapper/cumulocity/c8y_translator_lib",
- "mapper/collectd_mapper",
"mapper/tedge_mapper",
"mapper/thin_edge_json",
]
diff --git a/configuration/contrib/collectd/collectd.conf b/configuration/contrib/collectd/collectd.conf
index 76f65b1d..a623d39e 100644
--- a/configuration/contrib/collectd/collectd.conf
+++ b/configuration/contrib/collectd/collectd.conf
@@ -809,7 +809,7 @@ LoadPlugin mqtt
<Publish "name">
Host "localhost"
Port 1883
- ClientId "tedge-dm"
+ ClientId "tedge-collectd"
# User "user"
# Password "secret"
# QoS 0
diff --git a/configuration/debian/collectd_mapper/postinst b/configuration/debian/collectd_mapper/postinst
deleted file mode 100644
index 1d37401a..00000000
--- a/configuration/debian/collectd_mapper/postinst
+++ /dev/null
@@ -1,15 +0,0 @@
-#!/bin/sh
-
-set -e
-
-# Create a group "collectd-mapper" if not created before
-if ! getent group collectd-mapper > /dev/null; then
- addgroup --quiet --system collectd-mapper
-fi
-
-# Create a user "collectd-mapper" if not created before
-if ! getent passwd collectd-mapper > /dev/null; then
- adduser --quiet --system --no-create-home --ingroup collectd-mapper --shell /usr/sbin/nologin collectd-mapper
-fi
-
-#DEBHELPER#
diff --git a/configuration/init/systemd/collectd-mapper.service b/configuration/init/systemd/collectd-mapper.service
deleted file mode 100644
index 50341d8a..00000000
--- a/configuration/init/systemd/collectd-mapper.service
+++ /dev/null
@@ -1,12 +0,0 @@
-[Unit]
-Description=collectd-mapper collects device monitoring measurements and converts them to connectivity provider format.
-After=syslog.target network.target mosquitto.service
-
-[Service]
-User=collectd-mapper
-ExecStart=/usr/bin/collectd_mapper
-Restart=on-failure
-RestartPreventExitStatus=255
-
-[Install]
-WantedBy=multi-user.target
diff --git a/configuration/init/systemd/tedge-mapper-collectd.service b/configuration/init/systemd/tedge-mapper-collectd.service
new file mode 100644
index 00000000..0499c3c9
--- /dev/null
+++ b/configuration/init/systemd/tedge-mapper-collectd.service
@@ -0,0 +1,12 @@
+[Unit]
+Description=tedge-mapper-collectd converts Thin Edge JSON measurements to Cumulocity JSON format.
+After=syslog.target network.target mosquitto.service
+
+[Service]
+User=tedge-mapper
+ExecStart=/usr/bin/tedge_mapper collectd
+Restart=on-failure
+RestartPreventExitStatus=255
+
+[Install]
+WantedBy=multi-user.target
diff --git a/docs/src/tutorials/device-monitoring.md b/docs/src/tutorials/device-monitoring.md
index 72f518b9..cde27d1e 100644
--- a/docs/src/tutorials/device-monitoring.md
+++ b/docs/src/tutorials/device-monitoring.md
@@ -1,7 +1,7 @@
# Monitor your device with collectd
With thin-edge.io device monitoring, you can collect metrics from your device
-and forward these device metrics to IoT platforms in the cloud.
+and forward these device metrics to IoT platforms in the cloud.
Using these metrics, you can monitor the health of devices
and can proactively initiate actions in case the device seems to malfunction.
@@ -61,12 +61,12 @@ __Important notes__ You can enable or disable the collectd plugins of your choic
* Here is a config snippet to configure the MQTT write plugin:
```
LoadPlugin mqtt
-
+
<Plugin mqtt>
<Publish "tedge">
Host "localhost"
Port 1883
- ClientId "tedge-dm"
+ ClientId "tedge-collectd"
</Publish>
</Plugin>
```
@@ -86,9 +86,9 @@ __Important notes__ You can enable or disable the collectd plugins of your choic
filtering out every metric emitted by the memory plugin other than the used metric":
```
PreCacheChain "PreCache"
-
+
LoadPlugin match_regex
-
+
<Chain "PreCache">
<Rule "memory_free_only">
<Match "regex">
@@ -101,15 +101,15 @@ __Important notes__ You can enable or disable the collectd plugins of your choic
Target "stop"
</Rule>
</Chain>
- ```
-
+ ```
+
## Enable thin-edge monitoring
-To enable monitoring on your device, you have to launch the `collectd-mapper` daemon process.
+To enable monitoring on your device, you have to launch the `tedge-mapper-collectd` daemon process.
``` shell
-sudo systemctl enable collectd-mapper
-sudo systemctl start collectd-mapper
+sudo systemctl enable tedge-mapper-collectd
+sudo systemctl start tedge-mapper-collectd
```
This process subscribes to the `collectd/#` topics to read the monitoring metrics published by collectd
@@ -129,7 +129,7 @@ $ tedge mqtt sub 'collectd/#'
```
-The `collectd-mapper` translates these collectd measurements into the [thin-edge.io JSON](../architecture/thin-edge-json.md) format,
+The `tedge-mapper-collectd` translates these collectd measurements into the [thin-edge.io JSON](../architecture/thin-edge-json.md) format,
[grouping the measurements](../references/bridged-topics.md#collectd-topics) emitted by each plugin:
```
diff --git a/mapper/collectd_mapper/Cargo.toml b/mapper/collectd_mapper/Cargo.toml
deleted file mode 100644
index a252bc8c..00000000
--- a/mapper/collectd_mapper/Cargo.toml
+++ /dev/null
@@ -1,43 +0,0 @@
-[package]
-name = "collectd_mapper"
-version = "0.2.1"
-edition = "2018"
-authors = ["thin-edge.io team <info@thin-edge.io>"]
-license = "Apache-2.0"
-readme = "README.md"
-description = "The daemon providing monitoring and remote management capabilities to tedge"
-
-[package.metadata.deb]
-depends = "tedge"
-maintainer-scripts = "configuration/debian/collectd_mapper"
-assets = [
- ["../../configuration/contrib/collectd/collectd.conf", "/etc/tedge/contrib/collectd/", "644"],
- ["target/release/collectd_mapper", "/usr/bin/collectd_mapper", "755"],
-]
-
-[package.metadata.deb.systemd-units]
-unit-scripts = "../../configuration/init/systemd"
-unit-name = "collectd-mapper"
-enable = false
-start = false
-stop-on-upgrade = false
-
-[dependencies]
-thin_edge_json = {path = "../thin_edge_json" }
-clock = {path = "../../common/clock" }
-mqtt_client = {path = "../../common/mqtt_client" }
-chrono = "0.4"
-futures = "0.3"
-tokio = { version = "1.6", features = ["rt", "sync", "time"] }
-anyhow = "1.0"
-thiserror = "1.0"
-tracing = { version = "0.1", features = ["attributes", "log"] }
-tracing-subscriber = "0.2"
-mockall = "0.9"
-async-trait = "0.1"
-tedge_config = {path = "../../tedge_config" }
-tedge_users = { path = "../../common/tedge_users" }
-
-[dev-dependencies]
-assert_matches = "1.4"
-tokio-test = "0.4"
diff --git a/mapper/collectd_mapper/src/error.rs b/mapper/collectd_mapper/src/error.rs
deleted file mode 100644
index 99cd5fe5..00000000
--- a/mapper/collectd_mapper/src/error.rs
+++ /dev/null
@@ -1,28 +0,0 @@
-use crate::collectd::CollectdError;
-use mqtt_client::MqttClientError;
-use thin_edge_json::{
- group::{MeasurementGrouper, MeasurementGrouperError},
- serialize::ThinEdgeJsonSerializationError,
-};
-use tokio::sync::mpsc::error::SendError;
-
-#[derive(thiserror::Error, Debug)]
-pub enum DeviceMonitorError {
- #[error(transparent)]
- MqttClientError(#[from] MqttClientError),
-
- #[error(transparent)]
- InvalidCollectdMeasurementError(#[from] CollectdError),
-
- #[error(transparent)]
- InvalidThinEdgeJsonError(#[from] MeasurementGrouperError),
-
- #[error(transparent)]
- ThinEdgeJsonSerializationError(#[from] ThinEdgeJsonSerializationError),
-
- #[error(transparent)]
- BatchingError(#[from] SendError<MeasurementGrouper>),
-
- #[error("Home directory is not found.")]
- HomeDirNotFound,
-}
diff --git a/mapper/collectd_mapper/src/main.rs b/mapper/collectd_mapper/src/main.rs
deleted file mode 100644
index 542118eb..00000000
--- a/mapper/collectd_mapper/src/main.rs
+++ /dev/null
@@ -1,69 +0,0 @@
-mod batcher;
-mod collectd;
-mod error;
-mod monitor;
-
-use tracing::{debug_span, info, Instrument};
-
-use crate::error::*;
-use crate::monitor::{DeviceMonitor, DeviceMonitorConfig};
-use std::path::PathBuf;
-use tedge_config::*;
-
-const APP_NAME: &str = "collectd-mapper";
-const DEFAULT_LOG_LEVEL: &str = "warn";
-const TIME_FORMAT: &str = "%Y-%m-%dT%H:%M:%S%.3f%:z";
-
-#[tokio::main]
-async fn main() -> anyhow::Result<()> {
- let filter = std::env::var("RUST_LOG").unwrap_or_else(|_| DEFAULT_LOG_LEVEL.into());
- tracing_subscriber::fmt()
- .with_timer(tracing_subscriber::fmt::time::ChronoUtc::with_format(
- TIME_FORMAT.into(),
- ))
- .with_env_filter(filter)
- .with_span_events(tracing_subscriber::fmt::format::FmtSpan::CLOSE)
- .init();
-
- info!("{} starting!", APP_NAME);
-
- let device_monitor_config = DeviceMonitorConfig::default().with_port(mqtt_port()?);
-
- let device_monitor = DeviceMonitor::new(device_monitor_config);
- device_monitor
- .run()
- .instrument(debug_span!(APP_NAME))
- .await?;
-
- Ok(())
-}
-
-fn mqtt_port() -> anyhow::Result<u16> {
- let config_repository = config_repository()?;
- let tedge_config = config_repository.load()?;
- Ok(tedge_config.query(MqttPortSetting)?.into())
-}
-
-fn config_repository() -> anyhow::Result<TEdgeConfigRepository> {
- Ok(TEdgeConfigRepository::new(config_location()?))
-}
-
-fn config_location() -> anyhow::Result<TEdgeConfigLocation> {
- let tedge_config_location = if tedge_users::UserManager::running_as_root()
- || tedge_users::UserManager::running_as("collectd-mapper")
- {
- tedge_config::TEdgeConfigLocation::from_default_system_location()
- } else {
- tedge_config::TEdgeConfigLocation::from_users_home_location(
- home_dir().ok_or(DeviceMonitorError::HomeDirNotFound)?,
- )
- };
- Ok(tedge_config_location)
-}
-
-// Copied from tedge/src/utils/paths.rs. In the future, it would be good to separate it from tedge crate.
-fn home_dir() -> Option<PathBuf> {
- std::env::var_os("HOME")
- .and_then(|home| if home.is_empty() { None } else { Some(home) })
- .map(PathBuf::from)
-}
diff --git a/mapper/tedge_mapper/Cargo.toml b/mapper/tedge_mapper/Cargo.toml
index f45e7c79..0d68d01a 100644
--- a/mapper/tedge_mapper/Cargo.toml
+++ b/mapper/tedge_mapper/Cargo.toml
@@ -12,6 +12,7 @@ maintainer-scripts = "configuration/debian/tedge_mapper"
assets = [
["../../configuration/init/systemd/tedge-mapper-az.service", "/lib/systemd/system/tedge-mapper-az.service", "644"],
["../../configuration/init/systemd/tedge-mapper-c8y.service", "/lib/systemd/system/tedge-mapper-c8y.service", "644"],
+ ["../../configuration/init/systemd/tedge-mapper-collectd.service", "/lib/systemd/system/tedge-mapper-collectd.service", "644"],
["target/release/tedge_mapper", "/usr/bin/tedge_mapper", "755"],
]
@@ -23,26 +24,27 @@ stop-on-upgrade = false
[dependencies]
anyhow = "1.0"
-clock = {path = "../../common/clock" }
+async-trait = "0.1"
c8y_translator_lib = {path = "../cumulocity/c8y_translator_lib" }
-tedge_config = {path = "../../tedge_config" }
-thin_edge_json = {path = "../../mapper/thin_edge_json"}
chrono = "0.4"
+clock = {path = "../../common/clock" }
env_logger = "0.8"
flockfile = {path = "../../common/flockfile" }
+futures = "0.3"
log = "0.4"
+mockall = "0.9"
mqtt_client = {path = "../../common/mqtt_client" }
-strum = "0.21"
-strum_macros = "0.21"
+structopt = "0.3"
+tedge_config = {path = "../../tedge_config" }
+tedge_users = { path = "../../common/tedge_users" }
+thin_edge_json = {path = "../thin_edge_json" }
+thiserror = "1.0"
tokio = { version = "1.6", features = ["rt", "sync", "time"] }
tracing = { version = "0.1", features = ["attributes", "log"] }
tracing-subscriber = "0.2"
-async-trait = "0.1"
-thiserror = "1.0"
-tedge_users = { path = "../../common/tedge_users" }
[dev-dependencies]
+assert_matches = "1.5"
assert-json-diff = "2.0"
serde_json = "1.0"
-assert_matches = "1.5"
tokio-test = "0.4"
diff --git a/mapper/collectd_mapper/src/batcher.rs b/mapper/tedge_mapper/src/collectd_mapper/batcher.rs
index f8fc1274..a58a2fd0 100644
--- a/mapper/collectd_mapper/src/batcher.rs
+++ b/mapper/tedge_mapper/src/collectd_mapper/batcher.rs
@@ -13,8 +13,7 @@ use tokio::{
};
use tracing::{error, log::warn};
-use crate::collectd::CollectdMessage;
-use crate::error::*;
+use crate::collectd_mapper::{collectd::CollectdMessage, error::DeviceMonitorError};
#[derive(Debug)]
pub struct MessageBatch {
@@ -218,10 +217,7 @@ mod tests {
use clock::WallClock;
use futures::future::{pending, ready};
use mockall::Sequence;
- use mqtt_client::MockMqttClient;
- use mqtt_client::MockMqttErrorStream;
- use mqtt_client::MockMqttMessageStream;
- use mqtt_client::QoS;
+ use mqtt_client::{MockMqttClient, MockMqttErrorStream, MockMqttMessageStream, QoS};
use tokio::time::{self, Instant};
#[test]
@@ -314,7 +310,7 @@ mod tests {
message_stream
.expect_next()
.times(1)
- .in_sequence(&mut seq) // The second value to be returend by this mock stream
+ .in_sequence(&mut seq) // The second value to be returned by this mock stream
.returning(|| {
Box::pin(async {
time::advance(Duration::from_millis(100)).await; // Advance time, but stay within the batching window so that this message is part of the batch
@@ -327,7 +323,7 @@ mod tests {
message_stream
.expect_next()
.times(1)
- .in_sequence(&mut seq) // The second value to be returend by this mock stream
+ .in_sequence(&mut seq) // The second value to be returned by this mock stream
.returning(|| {
Box::pin(async {
time::advance(Duration::from_millis(1000)).await; // Advance time beyond the batching window so that upcoming messages arrive after the window is closed
@@ -342,7 +338,7 @@ mod tests {
message_stream
.expect_next()
.times(0)
- .in_sequence(&mut seq) // The third value to be returend by this mock stream
+ .in_sequence(&mut seq) // The third value to be returned by this mock stream
.returning(|| {
println!("Third message time: {:?}", Instant::now());
Box::pin(async {
@@ -396,7 +392,7 @@ mod tests {
let mut message_stream = build_message_stream_from_messages(vec![
("collectd/localhost/temperature/value", 32.5),
- ("collectd/pressure/value", 98.0), // Erraneous collectd message with invalid topic
+ ("collectd/pressure/value", 98.0), // Erroneous collectd message with invalid topic
("collectd/localhost/speed/value", 350.0),
]);
@@ -419,18 +415,18 @@ mod tests {
);
assert_eq!(
message_grouper.get_measurement_value(Some("pressure"), "value"),
- None // This measurement isn't included in the batch because the value was erraneous
+ None // This measurement isn't included in the batch because the value was erroneous
);
assert_eq!(
message_grouper.get_measurement_value(Some("speed"), "value"),
- Some(350.0) // This measurement is included in the batch even though the last message was erraneous
+ Some(350.0) // This measurement is included in the batch even though the last message was erroneous
);
Ok(())
}
#[tokio::test]
- async fn batching_with_erraneous_first_message() -> anyhow::Result<()> {
+ async fn batching_with_erroneous_first_message() -> anyhow::Result<()> {
let (sender, _receiver) = tokio::sync::mpsc::unbounded_channel::<MeasurementGrouper>();
let mqtt_client = build_mock_mqtt_client();
@@ -477,7 +473,7 @@ mod tests {
Ok(Box::new(message_stream))
});
- return mqtt_client;
+ mqtt_client
}
fn build_message_stream_from_messages(
@@ -490,7 +486,7 @@ mod tests {
message_stream
.expect_next()
.times(1)
- .in_sequence(&mut seq) // The third value to be returend by this mock stream
+ .in_sequence(&mut seq) // The third value to be returned by this mock stream
.returning(move || {
let topic = Topic::new(message.0).unwrap();
let message = Message::new(&topic, format!("123456789:{}", message.1));
@@ -503,6 +499,6 @@ mod tests {
.expect_next()
.returning(|| Box::pin(pending())); // Block the stream with a pending future
- return message_stream;
+ message_stream
}
}
diff --git a/mapper/collectd_mapper/src/collectd.rs b/mapper/tedge_mapper/src/collectd_mapper/collectd.rs
index 86587bf0..068e4172 100644
--- a/mapper/collectd_mapper/src/collectd.rs
+++ b/mapper/tedge_mapper/src/collectd_mapper/collectd.rs
@@ -211,7 +211,7 @@ mod tests {
}
#[test]
- fn invalid_collectd_payload_no_seperator() {
+ fn invalid_collectd_payload_no_separator() {
let payload = "123456789";
let result = CollectdPayload::parse_from(payload);
@@ -222,7 +222,7 @@ mod tests {
}
#[test]
- fn invalid_collectd_payload_more_seperators() {
+ fn invalid_collectd_payload_more_separators() {
let payload = "123456789:98.6:abc";
let result = CollectdPayload::parse_from(payload);
diff --git a/mapper/tedge_mapper/src/collectd_mapper/error.rs b/mapper/tedge_mapper/src/collectd_mapper/error.rs
new file mode 100644
index 00000000..4f1d66fe
--- /dev/null
+++ b/mapper/tedge_mapper/src/collectd_mapper/error.rs
@@ -0,0 +1,21 @@
+use tokio::sync::mpsc::error::SendError;
+
+#[derive(thiserror::Error, Debug)]
+pub enum DeviceMonitorError {
+ #[error(transparent)]
+ MqttClientError(#[from] mqtt_client::MqttClientError),
+
+ #[error(transparent)]
+ InvalidCollectdMeasurementError(#[from] crate::collectd_mapper::collectd::CollectdError),
+
+ #[error(transparent)]
+ InvalidThinEdgeJsonError(#[from] thin_edge_json::group::MeasurementGrouperError),
+
+ #[error(transparent)]
+ ThinEdgeJsonSerializationError(
+ #[from] thin_edge_json::serialize::ThinEdgeJsonSerializationError,
+ ),
+
+ #[error(transparent)]
+ BatchingError(#[from] SendError<thin_edge_json::group::MeasurementGrouper>),
+}
diff --git a/mapper/tedge_mapper/src/collectd_mapper/mapper.rs b/mapper/tedge_mapper/src/collectd_mapper/mapper.rs
new file mode 100644
index 00000000..9611a314
--- /dev/null
+++ b/mapper/tedge_mapper/src/collectd_mapper/mapper.rs
@@ -0,0 +1,34 @@
+use crate::{
+ collectd_mapper::monitor::{DeviceMonitor, DeviceMonitorConfig},
+ component::TEdgeComponent,
+};
+use async_trait::async_trait;
+use tedge_config::{ConfigSettingAccessor, MqttPortSetting, TEdgeConfig};
+use tracing::{debug_span, Instrument};
+
+const APP_NAME: &str = "tedge-mapper-collectd";
+
+pub struct CollectdMapper {}
+
+impl CollectdMapper {
+ pub fn new() -> Self {
+ Self {}
+ }
+}
+
+#[async_trait]
+impl TEdgeComponent for CollectdMapper {
+ async fn start(&self, tedge_config: TEdgeConfig) -> Result<(), anyhow::Error> {
+ let mqtt_port = tedge_config.query(MqttPortSetting)?.into();
+
+ let device_monitor_config = DeviceMonitorConfig::default().with_port(mqtt_port);
+
+ let device_monitor = DeviceMonitor::new(device_monitor_config);
+ device_monitor
+ .run()
+ .instrument(debug_span!(APP_NAME))
+ .await?;
+
+ Ok(())
+ }
+}
diff --git a/mapper/tedge_mapper/src/collectd_mapper/mod.rs b/mapper/tedge_mapper/src/collectd_mapper/mod.rs
new file mode 100644
index 00000000..2013b1ec
--- /dev/null
+++ b/mapper/tedge_mapper/src/collectd_mapper/mod.rs
@@ -0,0 +1,5 @@
+mod batcher;
+mod collectd;
+mod error;
+pub mod mapper;
+mod monitor;
diff --git a/mapper/collectd_mapper/src/monitor.rs b/mapper/tedge_mapper/src/collectd_mapper/monitor.rs
index af2c7cd7..c1d3a8de 100644
--- a/mapper/collectd_mapper/src/monitor.rs
+++ b/mapper/tedge_mapper/src/collectd_mapper/monitor.rs
@@ -4,7 +4,7 @@ use std::sync::Arc;
use thin_edge_json::group::MeasurementGrouper;
use tracing::{instrument, log::error};
-use crate::{
+use crate::collectd_mapper::{
batcher::{MessageBatchPublisher, MessageBatcher},
error::DeviceMonitorError,
};
diff --git a/mapper/tedge_mapper/src/error.rs b/mapper/tedge_mapper/src/error.rs
index 35ace840..71be2e4d 100644
--- a/mapper/tedge_mapper/src/error.rs
+++ b/mapper/tedge_mapper/src/error.rs
@@ -8,11 +8,6 @@ pub enum MapperError {
#[error(transparent)]
MqttClientError(#[from] MqttClientError),
- #[error(
- "tedge_mapper accepts only one argument. Run `tedge_mapper c8y` or `tedge_mapper az`."
- )]
- IncorrectArgument,
-
#[error("Home directory is not found.")]
HomeDirNotFound,
diff --git a/mapper/tedge_mapper/src/main.rs b/mapper/tedge_mapper/src/main.rs
index 6b6dee69..1432c14d 100644
--- a/mapper/tedge_mapper/src/main.rs
+++ b/mapper/tedge_mapper/src/main.rs
@@ -1,67 +1,59 @@
-use crate::az_mapper::AzureMapper;
-use crate::c8y_mapper::CumulocityMapper;
-use crate::component::TEdgeComponent;
-use crate::error::*;
+use crate::{
+ az_mapper::AzureMapper, c8y_mapper::CumulocityMapper, collectd_mapper::mapper::CollectdMapper,
+ component::TEdgeComponent, error::*,
+};
use std::path::PathBuf;
-use std::str::FromStr;
-use strum_macros::*;
+use structopt::*;
use tedge_config::*;
mod az_converter;
mod az_mapper;
mod c8y_converter;
mod c8y_mapper;
+mod collectd_mapper;
mod component;
mod converter;
mod error;
mod mapper;
mod size_threshold;
-const DEFAULT_LOG_LEVEL: &str = "warn";
const TIME_FORMAT: &str = "%Y-%m-%dT%H:%M:%S%.3f%:z";<