diff options
Diffstat (limited to 'crates/common/mqtt_client/examples/simple_mapper.rs')
-rw-r--r-- | crates/common/mqtt_client/examples/simple_mapper.rs | 69 |
1 files changed, 69 insertions, 0 deletions
diff --git a/crates/common/mqtt_client/examples/simple_mapper.rs b/crates/common/mqtt_client/examples/simple_mapper.rs new file mode 100644 index 00000000..c3345e30 --- /dev/null +++ b/crates/common/mqtt_client/examples/simple_mapper.rs @@ -0,0 +1,69 @@ +use json::JsonValue; +use log::{debug, error, info}; +use mqtt_client::{Client, Config, Message, MqttClient, Topic}; + +#[tokio::main] +pub async fn main() -> Result<(), Box<dyn std::error::Error>> { + let name = "c8y_mapper"; + let in_topic = Topic::new("tedge/measurements")?; + let out_topic = Topic::new("c8y/s/us")?; + let err_topic = Topic::new("tedge/errors")?; + + env_logger::init(); + + info!("Mapping ThinEdge messages"); + let mqtt = Client::connect(name, &Config::default()).await?; + let mut errors = mqtt.subscribe_errors(); + tokio::spawn(async move { + while let Some(error) = errors.next().await { + error!("{}", error); + } + }); + + let mut messages = mqtt.subscribe(in_topic.filter()).await?; + while let Some(message) = messages.next().await { + debug!("Mapping {:?}", message); + match translate(message.payload_str()?) { + Ok(translation) => { + let _ = mqtt.publish(Message::new(&out_topic, translation)).await?; + } + Err(error) => { + debug!("Translation error: {}", error); + let _ = mqtt.publish(Message::new(&err_topic, error)).await?; + } + } + } + + Ok(()) +} + +const C8Y_TEMPLATE_TEMPERATURE: &str = "211"; + +/// Naive mapper which extracts the temperature field from a ThinEdge Json value. +/// +/// `{ "temperature": 12.4 }` is translated into `"211,12.4"` +fn translate(input: &str) -> Result<String, String> { + let json = json::parse(input).map_err(|err| format!("ERROR: {}", err))?; + match json { + JsonValue::Object(obj) => { + for (k, v) in obj.iter() { + if k != "temperature" { + return Err(format!("ERROR: unknown measurement type '{}'", k)); + } + match v { + JsonValue::Number(num) => { + let value: f64 = (*num).into(); + if value == 0.0 || value.is_normal() { + return Ok(format!("{},{}", C8Y_TEMPLATE_TEMPERATURE, value)); + } else { + return Err(format!("ERROR: value out of range '{}'", v)); + } + } + _ => return Err(format!("ERROR: expected a number, not '{}'", v)), + } + } + Err(String::from("ERROR: empty measurement")) + } + _ => return Err(format!("ERROR: expected a JSON object, not {}", json)), + } +} |