diff options
-rw-r--r-- | .gitignore | 3 | ||||
-rw-r--r-- | mapper/dm_agent/src/batcher.rs | 16 | ||||
-rw-r--r-- | mapper/dm_agent/src/monitor.rs | 2 | ||||
-rw-r--r-- | tests/PySys/monitoring/monitoring_smaller_interval/pysystest.xml | 29 | ||||
-rw-r--r-- | tests/PySys/monitoring/monitoring_smaller_interval/run.py | 137 | ||||
-rw-r--r-- | tests/PySys/monitoring/monitoring_with_collectd/pysystest.xml | 28 | ||||
-rw-r--r-- | tests/PySys/monitoring/monitoring_with_collectd/run.py | 154 | ||||
-rw-r--r-- | tests/PySys/monitoring/monitoring_with_simulated_messages/pysystest.xml | 28 | ||||
-rw-r--r-- | tests/PySys/monitoring/monitoring_with_simulated_messages/run.py | 131 |
9 files changed, 522 insertions, 6 deletions
@@ -5,6 +5,9 @@ # These are backup files generated by rustfmt **/*.rs.bk +#.lock file +mapper/cumulocity/c8y_translator_lib/fuzz/Cargo.lock + .idea/ *.iml .tmp/ diff --git a/mapper/dm_agent/src/batcher.rs b/mapper/dm_agent/src/batcher.rs index cb5b4cbe..100c4222 100644 --- a/mapper/dm_agent/src/batcher.rs +++ b/mapper/dm_agent/src/batcher.rs @@ -1,6 +1,6 @@ use clock::{Clock, Timestamp}; use mqtt_client::{Message, MqttClient, MqttMessageStream, Topic, TopicFilter}; -use std::{sync::Arc, time::Duration}; +use std::sync::Arc; use thin_edge_json::{ group::MeasurementGrouper, measurement::FlatMeasurementVisitor, serialize::ThinEdgeJsonSerializer, @@ -8,7 +8,8 @@ use thin_edge_json::{ use tokio::{ select, sync::mpsc::{UnboundedReceiver, UnboundedSender}, - time::sleep, + time, + time::Duration, }; use tracing::{error, log::warn}; @@ -122,8 +123,16 @@ impl MessageBatcher { let mut message_batch = MessageBatch::start_batch(collectd_message, first_message_timestamp)?; + // Creates a sleep timer future handler and does not await here + // for sleep to finish, but inside the select loop + let sleep = time::sleep(self.batching_window); + tokio::pin!(sleep); + loop { select! { + _ = &mut sleep => { + break; + } maybe_message = self.receive_message(messages) => { match maybe_message { Some((message, _timestamp)) => { @@ -140,9 +149,6 @@ impl MessageBatcher { } } - _result = sleep(self.batching_window) => { - break; - } } } diff --git a/mapper/dm_agent/src/monitor.rs b/mapper/dm_agent/src/monitor.rs index 6606ac11..97b065ff 100644 --- a/mapper/dm_agent/src/monitor.rs +++ b/mapper/dm_agent/src/monitor.rs @@ -12,7 +12,7 @@ use crate::{ const DEFAULT_HOST: &str = "localhost"; const DEFAULT_PORT: u16 = 1883; const DEFAULT_MQTT_CLIENT_ID: &str = "tedge-dm-agent"; -const DEFAULT_BATCHING_WINDOW: u64 = 1000; +const DEFAULT_BATCHING_WINDOW: u64 = 200; const DEFAULT_MQTT_SOURCE_TOPIC: &str = "collectd/#"; const DEFAULT_MQTT_TARGET_TOPIC: &str = "tedge/measurements"; diff --git a/tests/PySys/monitoring/monitoring_smaller_interval/pysystest.xml b/tests/PySys/monitoring/monitoring_smaller_interval/pysystest.xml new file mode 100644 index 00000000..191238c4 --- /dev/null +++ b/tests/PySys/monitoring/monitoring_smaller_interval/pysystest.xml @@ -0,0 +1,29 @@ +<?xml version="1.0" encoding="utf-8"?> +<pysystest type="auto"> + <description> + <title> + Validate the tedge dm agent messages that are produced + with smaller sampling interval than the batching window. + Here the simulated collectd messages are used. + </title> + <purpose> + <![CDATA[ ]]> + </purpose> + </description> + <classification> + <groups inherit="true"> + <group> + </group> + </groups> + <modes inherit="true"> + </modes> + </classification> + <data> + <class name="MonitoringSmallInterval" module="run" /> + </data> + <traceability> + <requirements> + <requirement id="" /> + </requirements> + </traceability> +</pysystest> diff --git a/tests/PySys/monitoring/monitoring_smaller_interval/run.py b/tests/PySys/monitoring/monitoring_smaller_interval/run.py new file mode 100644 index 00000000..c194334c --- /dev/null +++ b/tests/PySys/monitoring/monitoring_smaller_interval/run.py @@ -0,0 +1,137 @@ +from pysys.basetest import BaseTest + +import time +import json + +""" +Validate tedge-dm-agent messages that are published +on tedge/measurements + +Given a configured system +When we start the tedge-dm-agent with sudo in the background +When we start tedge sub with sudo in the background +When we start two publishers to publish the simulated collectd messages +Publish the messages in 100ms interval +Wait for couple of seconds to publish couple of batch of messages +Then we kill tedge sub with sudo as it is running with a different user account +Then we validate the messages in the output of tedge sub, + +""" + + +class MonitoringSmallInterval(BaseTest): + def setup(self): + self.tedge = "/usr/bin/tedge" + self.sudo = "/usr/bin/sudo" + + # stop collectd to avoid mixup of messages + collectd = self.startProcess( + command=self.sudo, + arguments=["systemctl", "stop", "collectd"], + stdouterr="collectd", + ) + + collectd_mapper = self.startProcess( + command=self.sudo, + arguments=["systemctl", "start", "tedge-dm-agent"], + stdouterr="collectd_mapper", + ) + self.addCleanupFunction(self.monitoring_cleanup) + + def execute(self): + sub = self.startProcess( + command=self.sudo, + arguments=[self.tedge, "mqtt", "sub", "--no-topic", "tedge/#"], + stdouterr="tedge_sub", + background=True, + ) + + # Wait for a small amount of time to give tedge sub time + # to initialize. This is a heuristic measure. + # Without an additional wait we observe failures in 1% of the test + # runs. + time.sleep(0.1) + + for i in range(10): + + pub = self.startProcess( + command=self.sudo, + arguments=[self.tedge, "mqtt", "pub", + "collectd/host/temperature/temp", "123435445:25.5"], + stdouterr="tedge_temp", + ) + + pub = self.startProcess( + command=self.sudo, + arguments=[self.tedge, "mqtt", "pub", + "collectd/host/pressure/pres", "12345678:500.5"], + stdouterr="tedge_pres", + ) + + # publish every 100ms + time.sleep(0.1) + + # wait for tedge-dm-agent to batch messages + time.sleep(1) + + # Kill the subscriber process explicitly with sudo as PySys does + # not have the rights to do it + kill = self.startProcess( + command=self.sudo, + arguments=["killall", "tedge"], + stdouterr="kill_out", + ) + + def validate(self): + self.assertThat('collectd_msg_validation_result == expected_result', + collectd_msg_validation_result=self.validate_json(), expected_result=True) + + def validate_json(self): + f = open(self.output + '/tedge_sub.out', 'r') + lines = f.readlines() + for line in lines: + self.js_msg = json.loads(line) + if not self.validate_time(): + reason = "time validation failed in message: " + str(line) + self.abort(False, reason) + if not self.validate_temperature(): + reason = "temperature stat validation failed in message: " + \ + str(line) + self.abort(False, reason) + if not self.validate_pressure(): + reason = "pressure stat validation failed in message: " + \ + str(line) + self.abort(False, reason) + return True + + def validate_time(self): + if self.js_msg["time"]: + return True + else: + return False + + def validate_temperature(self): + if self.js_msg["temperature"]: + if "temp" in self.js_msg["temperature"]: + return True + else: + return False + else: + return False + + def validate_pressure(self): + if self.js_msg["pressure"]: + if "pres" in self.js_msg["pressure"]: + return True + else: + return False + else: + return False + + def monitoring_cleanup(self): + self.log.info("monitoring_cleanup") + collectd = self.startProcess( + command=self.sudo, + arguments=["systemctl", "stop", "tedge-dm-agent"], + stdouterr="collectd_mapper", + ) diff --git a/tests/PySys/monitoring/monitoring_with_collectd/pysystest.xml b/tests/PySys/monitoring/monitoring_with_collectd/pysystest.xml new file mode 100644 index 00000000..4599a967 --- /dev/null +++ b/tests/PySys/monitoring/monitoring_with_collectd/pysystest.xml @@ -0,0 +1,28 @@ +<?xml version="1.0" encoding="utf-8"?> +<pysystest type="auto"> + <description> + <title> + Validate tedge dm agent messages that are produced + by batching the collectd messages + </title> + <purpose> + <![CDATA[ ]]> + </purpose> + </description> + <classification> + <groups inherit="true"> + <group> + </group> + </groups> + <modes inherit="true"> + </modes> + </classification> + <data> + <class name="MonitoringWithCollectd" module="run" /> + </data> + <traceability> + <requirements> + <requirement id="" /> + </requirements> + </traceability> +</pysystest> diff --git a/tests/PySys/monitoring/monitoring_with_collectd/run.py b/tests/PySys/monitoring/monitoring_with_collectd/run.py new file mode 100644 index 00000000..6a2ae149 --- /dev/null +++ b/tests/PySys/monitoring/monitoring_with_collectd/run.py @@ -0,0 +1,154 @@ +from pysys.basetest import BaseTest + +import time +import re +import json + +""" +Validate tedge-dm-agent messages that are published +on tedge/measurements + +Given a configured system +When we start the collectd with sudo in the background +When we start the tedge-dm-agent with sudo in the background +When we start tedge sub with sudo in the background +Wait for couple of seconds to publish couple of batch of messages +Then we kill tedge sub with sudo as it is running with a different user account +Then we validate the messages in the output of tedge sub, +""" + + +class MonitoringWithCollectd(BaseTest): + + def setup(self): + self.js_msg = "" + self.cpu_cnt = 0 + self.memory_cnt = 0 + self.time_cnt = 0 + self.disk_cnt = 0 + + self.tedge = "/usr/bin/tedge" + self.sudo = "/usr/bin/sudo" + + collectd = self.startProcess( + command=self.sudo, + arguments=["systemctl", "start", "collectd"], + stdouterr="collectd", + ) + + collectd_mapper = self.startProcess( + command=self.sudo, + arguments=["systemctl", "start", "tedge-dm-agent"], + stdouterr="collectd_mapper", + ) + self.addCleanupFunction(self.monitoring_cleanup) + + def execute(self): + + time.sleep(0.1) + sub = self.startProcess( + command=self.sudo, + arguments=[self.tedge, "mqtt", "sub", "--no-topic", "tedge/#"], + stdouterr="tedge_sub", + background=True, + ) + + sub = self.startProcess( + command=self.sudo, + arguments=[self.tedge, "mqtt", "sub", "collectd/#"], + #arguments=[self.tedge, "mqtt", "sub", "--no-topic", "collectd/#"], + stdouterr="collectd_sub", + background=True, + ) + + + # Wait for a small amount of time to give tedge sub time + # to initialize and capture couple of batches of messages + # that are published by tedge-dm-agent. + time.sleep(12) + + # Kill the subscriber process explicitly with sudo as PySys does + # not have the rights to do it + kill = self.startProcess( + command=self.sudo, + arguments=["killall", "tedge"], + stdouterr="kill_out", + ) + + def validate(self): + self.assertGrep("tedge_sub.out", r'time|cpu|memory|df-root') + self.assertThat('collectd_msg_validation_result == expected_result', + collectd_msg_validation_result=self.validate_json(), expected_result=True) + + def validate_json(self): + f = open(self.output + '/tedge_sub.out', 'r') + lines = f.readlines() + for line in lines: + self.js_msg = json.loads(line) + if not self.validate_cpu(): + reason = "cpu stat validation failed in message: " + str(line) + self.abort(False, reason) + if not self.validate_time(): + reason = "time validation failed in message: " + str(line) + self.abort(False, reason) + if not self.validate_memory(): + reason = "memory stat validation failed in message: " + \ + str(line) + self.abort(False, reason) + # validate disk stats if the entries are present, as the disk stats collection window is bigger + if "df-root" in self.js_msg: + self.validate_disk() + if self.time_cnt == self.cpu_cnt == self.memory_cnt and self.disk_cnt > 0 and self.disk_cnt <= 3: + return True + else: + reason = "disk stat validation failed in message: " + str(line) + self.abort(False, reason) + + def validate_cpu(self): + if self.js_msg["cpu"]: + if "percent-active" in self.js_msg["cpu"]: + self.cpu_cnt += 1 + return True + else: + return False + else: + return False + + def validate_time(self): + if self.js_msg["time"]: + self.time_cnt += 1 + return True + else: + return False + + def validate_memory(self): + if self.js_msg["memory"]: + if "percent-used" in self.js_msg["memory"]: + self.memory_cnt += 1 + return True + else: + return False + else: + return False + + def validate_disk(self): + if "percent_bytes-used" in self.js_msg["df-root"]: + self.disk_cnt += 1 + return True + else: + return False + + def monitoring_cleanup(self): + self.log.info("monitoring_cleanup") + + collectd = self.startProcess( + command=self.sudo, + arguments=["systemctl", "stop", "tedge-dm-agent"], + stdouterr="collectd_mapper", + ) + + collectd = self.startProcess( + command=self.sudo, + arguments=["systemctl", "stop", "collectd"], + stdouterr="collectd", + ) diff --git a/tests/PySys/monitoring/monitoring_with_simulated_messages/pysystest.xml b/tests/PySys/monitoring/monitoring_with_simulated_messages/pysystest.xml new file mode 100644 index 00000000..fc946107 --- /dev/null +++ b/tests/PySys/monitoring/monitoring_with_simulated_messages/pysystest.xml @@ -0,0 +1,28 @@ +<?xml version="1.0" encoding="utf-8"?> +<pysystest type="auto"> + <description> + <title> + Validate the tedge dm agent messages that are produced + from the simulated collectd messages. + </title> + <purpose> + <![CDATA[ ]]> + </purpose> + </description> + <classification> + <groups inherit="true"> + <group> + </group> + </groups> + <modes inherit="true"> + </modes> + </classification> + <data> + <class name="MonitoringWithSimulatedMessages" module="run" /> + </data> + <traceability> + <requirements> + <requirement id="" /> + </requirements> + </traceability> +</pysystest> diff --git a/tests/PySys/monitoring/monitoring_with_simulated_messages/run.py b/tests/PySys/monitoring/monitoring_with_simulated_messages/run.py new file mode 100644 index 00000000..25d56fe7 --- /dev/null +++ b/tests/PySys/monitoring/monitoring_with_simulated_messages/run.py @@ -0,0 +1,131 @@ +from pysys.basetest import BaseTest + +import time +import json + +""" +Validate tedge-dm-agent messages that are published +on tedge/measurements + +Given a configured system +When we start the tedge-dm-agent with sudo in the background +When we start tedge sub with sudo in the background +When we start two publishers to publish the simulated collectd messages +Wait for couple of seconds to publish couple of batch of messages +Then we kill tedge sub with sudo as it is running with a different user account +Then we validate the messages in the output of tedge sub, + +""" + + +class MonitoringWithSimulatedMessages(BaseTest): + def setup(self): + self.tedge = "/usr/bin/tedge" + self.sudo = "/usr/bin/sudo" + + # stop collectd to avoid mixup of messages + collectd = self.startProcess( + command=self.sudo, + arguments=["systemctl", "stop", "collectd"], + stdouterr="collectd", + ) + + collectd_mapper = self.startProcess( + command=self.sudo, + arguments=["systemctl", "start", "tedge-dm-agent"], + stdouterr="collectd_mapper", + ) + self.addCleanupFunction(self.monitoring_cleanup) + + def execute(self): + sub = self.startProcess( + command=self.sudo, + arguments=[self.tedge, "mqtt", "sub", "--no-topic", "tedge/#"], + stdouterr="tedge_sub", + background=True, + ) + + # Wait for a small amount of time to give tedge sub time + # to initialize. This is a heuristic measure. + # Without an additional wait we observe failures in 1% of the test + # runs. + time.sleep(0.1) + + pub = self.startProcess( + command=self.sudo, + arguments=[self.tedge, "mqtt", "pub", + "collectd/host/temperature/temp", "123435445:25.5"], + stdouterr="tedge_temp", + ) + + pub = self.startProcess( + command=self.sudo, + arguments=[self.tedge, "mqtt", "pub", + "collectd/host/pressure/pres", "12345678:500.5"], + stdouterr="tedge_pres", + ) + + # wait for tedge-dm-agent to batch messages + time.sleep(1) + + # Kill the subscriber process explicitly with sudo as PySys does + # not have the rights to do it + kill = self.startProcess( + command=self.sudo, + arguments=["killall", "tedge"], + stdouterr="kill_out", + ) + + def validate(self): + self.assertThat('collectd_msg_validation_result == expected_result', + collectd_msg_validation_result=self.validate_json(), expected_result=True) + + def validate_json(self): + f = open(self.output + '/tedge_sub.out', 'r') + lines = f.readlines() + for line in lines: + self.js_msg = json.loads(line) + if not self.validate_time(): + reason = "time validation failed in message: " + str(line) + self.abort(False, reason) + if not self.validate_temperature(): + reason = "temperature stat validation failed in message: " + \ + str(line) + self.abort(False, reason) + if not self.validate_pressure(): + reason = "pressure stat validation failed in message: " + \ + str(line) + self.abort(False, reason) + return True + + def validate_time(self): + if self.js_msg["time"]: + return True + else: + return False + + def validate_temperature(self): + if self.js_msg["temperature"]: + if "temp" in self.js_msg["temperature"]: + return True + else: + return False + else: + return False + + def validate_pressure(self): + if self.js_msg["pressure"]: + if "pres" in self.js_msg["pressure"]: + return True + else: + return False + else: + return False + + def monitoring_cleanup(self): + self.log.info("monitoring_cleanup") + collectd = self.startProcess( + command=self.sudo, + arguments=["systemctl", "stop", "tedge-dm-agent"], + stdouterr="collectd_mapper", + ) |