diff options
-rw-r--r-- | Cargo.lock | 11 | ||||
-rw-r--r-- | Cargo.toml | 1 | ||||
-rw-r--r-- | common/json_writer/Cargo.toml | 11 | ||||
-rw-r--r-- | common/json_writer/src/lib.rs | 163 | ||||
-rw-r--r-- | mapper/cumulocity/c8y_translator_lib/Cargo.toml | 1 | ||||
-rw-r--r-- | mapper/cumulocity/c8y_translator_lib/src/json.rs | 30 | ||||
-rw-r--r-- | mapper/cumulocity/c8y_translator_lib/src/serializer.rs | 98 | ||||
-rw-r--r-- | mapper/tedge_mapper/src/az_converter.rs | 16 | ||||
-rw-r--r-- | mapper/thin_edge_json/Cargo.toml | 1 | ||||
-rw-r--r-- | mapper/thin_edge_json/examples/tej_example.rs | 4 | ||||
-rw-r--r-- | mapper/thin_edge_json/src/serialize.rs | 162 |
11 files changed, 332 insertions, 166 deletions
@@ -279,6 +279,7 @@ dependencies = [ "chrono", "clock", "criterion", + "json-writer", "pretty_assertions", "proptest", "serde_json", @@ -1019,6 +1020,15 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "078e285eafdfb6c4b434e0d31e8cfcb5115b651496faca5749b88fafd4f23bfd" [[package]] +name = "json-writer" +version = "0.1.0" +dependencies = [ + "anyhow", + "serde_json", + "thiserror", +] + +[[package]] name = "lazy_static" version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -2192,6 +2202,7 @@ dependencies = [ "chrono", "clock", "json", + "json-writer", "mockall", "pretty_assertions", "proptest", @@ -6,6 +6,7 @@ members = [ "common/mqtt_client", "common/clock", "common/tedge_users", + "common/json_writer", "tedge", "tedge_config", "mapper/cumulocity/c8y_translator_lib", diff --git a/common/json_writer/Cargo.toml b/common/json_writer/Cargo.toml new file mode 100644 index 00000000..4ea07748 --- /dev/null +++ b/common/json_writer/Cargo.toml @@ -0,0 +1,11 @@ +[package] +name = "json-writer" +version = "0.1.0" +edition = "2018" + +[dependencies] +thiserror = "1.0" +serde_json = "1" + +[dev-dependencies] +anyhow = "1.0.40" diff --git a/common/json_writer/src/lib.rs b/common/json_writer/src/lib.rs new file mode 100644 index 00000000..c3c372bf --- /dev/null +++ b/common/json_writer/src/lib.rs @@ -0,0 +1,163 @@ +use std::num::FpCategory; + +#[derive(Debug, Clone)] +pub struct JsonWriter { + buffer: Vec<u8>, +} + +#[derive(thiserror::Error, Debug)] +pub enum JsonWriterError { + #[error("JsonWriter produced invalid UTF8 string")] + InvalidUtf8Conversion(#[from] std::string::FromUtf8Error), + + #[error("IoError")] + IoError(#[from] std::io::Error), + + #[error("Serde Json error")] + SerdeJsonError(#[from] serde_json::Error), + + #[error("Invalid f64 value {value:?}")] + InvalidF64Value { value: f64 }, +} + +impl JsonWriter { + pub fn new() -> Self { + Self { buffer: Vec::new() } + } + + pub fn with_capacity(capacity: usize) -> Self { + Self { + buffer: Vec::with_capacity(capacity), + } + } + + pub fn write_key(&mut self, key: &str) -> Result<(), JsonWriterError> { + self.write_str(key)?; + self.buffer.push(b':'); + Ok(()) + } + + pub fn write_str(&mut self, s: &str) -> Result<(), JsonWriterError> { + Ok(serde_json::to_writer(&mut self.buffer, s)?) + } + + pub fn write_f64(&mut self, value: f64) -> Result<(), JsonWriterError> { + match value.classify() { + FpCategory::Normal | FpCategory::Zero | FpCategory::Subnormal => { + Ok(serde_json::to_writer(&mut self.buffer, &value)?) + } + FpCategory::Infinite | FpCategory::Nan => { + Err(JsonWriterError::InvalidF64Value { value }) + } + } + } + + pub fn write_separator(&mut self) { + self.buffer.push(b','); + } + + pub fn write_open_obj(&mut self) { + self.buffer.push(b'{'); + } + + pub fn write_close_obj(&mut self) { + self.buffer.push(b'}'); + } + + pub fn into_string(self) -> Result<String, JsonWriterError> { + Ok(String::from_utf8(self.buffer)?) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn write_empty_message() -> anyhow::Result<()> { + let mut jw = JsonWriter::new(); + jw.write_open_obj(); + jw.write_close_obj(); + assert_eq!(jw.into_string()?, "{}"); + Ok(()) + } + + #[test] + fn write_invalid_f64_message() -> anyhow::Result<()> { + let mut jw = JsonWriter::new(); + let value = 1.0 / 0.0; + let error = jw.write_f64(value).unwrap_err(); + assert_eq!(error.to_string(), "Invalid f64 value inf"); + Ok(()) + } + + #[test] + fn write_key_with_quote() -> anyhow::Result<()> { + let mut jw = JsonWriter::with_capacity(128); + jw.write_key("va\"lue")?; + assert_eq!(jw.into_string()?, "\"va\\\"lue\":"); + Ok(()) + } + + #[test] + fn write_timestamp_message() -> anyhow::Result<()> { + let mut jw = JsonWriter::with_capacity(128); + jw.write_open_obj(); + jw.write_key("time")?; + jw.write_str("2013-06-22T17:03:14.123+02:00")?; + jw.write_close_obj(); + assert_eq!( + jw.into_string()?, + r#"{"time":"2013-06-22T17:03:14.123+02:00"}"# + ); + Ok(()) + } + + #[test] + fn write_single_value_message() -> anyhow::Result<()> { + let mut jw = JsonWriter::with_capacity(128); + jw.write_open_obj(); + jw.write_key("time")?; + jw.write_str("2013-06-22T17:03:14.123+02:00")?; + jw.write_separator(); + jw.write_key("temperature")?; + jw.write_f64(128.0)?; + jw.write_close_obj(); + assert_eq!( + jw.into_string()?, + r#"{"time":"2013-06-22T17:03:14.123+02:00","temperature":128.0}"# + ); + Ok(()) + } + + #[test] + fn write_multivalue_message() -> anyhow::Result<()> { + let mut jw = JsonWriter::with_capacity(128); + jw.write_open_obj(); + jw.write_key("time")?; + jw.write_str("2013-06-22T17:03:14.123+02:00")?; + jw.write_separator(); + jw.write_key("temperature")?; + jw.write_f64(128.0)?; + jw.write_separator(); + jw.write_key("location")?; + jw.write_open_obj(); + jw.write_key("altitude")?; + jw.write_f64(1028.0)?; + jw.write_separator(); + jw.write_key("longitude")?; + jw.write_f64(1288.0)?; + jw.write_separator(); + jw.write_key("longitude")?; + jw.write_f64(1280.0)?; + jw.write_close_obj(); + jw.write_close_obj(); + + assert_eq!( + jw.into_string()?, + r#"{"time":"2013-06-22T17:03:14.123+02:00","temperature":128.0,"location":{"altitude":1028.0,"longitude":1288.0,"longitude":1280.0}}"# + ); + + Ok(()) + } +} diff --git a/mapper/cumulocity/c8y_translator_lib/Cargo.toml b/mapper/cumulocity/c8y_translator_lib/Cargo.toml index a1d2c1a1..54e2de85 100644 --- a/mapper/cumulocity/c8y_translator_lib/Cargo.toml +++ b/mapper/cumulocity/c8y_translator_lib/Cargo.toml @@ -10,6 +10,7 @@ edition = "2018" chrono = "0.4" thin_edge_json = {path = "../../thin_edge_json"} clock = {path = "../../../common/clock" } +json-writer = {path = "../../../common/json_writer" } thiserror = "1.0" [dev-dependencies] diff --git a/mapper/cumulocity/c8y_translator_lib/src/json.rs b/mapper/cumulocity/c8y_translator_lib/src/json.rs index 8f7d6863..c7448474 100644 --- a/mapper/cumulocity/c8y_translator_lib/src/json.rs +++ b/mapper/cumulocity/c8y_translator_lib/src/json.rs @@ -52,8 +52,8 @@ mod tests { #[test] fn check_single_value_translation() { let single_value_thin_edge_json = r#"{ - "temperature": 23, - "pressure": 220 + "temperature": 23.0, + "pressure": 220.0 }"#; let timestamp = FixedOffset::east(5 * 3600).ymd(2021, 4, 8).and_hms(0, 0, 0); @@ -65,12 +65,12 @@ mod tests { "time": timestamp.to_rfc3339(), "temperature": { "temperature": { - "value": 23 + "value": 23.0 } }, "pressure": { "pressure": { - "value": 220 + "value": 220.0 } } }); @@ -85,8 +85,8 @@ mod tests { fn check_thin_edge_translation_with_timestamp() { let single_value_thin_edge_json = r#"{ "time" : "2013-06-22T17:03:14.123+02:00", - "temperature": 23, - "pressure": 220 + "temperature": 23.0, + "pressure": 220.0 }"#; let expected_output = r#"{ @@ -94,12 +94,12 @@ mod tests { "time": "2013-06-22T17:03:14.123+02:00", "temperature": { "temperature": { - "value": 23 + "value": 23.0 } }, "pressure" : { "pressure": { - "value" : 220 + "value" : 220.0 } } }"#; @@ -115,13 +115,13 @@ mod tests { #[test] fn check_multi_value_translation() { let multi_value_thin_edge_json = r#"{ - "temperature": 25 , + "temperature": 25.0 , "location": { "latitude": 32.54, "longitude": -117.67, "altitude": 98.6 }, - "pressure": 98 + "pressure": 98.0 }"#; let timestamp = FixedOffset::east(5 * 3600).ymd(2021, 4, 8).and_hms(0, 0, 0); @@ -133,7 +133,7 @@ mod tests { "time": timestamp.to_rfc3339(), "temperature": { "temperature": { - "value": 25 + "value": 25.0 } }, "location": { @@ -149,7 +149,7 @@ mod tests { }, "pressure": { "pressure": { - "value": 98 + "value": 98.0 } } }); @@ -172,7 +172,7 @@ mod tests { "time": "2013-06-22T17:03:14+02:00", "temperature": { "temperature": { - "value": 0 + "value": 0.0 } } }"#; @@ -197,7 +197,7 @@ mod tests { return Ok(()); } let input = format!(r#"{{"time": "2013-06-22T17:03:14.453+02:00", - "{}": 123 + "{}": 123.0 }}"#, measurement); let time = "2013-06-22T17:03:14.453+02:00"; let expected_output = format!(r#"{{ @@ -205,7 +205,7 @@ mod tests { "time": "{}", "{}": {{ "{}": {{ - "value": 123 + "value": 123.0 }} }} }}"#, time, measurement, measurement); diff --git a/mapper/cumulocity/c8y_translator_lib/src/serializer.rs b/mapper/cumulocity/c8y_translator_lib/src/serializer.rs index c01e1a12..6f29a902 100644 --- a/mapper/cumulocity/c8y_translator_lib/src/serializer.rs +++ b/mapper/cumulocity/c8y_translator_lib/src/serializer.rs @@ -1,8 +1,10 @@ use chrono::prelude::*; +use json_writer::{JsonWriter, JsonWriterError}; + use thin_edge_json::{json::ThinEdgeJsonError, measurement::GroupedMeasurementVisitor}; pub struct C8yJsonSerializer { - buffer: String, + json: JsonWriter, is_within_group: bool, needs_separator: bool, timestamp_present: bool, @@ -22,6 +24,9 @@ pub enum C8yJsonSerializationError { #[error("Serializer produced invalid Utf8 string")] InvalidUtf8ConversionToString(std::string::FromUtf8Error), + + #[error(transparent)] + JsonWriterError(#[from] JsonWriterError), } #[derive(thiserror::Error, Debug, PartialEq)] @@ -42,12 +47,14 @@ pub enum MeasurementStreamError { impl C8yJsonSerializer { pub fn new(default_timestamp: DateTime<FixedOffset>) -> Self { let capa = 1024; // XXX: Choose a capacity based on expected JSON length. - let mut buffer = String::with_capacity(capa); + let mut json = JsonWriter::with_capacity(capa); - buffer.push_str(r#"{"type": "ThinEdgeMeasurement""#); + json.write_open_obj(); + let _ = json.write_key("type"); + let _ = json.write_str("ThinEdgeMeasurement"); Self { - buffer, + json, is_within_group: false, needs_separator: true, timestamp_present: false, @@ -66,42 +73,21 @@ impl C8yJsonSerializer { assert!(self.timestamp_present); - self.buffer.push('}'); + self.json.write_close_obj(); Ok(()) } - pub fn bytes(mut self) -> Result<Vec<u8>, C8yJsonSerializationError> { - self.end()?; - Ok(self.buffer.into()) - } - - fn write_key(&mut self, key: &str) { - self.write_str(key); - self.buffer.push(':'); - } - - fn write_str(&mut self, s: &str) { - self.buffer.push('"'); - self.buffer.push_str(s); - self.buffer.push('"'); - } - - fn write_f64(&mut self, value: f64) -> std::fmt::Result { - use std::fmt::Write; - self.buffer.write_fmt(format_args!("{}", value)) - } - - fn write_value_obj(&mut self, value: f64) -> std::fmt::Result { - self.buffer.push('{'); - self.write_key("value"); - self.write_f64(value)?; - self.buffer.push('}'); + fn write_value_obj(&mut self, value: f64) -> Result<(), C8yJsonSerializationError> { + self.json.write_open_obj(); + self.json.write_key("value")?; + self.json.write_f64(value)?; + self.json.write_close_obj(); Ok(()) } - pub fn into_string(self) -> Result<String, C8yJsonSerializationError> { - String::from_utf8(self.bytes()?) - .map_err(C8yJsonSerializationError::InvalidUtf8ConversionToString) + pub fn into_string(&mut self) -> Result<String, C8yJsonSerializationError> { + self.end()?; + Ok(self.json.clone().into_string()?) } } @@ -114,11 +100,11 @@ impl GroupedMeasurementVisitor for C8yJsonSerializer { } if self.needs_separator { - self.buffer.push(','); + self.json.write_separator(); } - self.write_key("time"); - self.write_str(timestamp.to_rfc3339().as_str()); + self.json.write_key("time")?; + self.json.write_str(timestamp.to_rfc3339().as_str())?; self.needs_separator = true; self.timestamp_present = true; @@ -127,20 +113,20 @@ impl GroupedMeasurementVisitor for C8yJsonSerializer { fn measurement(&mut self, key: &str, value: f64) -> Result<(), Self::Error> { if self.needs_separator { - self.buffer.push(','); + self.json.write_separator(); } else { self.needs_separator = true; } - self.write_key(key); + self.json.write_key(key)?; if self.is_within_group { self.write_value_obj(value)?; } else { - self.buffer.push('{'); - self.write_key(key); + self.json.write_open_obj(); + self.json.write_key(key)?; self.write_value_obj(value)?; - self.buffer.push('}'); + self.json.write_close_obj(); } Ok(()) } @@ -151,10 +137,10 @@ impl GroupedMeasurementVisitor for C8yJsonSerializer { } if self.needs_separator { - self.buffer.push(','); + self.json.write_separator(); } - self.write_key(group); - self.buffer.push('{'); + self.json.write_key(group)?; + self.json.write_open_obj(); self.needs_separator = false; self.is_within_group = true; Ok(()) @@ -165,7 +151,7 @@ impl GroupedMeasurementVisitor for C8yJsonSerializer { return Err(MeasurementStreamError::UnexpectedEndOfGroup.into()); } - self.buffer.push('}'); + self.json.write_close_obj(); self.needs_separator = true; self.is_within_group = false; Ok(()) @@ -191,7 +177,7 @@ mod tests { serializer.timestamp(timestamp)?; serializer.measurement("temperature", 25.5)?; - let output = serializer.bytes()?; + let output = serializer.into_string()?; let expected_output = json!({ "type": "ThinEdgeMeasurement", @@ -204,7 +190,7 @@ mod tests { }); assert_json_eq!( - serde_json::from_slice::<serde_json::Value>(&output)?, + serde_json::from_str::<serde_json::Value>(&output)?, expected_output ); Ok(()) @@ -225,7 +211,7 @@ mod tests { serializer.end_group()?; serializer.measurement("pressure", 255.2)?; - let output = serializer.bytes()?; + let output = serializer.into_string()?; let expected_output = json!({ "type": "ThinEdgeMeasurement", @@ -255,7 +241,7 @@ mod tests { }); assert_json_eq!( - serde_json::from_slice::<serde_json::Value>(&output)?, + serde_json::from_str::<serde_json::Value>(&output)?, expected_output ); @@ -268,15 +254,15 @@ mod tests { .ymd(2021, 6, 22) .and_hms_nano(17, 3, 14, 123456789); - let serializer = C8yJsonSerializer::new(timestamp); + let mut serializer = C8yJsonSerializer::new(timestamp); let expected_output = json!({"type": "ThinEdgeMeasurement", "time": "2021-06-22T17:03:14.123456789+05:00"}); - let output = serializer.bytes()?; + let output = serializer.into_string()?; assert_json_eq!( - serde_json::from_slice::<serde_json::Value>(&output)?, + serde_json::from_str::<serde_json::Value>(&output)?, expected_output ); @@ -297,10 +283,10 @@ mod tests { "time":"2021-06-22T17:03:14.123456789+05:00" }); - let output = serializer.bytes()?; + let output = serializer.into_string()?; assert_json_eq!( - serde_json::from_slice::<serde_json::Value>(&output)?, + serde_json::from_str::<serde_json::Value>(&output)?, expected_output ); @@ -383,7 +369,7 @@ mod tests { serializer.measurement("alti", 2100.4)?; serializer.measurement("longi", 2200.4)?; - let expected_err = serializer.bytes(); + let expected_err = serializer.into_string(); assert_matches!( expected_err, diff --git a/mapper/tedge_mapper/src/az_converter.rs b/mapper/tedge_mapper/src/az_converter.rs index 9811f96e..0406af21 100644 --- a/mapper/tedge_mapper/src/az_converter.rs +++ b/mapper/tedge_mapper/src/az_converter.rs @@ -65,11 +65,11 @@ mod tests { }; let input = r#"{ - "temperature": 23 + "temperature": 23.0 }"#; let expected_output = json!({ - "temperature": 23 + "temperature": 23.0 }); let output = converter.convert(input.as_ref()); @@ -91,12 +91,12 @@ mod tests { let input = r#"{ "time" : "2013-06-22T17:03:14.000+02:00", - "temperature": 23 + "temperature": 23.0 }"#; let expected_output = json!({ "time" : "2013-06-22T17:03:14+02:00", - "temperature": 23 + "temperature": 23.0 }); let output = converter.convert(input.as_ref()); @@ -118,12 +118,12 @@ mod tests { let input = r#"{ "time" : "2013-06-22T17:03:14.000+02:00", - "temperature": 23 + "temperature": 23.0 }"#; let expected_output = json!({ "time" : "2013-06-22T17:03:14+02:00", - "temperature": 23 + "temperature": 23.0 }); let output = converter.convert(input.as_ref()); @@ -144,11 +144,11 @@ mod tests { }; let input = r#"{ - "temperature": 23 + "temperature": 23.0 }"#; let expected_output = json!({ - "temperature": 23, + "temperature": 23.0, "time": "2021-04-08T00:00:00+05:00" }); diff --git a/mapper/thin_edge_json/Cargo.toml b/mapper/thin_edge_json/Cargo.toml index c30a1cca..e14451f2 100644 --- a/mapper/thin_edge_json/Cargo.toml +++ b/mapper/thin_edge_json/Cargo.toml @@ -11,6 +11,7 @@ chrono = "0.4" json = "0.12" thiserror = "1.0" clock = {path = "../../common/clock" } +json-writer = {path = "../../common/json_writer" } [dev-dependencies] pretty_assertions = "0.7" diff --git a/mapper/thin_edge_json/examples/tej_example.rs b/mapper/thin_edge_json/examples/tej_example.rs index f0114361..1ffcc881 100644 --- a/mapper/thin_edge_json/examples/tej_example.rs +++ b/mapper/thin_edge_json/examples/tej_example.rs @@ -19,8 +19,8 @@ fn tej_build_serialize() -> anyhow::Result<()> { //Serialize the TEJ to u8 bytes let mut visitor = ThinEdgeJsonSerializer::new(); grp_msg.accept(&mut visitor)?; - let bytes = visitor.bytes()?; - println!("Serialized Tej=> {:?}", std::str::from_utf8(&bytes)); + + println!("Serialized Tej=> {:?}", visitor.into_string()?); Ok(()) } fn main() -> anyhow::Result<()> { diff --git a/mapper/thin_edge_json/src/serialize.rs b/mapper/thin_edge_json/src/serialize.rs index 329700c8..805087de 100644 --- a/mapper/thin_edge_json/src/serialize.rs +++ b/mapper/thin_edge_json/src/serialize.rs @@ -1,10 +1,10 @@ use chrono::offset::FixedOffset; use chrono::DateTime; -use std::io::Write; +use json_writer::{JsonWriter, JsonWriterError}; use crate::measurement::GroupedMeasurementVisitor; pub struct ThinEdgeJsonSerializer { - buffer: String, + json: JsonWriter, is_within_group: bool, needs_separator: bool, default_timestamp: Option<DateTime<FixedOffset>>, @@ -21,6 +21,9 @@ pub enum ThinEdgeJsonSerializationError { #[error("Serializer produced invalid Utf8 string")] InvalidUtf8ConversionToString(std::string::FromUtf8Error), + + #[error(transparent)] + JsonWriterError(#[from] JsonWriterError), } #[derive(thiserror::Error, Debug)] @@ -45,11 +48,11 @@ impl ThinEdgeJsonSerializer { pub fn new_with_timestamp(default_timestamp: Option<DateTime<FixedOffset>>) -> Self { let capa = 1024; // XXX: Choose a capacity based on expected JSON length. - let mut buffer = String::with_capacity(capa); - buffer.push('{'); + let mut json = JsonWriter::with_capacity(capa); + json.write_open_obj(); Self { - buffer, + json, is_within_group: false, needs_separator: false, default_timestamp, @@ -68,35 +71,17 @@ impl ThinEdgeJsonSerializer { } } - self.buffer.push('}'); + self.json.write_close_obj(); Ok(()) } pub fn bytes(mut self) -> Result<Vec<u8>, ThinEdgeJsonSerializationError> { - self.end()?; - Ok(self.buffer.into()) + Ok(self.into_string()?.into_bytes()) } - // XXX: We need to abstract all this into a JsonSerializer. - fn write_key(&mut self, key: &str) { - self.write_str(key); - self.buffer.push(':'); - } - - fn write_str(&mut self, s: &str) { - self.buffer.push('"'); - self.buffer.push_str(s); - self.buffer.push('"'); - } - - fn write_f64(&mut self, value: f64) -> std::fmt::Result { - use std::fmt::Write; - self.buffer.write_fmt(format_args!("{}", value)) - } - - pub fn into_string(self) -> Result<String, ThinEdgeJsonSerializationError> { - String::from_utf8(self.bytes()?) - .map_err(ThinEdgeJsonSerializationError::InvalidUtf8ConversionToString) + pub fn into_string(&mut self) -> Result<String, ThinEdgeJsonSerializationError> { + self.end()?; + Ok(self.json.clone().into_string()?) } } @@ -115,10 +100,11 @@ impl GroupedMeasurementVisitor for ThinEdgeJsonSerializer { } if self.needs_separator { - self.buffer.push(','); + self.json.write_separator(); } - self.write_key("time"); - self.write_str(timestamp.to_rfc3339().as_str()); + + self.json.write_key("time")?; + self.json.write_str(timestamp.to_rfc3339().as_str())?; self.needs_separator = true; self.timestamp_present = true; Ok(()) @@ -126,10 +112,10 @@ impl GroupedMeasurementVisitor for ThinEdgeJsonSerializer { fn measurement(&mut self, name: &str, value: f64) -> Result<(), Self::Error> { if self.needs_separator { - self.buffer.push(','); + self.json.write_separator(); } - self.write_key(name); - self.write_f64(value)?; + self.json.write_key(name)?; + self.json.write_f64(value)?; self.needs_separator = true; Ok(()) } @@ -140,10 +126,10 @@ impl GroupedMeasurementVisitor for ThinEdgeJsonSerializer { } if self.needs_separator { - self.buffer.push(','); + self.json.write_separator(); } - self.write_key(group); - self.buffer.push('{'); + self.json.write_key(group)?; + self.json.write_open_obj(); self.needs_separator = false; self.is_within_group = true; Ok(()) @@ -154,7 +140,7 @@ impl GroupedMeasurementVisitor for ThinEdgeJsonSerializer { return Err(MeasurementStreamError::UnexpectedEndOfGroup.into()); } - self.buffer.push('}'); + self.json.write_close_obj(); self.needs_separator = true; self.is_within_group = false; Ok(()) @@ -172,106 +158,112 @@ mod tests { } #[test] - fn serialize_single_value_message() { + fn serialize_single_value_message() -> anyhow::Result<()> { let mut serializer = ThinEdgeJsonSerializer::new(); let timestamp = test_timestamp(); - serializer.timestamp(timestamp).unwrap(); - serializer.measurement("temperature", 25.5).unwrap(); + serializer.timestamp(timestamp)?; + serializer.measurement("temperature", 25.5)?; let body = r#""temperature":25.5"#; - let expected_output: Vec<u8> = - format!(r#"{{"time":"{}",{}}}"#, timestamp.to_rfc3339(), body).into(); - let output = serializer.bytes().unwrap(); + let expected_output = format!(r#"{{"time":"{}",{}}}"#, timestamp.to_rfc3339(), body); + let output = serializer.into_string()?; assert_eq!(output, expected_output); + Ok(()) } #[test] - fn serialize_single_value_no_timestamp_message() { + fn serialize_single_value_no_timestamp_message() -> anyhow::Result<()> { let mut serializer = ThinEdgeJsonSerializer::new(); - serializer.measurement("temperature", 25.5).unwrap(); - let expected_output = b"{\"temperature\":25.5}"; - let output = serializer.bytes().unwrap(); + serializer.measurement("temperature", 25.5)?; + let expected_output = r#"{"temperature":25.5}"#; + let output = serializer.into_string()?; assert_eq!(output, expected_output); + Ok(()) } #[test] - fn serialize_multi_value_message() { + fn serialize_multi_value_message() -> anyhow::Result<()> { let mut serializer = ThinEdgeJsonSerializer::new(); let timestamp = test_timestamp(); - serializer.timestamp(timestamp).unwrap(); - serializer.measurement("temperature", 25.5).unwrap(); - serializer.start_group("location").unwrap(); - serializer.measurement("alti", 2100.4).unwrap(); - serializer.measurement("longi", 2200.4).unwrap(); - serializer.measurement("lati", 2300.4).unwrap(); - serializer.end_group().unwrap(); - serializer.measurement("pressure", 255.0).unwrap(); - let body = r#""temperature":25.5,"location":{"alti":2100.4,"longi":2200.4,"lati":2300.4},"pressure":255}"#; - let expected_output: Vec<u8> = - format!(r#"{{"time":"{}",{}"#, timestamp.to_rfc3339(), body).into(); - let output = serializer.bytes().unwrap(); + serializer.timestamp(timestamp)?; + serializer.measurement("temperature", 25.5)?; + serializer.start_group("location")?; + serializer.measurement("alti", 2100.4)?; + serializer.measurement("longi", 2200.4)?; + serializer.measurement("lati", 2300.4)?; + serializer.end_group()?; + serializer.measurement("pressure", 255.0)?; + let body = r#""temperature":25.5,"location":{"alti":2100.4,"longi":2200.4,"lati":2300.4},"pressure":255.0}"#; + let expected_output = format!(r#"{{"time":"{}",{}"#, timestamp.to_rfc3339(), body); + let output = serializer.into_string()?; assert_eq!(expected_output, output); + Ok(()) } #[test] |