summaryrefslogtreecommitdiffstats
path: root/crates/core/thin_edge_json/src
diff options
context:
space:
mode:
authorLukasz Woznicki <75632179+makr11st@users.noreply.github.com>2021-11-24 20:54:56 +0000
committerGitHub <noreply@github.com>2021-11-24 20:54:56 +0000
commita4ffeccf60090e4456755bc53a6e3b8c8038e855 (patch)
tree9583f187114913a92866571920dd3bb205bd50a3 /crates/core/thin_edge_json/src
parent8217e80670e76dbf9168780f5e0545355a39f8f3 (diff)
Restructure directories of the workspace (#559)
* Restructure directories of the workspace * Rename c8y_translator_lib to c8y_translator * Update comment on how to get dummy plugin path Signed-off-by: Lukasz Woznicki <lukasz.woznicki@softwareag.com>
Diffstat (limited to 'crates/core/thin_edge_json/src')
-rw-r--r--crates/core/thin_edge_json/src/builder.rs104
-rw-r--r--crates/core/thin_edge_json/src/data.rs71
-rw-r--r--crates/core/thin_edge_json/src/group.rs356
-rw-r--r--crates/core/thin_edge_json/src/lib.rs10
-rw-r--r--crates/core/thin_edge_json/src/measurement.rs94
-rw-r--r--crates/core/thin_edge_json/src/parser.rs361
-rw-r--r--crates/core/thin_edge_json/src/serialize.rs252
-rw-r--r--crates/core/thin_edge_json/src/utils.rs50
8 files changed, 1298 insertions, 0 deletions
diff --git a/crates/core/thin_edge_json/src/builder.rs b/crates/core/thin_edge_json/src/builder.rs
new file mode 100644
index 00000000..098c7304
--- /dev/null
+++ b/crates/core/thin_edge_json/src/builder.rs
@@ -0,0 +1,104 @@
+use crate::{data::*, measurement::*};
+use chrono::prelude::*;
+
+/// A `MeasurementVisitor` that builds up `ThinEdgeJson`.
+pub struct ThinEdgeJsonBuilder {
+ timestamp: Option<DateTime<FixedOffset>>,
+ inside_group: Option<MultiValueMeasurement>,
+ measurements: Vec<ThinEdgeValue>,
+}
+
+impl ThinEdgeJsonBuilder {
+ pub fn new() -> Self {
+ Self {
+ timestamp: None,
+ inside_group: None,
+ measurements: Vec::new(),
+ }
+ }
+
+ pub fn done(self) -> Result<ThinEdgeJson, ThinEdgeJsonBuilderError> {
+ if self.inside_group.is_some() {
+ return Err(ThinEdgeJsonBuilderError::UnexpectedOpenGroup);
+ }
+
+ if self.measurements.is_empty() {
+ return Err(ThinEdgeJsonBuilderError::EmptyThinEdgeJsonRoot);
+ }
+
+ Ok(ThinEdgeJson {
+ timestamp: self.timestamp,
+ values: self.measurements,
+ })
+ }
+}
+
+impl MeasurementVisitor for ThinEdgeJsonBuilder {
+ type Error = ThinEdgeJsonBuilderError;
+
+ fn visit_timestamp(&mut self, value: DateTime<FixedOffset>) -> Result<(), Self::Error> {
+ match self.timestamp {
+ None => {
+ self.timestamp = Some(value);
+ Ok(())
+ }
+ Some(_) => Err(ThinEdgeJsonBuilderError::DuplicatedTimestamp),
+ }
+ }
+
+ fn visit_measurement(&mut self, name: &str, value: f64) -> Result<(), Self::Error> {
+ if let Some(group) = &mut self.inside_group {
+ group.values.push((name, value).into());
+ } else {
+ self.measurements.push((name, value).into());
+ }
+ Ok(())
+ }
+
+ fn visit_start_group(&mut self, group: &str) -> Result<(), Self::Error> {
+ if self.inside_group.is_none() {
+ self.inside_group = Some(MultiValueMeasurement {
+ name: group.into(),
+ values: Vec::new(),
+ });
+ Ok(())
+ } else {
+ Err(ThinEdgeJsonBuilderError::UnexpectedStartOfGroup)
+ }
+ }
+
+ fn visit_end_group(&mut self) -> Result<(), Self::Error> {
+ match self.inside_group.take() {
+ Some(group) => {
+ if group.values.is_empty() {
+ return Err(ThinEdgeJsonBuilderError::EmptyThinEdgeJson { name: group.name });
+ } else {
+ self.measurements.push(ThinEdgeValue::Multi(group))
+ }
+ }
+ None => return Err(ThinEdgeJsonBuilderError::UnexpectedEndOfGroup),
+ }
+ Ok(())
+ }
+}
+
+#[derive(thiserror::Error, Debug)]
+pub enum ThinEdgeJsonBuilderError {
+ #[error("Empty Thin Edge measurement: it must contain at least one measurement")]
+ EmptyThinEdgeJsonRoot,
+
+ #[error("Empty Thin Edge measurement: {name:?} must contain at least one measurement")]
+ EmptyThinEdgeJson { name: String },
+
+ #[error("... time stamp within a group")]
+ DuplicatedTimestamp,
+
+ #[error("Unexpected open group")]
+ UnexpectedOpenGroup,
+
+ #[error("Unexpected start of group")]
+ UnexpectedStartOfGroup,
+
+ #[error("Unexpected end of group")]
+ UnexpectedEndOfGroup,
+}
diff --git a/crates/core/thin_edge_json/src/data.rs b/crates/core/thin_edge_json/src/data.rs
new file mode 100644
index 00000000..4fa25cf4
--- /dev/null
+++ b/crates/core/thin_edge_json/src/data.rs
@@ -0,0 +1,71 @@
+//! The in-memory data model representing ThinEdge JSON.
+
+use chrono::prelude::*;
+
+/// In-memory representation of parsed ThinEdge JSON.
+#[derive(Debug)]
+pub struct ThinEdgeJson {
+ pub timestamp: Option<DateTime<FixedOffset>>,
+ pub values: Vec<ThinEdgeValue>,
+}
+
+impl ThinEdgeJson {
+ pub fn has_timestamp(&self) -> bool {
+ self.timestamp.is_some()
+ }
+
+ pub fn set_timestamp(&mut self, timestamp: DateTime<FixedOffset>) {
+ self.timestamp = Some(timestamp)
+ }
+}
+
+#[derive(Debug, PartialEq)]
+pub enum ThinEdgeValue {
+ Single(SingleValueMeasurement),
+ Multi(MultiValueMeasurement),
+}
+
+#[derive(Debug, PartialEq)]
+pub struct SingleValueMeasurement {
+ pub name: String,
+ pub value: f64,
+}
+
+#[derive(Debug, PartialEq)]
+pub struct MultiValueMeasurement {
+ pub name: String,
+ pub values: Vec<SingleValueMeasurement>,
+}
+
+impl<T> From<(T, f64)> for SingleValueMeasurement
+where
+ T: Into<String>,
+{
+ fn from((name, value): (T, f64)) -> Self {
+ SingleValueMeasurement {
+ name: name.into(),
+ value,
+ }
+ }
+}
+
+impl<T> From<(T, f64)> for ThinEdgeValue
+where
+ T: Into<String>,
+{
+ fn from((name, value): (T, f64)) -> Self {
+ ThinEdgeValue::Single((name, value).into())
+ }
+}
+
+impl<T> From<(T, Vec<SingleValueMeasurement>)> for ThinEdgeValue
+where
+ T: Into<String>,
+{
+ fn from((name, values): (T, Vec<SingleValueMeasurement>)) -> Self {
+ ThinEdgeValue::Multi(MultiValueMeasurement {
+ name: name.into(),
+ values,
+ })
+ }
+}
diff --git a/crates/core/thin_edge_json/src/group.rs b/crates/core/thin_edge_json/src/group.rs
new file mode 100644
index 00000000..17488a3a
--- /dev/null
+++ b/crates/core/thin_edge_json/src/group.rs
@@ -0,0 +1,356 @@
+use chrono::offset::FixedOffset;
+use chrono::DateTime;
+use std::collections::HashMap;
+
+use crate::measurement::MeasurementVisitor;
+
+#[derive(Debug)]
+pub struct MeasurementGroup {
+ timestamp: Option<DateTime<FixedOffset>>,
+ values: HashMap<String, Measurement>,
+}
+
+impl MeasurementGroup {
+ fn new() -> Self {
+ Self {
+ timestamp: None,
+ values: HashMap::new(),
+ }
+ }
+
+ pub fn timestamp(&self) -> Option<DateTime<FixedOffset>> {
+ self.timestamp
+ }
+
+ pub fn is_empty(&self) -> bool {
+ self.values.is_empty()
+ }
+
+ pub fn get_measurement_value(
+ &self,
+ group_key: Option<&str>,
+ measurement_key: &str,
+ ) -> Option<f64> {
+ match group_key {
+ Some(group_key) => match self.values.get(group_key) {
+ Some(Measurement::Multi(map)) => map.get(measurement_key).cloned(),
+ _ => None,
+ },
+ None => match self.values.get(measurement_key) {
+ Some(Measurement::Single(val)) => Some(*val),
+ _ => None,
+ },
+ }
+ }
+
+ pub fn accept<V, E>(&self, visitor: &mut V) -> Result<(), E>
+ where
+ V: MeasurementVisitor<Error = E>,
+ E: std::error::Error + std::fmt::Debug,
+ {
+ if let Some(timestamp) = self.timestamp {
+ visitor.visit_timestamp(timestamp)?;
+ }
+
+ for (key, value) in self.values.iter() {
+ match value {
+ Measurement::Single(sv) => {
+ visitor.visit_measurement(key, *sv)?;
+ }
+ Measurement::Multi(m) => {
+ visitor.visit_start_group(key)?;
+ for (key, value) in m.iter() {
+ visitor.visit_measurement(key, *value)?;
+ }
+ visitor.visit_end_group()?;
+ }
+ }
+ }
+ Ok(())
+ }
+}
+
+#[derive(Debug)]
+pub struct MeasurementGrouper {
+ measurement_group: MeasurementGroup,
+ group_state: GroupState,
+}
+
+/// Keeps track whether we are currently in a group or not.
+/// This serves the same purpose an `Option<String>` would do, just that
+/// the `String` is not allocated over and over again.
+#[derive(Debug)]
+struct GroupState {
+ in_group: bool,
+ group: String,
+}
+
+#[derive(Debug)]
+pub enum Measurement {
+ Single(f64),
+ Multi(HashMap<String, f64>),
+}
+
+#[derive(thiserror::Error, Debug)]
+pub enum MeasurementGrouperError {
+ #[error("Duplicated measurement: {0}")]
+ DuplicatedMeasurement(String),
+
+ #[error("Duplicated measurement: {0}.{1}")]
+ DuplicatedSubMeasurement(String, String),
+
+ #[error("Unexpected end")]
+ UnexpectedEnd,
+
+ #[error("Unexpected start of group")]
+ UnexpectedStartOfGroup,
+
+ #[error("Unexpected end of group")]
+ UnexpectedEndOfGroup,
+}
+
+impl MeasurementGrouper {
+ pub fn new() -> Self {
+ Self {
+ measurement_group: MeasurementGroup::new(),
+ group_state: GroupState {
+ in_group: false,
+ group: String::with_capacity(20),
+ },
+ }
+ }
+
+ pub fn end(self) -> Result<MeasurementGroup, MeasurementGrouperError> {
+ if self.group_state.in_group {
+ Err(MeasurementGrouperError::UnexpectedEnd)
+ } else {
+ Ok(self.measurement_group)
+ }
+ }
+}
+
+impl Default for MeasurementGrouper {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
+impl MeasurementVisitor for MeasurementGrouper {
+ type Error = MeasurementGrouperError;
+
+ fn visit_timestamp(&mut self, time: DateTime<FixedOffset>) -> Result<(), Self::Error> {
+ self.measurement_group.timestamp = Some(time);
+ Ok(())
+ }
+
+ fn visit_start_group(&mut self, group: &str) -> Result<(), Self::Error> {
+ if self.group_state.in_group {
+ Err(MeasurementGrouperError::UnexpectedStartOfGroup)
+ } else {
+ self.group_state.in_group = true;
+ self.group_state.group.replace_range(.., group);
+ Ok(())
+ }
+ }
+
+ fn visit_end_group(&mut self) -> Result<(), Self::Error> {
+ if self.group_state.in_group {
+ self.group_state.in_group = false;
+ self.group_state.group.clear();
+ Ok(())
+ } else {
+ Err(MeasurementGrouperError::UnexpectedEndOfGroup)
+ }
+ }
+
+ fn visit_measurement(&mut self, name: &str, value: f64) -> Result<(), Self::Error> {
+ let key = name.to_owned();
+
+ match self.group_state.in_group {
+ false => {
+ self.measurement_group
+ .values
+ .insert(key, Measurement::Single(value));
+ Ok(())
+ }
+ true => {
+ let group_key = self.group_state.group.clone();
+ if let Measurement::Multi(group_map) = self
+ .measurement_group
+ .values
+ .entry(group_key)
+ .or_insert_with(|| Measurement::Multi(HashMap::new()))
+ {
+ group_map.insert(name.to_owned(), value);
+ }
+ Ok(())
+ }
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use chrono::prelude::*;
+ use mockall::predicate::*;
+ use mockall::*;
+
+ #[derive(thiserror::Error, Debug, Clone)]
+ pub enum TestError {
+ #[error("test")]
+ _Test,
+ }
+
+ mock! {
+ pub GroupedVisitor {
+ }
+
+ impl MeasurementVisitor for GroupedVisitor {
+ type Error = TestError;
+
+ fn visit_timestamp(&mut self, value: DateTime<FixedOffset>) -> Result<(), TestError>;
+ fn visit_measurement(&mut self, name: &str, value: f64) -> Result<(), TestError>;
+ fn visit_start_group(&mut self, group: &str) -> Result<(), TestError>;
+ fn visit_end_group(&mut self) -> Result<(), TestError>;
+ }
+ }
+
+ // XXX: These test cases should be split into those test cases that test the MeasurementGrouper and
+ // those that test the MeasurementGroup.
+ #[test]
+ fn new_measurement_grouper_is_empty() -> anyhow::Result<()> {
+ let grouper = MeasurementGrouper::new();
+ let group = grouper.end()?;
+ assert!(group.is_empty());
+
+ Ok(())
+ }
+
+ #[test]
+ fn empty_measurement_group_visits_nothing() -> anyhow::Result<()> {
+ let group = MeasurementGroup::new();
+
+ let mut mock = MockGroupedVisitor::new();
+ mock.expect_visit_measurement().never();
+ mock.expect_visit_start_group().never();
+ mock.expect_visit_end_group().never();
+
+ let _ = group.accept(&mut mock)?;
+
+ Ok(())
+ }
+
+ #[test]
+ fn new_measurement_grouper_with_a_timestamp_is_empty() -> anyhow::Result<()> {
+ let mut grouper = MeasurementGrouper::new();
+ let _ = grouper.visit_timestamp(test_timestamp(4));
+
+ let group = grouper.end()?;
+ assert!(group.is_empty());
+
+ let mut mock = MockGroupedVisitor::new();
+ mock.expect_visit_timestamp().return_const(Ok(()));
+ mock.expect_visit_measurement().never();
+ mock.expect_visit_start_group().never();
+ mock.expect_visit_end_group().never();
+
+ let _ = group.accept(&mut mock);
+
+ Ok(())
+ }
+
+ #[test]
+ fn new_measurement_grouper_has_no_timestamp() -> anyhow::Result<()> {
+ let grouper = MeasurementGrouper::new();
+ let mut mock = MockGroupedVisitor::new();
+
+ mock.expect_visit_timestamp().never();
+ let group = grouper.end()?;
+ let _ = group.accept(&mut mock);
+
+ Ok(())
+ }
+
+ #[test]
+ fn measurement_grouper_forward_timestamp() -> anyhow::Result<()> {
+ let mut grouper = MeasurementGrouper::new();
+ let _ = grouper.visit_timestamp(test_timestamp(4));
+
+ let mut mock = MockGroupedVisitor::new();
+ mock.expect_visit_timestamp()
+ .times(1)
+ .with(eq(test_timestamp(4)))
+ .return_const(Ok(()));
+
+ let group = grouper.end()?;
+ let _ = group.accept(&mut mock);
+
+ Ok(())
+ }
+
+ #[test]
+ fn measurement_grouper_forward_only_the_latest_received_timestamp() -> anyhow::Result<()> {
+ let mut grouper = MeasurementGrouper::new();
+ let _ = grouper.visit_timestamp(test_timestamp(4));
+ let _ = grouper.visit_timestamp(test_timestamp(6));
+ let _ = grouper.visit_timestamp(test_timestamp(5));
+
+ let mut mock = MockGroupedVisitor::new();
+ mock.expect_visit_timestamp()
+ .times(1)
+ .with(eq(test_timestamp(5)))
+ .return_const(Ok(()));
+
+ let group = grouper.end()?;
+ let _ = group.accept(&mut mock);
+
+ Ok(())
+ }
+
+ #[test]
+ fn get_measurement_value() -> anyhow::Result<()> {
+ let mut grouper = MeasurementGrouper::new();
+ grouper.visit_measurement("temperature", 32.5)?;
+ grouper.visit_start_group("coordinate")?;
+ grouper.visit_measurement("x", 50.0)?;
+ grouper.visit_measurement("y", 70.0)?;
+ grouper.visit_measurement("z", 90.0)?;
+ grouper.visit_end_group()?;
+ grouper.visit_measurement("pressure", 98.2)?;
+
+ let group = grouper.end()?;
+
+ assert_eq!(
+ group.get_measurement_value(None, "temperature").unwrap(),
+ 32.5
+ );
+ assert_eq!(group.get_measurement_value(None, "pressure").unwrap(), 98.2);
+ assert_eq!(
+ group
+ .get_measurement_value(Some("coordinate"), "x")
+ .unwrap(),
+ 50.0
+ );
+ assert_eq!(
+ group
+ .get_measurement_value(Some("coordinate"), "y")
+ .unwrap(),
+ 70.0
+ );
+ assert_eq!(
+ group
+ .get_measurement_value(Some("coordinate"), "z")
+ .unwrap(),
+ 90.0
+ );
+
+ Ok(())
+ }
+
+ fn test_timestamp(minute: u32) -> DateTime<FixedOffset> {
+ FixedOffset::east(5 * 3600)
+ .ymd(2021, 4, 8)
+ .and_hms(13, minute, 00)
+ }
+}
diff --git a/crates/core/thin_edge_json/src/lib.rs b/crates/core/thin_edge_json/src/lib.rs
new file mode 100644
index 00000000..56664969
--- /dev/null
+++ b/crates/core/thin_edge_json/src/lib.rs
@@ -0,0 +1,10 @@
+//! A library to create [ThinEdgeJson][1] from bytes of json data by validating it.
+//! [1]: https://github.com/thin-edge/thin-edge.io/blob/main/docs/src/architecture/thin-edge-json.md
+
+pub mod builder;
+pub mod data;
+pub mod group;
+pub mod measurement;
+pub mod parser;
+pub mod serialize;
+pub mod utils;
diff --git a/crates/core/thin_edge_json/src/measurement.rs b/crates/core/thin_edge_json/src/measurement.rs
new file mode 100644
index 00000000..a458372d
--- /dev/null
+++ b/crates/core/thin_edge_json/src/measurement.rs
@@ -0,0 +1,94 @@
+use chrono::offset::FixedOffset;
+use chrono::DateTime;
+
+/// The `MeasurementVisitor` trait represents the capability to visit a series of measurements, possibly grouped.
+///
+/// Here is an implementation of the `MeasurementVisitor` trait that prints the measurements:
+///
+/// ```
+/// # use thin_edge_json::measurement::*;
+/// # use chrono::*;
+/// struct MeasurementPrinter {
+/// group: Option<String>,
+/// }
+///
+/// #[derive(thiserror::Error, Debug)]
+/// pub enum MeasurementError {
+/// #[error("Unexpected time stamp within a group")]
+/// UnexpectedTimestamp,
+///
+/// #[error("Unexpected end of group")]
+/// UnexpectedEndOfGroup,
+///
+/// #[error("Unexpected start of group")]
+/// UnexpectedStartOfGroup,
+/// }
+///
+/// impl MeasurementVisitor for MeasurementPrinter {
+/// type Error = MeasurementError;
+///
+/// fn visit_timestamp(&mut self, timestamp: DateTime<FixedOffset>) -> Result<(), Self::Error> {
+/// if self.group.is_none() {
+/// Ok(println!("time = {}", timestamp.to_rfc2822()))
+/// } else {
+/// Err(MeasurementError::UnexpectedTimestamp)
+/// }
+/// }
+///
+/// fn visit_measurement(&mut self, name: &str, value: f64) -> Result<(), Self::Error> {
+/// if let Some(group_name) = self.group.as_ref() {
+/// Ok(println!("{}.{} = {}", group_name, name, value))
+/// } else {
+/// Ok(println!("{} = {}", name, value))
+/// }
+/// }
+///
+/// fn visit_start_group(&mut self, group: &str) -> Result<(), Self::Error> {
+/// if self.group.is_none() {
+/// self.group = Some(group.to_owned());
+/// Ok(())
+/// } else {
+/// Err(MeasurementError::UnexpectedStartOfGroup)
+/// }
+/// }
+///
+/// fn visit_end_group(&mut self) -> Result<(), Self::Error> {
+/// if self.group.is_none() {
+/// Err(MeasurementError::UnexpectedEndOfGroup)
+/// } else {
+/// self.group = None;
+/// Ok(())
+/// }
+/// }
+/// }
+/// ```
+pub trait MeasurementVisitor {
+ /// Error type specific to this visitor.
+ type Error: std::error::Error + std::fmt::Debug;
+
+ /// Set the timestamp shared by all the measurements of this series.
+ fn visit_timestamp(&mut self, value: DateTime<FixedOffset>) -> Result<(), Self::Error>;
+
+ /// Add a new measurement, attached to the current group if any.
+ fn visit_measurement(&mut self, name: &str, value: f64) -> Result<(), Self::Error>;
+
+ /// Start to gather measurements for a group.
+ fn visit_start_group(&mut self, group: &str) -> Result<(), Self::Error>;
+
+ /// End to gather measurements for the current group.
+ fn visit_end_group(&mut self) -> Result<(), Self::Error>;
+
+ /// A single measurement contained in `group`. Defaults to a sequence of
+ /// `visit_start_group`, `visit_measurement` and `visit_end_group`.
+ fn visit_grouped_measurement(
+ &mut self,
+ group: &str,
+ name: &str,
+ value: f64,
+ ) -> Result<(), Self::Error> {
+ let () = self.visit_start_group(group)?;
+ let () = self.visit_measurement(name, value)?;
+ let () = self.visit_end_group()?;
+ Ok(())
+ }
+}
diff --git a/crates/core/thin_edge_json/src/parser.rs b/crates/core/thin_edge_json/src/parser.rs
new file mode 100644
index 00000000..3d41ced3
--- /dev/null
+++ b/crates/core/thin_edge_json/src/parser.rs
@@ -0,0 +1,361 @@
+//! A streaming, almost non-allocating [^1] ThinEdge JSON parser using `serde`.
+//!
+//! [^1]: It only allocates in presence of escaped strings as keys.
+//!
+use crate::measurement::MeasurementVisitor;
+use chrono::prelude::*;
+use serde::{
+ de::{self, DeserializeSeed, MapAccess},
+ Deserializer,
+};
+use std::borrow::Cow;
+use std::convert::TryFrom;
+use std::fmt;
+
+/// Parses `input` as ThinEdge JSON yielding the parsed measurements to the `visitor`.
+pub fn parse_str<T: MeasurementVisitor>(
+ input: &str,
+ visitor: &mut T,
+) -> Result<(), ThinEdgeJsonParserError> {
+ let mut deserializer = serde_json::Deserializer::from_str(input);
+
+ let parser = ThinEdgeJsonParser { visitor };
+
+ let () = deserializer
+ .deserialize_map(parser)
+ .map_err(|error| map_error(error, input))?;
+ Ok(())
+}
+
+/// The error returned by `parse_str`.
+#[derive(Debug, thiserror::Error)]
+#[error("Invalid JSON: {error}: `{input_excerpt}`")]
+pub struct ThinEdgeJsonParserError {
+ /// The underlying serde error.
+ error: serde_json::Error,
+ /// An excerpt from the input string near the error location.
+ input_excerpt: String,
+}
+
+/// Parses top-level ThinEdge JSON:
+///
+/// ```grammar
+/// {
+/// time?: string,
+/// [key: string]: number | {[key: string]: number},
+/// }
+/// ```
+///
+struct ThinEdgeJsonParser<'vis, T>
+where
+ T: MeasurementVisitor,
+{
+ visitor: &'vis mut T,
+}
+
+/// Parses a single value (number) or multi-value measurement:
+///
+/// ```grammar
+/// number | {[key: string]: number}
+/// ```
+///
+struct ThinEdgeValueParser<'key, 'vis, T> {
+ /// Recursion depth.
+ ///
+ /// When `depth = 0`, we accept both number or multi-value measurements.
+ /// When `depth > 0`, we only accept numbers.
+ depth: usize,
+ /// The associated key of the single or multi-value measurement.
+ key: Cow<'key, str>,
+ /// The visitor to callback into when parsing relevant data.
+ visitor: &'vis mut T,
+}
+
+impl<'vis, 'de, T> de::Visitor<'de> for ThinEdgeJsonParser<'vis, T>
+where
+ T: MeasurementVisitor,
+{
+ type Value = ();
+
+ fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
+ formatter.write_str("ThinEdge JSON")
+ }
+
+ fn visit_map<V>(self, mut map: V) -> Result<(), V::Error>
+ where
+ V: MapAccess<'de>,
+ {
+ let mut measurements_count: usize = 0;
+
+ while let Some(key) = map.next_key()? {
+ let key: Cow<str> = key;
+
+ match key.as_ref() {
+ "type" => return Err(de::Error::custom(invalid_measurement_name("type"))),
+ "externalSource" => {
+ return Err(de::Error::custom(invalid_measurement_name(
+ "externalSource",
+ )))
+ }
+ "time" => {
+ let timestamp_str: &str = map.next_value()?;
+ let timestamp = DateTime::parse_from_rfc3339(timestamp_str)
+ .map_err(|err| de::Error::custom(invalid_timestamp(timestamp_str, err)))?;
+
+ let () = self
+ .visitor
+ .visit_timestamp(timestamp)
+ .map_err(de::Error::custom)?;
+ }
+ _key => {
+ let parser = ThinEdgeValueParser {
+ depth: 0,
+ key,
+ visitor: self.visitor,
+ };
+
+ let () = map.next_value_seed(parser)?;
+ measurements_count += 1;
+ }
+ }
+ }
+
+ if measurements_count == 0 {
+ return Err(de::Error::custom(invalid_empty_root()));
+ }
+
+ Ok(())
+ }
+}
+
+impl<'key, 'vis, 'de, T> de::Visitor<'de> for ThinEdgeValueParser<'key, 'vis, T>
+where
+ T: MeasurementVisitor,
+{
+ type Value = ();
+
+ fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
+ if self.depth == 0 {
+ formatter.write_str("ThinEdge single or multi-value measurement")
+ } else {
+ formatter.write_str("ThinEdge single-value measurement")
+ }
+ }
+
+ /// Parses a multi-value measurement: `{[string]: number}` or fails if depth > 0.
+ ///
+ fn visit_map<A>(self, mut map: A) -> Result<Self::Value, A::Error>
+ where
+ A: MapAccess<'de>,
+ {
+ // To support arbitrarily nested measurements remove the following line.
+ if self.depth > 0 {
+ return Err(de::Error::custom("Expect single-value measurement"));
+ }
+
+ let () = self
+ .visitor
+ .visit_start_group(self.key.as_ref())
+ .map_err(de::Error::custom)?;
+
+ let mut measurements_count: usize = 0;
+
+ while let Some(key) = map.next_key()? {
+ let parser = ThinEdgeValueParser {
+ depth: self.depth + 1,
+ key,
+ visitor: self.visitor,
+ };
+
+ let () = map.next_value_seed(parser)?;
+ measurements_count += 1;
+ }
+
+ if measurements_count == 0 {
+ return Err(de::Error::custom(invalid_empty_measurement(&self.key)));
+ }
+
+ let () = self.visitor.visit_end_group().map_err(de::Error::custom)?;
+
+ Ok(())
+ }
+
+ /// Parses a single-value measurement.
+ ///
+ /// `serde_json` requires us to handle three cases:
+ /// - floating point numbers (f64),
+ /// - negative integers (i64) and
+ /// - positive integers (u64).
+ ///
+ /// See `visit_i64` and `visit_u64`.
+ ///
+ /// For JSON `1.0`, serde_json will call `visit_f64`.
+ /// For JSON `-31`, serde_json will call `visit_i64`.
+ /// For JSON `420`, serde_json will call `visit_u64`.
+ ///
+ fn visit_f64<E>(self, value: f64) -> Result<Self::Value, E>
+ where
+ E: serde::de::Error,
+ {
+ if value != 0.0 && !value.is_normal() {
+ return Err(de::Error::custom(invalid_json_number(&self.key)));
+ }
+
+ let () = self
+ .visitor
+ .visit_measurement(self.key.as_ref(), value)
+ .map_err(de::Error::custom)?;
+
+ Ok(())
+ }
+
+ /// Parses a single-value measurement. See `visit_f64`.
+ fn visit_i64<E>(self, value: i64) -> Result<Self::Value, E>
+ where
+ E: serde::de::Error,
+ {
+ let value = i32::try_from(value)
+ .map_err(|_| de::Error::custom(invalid_json_number(&self.key)))?
+ .into();
+
+ self.visit_f64(value)
+ }
+
+ /// Parses a single-value measurement. See `visit_f64`.
+ fn visit_u64<E>(self, value: u64) -> Result<Self::Value, E>
+ where
+ E: serde::de::Error,
+ {
+ let value = u32::try_from(value)
+ .map_err(|_| de::Error::custom(invalid_json_number(&self.key)))?
+ .into();
+
+ self.visit_f64(value)
+ }
+}
+
+/// The `DeserializeSeed` trait enables us to inject state required for deserialization. In our case
+/// the state is the `visitor` that we want to use for callbacks and the `key` that we are currently
+/// parsing.
+///
+/// As we are passing the parsed data over to the embedded visitor, all of our parsers do not
+/// produce a value, so we use the empty tuple type.
+impl<'key, 'vis, 'de, T> DeserializeSeed<'de> for ThinEdgeValueParser<'key, 'vis, T>
+where
+ T: MeasurementVisitor,
+{
+ type Value = ();
+
+ fn deserialize<D>(self, deserializer: D) -> Result<Self::Value, D::Error>
+ where
+ D: Deserializer<'de>,
+ {
+ // Use `self` as `de::Visitor`
+ deserializer.deserialize_any(self)
+ }