1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
|
use c8y_translator_lib::CumulocityJson;
use log;
use mqtt_client;
use tokio::task::JoinHandle;
pub const IN_TOPIC: &str = "tedge/measurements";
pub const C8Y_TOPIC_C8Y_JSON: &str = "c8y/measurement/measurements/create";
pub const ERRORS_TOPIC: &str = "tedge/errors";
pub struct Mapper {
client: mqtt_client::Client,
in_topic: mqtt_client::Topic,
out_topic: mqtt_client::Topic,
err_topic: mqtt_client::Topic,
}
impl Mapper {
pub fn new_from_string(
client: mqtt_client::Client,
in_topic: &str,
out_topic: &str,
err_topic: &str,
) -> Result<Self, mqtt_client::Error> {
Ok(Self::new(
client,
mqtt_client::Topic::new(in_topic)?,
mqtt_client::Topic::new(out_topic)?,
mqtt_client::Topic::new(err_topic)?,
))
}
fn new(
client: mqtt_client::Client,
in_topic: mqtt_client::Topic,
out_topic: mqtt_client::Topic,
err_topic: mqtt_client::Topic,
) -> Self {
Self {
client,
in_topic,
out_topic,
err_topic,
}
}
fn subscribe_errors(&self) -> JoinHandle<()> {
let mut errors = self.client.subscribe_errors();
tokio::spawn(async move {
while let Some(error) = errors.next().await {
log::error!("{}", error);
}
})
}
async fn subscribe_messages(&self) -> Result<(), mqtt_client::Error> {
let mut messages = self.client.subscribe(self.in_topic.filter()).await?;
while let Some(message) = messages.next().await {
log::debug!("Mapping {:?}", message);
match Mapper::map(&message.payload) {
Ok(mapped) => {
self.client
.publish(mqtt_client::Message::new(&self.out_topic, mapped))
.await?;
}
Err(error) => {
log::debug!("Mapping error: {}", error);
self.client
.publish(mqtt_client::Message::new(
&self.err_topic,
error.to_string(),
))
.await?;
}
}
}
Ok(())
}
pub async fn run(self) -> Result<(), mqtt_client::Error> {
let errors_handle = self.subscribe_errors();
let messages_handle = self.subscribe_messages();
messages_handle.await?;
errors_handle
.await
.map_err(|_| mqtt_client::Error::JoinError)?;
Ok(())
}
fn map(input: &[u8]) -> Result<Vec<u8>, c8y_translator_lib::ThinEdgeJsonError> {
CumulocityJson::from_thin_edge_json(input)
}
}
|