summaryrefslogtreecommitdiffstats
path: root/crates/core/tedge_mapper/src/c8y_converter.rs
diff options
context:
space:
mode:
authorinitard <alex.solomes@softwareag.com>2022-01-20 22:05:56 +0100
committerGitHub <noreply@github.com>2022-01-20 22:05:56 +0100
commitf9658c812b13410c2bbbaecbce02291e15a03cf8 (patch)
tree5db4f6668296849eb439ae293f84dc1592475597 /crates/core/tedge_mapper/src/c8y_converter.rs
parent84140584dd24103baea69cfb3bc378d0134a6a8b (diff)
Feature/686/c8y platform fragments (#757)
* adding c8y bridge to config (#686) Signed-off-by: initard <solo@softwareag.com> * creating inventory fragments and refactoring supported operations fragments (#686) Signed-off-by: initard <solo@softwareag.com> * adding device name to c8y converter (#686) Signed-off-by: initard <solo@softwareag.com> * added errors to ConversionError to support json (#686) Signed-off-by: initard <solo@softwareag.com> * adding c8y_fragments module to project (#686) Signed-off-by: initard <solo@softwareag.com> * c8y fragments file (#686) Signed-off-by: initard <solo@softwareag.com> * matching on read_json_from_file in case it is not present (#686) Signed-off-by: initard <solo@softwareag.com> * fixing existing tests to take device name in converter (#686) Signed-off-by: initard <solo@softwareag.com> * added tests for inventory from file + individual fragment (#686) Signed-off-by: initard <solo@softwareag.com> * test fix (#686) Signed-off-by: initard <solo@softwareag.com> * removing unit test, does not work (#686) Signed-off-by: initard <solo@softwareag.com> * adding newly created topic for test (#686) Signed-off-by: initard <solo@softwareag.com> * removed unit test and refactoring (#686) Signed-off-by: initard <solo@softwareag.com> * rename to inventory (#686) Signed-off-by: initard <solo@softwareag.com> * refactoring with consts and renaming error messages (#686) Signed-off-by: initard <solo@softwareag.com> * removing unwrap (#686) Signed-off-by: initard <solo@softwareag.com> Co-authored-by: initard <solo@softwareag.com>
Diffstat (limited to 'crates/core/tedge_mapper/src/c8y_converter.rs')
-rw-r--r--crates/core/tedge_mapper/src/c8y_converter.rs112
1 files changed, 99 insertions, 13 deletions
diff --git a/crates/core/tedge_mapper/src/c8y_converter.rs b/crates/core/tedge_mapper/src/c8y_converter.rs
index 0fb6db98..59114f81 100644
--- a/crates/core/tedge_mapper/src/c8y_converter.rs
+++ b/crates/core/tedge_mapper/src/c8y_converter.rs
@@ -1,3 +1,4 @@
+use crate::c8y_fragments::C8yAgentFragment;
use crate::error::*;
use crate::size_threshold::SizeThreshold;
use crate::{converter::*, operations::Operations};
@@ -6,18 +7,27 @@ use c8y_smartrest::smartrest_serializer::{SmartRestSerializer, SmartRestSetSuppo
use c8y_translator::json;
use mqtt_channel::{Message, Topic};
use std::collections::HashSet;
+use std::fs::File;
+use std::io::Read;
+use std::path::Path;
use thin_edge_json::alarm::ThinEdgeAlarm;
+use tracing::info;
+const C8Y_CLOUD: &str = "c8y";
+const INVENTORY_FRAGMENTS_FILE_LOCATION: &str = "/etc/tedge/device/inventory.json";
+const INVENTORY_MANAGED_OBJECTS_TOPIC: &str = "c8y/inventory/managedObjects/update";
+const SUPPORTED_OPERATIONS_DIRECTORY: &str = "/etc/tedge/operations";
const SMARTREST_PUBLISH_TOPIC: &str = "c8y/s/us";
pub struct CumulocityConverter {
pub(crate) size_threshold: SizeThreshold,
children: HashSet<String>,
pub(crate) mapper_config: MapperConfig,
+ device_name: String,
}
impl CumulocityConverter {
- pub fn new(size_threshold: SizeThreshold) -> Self {
+ pub fn new(size_threshold: SizeThreshold, device_name: String) -> Self {
let mut topic_fiter = make_valid_topic_filter_or_panic("tedge/measurements");
let () = topic_fiter
.add("tedge/measurements/+")
@@ -33,10 +43,12 @@ impl CumulocityConverter {
};
let children: HashSet<String> = HashSet::new();
+
CumulocityConverter {
size_threshold,
children,
mapper_config,
+ device_name,
}
}
@@ -108,17 +120,72 @@ impl Converter for CumulocityConverter {
}
fn try_init_messages(&self) -> Result<Vec<Message>, ConversionError> {
- let ops = Operations::try_new("/etc/tedge/operations", "c8y")?;
- let ops = ops.get_operations_list();
- let ops = ops.iter().map(|op| op as &str).collect::<Vec<&str>>();
-
- let ops_msg = SmartRestSetSupportedOperations::new(&ops);
- let topic = Topic::new_unchecked("c8y/s/us");
- let msg = Message::new(&topic, ops_msg.to_smartrest()?);
- Ok(vec![msg])
+ let inventory_fragments_message = create_inventory_fragments_message(&self.device_name)?;
+
+ let supported_operations_message = create_supported_operations_fragments()?;
+
+ Ok(vec![
+ supported_operations_message,
+ inventory_fragments_message,
+ ])
}
}
+fn create_supported_operations_fragments() -> Result<Message, ConversionError> {
+ let ops = Operations::try_new(SUPPORTED_OPERATIONS_DIRECTORY, C8Y_CLOUD)?;
+ let ops = ops.get_operations_list();
+ let ops = ops.iter().map(|op| op as &str).collect::<Vec<&str>>();
+
+ let ops_msg = SmartRestSetSupportedOperations::new(&ops);
+ let topic = Topic::new_unchecked(SMARTREST_PUBLISH_TOPIC);
+ Ok(Message::new(&topic, ops_msg.to_smartrest()?))
+}
+
+fn create_inventory_fragments_message(device_name: &str) -> Result<Message, ConversionError> {
+ let ops_msg = get_inventory_fragments(INVENTORY_FRAGMENTS_FILE_LOCATION)?;
+
+ let topic = Topic::new_unchecked(&format!(
+ "{}/{}",
+ INVENTORY_MANAGED_OBJECTS_TOPIC, device_name
+ ));
+ Ok(Message::new(&topic, ops_msg.to_string()))
+}
+
+/// reads a json file to serde_json::Value
+///
+/// # Example
+/// ```
+/// let json_value = read_json_from_file("/path/to/a/file").unwrap();
+/// ```
+fn read_json_from_file(file_path: &str) -> Result<serde_json::Value, ConversionError> {
+ let mut file = File::open(Path::new(file_path))?;
+ let mut data = String::new();
+ file.read_to_string(&mut data)?;
+ let json: serde_json::Value = serde_json::from_str(&data)?;
+ Ok(json)
+}
+
+/// gets a serde_json::Value of inventory
+fn get_inventory_fragments(file_path: &str) -> Result<serde_json::Value, ConversionError> {
+ let agent_fragment = C8yAgentFragment::new()?;
+ let json_fragment = agent_fragment.to_json()?;
+
+ match read_json_from_file(file_path) {
+ Ok(mut json) => {
+ json.as_object_mut()
+ .ok_or_else(|| return ConversionError::FromOptionError)?
+ .insert("c8y_Agent".to_string(), json_fragment);
+ Ok(json)
+ }
+ Err(_) => {
+ info!(
+ "Inventory fragments file not found at {}",
+ INVENTORY_FRAGMENTS_FILE_LOCATION
+ );
+ Ok(json_fragment)
+ }
+ }
+}
fn get_child_id_from_topic(topic: &str) -> Result<Option<String>, ConversionError> {
match topic.strip_prefix("tedge/measurements/").map(String::from) {
Some(maybe_id) if maybe_id.is_empty() => {
@@ -131,6 +198,8 @@ fn get_child_id_from_topic(topic: &str) -> Result<Option<String>, ConversionErro
#[cfg(test)]
mod test {
use super::*;
+
+ use crate::c8y_converter::CumulocityConverter;
use test_case::test_case;
#[test_case("tedge/measurements/test", Some("test".to_string()); "valid child id")]
@@ -151,7 +220,12 @@ mod test {
#[test]
fn convert_thin_edge_json_with_child_id() {
- let mut converter = Box::new(CumulocityConverter::new(SizeThreshold(16 * 1024)));
+ let device_name = String::from("test");
+
+ let mut converter = Box::new(CumulocityConverter::new(
+ SizeThreshold(16 * 1024),
+ device_name,
+ ));
let in_topic = "tedge/measurements/child1";
let in_payload = r#"{"temp": 1, "time": "2021-11-16T17:45:40.571760714+01:00"}"#;
let in_message = Message::new(&Topic::new_unchecked(in_topic), in_payload);
@@ -182,7 +256,12 @@ mod test {
#[test]
fn convert_first_thin_edge_json_invalid_then_valid_with_child_id() {
- let mut converter = Box::new(CumulocityConverter::new(SizeThreshold(16 * 1024)));
+ let device_name = String::from("test");
+
+ let mut converter = Box::new(CumulocityConverter::new(
+ SizeThreshold(16 * 1024),
+ device_name,
+ ));
let in_topic = "tedge/measurements/child1";
let in_invalid_payload = r#"{"temp": invalid}"#;
let in_valid_payload = r#"{"temp": 1, "time": "2021-11-16T17:45:40.571760714+01:00"}"#;
@@ -218,7 +297,12 @@ mod test {
#[test]
fn convert_two_thin_edge_json_messages_given_different_child_id() {
- let mut converter = Box::new(CumulocityConverter::new(SizeThreshold(16 * 1024)));
+ let device_name = String::from("test");
+
+ let mut converter = Box::new(CumulocityConverter::new(
+ SizeThreshold(16 * 1024),
+ device_name,
+ ));
let in_payload = r#"{"temp": 1, "time": "2021-11-16T17:45:40.571760714+01:00"}"#;
// First message from "child1"
@@ -269,7 +353,9 @@ mod test {
#[test]
fn check_c8y_threshold_packet_size() -> Result<(), anyhow::Error> {
let size_threshold = SizeThreshold(16 * 1024);
- let converter = CumulocityConverter::new(size_threshold);
+ let device_name = String::from("test");
+
+ let converter = CumulocityConverter::new(size_threshold, device_name);
let buffer = create_packet(1024 * 20);
let err = converter.size_threshold.validate(&buffer).unwrap_err();
assert_eq!(