diff options
author | initard <alex.solomes@softwareag.com> | 2022-01-20 22:05:56 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-01-20 22:05:56 +0100 |
commit | f9658c812b13410c2bbbaecbce02291e15a03cf8 (patch) | |
tree | 5db4f6668296849eb439ae293f84dc1592475597 /crates/core/tedge_mapper/src/c8y_converter.rs | |
parent | 84140584dd24103baea69cfb3bc378d0134a6a8b (diff) |
Feature/686/c8y platform fragments (#757)
* adding c8y bridge to config (#686)
Signed-off-by: initard <solo@softwareag.com>
* creating inventory fragments and refactoring supported operations
fragments (#686)
Signed-off-by: initard <solo@softwareag.com>
* adding device name to c8y converter (#686)
Signed-off-by: initard <solo@softwareag.com>
* added errors to ConversionError to support json (#686)
Signed-off-by: initard <solo@softwareag.com>
* adding c8y_fragments module to project (#686)
Signed-off-by: initard <solo@softwareag.com>
* c8y fragments file (#686)
Signed-off-by: initard <solo@softwareag.com>
* matching on read_json_from_file in case it is not present (#686)
Signed-off-by: initard <solo@softwareag.com>
* fixing existing tests to take device name in converter (#686)
Signed-off-by: initard <solo@softwareag.com>
* added tests for inventory from file + individual fragment (#686)
Signed-off-by: initard <solo@softwareag.com>
* test fix (#686)
Signed-off-by: initard <solo@softwareag.com>
* removing unit test, does not work (#686)
Signed-off-by: initard <solo@softwareag.com>
* adding newly created topic for test (#686)
Signed-off-by: initard <solo@softwareag.com>
* removed unit test and refactoring (#686)
Signed-off-by: initard <solo@softwareag.com>
* rename to inventory (#686)
Signed-off-by: initard <solo@softwareag.com>
* refactoring with consts and renaming error messages (#686)
Signed-off-by: initard <solo@softwareag.com>
* removing unwrap (#686)
Signed-off-by: initard <solo@softwareag.com>
Co-authored-by: initard <solo@softwareag.com>
Diffstat (limited to 'crates/core/tedge_mapper/src/c8y_converter.rs')
-rw-r--r-- | crates/core/tedge_mapper/src/c8y_converter.rs | 112 |
1 files changed, 99 insertions, 13 deletions
diff --git a/crates/core/tedge_mapper/src/c8y_converter.rs b/crates/core/tedge_mapper/src/c8y_converter.rs index 0fb6db98..59114f81 100644 --- a/crates/core/tedge_mapper/src/c8y_converter.rs +++ b/crates/core/tedge_mapper/src/c8y_converter.rs @@ -1,3 +1,4 @@ +use crate::c8y_fragments::C8yAgentFragment; use crate::error::*; use crate::size_threshold::SizeThreshold; use crate::{converter::*, operations::Operations}; @@ -6,18 +7,27 @@ use c8y_smartrest::smartrest_serializer::{SmartRestSerializer, SmartRestSetSuppo use c8y_translator::json; use mqtt_channel::{Message, Topic}; use std::collections::HashSet; +use std::fs::File; +use std::io::Read; +use std::path::Path; use thin_edge_json::alarm::ThinEdgeAlarm; +use tracing::info; +const C8Y_CLOUD: &str = "c8y"; +const INVENTORY_FRAGMENTS_FILE_LOCATION: &str = "/etc/tedge/device/inventory.json"; +const INVENTORY_MANAGED_OBJECTS_TOPIC: &str = "c8y/inventory/managedObjects/update"; +const SUPPORTED_OPERATIONS_DIRECTORY: &str = "/etc/tedge/operations"; const SMARTREST_PUBLISH_TOPIC: &str = "c8y/s/us"; pub struct CumulocityConverter { pub(crate) size_threshold: SizeThreshold, children: HashSet<String>, pub(crate) mapper_config: MapperConfig, + device_name: String, } impl CumulocityConverter { - pub fn new(size_threshold: SizeThreshold) -> Self { + pub fn new(size_threshold: SizeThreshold, device_name: String) -> Self { let mut topic_fiter = make_valid_topic_filter_or_panic("tedge/measurements"); let () = topic_fiter .add("tedge/measurements/+") @@ -33,10 +43,12 @@ impl CumulocityConverter { }; let children: HashSet<String> = HashSet::new(); + CumulocityConverter { size_threshold, children, mapper_config, + device_name, } } @@ -108,17 +120,72 @@ impl Converter for CumulocityConverter { } fn try_init_messages(&self) -> Result<Vec<Message>, ConversionError> { - let ops = Operations::try_new("/etc/tedge/operations", "c8y")?; - let ops = ops.get_operations_list(); - let ops = ops.iter().map(|op| op as &str).collect::<Vec<&str>>(); - - let ops_msg = SmartRestSetSupportedOperations::new(&ops); - let topic = Topic::new_unchecked("c8y/s/us"); - let msg = Message::new(&topic, ops_msg.to_smartrest()?); - Ok(vec![msg]) + let inventory_fragments_message = create_inventory_fragments_message(&self.device_name)?; + + let supported_operations_message = create_supported_operations_fragments()?; + + Ok(vec![ + supported_operations_message, + inventory_fragments_message, + ]) } } +fn create_supported_operations_fragments() -> Result<Message, ConversionError> { + let ops = Operations::try_new(SUPPORTED_OPERATIONS_DIRECTORY, C8Y_CLOUD)?; + let ops = ops.get_operations_list(); + let ops = ops.iter().map(|op| op as &str).collect::<Vec<&str>>(); + + let ops_msg = SmartRestSetSupportedOperations::new(&ops); + let topic = Topic::new_unchecked(SMARTREST_PUBLISH_TOPIC); + Ok(Message::new(&topic, ops_msg.to_smartrest()?)) +} + +fn create_inventory_fragments_message(device_name: &str) -> Result<Message, ConversionError> { + let ops_msg = get_inventory_fragments(INVENTORY_FRAGMENTS_FILE_LOCATION)?; + + let topic = Topic::new_unchecked(&format!( + "{}/{}", + INVENTORY_MANAGED_OBJECTS_TOPIC, device_name + )); + Ok(Message::new(&topic, ops_msg.to_string())) +} + +/// reads a json file to serde_json::Value +/// +/// # Example +/// ``` +/// let json_value = read_json_from_file("/path/to/a/file").unwrap(); +/// ``` +fn read_json_from_file(file_path: &str) -> Result<serde_json::Value, ConversionError> { + let mut file = File::open(Path::new(file_path))?; + let mut data = String::new(); + file.read_to_string(&mut data)?; + let json: serde_json::Value = serde_json::from_str(&data)?; + Ok(json) +} + +/// gets a serde_json::Value of inventory +fn get_inventory_fragments(file_path: &str) -> Result<serde_json::Value, ConversionError> { + let agent_fragment = C8yAgentFragment::new()?; + let json_fragment = agent_fragment.to_json()?; + + match read_json_from_file(file_path) { + Ok(mut json) => { + json.as_object_mut() + .ok_or_else(|| return ConversionError::FromOptionError)? + .insert("c8y_Agent".to_string(), json_fragment); + Ok(json) + } + Err(_) => { + info!( + "Inventory fragments file not found at {}", + INVENTORY_FRAGMENTS_FILE_LOCATION + ); + Ok(json_fragment) + } + } +} fn get_child_id_from_topic(topic: &str) -> Result<Option<String>, ConversionError> { match topic.strip_prefix("tedge/measurements/").map(String::from) { Some(maybe_id) if maybe_id.is_empty() => { @@ -131,6 +198,8 @@ fn get_child_id_from_topic(topic: &str) -> Result<Option<String>, ConversionErro #[cfg(test)] mod test { use super::*; + + use crate::c8y_converter::CumulocityConverter; use test_case::test_case; #[test_case("tedge/measurements/test", Some("test".to_string()); "valid child id")] @@ -151,7 +220,12 @@ mod test { #[test] fn convert_thin_edge_json_with_child_id() { - let mut converter = Box::new(CumulocityConverter::new(SizeThreshold(16 * 1024))); + let device_name = String::from("test"); + + let mut converter = Box::new(CumulocityConverter::new( + SizeThreshold(16 * 1024), + device_name, + )); let in_topic = "tedge/measurements/child1"; let in_payload = r#"{"temp": 1, "time": "2021-11-16T17:45:40.571760714+01:00"}"#; let in_message = Message::new(&Topic::new_unchecked(in_topic), in_payload); @@ -182,7 +256,12 @@ mod test { #[test] fn convert_first_thin_edge_json_invalid_then_valid_with_child_id() { - let mut converter = Box::new(CumulocityConverter::new(SizeThreshold(16 * 1024))); + let device_name = String::from("test"); + + let mut converter = Box::new(CumulocityConverter::new( + SizeThreshold(16 * 1024), + device_name, + )); let in_topic = "tedge/measurements/child1"; let in_invalid_payload = r#"{"temp": invalid}"#; let in_valid_payload = r#"{"temp": 1, "time": "2021-11-16T17:45:40.571760714+01:00"}"#; @@ -218,7 +297,12 @@ mod test { #[test] fn convert_two_thin_edge_json_messages_given_different_child_id() { - let mut converter = Box::new(CumulocityConverter::new(SizeThreshold(16 * 1024))); + let device_name = String::from("test"); + + let mut converter = Box::new(CumulocityConverter::new( + SizeThreshold(16 * 1024), + device_name, + )); let in_payload = r#"{"temp": 1, "time": "2021-11-16T17:45:40.571760714+01:00"}"#; // First message from "child1" @@ -269,7 +353,9 @@ mod test { #[test] fn check_c8y_threshold_packet_size() -> Result<(), anyhow::Error> { let size_threshold = SizeThreshold(16 * 1024); - let converter = CumulocityConverter::new(size_threshold); + let device_name = String::from("test"); + + let converter = CumulocityConverter::new(size_threshold, device_name); let buffer = create_packet(1024 * 20); let err = converter.size_threshold.validate(&buffer).unwrap_err(); assert_eq!( |