diff options
author | PradeepKiruvale <PRADEEPKIRUVALE@gmail.com> | 2021-12-21 14:44:20 +0530 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-12-21 14:44:20 +0530 |
commit | 888a4e70ca44d9b41d645148b7a24d89a984da7c (patch) | |
tree | b18316cc05f02939df9c9935d828e3161a16b61b /crates/core/tedge_mapper | |
parent | 1b5102985f67f9795fbb841d754155de865336ef (diff) |
[601] collectd multi value measurement (#724)
* multi value collectd message
* add a multi value parse test
* update test
* fix typos
* refactor
* address review comments
* refactor parse_from
* cargo fmt
* revert the changes
* add invalid test
* fix typo
Co-authored-by: Pradeep Kumar K J <pradeepkumar.kj@sofwareag.com>
Diffstat (limited to 'crates/core/tedge_mapper')
-rw-r--r-- | crates/core/tedge_mapper/src/collectd_mapper/collectd.rs | 143 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/collectd_mapper/monitor.rs | 9 |
2 files changed, 108 insertions, 44 deletions
diff --git a/crates/core/tedge_mapper/src/collectd_mapper/collectd.rs b/crates/core/tedge_mapper/src/collectd_mapper/collectd.rs index 9d387314..80a18c62 100644 --- a/crates/core/tedge_mapper/src/collectd_mapper/collectd.rs +++ b/crates/core/tedge_mapper/src/collectd_mapper/collectd.rs @@ -53,7 +53,7 @@ impl CollectdMessage { } } - pub fn parse_from(mqtt_message: &Message) -> Result<Self, CollectdError> { + pub fn parse_from(mqtt_message: &Message) -> Result<Vec<Self>, CollectdError> { let topic = mqtt_message.topic.name.as_str(); let collectd_topic = match CollectdTopic::from_str(topic) { Ok(collectd_topic) => collectd_topic, @@ -69,12 +69,23 @@ impl CollectdMessage { let collectd_payload = CollectdPayload::parse_from(payload) .map_err(|err| CollectdError::InvalidMeasurementPayload(topic.into(), err))?; - Ok(CollectdMessage { - metric_group_key: collectd_topic.metric_group_key.to_string(), - metric_key: collectd_topic.metric_key.to_string(), - timestamp: collectd_payload.timestamp(), - metric_value: collectd_payload.metric_value, - }) + let num_measurements = collectd_payload.metric_values.len(); + let mut collectd_mssages: Vec<CollectdMessage> = Vec::with_capacity(num_measurements); + + for (i, value) in collectd_payload.metric_values.iter().enumerate() { + let mut metric_key = collectd_topic.metric_key.to_string(); + // If there are multiple values, then create unique keys metric_key_val1, metric_key_val2 etc. + if num_measurements > 1 { + metric_key = format!("{}_val{}", metric_key, i + 1); + } + collectd_mssages.push(CollectdMessage { + metric_group_key: collectd_topic.metric_group_key.to_string(), + metric_key, + timestamp: collectd_payload.timestamp(), + metric_value: *value, + }); + } + Ok(collectd_mssages) } } @@ -108,7 +119,7 @@ impl<'a> CollectdTopic<'a> { #[derive(Debug)] struct CollectdPayload { timestamp: f64, - metric_value: f64, + metric_values: Vec<f64>, } #[derive(thiserror::Error, Debug)] @@ -125,33 +136,35 @@ pub enum CollectdPayloadError { impl CollectdPayload { fn parse_from(payload: &str) -> Result<Self, CollectdPayloadError> { - let mut iter = payload.split(':'); + let msg: Vec<&str> = payload.split(':').collect(); + let vec_len = msg.len(); - let timestamp = iter.next().ok_or_else(|| { - CollectdPayloadError::InvalidMeasurementPayloadFormat(payload.to_string()) - })?; + if vec_len <= 1 { + return Err(CollectdPayloadError::InvalidMeasurementPayloadFormat( + payload.to_string(), + )); + } - let timestamp = timestamp.parse::<f64>().map_err(|_err| { - CollectdPayloadError::InvalidMeasurementTimestamp(timestamp.to_string()) + // First element is always the timestamp + let timestamp = msg[0].parse::<f64>().map_err(|_err| { + CollectdPayloadError::InvalidMeasurementTimestamp(msg[0].to_string()) })?; - let metric_value = iter.next().ok_or_else(|| { - CollectdPayloadError::InvalidMeasurementPayloadFormat(payload.to_string()) - })?; + let mut metric_values: Vec<f64> = Vec::with_capacity(vec_len - 1); - let metric_value = metric_value.parse::<f64>().map_err(|_err| { - CollectdPayloadError::InvalidMeasurementValue(metric_value.to_string()) - })?; + // Process the values + for i in 1..vec_len { + let value = msg[i].parse::<f64>().map_err(|_err| { + CollectdPayloadError::InvalidMeasurementValue(msg[i].to_string()) + })?; - match iter.next() { - None => Ok(CollectdPayload { - timestamp, - metric_value, - }), - Some(_) => Err(CollectdPayloadError::InvalidMeasurementPayloadFormat( - payload.to_string(), - )), + metric_values.push(value); } + + Ok(CollectdPayload { + timestamp, + metric_values, + }) } pub fn timestamp(&self) -> DateTime<Utc> { @@ -175,6 +188,8 @@ impl Batchable for CollectdMessage { #[cfg(test)] mod tests { + use std::ops::Index; + use assert_matches::assert_matches; use chrono::TimeZone; use mqtt_client::Topic; @@ -193,15 +208,52 @@ mod tests { metric_key, timestamp, metric_value, - } = collectd_message; - + } = collectd_message.index(0); assert_eq!(metric_group_key, "temperature"); + assert_eq!(metric_key, "value"); assert_eq!( + *timestamp, + Utc.ymd(1973, 11, 29).and_hms_milli(21, 33, 09, 0) + ); + assert_eq!(*metric_value, 32.5); + } + + #[test] + fn collectd_message_parsing_multi_valued_measurement() { + let topic = Topic::new("collectd/localhost/temperature/value").unwrap(); + let mqtt_message = Message::new(&topic, "123456789:32.5:45.2"); + + let collectd_message = CollectdMessage::parse_from(&mqtt_message).unwrap(); + + let CollectdMessage { + metric_group_key, + metric_key, + timestamp, + metric_value, + } = collectd_message.index(0); + assert_eq!(metric_group_key, "temperature"); + + assert_eq!(metric_key, "value_val1"); + assert_eq!( + *timestamp, + Utc.ymd(1973, 11, 29).and_hms_milli(21, 33, 09, 0) + ); + + let CollectdMessage { + metric_group_key, + metric_key, timestamp, + metric_value, + } = collectd_message.index(1); + + assert_eq!(metric_group_key, "temperature"); + assert_eq!(metric_key, "value_val2"); + assert_eq!( + *timestamp, Utc.ymd(1973, 11, 29).and_hms_milli(21, 33, 09, 0) ); - assert_eq!(metric_value, 32.5); + assert_eq!(*metric_value, 45.2); } #[test] @@ -216,15 +268,15 @@ mod tests { metric_key, timestamp, metric_value, - } = collectd_message; + } = collectd_message.index(0); assert_eq!(metric_group_key, "temperature"); assert_eq!(metric_key, "value"); assert_eq!( - timestamp, + *timestamp, Utc.ymd(1973, 11, 29).and_hms_milli(21, 33, 09, 125) ); - assert_eq!(metric_value, 32.5); + assert_eq!(*metric_value, 32.5); } #[test] @@ -273,19 +325,19 @@ mod tests { } #[test] - fn invalid_collectd_payload_more_separators() { - let payload = "123456789:98.6:abc"; + fn invalid_collectd_metric_value() { + let payload = "123456789:abc"; let result = CollectdPayload::parse_from(payload); assert_matches!( result, - Err(CollectdPayloadError::InvalidMeasurementPayloadFormat(_)) + Err(CollectdPayloadError::InvalidMeasurementValue(_)) ); } #[test] - fn invalid_collectd_metric_value() { - let payload = "123456789:abc"; + fn invalid_collectd_metric_multi_value() { + let payload = "123456789:96.6:abc"; let result = CollectdPayload::parse_from(payload); assert_matches!( @@ -295,6 +347,15 @@ mod tests { } #[test] + fn valid_collectd_multivalue_metric() { + let payload = "123456789:1234:5678"; + let result = CollectdPayload::parse_from(payload).unwrap(); + + assert_eq!(result.timestamp, 123456789.0); + assert_eq!(result.metric_values, vec![1234.0, 5678.0]); + } + + #[test] fn invalid_collectd_metric_timestamp() { let payload = "abc:98.6"; let result = CollectdPayload::parse_from(payload); @@ -310,7 +371,7 @@ mod tests { let payload: String = format!("123456789:{}", u128::MAX); let collectd_payload = CollectdPayload::parse_from(payload.as_str()).unwrap(); - assert_eq!(collectd_payload.metric_value, u128::MAX as f64); + assert_eq!(*collectd_payload.metric_values.index(0), u128::MAX as f64); } #[test] @@ -318,6 +379,6 @@ mod tests { let payload: String = format!("123456789:{}", i128::MIN); let collectd_payload = CollectdPayload::parse_from(payload.as_str()).unwrap(); - assert_eq!(collectd_payload.metric_value, i128::MIN as f64); + assert_eq!(*collectd_payload.metric_values.index(0), i128::MIN as f64); } } diff --git a/crates/core/tedge_mapper/src/collectd_mapper/monitor.rs b/crates/core/tedge_mapper/src/collectd_mapper/monitor.rs index bc2f1027..76e6a144 100644 --- a/crates/core/tedge_mapper/src/collectd_mapper/monitor.rs +++ b/crates/core/tedge_mapper/src/collectd_mapper/monitor.rs @@ -96,9 +96,11 @@ impl DeviceMonitor { while let Some(message) = collectd_messages.next().await { match CollectdMessage::parse_from(&message) { Ok(collectd_message) => { - let batch_input = BatchDriverInput::Event(collectd_message); - if let Err(err) = msg_send.send(batch_input).await { - error!("Error while processing a collectd message: {}", err); + for msg in collectd_message { + let batch_input = BatchDriverInput::Event(msg); + if let Err(err) = msg_send.send(batch_input).await { + error!("Error while processing a collectd message: {}", err); + } } } Err(err) => { @@ -106,6 +108,7 @@ impl DeviceMonitor { } } } + // The MQTT connection has been closed by the process itself. info!("Stop batching"); let eof = BatchDriverInput::Flush; |