use crate::mapper::mqtt_config;
use crate::sm_c8y_mapper::json_c8y::{C8yCreateEvent, C8yManagedObject};
use crate::sm_c8y_mapper::{error::*, json_c8y::C8yUpdateSoftwareListResponse, topic::*};
use crate::{component::TEdgeComponent, sm_c8y_mapper::json_c8y::InternalIdResponse};
use async_trait::async_trait;
use c8y_smartrest::smartrest_deserializer::SmartRestLogRequest;
use c8y_smartrest::smartrest_serializer::CumulocitySupportedOperations;
use c8y_smartrest::{
error::SmartRestDeserializerError,
smartrest_deserializer::{SmartRestJwtResponse, SmartRestUpdateSoftware},
smartrest_serializer::{
SmartRestGetPendingOperations, SmartRestSerializer, SmartRestSetOperationToExecuting,
SmartRestSetOperationToFailed, SmartRestSetOperationToSuccessful,
SmartRestSetSupportedLogType, SmartRestSetSupportedOperations,
},
};
use chrono::{DateTime, FixedOffset, Local};
use json_sm::{
Auth, DownloadInfo, Jsonify, SoftwareListRequest, SoftwareListResponse,
SoftwareOperationStatus, SoftwareUpdateResponse,
};
use mqtt_client::{Client, MqttClient, MqttClientError, MqttMessageStream, Topic, TopicFilter};
use reqwest::Url;
use serde::{Deserialize, Serialize};
use std::path::PathBuf;
use std::{convert::TryInto, time::Duration};
use tedge_config::{C8yUrlSetting, ConfigSettingAccessorStringExt, DeviceIdSetting, TEdgeConfig};
use tokio::time::Instant;
use tracing::{debug, error, info, instrument};
// Filesystem path of the agent log directory.
// NOTE(review): not referenced in the part of the file visible here; presumably used by the
// log-request handling further down — confirm.
const AGENT_LOG_DIR: &str = "/var/log/tedge/agent";
// Number of seconds to wait before retrying after a failed attempt to retrieve the internal id
// during initialisation.
const RETRY_TIMEOUT_SECS: u64 = 60;
/// Stateless handle for the Cumulocity software-management mapper component.
///
/// The type carries no data of its own; everything the mapper needs is created when the
/// component is started.
pub struct CumulocitySoftwareManagementMapper {}

impl CumulocitySoftwareManagementMapper {
    /// Creates a new, empty mapper handle.
    pub fn new() -> Self {
        CumulocitySoftwareManagementMapper {}
    }
}
// Component entry point: wires the MQTT connection to a `CumulocitySoftwareManagement`
// instance and drives it.
#[async_trait]
impl TEdgeComponent for CumulocitySoftwareManagementMapper {
    /// Connects to MQTT, subscribes to the incoming topics, initialises the mapper and then
    /// runs it.
    ///
    /// The subscription is created before `init` is called, so the message stream already
    /// exists while initialisation is in progress.
    ///
    /// # Errors
    ///
    /// Propagates any error from building the MQTT configuration, connecting, subscribing,
    /// initialising or running the mapper.
    #[instrument(skip(self, tedge_config), name = "sm-c8y-mapper")]
    async fn start(&self, tedge_config: TEdgeConfig) -> Result<(), anyhow::Error> {
        let broker_config = mqtt_config(&tedge_config)?;
        let client = Client::connect("SM-C8Y-Mapper", &broker_config).await?;

        let mut mapper = CumulocitySoftwareManagement::new(client, tedge_config);
        let incoming = mapper.subscribe().await?;

        mapper.init().await?;
        mapper.run(incoming).await
    }
}
/// State of the software-management mapper: the MQTT client it talks through, the tedge
/// configuration it was built with, and the internal id resolved during initialisation.
#[derive(Debug)]
pub struct CumulocitySoftwareManagement {
/// MQTT client used to subscribe to the incoming topics.
pub client: Client,
/// Configuration handed over at construction time.
config: TEdgeConfig,
/// Internal id; starts out empty and initialisation keeps retrying until it is non-empty.
/// NOTE(review): presumably the Cumulocity internal id of the device's managed object — confirm.
c8y_internal_id: String,
}
impl CumulocitySoftwareManagement {
/// Builds a mapper from an MQTT client and the tedge configuration.
///
/// The internal id starts out as an empty string; it is expected to be filled in later by
/// `init`.
pub fn new(client: Client, config: TEdgeConfig) -> Self {
    let c8y_internal_id = String::new();
    Self {
        client,
        config,
        c8y_internal_id,
    }
}
/// Subscribes to the three incoming topics handled by the mapper: software list responses,
/// software update responses and SmartREST requests.
///
/// # Errors
///
/// Returns an error if any topic cannot be added to the filter or if the MQTT subscription
/// itself fails.
pub async fn subscribe(&self) -> Result<Box<dyn MqttMessageStream>, anyhow::Error> {
    let mut filter = TopicFilter::new(IncomingTopic::SoftwareListResponse.as_str())?;
    filter.add(IncomingTopic::SoftwareUpdateResponse.as_str())?;
    filter.add(IncomingTopic::SmartRestRequest.as_str())?;

    Ok(self.client.subscribe(filter).await?)
}
/// Blocks until the internal id is known.
///
/// Repeatedly calls `try_get_and_set_internal_id` for as long as `c8y_internal_id` is empty.
/// A failed attempt is logged and followed by a pause of `RETRY_TIMEOUT_SECS` seconds before
/// the next try. As written, failures are never propagated: the function only returns once
/// the id is non-empty, and then always with `Ok(())`.
// NOTE(review): presumably `try_get_and_set_internal_id` stores the id in `c8y_internal_id`
// on success — its body is not visible here, confirm.
#[instrument(skip(self), name = "init")]
async fn init(&mut self) -> Result<(), anyhow::Error> {
    info!("Initialisation");

    let retry_delay = Duration::from_secs(RETRY_TIMEOUT_SECS);
    while self.c8y_internal_id.is_empty() {
        match self.try_get_and_set_internal_id().await {
            Ok(_) => {}
            Err(err) => {
                error!(
                    "An error ocurred while retrieving internal Id, operation will retry in {} seconds and mapper will reinitialise.\n Error: {:?}",
                    RETRY_TIMEOUT_SECS, err
                );
                // Back off before the loop condition is checked again.
                tokio::time::sleep(retry_delay).await;
            }
        }
    }

    info!("Initialisation done.");
    Ok(())
}
pub async fn run(&self, mut messages: Box<dyn MqttMessageStream>) -> Result<(), anyhow::Error> {
info!("Running");
let () = self.publish_supported_operations().await?;
let () = self.publish_supported_log_types().await?;
let () = self.publish_get_pending_operations