summaryrefslogtreecommitdiffstats
path: root/crates/core
diff options
context:
space:
mode:
authorLukasz Woznicki <lukasz.woznicki@softwareag.com>2022-02-09 13:55:02 +0000
committerLukasz Woznicki <lukasz.woznicki@softwareag.com>2022-02-18 09:41:55 +0000
commitad065ec206ad3b48f9e1cc48080f4d2ecfe85a2e (patch)
tree2042dc1a03440f61b112006d6f12b532f0610566 /crates/core
parentb1907dc4780709bad572a7108314c563eb02b70a (diff)
Move module to subdirectories and adjust use
Signed-off-by: Lukasz Woznicki <lukasz.woznicki@softwareag.com>
Diffstat (limited to 'crates/core')
-rw-r--r--crates/core/c8y_api/src/http_proxy.rs34
-rw-r--r--crates/core/c8y_smartrest/src/topic.rs2
-rw-r--r--crates/core/tedge_mapper/src/az/converter.rs (renamed from crates/core/tedge_mapper/src/az_converter.rs)63
-rw-r--r--crates/core/tedge_mapper/src/az/mapper.rs (renamed from crates/core/tedge_mapper/src/az_mapper.rs)9
-rw-r--r--crates/core/tedge_mapper/src/az/mod.rs2
-rw-r--r--crates/core/tedge_mapper/src/c8y/converter.rs (renamed from crates/core/tedge_mapper/src/c8y_converter.rs)527
-rw-r--r--crates/core/tedge_mapper/src/c8y/error.rs (renamed from crates/core/tedge_mapper/src/sm_c8y_mapper/error.rs)9
-rw-r--r--crates/core/tedge_mapper/src/c8y/fragments.rs (renamed from crates/core/tedge_mapper/src/c8y_fragments.rs)5
-rw-r--r--crates/core/tedge_mapper/src/c8y/mapper.rs95
-rw-r--r--crates/core/tedge_mapper/src/c8y/mod.rs9
-rw-r--r--crates/core/tedge_mapper/src/c8y/operations.rs219
-rw-r--r--crates/core/tedge_mapper/src/c8y/tests.rs (renamed from crates/core/tedge_mapper/src/sm_c8y_mapper/tests.rs)209
-rw-r--r--crates/core/tedge_mapper/src/c8y/topic.rs128
-rw-r--r--crates/core/tedge_mapper/src/c8y_mapper.rs45
-rw-r--r--crates/core/tedge_mapper/src/collectd/batcher.rs (renamed from crates/core/tedge_mapper/src/collectd_mapper/batcher.rs)5
-rw-r--r--crates/core/tedge_mapper/src/collectd/collectd.rs (renamed from crates/core/tedge_mapper/src/collectd_mapper/collectd.rs)0
-rw-r--r--crates/core/tedge_mapper/src/collectd/error.rs (renamed from crates/core/tedge_mapper/src/collectd_mapper/error.rs)2
-rw-r--r--crates/core/tedge_mapper/src/collectd/mapper.rs (renamed from crates/core/tedge_mapper/src/collectd_mapper/mapper.rs)4
-rw-r--r--crates/core/tedge_mapper/src/collectd/mod.rs (renamed from crates/core/tedge_mapper/src/collectd_mapper/mod.rs)0
-rw-r--r--crates/core/tedge_mapper/src/collectd/monitor.rs (renamed from crates/core/tedge_mapper/src/collectd_mapper/monitor.rs)8
-rw-r--r--crates/core/tedge_mapper/src/main.rs40
-rw-r--r--crates/core/tedge_mapper/src/mapping/component.rs (renamed from crates/core/tedge_mapper/src/component.rs)0
-rw-r--r--crates/core/tedge_mapper/src/mapping/converter.rs (renamed from crates/core/tedge_mapper/src/converter.rs)8
-rw-r--r--crates/core/tedge_mapper/src/mapping/error.rs (renamed from crates/core/tedge_mapper/src/error.rs)4
-rw-r--r--crates/core/tedge_mapper/src/mapping/mapper.rs (renamed from crates/core/tedge_mapper/src/mapper.rs)11
-rw-r--r--crates/core/tedge_mapper/src/mapping/mod.rs6
-rw-r--r--crates/core/tedge_mapper/src/mapping/operations.rs (renamed from crates/core/tedge_mapper/src/operations.rs)2
-rw-r--r--crates/core/tedge_mapper/src/mapping/size_threshold.rs (renamed from crates/core/tedge_mapper/src/size_threshold.rs)1
-rw-r--r--crates/core/tedge_mapper/src/sm_c8y_mapper/mapper.rs425
-rw-r--r--crates/core/tedge_mapper/src/sm_c8y_mapper/mod.rs4
-rw-r--r--crates/core/tedge_mapper/src/tests.rs16
31 files changed, 1118 insertions, 774 deletions
diff --git a/crates/core/c8y_api/src/http_proxy.rs b/crates/core/c8y_api/src/http_proxy.rs
index ceffe1ab..281b6a62 100644
--- a/crates/core/c8y_api/src/http_proxy.rs
+++ b/crates/core/c8y_api/src/http_proxy.rs
@@ -1,6 +1,7 @@
use crate::json_c8y::{
C8yCreateEvent, C8yManagedObject, C8yUpdateSoftwareListResponse, InternalIdResponse,
};
+
use async_trait::async_trait;
use c8y_smartrest::{error::SMCumulocityMapperError, smartrest_deserializer::SmartRestJwtResponse};
use mqtt_channel::{Connection, PubChannel, StreamExt, Topic, TopicFilter};
@@ -19,7 +20,7 @@ const RETRY_TIMEOUT_SECS: u64 = 60;
/// An HttpProxy handles http requests to C8y on behalf of the device.
#[async_trait]
-pub trait C8YHttpProxy {
+pub trait C8YHttpProxy: Send + Sync {
async fn init(&mut self) -> Result<(), SMCumulocityMapperError>;
fn url_is_in_my_tenant_domain(&self, url: &str) -> bool;
@@ -341,6 +342,37 @@ impl C8YHttpProxy for JwtAuthHttpProxy {
}
}
+pub struct FakeC8YHttpProxy {}
+
+#[async_trait::async_trait]
+impl C8YHttpProxy for FakeC8YHttpProxy {
+ async fn init(&mut self) -> Result<(), SMCumulocityMapperError> {
+ Ok(())
+ }
+
+ fn url_is_in_my_tenant_domain(&self, _url: &str) -> bool {
+ true
+ }
+
+ async fn get_jwt_token(&mut self) -> Result<SmartRestJwtResponse, SMCumulocityMapperError> {
+ Ok(SmartRestJwtResponse::try_new("71,fake-token")?)
+ }
+
+ async fn send_software_list_http(
+ &mut self,
+ _c8y_software_list: &C8yUpdateSoftwareListResponse,
+ ) -> Result<(), SMCumulocityMapperError> {
+ Ok(())
+ }
+
+ async fn upload_log_binary(
+ &mut self,
+ _log_content: &str,
+ ) -> Result<String, SMCumulocityMapperError> {
+ Ok("fake/upload/url".into())
+ }
+}
+
#[cfg(test)]
mod tests {
use super::*;
diff --git a/crates/core/c8y_smartrest/src/topic.rs b/crates/core/c8y_smartrest/src/topic.rs
index 2c6c96b0..7df4f5af 100644
--- a/crates/core/c8y_smartrest/src/topic.rs
+++ b/crates/core/c8y_smartrest/src/topic.rs
@@ -20,7 +20,7 @@ impl C8yTopic {
}
pub fn to_topic(&self) -> Result<Topic, MqttError> {
- Ok(Topic::new(self.as_str())?)
+ Topic::new(self.as_str())
}
}
diff --git a/crates/core/tedge_mapper/src/az_converter.rs b/crates/core/tedge_mapper/src/az/converter.rs
index d6ca77dc..824d21d1 100644
--- a/crates/core/tedge_mapper/src/az_converter.rs
+++ b/crates/core/tedge_mapper/src/az/converter.rs
@@ -1,6 +1,6 @@
-use crate::converter::*;
-use crate::error::*;
-use crate::size_threshold::SizeThreshold;
+use crate::mapping::{converter::*, error::*, size_threshold::SizeThreshold};
+
+use async_trait::async_trait;
use clock::Clock;
use mqtt_channel::Message;
use thin_edge_json::serialize::ThinEdgeJsonSerializer;
@@ -28,6 +28,7 @@ impl AzureConverter {
}
}
+#[async_trait]
impl Converter for AzureConverter {
type Error = ConversionError;
@@ -35,7 +36,7 @@ impl Converter for AzureConverter {
&self.mapper_config
}
- fn try_convert(&mut self, input: &Message) -> Result<Vec<Message>, Self::Error> {
+ async fn try_convert(&mut self, input: &Message) -> Result<Vec<Message>, Self::Error> {
let input = input.payload_str()?;
let () = self.size_threshold.validate(input)?;
let default_timestamp = self.add_timestamp.then(|| self.clock.now());
@@ -49,11 +50,19 @@ impl Converter for AzureConverter {
#[cfg(test)]
mod tests {
- use super::*;
- use crate::size_threshold::SizeThresholdExceeded;
+ use crate::{
+ az::converter::AzureConverter,
+ mapping::{
+ converter::*,
+ error::ConversionError,
+ size_threshold::{SizeThreshold, SizeThresholdExceeded},
+ },
+ };
+
use assert_json_diff::*;
use assert_matches::*;
- use mqtt_channel::Topic;
+ use clock::Clock;
+ use mqtt_channel::{Message, Topic};
use serde_json::json;
use time::macros::datetime;
@@ -65,13 +74,13 @@ mod tests {
}
}
- #[test]
- fn converting_invalid_json_is_invalid() {
+ #[tokio::test]
+ async fn converting_invalid_json_is_invalid() {
let mut converter =
AzureConverter::new(false, Box::new(TestClock), SizeThreshold(255 * 1024));
let input = "This is not Thin Edge JSON";
- let result = converter.try_convert(&new_tedge_message(input));
+ let result = converter.try_convert(&new_tedge_message(input)).await;
assert_matches!(result, Err(ConversionError::FromThinEdgeJsonParser(_)))
}
@@ -84,8 +93,8 @@ mod tests {
messages.pop().unwrap().payload_str().unwrap().to_string()
}
- #[test]
- fn converting_input_without_timestamp_produces_output_without_timestamp_given_add_timestamp_is_false(
+ #[tokio::test]
+ async fn converting_input_without_timestamp_produces_output_without_timestamp_given_add_timestamp_is_false(
) {
let mut converter =
AzureConverter::new(false, Box::new(TestClock), SizeThreshold(255 * 1024));
@@ -98,7 +107,7 @@ mod tests {
"temperature": 23.0
});
- let output = converter.convert(&new_tedge_message(input));
+ let output = converter.convert(&new_tedge_message(input)).await;
assert_json_eq!(
serde_json::from_str::<serde_json::Value>(&extract_first_message_payload(output))
@@ -107,9 +116,9 @@ mod tests {
);
}
- #[test]
- fn converting_input_with_timestamp_produces_output_with_timestamp_given_add_timestamp_is_false()
- {
+ #[tokio::test]
+ async fn converting_input_with_timestamp_produces_output_with_timestamp_given_add_timestamp_is_false(
+ ) {
let mut converter =
AzureConverter::new(false, Box::new(TestClock), SizeThreshold(255 * 1024));
@@ -123,7 +132,7 @@ mod tests {
"temperature": 23.0
});
- let output = converter.convert(&new_tedge_message(input));
+ let output = converter.convert(&new_tedge_message(input)).await;
assert_json_eq!(
serde_json::from_str::<serde_json::Value>(&extract_first_message_payload(output))
@@ -132,9 +141,9 @@ mod tests {
);
}
- #[test]
- fn converting_input_with_timestamp_produces_output_with_timestamp_given_add_timestamp_is_true()
- {
+ #[tokio::test]
+ async fn converting_input_with_timestamp_produces_output_with_timestamp_given_add_timestamp_is_true(
+ ) {
let mut converter =
AzureConverter::new(true, Box::new(TestClock), SizeThreshold(255 * 1024));
@@ -148,7 +157,7 @@ mod tests {
"temperature": 23.0
});
- let output = converter.convert(&new_tedge_message(input));
+ let output = converter.convert(&new_tedge_message(input)).await;
assert_json_eq!(
serde_json::from_str::<serde_json::Value>(&extract_first_message_payload(output))
@@ -157,8 +166,8 @@ mod tests {
);
}
- #[test]
- fn converting_input_without_timestamp_produces_output_with_timestamp_given_add_timestamp_is_true(
+ #[tokio::test]
+ async fn converting_input_without_timestamp_produces_output_with_timestamp_given_add_timestamp_is_true(
) {
let mut converter =
AzureConverter::new(true, Box::new(TestClock), SizeThreshold(255 * 1024));
@@ -172,7 +181,7 @@ mod tests {
"time": "2021-04-08T00:00:00+05:00"
});
- let output = converter.convert(&new_tedge_message(input));
+ let output = converter.convert(&new_tedge_message(input)).await;
assert_json_eq!(
serde_json::from_str::<serde_json::Value>(&extract_first_message_payload(output))
@@ -181,12 +190,12 @@ mod tests {
);
}
- #[test]
- fn exceeding_threshold_returns_error() {
+ #[tokio::test]
+ async fn exceeding_threshold_returns_error() {
let mut converter = AzureConverter::new(false, Box::new(TestClock), SizeThreshold(1));
let input = "ABC";
- let result = converter.try_convert(&new_tedge_message(input));
+ let result = converter.try_convert(&new_tedge_message(input)).await;
assert_matches!(
result,
diff --git a/crates/core/tedge_mapper/src/az_mapper.rs b/crates/core/tedge_mapper/src/az/mapper.rs
index 57a94b66..5bf2b2f3 100644
--- a/crates/core/tedge_mapper/src/az_mapper.rs
+++ b/crates/core/tedge_mapper/src/az/mapper.rs
@@ -1,7 +1,8 @@
-use crate::az_converter::AzureConverter;
-use crate::component::TEdgeComponent;
-use crate::mapper::*;
-use crate::size_threshold::SizeThreshold;
+use crate::{
+ az::converter::AzureConverter,
+ mapping::{component::TEdgeComponent, mapper::create_mapper, size_threshold::SizeThreshold},
+};
+
use async_trait::async_trait;
use clock::WallClock;
use tedge_config::{AzureMapperTimestamp, TEdgeConfig};
diff --git a/crates/core/tedge_mapper/src/az/mod.rs b/crates/core/tedge_mapper/src/az/mod.rs
new file mode 100644
index 00000000..fdb7b6f0
--- /dev/null
+++ b/crates/core/tedge_mapper/src/az/mod.rs
@@ -0,0 +1,2 @@
+mod converter;
+pub mod mapper;
diff --git a/crates/core/tedge_mapper/src/c8y_converter.rs b/crates/core/tedge_mapper/src/c8y/converter.rs
index 16efb66e..6059ed7c 100644
--- a/crates/core/tedge_mapper/src/c8y_converter.rs
+++ b/crates/core/tedge_mapper/src/c8y/converter.rs
@@ -1,43 +1,81 @@
-use crate::c8y_fragments::{C8yAgentFragment, C8yDeviceDataFragment};
-use crate::error::*;
-use crate::size_threshold::SizeThreshold;
-use crate::{converter::*, operations::Operations};
-use c8y_smartrest::smartrest_serializer::{SmartRestSerializer, SmartRestSetSupportedOperations};
-use c8y_smartrest::{alarm, event};
+use crate::mapping::{
+ converter::*, error::*, operations::Operations, size_threshold::SizeThreshold,
+};
+use agent_interface::{
+ topic::{RequestTopic, ResponseTopic},
+ Auth, DownloadInfo, Jsonify, OperationStatus, RestartOperationRequest,
+ RestartOperationResponse, SoftwareListRequest, SoftwareListResponse, SoftwareUpdateResponse,
+};
+use async_trait::async_trait;
+use c8y_api::{http_proxy::C8YHttpProxy, json_c8y::C8yUpdateSoftwareListResponse};
+use c8y_smartrest::{
+ alarm,
+ error::SmartRestDeserializerError,
+ event::serialize_event,
+ smartrest_deserializer::{SmartRestRestartRequest, SmartRestUpdateSoftware},
+ smartrest_serializer::{
+ CumulocitySupportedOperations, SmartRestGetPendingOperations, SmartRestSerializer,
+ SmartRestSetOperationToExecuting, SmartRestSetOperationToFailed,
+ SmartRestSetOperationToSuccessful, SmartRestSetSupportedLogType,
+ SmartRestSetSupportedOperations,
+ },
+};
use c8y_translator::json;
-use mqtt_channel::{Message, Topic};
-use std::collections::hash_map::Entry;
-use std::collections::{HashMap, HashSet};
-use std::fs::File;
-use std::io::Read;
-use std::path::Path;
-use thin_edge_json::alarm::ThinEdgeAlarm;
-use thin_edge_json::event::ThinEdgeEvent;
-use tracing::info;
+use mqtt_channel::{Message, Topic, TopicFilter};
+use std::{
+ collections::{hash_map::Entry, HashMap, HashSet},
+ fs::File,
+ io::Read,
+ path::Path,
+ process::Stdio,
+};
+use thin_edge_json::{alarm::ThinEdgeAlarm, event::ThinEdgeEvent};
+use tracing::{debug, info, log::error};
+
+use super::{
+ error::CumulocityMapperError,
+ fragments::{C8yAgentFragment, C8yDeviceDataFragment},
+ mapper::CumulocityMapper,
+ topic::{C8yTopic, MapperSubscribeTopic},
+};
const C8Y_CLOUD: &str = "c8y";
const INVENTORY_FRAGMENTS_FILE_LOCATION: &str = "/etc/tedge/device/inventory.json";
-const INVENTORY_MANAGED_OBJECTS_TOPIC: &str = "c8y/inventory/managedObjects/update";
const SUPPORTED_OPERATIONS_DIRECTORY: &str = "/etc/tedge/operations";
+const INVENTORY_MANAGED_OBJECTS_TOPIC: &str = "c8y/inventory/managedObjects/update";
const SMARTREST_PUBLISH_TOPIC: &str = "c8y/s/us";
const TEDGE_ALARMS_TOPIC: &str = "tedge/alarms/";
const INTERNAL_ALARMS_TOPIC: &str = "c8y-internal/alarms/";
const TEDGE_MEASUREMENTS_TOPIC: &str = "tedge/measurements";
const TEDGE_EVENTS_TOPIC: &str = "tedge/events/";
-pub struct CumulocityConverter {
+#[derive(Debug)]
+pub struct CumulocityConverter<Proxy>
+where
+ Proxy: C8YHttpProxy,
+{
pub(crate) size_threshold: SizeThreshold,
children: HashSet<String>,
pub(crate) mapper_config: MapperConfig,
device_name: String,
device_type: String,
-
alarm_converter: AlarmConverter,
+ operations: Operations,
+ http_proxy: Proxy,
}
-impl CumulocityConverter {
- pub fn new(size_threshold: SizeThreshold, device_name: String, device_type: String) -> Self {
- let topics = vec![
+impl<Proxy> CumulocityConverter<Proxy>
+where
+ Proxy: C8YHttpProxy,
+{
+ pub fn new(
+ size_threshold: SizeThreshold,
+ device_name: String,
+ device_type: String,
+ operations: Operations,
+ http_proxy: Proxy,
+ ) -> Self {
+ let mut topic_filter: TopicFilter = vec![
"tedge/measurements",
"tedge/measurements/+",
"tedge/alarms/+/+",
@@ -47,16 +85,18 @@ impl CumulocityConverter {
.try_into()
.expect("topics that mapper should subscribe to");
+ let () = topic_filter.add_all(CumulocityMapper::subscriptions(&operations).unwrap());
+
let mapper_config = MapperConfig {
- in_topic_filter: topics,
+ in_topic_filter: topic_filter,
out_topic: make_valid_topic_or_panic("c8y/measurement/measurements/create"),
errors_topic: make_valid_topic_or_panic("tedge/errors"),
};
- let children: HashSet<String> = HashSet::new();
-
let alarm_converter = AlarmConverter::new();
+ let children: HashSet<String> = HashSet::new();
+
CumulocityConverter {
size_threshold,
children,
@@ -64,6 +104,8 @@ impl CumulocityConverter {
device_name,
device_type,
alarm_converter,
+ operations,
+ http_proxy,
}
}
@@ -84,7 +126,7 @@ impl CumulocityConverter {
self.children.insert(child_id.clone());
vec.push(Message::new(
&Topic::new_unchecked(SMARTREST_PUBLISH_TOPIC),
- format!("101,{},{},thin-edge.io-child", child_id, child_id),
+ format!("101,{child_id},{child_id},thin-edge.io-child"),
));
}
@@ -106,49 +148,118 @@ impl CumulocityConverter {
fn try_convert_event(&mut self, input: &Message) -> Result<Vec<Message>, ConversionError> {
let tedge_event = ThinEdgeEvent::try_from(input.topic.name.as_str(), input.payload_str()?)?;
- let smartrest_alarm = event::serialize_event(tedge_event)?;
+ let smartrest_alarm = serialize_event(tedge_event)?;
let smartrest_topic = Topic::new_unchecked(SMARTREST_PUBLISH_TOPIC);
Ok(vec![Message::new(&smartrest_topic, smartrest_alarm)])
}
}
-impl Converter for CumulocityConverter {
+#[async_trait]
+impl<Proxy> Converter for CumulocityConverter<Proxy>
+where
+ Proxy: C8YHttpProxy,
+{
type Error = ConversionError;
fn get_mapper_config(&self) -> &MapperConfig {
&self.mapper_config
}
+ async fn try_convert(&mut self, message: &Message) -> Result<Vec<Message>, ConversionError> {
+ let () = self.size_threshold.validate(message.payload_str()?)?;
+
+ match &message.topic {
+ topic if topic.name.starts_with("tedge/measurements") => {
+ self.try_convert_measurement(message)
+ }
+ topic if topic.name.starts_with("tedge/alarms") => {
+ self.alarm_converter.try_convert_alarm(message)
+ }
+ topic if topic.name.starts_with(INTERNAL_ALARMS_TOPIC) => {
+ self.alarm_converter.process_internal_alarm(message);
+ Ok(vec![])
+ }
+ topic if topic.name.starts_with(TEDGE_EVENTS_TOPIC) => self.try_convert_event(message),
+ topic => match topic.clone().try_into() {
+ Ok(MapperSubscribeTopic::ResponseTopic(ResponseTopic::SoftwareListResponse)) => {
+ debug!("Software list");
+ Ok(validate_and_publish_software_list(
+ message.payload_str()?,
+ &mut self.http_proxy,
+ )
+ .await
+ .unwrap())
+ }
+ Ok(MapperSubscribeTopic::ResponseTopic(ResponseTopic::SoftwareUpdateResponse)) => {
+ debug!("Software update");
+ Ok(
+ publish_operation_status(message.payload_str()?, &mut self.http_proxy)
+ .await
+ .unwrap(),
+ )
+ }
+ Ok(MapperSubscribeTopic::ResponseTopic(ResponseTopic::RestartResponse)) => {
+ Ok(publish_restart_operation_status(message.payload_str()?)
+ .await
+ .unwrap())
+ }
+ Ok(MapperSubscribeTopic::C8yTopic(_)) => {
+ debug!("Cumulocity");
+ match process_smartrest(
+ message.payload_str()?,
+ &self.operations,
+ &mut self.http_proxy,
+ )
+ .await
+ {
+ Err(
+ ref err @ CumulocityMapperError::FromSmartRestDeserializer(
+ SmartRestDeserializerError::InvalidParameter {
+ ref operation, ..
+ },
+ ),
+ ) => {
+ let topic = C8yTopic::SmartRestResponse.to_topic()?;
+ let msg1 = Message::new(&topic, format!("501,{}", operation));
+ let msg2 = Message::new(
+ &topic,
+ format!("502,{},\"{}\"", operation, &err.to_string()),
+ );
+ error!("{}", err);
+ return Ok(vec![msg1, msg2]);
+ }
- fn try_convert(&mut self, input: &Message) -> Result<Vec<Message>, ConversionError> {
- let () = self.size_threshold.validate(input.payload_str()?)?;
-
- if input.topic.name.starts_with(TEDGE_MEASUREMENTS_TOPIC) {
- self.try_convert_measurement(input)
- } else if input.topic.name.starts_with(TEDGE_ALARMS_TOPIC) {
- self.alarm_converter.try_convert_alarm(input)
- } else if input.topic.name.starts_with(INTERNAL_ALARMS_TOPIC) {
- self.alarm_converter.process_internal_alarm(input);
- Ok(vec![])
- } else if input.topic.name.starts_with(TEDGE_EVENTS_TOPIC) {
- self.try_convert_event(input)
- } else {
- Err(ConversionError::UnsupportedTopic(input.topic.name.clone()))
+ Err(err) => {
+ error!("{}", err);
+ Ok(vec![])
+ }
+
+ Ok(msgs) => Ok(msgs),
+ }
+ }
+ _ => Err(ConversionError::UnsupportedTopic(
+ message.topic.name.clone(),
+ )),
+ },
}
}
fn try_init_messages(&self) -> Result<Vec<Message>, ConversionError> {
let inventory_fragments_message = create_inventory_fragments_message(&self.device_name)?;
-
- let supported_operations_message = create_supported_operations_fragments()?;
-
+ let supported_operations_message = create_supported_operations_fragments_message()?;
let device_data_message =
create_device_data_fragments(&self.device_name, &self.device_type)?;
+ let supported_log_types_message = create_supported_log_types_message()?;
+ let pending_operations_message = create_get_pending_operations_message()?;
+ let software_list_message = create_get_software_list_message()?;
Ok(vec![
+ inventory_fragments_message,
supported_operations_message,
device_data_message,
- inventory_fragments_message,
+ supported_log_types_message,
+ pending_operations_message,
+ software_list_message,
])
}
@@ -159,6 +270,7 @@ impl Converter for CumulocityConverter {
}
}
+#[derive(Debug, Clone, PartialEq, Eq)]
enum AlarmConverter {
Syncing {
pending_alarms_map: HashMap<String, Message>,
@@ -295,7 +407,6 @@ impl AlarmConverter {
sync_messages
}
}
-
fn create_device_data_fragments(
device_name: &str,
device_type: &str,
@@ -307,7 +418,27 @@ fn create_device_data_fragments(
Ok(Message::new(&topic, ops_msg.to_string()))
}
-fn create_supported_operations_fragments() -> Result<Message, ConversionError> {
+fn create_get_software_list_message() -> Result<Message, ConversionError> {
+ let request = SoftwareListRequest::default();
+ let topic = Topic::new(RequestTopic::SoftwareListRequest.as_str())?;
+ let payload = request.to_json().unwrap();
+ Ok(Message::new(&topic, payload))
+}
+
+fn create_get_pending_operations_message() -> Result<Message, ConversionError> {
+ let data = SmartRestGetPendingOperations::default();
+ let topic = C8yTopic::SmartRestResponse.to_topic()?;
+ let payload = data.to_smartrest()?;
+ Ok(Message::new(&topic, payload))
+}
+
+fn create_supported_log_types_message() -> Result<Message, ConversionError> {
+ let payload = SmartRestSetSupportedLogType::default().to_smartrest()?;
+ let topic = C8yTopic::SmartRestResponse.to_topic()?;
+ Ok(Message::new(&topic, payload))
+}
+
+fn create_supported_operations_fragments_message() -> Result<Message, ConversionError> {
let ops = Operations::try_new(SUPPORTED_OPERATIONS_DIRECTORY, C8Y_CLOUD)?;
let ops = ops.get_operations_list();
let ops = ops.iter().map(|op| op as &str).collect::<Vec<&str>>();
@@ -320,10 +451,182 @@ fn create_supported_operations_fragments() -> Result<Message, ConversionError> {
fn create_inventory_fragments_message(device_name: &str) -> Result<Message, ConversionError> {
let ops_msg = get_inventory_fragments(INVENTORY_FRAGMENTS_FILE_LOCATION)?;
- let topic = Topic::new_unchecked(&format!("{INVENTORY_MANAGED_OBJECTS_TOPIC}/{device_name}",));
+ let topic = Topic::new_unchecked(&format!("{INVENTORY_MANAGED_OBJECTS_TOPIC}/{device_name}"));
Ok(Message::new(&topic, ops_msg.to_string()))
}
+async fn publish_restart_operation_status(
+ json_response: &str,
+) -> Result<Vec<Message>, CumulocityMapperError> {
+ let response = RestartOperationResponse::from_json(json_response)?;
+ let topic = C8yTopic::SmartRestResponse.to_topic()?;
+
+ match response.status() {
+ OperationStatus::Executing => {
+ let smartrest_set_operation = Sma