summaryrefslogtreecommitdiffstats
path: root/crates/core/tedge_mapper/src/c8y/mapper.rs
blob: 315449fec55a6b7ed41357006e5e75b59320149f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
use crate::{
    c8y::converter::CumulocityConverter,
    core::{component::TEdgeComponent, mapper::create_mapper, size_threshold::SizeThreshold},
};

use agent_interface::topic::ResponseTopic;
use async_trait::async_trait;
use c8y_api::http_proxy::JwtAuthHttpProxy;
use c8y_smartrest::operations::Operations;
use mqtt_channel::{Config, TopicFilter};
use tedge_config::{
    ConfigSettingAccessor, DeviceIdSetting, DeviceTypeSetting, MqttPortSetting, TEdgeConfig,
};
use tracing::{info, info_span, Instrument};

use super::topic::C8yTopic;

const CUMULOCITY_MAPPER_NAME: &str = "tedge-mapper-c8y";

pub struct CumulocityMapper {}

impl CumulocityMapper {
    pub fn new() -> CumulocityMapper {
        CumulocityMapper {}
    }

    pub fn subscriptions(operations: &Operations) -> Result<TopicFilter, anyhow::Error> {
        let mut topic_filter = TopicFilter::new(ResponseTopic::SoftwareListResponse.as_str())?;
        topic_filter.add(ResponseTopic::SoftwareUpdateResponse.as_str())?;
        topic_filter.add(C8yTopic::SmartRestRequest.as_str())?;
        topic_filter.add(ResponseTopic::RestartResponse.as_str())?;

        for topic in operations.topics_for_operations() {
            topic_filter.add(&topic)?
        }

        Ok(topic_filter)
    }

    pub async fn init_session(&mut self) -> Result<(), anyhow::Error> {
        info!("Initialize tedge sm mapper session");
        let operations = Operations::try_new("/etc/tedge/operations", "c8y")?;
        let mqtt_topic = Self::subscriptions(&operations)?;
        let config = Config::default()
            .with_session_name(CUMULOCITY_MAPPER_NAME)
            .with_clean_session(false)
            .with_subscriptions(mqtt_topic);
        mqtt_channel::init_session(&config).await?;
        Ok(())
    }

    pub async fn clear_session(&mut self) -> Result<(), anyhow::Error> {
        info!("Clear tedge sm mapper session");
        let operations = Operations::try_new("/etc/tedge/operations", "c8y")?;
        let mqtt_topic = Self::subscriptions(&operations)?;
        let config = Config::default()
            .with_session_name(CUMULOCITY_MAPPER_NAME)
            .with_clean_session(true)
            .with_subscriptions(mqtt_topic);
        mqtt_channel::clear_session(&config).await?;
        Ok(())
    }
}

#[async_trait]
impl TEdgeComponent for CumulocityMapper {
    async fn start(&self, tedge_config: TEdgeConfig) -> Result<(), anyhow::Error> {
        let size_threshold = SizeThreshold(16 * 1024);

        let operations = Operations::try_new("/etc/tedge/operations", "c8y")?;
        let http_proxy = JwtAuthHttpProxy::try_new(&tedge_config).await?;
        let device_name = tedge_config.query(DeviceIdSetting)?;
        let device_type = tedge_config.query(DeviceTypeSetting)?;
        let mqtt_port = tedge_config.query(MqttPortSetting)?.into();

        let converter = Box::new(CumulocityConverter::new(
            size_threshold,
            device_name,
            device_type,
            operations,
            http_proxy,
        ));

        let mut mapper = create_mapper(CUMULOCITY_MAPPER_NAME, mqtt_port, converter).await?;

        mapper
            .run()
            .instrument(info_span!(CUMULOCITY_MAPPER_NAME))
            .await?;

        Ok(())
    }
}