diff options
author | Lukasz Woznicki <lukasz.woznicki@softwareag.com> | 2022-02-09 13:55:02 +0000 |
---|---|---|
committer | Lukasz Woznicki <lukasz.woznicki@softwareag.com> | 2022-02-18 09:41:55 +0000 |
commit | ad065ec206ad3b48f9e1cc48080f4d2ecfe85a2e (patch) | |
tree | 2042dc1a03440f61b112006d6f12b532f0610566 /crates | |
parent | b1907dc4780709bad572a7108314c563eb02b70a (diff) |
Move module to subdirectories and adjust use
Signed-off-by: Lukasz Woznicki <lukasz.woznicki@softwareag.com>
Diffstat (limited to 'crates')
-rw-r--r-- | crates/core/c8y_api/src/http_proxy.rs | 34 | ||||
-rw-r--r-- | crates/core/c8y_smartrest/src/topic.rs | 2 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/az/converter.rs (renamed from crates/core/tedge_mapper/src/az_converter.rs) | 63 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/az/mapper.rs (renamed from crates/core/tedge_mapper/src/az_mapper.rs) | 9 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/az/mod.rs | 2 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/c8y/converter.rs (renamed from crates/core/tedge_mapper/src/c8y_converter.rs) | 527 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/c8y/error.rs (renamed from crates/core/tedge_mapper/src/sm_c8y_mapper/error.rs) | 9 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/c8y/fragments.rs (renamed from crates/core/tedge_mapper/src/c8y_fragments.rs) | 5 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/c8y/mapper.rs | 95 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/c8y/mod.rs | 9 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/c8y/operations.rs | 219 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/c8y/tests.rs (renamed from crates/core/tedge_mapper/src/sm_c8y_mapper/tests.rs) | 209 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/c8y/topic.rs | 128 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/c8y_mapper.rs | 45 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/collectd/batcher.rs (renamed from crates/core/tedge_mapper/src/collectd_mapper/batcher.rs) | 5 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/collectd/collectd.rs (renamed from crates/core/tedge_mapper/src/collectd_mapper/collectd.rs) | 0 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/collectd/error.rs (renamed from crates/core/tedge_mapper/src/collectd_mapper/error.rs) | 2 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/collectd/mapper.rs (renamed from crates/core/tedge_mapper/src/collectd_mapper/mapper.rs) | 4 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/collectd/mod.rs (renamed from crates/core/tedge_mapper/src/collectd_mapper/mod.rs) | 0 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/collectd/monitor.rs (renamed from crates/core/tedge_mapper/src/collectd_mapper/monitor.rs) | 8 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/main.rs | 40 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/mapping/component.rs (renamed from crates/core/tedge_mapper/src/component.rs) | 0 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/mapping/converter.rs (renamed from crates/core/tedge_mapper/src/converter.rs) | 8 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/mapping/error.rs (renamed from crates/core/tedge_mapper/src/error.rs) | 4 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/mapping/mapper.rs (renamed from crates/core/tedge_mapper/src/mapper.rs) | 11 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/mapping/mod.rs | 6 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/mapping/operations.rs (renamed from crates/core/tedge_mapper/src/operations.rs) | 2 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/mapping/size_threshold.rs (renamed from crates/core/tedge_mapper/src/size_threshold.rs) | 1 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs | 425 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/sm_c8y_mapper/mod.rs | 4 | ||||
-rw-r--r-- | crates/core/tedge_mapper/src/tests.rs | 16 |
31 files changed, 1118 insertions, 774 deletions
diff --git a/crates/core/c8y_api/src/http_proxy.rs b/crates/core/c8y_api/src/http_proxy.rs index ceffe1ab..281b6a62 100644 --- a/crates/core/c8y_api/src/http_proxy.rs +++ b/crates/core/c8y_api/src/http_proxy.rs @@ -1,6 +1,7 @@ use crate::json_c8y::{ C8yCreateEvent, C8yManagedObject, C8yUpdateSoftwareListResponse, InternalIdResponse, }; + use async_trait::async_trait; use c8y_smartrest::{error::SMCumulocityMapperError, smartrest_deserializer::SmartRestJwtResponse}; use mqtt_channel::{Connection, PubChannel, StreamExt, Topic, TopicFilter}; @@ -19,7 +20,7 @@ const RETRY_TIMEOUT_SECS: u64 = 60; /// An HttpProxy handles http requests to C8y on behalf of the device. #[async_trait] -pub trait C8YHttpProxy { +pub trait C8YHttpProxy: Send + Sync { async fn init(&mut self) -> Result<(), SMCumulocityMapperError>; fn url_is_in_my_tenant_domain(&self, url: &str) -> bool; @@ -341,6 +342,37 @@ impl C8YHttpProxy for JwtAuthHttpProxy { } } +pub struct FakeC8YHttpProxy {} + +#[async_trait::async_trait] +impl C8YHttpProxy for FakeC8YHttpProxy { + async fn init(&mut self) -> Result<(), SMCumulocityMapperError> { + Ok(()) + } + + fn url_is_in_my_tenant_domain(&self, _url: &str) -> bool { + true + } + + async fn get_jwt_token(&mut self) -> Result<SmartRestJwtResponse, SMCumulocityMapperError> { + Ok(SmartRestJwtResponse::try_new("71,fake-token")?) + } + + async fn send_software_list_http( + &mut self, + _c8y_software_list: &C8yUpdateSoftwareListResponse, + ) -> Result<(), SMCumulocityMapperError> { + Ok(()) + } + + async fn upload_log_binary( + &mut self, + _log_content: &str, + ) -> Result<String, SMCumulocityMapperError> { + Ok("fake/upload/url".into()) + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/crates/core/c8y_smartrest/src/topic.rs b/crates/core/c8y_smartrest/src/topic.rs index 2c6c96b0..7df4f5af 100644 --- a/crates/core/c8y_smartrest/src/topic.rs +++ b/crates/core/c8y_smartrest/src/topic.rs @@ -20,7 +20,7 @@ impl C8yTopic { } pub fn to_topic(&self) -> Result<Topic, MqttError> { - Ok(Topic::new(self.as_str())?) + Topic::new(self.as_str()) } } diff --git a/crates/core/tedge_mapper/src/az_converter.rs b/crates/core/tedge_mapper/src/az/converter.rs index d6ca77dc..824d21d1 100644 --- a/crates/core/tedge_mapper/src/az_converter.rs +++ b/crates/core/tedge_mapper/src/az/converter.rs @@ -1,6 +1,6 @@ -use crate::converter::*; -use crate::error::*; -use crate::size_threshold::SizeThreshold; +use crate::mapping::{converter::*, error::*, size_threshold::SizeThreshold}; + +use async_trait::async_trait; use clock::Clock; use mqtt_channel::Message; use thin_edge_json::serialize::ThinEdgeJsonSerializer; @@ -28,6 +28,7 @@ impl AzureConverter { } } +#[async_trait] impl Converter for AzureConverter { type Error = ConversionError; @@ -35,7 +36,7 @@ impl Converter for AzureConverter { &self.mapper_config } - fn try_convert(&mut self, input: &Message) -> Result<Vec<Message>, Self::Error> { + async fn try_convert(&mut self, input: &Message) -> Result<Vec<Message>, Self::Error> { let input = input.payload_str()?; let () = self.size_threshold.validate(input)?; let default_timestamp = self.add_timestamp.then(|| self.clock.now()); @@ -49,11 +50,19 @@ impl Converter for AzureConverter { #[cfg(test)] mod tests { - use super::*; - use crate::size_threshold::SizeThresholdExceeded; + use crate::{ + az::converter::AzureConverter, + mapping::{ + converter::*, + error::ConversionError, + size_threshold::{SizeThreshold, SizeThresholdExceeded}, + }, + }; + use assert_json_diff::*; use assert_matches::*; - use mqtt_channel::Topic; + use clock::Clock; + use mqtt_channel::{Message, Topic}; use serde_json::json; use time::macros::datetime; @@ -65,13 +74,13 @@ mod tests { } } - #[test] - fn converting_invalid_json_is_invalid() { + #[tokio::test] + async fn converting_invalid_json_is_invalid() { let mut converter = AzureConverter::new(false, Box::new(TestClock), SizeThreshold(255 * 1024)); let input = "This is not Thin Edge JSON"; - let result = converter.try_convert(&new_tedge_message(input)); + let result = converter.try_convert(&new_tedge_message(input)).await; assert_matches!(result, Err(ConversionError::FromThinEdgeJsonParser(_))) } @@ -84,8 +93,8 @@ mod tests { messages.pop().unwrap().payload_str().unwrap().to_string() } - #[test] - fn converting_input_without_timestamp_produces_output_without_timestamp_given_add_timestamp_is_false( + #[tokio::test] + async fn converting_input_without_timestamp_produces_output_without_timestamp_given_add_timestamp_is_false( ) { let mut converter = AzureConverter::new(false, Box::new(TestClock), SizeThreshold(255 * 1024)); @@ -98,7 +107,7 @@ mod tests { "temperature": 23.0 }); - let output = converter.convert(&new_tedge_message(input)); + let output = converter.convert(&new_tedge_message(input)).await; assert_json_eq!( serde_json::from_str::<serde_json::Value>(&extract_first_message_payload(output)) @@ -107,9 +116,9 @@ mod tests { ); } - #[test] - fn converting_input_with_timestamp_produces_output_with_timestamp_given_add_timestamp_is_false() - { + #[tokio::test] + async fn converting_input_with_timestamp_produces_output_with_timestamp_given_add_timestamp_is_false( + ) { let mut converter = AzureConverter::new(false, Box::new(TestClock), SizeThreshold(255 * 1024)); @@ -123,7 +132,7 @@ mod tests { "temperature": 23.0 }); - let output = converter.convert(&new_tedge_message(input)); + let output = converter.convert(&new_tedge_message(input)).await; assert_json_eq!( serde_json::from_str::<serde_json::Value>(&extract_first_message_payload(output)) @@ -132,9 +141,9 @@ mod tests { ); } - #[test] - fn converting_input_with_timestamp_produces_output_with_timestamp_given_add_timestamp_is_true() - { + #[tokio::test] + async fn converting_input_with_timestamp_produces_output_with_timestamp_given_add_timestamp_is_true( + ) { let mut converter = AzureConverter::new(true, Box::new(TestClock), SizeThreshold(255 * 1024)); @@ -148,7 +157,7 @@ mod tests { "temperature": 23.0 }); - let output = converter.convert(&new_tedge_message(input)); + let output = converter.convert(&new_tedge_message(input)).await; assert_json_eq!( serde_json::from_str::<serde_json::Value>(&extract_first_message_payload(output)) @@ -157,8 +166,8 @@ mod tests { ); } - #[test] - fn converting_input_without_timestamp_produces_output_with_timestamp_given_add_timestamp_is_true( + #[tokio::test] + async fn converting_input_without_timestamp_produces_output_with_timestamp_given_add_timestamp_is_true( ) { let mut converter = AzureConverter::new(true, Box::new(TestClock), SizeThreshold(255 * 1024)); @@ -172,7 +181,7 @@ mod tests { "time": "2021-04-08T00:00:00+05:00" }); - let output = converter.convert(&new_tedge_message(input)); + let output = converter.convert(&new_tedge_message(input)).await; assert_json_eq!( serde_json::from_str::<serde_json::Value>(&extract_first_message_payload(output)) @@ -181,12 +190,12 @@ mod tests { ); } - #[test] - fn exceeding_threshold_returns_error() { + #[tokio::test] + async fn exceeding_threshold_returns_error() { let mut converter = AzureConverter::new(false, Box::new(TestClock), SizeThreshold(1)); let input = "ABC"; - let result = converter.try_convert(&new_tedge_message(input)); + let result = converter.try_convert(&new_tedge_message(input)).await; assert_matches!( result, diff --git a/crates/core/tedge_mapper/src/az_mapper.rs b/crates/core/tedge_mapper/src/az/mapper.rs index 57a94b66..5bf2b2f3 100644 --- a/crates/core/tedge_mapper/src/az_mapper.rs +++ b/crates/core/tedge_mapper/src/az/mapper.rs @@ -1,7 +1,8 @@ -use crate::az_converter::AzureConverter; -use crate::component::TEdgeComponent; -use crate::mapper::*; -use crate::size_threshold::SizeThreshold; +use crate::{ + az::converter::AzureConverter, + mapping::{component::TEdgeComponent, mapper::create_mapper, size_threshold::SizeThreshold}, +}; + use async_trait::async_trait; use clock::WallClock; use tedge_config::{AzureMapperTimestamp, TEdgeConfig}; diff --git a/crates/core/tedge_mapper/src/az/mod.rs b/crates/core/tedge_mapper/src/az/mod.rs new file mode 100644 index 00000000..fdb7b6f0 --- /dev/null +++ b/crates/core/tedge_mapper/src/az/mod.rs @@ -0,0 +1,2 @@ +mod converter; +pub mod mapper; diff --git a/crates/core/tedge_mapper/src/c8y_converter.rs b/crates/core/tedge_mapper/src/c8y/converter.rs index 16efb66e..6059ed7c 100644 --- a/crates/core/tedge_mapper/src/c8y_converter.rs +++ b/crates/core/tedge_mapper/src/c8y/converter.rs @@ -1,43 +1,81 @@ -use crate::c8y_fragments::{C8yAgentFragment, C8yDeviceDataFragment}; -use crate::error::*; -use crate::size_threshold::SizeThreshold; -use crate::{converter::*, operations::Operations}; -use c8y_smartrest::smartrest_serializer::{SmartRestSerializer, SmartRestSetSupportedOperations}; -use c8y_smartrest::{alarm, event}; +use crate::mapping::{ + converter::*, error::*, operations::Operations, size_threshold::SizeThreshold, +}; +use agent_interface::{ + topic::{RequestTopic, ResponseTopic}, + Auth, DownloadInfo, Jsonify, OperationStatus, RestartOperationRequest, + RestartOperationResponse, SoftwareListRequest, SoftwareListResponse, SoftwareUpdateResponse, +}; +use async_trait::async_trait; +use c8y_api::{http_proxy::C8YHttpProxy, json_c8y::C8yUpdateSoftwareListResponse}; +use c8y_smartrest::{ + alarm, + error::SmartRestDeserializerError, + event::serialize_event, + smartrest_deserializer::{SmartRestRestartRequest, SmartRestUpdateSoftware}, + smartrest_serializer::{ + CumulocitySupportedOperations, SmartRestGetPendingOperations, SmartRestSerializer, + SmartRestSetOperationToExecuting, SmartRestSetOperationToFailed, + SmartRestSetOperationToSuccessful, SmartRestSetSupportedLogType, + SmartRestSetSupportedOperations, + }, +}; use c8y_translator::json; -use mqtt_channel::{Message, Topic}; -use std::collections::hash_map::Entry; -use std::collections::{HashMap, HashSet}; -use std::fs::File; -use std::io::Read; -use std::path::Path; -use thin_edge_json::alarm::ThinEdgeAlarm; -use thin_edge_json::event::ThinEdgeEvent; -use tracing::info; +use mqtt_channel::{Message, Topic, TopicFilter}; +use std::{ + collections::{hash_map::Entry, HashMap, HashSet}, + fs::File, + io::Read, + path::Path, + process::Stdio, +}; +use thin_edge_json::{alarm::ThinEdgeAlarm, event::ThinEdgeEvent}; +use tracing::{debug, info, log::error}; + +use super::{ + error::CumulocityMapperError, + fragments::{C8yAgentFragment, C8yDeviceDataFragment}, + mapper::CumulocityMapper, + topic::{C8yTopic, MapperSubscribeTopic}, +}; const C8Y_CLOUD: &str = "c8y"; const INVENTORY_FRAGMENTS_FILE_LOCATION: &str = "/etc/tedge/device/inventory.json"; -const INVENTORY_MANAGED_OBJECTS_TOPIC: &str = "c8y/inventory/managedObjects/update"; const SUPPORTED_OPERATIONS_DIRECTORY: &str = "/etc/tedge/operations"; +const INVENTORY_MANAGED_OBJECTS_TOPIC: &str = "c8y/inventory/managedObjects/update"; const SMARTREST_PUBLISH_TOPIC: &str = "c8y/s/us"; const TEDGE_ALARMS_TOPIC: &str = "tedge/alarms/"; const INTERNAL_ALARMS_TOPIC: &str = "c8y-internal/alarms/"; const TEDGE_MEASUREMENTS_TOPIC: &str = "tedge/measurements"; const TEDGE_EVENTS_TOPIC: &str = "tedge/events/"; -pub struct CumulocityConverter { +#[derive(Debug)] +pub struct CumulocityConverter<Proxy> +where + Proxy: C8YHttpProxy, +{ pub(crate) size_threshold: SizeThreshold, children: HashSet<String>, pub(crate) mapper_config: MapperConfig, device_name: String, device_type: String, - alarm_converter: AlarmConverter, + operations: Operations, + http_proxy: Proxy, } -impl CumulocityConverter { - pub fn new(size_threshold: SizeThreshold, device_name: String, device_type: String) -> Self { - let topics = vec![ +impl<Proxy> CumulocityConverter<Proxy> +where + Proxy: C8YHttpProxy, +{ + pub fn new( + size_threshold: SizeThreshold, + device_name: String, + device_type: String, + operations: Operations, + http_proxy: Proxy, + ) -> Self { + let mut topic_filter: TopicFilter = vec![ "tedge/measurements", "tedge/measurements/+", "tedge/alarms/+/+", @@ -47,16 +85,18 @@ impl CumulocityConverter { .try_into() .expect("topics that mapper should subscribe to"); + let () = topic_filter.add_all(CumulocityMapper::subscriptions(&operations).unwrap()); + let mapper_config = MapperConfig { - in_topic_filter: topics, + in_topic_filter: topic_filter, out_topic: make_valid_topic_or_panic("c8y/measurement/measurements/create"), errors_topic: make_valid_topic_or_panic("tedge/errors"), }; - let children: HashSet<String> = HashSet::new(); - let alarm_converter = AlarmConverter::new(); + let children: HashSet<String> = HashSet::new(); + CumulocityConverter { size_threshold, children, @@ -64,6 +104,8 @@ impl CumulocityConverter { device_name, device_type, alarm_converter, + operations, + http_proxy, } } @@ -84,7 +126,7 @@ impl CumulocityConverter { self.children.insert(child_id.clone()); vec.push(Message::new( &Topic::new_unchecked(SMARTREST_PUBLISH_TOPIC), - format!("101,{},{},thin-edge.io-child", child_id, child_id), + format!("101,{child_id},{child_id},thin-edge.io-child"), )); } @@ -106,49 +148,118 @@ impl CumulocityConverter { fn try_convert_event(&mut self, input: &Message) -> Result<Vec<Message>, ConversionError> { let tedge_event = ThinEdgeEvent::try_from(input.topic.name.as_str(), input.payload_str()?)?; - let smartrest_alarm = event::serialize_event(tedge_event)?; + let smartrest_alarm = serialize_event(tedge_event)?; let smartrest_topic = Topic::new_unchecked(SMARTREST_PUBLISH_TOPIC); Ok(vec![Message::new(&smartrest_topic, smartrest_alarm)]) } } -impl Converter for CumulocityConverter { +#[async_trait] +impl<Proxy> Converter for CumulocityConverter<Proxy> +where + Proxy: C8YHttpProxy, +{ type Error = ConversionError; fn get_mapper_config(&self) -> &MapperConfig { &self.mapper_config } + async fn try_convert(&mut self, message: &Message) -> Result<Vec<Message>, ConversionError> { + let () = self.size_threshold.validate(message.payload_str()?)?; + + match &message.topic { + topic if topic.name.starts_with("tedge/measurements") => { + self.try_convert_measurement(message) + } + topic if topic.name.starts_with("tedge/alarms") => { + self.alarm_converter.try_convert_alarm(message) + } + topic if topic.name.starts_with(INTERNAL_ALARMS_TOPIC) => { + self.alarm_converter.process_internal_alarm(message); + Ok(vec![]) + } + topic if topic.name.starts_with(TEDGE_EVENTS_TOPIC) => self.try_convert_event(message), + topic => match topic.clone().try_into() { + Ok(MapperSubscribeTopic::ResponseTopic(ResponseTopic::SoftwareListResponse)) => { + debug!("Software list"); + Ok(validate_and_publish_software_list( + message.payload_str()?, + &mut self.http_proxy, + ) + .await + .unwrap()) + } + Ok(MapperSubscribeTopic::ResponseTopic(ResponseTopic::SoftwareUpdateResponse)) => { + debug!("Software update"); + Ok( + publish_operation_status(message.payload_str()?, &mut self.http_proxy) + .await + .unwrap(), + ) + } + Ok(MapperSubscribeTopic::ResponseTopic(ResponseTopic::RestartResponse)) => { + Ok(publish_restart_operation_status(message.payload_str()?) + .await + .unwrap()) + } + Ok(MapperSubscribeTopic::C8yTopic(_)) => { + debug!("Cumulocity"); + match process_smartrest( + message.payload_str()?, + &self.operations, + &mut self.http_proxy, + ) + .await + { + Err( + ref err @ CumulocityMapperError::FromSmartRestDeserializer( + SmartRestDeserializerError::InvalidParameter { + ref operation, .. + }, + ), + ) => { + let topic = C8yTopic::SmartRestResponse.to_topic()?; + let msg1 = Message::new(&topic, format!("501,{}", operation)); + let msg2 = Message::new( + &topic, + format!("502,{},\"{}\"", operation, &err.to_string()), + ); + error!("{}", err); + return Ok(vec![msg1, msg2]); + } - fn try_convert(&mut self, input: &Message) -> Result<Vec<Message>, ConversionError> { - let () = self.size_threshold.validate(input.payload_str()?)?; - - if input.topic.name.starts_with(TEDGE_MEASUREMENTS_TOPIC) { - self.try_convert_measurement(input) - } else if input.topic.name.starts_with(TEDGE_ALARMS_TOPIC) { - self.alarm_converter.try_convert_alarm(input) - } else if input.topic.name.starts_with(INTERNAL_ALARMS_TOPIC) { - self.alarm_converter.process_internal_alarm(input); - Ok(vec![]) - } else if input.topic.name.starts_with(TEDGE_EVENTS_TOPIC) { - self.try_convert_event(input) - } else { - Err(ConversionError::UnsupportedTopic(input.topic.name.clone())) + Err(err) => { + error!("{}", err); + Ok(vec![]) + } + + Ok(msgs) => Ok(msgs), + } + } + _ => Err(ConversionError::UnsupportedTopic( + message.topic.name.clone(), + )), + }, } } fn try_init_messages(&self) -> Result<Vec<Message>, ConversionError> { let inventory_fragments_message = create_inventory_fragments_message(&self.device_name)?; - - let supported_operations_message = create_supported_operations_fragments()?; - + let supported_operations_message = create_supported_operations_fragments_message()?; let device_data_message = create_device_data_fragments(&self.device_name, &self.device_type)?; + let supported_log_types_message = create_supported_log_types_message()?; + let pending_operations_message = create_get_pending_operations_message()?; + let software_list_message = create_get_software_list_message()?; Ok(vec![ + inventory_fragments_message, supported_operations_message, device_data_message, - inventory_fragments_message, + supported_log_types_message, + pending_operations_message, + software_list_message, ]) } @@ -159,6 +270,7 @@ impl Converter for CumulocityConverter { } } +#[derive(Debug, Clone, PartialEq, Eq)] enum AlarmConverter { Syncing { pending_alarms_map: HashMap<String, Message>, @@ -295,7 +407,6 @@ impl AlarmConverter { sync_messages } } - fn create_device_data_fragments( device_name: &str, device_type: &str, @@ -307,7 +418,27 @@ fn create_device_data_fragments( Ok(Message::new(&topic, ops_msg.to_string())) } -fn create_supported_operations_fragments() -> Result<Message, ConversionError> { +fn create_get_software_list_message() -> Result<Message, ConversionError> { + let request = SoftwareListRequest::default(); + let topic = Topic::new(RequestTopic::SoftwareListRequest.as_str())?; + let payload = request.to_json().unwrap(); + Ok(Message::new(&topic, payload)) +} + +fn create_get_pending_operations_message() -> Result<Message, ConversionError> { + let data = SmartRestGetPendingOperations::default(); + let topic = C8yTopic::SmartRestResponse.to_topic()?; + let payload = data.to_smartrest()?; + Ok(Message::new(&topic, payload)) +} + +fn create_supported_log_types_message() -> Result<Message, ConversionError> { + let payload = SmartRestSetSupportedLogType::default().to_smartrest()?; + let topic = C8yTopic::SmartRestResponse.to_topic()?; + Ok(Message::new(&topic, payload)) +} + +fn create_supported_operations_fragments_message() -> Result<Message, ConversionError> { let ops = Operations::try_new(SUPPORTED_OPERATIONS_DIRECTORY, C8Y_CLOUD)?; let ops = ops.get_operations_list(); let ops = ops.iter().map(|op| op as &str).collect::<Vec<&str>>(); @@ -320,10 +451,182 @@ fn create_supported_operations_fragments() -> Result<Message, ConversionError> { fn create_inventory_fragments_message(device_name: &str) -> Result<Message, ConversionError> { let ops_msg = get_inventory_fragments(INVENTORY_FRAGMENTS_FILE_LOCATION)?; - let topic = Topic::new_unchecked(&format!("{INVENTORY_MANAGED_OBJECTS_TOPIC}/{device_name}",)); + let topic = Topic::new_unchecked(&format!("{INVENTORY_MANAGED_OBJECTS_TOPIC}/{device_name}")); Ok(Message::new(&topic, ops_msg.to_string())) } +async fn publish_restart_operation_status( + json_response: &str, +) -> Result<Vec<Message>, CumulocityMapperError> { + let response = RestartOperationResponse::from_json(json_response)?; + let topic = C8yTopic::SmartRestResponse.to_topic()?; + + match response.status() { + OperationStatus::Executing => { + let smartrest_set_operation = SmartRestSetOperationToExecuting::new( + CumulocitySupportedOperations::C8yRestartRequest, + ) + |