1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
|
use crate::{
c8y::converter::CumulocityConverter,
core::{component::TEdgeComponent, mapper::create_mapper, size_threshold::SizeThreshold},
};
use agent_interface::topic::ResponseTopic;
use async_trait::async_trait;
use c8y_api::http_proxy::JwtAuthHttpProxy;
use c8y_smartrest::operations::Operations;
use mqtt_channel::{Config, TopicFilter};
use tedge_config::{
ConfigSettingAccessor, DeviceIdSetting, DeviceTypeSetting, MqttPortSetting, TEdgeConfig,
};
use tracing::{info, info_span, Instrument};
use super::topic::C8yTopic;
const CUMULOCITY_MAPPER_NAME: &str = "tedge-mapper-c8y";
pub struct CumulocityMapper {}
impl CumulocityMapper {
pub fn new() -> CumulocityMapper {
CumulocityMapper {}
}
pub fn subscriptions(operations: &Operations) -> Result<TopicFilter, anyhow::Error> {
let mut topic_filter = TopicFilter::new(ResponseTopic::SoftwareListResponse.as_str())?;
topic_filter.add(ResponseTopic::SoftwareUpdateResponse.as_str())?;
topic_filter.add(C8yTopic::SmartRestRequest.as_str())?;
topic_filter.add(ResponseTopic::RestartResponse.as_str())?;
for topic in operations.topics_for_operations() {
topic_filter.add(&topic)?
}
Ok(topic_filter)
}
pub async fn init_session(&mut self) -> Result<(), anyhow::Error> {
info!("Initialize tedge sm mapper session");
let operations = Operations::try_new("/etc/tedge/operations", "c8y")?;
let mqtt_topic = Self::subscriptions(&operations)?;
let config = Config::default()
.with_session_name(CUMULOCITY_MAPPER_NAME)
.with_clean_session(false)
.with_subscriptions(mqtt_topic);
mqtt_channel::init_session(&config).await?;
Ok(())
}
pub async fn clear_session(&mut self) -> Result<(), anyhow::Error> {
info!("Clear tedge sm mapper session");
let operations = Operations::try_new("/etc/tedge/operations", "c8y")?;
let mqtt_topic = Self::subscriptions(&operations)?;
let config = Config::default()
.with_session_name(CUMULOCITY_MAPPER_NAME)
.with_clean_session(true)
.with_subscriptions(mqtt_topic);
mqtt_channel::clear_session(&config).await?;
Ok(())
}
}
#[async_trait]
impl TEdgeComponent for CumulocityMapper {
async fn start(&self, tedge_config: TEdgeConfig) -> Result<(), anyhow::Error> {
let size_threshold = SizeThreshold(16 * 1024);
let operations = Operations::try_new("/etc/tedge/operations", "c8y")?;
let http_proxy = JwtAuthHttpProxy::try_new(&tedge_config).await?;
let device_name = tedge_config.query(DeviceIdSetting)?;
let device_type = tedge_config.query(DeviceTypeSetting)?;
let mqtt_port = tedge_config.query(MqttPortSetting)?.into();
let converter = Box::new(CumulocityConverter::new(
size_threshold,
device_name,
device_type,
operations,
http_proxy,
));
let mut mapper = create_mapper(CUMULOCITY_MAPPER_NAME, mqtt_port, converter).await?;
mapper
.run()
.instrument(info_span!(CUMULOCITY_MAPPER_NAME))
.await?;
Ok(())
}
}
|