summaryrefslogtreecommitdiffstats
path: root/crates/core/c8y_translator
diff options
context:
space:
mode:
Diffstat (limited to 'crates/core/c8y_translator')
-rw-r--r--crates/core/c8y_translator/Cargo.toml30
-rw-r--r--crates/core/c8y_translator/benches/thin_edge_json.rs135
-rw-r--r--crates/core/c8y_translator/examples/multi_value_translation.rs28
-rw-r--r--crates/core/c8y_translator/examples/single_value_translation.rs24
-rw-r--r--crates/core/c8y_translator/examples/translate_to_c8yjson.rs27
-rw-r--r--crates/core/c8y_translator/fuzz/.gitignore4
-rw-r--r--crates/core/c8y_translator/fuzz/Cargo.toml26
-rw-r--r--crates/core/c8y_translator/fuzz/README.md9
-rw-r--r--crates/core/c8y_translator/fuzz/fuzz_targets/fuzz_target_1.rs8
-rw-r--r--crates/core/c8y_translator/src/json.rs280
-rw-r--r--crates/core/c8y_translator/src/lib.rs2
-rw-r--r--crates/core/c8y_translator/src/serializer.rs403
12 files changed, 976 insertions, 0 deletions
diff --git a/crates/core/c8y_translator/Cargo.toml b/crates/core/c8y_translator/Cargo.toml
new file mode 100644
index 00000000..f4dbdd7d
--- /dev/null
+++ b/crates/core/c8y_translator/Cargo.toml
@@ -0,0 +1,30 @@
+[package]
+name = "c8y_translator"
+version = "0.4.3"
+authors = ["thin-edge.io team <info@thin-edge.io>"]
+edition = "2018"
+
+[dependencies]
+chrono = "0.4"
+clock = { path = "../../common/clock" }
+json-writer = { path = "../../common/json_writer" }
+thin_edge_json = { path = "../thin_edge_json" }
+thiserror = "1.0"
+
+[dev-dependencies]
+anyhow = "1.0"
+assert_matches = "1.5"
+assert-json-diff = "2.0"
+criterion = "0.3"
+pretty_assertions = "1.0"
+proptest = "1.0"
+serde_json = "1.0"
+test-case = "1.2"
+
+[features]
+# use: #[cfg(feature="integration-test")]
+integration-test = []
+
+[[bench]]
+name = "thin_edge_json"
+harness = false
diff --git a/crates/core/c8y_translator/benches/thin_edge_json.rs b/crates/core/c8y_translator/benches/thin_edge_json.rs
new file mode 100644
index 00000000..76bbd692
--- /dev/null
+++ b/crates/core/c8y_translator/benches/thin_edge_json.rs
@@ -0,0 +1,135 @@
+use c8y_translator::json;
+use criterion::{criterion_group, criterion_main, Criterion};
+
+pub fn criterion_benchmark(c: &mut Criterion) {
+ translate_ref_measurement(c);
+ translate_2_measurements(c);
+ translate_50_measurements(c);
+ translate_17x3_multi_measurements(c);
+}
+
+const REFERENCE_THIN_EDGE_JSON: &str = r#"{
+ "time": "2021-06-22T17:03:14.123456789+05:00",
+ "temperature": 25.01,
+ "location": {
+ "latitude": 32.54,
+ "longitude": -117.67,
+ "altitude": 98.6
+ },
+ "pressure": 98.01
+ }"#;
+
+fn translate_ref_measurement(c: &mut Criterion) {
+ let id = "Translate reference measurement";
+ sanity_check_translate_reference_thin_edge_json()
+ .expect("Expect a valid thin-edge-json message");
+
+ c.bench_function(id, |b| {
+ b.iter(|| json::from_thin_edge_json(REFERENCE_THIN_EDGE_JSON))
+ });
+}
+
+fn translate_2_measurements(c: &mut Criterion) {
+ let id = "Translate 2 measurements";
+ let message = r#"{
+ "temperature": 12.34,
+ "pressure": 56.78
+ }"#;
+ sanity_check(message);
+
+ c.bench_function(id, |b| b.iter(|| json::from_thin_edge_json(message)));
+}
+
+fn translate_50_measurements(c: &mut Criterion) {
+ let id = "Translate 50 measurements";
+ let message = flat_message(50);
+ sanity_check(&message);
+
+ c.bench_function(id, |b| b.iter(|| json::from_thin_edge_json(&message)));
+}
+
+fn translate_17x3_multi_measurements(c: &mut Criterion) {
+ let id = "Translate 17x3 multi-measurements";
+ let message = group_message(17, 3);
+ sanity_check(&message);
+
+ c.bench_function(id, |b| b.iter(|| json::from_thin_edge_json(&message)));
+}
+
+fn flat_message(n: u64) -> String {
+ let mut message = String::with_capacity(5000);
+ let mut sep = "{";
+ for i in 0..n {
+ message.push_str(&format!("{}\n\t\"measurement_{}\" : {}", sep, i, i * 10));
+ sep = ","
+ }
+ message.push_str("\n}");
+ message
+}
+
+fn group_message(n_grp: u64, n_per_grp: u64) -> String {
+ let mut message = String::with_capacity(5000);
+ let mut sep = "{";
+ for i in 0..n_grp {
+ message.push_str(&format!("{}\n\t\"group_{}\" : {{", sep, i));
+ sep = "";
+ for j in 0..n_per_grp {
+ message.push_str(&format!(
+ "{}\n\t\"measurement_{}_{}\" : {}",
+ sep,
+ i,
+ j,
+ i * j
+ ));
+ sep = ","
+ }
+ message.push_str("\n\t}");
+ sep = ","
+ }
+ message.push_str("\n}");
+ message
+}
+
+fn sanity_check(message: &str) {
+ json::from_thin_edge_json(message).expect("Expect a valid thin-edge-json message");
+}
+
+fn sanity_check_translate_reference_thin_edge_json() -> Result<(), anyhow::Error> {
+ let output = json::from_thin_edge_json(REFERENCE_THIN_EDGE_JSON)?;
+
+ let simple_c8y_json = serde_json::json!({
+ "type": "ThinEdgeMeasurement",
+ "time": "2021-06-22T17:03:14.123456789+05:00",
+ "temperature": {
+ "temperature": {
+ "value": 25.01
+ }
+ },
+ "location": {
+ "latitude": {
+ "value": 32.54
+ },
+ "longitude": {
+ "value": -117.67
+ },
+ "altitude": {
+ "value": 98.6
+ }
+ },
+ "pressure": {
+ "pressure": {
+ "value": 98.01
+ }
+ }
+ });
+
+ assert_json_diff::assert_json_eq!(
+ serde_json::from_slice::<serde_json::Value>(output.as_bytes())?,
+ simple_c8y_json
+ );
+
+ Ok(())
+}
+
+criterion_group!(benches, criterion_benchmark);
+criterion_main!(benches);
diff --git a/crates/core/c8y_translator/examples/multi_value_translation.rs b/crates/core/c8y_translator/examples/multi_value_translation.rs
new file mode 100644
index 00000000..22fe5721
--- /dev/null
+++ b/crates/core/c8y_translator/examples/multi_value_translation.rs
@@ -0,0 +1,28 @@
+use c8y_translator::json::from_thin_edge_json;
+
+fn multi_value_translation() {
+ let multi_value_thin_edge_json = r#" {
+ "temperature": 0 ,
+ "location": {
+ "latitude": 32.54,
+ "longitude": -117.67,
+ "altitude": 98.6
+ },
+ "pressure": 98
+ }"#;
+
+ println!("\nThin_Edge_Json: {:#}", multi_value_thin_edge_json);
+ let output = from_thin_edge_json(multi_value_thin_edge_json);
+ match output {
+ Ok(vec) => {
+ println!("{:?}", vec);
+ }
+ Err(e) => {
+ eprintln!("Error: {}", e);
+ }
+ }
+}
+
+pub fn main() {
+ multi_value_translation();
+}
diff --git a/crates/core/c8y_translator/examples/single_value_translation.rs b/crates/core/c8y_translator/examples/single_value_translation.rs
new file mode 100644
index 00000000..ffadd1b0
--- /dev/null
+++ b/crates/core/c8y_translator/examples/single_value_translation.rs
@@ -0,0 +1,24 @@
+use c8y_translator::json::from_thin_edge_json;
+
+fn single_value_translation() {
+ let single_value_thin_edge_json = r#" {
+ "temperature": 23,
+ "pressure": 220
+ }"#;
+
+ println!("Thin_Edge_Json: \n{:#}", single_value_thin_edge_json);
+
+ let output = from_thin_edge_json(single_value_thin_edge_json);
+ match output {
+ Ok(vec) => {
+ println!("{:?}", vec);
+ }
+ Err(e) => {
+ eprintln!("Error: {}", e);
+ }
+ }
+}
+
+pub fn main() {
+ single_value_translation();
+}
diff --git a/crates/core/c8y_translator/examples/translate_to_c8yjson.rs b/crates/core/c8y_translator/examples/translate_to_c8yjson.rs
new file mode 100644
index 00000000..4bc568ce
--- /dev/null
+++ b/crates/core/c8y_translator/examples/translate_to_c8yjson.rs
@@ -0,0 +1,27 @@
+use c8y_translator::json::from_thin_edge_json;
+
+fn thin_edge_translation_with_type_and_timestamp() {
+ let single_value_thin_edge_json_with_type_and_time = r#" {
+ "time" : "2013-06-22T17:03:14.100+02:00",
+ "temperature": 23,
+ "pressure": 220
+ }"#;
+
+ println!(
+ "\nThin_Edge_Json: \n{:#}",
+ single_value_thin_edge_json_with_type_and_time
+ );
+ let output = from_thin_edge_json(single_value_thin_edge_json_with_type_and_time);
+ match output {
+ Ok(vec) => {
+ println!("{:?}", vec);
+ }
+ Err(e) => {
+ eprintln!("Error: {}", e);
+ }
+ }
+}
+
+pub fn main() {
+ thin_edge_translation_with_type_and_timestamp();
+}
diff --git a/crates/core/c8y_translator/fuzz/.gitignore b/crates/core/c8y_translator/fuzz/.gitignore
new file mode 100644
index 00000000..572e03bd
--- /dev/null
+++ b/crates/core/c8y_translator/fuzz/.gitignore
@@ -0,0 +1,4 @@
+
+target
+corpus
+artifacts
diff --git a/crates/core/c8y_translator/fuzz/Cargo.toml b/crates/core/c8y_translator/fuzz/Cargo.toml
new file mode 100644
index 00000000..7744f66b
--- /dev/null
+++ b/crates/core/c8y_translator/fuzz/Cargo.toml
@@ -0,0 +1,26 @@
+
+[package]
+name = "c8y_translator-fuzz"
+version = "0.0.0"
+authors = ["Automatically generated"]
+publish = false
+edition = "2018"
+
+[package.metadata]
+cargo-fuzz = true
+
+[dependencies]
+libfuzzer-sys = "0.4"
+
+[dependencies.c8y_translator]
+path = ".."
+
+# Prevent this from interfering with workspaces
+[workspace]
+members = ["."]
+
+[[bin]]
+name = "fuzz_target_1"
+path = "fuzz_targets/fuzz_target_1.rs"
+test = false
+doc = false
diff --git a/crates/core/c8y_translator/fuzz/README.md b/crates/core/c8y_translator/fuzz/README.md
new file mode 100644
index 00000000..1dc9f87f
--- /dev/null
+++ b/crates/core/c8y_translator/fuzz/README.md
@@ -0,0 +1,9 @@
+See https://rust-fuzz.github.io/book/cargo-fuzz.html
+
+```
+$ cargo install cargo-fuzz
+$ cd mapper/cumulocity/c8y_translator_lib
+$ cargo +nightly fuzz run fuzz_target_1
+```
+
+
diff --git a/crates/core/c8y_translator/fuzz/fuzz_targets/fuzz_target_1.rs b/crates/core/c8y_translator/fuzz/fuzz_targets/fuzz_target_1.rs
new file mode 100644
index 00000000..1d044ee9
--- /dev/null
+++ b/crates/core/c8y_translator/fuzz/fuzz_targets/fuzz_target_1.rs
@@ -0,0 +1,8 @@
+#![no_main]
+use libfuzzer_sys::fuzz_target;
+
+use c8y_translator::CumulocityJson;
+
+fuzz_target!(|data: &[u8]| {
+ let _ = CumulocityJson::from_thin_edge_json(data);
+});
diff --git a/crates/core/c8y_translator/src/json.rs b/crates/core/c8y_translator/src/json.rs
new file mode 100644
index 00000000..5794513f
--- /dev/null
+++ b/crates/core/c8y_translator/src/json.rs
@@ -0,0 +1,280 @@
+//! A library to translate the ThinEdgeJson into C8yJson
+//! Takes thin_edge_json bytes and returns c8y json bytes
+//!
+//! # Examples
+//!
+//! ```
+//! use c8y_translator::json::from_thin_edge_json;
+//! let single_value_thin_edge_json = r#"{
+//! "time": "2020-06-22T17:03:14.000+02:00",
+//! "temperature": 23,
+//! "pressure": 220
+//! }"#;
+//! let output = from_thin_edge_json(single_value_thin_edge_json);
+//! ```
+
+use crate::serializer;
+use chrono::prelude::*;
+use clock::{Clock, WallClock};
+use thin_edge_json::parser::*;
+
+#[derive(thiserror::Error, Debug)]
+pub enum CumulocityJsonError {
+ #[error(transparent)]
+ C8yJsonSerializationError(#[from] serializer::C8yJsonSerializationError),
+
+ #[error(transparent)]
+ ThinEdgeJsonParserError(#[from] ThinEdgeJsonParserError),
+}
+
+/// Converts from thin-edge Json to c8y_json
+pub fn from_thin_edge_json(input: &str) -> Result<String, CumulocityJsonError> {
+ let timestamp = WallClock.now();
+ let c8y_vec = from_thin_edge_json_with_timestamp(input, timestamp, None)?;
+ Ok(c8y_vec)
+}
+
+/// Converts from thin-edge Json to c8y_json with child id information
+pub fn from_thin_edge_json_with_child(
+ input: &str,
+ child_id: &str,
+) -> Result<String, CumulocityJsonError> {
+ let timestamp = WallClock.now();
+ let c8y_vec = from_thin_edge_json_with_timestamp(input, timestamp, Some(child_id))?;
+ Ok(c8y_vec)
+}
+
+fn from_thin_edge_json_with_timestamp(
+ input: &str,
+ timestamp: DateTime<FixedOffset>,
+ maybe_child_id: Option<&str>,
+) -> Result<String, CumulocityJsonError> {
+ let mut serializer = serializer::C8yJsonSerializer::new(timestamp, maybe_child_id);
+ let () = parse_str(input, &mut serializer)?;
+ Ok(serializer.into_string()?)
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use assert_json_diff::*;
+ use serde_json::{json, Value};
+ use test_case::test_case;
+
+ #[test]
+ fn check_single_value_translation() {
+ let single_value_thin_edge_json = r#"{
+ "temperature": 23.0,
+ "pressure": 220.0
+ }"#;
+
+ let timestamp = FixedOffset::east(5 * 3600).ymd(2021, 4, 8).and_hms(0, 0, 0);
+
+ let output =
+ from_thin_edge_json_with_timestamp(single_value_thin_edge_json, timestamp, None);
+
+ let expected_output = json!({
+ "type": "ThinEdgeMeasurement",
+ "time": timestamp.to_rfc3339(),
+ "temperature": {
+ "temperature": {
+ "value": 23.0
+ }
+ },
+ "pressure": {
+ "pressure": {
+ "value": 220.0
+ }
+ }
+ });
+
+ assert_json_eq!(
+ serde_json::from_str::<serde_json::Value>(output.unwrap().as_str()).unwrap(),
+ expected_output
+ );
+ }
+
+ #[test]
+ fn check_thin_edge_translation_with_timestamp() {
+ let single_value_thin_edge_json = r#"{
+ "time" : "2013-06-22T17:03:14.123+02:00",
+ "temperature": 23.0,
+ "pressure": 220.0
+ }"#;
+
+ let expected_output = r#"{
+ "type": "ThinEdgeMeasurement",
+ "time": "2013-06-22T17:03:14.123+02:00",
+ "temperature": {
+ "temperature": {
+ "value": 23.0
+ }
+ },
+ "pressure" : {
+ "pressure": {
+ "value" : 220.0
+ }
+ }
+ }"#;
+
+ let output = from_thin_edge_json(single_value_thin_edge_json);
+
+ assert_eq!(
+ expected_output.split_whitespace().collect::<String>(),
+ output.unwrap().split_whitespace().collect::<String>()
+ );
+ }
+
+ #[test]
+ fn check_multi_value_translation() {
+ let multi_value_thin_edge_json = r#"{
+ "temperature": 25.0 ,
+ "location": {
+ "latitude": 32.54,
+ "longitude": -117.67,
+ "altitude": 98.6
+ },
+ "pressure": 98.0
+ }"#;
+
+ let timestamp = FixedOffset::east(5 * 3600).ymd(2021, 4, 8).and_hms(0, 0, 0);
+
+ let output =
+ from_thin_edge_json_with_timestamp(multi_value_thin_edge_json, timestamp, None);
+
+ let expected_output = json!({
+ "type": "ThinEdgeMeasurement",
+ "time": timestamp.to_rfc3339(),
+ "temperature": {
+ "temperature": {
+ "value": 25.0
+ }
+ },
+ "location": {
+ "latitude": {
+ "value": 32.54
+ },
+ "longitude": {
+ "value": -117.67
+ },
+ "altitude": {
+ "value": 98.6
+ }
+ },
+ "pressure": {
+ "pressure": {
+ "value": 98.0
+ }
+ }
+ });
+
+ assert_json_eq!(
+ serde_json::from_str::<serde_json::Value>(output.unwrap().as_str()).unwrap(),
+ expected_output
+ );
+ }
+
+ #[test]
+ fn thin_edge_json_round_tiny_number() {
+ let input = r#"{
+ "time" : "2013-06-22T17:03:14.000+02:00",
+ "temperature": 10e-9999999999
+ }"#;
+
+ let expected_output = r#"{
+ "type": "ThinEdgeMeasurement",
+ "time": "2013-06-22T17:03:14+02:00",
+ "temperature": {
+ "temperature": {
+ "value": 0.0
+ }
+ }
+ }"#;
+
+ let output = from_thin_edge_json(input);
+
+ let actual_output = output.unwrap().split_whitespace().collect::<String>();
+
+ assert_eq!(
+ expected_output.split_whitespace().collect::<String>(),
+ actual_output
+ );
+ }
+ use proptest::prelude::*;
+
+ proptest! {
+
+ #[test]
+ fn it_works_for_any_measurement(measurement in r#"[a-z]{3,6}"#) {
+ if measurement == "time" || measurement == "type" {
+ // Skip this test case, since the random measurement name happens to be a reserved key.
+ return Ok(());
+ }
+ let input = format!(r#"{{"time": "2013-06-22T17:03:14.453+02:00",
+ "{}": 123.0
+ }}"#, measurement);
+ let time = "2013-06-22T17:03:14.453+02:00";
+ let expected_output = format!(r#"{{
+ "type": "ThinEdgeMeasurement",
+ "time": "{}",
+ "{}": {{
+ "{}": {{
+ "value": 123.0
+ }}
+ }}
+ }}"#, time, measurement, measurement);
+
+ let output = from_thin_edge_json(input.as_str()).unwrap();
+ assert_eq!(
+ expected_output.split_whitespace().collect::<String>(),
+ output
+ .split_whitespace()
+ .collect::<String>()
+ );
+ }
+ }
+
+ #[test_case(
+ "child1",
+ r#"{"temperature": 23.0}"#,
+ json!({
+ "type": "ThinEdgeMeasurement",
+ "externalSource": {"externalId": "child1","type": "c8y_Serial",},
+ "time": "2021-04-08T00:00:00+05:00",
+ "temperature": {"temperature": {"value": 23.0}}
+ })
+ ;"child device single value thin-edge json translation")]
+ #[test_case(
+ "child2",
+ r#"{"temperature": 23.0, "pressure": 220.0}"#,
+ json!({
+ "type": "ThinEdgeMeasurement",
+ "externalSource": {"externalId": "child2","type": "c8y_Serial",},
+ "time": "2021-04-08T00:00:00+05:00",
+ "temperature": {"temperature": {"value": 23.0}},
+ "pressure": {"pressure": {"value": 220.0}}
+ })
+ ;"child device multiple values thin-edge json translation")]
+ #[test_case(
+ "child3",
+ r#"{"temperature": 23.0, "time": "2021-04-23T19:00:00+05:00"}"#,
+ json!({
+ "type": "ThinEdgeMeasurement",
+ "externalSource": {"externalId": "child3","type": "c8y_Serial",},
+ "time": "2021-04-23T19:00:00+05:00",
+ "temperature": {"temperature": {"value": 23.0}},
+ })
+ ;"child device single value with timestamp thin-edge json translation")]
+ fn check_value_translation_for_child_device(
+ child_id: &str,
+ thin_edge_json: &str,
+ expected_output: Value,
+ ) {
+ let timestamp = FixedOffset::east(5 * 3600).ymd(2021, 4, 8).and_hms(0, 0, 0);
+ let output = from_thin_edge_json_with_timestamp(thin_edge_json, timestamp, Some(child_id));
+ assert_json_eq!(
+ serde_json::from_str::<serde_json::Value>(output.unwrap().as_str()).unwrap(),
+ expected_output
+ );
+ }
+}
diff --git a/crates/core/c8y_translator/src/lib.rs b/crates/core/c8y_translator/src/lib.rs
new file mode 100644
index 00000000..28c368e2
--- /dev/null
+++ b/crates/core/c8y_translator/src/lib.rs
@@ -0,0 +1,2 @@
+pub mod json;
+pub mod serializer;
diff --git a/crates/core/c8y_translator/src/serializer.rs b/crates/core/c8y_translator/src/serializer.rs
new file mode 100644
index 00000000..96a3e507
--- /dev/null
+++ b/crates/core/c8y_translator/src/serializer.rs
@@ -0,0 +1,403 @@
+use chrono::prelude::*;
+use json_writer::{JsonWriter, JsonWriterError};
+use thin_edge_json::measurement::MeasurementVisitor;
+
+pub struct C8yJsonSerializer {
+ json: JsonWriter,
+ is_within_group: bool,
+ timestamp_present: bool,
+ default_timestamp: DateTime<FixedOffset>,
+}
+
+#[derive(thiserror::Error, Debug)]
+pub enum C8yJsonSerializationError {
+ #[error(transparent)]
+ MeasurementCollectorError(#[from] MeasurementStreamError),
+
+ #[error(transparent)]
+ JsonWriterError(#[from] JsonWriterError),
+}
+
+#[derive(thiserror::Error, Debug, PartialEq)]
+pub enum MeasurementStreamError {
+ #[error("Unexpected time stamp within a group")]
+ UnexpectedTimestamp,
+
+ #[error("Unexpected end of data")]
+ UnexpectedEndOfData,
+
+ #[error("Unexpected end of group")]
+ UnexpectedEndOfGroup,
+
+ #[error("Unexpected start of group")]
+ UnexpectedStartOfGroup,
+}
+
+impl C8yJsonSerializer {
+ pub fn new(default_timestamp: DateTime<FixedOffset>, maybe_child_id: Option<&str>) -> Self {
+ let capa = 1024; // XXX: Choose a capacity based on expected JSON length.
+ let mut json = JsonWriter::with_capacity(capa);
+
+ json.write_open_obj();
+ let _ = json.write_key("type");
+ let _ = json.write_str("ThinEdgeMeasurement");
+
+ if let Some(child_id) = maybe_child_id {
+ // In case the measurement is addressed to a child-device use fragment
+ // "externalSource" to tell c8Y identity API to use child-device
+ // object referenced by "externalId", instead of root device object
+ // referenced by MQTT client's Device ID.
+ let _ = json.write_key("externalSource");
+ let _ = json.write_open_obj();
+ let _ = json.write_key("externalId");
+ let _ = json.write_str(child_id);
+ let _ = json.write_key("type");
+ let _ = json.write_str("c8y_Serial");
+ let _ = json.write_close_obj();
+ }
+
+ Self {
+ json,
+ is_within_group: false,
+ timestamp_present: false,
+ default_timestamp,
+ }
+ }
+
+ fn end(&mut self) -> Result<(), C8yJsonSerializationError> {
+ if self.is_within_group {
+ return Err(MeasurementStreamError::UnexpectedEndOfData.into());
+ }
+
+ if !self.timestamp_present {
+ self.visit_timestamp(self.default_timestamp)?;
+ }
+
+ assert!(self.timestamp_present);
+
+ self.json.write_close_obj();
+ Ok(())
+ }
+
+ fn write_value_obj(&mut self, value: f64) -> Result<(), C8yJsonSerializationError> {
+ self.json.write_open_obj();
+ self.json.write_key("value")?;
+ self.json.write_f64(value)?;
+ self.json.write_close_obj();
+ Ok(())
+ }
+
+ pub fn into_string(&mut self) -> Result<String, C8yJsonSerializationError> {
+ self.end()?;
+ Ok(self.json.clone().into_string()?)
+ }
+}
+
+impl MeasurementVisitor for C8yJsonSerializer {
+ type Error = C8yJsonSerializationError;
+
+ fn visit_timestamp(&mut self, timestamp: DateTime<FixedOffset>) -> Result<(), Self::Error> {
+ if self.is_within_group {
+ return Err(MeasurementStreamError::UnexpectedTimestamp.into());
+ }
+
+ self.json.write_key("time")?;
+ self.json.write_str(timestamp.to_rfc3339().as_str())?;
+
+ self.timestamp_present = true;
+ Ok(())
+ }
+
+ fn visit_measurement(&mut self, key: &str, value: f64) -> Result<(), Self::Error> {
+ self.json.write_key(key)?;
+
+ if self.is_within_group {
+ self.write_value_obj(value)?;
+ } else {
+ self.json.write_open_obj();
+ self.json.write_key(key)?;
+ self.write_value_obj(value)?;
+ self.json.write_close_obj();
+ }
+ Ok(())
+ }
+
+ fn visit_start_group(&mut self, group: &str) -> Result<(), Self::Error> {
+ if self.is_within_group {
+ return Err(MeasurementStreamError::UnexpectedStartOfGroup.into());
+ }
+
+ self.json.write_key(group)?;
+ self.json.write_open_obj();
+ self.is_within_group = true;
+ Ok(())
+ }
+
+ fn visit_end_group(&mut self) -> Result<(), Self::Error> {
+ if !self.is_within_group {
+ return Err(MeasurementStreamError::UnexpectedEndOfGroup.into());
+ }
+
+ self.json.write_close_obj();
+ self.is_within_group = false;
+ Ok(())
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use assert_json_diff::*;
+ use assert_matches::*;
+ use serde_json::json;
+
+ use super::*;
+ use chrono::offset::FixedOffset;
+
+ #[test]
+ fn serialize_single_value_message() -> anyhow::Result<()> {
+ let timestamp = FixedOffset::east(5 * 3600)
+ .ymd(2021, 6, 22)
+ .and_hms_nano(17, 3, 14, 123456789);
+
+ let mut serializer = C8yJsonSerializer::new(timestamp, None);
+ serializer.visit_timestamp(timestamp)?;
+ serializer.visit_measurement("temperature", 25.5)?;
+
+ let output = serializer.into_string()?;
+
+ let expected_output = json!({
+ "type": "ThinEdgeMeasurement",
+ "time": "2021-06-22T17:03:14.123456789+05:00",
+ "temperature":{
+ "temperature":{
+ "value": 25.5
+ }
+ }
+ });
+
+ assert_json_eq!(
+ serde_json::from_str::<serde_json::Value>(&output)?,
+ expected_output
+ );
+ Ok(())
+ }
+ #[test]
+ fn serialize_multi_value_message() -> anyhow::Result<()> {
+ let timestamp = FixedOffset::east(5 * 3600)
+ .ymd(2021, 6, 22)
+ .and_hms_nano(17, 3, 14, 123456789);
+
+ let mut serializer = C8yJsonSerializer::new(timestamp, None);
+ serializer.visit_timestamp(timestamp)?;
+ serializer.visit_measurement("temperature", 25.5)?;
+ serializer.visit_start_group("location")?;
+ serializer.visit_measurement("alti", 2100.4)?;
+ serializer.visit_measurement("longi", 2200.4)?;
+ serializer.visit_measurement("lati", 2300.4)?;
+ serializer.visit_end_group()?;
+ serializer.visit_measurement("pressure", 255.2)?;
+
+ let output = serializer.into_string()?;
+
+ let expected_output = json!({
+ "type": "ThinEdgeMeasurement",
+ "time": "2021-06-22T17:03:14.123456789+05:00",
+ "temperature":{
+ "temperature":{
+ "value": 25.5
+ }
+ },
+ "location": {
+ "alti": {
+ "value": 2100.4
+ },
+ "longi":{
+ "value": 2200.4
+ },
+ "lati":{
+ "value": 2300.4
+ },
+ },
+ "pressure":{
+ "pressure":{
+ "value":255.2
+ }
+ }
+
+ });
+
+ assert_json_eq!(
+ serde_json::from_str::<serde_json::Value>(&output)?,
+ expected_output
+ );
+
+ Ok(())
+ }
+
+ #[test]
+ fn serialize_empty_message() -> anyhow::Result<()> {
+ let timestamp = FixedOffset::east(5 * 3600)
+ .ymd(2021, 6, 22)
+ .and_hms_nano(17, 3, 14, 123456789);
+
+ let mut serializer = C8yJsonSerializer::new(timestamp, None);
+
+ let expected_output =
+ json!({"type": "ThinEdgeMeasurement", "time": "2021-06-22T17:03:14.123456789+05:00"});
+
+ let output = serializer.into_string()?;
+
+ assert_json_eq!(
+ serde_json::from_str::<serde_json::Value>(&output)?,
+ expected_output
+ );
+
+ Ok(())
+ }
+
+ #[test]
+ fn serialize_timestamp_message() -> anyhow::Result<()> {
+ let timestamp = FixedOffset::east(5 * 3600)
+ .ymd(2021, 6, 22)
+ .and_hms_nano(17, 3, 14, 123456789);
+
+ let mut serializer = C8yJsonSerializer::new(timestamp, None);
+ serializer.visit_timestamp(timestamp)?;
+
+ let expected_output = json!({
+ "type": "ThinEdgeMeasurement",
+ "time":"2021-06-22T17:03:14.123456789+05:00"
+ });
+
+ let output = serializer.into_string()?;
+
+ assert_json_eq!(
+ serde_json::from_str::<serde_json::Value>(&output)?,
+ expected_output
+ );
+
+ Ok(())
+ }
+
+ #[test]
+ fn serialize_timestamp_within_group() -> anyhow::Result<()> {
+ let timestamp = FixedOffset::east(5 * 3600)