summaryrefslogtreecommitdiffstats
path: root/crates/core/tedge_mapper
diff options
context:
space:
mode:
authorPradeepKiruvale <PRADEEPKIRUVALE@gmail.com>2021-12-21 14:44:20 +0530
committerGitHub <noreply@github.com>2021-12-21 14:44:20 +0530
commit888a4e70ca44d9b41d645148b7a24d89a984da7c (patch)
treeb18316cc05f02939df9c9935d828e3161a16b61b /crates/core/tedge_mapper
parent1b5102985f67f9795fbb841d754155de865336ef (diff)
[601] collectd multi value measurement (#724)
* multi value collectd message * add a multi value parse test * update test * fix typos * refactor * address review comments * refactor parse_from * cargo fmt * revert the changes * add invalid test * fix typo Co-authored-by: Pradeep Kumar K J <pradeepkumar.kj@sofwareag.com>
Diffstat (limited to 'crates/core/tedge_mapper')
-rw-r--r--crates/core/tedge_mapper/src/collectd_mapper/collectd.rs143
-rw-r--r--crates/core/tedge_mapper/src/collectd_mapper/monitor.rs9
2 files changed, 108 insertions, 44 deletions
diff --git a/crates/core/tedge_mapper/src/collectd_mapper/collectd.rs b/crates/core/tedge_mapper/src/collectd_mapper/collectd.rs
index 9d387314..80a18c62 100644
--- a/crates/core/tedge_mapper/src/collectd_mapper/collectd.rs
+++ b/crates/core/tedge_mapper/src/collectd_mapper/collectd.rs
@@ -53,7 +53,7 @@ impl CollectdMessage {
}
}
- pub fn parse_from(mqtt_message: &Message) -> Result<Self, CollectdError> {
+ pub fn parse_from(mqtt_message: &Message) -> Result<Vec<Self>, CollectdError> {
let topic = mqtt_message.topic.name.as_str();
let collectd_topic = match CollectdTopic::from_str(topic) {
Ok(collectd_topic) => collectd_topic,
@@ -69,12 +69,23 @@ impl CollectdMessage {
let collectd_payload = CollectdPayload::parse_from(payload)
.map_err(|err| CollectdError::InvalidMeasurementPayload(topic.into(), err))?;
- Ok(CollectdMessage {
- metric_group_key: collectd_topic.metric_group_key.to_string(),
- metric_key: collectd_topic.metric_key.to_string(),
- timestamp: collectd_payload.timestamp(),
- metric_value: collectd_payload.metric_value,
- })
+ let num_measurements = collectd_payload.metric_values.len();
+ let mut collectd_mssages: Vec<CollectdMessage> = Vec::with_capacity(num_measurements);
+
+ for (i, value) in collectd_payload.metric_values.iter().enumerate() {
+ let mut metric_key = collectd_topic.metric_key.to_string();
+ // If there are multiple values, then create unique keys metric_key_val1, metric_key_val2 etc.
+ if num_measurements > 1 {
+ metric_key = format!("{}_val{}", metric_key, i + 1);
+ }
+ collectd_mssages.push(CollectdMessage {
+ metric_group_key: collectd_topic.metric_group_key.to_string(),
+ metric_key,
+ timestamp: collectd_payload.timestamp(),
+ metric_value: *value,
+ });
+ }
+ Ok(collectd_mssages)
}
}
@@ -108,7 +119,7 @@ impl<'a> CollectdTopic<'a> {
#[derive(Debug)]
struct CollectdPayload {
timestamp: f64,
- metric_value: f64,
+ metric_values: Vec<f64>,
}
#[derive(thiserror::Error, Debug)]
@@ -125,33 +136,35 @@ pub enum CollectdPayloadError {
impl CollectdPayload {
fn parse_from(payload: &str) -> Result<Self, CollectdPayloadError> {
- let mut iter = payload.split(':');
+ let msg: Vec<&str> = payload.split(':').collect();
+ let vec_len = msg.len();
- let timestamp = iter.next().ok_or_else(|| {
- CollectdPayloadError::InvalidMeasurementPayloadFormat(payload.to_string())
- })?;
+ if vec_len <= 1 {
+ return Err(CollectdPayloadError::InvalidMeasurementPayloadFormat(
+ payload.to_string(),
+ ));
+ }
- let timestamp = timestamp.parse::<f64>().map_err(|_err| {
- CollectdPayloadError::InvalidMeasurementTimestamp(timestamp.to_string())
+ // First element is always the timestamp
+ let timestamp = msg[0].parse::<f64>().map_err(|_err| {
+ CollectdPayloadError::InvalidMeasurementTimestamp(msg[0].to_string())
})?;
- let metric_value = iter.next().ok_or_else(|| {
- CollectdPayloadError::InvalidMeasurementPayloadFormat(payload.to_string())
- })?;
+ let mut metric_values: Vec<f64> = Vec::with_capacity(vec_len - 1);
- let metric_value = metric_value.parse::<f64>().map_err(|_err| {
- CollectdPayloadError::InvalidMeasurementValue(metric_value.to_string())
- })?;
+ // Process the values
+ for i in 1..vec_len {
+ let value = msg[i].parse::<f64>().map_err(|_err| {
+ CollectdPayloadError::InvalidMeasurementValue(msg[i].to_string())
+ })?;
- match iter.next() {
- None => Ok(CollectdPayload {
- timestamp,
- metric_value,
- }),
- Some(_) => Err(CollectdPayloadError::InvalidMeasurementPayloadFormat(
- payload.to_string(),
- )),
+ metric_values.push(value);
}
+
+ Ok(CollectdPayload {
+ timestamp,
+ metric_values,
+ })
}
pub fn timestamp(&self) -> DateTime<Utc> {
@@ -175,6 +188,8 @@ impl Batchable for CollectdMessage {
#[cfg(test)]
mod tests {
+ use std::ops::Index;
+
use assert_matches::assert_matches;
use chrono::TimeZone;
use mqtt_client::Topic;
@@ -193,15 +208,52 @@ mod tests {
metric_key,
timestamp,
metric_value,
- } = collectd_message;
-
+ } = collectd_message.index(0);
assert_eq!(metric_group_key, "temperature");
+
assert_eq!(metric_key, "value");
assert_eq!(
+ *timestamp,
+ Utc.ymd(1973, 11, 29).and_hms_milli(21, 33, 09, 0)
+ );
+ assert_eq!(*metric_value, 32.5);
+ }
+
+ #[test]
+ fn collectd_message_parsing_multi_valued_measurement() {
+ let topic = Topic::new("collectd/localhost/temperature/value").unwrap();
+ let mqtt_message = Message::new(&topic, "123456789:32.5:45.2");
+
+ let collectd_message = CollectdMessage::parse_from(&mqtt_message).unwrap();
+
+ let CollectdMessage {
+ metric_group_key,
+ metric_key,
+ timestamp,
+ metric_value,
+ } = collectd_message.index(0);
+ assert_eq!(metric_group_key, "temperature");
+
+ assert_eq!(metric_key, "value_val1");
+ assert_eq!(
+ *timestamp,
+ Utc.ymd(1973, 11, 29).and_hms_milli(21, 33, 09, 0)
+ );
+
+ let CollectdMessage {
+ metric_group_key,
+ metric_key,
timestamp,
+ metric_value,
+ } = collectd_message.index(1);
+
+ assert_eq!(metric_group_key, "temperature");
+ assert_eq!(metric_key, "value_val2");
+ assert_eq!(
+ *timestamp,
Utc.ymd(1973, 11, 29).and_hms_milli(21, 33, 09, 0)
);
- assert_eq!(metric_value, 32.5);
+ assert_eq!(*metric_value, 45.2);
}
#[test]
@@ -216,15 +268,15 @@ mod tests {
metric_key,
timestamp,
metric_value,
- } = collectd_message;
+ } = collectd_message.index(0);
assert_eq!(metric_group_key, "temperature");
assert_eq!(metric_key, "value");
assert_eq!(
- timestamp,
+ *timestamp,
Utc.ymd(1973, 11, 29).and_hms_milli(21, 33, 09, 125)
);
- assert_eq!(metric_value, 32.5);
+ assert_eq!(*metric_value, 32.5);
}
#[test]
@@ -273,19 +325,19 @@ mod tests {
}
#[test]
- fn invalid_collectd_payload_more_separators() {
- let payload = "123456789:98.6:abc";
+ fn invalid_collectd_metric_value() {
+ let payload = "123456789:abc";
let result = CollectdPayload::parse_from(payload);
assert_matches!(
result,
- Err(CollectdPayloadError::InvalidMeasurementPayloadFormat(_))
+ Err(CollectdPayloadError::InvalidMeasurementValue(_))
);
}
#[test]
- fn invalid_collectd_metric_value() {
- let payload = "123456789:abc";
+ fn invalid_collectd_metric_multi_value() {
+ let payload = "123456789:96.6:abc";
let result = CollectdPayload::parse_from(payload);
assert_matches!(
@@ -295,6 +347,15 @@ mod tests {
}
#[test]
+ fn valid_collectd_multivalue_metric() {
+ let payload = "123456789:1234:5678";
+ let result = CollectdPayload::parse_from(payload).unwrap();
+
+ assert_eq!(result.timestamp, 123456789.0);
+ assert_eq!(result.metric_values, vec![1234.0, 5678.0]);
+ }
+
+ #[test]
fn invalid_collectd_metric_timestamp() {
let payload = "abc:98.6";
let result = CollectdPayload::parse_from(payload);
@@ -310,7 +371,7 @@ mod tests {
let payload: String = format!("123456789:{}", u128::MAX);
let collectd_payload = CollectdPayload::parse_from(payload.as_str()).unwrap();
- assert_eq!(collectd_payload.metric_value, u128::MAX as f64);
+ assert_eq!(*collectd_payload.metric_values.index(0), u128::MAX as f64);
}
#[test]
@@ -318,6 +379,6 @@ mod tests {
let payload: String = format!("123456789:{}", i128::MIN);
let collectd_payload = CollectdPayload::parse_from(payload.as_str()).unwrap();
- assert_eq!(collectd_payload.metric_value, i128::MIN as f64);
+ assert_eq!(*collectd_payload.metric_values.index(0), i128::MIN as f64);
}
}
diff --git a/crates/core/tedge_mapper/src/collectd_mapper/monitor.rs b/crates/core/tedge_mapper/src/collectd_mapper/monitor.rs
index bc2f1027..76e6a144 100644
--- a/crates/core/tedge_mapper/src/collectd_mapper/monitor.rs
+++ b/crates/core/tedge_mapper/src/collectd_mapper/monitor.rs
@@ -96,9 +96,11 @@ impl DeviceMonitor {
while let Some(message) = collectd_messages.next().await {
match CollectdMessage::parse_from(&message) {
Ok(collectd_message) => {
- let batch_input = BatchDriverInput::Event(collectd_message);
- if let Err(err) = msg_send.send(batch_input).await {
- error!("Error while processing a collectd message: {}", err);
+ for msg in collectd_message {
+ let batch_input = BatchDriverInput::Event(msg);
+ if let Err(err) = msg_send.send(batch_input).await {
+ error!("Error while processing a collectd message: {}", err);
+ }
}
}
Err(err) => {
@@ -106,6 +108,7 @@ impl DeviceMonitor {
}
}
}
+
// The MQTT connection has been closed by the process itself.
info!("Stop batching");
let eof = BatchDriverInput::Flush;