1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
|
use clock::Timestamp;
use mqtt_channel::Payload;
use thin_edge_json::{
group::{MeasurementGroup, MeasurementGrouper},
measurement::MeasurementVisitor,
serialize::ThinEdgeJsonSerializer,
};
use crate::collectd_mapper::{collectd::CollectdMessage, error::DeviceMonitorError};
use thin_edge_json::group::MeasurementGrouperError;
/// A batch of collectd messages being merged into one group of measurements.
///
/// A batch is opened with `start_batch`, extended with `add_to_batch` and
/// closed with `end_batch`; `thin_edge_json_bytes` drives that whole sequence
/// and serializes the resulting group.
#[derive(Debug)]
pub struct MessageBatch {
/// Grouper that every message of the batch is fed into.
message_grouper: MeasurementGrouper,
}
impl MessageBatch {
    /// Merges `messages` into a single measurement group and serializes that
    /// group with a `ThinEdgeJsonSerializer`.
    ///
    /// The timestamp of the first message is used as the timestamp of the
    /// whole batch.
    ///
    /// # Errors
    ///
    /// Returns `DeviceMonitorError::FromInvalidThinEdgeJson` wrapping
    /// `MeasurementGrouperError::UnexpectedEnd` when `messages` is empty.
    /// Any error raised while grouping the messages or serializing the group
    /// is propagated to the caller.
    pub fn thin_edge_json_bytes(
        messages: Vec<CollectdMessage>,
    ) -> Result<Payload, DeviceMonitorError> {
        let mut remaining = messages.into_iter();

        // Guard clause: a batch cannot be opened without a first message.
        let first_message = match remaining.next() {
            Some(message) => message,
            None => {
                return Err(DeviceMonitorError::FromInvalidThinEdgeJson(
                    MeasurementGrouperError::UnexpectedEnd,
                ));
            }
        };

        // The first message stamps the whole batch.
        let batch_timestamp = first_message.timestamp;
        let mut batch = Self::start_batch(first_message, batch_timestamp)?;

        // Stops at the first message that fails to be added.
        remaining.try_for_each(|message| batch.add_to_batch(message))?;

        let group = batch.end_batch()?;
        let mut serializer = ThinEdgeJsonSerializer::new();
        group.accept(&mut serializer)?;
        Ok(serializer.bytes()?)
    }

    /// Opens a new batch stamped with `timestamp` and feeds it
    /// `collectd_message` as its first message.
    ///
    /// # Errors
    ///
    /// Propagates any error raised while recording the timestamp or adding
    /// the message.
    fn start_batch(
        collectd_message: CollectdMessage,
        timestamp: Timestamp,
    ) -> Result<Self, DeviceMonitorError> {
        let mut grouper = MeasurementGrouper::new();
        grouper.visit_timestamp(timestamp)?;

        let mut batch = Self {
            message_grouper: grouper,
        };
        batch.add_to_batch(collectd_message)?;
        Ok(batch)
    }

    /// Feeds `collectd_message` into the grouper of this batch.
    ///
    /// # Errors
    ///
    /// Propagates the error raised when the grouper rejects the message.
    fn add_to_batch(
        &mut self,
        collectd_message: CollectdMessage,
    ) -> Result<(), DeviceMonitorError> {
        let grouper = &mut self.message_grouper;
        collectd_message.accept(grouper)?;
        Ok(())
    }

    /// Closes the batch, consuming it, and returns the grouped measurements.
    ///
    /// # Errors
    ///
    /// Propagates the error raised when the grouper cannot be ended.
    fn end_batch(self) -> Result<MeasurementGroup, DeviceMonitorError> {
        let group = self.message_grouper.end()?;
        Ok(group)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use assert_matches::assert_matches;
    use clock::{Clock, WallClock};
    use time::macros::datetime;

    /// Builds a batch out of five collectd messages spread over three metric
    /// groups and checks that each value can be read back from the group.
    #[test]
    fn test_message_batch_processor() -> anyhow::Result<()> {
        let timestamp = datetime!(2015-05-15 0:00:01.444 UTC);

        // The batch is opened with the temperature reading.
        let opening_message = CollectdMessage::new("temperature", "value", 32.5, timestamp);
        let mut message_batch = MessageBatch::start_batch(opening_message, WallClock.now())?;

        // Remaining readings, in the order they are added to the batch.
        let later_readings = [
            ("coordinate", "x", 50.0),
            ("coordinate", "y", 70.0),
            ("pressure", "value", 98.2),
            ("coordinate", "z", 90.0),
        ];
        for &(group, key, value) in later_readings.iter() {
            message_batch.add_to_batch(CollectdMessage::new(group, key, value, timestamp))?;
        }

        let message_group = message_batch.end_batch()?;

        // Only the presence of a batch timestamp is checked, not its value.
        assert_matches!(message_group.timestamp(), Some(_));

        let expected_values = [
            ("temperature", "value", 32.5),
            ("pressure", "value", 98.2),
            ("coordinate", "x", 50.0),
            ("coordinate", "y", 70.0),
            ("coordinate", "z", 90.0),
        ];
        for &(group, key, value) in expected_values.iter() {
            assert_eq!(
                message_group.get_measurement_value(Some(group), key),
                Some(value)
            );
        }

        Ok(())
    }
}
|