use crate::c8y::dynamic_discovery::*;
use crate::core::{converter::*, error::*, size_threshold::SizeThreshold};
use agent_interface::{
topic::{RequestTopic, ResponseTopic},
Auth, DownloadInfo, Jsonify, OperationStatus, RestartOperationRequest,
RestartOperationResponse, SoftwareListRequest, SoftwareListResponse, SoftwareUpdateResponse,
};
use async_trait::async_trait;
use c8y_api::{
http_proxy::C8YHttpProxy,
json_c8y::{C8yCreateEvent, C8yUpdateSoftwareListResponse},
};
use c8y_smartrest::smartrest_deserializer::SmartRestRequestGeneric;
use c8y_smartrest::{
alarm,
error::SmartRestDeserializerError,
operations::{get_operation, Operations},
smartrest_deserializer::{SmartRestRestartRequest, SmartRestUpdateSoftware},
smartrest_serializer::{
CumulocitySupportedOperations, SmartRestGetPendingOperations, SmartRestSerializer,
SmartRestSetOperationToExecuting, SmartRestSetOperationToFailed,
SmartRestSetOperationToSuccessful, SmartRestSetSupportedOperations,
},
};
use c8y_translator::json;
use logged_command::LoggedCommand;
use mqtt_channel::{Message, Topic, TopicFilter};
use plugin_sm::operation_logs::OperationLogs;
use std::{
collections::{hash_map::Entry, HashMap, HashSet},
fs::File,
io::Read,
path::{Path, PathBuf},
};
use tedge_config::{get_tedge_config, ConfigSettingAccessor, LogPathSetting};
use thin_edge_json::{alarm::ThinEdgeAlarm, event::ThinEdgeEvent};
use time::format_description::well_known::Rfc3339;
use tracing::{debug, info, log::error};
use super::{
error::CumulocityMapperError,
fragments::{C8yAgentFragment, C8yDeviceDataFragment},
mapper::CumulocityMapper,
topic::{C8yTopic, MapperSubscribeTopic},
};
const C8Y_CLOUD: &str = "c8y";
const INVENTORY_FRAGMENTS_FILE_LOCATION: &str = "/etc/tedge/device/inventory.json";
const SUPPORTED_OPERATIONS_DIRECTORY: &str = "/etc/tedge/operations";
const INVENTORY_MANAGED_OBJECTS_TOPIC: &str = "c8y/inventory/managedObjects/update";
const SMARTREST_PUBLISH_TOPIC: &str = "c8y/s/us";
const TEDGE_ALARMS_TOPIC: &str = "tedge/alarms/";
const INTERNAL_ALARMS_TOPIC: &str = "c8y-internal/alarms/";
const TEDGE_EVENTS_TOPIC: &str = "tedge/events/";
const C8Y_JSON_MQTT_EVENTS_TOPIC: &str = "c8y/event/events/create";
const TEDGE_AGENT_LOG_DIR: &str = "tedge/agent";
const CREATE_EVENT_SMARTREST_CODE: u16 = 400;
#[derive(Debug)]
pub struct CumulocityConverter<Proxy>
where
Proxy: C8YHttpProxy,
{
pub(crate) size_threshold: SizeThreshold,
children: HashSet<String>,
pub(crate) mapper_config: MapperConfig,
device_name: String,
device_type: String,
alarm_converter: AlarmConverter,
pub operations: Operations,
operation_logs: OperationLogs,
http_proxy: Proxy,
}
impl<Proxy> CumulocityConverter<Proxy>
where
Proxy: C8YHttpProxy,
{
pub fn new(
size_threshold: SizeThreshold,
device_name: String,
device_type: String,
operations: Operations,
http_proxy: Proxy,
) -> Result<Self, CumulocityMapperError> {
let mut topic_filter: TopicFilter = vec![
"tedge/measurements",
"tedge/measurements/+",
"tedge/alarms/+/+",
"c8y-internal/alarms/+/+",
"tedge/events/+",
"tedge/events/+/+",
]
.try_into()
.expect("topics that mapper should subscribe to");
let () = topic_filter.add_all(CumulocityMapper::subscriptions(&operations).unwrap());
let mapper_config = MapperConfig {
in_topic_filter: topic_filter,
out_topic: make_valid_topic_or_panic("c8y/measurement/measurements/create"),
errors_topic: make_valid_topic_or_panic("tedge/errors"),
};
let alarm_converter = AlarmConverter::new();
let children: HashSet<String> = HashSet::new();
let tedge_config = get_tedge_config()?;
let logs_path = tedge_config.query(LogPathSetting)?;
let log_dir = PathBuf::from(&format!("{}/{TEDGE_AGENT_LOG_DIR}", logs_path));
let operation_logs = OperationLogs::try_new(log_dir)?;
Ok(CumulocityConverter {
size_threshold,
children,
mapper_config,
device_name,
device_type,
alarm_converter,
operations,
operation_logs,
http_proxy,
})
}
#[cfg(test)]
pub fn from_logs_path(
<