summaryrefslogtreecommitdiffstats
path: root/mapper/cumulocity/c8y_mapper/src/mapper.rs
blob: 558adaaffe6455e7c6270b58cd82c7f5be0de1ec (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
use c8y_translator_lib::CumulocityJson;

use log;

use mqtt_client;

use tokio::task::JoinHandle;

/// Topic name for incoming measurements.
///
/// NOTE(review): not referenced elsewhere in this file; presumably passed by the
/// caller as `in_topic` to `Mapper::new_from_string` — confirm at the call site.
pub const IN_TOPIC: &str = "tedge/measurements";
/// Topic name for translated (Cumulocity JSON) measurements.
///
/// NOTE(review): presumably passed by the caller as `out_topic` — confirm.
pub const C8Y_TOPIC_C8Y_JSON: &str = "c8y/measurement/measurements/create";
/// Topic name for error reports.
///
/// NOTE(review): presumably passed by the caller as `err_topic` — confirm.
pub const ERRORS_TOPIC: &str = "tedge/errors";

/// Translates messages received on one MQTT topic and republishes the result.
///
/// Payloads received on `in_topic` are passed through
/// `CumulocityJson::from_thin_edge_json`; successful translations are published
/// on `out_topic`, and the text of translation errors is published on `err_topic`.
pub struct Mapper {
    /// MQTT client used both for subscribing and for publishing.
    client: mqtt_client::Client,
    /// Topic whose filter is subscribed to for incoming messages.
    in_topic: mqtt_client::Topic,
    /// Topic on which successfully translated payloads are published.
    out_topic: mqtt_client::Topic,
    /// Topic on which translation error messages are published.
    err_topic: mqtt_client::Topic,
}

impl Mapper {
    pub fn new_from_string(
        client: mqtt_client::Client,
        in_topic: &str,
        out_topic: &str,
        err_topic: &str,
    ) -> Result<Self, mqtt_client::Error> {
        Ok(Self::new(
            client,
            mqtt_client::Topic::new(in_topic)?,
            mqtt_client::Topic::new(out_topic)?,
            mqtt_client::Topic::new(err_topic)?,
        ))
    }

    fn new(
        client: mqtt_client::Client,
        in_topic: mqtt_client::Topic,
        out_topic: mqtt_client::Topic,
        err_topic: mqtt_client::Topic,
    ) -> Self {
        Self {
            client,
            in_topic,
            out_topic,
            err_topic,
        }
    }

    fn subscribe_errors(&self) -> JoinHandle<()> {
        let mut errors = self.client.subscribe_errors();
        tokio::spawn(async move {
            while let Some(error) = errors.next().await {
                log::error!("{}", error);
            }
        })
    }

    async fn subscribe_messages(&self) -> Result<(), mqtt_client::Error> {
        let mut messages = self.client.subscribe(self.in_topic.filter()).await?;
        while let Some(message) = messages.next().await {
            log::debug!("Mapping {:?}", message);
            match Mapper::map(&message.payload) {
                Ok(mapped) => {
                    self.client
                        .publish(mqtt_client::Message::new(&self.out_topic, mapped))
                        .await?;
                }
                Err(error) => {
                    log::debug!("Mapping error: {}", error);
                    self.client
                        .publish(mqtt_client::Message::new(
                            &self.err_topic,
                            error.to_string(),
                        ))
                        .await?;
                }
            }
        }
        Ok(())
    }
    pub async fn run(self) -> Result<(), mqtt_client::Error> {
        let errors_handle = self.subscribe_errors();
        let messages_handle = self.subscribe_messages();
        messages_handle.await?;
        errors_handle
            .await
            .map_err(|_| mqtt_client::Error::JoinError)?;
        Ok(())
    }

    fn map(input: &[u8]) -> Result<Vec<u8>, c8y_translator_lib::ThinEdgeJsonError> {
        CumulocityJson::from_thin_edge_json(input)
    }
}