diff options
author | Lukasz Woznicki <75632179+makr11st@users.noreply.github.com> | 2021-11-24 20:54:56 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-11-24 20:54:56 +0000 |
commit | a4ffeccf60090e4456755bc53a6e3b8c8038e855 (patch) | |
tree | 9583f187114913a92866571920dd3bb205bd50a3 /crates/core/c8y_translator | |
parent | 8217e80670e76dbf9168780f5e0545355a39f8f3 (diff) |
Restructure directories of the workspace (#559)
* Restructure directories of the workspace
* Rename c8y_translator_lib to c8y_translator
* Update comment on how to get dummy plugin path
Signed-off-by: Lukasz Woznicki <lukasz.woznicki@softwareag.com>
Diffstat (limited to 'crates/core/c8y_translator')
-rw-r--r-- | crates/core/c8y_translator/Cargo.toml | 30 | ||||
-rw-r--r-- | crates/core/c8y_translator/benches/thin_edge_json.rs | 135 | ||||
-rw-r--r-- | crates/core/c8y_translator/examples/multi_value_translation.rs | 28 | ||||
-rw-r--r-- | crates/core/c8y_translator/examples/single_value_translation.rs | 24 | ||||
-rw-r--r-- | crates/core/c8y_translator/examples/translate_to_c8yjson.rs | 27 | ||||
-rw-r--r-- | crates/core/c8y_translator/fuzz/.gitignore | 4 | ||||
-rw-r--r-- | crates/core/c8y_translator/fuzz/Cargo.toml | 26 | ||||
-rw-r--r-- | crates/core/c8y_translator/fuzz/README.md | 9 | ||||
-rw-r--r-- | crates/core/c8y_translator/fuzz/fuzz_targets/fuzz_target_1.rs | 8 | ||||
-rw-r--r-- | crates/core/c8y_translator/src/json.rs | 280 | ||||
-rw-r--r-- | crates/core/c8y_translator/src/lib.rs | 2 | ||||
-rw-r--r-- | crates/core/c8y_translator/src/serializer.rs | 403 |
12 files changed, 976 insertions, 0 deletions
diff --git a/crates/core/c8y_translator/Cargo.toml b/crates/core/c8y_translator/Cargo.toml new file mode 100644 index 00000000..f4dbdd7d --- /dev/null +++ b/crates/core/c8y_translator/Cargo.toml @@ -0,0 +1,30 @@ +[package] +name = "c8y_translator" +version = "0.4.3" +authors = ["thin-edge.io team <info@thin-edge.io>"] +edition = "2018" + +[dependencies] +chrono = "0.4" +clock = { path = "../../common/clock" } +json-writer = { path = "../../common/json_writer" } +thin_edge_json = { path = "../thin_edge_json" } +thiserror = "1.0" + +[dev-dependencies] +anyhow = "1.0" +assert_matches = "1.5" +assert-json-diff = "2.0" +criterion = "0.3" +pretty_assertions = "1.0" +proptest = "1.0" +serde_json = "1.0" +test-case = "1.2" + +[features] +# use: #[cfg(feature="integration-test")] +integration-test = [] + +[[bench]] +name = "thin_edge_json" +harness = false diff --git a/crates/core/c8y_translator/benches/thin_edge_json.rs b/crates/core/c8y_translator/benches/thin_edge_json.rs new file mode 100644 index 00000000..76bbd692 --- /dev/null +++ b/crates/core/c8y_translator/benches/thin_edge_json.rs @@ -0,0 +1,135 @@ +use c8y_translator::json; +use criterion::{criterion_group, criterion_main, Criterion}; + +pub fn criterion_benchmark(c: &mut Criterion) { + translate_ref_measurement(c); + translate_2_measurements(c); + translate_50_measurements(c); + translate_17x3_multi_measurements(c); +} + +const REFERENCE_THIN_EDGE_JSON: &str = r#"{ + "time": "2021-06-22T17:03:14.123456789+05:00", + "temperature": 25.01, + "location": { + "latitude": 32.54, + "longitude": -117.67, + "altitude": 98.6 + }, + "pressure": 98.01 + }"#; + +fn translate_ref_measurement(c: &mut Criterion) { + let id = "Translate reference measurement"; + sanity_check_translate_reference_thin_edge_json() + .expect("Expect a valid thin-edge-json message"); + + c.bench_function(id, |b| { + b.iter(|| json::from_thin_edge_json(REFERENCE_THIN_EDGE_JSON)) + }); +} + +fn translate_2_measurements(c: &mut Criterion) { + let id = "Translate 2 measurements"; + let message = r#"{ + "temperature": 12.34, + "pressure": 56.78 + }"#; + sanity_check(message); + + c.bench_function(id, |b| b.iter(|| json::from_thin_edge_json(message))); +} + +fn translate_50_measurements(c: &mut Criterion) { + let id = "Translate 50 measurements"; + let message = flat_message(50); + sanity_check(&message); + + c.bench_function(id, |b| b.iter(|| json::from_thin_edge_json(&message))); +} + +fn translate_17x3_multi_measurements(c: &mut Criterion) { + let id = "Translate 17x3 multi-measurements"; + let message = group_message(17, 3); + sanity_check(&message); + + c.bench_function(id, |b| b.iter(|| json::from_thin_edge_json(&message))); +} + +fn flat_message(n: u64) -> String { + let mut message = String::with_capacity(5000); + let mut sep = "{"; + for i in 0..n { + message.push_str(&format!("{}\n\t\"measurement_{}\" : {}", sep, i, i * 10)); + sep = "," + } + message.push_str("\n}"); + message +} + +fn group_message(n_grp: u64, n_per_grp: u64) -> String { + let mut message = String::with_capacity(5000); + let mut sep = "{"; + for i in 0..n_grp { + message.push_str(&format!("{}\n\t\"group_{}\" : {{", sep, i)); + sep = ""; + for j in 0..n_per_grp { + message.push_str(&format!( + "{}\n\t\"measurement_{}_{}\" : {}", + sep, + i, + j, + i * j + )); + sep = "," + } + message.push_str("\n\t}"); + sep = "," + } + message.push_str("\n}"); + message +} + +fn sanity_check(message: &str) { + json::from_thin_edge_json(message).expect("Expect a valid thin-edge-json message"); +} + +fn sanity_check_translate_reference_thin_edge_json() -> Result<(), anyhow::Error> { + let output = json::from_thin_edge_json(REFERENCE_THIN_EDGE_JSON)?; + + let simple_c8y_json = serde_json::json!({ + "type": "ThinEdgeMeasurement", + "time": "2021-06-22T17:03:14.123456789+05:00", + "temperature": { + "temperature": { + "value": 25.01 + } + }, + "location": { + "latitude": { + "value": 32.54 + }, + "longitude": { + "value": -117.67 + }, + "altitude": { + "value": 98.6 + } + }, + "pressure": { + "pressure": { + "value": 98.01 + } + } + }); + + assert_json_diff::assert_json_eq!( + serde_json::from_slice::<serde_json::Value>(output.as_bytes())?, + simple_c8y_json + ); + + Ok(()) +} + +criterion_group!(benches, criterion_benchmark); +criterion_main!(benches); diff --git a/crates/core/c8y_translator/examples/multi_value_translation.rs b/crates/core/c8y_translator/examples/multi_value_translation.rs new file mode 100644 index 00000000..22fe5721 --- /dev/null +++ b/crates/core/c8y_translator/examples/multi_value_translation.rs @@ -0,0 +1,28 @@ +use c8y_translator::json::from_thin_edge_json; + +fn multi_value_translation() { + let multi_value_thin_edge_json = r#" { + "temperature": 0 , + "location": { + "latitude": 32.54, + "longitude": -117.67, + "altitude": 98.6 + }, + "pressure": 98 + }"#; + + println!("\nThin_Edge_Json: {:#}", multi_value_thin_edge_json); + let output = from_thin_edge_json(multi_value_thin_edge_json); + match output { + Ok(vec) => { + println!("{:?}", vec); + } + Err(e) => { + eprintln!("Error: {}", e); + } + } +} + +pub fn main() { + multi_value_translation(); +} diff --git a/crates/core/c8y_translator/examples/single_value_translation.rs b/crates/core/c8y_translator/examples/single_value_translation.rs new file mode 100644 index 00000000..ffadd1b0 --- /dev/null +++ b/crates/core/c8y_translator/examples/single_value_translation.rs @@ -0,0 +1,24 @@ +use c8y_translator::json::from_thin_edge_json; + +fn single_value_translation() { + let single_value_thin_edge_json = r#" { + "temperature": 23, + "pressure": 220 + }"#; + + println!("Thin_Edge_Json: \n{:#}", single_value_thin_edge_json); + + let output = from_thin_edge_json(single_value_thin_edge_json); + match output { + Ok(vec) => { + println!("{:?}", vec); + } + Err(e) => { + eprintln!("Error: {}", e); + } + } +} + +pub fn main() { + single_value_translation(); +} diff --git a/crates/core/c8y_translator/examples/translate_to_c8yjson.rs b/crates/core/c8y_translator/examples/translate_to_c8yjson.rs new file mode 100644 index 00000000..4bc568ce --- /dev/null +++ b/crates/core/c8y_translator/examples/translate_to_c8yjson.rs @@ -0,0 +1,27 @@ +use c8y_translator::json::from_thin_edge_json; + +fn thin_edge_translation_with_type_and_timestamp() { + let single_value_thin_edge_json_with_type_and_time = r#" { + "time" : "2013-06-22T17:03:14.100+02:00", + "temperature": 23, + "pressure": 220 + }"#; + + println!( + "\nThin_Edge_Json: \n{:#}", + single_value_thin_edge_json_with_type_and_time + ); + let output = from_thin_edge_json(single_value_thin_edge_json_with_type_and_time); + match output { + Ok(vec) => { + println!("{:?}", vec); + } + Err(e) => { + eprintln!("Error: {}", e); + } + } +} + +pub fn main() { + thin_edge_translation_with_type_and_timestamp(); +} diff --git a/crates/core/c8y_translator/fuzz/.gitignore b/crates/core/c8y_translator/fuzz/.gitignore new file mode 100644 index 00000000..572e03bd --- /dev/null +++ b/crates/core/c8y_translator/fuzz/.gitignore @@ -0,0 +1,4 @@ + +target +corpus +artifacts diff --git a/crates/core/c8y_translator/fuzz/Cargo.toml b/crates/core/c8y_translator/fuzz/Cargo.toml new file mode 100644 index 00000000..7744f66b --- /dev/null +++ b/crates/core/c8y_translator/fuzz/Cargo.toml @@ -0,0 +1,26 @@ + +[package] +name = "c8y_translator-fuzz" +version = "0.0.0" +authors = ["Automatically generated"] +publish = false +edition = "2018" + +[package.metadata] +cargo-fuzz = true + +[dependencies] +libfuzzer-sys = "0.4" + +[dependencies.c8y_translator] +path = ".." + +# Prevent this from interfering with workspaces +[workspace] +members = ["."] + +[[bin]] +name = "fuzz_target_1" +path = "fuzz_targets/fuzz_target_1.rs" +test = false +doc = false diff --git a/crates/core/c8y_translator/fuzz/README.md b/crates/core/c8y_translator/fuzz/README.md new file mode 100644 index 00000000..1dc9f87f --- /dev/null +++ b/crates/core/c8y_translator/fuzz/README.md @@ -0,0 +1,9 @@ +See https://rust-fuzz.github.io/book/cargo-fuzz.html + +``` +$ cargo install cargo-fuzz +$ cd mapper/cumulocity/c8y_translator_lib +$ cargo +nightly fuzz run fuzz_target_1 +``` + + diff --git a/crates/core/c8y_translator/fuzz/fuzz_targets/fuzz_target_1.rs b/crates/core/c8y_translator/fuzz/fuzz_targets/fuzz_target_1.rs new file mode 100644 index 00000000..1d044ee9 --- /dev/null +++ b/crates/core/c8y_translator/fuzz/fuzz_targets/fuzz_target_1.rs @@ -0,0 +1,8 @@ +#![no_main] +use libfuzzer_sys::fuzz_target; + +use c8y_translator::CumulocityJson; + +fuzz_target!(|data: &[u8]| { + let _ = CumulocityJson::from_thin_edge_json(data); +}); diff --git a/crates/core/c8y_translator/src/json.rs b/crates/core/c8y_translator/src/json.rs new file mode 100644 index 00000000..5794513f --- /dev/null +++ b/crates/core/c8y_translator/src/json.rs @@ -0,0 +1,280 @@ +//! A library to translate the ThinEdgeJson into C8yJson +//! Takes thin_edge_json bytes and returns c8y json bytes +//! +//! # Examples +//! +//! ``` +//! use c8y_translator::json::from_thin_edge_json; +//! let single_value_thin_edge_json = r#"{ +//! "time": "2020-06-22T17:03:14.000+02:00", +//! "temperature": 23, +//! "pressure": 220 +//! }"#; +//! let output = from_thin_edge_json(single_value_thin_edge_json); +//! ``` + +use crate::serializer; +use chrono::prelude::*; +use clock::{Clock, WallClock}; +use thin_edge_json::parser::*; + +#[derive(thiserror::Error, Debug)] +pub enum CumulocityJsonError { + #[error(transparent)] + C8yJsonSerializationError(#[from] serializer::C8yJsonSerializationError), + + #[error(transparent)] + ThinEdgeJsonParserError(#[from] ThinEdgeJsonParserError), +} + +/// Converts from thin-edge Json to c8y_json +pub fn from_thin_edge_json(input: &str) -> Result<String, CumulocityJsonError> { + let timestamp = WallClock.now(); + let c8y_vec = from_thin_edge_json_with_timestamp(input, timestamp, None)?; + Ok(c8y_vec) +} + +/// Converts from thin-edge Json to c8y_json with child id information +pub fn from_thin_edge_json_with_child( + input: &str, + child_id: &str, +) -> Result<String, CumulocityJsonError> { + let timestamp = WallClock.now(); + let c8y_vec = from_thin_edge_json_with_timestamp(input, timestamp, Some(child_id))?; + Ok(c8y_vec) +} + +fn from_thin_edge_json_with_timestamp( + input: &str, + timestamp: DateTime<FixedOffset>, + maybe_child_id: Option<&str>, +) -> Result<String, CumulocityJsonError> { + let mut serializer = serializer::C8yJsonSerializer::new(timestamp, maybe_child_id); + let () = parse_str(input, &mut serializer)?; + Ok(serializer.into_string()?) +} + +#[cfg(test)] +mod tests { + use super::*; + use assert_json_diff::*; + use serde_json::{json, Value}; + use test_case::test_case; + + #[test] + fn check_single_value_translation() { + let single_value_thin_edge_json = r#"{ + "temperature": 23.0, + "pressure": 220.0 + }"#; + + let timestamp = FixedOffset::east(5 * 3600).ymd(2021, 4, 8).and_hms(0, 0, 0); + + let output = + from_thin_edge_json_with_timestamp(single_value_thin_edge_json, timestamp, None); + + let expected_output = json!({ + "type": "ThinEdgeMeasurement", + "time": timestamp.to_rfc3339(), + "temperature": { + "temperature": { + "value": 23.0 + } + }, + "pressure": { + "pressure": { + "value": 220.0 + } + } + }); + + assert_json_eq!( + serde_json::from_str::<serde_json::Value>(output.unwrap().as_str()).unwrap(), + expected_output + ); + } + + #[test] + fn check_thin_edge_translation_with_timestamp() { + let single_value_thin_edge_json = r#"{ + "time" : "2013-06-22T17:03:14.123+02:00", + "temperature": 23.0, + "pressure": 220.0 + }"#; + + let expected_output = r#"{ + "type": "ThinEdgeMeasurement", + "time": "2013-06-22T17:03:14.123+02:00", + "temperature": { + "temperature": { + "value": 23.0 + } + }, + "pressure" : { + "pressure": { + "value" : 220.0 + } + } + }"#; + + let output = from_thin_edge_json(single_value_thin_edge_json); + + assert_eq!( + expected_output.split_whitespace().collect::<String>(), + output.unwrap().split_whitespace().collect::<String>() + ); + } + + #[test] + fn check_multi_value_translation() { + let multi_value_thin_edge_json = r#"{ + "temperature": 25.0 , + "location": { + "latitude": 32.54, + "longitude": -117.67, + "altitude": 98.6 + }, + "pressure": 98.0 + }"#; + + let timestamp = FixedOffset::east(5 * 3600).ymd(2021, 4, 8).and_hms(0, 0, 0); + + let output = + from_thin_edge_json_with_timestamp(multi_value_thin_edge_json, timestamp, None); + + let expected_output = json!({ + "type": "ThinEdgeMeasurement", + "time": timestamp.to_rfc3339(), + "temperature": { + "temperature": { + "value": 25.0 + } + }, + "location": { + "latitude": { + "value": 32.54 + }, + "longitude": { + "value": -117.67 + }, + "altitude": { + "value": 98.6 + } + }, + "pressure": { + "pressure": { + "value": 98.0 + } + } + }); + + assert_json_eq!( + serde_json::from_str::<serde_json::Value>(output.unwrap().as_str()).unwrap(), + expected_output + ); + } + + #[test] + fn thin_edge_json_round_tiny_number() { + let input = r#"{ + "time" : "2013-06-22T17:03:14.000+02:00", + "temperature": 10e-9999999999 + }"#; + + let expected_output = r#"{ + "type": "ThinEdgeMeasurement", + "time": "2013-06-22T17:03:14+02:00", + "temperature": { + "temperature": { + "value": 0.0 + } + } + }"#; + + let output = from_thin_edge_json(input); + + let actual_output = output.unwrap().split_whitespace().collect::<String>(); + + assert_eq!( + expected_output.split_whitespace().collect::<String>(), + actual_output + ); + } + use proptest::prelude::*; + + proptest! { + + #[test] + fn it_works_for_any_measurement(measurement in r#"[a-z]{3,6}"#) { + if measurement == "time" || measurement == "type" { + // Skip this test case, since the random measurement name happens to be a reserved key. + return Ok(()); + } + let input = format!(r#"{{"time": "2013-06-22T17:03:14.453+02:00", + "{}": 123.0 + }}"#, measurement); + let time = "2013-06-22T17:03:14.453+02:00"; + let expected_output = format!(r#"{{ + "type": "ThinEdgeMeasurement", + "time": "{}", + "{}": {{ + "{}": {{ + "value": 123.0 + }} + }} + }}"#, time, measurement, measurement); + + let output = from_thin_edge_json(input.as_str()).unwrap(); + assert_eq!( + expected_output.split_whitespace().collect::<String>(), + output + .split_whitespace() + .collect::<String>() + ); + } + } + + #[test_case( + "child1", + r#"{"temperature": 23.0}"#, + json!({ + "type": "ThinEdgeMeasurement", + "externalSource": {"externalId": "child1","type": "c8y_Serial",}, + "time": "2021-04-08T00:00:00+05:00", + "temperature": {"temperature": {"value": 23.0}} + }) + ;"child device single value thin-edge json translation")] + #[test_case( + "child2", + r#"{"temperature": 23.0, "pressure": 220.0}"#, + json!({ + "type": "ThinEdgeMeasurement", + "externalSource": {"externalId": "child2","type": "c8y_Serial",}, + "time": "2021-04-08T00:00:00+05:00", + "temperature": {"temperature": {"value": 23.0}}, + "pressure": {"pressure": {"value": 220.0}} + }) + ;"child device multiple values thin-edge json translation")] + #[test_case( + "child3", + r#"{"temperature": 23.0, "time": "2021-04-23T19:00:00+05:00"}"#, + json!({ + "type": "ThinEdgeMeasurement", + "externalSource": {"externalId": "child3","type": "c8y_Serial",}, + "time": "2021-04-23T19:00:00+05:00", + "temperature": {"temperature": {"value": 23.0}}, + }) + ;"child device single value with timestamp thin-edge json translation")] + fn check_value_translation_for_child_device( + child_id: &str, + thin_edge_json: &str, + expected_output: Value, + ) { + let timestamp = FixedOffset::east(5 * 3600).ymd(2021, 4, 8).and_hms(0, 0, 0); + let output = from_thin_edge_json_with_timestamp(thin_edge_json, timestamp, Some(child_id)); + assert_json_eq!( + serde_json::from_str::<serde_json::Value>(output.unwrap().as_str()).unwrap(), + expected_output + ); + } +} diff --git a/crates/core/c8y_translator/src/lib.rs b/crates/core/c8y_translator/src/lib.rs new file mode 100644 index 00000000..28c368e2 --- /dev/null +++ b/crates/core/c8y_translator/src/lib.rs @@ -0,0 +1,2 @@ +pub mod json; +pub mod serializer; diff --git a/crates/core/c8y_translator/src/serializer.rs b/crates/core/c8y_translator/src/serializer.rs new file mode 100644 index 00000000..96a3e507 --- /dev/null +++ b/crates/core/c8y_translator/src/serializer.rs @@ -0,0 +1,403 @@ +use chrono::prelude::*; +use json_writer::{JsonWriter, JsonWriterError}; +use thin_edge_json::measurement::MeasurementVisitor; + +pub struct C8yJsonSerializer { + json: JsonWriter, + is_within_group: bool, + timestamp_present: bool, + default_timestamp: DateTime<FixedOffset>, +} + +#[derive(thiserror::Error, Debug)] +pub enum C8yJsonSerializationError { + #[error(transparent)] + MeasurementCollectorError(#[from] MeasurementStreamError), + + #[error(transparent)] + JsonWriterError(#[from] JsonWriterError), +} + +#[derive(thiserror::Error, Debug, PartialEq)] +pub enum MeasurementStreamError { + #[error("Unexpected time stamp within a group")] + UnexpectedTimestamp, + + #[error("Unexpected end of data")] + UnexpectedEndOfData, + + #[error("Unexpected end of group")] + UnexpectedEndOfGroup, + + #[error("Unexpected start of group")] + UnexpectedStartOfGroup, +} + +impl C8yJsonSerializer { + pub fn new(default_timestamp: DateTime<FixedOffset>, maybe_child_id: Option<&str>) -> Self { + let capa = 1024; // XXX: Choose a capacity based on expected JSON length. + let mut json = JsonWriter::with_capacity(capa); + + json.write_open_obj(); + let _ = json.write_key("type"); + let _ = json.write_str("ThinEdgeMeasurement"); + + if let Some(child_id) = maybe_child_id { + // In case the measurement is addressed to a child-device use fragment + // "externalSource" to tell c8Y identity API to use child-device + // object referenced by "externalId", instead of root device object + // referenced by MQTT client's Device ID. + let _ = json.write_key("externalSource"); + let _ = json.write_open_obj(); + let _ = json.write_key("externalId"); + let _ = json.write_str(child_id); + let _ = json.write_key("type"); + let _ = json.write_str("c8y_Serial"); + let _ = json.write_close_obj(); + } + + Self { + json, + is_within_group: false, + timestamp_present: false, + default_timestamp, + } + } + + fn end(&mut self) -> Result<(), C8yJsonSerializationError> { + if self.is_within_group { + return Err(MeasurementStreamError::UnexpectedEndOfData.into()); + } + + if !self.timestamp_present { + self.visit_timestamp(self.default_timestamp)?; + } + + assert!(self.timestamp_present); + + self.json.write_close_obj(); + Ok(()) + } + + fn write_value_obj(&mut self, value: f64) -> Result<(), C8yJsonSerializationError> { + self.json.write_open_obj(); + self.json.write_key("value")?; + self.json.write_f64(value)?; + self.json.write_close_obj(); + Ok(()) + } + + pub fn into_string(&mut self) -> Result<String, C8yJsonSerializationError> { + self.end()?; + Ok(self.json.clone().into_string()?) + } +} + +impl MeasurementVisitor for C8yJsonSerializer { + type Error = C8yJsonSerializationError; + + fn visit_timestamp(&mut self, timestamp: DateTime<FixedOffset>) -> Result<(), Self::Error> { + if self.is_within_group { + return Err(MeasurementStreamError::UnexpectedTimestamp.into()); + } + + self.json.write_key("time")?; + self.json.write_str(timestamp.to_rfc3339().as_str())?; + + self.timestamp_present = true; + Ok(()) + } + + fn visit_measurement(&mut self, key: &str, value: f64) -> Result<(), Self::Error> { + self.json.write_key(key)?; + + if self.is_within_group { + self.write_value_obj(value)?; + } else { + self.json.write_open_obj(); + self.json.write_key(key)?; + self.write_value_obj(value)?; + self.json.write_close_obj(); + } + Ok(()) + } + + fn visit_start_group(&mut self, group: &str) -> Result<(), Self::Error> { + if self.is_within_group { + return Err(MeasurementStreamError::UnexpectedStartOfGroup.into()); + } + + self.json.write_key(group)?; + self.json.write_open_obj(); + self.is_within_group = true; + Ok(()) + } + + fn visit_end_group(&mut self) -> Result<(), Self::Error> { + if !self.is_within_group { + return Err(MeasurementStreamError::UnexpectedEndOfGroup.into()); + } + + self.json.write_close_obj(); + self.is_within_group = false; + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use assert_json_diff::*; + use assert_matches::*; + use serde_json::json; + + use super::*; + use chrono::offset::FixedOffset; + + #[test] + fn serialize_single_value_message() -> anyhow::Result<()> { + let timestamp = FixedOffset::east(5 * 3600) + .ymd(2021, 6, 22) + .and_hms_nano(17, 3, 14, 123456789); + + let mut serializer = C8yJsonSerializer::new(timestamp, None); + serializer.visit_timestamp(timestamp)?; + serializer.visit_measurement("temperature", 25.5)?; + + let output = serializer.into_string()?; + + let expected_output = json!({ + "type": "ThinEdgeMeasurement", + "time": "2021-06-22T17:03:14.123456789+05:00", + "temperature":{ + "temperature":{ + "value": 25.5 + } + } + }); + + assert_json_eq!( + serde_json::from_str::<serde_json::Value>(&output)?, + expected_output + ); + Ok(()) + } + #[test] + fn serialize_multi_value_message() -> anyhow::Result<()> { + let timestamp = FixedOffset::east(5 * 3600) + .ymd(2021, 6, 22) + .and_hms_nano(17, 3, 14, 123456789); + + let mut serializer = C8yJsonSerializer::new(timestamp, None); + serializer.visit_timestamp(timestamp)?; + serializer.visit_measurement("temperature", 25.5)?; + serializer.visit_start_group("location")?; + serializer.visit_measurement("alti", 2100.4)?; + serializer.visit_measurement("longi", 2200.4)?; + serializer.visit_measurement("lati", 2300.4)?; + serializer.visit_end_group()?; + serializer.visit_measurement("pressure", 255.2)?; + + let output = serializer.into_string()?; + + let expected_output = json!({ + "type": "ThinEdgeMeasurement", + "time": "2021-06-22T17:03:14.123456789+05:00", + "temperature":{ + "temperature":{ + "value": 25.5 + } + }, + "location": { + "alti": { + "value": 2100.4 + }, + "longi":{ + "value": 2200.4 + }, + "lati":{ + "value": 2300.4 + }, + }, + "pressure":{ + "pressure":{ + "value":255.2 + } + } + + }); + + assert_json_eq!( + serde_json::from_str::<serde_json::Value>(&output)?, + expected_output + ); + + Ok(()) + } + + #[test] + fn serialize_empty_message() -> anyhow::Result<()> { + let timestamp = FixedOffset::east(5 * 3600) + |