summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--.gitignore3
-rw-r--r--mapper/dm_agent/src/batcher.rs16
-rw-r--r--mapper/dm_agent/src/monitor.rs2
-rw-r--r--tests/PySys/monitoring/monitoring_smaller_interval/pysystest.xml29
-rw-r--r--tests/PySys/monitoring/monitoring_smaller_interval/run.py137
-rw-r--r--tests/PySys/monitoring/monitoring_with_collectd/pysystest.xml28
-rw-r--r--tests/PySys/monitoring/monitoring_with_collectd/run.py154
-rw-r--r--tests/PySys/monitoring/monitoring_with_simulated_messages/pysystest.xml28
-rw-r--r--tests/PySys/monitoring/monitoring_with_simulated_messages/run.py131
9 files changed, 522 insertions, 6 deletions
diff --git a/.gitignore b/.gitignore
index 05894aae..f53e82e0 100644
--- a/.gitignore
+++ b/.gitignore
@@ -5,6 +5,9 @@
# These are backup files generated by rustfmt
**/*.rs.bk
+#.lock file
+mapper/cumulocity/c8y_translator_lib/fuzz/Cargo.lock
+
.idea/
*.iml
.tmp/
diff --git a/mapper/dm_agent/src/batcher.rs b/mapper/dm_agent/src/batcher.rs
index cb5b4cbe..100c4222 100644
--- a/mapper/dm_agent/src/batcher.rs
+++ b/mapper/dm_agent/src/batcher.rs
@@ -1,6 +1,6 @@
use clock::{Clock, Timestamp};
use mqtt_client::{Message, MqttClient, MqttMessageStream, Topic, TopicFilter};
-use std::{sync::Arc, time::Duration};
+use std::sync::Arc;
use thin_edge_json::{
group::MeasurementGrouper, measurement::FlatMeasurementVisitor,
serialize::ThinEdgeJsonSerializer,
@@ -8,7 +8,8 @@ use thin_edge_json::{
use tokio::{
select,
sync::mpsc::{UnboundedReceiver, UnboundedSender},
- time::sleep,
+ time,
+ time::Duration,
};
use tracing::{error, log::warn};
@@ -122,8 +123,16 @@ impl MessageBatcher {
let mut message_batch =
MessageBatch::start_batch(collectd_message, first_message_timestamp)?;
+ // Creates a sleep timer future handler and does not await here
+ // for sleep to finish, but inside the select loop
+ let sleep = time::sleep(self.batching_window);
+ tokio::pin!(sleep);
+
loop {
select! {
+ _ = &mut sleep => {
+ break;
+ }
maybe_message = self.receive_message(messages) => {
match maybe_message {
Some((message, _timestamp)) => {
@@ -140,9 +149,6 @@ impl MessageBatcher {
}
}
- _result = sleep(self.batching_window) => {
- break;
- }
}
}
diff --git a/mapper/dm_agent/src/monitor.rs b/mapper/dm_agent/src/monitor.rs
index 6606ac11..97b065ff 100644
--- a/mapper/dm_agent/src/monitor.rs
+++ b/mapper/dm_agent/src/monitor.rs
@@ -12,7 +12,7 @@ use crate::{
const DEFAULT_HOST: &str = "localhost";
const DEFAULT_PORT: u16 = 1883;
const DEFAULT_MQTT_CLIENT_ID: &str = "tedge-dm-agent";
-const DEFAULT_BATCHING_WINDOW: u64 = 1000;
+const DEFAULT_BATCHING_WINDOW: u64 = 200;
const DEFAULT_MQTT_SOURCE_TOPIC: &str = "collectd/#";
const DEFAULT_MQTT_TARGET_TOPIC: &str = "tedge/measurements";
diff --git a/tests/PySys/monitoring/monitoring_smaller_interval/pysystest.xml b/tests/PySys/monitoring/monitoring_smaller_interval/pysystest.xml
new file mode 100644
index 00000000..191238c4
--- /dev/null
+++ b/tests/PySys/monitoring/monitoring_smaller_interval/pysystest.xml
@@ -0,0 +1,29 @@
+<?xml version="1.0" encoding="utf-8"?>
+<pysystest type="auto">
+ <description>
+ <title>
+ Validate the tedge dm agent messages that are produced
+ with smaller sampling interval than the batching window.
+ Here the simulated collectd messages are used.
+ </title>
+ <purpose>
+ <![CDATA[ ]]>
+ </purpose>
+ </description>
+ <classification>
+ <groups inherit="true">
+ <group>
+ </group>
+ </groups>
+ <modes inherit="true">
+ </modes>
+ </classification>
+ <data>
+ <class name="MonitoringSmallInterval" module="run" />
+ </data>
+ <traceability>
+ <requirements>
+ <requirement id="" />
+ </requirements>
+ </traceability>
+</pysystest>
diff --git a/tests/PySys/monitoring/monitoring_smaller_interval/run.py b/tests/PySys/monitoring/monitoring_smaller_interval/run.py
new file mode 100644
index 00000000..c194334c
--- /dev/null
+++ b/tests/PySys/monitoring/monitoring_smaller_interval/run.py
@@ -0,0 +1,137 @@
+from pysys.basetest import BaseTest
+
+import time
+import json
+
+"""
+Validate tedge-dm-agent messages that are published
+on tedge/measurements
+
+Given a configured system
+When we start the tedge-dm-agent with sudo in the background
+When we start tedge sub with sudo in the background
+When we start two publishers to publish the simulated collectd messages
+Publish the messages in 100ms interval
+Wait for couple of seconds to publish couple of batch of messages
+Then we kill tedge sub with sudo as it is running with a different user account
+Then we validate the messages in the output of tedge sub,
+
+"""
+
+
+class MonitoringSmallInterval(BaseTest):
+ def setup(self):
+ self.tedge = "/usr/bin/tedge"
+ self.sudo = "/usr/bin/sudo"
+
+ # stop collectd to avoid mixup of messages
+ collectd = self.startProcess(
+ command=self.sudo,
+ arguments=["systemctl", "stop", "collectd"],
+ stdouterr="collectd",
+ )
+
+ collectd_mapper = self.startProcess(
+ command=self.sudo,
+ arguments=["systemctl", "start", "tedge-dm-agent"],
+ stdouterr="collectd_mapper",
+ )
+ self.addCleanupFunction(self.monitoring_cleanup)
+
+ def execute(self):
+ sub = self.startProcess(
+ command=self.sudo,
+ arguments=[self.tedge, "mqtt", "sub", "--no-topic", "tedge/#"],
+ stdouterr="tedge_sub",
+ background=True,
+ )
+
+ # Wait for a small amount of time to give tedge sub time
+ # to initialize. This is a heuristic measure.
+ # Without an additional wait we observe failures in 1% of the test
+ # runs.
+ time.sleep(0.1)
+
+ for i in range(10):
+
+ pub = self.startProcess(
+ command=self.sudo,
+ arguments=[self.tedge, "mqtt", "pub",
+ "collectd/host/temperature/temp", "123435445:25.5"],
+ stdouterr="tedge_temp",
+ )
+
+ pub = self.startProcess(
+ command=self.sudo,
+ arguments=[self.tedge, "mqtt", "pub",
+ "collectd/host/pressure/pres", "12345678:500.5"],
+ stdouterr="tedge_pres",
+ )
+
+ # publish every 100ms
+ time.sleep(0.1)
+
+ # wait for tedge-dm-agent to batch messages
+ time.sleep(1)
+
+ # Kill the subscriber process explicitly with sudo as PySys does
+ # not have the rights to do it
+ kill = self.startProcess(
+ command=self.sudo,
+ arguments=["killall", "tedge"],
+ stdouterr="kill_out",
+ )
+
+ def validate(self):
+ self.assertThat('collectd_msg_validation_result == expected_result',
+ collectd_msg_validation_result=self.validate_json(), expected_result=True)
+
+ def validate_json(self):
+ f = open(self.output + '/tedge_sub.out', 'r')
+ lines = f.readlines()
+ for line in lines:
+ self.js_msg = json.loads(line)
+ if not self.validate_time():
+ reason = "time validation failed in message: " + str(line)
+ self.abort(False, reason)
+ if not self.validate_temperature():
+ reason = "temperature stat validation failed in message: " + \
+ str(line)
+ self.abort(False, reason)
+ if not self.validate_pressure():
+ reason = "pressure stat validation failed in message: " + \
+ str(line)
+ self.abort(False, reason)
+ return True
+
+ def validate_time(self):
+ if self.js_msg["time"]:
+ return True
+ else:
+ return False
+
+ def validate_temperature(self):
+ if self.js_msg["temperature"]:
+ if "temp" in self.js_msg["temperature"]:
+ return True
+ else:
+ return False
+ else:
+ return False
+
+ def validate_pressure(self):
+ if self.js_msg["pressure"]:
+ if "pres" in self.js_msg["pressure"]:
+ return True
+ else:
+ return False
+ else:
+ return False
+
+ def monitoring_cleanup(self):
+ self.log.info("monitoring_cleanup")
+ collectd = self.startProcess(
+ command=self.sudo,
+ arguments=["systemctl", "stop", "tedge-dm-agent"],
+ stdouterr="collectd_mapper",
+ )
diff --git a/tests/PySys/monitoring/monitoring_with_collectd/pysystest.xml b/tests/PySys/monitoring/monitoring_with_collectd/pysystest.xml
new file mode 100644
index 00000000..4599a967
--- /dev/null
+++ b/tests/PySys/monitoring/monitoring_with_collectd/pysystest.xml
@@ -0,0 +1,28 @@
+<?xml version="1.0" encoding="utf-8"?>
+<pysystest type="auto">
+ <description>
+ <title>
+ Validate tedge dm agent messages that are produced
+ by batching the collectd messages
+ </title>
+ <purpose>
+ <![CDATA[ ]]>
+ </purpose>
+ </description>
+ <classification>
+ <groups inherit="true">
+ <group>
+ </group>
+ </groups>
+ <modes inherit="true">
+ </modes>
+ </classification>
+ <data>
+ <class name="MonitoringWithCollectd" module="run" />
+ </data>
+ <traceability>
+ <requirements>
+ <requirement id="" />
+ </requirements>
+ </traceability>
+</pysystest>
diff --git a/tests/PySys/monitoring/monitoring_with_collectd/run.py b/tests/PySys/monitoring/monitoring_with_collectd/run.py
new file mode 100644
index 00000000..6a2ae149
--- /dev/null
+++ b/tests/PySys/monitoring/monitoring_with_collectd/run.py
@@ -0,0 +1,154 @@
+from pysys.basetest import BaseTest
+
+import time
+import re
+import json
+
+"""
+Validate tedge-dm-agent messages that are published
+on tedge/measurements
+
+Given a configured system
+When we start the collectd with sudo in the background
+When we start the tedge-dm-agent with sudo in the background
+When we start tedge sub with sudo in the background
+Wait for couple of seconds to publish couple of batch of messages
+Then we kill tedge sub with sudo as it is running with a different user account
+Then we validate the messages in the output of tedge sub,
+"""
+
+
+class MonitoringWithCollectd(BaseTest):
+
+ def setup(self):
+ self.js_msg = ""
+ self.cpu_cnt = 0
+ self.memory_cnt = 0
+ self.time_cnt = 0
+ self.disk_cnt = 0
+
+ self.tedge = "/usr/bin/tedge"
+ self.sudo = "/usr/bin/sudo"
+
+ collectd = self.startProcess(
+ command=self.sudo,
+ arguments=["systemctl", "start", "collectd"],
+ stdouterr="collectd",
+ )
+
+ collectd_mapper = self.startProcess(
+ command=self.sudo,
+ arguments=["systemctl", "start", "tedge-dm-agent"],
+ stdouterr="collectd_mapper",
+ )
+ self.addCleanupFunction(self.monitoring_cleanup)
+
+ def execute(self):
+
+ time.sleep(0.1)
+ sub = self.startProcess(
+ command=self.sudo,
+ arguments=[self.tedge, "mqtt", "sub", "--no-topic", "tedge/#"],
+ stdouterr="tedge_sub",
+ background=True,
+ )
+
+ sub = self.startProcess(
+ command=self.sudo,
+ arguments=[self.tedge, "mqtt", "sub", "collectd/#"],
+ #arguments=[self.tedge, "mqtt", "sub", "--no-topic", "collectd/#"],
+ stdouterr="collectd_sub",
+ background=True,
+ )
+
+
+ # Wait for a small amount of time to give tedge sub time
+ # to initialize and capture couple of batches of messages
+ # that are published by tedge-dm-agent.
+ time.sleep(12)
+
+ # Kill the subscriber process explicitly with sudo as PySys does
+ # not have the rights to do it
+ kill = self.startProcess(
+ command=self.sudo,
+ arguments=["killall", "tedge"],
+ stdouterr="kill_out",
+ )
+
+ def validate(self):
+ self.assertGrep("tedge_sub.out", r'time|cpu|memory|df-root')
+ self.assertThat('collectd_msg_validation_result == expected_result',
+ collectd_msg_validation_result=self.validate_json(), expected_result=True)
+
+ def validate_json(self):
+ f = open(self.output + '/tedge_sub.out', 'r')
+ lines = f.readlines()
+ for line in lines:
+ self.js_msg = json.loads(line)
+ if not self.validate_cpu():
+ reason = "cpu stat validation failed in message: " + str(line)
+ self.abort(False, reason)
+ if not self.validate_time():
+ reason = "time validation failed in message: " + str(line)
+ self.abort(False, reason)
+ if not self.validate_memory():
+ reason = "memory stat validation failed in message: " + \
+ str(line)
+ self.abort(False, reason)
+ # validate disk stats if the entries are present, as the disk stats collection window is bigger
+ if "df-root" in self.js_msg:
+ self.validate_disk()
+ if self.time_cnt == self.cpu_cnt == self.memory_cnt and self.disk_cnt > 0 and self.disk_cnt <= 3:
+ return True
+ else:
+ reason = "disk stat validation failed in message: " + str(line)
+ self.abort(False, reason)
+
+ def validate_cpu(self):
+ if self.js_msg["cpu"]:
+ if "percent-active" in self.js_msg["cpu"]:
+ self.cpu_cnt += 1
+ return True
+ else:
+ return False
+ else:
+ return False
+
+ def validate_time(self):
+ if self.js_msg["time"]:
+ self.time_cnt += 1
+ return True
+ else:
+ return False
+
+ def validate_memory(self):
+ if self.js_msg["memory"]:
+ if "percent-used" in self.js_msg["memory"]:
+ self.memory_cnt += 1
+ return True
+ else:
+ return False
+ else:
+ return False
+
+ def validate_disk(self):
+ if "percent_bytes-used" in self.js_msg["df-root"]:
+ self.disk_cnt += 1
+ return True
+ else:
+ return False
+
+ def monitoring_cleanup(self):
+ self.log.info("monitoring_cleanup")
+
+ collectd = self.startProcess(
+ command=self.sudo,
+ arguments=["systemctl", "stop", "tedge-dm-agent"],
+ stdouterr="collectd_mapper",
+ )
+
+ collectd = self.startProcess(
+ command=self.sudo,
+ arguments=["systemctl", "stop", "collectd"],
+ stdouterr="collectd",
+ )
diff --git a/tests/PySys/monitoring/monitoring_with_simulated_messages/pysystest.xml b/tests/PySys/monitoring/monitoring_with_simulated_messages/pysystest.xml
new file mode 100644
index 00000000..fc946107
--- /dev/null
+++ b/tests/PySys/monitoring/monitoring_with_simulated_messages/pysystest.xml
@@ -0,0 +1,28 @@
+<?xml version="1.0" encoding="utf-8"?>
+<pysystest type="auto">
+ <description>
+ <title>
+ Validate the tedge dm agent messages that are produced
+ from the simulated collectd messages.
+ </title>
+ <purpose>
+ <![CDATA[ ]]>
+ </purpose>
+ </description>
+ <classification>
+ <groups inherit="true">
+ <group>
+ </group>
+ </groups>
+ <modes inherit="true">
+ </modes>
+ </classification>
+ <data>
+ <class name="MonitoringWithSimulatedMessages" module="run" />
+ </data>
+ <traceability>
+ <requirements>
+ <requirement id="" />
+ </requirements>
+ </traceability>
+</pysystest>
diff --git a/tests/PySys/monitoring/monitoring_with_simulated_messages/run.py b/tests/PySys/monitoring/monitoring_with_simulated_messages/run.py
new file mode 100644
index 00000000..25d56fe7
--- /dev/null
+++ b/tests/PySys/monitoring/monitoring_with_simulated_messages/run.py
@@ -0,0 +1,131 @@
+from pysys.basetest import BaseTest
+
+import time
+import json
+
+"""
+Validate tedge-dm-agent messages that are published
+on tedge/measurements
+
+Given a configured system
+When we start the tedge-dm-agent with sudo in the background
+When we start tedge sub with sudo in the background
+When we start two publishers to publish the simulated collectd messages
+Wait for couple of seconds to publish couple of batch of messages
+Then we kill tedge sub with sudo as it is running with a different user account
+Then we validate the messages in the output of tedge sub,
+
+"""
+
+
+class MonitoringWithSimulatedMessages(BaseTest):
+ def setup(self):
+ self.tedge = "/usr/bin/tedge"
+ self.sudo = "/usr/bin/sudo"
+
+ # stop collectd to avoid mixup of messages
+ collectd = self.startProcess(
+ command=self.sudo,
+ arguments=["systemctl", "stop", "collectd"],
+ stdouterr="collectd",
+ )
+
+ collectd_mapper = self.startProcess(
+ command=self.sudo,
+ arguments=["systemctl", "start", "tedge-dm-agent"],
+ stdouterr="collectd_mapper",
+ )
+ self.addCleanupFunction(self.monitoring_cleanup)
+
+ def execute(self):
+ sub = self.startProcess(
+ command=self.sudo,
+ arguments=[self.tedge, "mqtt", "sub", "--no-topic", "tedge/#"],
+ stdouterr="tedge_sub",
+ background=True,
+ )
+
+ # Wait for a small amount of time to give tedge sub time
+ # to initialize. This is a heuristic measure.
+ # Without an additional wait we observe failures in 1% of the test
+ # runs.
+ time.sleep(0.1)
+
+ pub = self.startProcess(
+ command=self.sudo,
+ arguments=[self.tedge, "mqtt", "pub",
+ "collectd/host/temperature/temp", "123435445:25.5"],
+ stdouterr="tedge_temp",
+ )
+
+ pub = self.startProcess(
+ command=self.sudo,
+ arguments=[self.tedge, "mqtt", "pub",
+ "collectd/host/pressure/pres", "12345678:500.5"],
+ stdouterr="tedge_pres",
+ )
+
+ # wait for tedge-dm-agent to batch messages
+ time.sleep(1)
+
+ # Kill the subscriber process explicitly with sudo as PySys does
+ # not have the rights to do it
+ kill = self.startProcess(
+ command=self.sudo,
+ arguments=["killall", "tedge"],
+ stdouterr="kill_out",
+ )
+
+ def validate(self):
+ self.assertThat('collectd_msg_validation_result == expected_result',
+ collectd_msg_validation_result=self.validate_json(), expected_result=True)
+
+ def validate_json(self):
+ f = open(self.output + '/tedge_sub.out', 'r')
+ lines = f.readlines()
+ for line in lines:
+ self.js_msg = json.loads(line)
+ if not self.validate_time():
+ reason = "time validation failed in message: " + str(line)
+ self.abort(False, reason)
+ if not self.validate_temperature():
+ reason = "temperature stat validation failed in message: " + \
+ str(line)
+ self.abort(False, reason)
+ if not self.validate_pressure():
+ reason = "pressure stat validation failed in message: " + \
+ str(line)
+ self.abort(False, reason)
+ return True
+
+ def validate_time(self):
+ if self.js_msg["time"]:
+ return True
+ else:
+ return False
+
+ def validate_temperature(self):
+ if self.js_msg["temperature"]:
+ if "temp" in self.js_msg["temperature"]:
+ return True
+ else:
+ return False
+ else:
+ return False
+
+ def validate_pressure(self):
+ if self.js_msg["pressure"]:
+ if "pres" in self.js_msg["pressure"]:
+ return True
+ else:
+ return False
+ else:
+ return False
+
+ def monitoring_cleanup(self):
+ self.log.info("monitoring_cleanup")
+ collectd = self.startProcess(
+ command=self.sudo,
+ arguments=["systemctl", "stop", "tedge-dm-agent"],
+ stdouterr="collectd_mapper",
+ )